From 3dbc6c7ef20dd4eec6183f80be098dec2c45f6c4 Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Thu, 21 Mar 2024 17:01:21 -0700 Subject: [PATCH 01/11] Include physical address in connection pool key --- pulsar/internal/connection_pool.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go index 6ff7991992..db6bb8a217 100644 --- a/pulsar/internal/connection_pool.go +++ b/pulsar/internal/connection_pool.go @@ -79,7 +79,8 @@ func NewConnectionPool( } func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error) { - key := p.getMapKey(logicalAddr) + p.log.WithField("logicalAddr", logicalAddr).WithField("physicalAddr", physicalAddr).Debug("Getting pooled connection") + key := p.getMapKey(logicalAddr, physicalAddr) p.Lock() conn, ok := p.connections[key] @@ -133,13 +134,13 @@ func (p *connectionPool) Close() { p.Unlock() } -func (p *connectionPool) getMapKey(addr *url.URL) string { +func (p *connectionPool) getMapKey(logicalAddr *url.URL, physicalAddr *url.URL) string { cnt := atomic.AddInt32(&p.roundRobinCnt, 1) if cnt < 0 { cnt = -cnt } idx := cnt % p.maxConnectionsPerHost - return fmt.Sprint(addr.Host, '-', idx) + return fmt.Sprint(logicalAddr.Host, '-', physicalAddr.Host, '-', idx) } func (p *connectionPool) checkAndCleanIdleConnections(maxIdleTime time.Duration) { From 06a4596841d225eca76187f484909a3f671e5c9b Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Wed, 20 Mar 2024 14:56:42 -0700 Subject: [PATCH 02/11] Support specifying container platform --- Dockerfile | 3 ++- Makefile | 7 +++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/Dockerfile b/Dockerfile index 62d22bf06d..818a106a9f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -21,8 +21,9 @@ ARG PULSAR_IMAGE=apachepulsar/pulsar:latest FROM $PULSAR_IMAGE USER root ARG GO_VERSION=1.18 +ARG ARCH=amd64 -RUN curl -L https://dl.google.com/go/go${GO_VERSION}.linux-amd64.tar.gz -o golang.tar.gz && \ +RUN curl -L https://dl.google.com/go/go${GO_VERSION}.linux-${ARCH}.tar.gz -o golang.tar.gz && \ mkdir -p /pulsar/go && tar -C /pulsar -xzf golang.tar.gz ENV PATH /pulsar/go/bin:$PATH diff --git a/Makefile b/Makefile index 527d53acf2..19ef091934 100644 --- a/Makefile +++ b/Makefile @@ -21,6 +21,7 @@ IMAGE_NAME = pulsar-client-go-test:latest PULSAR_VERSION ?= 3.2.0 PULSAR_IMAGE = apachepulsar/pulsar:$(PULSAR_VERSION) GO_VERSION ?= 1.18 +CONTAINER_ARCH ?= $(shell uname -m) # Golang standard bin directory. GOPATH ?= $(shell go env GOPATH) @@ -38,8 +39,10 @@ bin/golangci-lint: GOBIN=$(shell pwd)/bin go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.51.2 container: - docker build -t ${IMAGE_NAME} --build-arg GO_VERSION="${GO_VERSION}" \ - --build-arg PULSAR_IMAGE="${PULSAR_IMAGE}" . + docker build -t ${IMAGE_NAME} \ + --build-arg GO_VERSION="${GO_VERSION}" \ + --build-arg PULSAR_IMAGE="${PULSAR_IMAGE}" \ + --build-arg ARCH="${CONTAINER_ARCH}" . test: container docker run -i ${IMAGE_NAME} bash -c "cd /pulsar/pulsar-client-go && ./scripts/run-ci.sh" From 9420cd400bc1d390038126307913e04eb0d91858 Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Fri, 12 Apr 2024 15:15:50 -0700 Subject: [PATCH 03/11] Fix arch name mismatch on x86_64 --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 19ef091934..62e44166cc 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ IMAGE_NAME = pulsar-client-go-test:latest PULSAR_VERSION ?= 3.2.0 PULSAR_IMAGE = apachepulsar/pulsar:$(PULSAR_VERSION) GO_VERSION ?= 1.18 -CONTAINER_ARCH ?= $(shell uname -m) +CONTAINER_ARCH ?= $(shell uname -m | sed s/x86_64/amd64/) # Golang standard bin directory. GOPATH ?= $(shell go env GOPATH) From 0188fc88c3c88df3ac3fffad00e8ff92f510ed22 Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Mon, 8 Apr 2024 11:12:54 -0700 Subject: [PATCH 04/11] Add clustered test suites --- Makefile | 6 + .../clustered/docker-compose.yml | 150 ++++++++++++++++++ pulsar/client_impl_clustered_test.go | 89 +++++++++++ pulsar/client_impl_test.go | 55 ------- pulsar/reader_clustered_test.go | 87 ++++++++++ pulsar/reader_test.go | 52 ------ scripts/run-ci-clustered.sh | 23 +++ 7 files changed, 355 insertions(+), 107 deletions(-) create mode 100644 integration-tests/clustered/docker-compose.yml create mode 100644 pulsar/client_impl_clustered_test.go create mode 100644 pulsar/reader_clustered_test.go create mode 100755 scripts/run-ci-clustered.sh diff --git a/Makefile b/Makefile index 62e44166cc..36f2fb133c 100644 --- a/Makefile +++ b/Makefile @@ -44,6 +44,12 @@ container: --build-arg PULSAR_IMAGE="${PULSAR_IMAGE}" \ --build-arg ARCH="${CONTAINER_ARCH}" . +test_clustered: container + PULSAR_VERSION=${PULSAR_VERSION} docker compose -f integration-tests/clustered/docker-compose.yml up -d || true + until curl http://localhost:8080/metrics > /dev/null 2>&1; do sleep 1; done + docker run --network "clustered_pulsar" -i ${IMAGE_NAME} bash -c "cd /pulsar/pulsar-client-go && ./scripts/run-ci-clustered.sh" + PULSAR_VERSION=${PULSAR_VERSION} docker compose -f integration-tests/clustered/docker-compose.yml down + test: container docker run -i ${IMAGE_NAME} bash -c "cd /pulsar/pulsar-client-go && ./scripts/run-ci.sh" diff --git a/integration-tests/clustered/docker-compose.yml b/integration-tests/clustered/docker-compose.yml new file mode 100644 index 0000000000..9984da50e1 --- /dev/null +++ b/integration-tests/clustered/docker-compose.yml @@ -0,0 +1,150 @@ +version: '3' +networks: + pulsar: + driver: bridge +services: + # Start ZooKeeper + zookeeper: + image: apachepulsar/pulsar:${PULSAR_VERSION} + container_name: zookeeper + restart: on-failure + networks: + - pulsar + environment: + - metadataStoreUrl=zk:zookeeper:2181 + - PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m + command: > + bash -c "bin/apply-config-from-env.py conf/zookeeper.conf && \ + bin/generate-zookeeper-config.sh conf/zookeeper.conf && \ + exec bin/pulsar zookeeper" + healthcheck: + test: ["CMD", "bin/pulsar-zookeeper-ruok.sh"] + interval: 10s + timeout: 5s + retries: 30 + + # Initialize cluster metadata + pulsar-init: + container_name: pulsar-init + hostname: pulsar-init + image: apachepulsar/pulsar:${PULSAR_VERSION} + networks: + - pulsar + environment: + - PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m + command: > + bin/pulsar initialize-cluster-metadata \ + --cluster cluster-a \ + --zookeeper zookeeper:2181 \ + --configuration-store zookeeper:2181 \ + --web-service-url http://broker-1:8080 \ + --broker-service-url pulsar://broker-1:6650 + depends_on: + zookeeper: + condition: service_healthy + + # Start bookie + bookie: + image: apachepulsar/pulsar:${PULSAR_VERSION} + container_name: bookie + restart: on-failure + networks: + - pulsar + environment: + - clusterName=cluster-a + - zkServers=zookeeper:2181 + - metadataServiceUri=metadata-store:zk:zookeeper:2181 + - advertisedAddress=bookie + - BOOKIE_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m + depends_on: + zookeeper: + condition: service_healthy + pulsar-init: + condition: service_completed_successfully + command: bash -c "bin/apply-config-from-env.py conf/bookkeeper.conf && exec bin/pulsar bookie" + + proxy: + image: apachepulsar/pulsar:${PULSAR_VERSION} + container_name: proxy + hostname: proxy + restart: on-failure + networks: + - pulsar + environment: + - metadataStoreUrl=zk:zookeeper:2181 + - zookeeperServers=zookeeper:2181 + - clusterName=cluster-a + - PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m + ports: + - "8080:8080" + - "6650:6650" + depends_on: + broker-1: + condition: service_healthy + broker-2: + condition: service_healthy + command: bash -c "bin/apply-config-from-env.py conf/proxy.conf && exec bin/pulsar proxy" + + # Start broker 1 + broker-1: + image: apachepulsar/pulsar:${PULSAR_VERSION} + container_name: broker-1 + hostname: broker-1 + restart: on-failure + networks: + - pulsar + environment: + - metadataStoreUrl=zk:zookeeper:2181 + - zookeeperServers=zookeeper:2181 + - clusterName=cluster-a + - managedLedgerDefaultEnsembleSize=1 + - managedLedgerDefaultWriteQuorum=1 + - managedLedgerDefaultAckQuorum=1 + - advertisedAddress=broker-1 + - internalListenerName=internal + - advertisedListeners=internal:pulsar://broker-1:6650 + - PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=56m + - PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1 + depends_on: + zookeeper: + condition: service_healthy + bookie: + condition: service_started + command: bash -c "bin/apply-config-from-env.py conf/broker.conf && exec bin/pulsar broker" + healthcheck: + test: ["CMD-SHELL", "curl -f http://localhost:8080/metrics || exit 1"] + interval: 10s + timeout: 5s + retries: 30 + + # Start broker 2 + broker-2: + image: apachepulsar/pulsar:${PULSAR_VERSION} + container_name: broker-2 + hostname: broker-2 + restart: on-failure + networks: + - pulsar + environment: + - metadataStoreUrl=zk:zookeeper:2181 + - zookeeperServers=zookeeper:2181 + - clusterName=cluster-a + - managedLedgerDefaultEnsembleSize=1 + - managedLedgerDefaultWriteQuorum=1 + - managedLedgerDefaultAckQuorum=1 + - advertisedAddress=broker-2 + - internalListenerName=internal + - advertisedListeners=internal:pulsar://broker-2:6650 + - PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=56m + - PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1 + depends_on: + zookeeper: + condition: service_healthy + bookie: + condition: service_started + command: bash -c "bin/apply-config-from-env.py conf/broker.conf && exec bin/pulsar broker" + healthcheck: + test: ["CMD-SHELL", "curl -f http://localhost:8080/metrics || exit 1"] + interval: 10s + timeout: 5s + retries: 30 diff --git a/pulsar/client_impl_clustered_test.go b/pulsar/client_impl_clustered_test.go new file mode 100644 index 0000000000..2da397245e --- /dev/null +++ b/pulsar/client_impl_clustered_test.go @@ -0,0 +1,89 @@ +//go:build clustered + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/suite" +) + +type clientClusteredTestSuite struct { + suite.Suite +} + +func ClientClusteredTestSuite(t *testing.T) { + suite.Run(t, new(clientClusteredTestSuite)) +} + +func (suite *clientClusteredTestSuite) TestRetryWithMultipleHosts() { + req := suite.req + // Multi hosts included an unreached port and the actual port for verify retry logic + client, err := NewClient(ClientOptions{ + URL: "pulsar://broker-1:6600,broker-1:6650", + }) + req.NoError(err) + defer client.Close() + + topic := "persistent://public/default/retry-multiple-hosts-" + generateRandomName() + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + }) + req.NoError(err) + defer producer.Close() + + ctx := context.Background() + var msgIDs [][]byte + + for i := 0; i < 10; i++ { + if msgID, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }); err != nil { + req.NoError(err) + } else { + req.NotNil(msgID) + msgIDs = append(msgIDs, msgID.Serialize()) + } + } + + req.Equal(10, len(msgIDs)) + + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "retry-multi-hosts-sub", + Type: Shared, + SubscriptionInitialPosition: SubscriptionPositionEarliest, + }) + req.NoError(err) + defer consumer.Close() + + for i := 0; i < 10; i++ { + msg, err := consumer.Receive(context.Background()) + req.NoError(err) + req.Contains(msgIDs, msg.ID().Serialize()) + consumer.Ack(msg) + } + + err = consumer.Unsubscribe() + req.NoError(err) +} diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go index 78dc1cae68..5b6b8f1ff3 100644 --- a/pulsar/client_impl_test.go +++ b/pulsar/client_impl_test.go @@ -570,61 +570,6 @@ func anonymousNamespacePolicy() map[string]interface{} { } } -func TestRetryWithMultipleHosts(t *testing.T) { - // Multi hosts included an unreached port and the actual port for verify retry logic - client, err := NewClient(ClientOptions{ - URL: "pulsar://localhost:6600,localhost:6650", - }) - - assert.Nil(t, err) - defer client.Close() - - topic := "persistent://public/default/retry-multiple-hosts-" + generateRandomName() - - producer, err := client.CreateProducer(ProducerOptions{ - Topic: topic, - }) - - assert.Nil(t, err) - defer producer.Close() - - ctx := context.Background() - var msgIDs [][]byte - - for i := 0; i < 10; i++ { - if msgID, err := producer.Send(ctx, &ProducerMessage{ - Payload: []byte(fmt.Sprintf("hello-%d", i)), - }); err != nil { - assert.Nil(t, err) - } else { - assert.NotNil(t, msgID) - msgIDs = append(msgIDs, msgID.Serialize()) - } - } - - assert.Equal(t, 10, len(msgIDs)) - - consumer, err := client.Subscribe(ConsumerOptions{ - Topic: topic, - SubscriptionName: "retry-multi-hosts-sub", - Type: Shared, - SubscriptionInitialPosition: SubscriptionPositionEarliest, - }) - assert.Nil(t, err) - defer consumer.Close() - - for i := 0; i < 10; i++ { - msg, err := consumer.Receive(context.Background()) - assert.Nil(t, err) - assert.Contains(t, msgIDs, msg.ID().Serialize()) - consumer.Ack(msg) - } - - err = consumer.Unsubscribe() - assert.Nil(t, err) - -} - func TestHTTPSConnectionCAError(t *testing.T) { client, err := NewClient(ClientOptions{ URL: webServiceURLTLS, diff --git a/pulsar/reader_clustered_test.go b/pulsar/reader_clustered_test.go new file mode 100644 index 0000000000..1acbd41c1b --- /dev/null +++ b/pulsar/reader_clustered_test.go @@ -0,0 +1,87 @@ +// //go:build clustered + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/suite" +) + +type readerClusteredTestSuite struct { + suite.Suite +} + +func ReaderClusteredTestSuite(t *testing.T) { + suite.Run(t, new(readerClusteredTestSuite)) +} + +func (suite *readerClusteredTestSuite) TestRetryWithMultipleHosts() { + req := suite.Require() + // Multi hosts included an unreached port and the actual port for verify retry logic + client, err := NewClient(ClientOptions{ + URL: "pulsar://localhost:6600,localhost:6650", + }) + req.NoError(err) + defer client.Close() + + topic := newTopicName() + ctx := context.Background() + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: true, + }) + req.NoError(err) + defer producer.Close() + + // send 10 messages + for i := 0; i < 10; i++ { + msgID, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }) + req.NoError(err) + req.NotNil(msgID) + } + + // create reader on 5th message (not included) + reader, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: EarliestMessageID(), + }) + req.NoError(err) + defer reader.Close() + + i := 0 + for reader.HasNext() { + msg, err := reader.Next(context.Background()) + req.NoError(err) + + expectMsg := fmt.Sprintf("hello-%d", i) + req.Equal([]byte(expectMsg), msg.Payload()) + + i++ + } + + req.Equal(10, i) +} diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index 78c222dac7..93787d106c 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -724,58 +724,6 @@ func TestReaderLatestInclusiveHasNext(t *testing.T) { assert.False(t, reader.HasNext()) } -func TestReaderWithMultiHosts(t *testing.T) { - // Multi hosts included an unreached port and the actual port for verify retry logic - client, err := NewClient(ClientOptions{ - URL: "pulsar://localhost:6600,localhost:6650", - }) - - assert.Nil(t, err) - defer client.Close() - - topic := newTopicName() - ctx := context.Background() - - // create producer - producer, err := client.CreateProducer(ProducerOptions{ - Topic: topic, - DisableBatching: true, - }) - assert.Nil(t, err) - defer producer.Close() - - // send 10 messages - for i := 0; i < 10; i++ { - msgID, err := producer.Send(ctx, &ProducerMessage{ - Payload: []byte(fmt.Sprintf("hello-%d", i)), - }) - assert.NoError(t, err) - assert.NotNil(t, msgID) - } - - // create reader on 5th message (not included) - reader, err := client.CreateReader(ReaderOptions{ - Topic: topic, - StartMessageID: EarliestMessageID(), - }) - - assert.Nil(t, err) - defer reader.Close() - - i := 0 - for reader.HasNext() { - msg, err := reader.Next(context.Background()) - assert.NoError(t, err) - - expectMsg := fmt.Sprintf("hello-%d", i) - assert.Equal(t, []byte(expectMsg), msg.Payload()) - - i++ - } - - assert.Equal(t, 10, i) -} - func TestProducerReaderRSAEncryption(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, diff --git a/scripts/run-ci-clustered.sh b/scripts/run-ci-clustered.sh new file mode 100755 index 0000000000..5d0196032a --- /dev/null +++ b/scripts/run-ci-clustered.sh @@ -0,0 +1,23 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +set -e -x + +go test -race -coverprofile=/tmp/coverage -timeout=5m -tags clustered -run *Clustered* -v ./... +go tool cover -html=/tmp/coverage -o coverage.html + From bc76f4248d37b5c1c8e882402f378f06ee1acc6d Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Mon, 8 Apr 2024 12:20:09 -0700 Subject: [PATCH 05/11] Fix build error --- pulsar/client_impl_clustered_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/client_impl_clustered_test.go b/pulsar/client_impl_clustered_test.go index 2da397245e..8492126666 100644 --- a/pulsar/client_impl_clustered_test.go +++ b/pulsar/client_impl_clustered_test.go @@ -36,7 +36,7 @@ func ClientClusteredTestSuite(t *testing.T) { } func (suite *clientClusteredTestSuite) TestRetryWithMultipleHosts() { - req := suite.req + req := suite.Require() // Multi hosts included an unreached port and the actual port for verify retry logic client, err := NewClient(ClientOptions{ URL: "pulsar://broker-1:6600,broker-1:6650", From abef20cc0d6c19e9bf2812d0cd555032d922191e Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Fri, 12 Apr 2024 14:02:12 -0700 Subject: [PATCH 06/11] Update clustered test setup --- pulsar/client_impl_clustered_test.go | 2 +- pulsar/reader_clustered_test.go | 13 +++++++------ scripts/run-ci-clustered.sh | 2 +- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/pulsar/client_impl_clustered_test.go b/pulsar/client_impl_clustered_test.go index 8492126666..e572c7741c 100644 --- a/pulsar/client_impl_clustered_test.go +++ b/pulsar/client_impl_clustered_test.go @@ -31,7 +31,7 @@ type clientClusteredTestSuite struct { suite.Suite } -func ClientClusteredTestSuite(t *testing.T) { +func TestClientClusteredTestSuite(t *testing.T) { suite.Run(t, new(clientClusteredTestSuite)) } diff --git a/pulsar/reader_clustered_test.go b/pulsar/reader_clustered_test.go index 1acbd41c1b..feeacfb2f7 100644 --- a/pulsar/reader_clustered_test.go +++ b/pulsar/reader_clustered_test.go @@ -1,4 +1,4 @@ -// //go:build clustered +//go:build clustered // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file @@ -27,19 +27,20 @@ import ( "github.com/stretchr/testify/suite" ) -type readerClusteredTestSuite struct { +type ReaderClusteredTestSuite struct { suite.Suite } -func ReaderClusteredTestSuite(t *testing.T) { - suite.Run(t, new(readerClusteredTestSuite)) +func TestReaderClusteredTestSuite(t *testing.T) { + suite.Run(t, new(ReaderClusteredTestSuite)) } -func (suite *readerClusteredTestSuite) TestRetryWithMultipleHosts() { +func (suite *ReaderClusteredTestSuite) TestRetryWithMultipleHosts() { req := suite.Require() + // Multi hosts included an unreached port and the actual port for verify retry logic client, err := NewClient(ClientOptions{ - URL: "pulsar://localhost:6600,localhost:6650", + URL: "pulsar://broker-1:6600,broker-1:6650", }) req.NoError(err) defer client.Close() diff --git a/scripts/run-ci-clustered.sh b/scripts/run-ci-clustered.sh index 5d0196032a..5e4f36fa4c 100755 --- a/scripts/run-ci-clustered.sh +++ b/scripts/run-ci-clustered.sh @@ -18,6 +18,6 @@ set -e -x -go test -race -coverprofile=/tmp/coverage -timeout=5m -tags clustered -run *Clustered* -v ./... +go test -race -coverprofile=/tmp/coverage -timeout=5m -tags clustered -v -run 'Test.*ClusteredTestSuite' -v ./pulsar go tool cover -html=/tmp/coverage -o coverage.html From bec8fc1d409e69c8a8c77367996697ab0fc25c7d Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Thu, 28 Mar 2024 16:49:43 -0700 Subject: [PATCH 07/11] Factor out common image build steps --- Dockerfile | 13 +++++++++++++ scripts/run-ci.sh | 10 ---------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/Dockerfile b/Dockerfile index 818a106a9f..51e35f0a67 100644 --- a/Dockerfile +++ b/Dockerfile @@ -42,3 +42,16 @@ COPY integration-tests/conf/.htpasswd \ COPY . /pulsar/pulsar-client-go ENV PULSAR_EXTRA_OPTS="-Dpulsar.auth.basic.conf=/pulsar/conf/.htpasswd" + +WORKDIR /pulsar/pulsar-client-go + +ENV GOPATH=/pulsar/go +ENV GOCACHE=/tmp/go-cache + +# Install dependencies +RUN go mod download + +# Basic compilation +RUN go build ./pulsar +RUN go build ./pulsaradmin +RUN go build -o bin/pulsar-perf ./perf diff --git a/scripts/run-ci.sh b/scripts/run-ci.sh index 83246a39f0..e7a6d79ece 100755 --- a/scripts/run-ci.sh +++ b/scripts/run-ci.sh @@ -19,16 +19,6 @@ set -e -x -export GOPATH=/pulsar/go -export GOCACHE=/tmp/go-cache - -# Install dependencies -go mod download - -# Basic compilation -go build ./pulsar -go build -o bin/pulsar-perf ./perf - scripts/pulsar-test-service-start.sh go test -race -coverprofile=/tmp/coverage -timeout=20m -v ./... From ef3e3764c08d6cb820c1737f3e95c66462076dc7 Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Fri, 12 Apr 2024 15:58:26 -0700 Subject: [PATCH 08/11] Run all tests during 'make test' target --- Makefile | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index 36f2fb133c..df4d539d49 100644 --- a/Makefile +++ b/Makefile @@ -44,15 +44,17 @@ container: --build-arg PULSAR_IMAGE="${PULSAR_IMAGE}" \ --build-arg ARCH="${CONTAINER_ARCH}" . +test: container test_standalone test_clustered + +test_standalone: container + docker run -i ${IMAGE_NAME} bash -c "cd /pulsar/pulsar-client-go && ./scripts/run-ci.sh" + test_clustered: container PULSAR_VERSION=${PULSAR_VERSION} docker compose -f integration-tests/clustered/docker-compose.yml up -d || true until curl http://localhost:8080/metrics > /dev/null 2>&1; do sleep 1; done docker run --network "clustered_pulsar" -i ${IMAGE_NAME} bash -c "cd /pulsar/pulsar-client-go && ./scripts/run-ci-clustered.sh" PULSAR_VERSION=${PULSAR_VERSION} docker compose -f integration-tests/clustered/docker-compose.yml down -test: container - docker run -i ${IMAGE_NAME} bash -c "cd /pulsar/pulsar-client-go && ./scripts/run-ci.sh" - clean: docker rmi --force $(IMAGE_NAME) || true rm bin/* From 73e0472a60337e5997a00e91ad29557fc8cd41c4 Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Wed, 17 Apr 2024 11:23:03 -0700 Subject: [PATCH 09/11] Cosmetic fix --- pulsar/internal/connection_pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go index db6bb8a217..3d718b75d9 100644 --- a/pulsar/internal/connection_pool.go +++ b/pulsar/internal/connection_pool.go @@ -140,7 +140,7 @@ func (p *connectionPool) getMapKey(logicalAddr *url.URL, physicalAddr *url.URL) cnt = -cnt } idx := cnt % p.maxConnectionsPerHost - return fmt.Sprint(logicalAddr.Host, '-', physicalAddr.Host, '-', idx) + return fmt.Sprint(logicalAddr.Host, "-", physicalAddr.Host, "-", idx) } func (p *connectionPool) checkAndCleanIdleConnections(maxIdleTime time.Duration) { From 1122ea34217ebb585bf3b96c8f8cdb74279d122d Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Wed, 17 Apr 2024 11:40:59 -0700 Subject: [PATCH 10/11] Fix typo in TestReaderWithMultipleHosts name --- pulsar/reader_clustered_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/reader_clustered_test.go b/pulsar/reader_clustered_test.go index feeacfb2f7..db2dc6f130 100644 --- a/pulsar/reader_clustered_test.go +++ b/pulsar/reader_clustered_test.go @@ -35,7 +35,7 @@ func TestReaderClusteredTestSuite(t *testing.T) { suite.Run(t, new(ReaderClusteredTestSuite)) } -func (suite *ReaderClusteredTestSuite) TestRetryWithMultipleHosts() { +func (suite *ReaderClusteredTestSuite) TestReaderWithMultipleHosts() { req := suite.Require() // Multi hosts included an unreached port and the actual port for verify retry logic From ba0582a5303e01edb7d3129e0f55ee958d4c9719 Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Wed, 17 Apr 2024 11:53:05 -0700 Subject: [PATCH 11/11] Add missing copyright information --- integration-tests/clustered/docker-compose.yml | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/integration-tests/clustered/docker-compose.yml b/integration-tests/clustered/docker-compose.yml index 9984da50e1..cce8edddad 100644 --- a/integration-tests/clustered/docker-compose.yml +++ b/integration-tests/clustered/docker-compose.yml @@ -1,3 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + version: '3' networks: pulsar: