diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientBuilderConfigTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientBuilderConfigTest.java new file mode 100644 index 0000000000000..6cd4200058ff1 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientBuilderConfigTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.v5; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertSame; +import io.opentelemetry.api.OpenTelemetry; +import java.lang.reflect.Field; +import lombok.Cleanup; +import org.apache.pulsar.client.api.v5.config.MemorySize; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.testng.annotations.Test; + +/** + * Coverage for the V5 {@link PulsarClientBuilder} configuration knobs that don't + * have observable end-to-end behaviour against a single in-process broker: + * {@code listenerName}, {@code description}, {@code memoryLimit}, and + * {@code openTelemetry}. These all just plumb a value into the underlying v4 + * {@code ClientConfigurationData} — so the tests reflect into the wrapped v4 + * client and assert the value made it through. + * + *
If any of these setters silently dropped the value, the only way it would + * surface today is by a user reporting that their telemetry collector / broker + * listener / memory cap doesn't apply. These tests pin the contract. + */ +public class V5ClientBuilderConfigTest extends V5ClientBaseTest { + + @Test + public void testListenerNamePropagates() throws Exception { + @Cleanup + PulsarClient client = PulsarClient.builder() + .serviceUrl(getBrokerServiceUrl()) + .listenerName("internal") + .build(); + + ClientConfigurationData conf = readV4Conf(client); + assertEquals(conf.getListenerName(), "internal", + "listenerName must propagate to the underlying v4 client config"); + } + + @Test + public void testDescriptionPropagates() throws Exception { + @Cleanup + PulsarClient client = PulsarClient.builder() + .serviceUrl(getBrokerServiceUrl()) + .description("v5-test-client") + .build(); + + ClientConfigurationData conf = readV4Conf(client); + assertEquals(conf.getDescription(), "v5-test-client", + "description must propagate to the underlying v4 client config"); + } + + @Test + public void testMemoryLimitPropagates() throws Exception { + @Cleanup + PulsarClient client = PulsarClient.builder() + .serviceUrl(getBrokerServiceUrl()) + .memoryLimit(MemorySize.ofMegabytes(64)) + .build(); + + ClientConfigurationData conf = readV4Conf(client); + assertEquals(conf.getMemoryLimitBytes(), 64L * 1024 * 1024, + "memoryLimit must propagate to the underlying v4 client config"); + } + + @Test + public void testOpenTelemetryPropagates() throws Exception { + OpenTelemetry custom = OpenTelemetry.noop(); + @Cleanup + PulsarClient client = PulsarClient.builder() + .serviceUrl(getBrokerServiceUrl()) + .openTelemetry(custom) + .build(); + + ClientConfigurationData conf = readV4Conf(client); + // Same instance — the v4 layer does not clone or wrap the OpenTelemetry handle. + assertSame(conf.getOpenTelemetry(), custom, + "openTelemetry instance must be the exact one the user supplied"); + } + + // --- Helpers --- + + private static ClientConfigurationData readV4Conf(PulsarClient v5Client) throws Exception { + Field f = v5Client.getClass().getDeclaredField("v4Client"); + f.setAccessible(true); + Object v4Client = f.get(v5Client); + assertNotNull(v4Client, "expected v4Client on V5 PulsarClient"); + return ((PulsarClientImpl) v4Client).getConfiguration(); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientLifecycleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientLifecycleTest.java new file mode 100644 index 0000000000000..57dc8977aa5e4 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientLifecycleTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.v5; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import java.lang.reflect.Field; +import java.time.Duration; +import org.apache.pulsar.client.api.v5.config.TransactionPolicy; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.testng.annotations.Test; + +/** + * Coverage for V5 {@link PulsarClient} lifecycle entry points that the + * existing test suite doesn't exercise: {@link PulsarClient#closeAsync()}, + * {@link PulsarClient#shutdown()}, and {@link PulsarClient#newTransactionAsync()}. + * + *
The synchronous {@code close()} and {@code newTransaction()} are already
+ * exercised heavily by every other V5 test (close via {@code @Cleanup} /
+ * {@code @AfterClass}, transactions via {@link V5TransactionTest}). The async /
+ * shutdown variants share most of the v4 plumbing, but had no direct coverage
+ * — these tests pin the contract.
+ */
+public class V5ClientLifecycleTest extends V5ClientBaseTest {
+
+ @Test
+ public void testCloseAsyncCompletes() throws Exception {
+ PulsarClient client = newV5Client();
+ Object v4Client = readField(client, "v4Client");
+ assertTrue(v4Client instanceof PulsarClientImpl,
+ "expected v4Client to be a PulsarClientImpl, got " + v4Client.getClass());
+
+ client.closeAsync().get(10, java.util.concurrent.TimeUnit.SECONDS);
+ assertTrue(((PulsarClientImpl) v4Client).isClosed(),
+ "underlying v4 client must be closed after closeAsync()");
+ }
+
+ @Test
+ public void testShutdownDelegatesToV4() throws Exception {
+ // shutdown() is the v4 "fast" path: stops executors, releases connections,
+ // but deliberately does not flip the client's state to Closed (so
+ // isClosed() can still return false). The contract for V5 here is just
+ // "delegate to v4 without throwing"; observable post-shutdown behaviour
+ // is v4's responsibility and is exercised by the v4 test suite. This
+ // test pins the V5 → v4 delegation.
+ PulsarClient client = newV5Client();
+ client.shutdown();
+ // Calling shutdown() again must remain side-effect-free and not throw.
+ client.shutdown();
+ }
+
+ @Test
+ public void testNewTransactionAsyncReturnsOpenTransaction() throws Exception {
+ // Need a client with transactions enabled — the shared v5Client doesn't.
+ PulsarClient client = track(PulsarClient.builder()
+ .serviceUrl(getBrokerServiceUrl())
+ .transactionPolicy(TransactionPolicy.builder().timeout(Duration.ofMinutes(1)).build())
+ .build());
+
+ Transaction txn = client.newTransactionAsync()
+ .get(10, java.util.concurrent.TimeUnit.SECONDS);
+ assertNotNull(txn, "newTransactionAsync() future must resolve to a Transaction");
+ assertEquals(txn.state(), Transaction.State.OPEN,
+ "freshly opened transaction must be in OPEN state");
+ // Clean up — abort to leave no dangling txn on the broker.
+ txn.abort();
+ }
+
+ // --- Helpers ---
+
+ private static Object readField(Object target, String name) throws Exception {
+ Field f = target.getClass().getDeclaredField(name);
+ f.setAccessible(true);
+ return f.get(target);
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConsumerSubscriptionPropertiesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConsumerSubscriptionPropertiesTest.java
new file mode 100644
index 0000000000000..719e8fb3a1bc4
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConsumerSubscriptionPropertiesTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for {@link QueueConsumerBuilder#subscriptionProperties(Map)}: the V5
+ * setter must propagate to every per-segment v4 {@code Consumer}'s
+ * {@link org.apache.pulsar.client.impl.conf.ConsumerConfigurationData}, which
+ * is what the v4 wire layer ships to the broker on subscribe.
+ */
+public class V5ConsumerSubscriptionPropertiesTest extends V5ClientBaseTest {
+
+ @Test
+ public void testSubscriptionPropertiesPropagateToV4Consumer() throws Exception {
+ String topic = newScalableTopic(1);
+ Map Verified by sending a V5 message with an explicit cluster restriction and
+ * reflecting into the V5 {@code MessageV5} wrapper to inspect the underlying
+ * v4 {@code MessageImpl.getReplicateTo()} — that's where the message metadata
+ * becomes observable.
+ */
+public class V5MessageReplicationClustersTest extends V5ClientBaseTest {
+
+ @Test
+ public void testReplicationClustersPropagatesToMessageMetadata() throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer