Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,16 @@ COPY integration-tests/conf/.htpasswd \
COPY . /pulsar/pulsar-client-go

ENV PULSAR_EXTRA_OPTS="-Dpulsar.auth.basic.conf=/pulsar/conf/.htpasswd"

WORKDIR /pulsar/pulsar-client-go

ENV GOPATH=/pulsar/go
ENV GOCACHE=/tmp/go-cache

# Install dependencies
RUN go mod download

# Basic compilation
RUN go build ./pulsar
RUN go build ./pulsaradmin
RUN go build -o bin/pulsar-perf ./perf
10 changes: 9 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,17 @@ container:
--build-arg PULSAR_IMAGE="${PULSAR_IMAGE}" \
--build-arg ARCH="${CONTAINER_ARCH}" .

test: container
test: container test_standalone test_clustered

test_standalone: container
docker run -i ${IMAGE_NAME} bash -c "cd /pulsar/pulsar-client-go && ./scripts/run-ci.sh"

test_clustered: container
PULSAR_VERSION=${PULSAR_VERSION} docker compose -f integration-tests/clustered/docker-compose.yml up -d || true
until curl http://localhost:8080/metrics > /dev/null 2>&1; do sleep 1; done
docker run --network "clustered_pulsar" -i ${IMAGE_NAME} bash -c "cd /pulsar/pulsar-client-go && ./scripts/run-ci-clustered.sh"
PULSAR_VERSION=${PULSAR_VERSION} docker compose -f integration-tests/clustered/docker-compose.yml down

