Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api.v5;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertSame;
import io.opentelemetry.api.OpenTelemetry;
import java.lang.reflect.Field;
import lombok.Cleanup;
import org.apache.pulsar.client.api.v5.config.MemorySize;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.testng.annotations.Test;

/**
* Coverage for the V5 {@link PulsarClientBuilder} configuration knobs that don't
* have observable end-to-end behaviour against a single in-process broker:
* {@code listenerName}, {@code description}, {@code memoryLimit}, and
* {@code openTelemetry}. These all just plumb a value into the underlying v4
* {@code ClientConfigurationData} — so the tests reflect into the wrapped v4
* client and assert the value made it through.
*
* <p>If any of these setters silently dropped the value, the only way it would
* surface today is by a user reporting that their telemetry collector / broker
* listener / memory cap doesn't apply. These tests pin the contract.
*/
public class V5ClientBuilderConfigTest extends V5ClientBaseTest {

@Test
public void testListenerNamePropagates() throws Exception {
@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(getBrokerServiceUrl())
.listenerName("internal")
.build();

ClientConfigurationData conf = readV4Conf(client);
assertEquals(conf.getListenerName(), "internal",
"listenerName must propagate to the underlying v4 client config");
}

@Test
public void testDescriptionPropagates() throws Exception {
@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(getBrokerServiceUrl())
.description("v5-test-client")
.build();

ClientConfigurationData conf = readV4Conf(client);
assertEquals(conf.getDescription(), "v5-test-client",
"description must propagate to the underlying v4 client config");
}

@Test
public void testMemoryLimitPropagates() throws Exception {
@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(getBrokerServiceUrl())
.memoryLimit(MemorySize.ofMegabytes(64))
.build();

ClientConfigurationData conf = readV4Conf(client);
assertEquals(conf.getMemoryLimitBytes(), 64L * 1024 * 1024,
"memoryLimit must propagate to the underlying v4 client config");
}

@Test
public void testOpenTelemetryPropagates() throws Exception {
OpenTelemetry custom = OpenTelemetry.noop();
@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(getBrokerServiceUrl())
.openTelemetry(custom)
.build();

ClientConfigurationData conf = readV4Conf(client);
// Same instance — the v4 layer does not clone or wrap the OpenTelemetry handle.
assertSame(conf.getOpenTelemetry(), custom,
"openTelemetry instance must be the exact one the user supplied");
}

// --- Helpers ---

private static ClientConfigurationData readV4Conf(PulsarClient v5Client) throws Exception {
Field f = v5Client.getClass().getDeclaredField("v4Client");
f.setAccessible(true);
Object v4Client = f.get(v5Client);
assertNotNull(v4Client, "expected v4Client on V5 PulsarClient");
return ((PulsarClientImpl) v4Client).getConfiguration();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api.v5;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import java.lang.reflect.Field;
import java.time.Duration;
import org.apache.pulsar.client.api.v5.config.TransactionPolicy;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.testng.annotations.Test;

/**
* Coverage for V5 {@link PulsarClient} lifecycle entry points that the
* existing test suite doesn't exercise: {@link PulsarClient#closeAsync()},
* {@link PulsarClient#shutdown()}, and {@link PulsarClient#newTransactionAsync()}.
*
* <p>The synchronous {@code close()} and {@code newTransaction()} are already
* exercised heavily by every other V5 test (close via {@code @Cleanup} /
* {@code @AfterClass}, transactions via {@link V5TransactionTest}). The async /
* shutdown variants share most of the v4 plumbing, but had no direct coverage
* — these tests pin the contract.
*/
public class V5ClientLifecycleTest extends V5ClientBaseTest {

@Test
public void testCloseAsyncCompletes() throws Exception {
PulsarClient client = newV5Client();
Object v4Client = readField(client, "v4Client");
assertTrue(v4Client instanceof PulsarClientImpl,
"expected v4Client to be a PulsarClientImpl, got " + v4Client.getClass());

client.closeAsync().get(10, java.util.concurrent.TimeUnit.SECONDS);
assertTrue(((PulsarClientImpl) v4Client).isClosed(),
"underlying v4 client must be closed after closeAsync()");
}

@Test
public void testShutdownDelegatesToV4() throws Exception {
// shutdown() is the v4 "fast" path: stops executors, releases connections,
// but deliberately does not flip the client's state to Closed (so
// isClosed() can still return false). The contract for V5 here is just
// "delegate to v4 without throwing"; observable post-shutdown behaviour
// is v4's responsibility and is exercised by the v4 test suite. This
// test pins the V5 → v4 delegation.
PulsarClient client = newV5Client();
client.shutdown();
// Calling shutdown() again must remain side-effect-free and not throw.
client.shutdown();
}

@Test
public void testNewTransactionAsyncReturnsOpenTransaction() throws Exception {
// Need a client with transactions enabled — the shared v5Client doesn't.
PulsarClient client = track(PulsarClient.builder()
.serviceUrl(getBrokerServiceUrl())
.transactionPolicy(TransactionPolicy.builder().timeout(Duration.ofMinutes(1)).build())
.build());

Transaction txn = client.newTransactionAsync()
.get(10, java.util.concurrent.TimeUnit.SECONDS);
assertNotNull(txn, "newTransactionAsync() future must resolve to a Transaction");
assertEquals(txn.state(), Transaction.State.OPEN,
"freshly opened transaction must be in OPEN state");
// Clean up — abort to leave no dangling txn on the broker.
txn.abort();
}

// --- Helpers ---

private static Object readField(Object target, String name) throws Exception {
Field f = target.getClass().getDeclaredField(name);
f.setAccessible(true);
return f.get(target);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api.v5;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import lombok.Cleanup;
import org.apache.pulsar.client.api.v5.schema.Schema;
import org.testng.annotations.Test;

/**
* Coverage for {@link QueueConsumerBuilder#subscriptionProperties(Map)}: the V5
* setter must propagate to every per-segment v4 {@code Consumer}'s
* {@link org.apache.pulsar.client.impl.conf.ConsumerConfigurationData}, which
* is what the v4 wire layer ships to the broker on subscribe.
*/
public class V5ConsumerSubscriptionPropertiesTest extends V5ClientBaseTest {

@Test
public void testSubscriptionPropertiesPropagateToV4Consumer() throws Exception {
String topic = newScalableTopic(1);
Map<String, String> props = Map.of("env", "prod", "team", "data-platform");

@Cleanup
QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
.topic(topic)
.subscriptionName("sub-props-test")
.subscriptionProperties(props)
.subscribe();

Map<String, String> actual = readSubscriptionPropertiesFromV4(consumer);
assertEquals(actual, props,
"V5 subscriptionProperties must propagate to the v4 consumer config");
}

@Test
public void testSubscriptionPropertiesAppliesToEverySegment() throws Exception {
String topic = newScalableTopic(3);
Map<String, String> props = Map.of("region", "us-east-1");

@Cleanup
QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
.topic(topic)
.subscriptionName("sub-props-multi-test")
.subscriptionProperties(props)
.subscribe();

Map<Long, ?> segmentConsumers = readSegmentConsumers(consumer);
assertEquals(segmentConsumers.size(), 3, "expected one v4 consumer per segment");
for (CompletableFuture<?> future : asConsumerFutures(segmentConsumers)) {
Object v4Consumer = future.get();
Map<String, String> v4Props = readConfSubscriptionProperties(v4Consumer);
assertEquals(v4Props, props,
"every segment's v4 Consumer must carry the same subscriptionProperties");
}
}

// --- Helpers ---

private static Map<String, String> readSubscriptionPropertiesFromV4(QueueConsumer<?> consumer)
throws Exception {
Map<Long, ?> segmentConsumers = readSegmentConsumers(consumer);
assertEquals(segmentConsumers.size(), 1, "expected a single segment for this test");
CompletableFuture<?> future = asConsumerFutures(segmentConsumers).iterator().next();
Object v4Consumer = future.get();
return readConfSubscriptionProperties(v4Consumer);
}

private static Map<Long, ?> readSegmentConsumers(QueueConsumer<?> consumer) throws Exception {
Field f = consumer.getClass().getDeclaredField("segmentConsumers");
f.setAccessible(true);
Object map = f.get(consumer);
assertNotNull(map, "expected segmentConsumers map on V5 queue consumer");
return (Map<Long, ?>) map;
}

@SuppressWarnings("unchecked")
private static Iterable<CompletableFuture<?>> asConsumerFutures(Map<Long, ?> segmentConsumers) {
return (Iterable<CompletableFuture<?>>) (Iterable<?>) segmentConsumers.values();
}

@SuppressWarnings("unchecked")
private static Map<String, String> readConfSubscriptionProperties(Object v4Consumer) throws Exception {
// ConsumerBase#conf is protected; walk the class hierarchy.
Class<?> c = v4Consumer.getClass();
while (c != null) {
try {
Field f = c.getDeclaredField("conf");
f.setAccessible(true);
Object conf = f.get(v4Consumer);
var getter = conf.getClass().getMethod("getSubscriptionProperties");
return (Map<String, String>) getter.invoke(conf);
} catch (NoSuchFieldException e) {
c = c.getSuperclass();
}
}
throw new NoSuchFieldException("conf not found on " + v4Consumer.getClass());
}
}
Loading
Loading