diff --git a/Dockerfile b/Dockerfile index 818a106a9f..51e35f0a67 100644 --- a/Dockerfile +++ b/Dockerfile @@ -42,3 +42,16 @@ COPY integration-tests/conf/.htpasswd \ COPY . /pulsar/pulsar-client-go ENV PULSAR_EXTRA_OPTS="-Dpulsar.auth.basic.conf=/pulsar/conf/.htpasswd" + +WORKDIR /pulsar/pulsar-client-go + +ENV GOPATH=/pulsar/go +ENV GOCACHE=/tmp/go-cache + +# Install dependencies +RUN go mod download + +# Basic compilation +RUN go build ./pulsar +RUN go build ./pulsaradmin +RUN go build -o bin/pulsar-perf ./perf diff --git a/Makefile b/Makefile index 62e44166cc..df4d539d49 100644 --- a/Makefile +++ b/Makefile @@ -44,9 +44,17 @@ container: --build-arg PULSAR_IMAGE="${PULSAR_IMAGE}" \ --build-arg ARCH="${CONTAINER_ARCH}" . -test: container +test: container test_standalone test_clustered + +test_standalone: container docker run -i ${IMAGE_NAME} bash -c "cd /pulsar/pulsar-client-go && ./scripts/run-ci.sh" +test_clustered: container + PULSAR_VERSION=${PULSAR_VERSION} docker compose -f integration-tests/clustered/docker-compose.yml up -d || true + until curl http://localhost:8080/metrics > /dev/null 2>&1; do sleep 1; done + docker run --network "clustered_pulsar" -i ${IMAGE_NAME} bash -c "cd /pulsar/pulsar-client-go && ./scripts/run-ci-clustered.sh" + PULSAR_VERSION=${PULSAR_VERSION} docker compose -f integration-tests/clustered/docker-compose.yml down + clean: docker rmi --force $(IMAGE_NAME) || true rm bin/* diff --git a/integration-tests/clustered/docker-compose.yml b/integration-tests/clustered/docker-compose.yml new file mode 100644 index 0000000000..cce8edddad --- /dev/null +++ b/integration-tests/clustered/docker-compose.yml @@ -0,0 +1,167 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +version: '3' +networks: + pulsar: + driver: bridge +services: + # Start ZooKeeper + zookeeper: + image: apachepulsar/pulsar:${PULSAR_VERSION} + container_name: zookeeper + restart: on-failure + networks: + - pulsar + environment: + - metadataStoreUrl=zk:zookeeper:2181 + - PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m + command: > + bash -c "bin/apply-config-from-env.py conf/zookeeper.conf && \ + bin/generate-zookeeper-config.sh conf/zookeeper.conf && \ + exec bin/pulsar zookeeper" + healthcheck: + test: ["CMD", "bin/pulsar-zookeeper-ruok.sh"] + interval: 10s + timeout: 5s + retries: 30 + + # Initialize cluster metadata + pulsar-init: + container_name: pulsar-init + hostname: pulsar-init + image: apachepulsar/pulsar:${PULSAR_VERSION} + networks: + - pulsar + environment: + - PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m + command: > + bin/pulsar initialize-cluster-metadata \ + --cluster cluster-a \ + --zookeeper zookeeper:2181 \ + --configuration-store zookeeper:2181 \ + --web-service-url http://broker-1:8080 \ + --broker-service-url pulsar://broker-1:6650 + depends_on: + zookeeper: + condition: service_healthy + + # Start bookie + bookie: + image: apachepulsar/pulsar:${PULSAR_VERSION} + container_name: bookie + restart: on-failure + networks: + - pulsar + environment: + - clusterName=cluster-a + - zkServers=zookeeper:2181 + - metadataServiceUri=metadata-store:zk:zookeeper:2181 + - advertisedAddress=bookie + - BOOKIE_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m + depends_on: + zookeeper: + condition: service_healthy + pulsar-init: + condition: service_completed_successfully + command: bash -c "bin/apply-config-from-env.py conf/bookkeeper.conf && exec bin/pulsar bookie" + + proxy: + image: apachepulsar/pulsar:${PULSAR_VERSION} + container_name: proxy + hostname: proxy + restart: on-failure + networks: + - pulsar + environment: + - metadataStoreUrl=zk:zookeeper:2181 + - zookeeperServers=zookeeper:2181 + - clusterName=cluster-a + - PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m + ports: + - "8080:8080" + - "6650:6650" + depends_on: + broker-1: + condition: service_healthy + broker-2: + condition: service_healthy + command: bash -c "bin/apply-config-from-env.py conf/proxy.conf && exec bin/pulsar proxy" + + # Start broker 1 + broker-1: + image: apachepulsar/pulsar:${PULSAR_VERSION} + container_name: broker-1 + hostname: broker-1 + restart: on-failure + networks: + - pulsar + environment: + - metadataStoreUrl=zk:zookeeper:2181 + - zookeeperServers=zookeeper:2181 + - clusterName=cluster-a + - managedLedgerDefaultEnsembleSize=1 + - managedLedgerDefaultWriteQuorum=1 + - managedLedgerDefaultAckQuorum=1 + - advertisedAddress=broker-1 + - internalListenerName=internal + - advertisedListeners=internal:pulsar://broker-1:6650 + - PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=56m + - PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1 + depends_on: + zookeeper: + condition: service_healthy + bookie: + condition: service_started + command: bash -c "bin/apply-config-from-env.py conf/broker.conf && exec bin/pulsar broker" + healthcheck: + test: ["CMD-SHELL", "curl -f http://localhost:8080/metrics || exit 1"] + interval: 10s + timeout: 5s + retries: 30 + + # Start broker 2 + broker-2: + image: apachepulsar/pulsar:${PULSAR_VERSION} + container_name: broker-2 + hostname: broker-2 + restart: on-failure + networks: + - pulsar + environment: + - metadataStoreUrl=zk:zookeeper:2181 + - zookeeperServers=zookeeper:2181 + - clusterName=cluster-a + - managedLedgerDefaultEnsembleSize=1 + - managedLedgerDefaultWriteQuorum=1 + - managedLedgerDefaultAckQuorum=1 + - advertisedAddress=broker-2 + - internalListenerName=internal + - advertisedListeners=internal:pulsar://broker-2:6650 + - PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=56m + - PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1 + depends_on: + zookeeper: + condition: service_healthy + bookie: + condition: service_started + command: bash -c "bin/apply-config-from-env.py conf/broker.conf && exec bin/pulsar broker" + healthcheck: + test: ["CMD-SHELL", "curl -f http://localhost:8080/metrics || exit 1"] + interval: 10s + timeout: 5s + retries: 30 diff --git a/pulsar/client_impl_clustered_test.go b/pulsar/client_impl_clustered_test.go new file mode 100644 index 0000000000..e572c7741c --- /dev/null +++ b/pulsar/client_impl_clustered_test.go @@ -0,0 +1,89 @@ +//go:build clustered + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/suite" +) + +type clientClusteredTestSuite struct { + suite.Suite +} + +func TestClientClusteredTestSuite(t *testing.T) { + suite.Run(t, new(clientClusteredTestSuite)) +} + +func (suite *clientClusteredTestSuite) TestRetryWithMultipleHosts() { + req := suite.Require() + // Multi hosts included an unreached port and the actual port for verify retry logic + client, err := NewClient(ClientOptions{ + URL: "pulsar://broker-1:6600,broker-1:6650", + }) + req.NoError(err) + defer client.Close() + + topic := "persistent://public/default/retry-multiple-hosts-" + generateRandomName() + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + }) + req.NoError(err) + defer producer.Close() + + ctx := context.Background() + var msgIDs [][]byte + + for i := 0; i < 10; i++ { + if msgID, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }); err != nil { + req.NoError(err) + } else { + req.NotNil(msgID) + msgIDs = append(msgIDs, msgID.Serialize()) + } + } + + req.Equal(10, len(msgIDs)) + + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "retry-multi-hosts-sub", + Type: Shared, + SubscriptionInitialPosition: SubscriptionPositionEarliest, + }) + req.NoError(err) + defer consumer.Close() + + for i := 0; i < 10; i++ { + msg, err := consumer.Receive(context.Background()) + req.NoError(err) + req.Contains(msgIDs, msg.ID().Serialize()) + consumer.Ack(msg) + } + + err = consumer.Unsubscribe() + req.NoError(err) +} diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go index 78dc1cae68..5b6b8f1ff3 100644 --- a/pulsar/client_impl_test.go +++ b/pulsar/client_impl_test.go @@ -570,61 +570,6 @@ func anonymousNamespacePolicy() map[string]interface{} { } } -func TestRetryWithMultipleHosts(t *testing.T) { - // Multi hosts included an unreached port and the actual port for verify retry logic - client, err := NewClient(ClientOptions{ - URL: "pulsar://localhost:6600,localhost:6650", - }) - - assert.Nil(t, err) - defer client.Close() - - topic := "persistent://public/default/retry-multiple-hosts-" + generateRandomName() - - producer, err := client.CreateProducer(ProducerOptions{ - Topic: topic, - }) - - assert.Nil(t, err) - defer producer.Close() - - ctx := context.Background() - var msgIDs [][]byte - - for i := 0; i < 10; i++ { - if msgID, err := producer.Send(ctx, &ProducerMessage{ - Payload: []byte(fmt.Sprintf("hello-%d", i)), - }); err != nil { - assert.Nil(t, err) - } else { - assert.NotNil(t, msgID) - msgIDs = append(msgIDs, msgID.Serialize()) - } - } - - assert.Equal(t, 10, len(msgIDs)) - - consumer, err := client.Subscribe(ConsumerOptions{ - Topic: topic, - SubscriptionName: "retry-multi-hosts-sub", - Type: Shared, - SubscriptionInitialPosition: SubscriptionPositionEarliest, - }) - assert.Nil(t, err) - defer consumer.Close() - - for i := 0; i < 10; i++ { - msg, err := consumer.Receive(context.Background()) - assert.Nil(t, err) - assert.Contains(t, msgIDs, msg.ID().Serialize()) - consumer.Ack(msg) - } - - err = consumer.Unsubscribe() - assert.Nil(t, err) - -} - func TestHTTPSConnectionCAError(t *testing.T) { client, err := NewClient(ClientOptions{ URL: webServiceURLTLS, diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go index 6ff7991992..3d718b75d9 100644 --- a/pulsar/internal/connection_pool.go +++ b/pulsar/internal/connection_pool.go @@ -79,7 +79,8 @@ func NewConnectionPool( } func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error) { - key := p.getMapKey(logicalAddr) + p.log.WithField("logicalAddr", logicalAddr).WithField("physicalAddr", physicalAddr).Debug("Getting pooled connection") + key := p.getMapKey(logicalAddr, physicalAddr) p.Lock() conn, ok := p.connections[key] @@ -133,13 +134,13 @@ func (p *connectionPool) Close() { p.Unlock() } -func (p *connectionPool) getMapKey(addr *url.URL) string { +func (p *connectionPool) getMapKey(logicalAddr *url.URL, physicalAddr *url.URL) string { cnt := atomic.AddInt32(&p.roundRobinCnt, 1) if cnt < 0 { cnt = -cnt } idx := cnt % p.maxConnectionsPerHost - return fmt.Sprint(addr.Host, '-', idx) + return fmt.Sprint(logicalAddr.Host, "-", physicalAddr.Host, "-", idx) } func (p *connectionPool) checkAndCleanIdleConnections(maxIdleTime time.Duration) { diff --git a/pulsar/reader_clustered_test.go b/pulsar/reader_clustered_test.go new file mode 100644 index 0000000000..db2dc6f130 --- /dev/null +++ b/pulsar/reader_clustered_test.go @@ -0,0 +1,88 @@ +//go:build clustered + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/suite" +) + +type ReaderClusteredTestSuite struct { + suite.Suite +} + +func TestReaderClusteredTestSuite(t *testing.T) { + suite.Run(t, new(ReaderClusteredTestSuite)) +} + +func (suite *ReaderClusteredTestSuite) TestReaderWithMultipleHosts() { + req := suite.Require() + + // Multi hosts included an unreached port and the actual port for verify retry logic + client, err := NewClient(ClientOptions{ + URL: "pulsar://broker-1:6600,broker-1:6650", + }) + req.NoError(err) + defer client.Close() + + topic := newTopicName() + ctx := context.Background() + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: true, + }) + req.NoError(err) + defer producer.Close() + + // send 10 messages + for i := 0; i < 10; i++ { + msgID, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }) + req.NoError(err) + req.NotNil(msgID) + } + + // create reader on 5th message (not included) + reader, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: EarliestMessageID(), + }) + req.NoError(err) + defer reader.Close() + + i := 0 + for reader.HasNext() { + msg, err := reader.Next(context.Background()) + req.NoError(err) + + expectMsg := fmt.Sprintf("hello-%d", i) + req.Equal([]byte(expectMsg), msg.Payload()) + + i++ + } + + req.Equal(10, i) +} diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index 78c222dac7..93787d106c 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -724,58 +724,6 @@ func TestReaderLatestInclusiveHasNext(t *testing.T) { assert.False(t, reader.HasNext()) } -func TestReaderWithMultiHosts(t *testing.T) { - // Multi hosts included an unreached port and the actual port for verify retry logic - client, err := NewClient(ClientOptions{ - URL: "pulsar://localhost:6600,localhost:6650", - }) - - assert.Nil(t, err) - defer client.Close() - - topic := newTopicName() - ctx := context.Background() - - // create producer - producer, err := client.CreateProducer(ProducerOptions{ - Topic: topic, - DisableBatching: true, - }) - assert.Nil(t, err) - defer producer.Close() - - // send 10 messages - for i := 0; i < 10; i++ { - msgID, err := producer.Send(ctx, &ProducerMessage{ - Payload: []byte(fmt.Sprintf("hello-%d", i)), - }) - assert.NoError(t, err) - assert.NotNil(t, msgID) - } - - // create reader on 5th message (not included) - reader, err := client.CreateReader(ReaderOptions{ - Topic: topic, - StartMessageID: EarliestMessageID(), - }) - - assert.Nil(t, err) - defer reader.Close() - - i := 0 - for reader.HasNext() { - msg, err := reader.Next(context.Background()) - assert.NoError(t, err) - - expectMsg := fmt.Sprintf("hello-%d", i) - assert.Equal(t, []byte(expectMsg), msg.Payload()) - - i++ - } - - assert.Equal(t, 10, i) -} - func TestProducerReaderRSAEncryption(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, diff --git a/scripts/run-ci-clustered.sh b/scripts/run-ci-clustered.sh new file mode 100755 index 0000000000..5e4f36fa4c --- /dev/null +++ b/scripts/run-ci-clustered.sh @@ -0,0 +1,23 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +set -e -x + +go test -race -coverprofile=/tmp/coverage -timeout=5m -tags clustered -v -run 'Test.*ClusteredTestSuite' -v ./pulsar +go tool cover -html=/tmp/coverage -o coverage.html + diff --git a/scripts/run-ci.sh b/scripts/run-ci.sh index 83246a39f0..e7a6d79ece 100755 --- a/scripts/run-ci.sh +++ b/scripts/run-ci.sh @@ -19,16 +19,6 @@ set -e -x -export GOPATH=/pulsar/go -export GOCACHE=/tmp/go-cache - -# Install dependencies -go mod download - -# Basic compilation -go build ./pulsar -go build -o bin/pulsar-perf ./perf - scripts/pulsar-test-service-start.sh go test -race -coverprofile=/tmp/coverage -timeout=20m -v ./...