clean:
docker rmi --force $(IMAGE_NAME) || true
rm bin/*
167 changes: 167 additions & 0 deletions integration-tests/clustered/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

version: '3'
networks:
pulsar:
driver: bridge
services:
# Start ZooKeeper
zookeeper:
image: apachepulsar/pulsar:${PULSAR_VERSION}
container_name: zookeeper
restart: on-failure
networks:
- pulsar
environment:
- metadataStoreUrl=zk:zookeeper:2181
- PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
command: >
bash -c "bin/apply-config-from-env.py conf/zookeeper.conf && \
bin/generate-zookeeper-config.sh conf/zookeeper.conf && \
exec bin/pulsar zookeeper"
healthcheck:
test: ["CMD", "bin/pulsar-zookeeper-ruok.sh"]
interval: 10s
timeout: 5s
retries: 30

# Initialize cluster metadata
pulsar-init:
container_name: pulsar-init
hostname: pulsar-init
image: apachepulsar/pulsar:${PULSAR_VERSION}
networks:
- pulsar
environment:
- PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
command: >
bin/pulsar initialize-cluster-metadata \
--cluster cluster-a \
--zookeeper zookeeper:2181 \
--configuration-store zookeeper:2181 \
--web-service-url http://broker-1:8080 \
--broker-service-url pulsar://broker-1:6650
depends_on:
zookeeper:
condition: service_healthy

# Start bookie
bookie:
image: apachepulsar/pulsar:${PULSAR_VERSION}
container_name: bookie
restart: on-failure
networks:
- pulsar
environment:
- clusterName=cluster-a
- zkServers=zookeeper:2181
- metadataServiceUri=metadata-store:zk:zookeeper:2181
- advertisedAddress=bookie
- BOOKIE_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
depends_on:
zookeeper:
condition: service_healthy
pulsar-init:
condition: service_completed_successfully
command: bash -c "bin/apply-config-from-env.py conf/bookkeeper.conf && exec bin/pulsar bookie"

proxy:
image: apachepulsar/pulsar:${PULSAR_VERSION}
container_name: proxy
hostname: proxy
restart: on-failure
networks:
- pulsar
environment:
- metadataStoreUrl=zk:zookeeper:2181
- zookeeperServers=zookeeper:2181
- clusterName=cluster-a
- PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=56m
ports:
- "8080:8080"
- "6650:6650"
depends_on:
broker-1:
condition: service_healthy
broker-2:
condition: service_healthy
command: bash -c "bin/apply-config-from-env.py conf/proxy.conf && exec bin/pulsar proxy"

# Start broker 1
broker-1:
image: apachepulsar/pulsar:${PULSAR_VERSION}
container_name: broker-1
hostname: broker-1
restart: on-failure
networks:
- pulsar
environment:
- metadataStoreUrl=zk:zookeeper:2181
- zookeeperServers=zookeeper:2181
- clusterName=cluster-a
- managedLedgerDefaultEnsembleSize=1
- managedLedgerDefaultWriteQuorum=1
- managedLedgerDefaultAckQuorum=1
- advertisedAddress=broker-1
- internalListenerName=internal
- advertisedListeners=internal:pulsar://broker-1:6650
- PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=56m
- PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
depends_on:
zookeeper:
condition: service_healthy
bookie:
condition: service_started
command: bash -c "bin/apply-config-from-env.py conf/broker.conf && exec bin/pulsar broker"
healthcheck:
test: ["CMD-SHELL", "curl -f http://localhost:8080/metrics || exit 1"]
interval: 10s
timeout: 5s
retries: 30

# Start broker 2
broker-2:
image: apachepulsar/pulsar:${PULSAR_VERSION}
container_name: broker-2
hostname: broker-2
restart: on-failure
networks:
- pulsar
environment:
- metadataStoreUrl=zk:zookeeper:2181
- zookeeperServers=zookeeper:2181
- clusterName=cluster-a
- managedLedgerDefaultEnsembleSize=1
- managedLedgerDefaultWriteQuorum=1
- managedLedgerDefaultAckQuorum=1
- advertisedAddress=broker-2
- internalListenerName=internal
- advertisedListeners=internal:pulsar://broker-2:6650
- PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=56m
- PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
depends_on:
zookeeper:
condition: service_healthy
bookie:
condition: service_started
command: bash -c "bin/apply-config-from-env.py conf/broker.conf && exec bin/pulsar broker"
healthcheck:
test: ["CMD-SHELL", "curl -f http://localhost:8080/metrics || exit 1"]
interval: 10s
timeout: 5s
retries: 30
89 changes: 89 additions & 0 deletions pulsar/client_impl_clustered_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
//go:build clustered

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pulsar

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/suite"
)

type clientClusteredTestSuite struct {
suite.Suite
}

func TestClientClusteredTestSuite(t *testing.T) {
suite.Run(t, new(clientClusteredTestSuite))
}

func (suite *clientClusteredTestSuite) TestRetryWithMultipleHosts() {
req := suite.Require()
// Multi hosts included an unreached port and the actual port for verify retry logic
client, err := NewClient(ClientOptions{
URL: "pulsar://broker-1:6600,broker-1:6650",
})
req.NoError(err)
defer client.Close()

topic := "persistent://public/default/retry-multiple-hosts-" + generateRandomName()

producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
})
req.NoError(err)
defer producer.Close()

ctx := context.Background()
var msgIDs [][]byte

for i := 0; i < 10; i++ {
if msgID, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
}); err != nil {
req.NoError(err)
} else {
req.NotNil(msgID)
msgIDs = append(msgIDs, msgID.Serialize())
}
}

req.Equal(10, len(msgIDs))

consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "retry-multi-hosts-sub",
Type: Shared,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
})
req.NoError(err)
defer consumer.Close()

for i := 0; i < 10; i++ {
msg, err := consumer.Receive(context.Background())
req.NoError(err)
req.Contains(msgIDs, msg.ID().Serialize())
consumer.Ack(msg)
}

err = consumer.Unsubscribe()
req.NoError(err)
}
55 changes: 0 additions & 55 deletions pulsar/client_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,61 +570,6 @@ func anonymousNamespacePolicy() map[string]interface{} {
}
}

func TestRetryWithMultipleHosts(t *testing.T) {
// Multi hosts included an unreached port and the actual port for verify retry logic
client, err := NewClient(ClientOptions{
URL: "pulsar://localhost:6600,localhost:6650",
})

assert.Nil(t, err)
defer client.Close()

topic := "persistent://public/default/retry-multiple-hosts-" + generateRandomName()

producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
})

assert.Nil(t, err)
defer producer.Close()

ctx := context.Background()
var msgIDs [][]byte

for i := 0; i < 10; i++ {
if msgID, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
}); err != nil {
assert.Nil(t, err)
} else {
assert.NotNil(t, msgID)
msgIDs = append(msgIDs, msgID.Serialize())
}
}

assert.Equal(t, 10, len(msgIDs))

consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "retry-multi-hosts-sub",
Type: Shared,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
})
assert.Nil(t, err)
defer consumer.Close()

for i := 0; i < 10; i++ {
msg, err := consumer.Receive(context.Background())
assert.Nil(t, err)
assert.Contains(t, msgIDs, msg.ID().Serialize())
consumer.Ack(msg)
}

err = consumer.Unsubscribe()
assert.Nil(t, err)

}

func TestHTTPSConnectionCAError(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: webServiceURLTLS,
Expand Down
7 changes: 4 additions & 3 deletions pulsar/internal/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ func NewConnectionPool(
}

func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error) {
key := p.getMapKey(logicalAddr)
p.log.WithField("logicalAddr", logicalAddr).WithField("physicalAddr", physicalAddr).Debug("Getting pooled connection")
key := p.getMapKey(logicalAddr, physicalAddr)

p.Lock()
conn, ok := p.connections[key]
Expand Down Expand Up @@ -133,13 +134,13 @@ func (p *connectionPool) Close() {
p.Unlock()
}

func (p *connectionPool) getMapKey(addr *url.URL) string {
func (p *connectionPool) getMapKey(logicalAddr *url.URL, physicalAddr *url.URL) string {
cnt := atomic.AddInt32(&p.roundRobinCnt, 1)
if cnt < 0 {
cnt = -cnt
}
idx := cnt % p.maxConnectionsPerHost
return fmt.Sprint(addr.Host, '-', idx)
return fmt.Sprint(logicalAddr.Host, "-", physicalAddr.Host, "-", idx)
}

func (p *connectionPool) checkAndCleanIdleConnections(maxIdleTime time.Duration) {
Expand Down
Loading