From c3d0834e9e6c77d15b4526f4cd0e2a52e8bbcfb4 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 5 May 2026 19:33:01 -0700 Subject: [PATCH] [test] PIP-468: V5 plumbing-knob test coverage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pin the V5 → v4 contract for the remaining low-risk plumbing knobs that the audit flagged as untested. Each setter is a one-line forwarder to the underlying v4 config; without these tests, a regression that broke the mapping would ship silently. V5ClientBuilderConfigTest: - listenerName, description, memoryLimit, openTelemetry. Each propagates from the V5 builder to the wrapped v4 ClientConfigurationData (verified via reflection on PulsarClientV5#v4Client + getConfiguration()). V5ClientLifecycleTest: - closeAsync() flips the v4 client to closed. - shutdown() delegates to v4 without throwing and is idempotent. (Note: v4's shutdown() is the "fast" path — it stops executors and releases connections but deliberately does not flip the client's state to Closed, so isClosed() can still return false. The contract for V5 here is just "delegate to v4 without throwing"; observable post-shutdown behaviour is v4's responsibility and is exercised by the v4 test suite.) - newTransactionAsync() resolves to a Transaction in OPEN state. V5ConsumerSubscriptionPropertiesTest: - subscriptionProperties(Map) propagates to every per-segment v4 Consumer via ConsumerConfigurationData.getSubscriptionProperties() (single-segment + 3-segment topics). V5MessageReplicationClustersTest: - MessageBuilder.replicationClusters(List) lands in the v4 message metadata. Verified by sending via V5, receiving via V5, and reflecting through the V5 MessageV5 wrapper to call MessageImpl.getReplicateTo() on the underlying v4 message. --- .../api/v5/V5ClientBuilderConfigTest.java | 109 ++++++++++++++++ .../client/api/v5/V5ClientLifecycleTest.java | 93 ++++++++++++++ .../V5ConsumerSubscriptionPropertiesTest.java | 118 ++++++++++++++++++ .../v5/V5MessageReplicationClustersTest.java | 89 +++++++++++++ 4 files changed, 409 insertions(+) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientBuilderConfigTest.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientLifecycleTest.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConsumerSubscriptionPropertiesTest.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MessageReplicationClustersTest.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientBuilderConfigTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientBuilderConfigTest.java new file mode 100644 index 0000000000000..6cd4200058ff1 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientBuilderConfigTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.v5; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertSame; +import io.opentelemetry.api.OpenTelemetry; +import java.lang.reflect.Field; +import lombok.Cleanup; +import org.apache.pulsar.client.api.v5.config.MemorySize; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.testng.annotations.Test; + +/** + * Coverage for the V5 {@link PulsarClientBuilder} configuration knobs that don't + * have observable end-to-end behaviour against a single in-process broker: + * {@code listenerName}, {@code description}, {@code memoryLimit}, and + * {@code openTelemetry}. These all just plumb a value into the underlying v4 + * {@code ClientConfigurationData} — so the tests reflect into the wrapped v4 + * client and assert the value made it through. + * + *

If any of these setters silently dropped the value, the only way it would + * surface today is by a user reporting that their telemetry collector / broker + * listener / memory cap doesn't apply. These tests pin the contract. + */ +public class V5ClientBuilderConfigTest extends V5ClientBaseTest { + + @Test + public void testListenerNamePropagates() throws Exception { + @Cleanup + PulsarClient client = PulsarClient.builder() + .serviceUrl(getBrokerServiceUrl()) + .listenerName("internal") + .build(); + + ClientConfigurationData conf = readV4Conf(client); + assertEquals(conf.getListenerName(), "internal", + "listenerName must propagate to the underlying v4 client config"); + } + + @Test + public void testDescriptionPropagates() throws Exception { + @Cleanup + PulsarClient client = PulsarClient.builder() + .serviceUrl(getBrokerServiceUrl()) + .description("v5-test-client") + .build(); + + ClientConfigurationData conf = readV4Conf(client); + assertEquals(conf.getDescription(), "v5-test-client", + "description must propagate to the underlying v4 client config"); + } + + @Test + public void testMemoryLimitPropagates() throws Exception { + @Cleanup + PulsarClient client = PulsarClient.builder() + .serviceUrl(getBrokerServiceUrl()) + .memoryLimit(MemorySize.ofMegabytes(64)) + .build(); + + ClientConfigurationData conf = readV4Conf(client); + assertEquals(conf.getMemoryLimitBytes(), 64L * 1024 * 1024, + "memoryLimit must propagate to the underlying v4 client config"); + } + + @Test + public void testOpenTelemetryPropagates() throws Exception { + OpenTelemetry custom = OpenTelemetry.noop(); + @Cleanup + PulsarClient client = PulsarClient.builder() + .serviceUrl(getBrokerServiceUrl()) + .openTelemetry(custom) + .build(); + + ClientConfigurationData conf = readV4Conf(client); + // Same instance — the v4 layer does not clone or wrap the OpenTelemetry handle. + assertSame(conf.getOpenTelemetry(), custom, + "openTelemetry instance must be the exact one the user supplied"); + } + + // --- Helpers --- + + private static ClientConfigurationData readV4Conf(PulsarClient v5Client) throws Exception { + Field f = v5Client.getClass().getDeclaredField("v4Client"); + f.setAccessible(true); + Object v4Client = f.get(v5Client); + assertNotNull(v4Client, "expected v4Client on V5 PulsarClient"); + return ((PulsarClientImpl) v4Client).getConfiguration(); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientLifecycleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientLifecycleTest.java new file mode 100644 index 0000000000000..57dc8977aa5e4 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientLifecycleTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.v5; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import java.lang.reflect.Field; +import java.time.Duration; +import org.apache.pulsar.client.api.v5.config.TransactionPolicy; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.testng.annotations.Test; + +/** + * Coverage for V5 {@link PulsarClient} lifecycle entry points that the + * existing test suite doesn't exercise: {@link PulsarClient#closeAsync()}, + * {@link PulsarClient#shutdown()}, and {@link PulsarClient#newTransactionAsync()}. + * + *

The synchronous {@code close()} and {@code newTransaction()} are already + * exercised heavily by every other V5 test (close via {@code @Cleanup} / + * {@code @AfterClass}, transactions via {@link V5TransactionTest}). The async / + * shutdown variants share most of the v4 plumbing, but had no direct coverage + * — these tests pin the contract. + */ +public class V5ClientLifecycleTest extends V5ClientBaseTest { + + @Test + public void testCloseAsyncCompletes() throws Exception { + PulsarClient client = newV5Client(); + Object v4Client = readField(client, "v4Client"); + assertTrue(v4Client instanceof PulsarClientImpl, + "expected v4Client to be a PulsarClientImpl, got " + v4Client.getClass()); + + client.closeAsync().get(10, java.util.concurrent.TimeUnit.SECONDS); + assertTrue(((PulsarClientImpl) v4Client).isClosed(), + "underlying v4 client must be closed after closeAsync()"); + } + + @Test + public void testShutdownDelegatesToV4() throws Exception { + // shutdown() is the v4 "fast" path: stops executors, releases connections, + // but deliberately does not flip the client's state to Closed (so + // isClosed() can still return false). The contract for V5 here is just + // "delegate to v4 without throwing"; observable post-shutdown behaviour + // is v4's responsibility and is exercised by the v4 test suite. This + // test pins the V5 → v4 delegation. + PulsarClient client = newV5Client(); + client.shutdown(); + // Calling shutdown() again must remain side-effect-free and not throw. + client.shutdown(); + } + + @Test + public void testNewTransactionAsyncReturnsOpenTransaction() throws Exception { + // Need a client with transactions enabled — the shared v5Client doesn't. + PulsarClient client = track(PulsarClient.builder() + .serviceUrl(getBrokerServiceUrl()) + .transactionPolicy(TransactionPolicy.builder().timeout(Duration.ofMinutes(1)).build()) + .build()); + + Transaction txn = client.newTransactionAsync() + .get(10, java.util.concurrent.TimeUnit.SECONDS); + assertNotNull(txn, "newTransactionAsync() future must resolve to a Transaction"); + assertEquals(txn.state(), Transaction.State.OPEN, + "freshly opened transaction must be in OPEN state"); + // Clean up — abort to leave no dangling txn on the broker. + txn.abort(); + } + + // --- Helpers --- + + private static Object readField(Object target, String name) throws Exception { + Field f = target.getClass().getDeclaredField(name); + f.setAccessible(true); + return f.get(target); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConsumerSubscriptionPropertiesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConsumerSubscriptionPropertiesTest.java new file mode 100644 index 0000000000000..719e8fb3a1bc4 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConsumerSubscriptionPropertiesTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.v5; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import java.lang.reflect.Field; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import lombok.Cleanup; +import org.apache.pulsar.client.api.v5.schema.Schema; +import org.testng.annotations.Test; + +/** + * Coverage for {@link QueueConsumerBuilder#subscriptionProperties(Map)}: the V5 + * setter must propagate to every per-segment v4 {@code Consumer}'s + * {@link org.apache.pulsar.client.impl.conf.ConsumerConfigurationData}, which + * is what the v4 wire layer ships to the broker on subscribe. + */ +public class V5ConsumerSubscriptionPropertiesTest extends V5ClientBaseTest { + + @Test + public void testSubscriptionPropertiesPropagateToV4Consumer() throws Exception { + String topic = newScalableTopic(1); + Map props = Map.of("env", "prod", "team", "data-platform"); + + @Cleanup + QueueConsumer consumer = v5Client.newQueueConsumer(Schema.string()) + .topic(topic) + .subscriptionName("sub-props-test") + .subscriptionProperties(props) + .subscribe(); + + Map actual = readSubscriptionPropertiesFromV4(consumer); + assertEquals(actual, props, + "V5 subscriptionProperties must propagate to the v4 consumer config"); + } + + @Test + public void testSubscriptionPropertiesAppliesToEverySegment() throws Exception { + String topic = newScalableTopic(3); + Map props = Map.of("region", "us-east-1"); + + @Cleanup + QueueConsumer consumer = v5Client.newQueueConsumer(Schema.string()) + .topic(topic) + .subscriptionName("sub-props-multi-test") + .subscriptionProperties(props) + .subscribe(); + + Map segmentConsumers = readSegmentConsumers(consumer); + assertEquals(segmentConsumers.size(), 3, "expected one v4 consumer per segment"); + for (CompletableFuture future : asConsumerFutures(segmentConsumers)) { + Object v4Consumer = future.get(); + Map v4Props = readConfSubscriptionProperties(v4Consumer); + assertEquals(v4Props, props, + "every segment's v4 Consumer must carry the same subscriptionProperties"); + } + } + + // --- Helpers --- + + private static Map readSubscriptionPropertiesFromV4(QueueConsumer consumer) + throws Exception { + Map segmentConsumers = readSegmentConsumers(consumer); + assertEquals(segmentConsumers.size(), 1, "expected a single segment for this test"); + CompletableFuture future = asConsumerFutures(segmentConsumers).iterator().next(); + Object v4Consumer = future.get(); + return readConfSubscriptionProperties(v4Consumer); + } + + private static Map readSegmentConsumers(QueueConsumer consumer) throws Exception { + Field f = consumer.getClass().getDeclaredField("segmentConsumers"); + f.setAccessible(true); + Object map = f.get(consumer); + assertNotNull(map, "expected segmentConsumers map on V5 queue consumer"); + return (Map) map; + } + + @SuppressWarnings("unchecked") + private static Iterable> asConsumerFutures(Map segmentConsumers) { + return (Iterable>) (Iterable) segmentConsumers.values(); + } + + @SuppressWarnings("unchecked") + private static Map readConfSubscriptionProperties(Object v4Consumer) throws Exception { + // ConsumerBase#conf is protected; walk the class hierarchy. + Class c = v4Consumer.getClass(); + while (c != null) { + try { + Field f = c.getDeclaredField("conf"); + f.setAccessible(true); + Object conf = f.get(v4Consumer); + var getter = conf.getClass().getMethod("getSubscriptionProperties"); + return (Map) getter.invoke(conf); + } catch (NoSuchFieldException e) { + c = c.getSuperclass(); + } + } + throw new NoSuchFieldException("conf not found on " + v4Consumer.getClass()); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MessageReplicationClustersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MessageReplicationClustersTest.java new file mode 100644 index 0000000000000..0e3c1f624bf50 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MessageReplicationClustersTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.v5; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import java.lang.reflect.Field; +import java.time.Duration; +import java.util.List; +import lombok.Cleanup; +import org.apache.pulsar.broker.service.SharedPulsarCluster; +import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.v5.schema.Schema; +import org.apache.pulsar.client.impl.MessageImpl; +import org.testng.annotations.Test; + +/** + * Coverage for {@link MessageBuilder#replicationClusters(java.util.List)}: the + * V5 setter must propagate to the v4 {@code TypedMessageBuilder} that V5 uses + * internally, so the cluster-restriction list lands in the message metadata + * the broker stores. + * + *

Verified by sending a V5 message with an explicit cluster restriction and + * reflecting into the V5 {@code MessageV5} wrapper to inspect the underlying + * v4 {@code MessageImpl.getReplicateTo()} — that's where the message metadata + * becomes observable. + */ +public class V5MessageReplicationClustersTest extends V5ClientBaseTest { + + @Test + public void testReplicationClustersPropagatesToMessageMetadata() throws Exception { + String topic = newScalableTopic(1); + + @Cleanup + Producer producer = v5Client.newProducer(Schema.string()) + .topic(topic) + .create(); + @Cleanup + QueueConsumer consumer = v5Client.newQueueConsumer(Schema.string()) + .topic(topic) + .subscriptionName("repl-clusters-watcher") + .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST) + .subscribe(); + + // Restrict to the local cluster only — same effective behaviour as the + // namespace default, but exercises the explicit-cluster code path. + List clusters = List.of(SharedPulsarCluster.CLUSTER_NAME); + + producer.newMessage() + .value("hello") + .replicationClusters(clusters) + .send(); + + Message msg = consumer.receive(Duration.ofSeconds(5)); + assertNotNull(msg, "consumer should receive the produced message"); + + // Reach into the V5 MessageV5 wrapper to inspect the underlying v4 metadata. + MessageImpl v4Impl = readUnderlyingV4Message(msg); + assertEquals(v4Impl.getReplicateTo(), clusters, + "replicationClusters from V5 must land in the message metadata"); + } + + private static MessageImpl readUnderlyingV4Message(Message v5Msg) throws Exception { + Field f = v5Msg.getClass().getDeclaredField("v4Message"); + f.setAccessible(true); + Object v4Msg = f.get(v5Msg); + assertNotNull(v4Msg, "expected v4Message inside V5 MessageV5 wrapper"); + if (!(v4Msg instanceof MessageImpl)) { + throw new AssertionError("expected MessageImpl, got: " + v4Msg.getClass()); + } + return (MessageImpl) v4Msg; + } +}