diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientBuilderConfigTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientBuilderConfigTest.java new file mode 100644 index 0000000000000..6cd4200058ff1 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientBuilderConfigTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.v5; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertSame; +import io.opentelemetry.api.OpenTelemetry; +import java.lang.reflect.Field; +import lombok.Cleanup; +import org.apache.pulsar.client.api.v5.config.MemorySize; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.testng.annotations.Test; + +/** + * Coverage for the V5 {@link PulsarClientBuilder} configuration knobs that don't + * have observable end-to-end behaviour against a single in-process broker: + * {@code listenerName}, {@code description}, {@code memoryLimit}, and + * {@code openTelemetry}. These all just plumb a value into the underlying v4 + * {@code ClientConfigurationData} — so the tests reflect into the wrapped v4 + * client and assert the value made it through. + * + *

If any of these setters silently dropped the value, the only way it would + * surface today is by a user reporting that their telemetry collector / broker + * listener / memory cap doesn't apply. These tests pin the contract. + */ +public class V5ClientBuilderConfigTest extends V5ClientBaseTest { + + @Test + public void testListenerNamePropagates() throws Exception { + @Cleanup + PulsarClient client = PulsarClient.builder() + .serviceUrl(getBrokerServiceUrl()) + .listenerName("internal") + .build(); + + ClientConfigurationData conf = readV4Conf(client); + assertEquals(conf.getListenerName(), "internal", + "listenerName must propagate to the underlying v4 client config"); + } + + @Test + public void testDescriptionPropagates() throws Exception { + @Cleanup + PulsarClient client = PulsarClient.builder() + .serviceUrl(getBrokerServiceUrl()) + .description("v5-test-client") + .build(); + + ClientConfigurationData conf = readV4Conf(client); + assertEquals(conf.getDescription(), "v5-test-client", + "description must propagate to the underlying v4 client config"); + } + + @Test + public void testMemoryLimitPropagates() throws Exception { + @Cleanup + PulsarClient client = PulsarClient.builder() + .serviceUrl(getBrokerServiceUrl()) + .memoryLimit(MemorySize.ofMegabytes(64)) + .build(); + + ClientConfigurationData conf = readV4Conf(client); + assertEquals(conf.getMemoryLimitBytes(), 64L * 1024 * 1024, + "memoryLimit must propagate to the underlying v4 client config"); + } + + @Test + public void testOpenTelemetryPropagates() throws Exception { + OpenTelemetry custom = OpenTelemetry.noop(); + @Cleanup + PulsarClient client = PulsarClient.builder() + .serviceUrl(getBrokerServiceUrl()) + .openTelemetry(custom) + .build(); + + ClientConfigurationData conf = readV4Conf(client); + // Same instance — the v4 layer does not clone or wrap the OpenTelemetry handle. + assertSame(conf.getOpenTelemetry(), custom, + "openTelemetry instance must be the exact one the user supplied"); + } + + // --- Helpers --- + + private static ClientConfigurationData readV4Conf(PulsarClient v5Client) throws Exception { + Field f = v5Client.getClass().getDeclaredField("v4Client"); + f.setAccessible(true); + Object v4Client = f.get(v5Client); + assertNotNull(v4Client, "expected v4Client on V5 PulsarClient"); + return ((PulsarClientImpl) v4Client).getConfiguration(); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientLifecycleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientLifecycleTest.java new file mode 100644 index 0000000000000..57dc8977aa5e4 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientLifecycleTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.v5; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import java.lang.reflect.Field; +import java.time.Duration; +import org.apache.pulsar.client.api.v5.config.TransactionPolicy; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.testng.annotations.Test; + +/** + * Coverage for V5 {@link PulsarClient} lifecycle entry points that the + * existing test suite doesn't exercise: {@link PulsarClient#closeAsync()}, + * {@link PulsarClient#shutdown()}, and {@link PulsarClient#newTransactionAsync()}. + * + *

The synchronous {@code close()} and {@code newTransaction()} are already + * exercised heavily by every other V5 test (close via {@code @Cleanup} / + * {@code @AfterClass}, transactions via {@link V5TransactionTest}). The async / + * shutdown variants share most of the v4 plumbing, but had no direct coverage + * — these tests pin the contract. + */ +public class V5ClientLifecycleTest extends V5ClientBaseTest { + + @Test + public void testCloseAsyncCompletes() throws Exception { + PulsarClient client = newV5Client(); + Object v4Client = readField(client, "v4Client"); + assertTrue(v4Client instanceof PulsarClientImpl, + "expected v4Client to be a PulsarClientImpl, got " + v4Client.getClass()); + + client.closeAsync().get(10, java.util.concurrent.TimeUnit.SECONDS); + assertTrue(((PulsarClientImpl) v4Client).isClosed(), + "underlying v4 client must be closed after closeAsync()"); + } + + @Test + public void testShutdownDelegatesToV4() throws Exception { + // shutdown() is the v4 "fast" path: stops executors, releases connections, + // but deliberately does not flip the client's state to Closed (so + // isClosed() can still return false). The contract for V5 here is just + // "delegate to v4 without throwing"; observable post-shutdown behaviour + // is v4's responsibility and is exercised by the v4 test suite. This + // test pins the V5 → v4 delegation. + PulsarClient client = newV5Client(); + client.shutdown(); + // Calling shutdown() again must remain side-effect-free and not throw. + client.shutdown(); + } + + @Test + public void testNewTransactionAsyncReturnsOpenTransaction() throws Exception { + // Need a client with transactions enabled — the shared v5Client doesn't. + PulsarClient client = track(PulsarClient.builder() + .serviceUrl(getBrokerServiceUrl()) + .transactionPolicy(TransactionPolicy.builder().timeout(Duration.ofMinutes(1)).build()) + .build()); + + Transaction txn = client.newTransactionAsync() + .get(10, java.util.concurrent.TimeUnit.SECONDS); + assertNotNull(txn, "newTransactionAsync() future must resolve to a Transaction"); + assertEquals(txn.state(), Transaction.State.OPEN, + "freshly opened transaction must be in OPEN state"); + // Clean up — abort to leave no dangling txn on the broker. + txn.abort(); + } + + // --- Helpers --- + + private static Object readField(Object target, String name) throws Exception { + Field f = target.getClass().getDeclaredField(name); + f.setAccessible(true); + return f.get(target); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConsumerSubscriptionPropertiesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConsumerSubscriptionPropertiesTest.java new file mode 100644 index 0000000000000..719e8fb3a1bc4 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConsumerSubscriptionPropertiesTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.v5; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import java.lang.reflect.Field; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import lombok.Cleanup; +import org.apache.pulsar.client.api.v5.schema.Schema; +import org.testng.annotations.Test; + +/** + * Coverage for {@link QueueConsumerBuilder#subscriptionProperties(Map)}: the V5 + * setter must propagate to every per-segment v4 {@code Consumer}'s + * {@link org.apache.pulsar.client.impl.conf.ConsumerConfigurationData}, which + * is what the v4 wire layer ships to the broker on subscribe. + */ +public class V5ConsumerSubscriptionPropertiesTest extends V5ClientBaseTest { + + @Test + public void testSubscriptionPropertiesPropagateToV4Consumer() throws Exception { + String topic = newScalableTopic(1); + Map props = Map.of("env", "prod", "team", "data-platform"); + + @Cleanup + QueueConsumer consumer = v5Client.newQueueConsumer(Schema.string()) + .topic(topic) + .subscriptionName("sub-props-test") + .subscriptionProperties(props) + .subscribe(); + + Map actual = readSubscriptionPropertiesFromV4(consumer); + assertEquals(actual, props, + "V5 subscriptionProperties must propagate to the v4 consumer config"); + } + + @Test + public void testSubscriptionPropertiesAppliesToEverySegment() throws Exception { + String topic = newScalableTopic(3); + Map props = Map.of("region", "us-east-1"); + + @Cleanup + QueueConsumer consumer = v5Client.newQueueConsumer(Schema.string()) + .topic(topic) + .subscriptionName("sub-props-multi-test") + .subscriptionProperties(props) + .subscribe(); + + Map segmentConsumers = readSegmentConsumers(consumer); + assertEquals(segmentConsumers.size(), 3, "expected one v4 consumer per segment"); + for (CompletableFuture future : asConsumerFutures(segmentConsumers)) { + Object v4Consumer = future.get(); + Map v4Props = readConfSubscriptionProperties(v4Consumer); + assertEquals(v4Props, props, + "every segment's v4 Consumer must carry the same subscriptionProperties"); + } + } + + // --- Helpers --- + + private static Map readSubscriptionPropertiesFromV4(QueueConsumer consumer) + throws Exception { + Map segmentConsumers = readSegmentConsumers(consumer); + assertEquals(segmentConsumers.size(), 1, "expected a single segment for this test"); + CompletableFuture future = asConsumerFutures(segmentConsumers).iterator().next(); + Object v4Consumer = future.get(); + return readConfSubscriptionProperties(v4Consumer); + } + + private static Map readSegmentConsumers(QueueConsumer consumer) throws Exception { + Field f = consumer.getClass().getDeclaredField("segmentConsumers"); + f.setAccessible(true); + Object map = f.get(consumer); + assertNotNull(map, "expected segmentConsumers map on V5 queue consumer"); + return (Map) map; + } + + @SuppressWarnings("unchecked") + private static Iterable> asConsumerFutures(Map segmentConsumers) { + return (Iterable>) (Iterable) segmentConsumers.values(); + } + + @SuppressWarnings("unchecked") + private static Map readConfSubscriptionProperties(Object v4Consumer) throws Exception { + // ConsumerBase#conf is protected; walk the class hierarchy. + Class c = v4Consumer.getClass(); + while (c != null) { + try { + Field f = c.getDeclaredField("conf"); + f.setAccessible(true); + Object conf = f.get(v4Consumer); + var getter = conf.getClass().getMethod("getSubscriptionProperties"); + return (Map) getter.invoke(conf); + } catch (NoSuchFieldException e) { + c = c.getSuperclass(); + } + } + throw new NoSuchFieldException("conf not found on " + v4Consumer.getClass()); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MessageReplicationClustersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MessageReplicationClustersTest.java new file mode 100644 index 0000000000000..0e3c1f624bf50 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MessageReplicationClustersTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.v5; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import java.lang.reflect.Field; +import java.time.Duration; +import java.util.List; +import lombok.Cleanup; +import org.apache.pulsar.broker.service.SharedPulsarCluster; +import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.v5.schema.Schema; +import org.apache.pulsar.client.impl.MessageImpl; +import org.testng.annotations.Test; + +/** + * Coverage for {@link MessageBuilder#replicationClusters(java.util.List)}: the + * V5 setter must propagate to the v4 {@code TypedMessageBuilder} that V5 uses + * internally, so the cluster-restriction list lands in the message metadata + * the broker stores. + * + *

Verified by sending a V5 message with an explicit cluster restriction and + * reflecting into the V5 {@code MessageV5} wrapper to inspect the underlying + * v4 {@code MessageImpl.getReplicateTo()} — that's where the message metadata + * becomes observable. + */ +public class V5MessageReplicationClustersTest extends V5ClientBaseTest { + + @Test + public void testReplicationClustersPropagatesToMessageMetadata() throws Exception { + String topic = newScalableTopic(1); + + @Cleanup + Producer producer = v5Client.newProducer(Schema.string()) + .topic(topic) + .create(); + @Cleanup + QueueConsumer consumer = v5Client.newQueueConsumer(Schema.string()) + .topic(topic) + .subscriptionName("repl-clusters-watcher") + .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST) + .subscribe(); + + // Restrict to the local cluster only — same effective behaviour as the + // namespace default, but exercises the explicit-cluster code path. + List clusters = List.of(SharedPulsarCluster.CLUSTER_NAME); + + producer.newMessage() + .value("hello") + .replicationClusters(clusters) + .send(); + + Message msg = consumer.receive(Duration.ofSeconds(5)); + assertNotNull(msg, "consumer should receive the produced message"); + + // Reach into the V5 MessageV5 wrapper to inspect the underlying v4 metadata. + MessageImpl v4Impl = readUnderlyingV4Message(msg); + assertEquals(v4Impl.getReplicateTo(), clusters, + "replicationClusters from V5 must land in the message metadata"); + } + + private static MessageImpl readUnderlyingV4Message(Message v5Msg) throws Exception { + Field f = v5Msg.getClass().getDeclaredField("v4Message"); + f.setAccessible(true); + Object v4Msg = f.get(v5Msg); + assertNotNull(v4Msg, "expected v4Message inside V5 MessageV5 wrapper"); + if (!(v4Msg instanceof MessageImpl)) { + throw new AssertionError("expected MessageImpl, got: " + v4Msg.getClass()); + } + return (MessageImpl) v4Msg; + } +}