diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConsumerPriorityLevelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConsumerPriorityLevelTest.java new file mode 100644 index 0000000000000..448c26eaaa270 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConsumerPriorityLevelTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.v5; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import java.time.Duration; +import java.util.HashSet; +import java.util.Set; +import lombok.Cleanup; +import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.v5.schema.Schema; +import org.testng.annotations.Test; + +/** + * Coverage for {@link QueueConsumerBuilder#priorityLevel(int)}: lower priority + * values dispatch first within a Shared subscription. With a low-priority + * consumer's prefetch fully populated, higher-priority levels see no traffic + * until the lower-priority consumer either ack-flushes or blocks on its queue. + * + *

Single-segment topic to keep the dispatch deterministic — V5 broker-side + * priority is enforced per-segment, so a multi-segment topic can fan messages + * across segments and obscure the priority ordering. + */ +public class V5ConsumerPriorityLevelTest extends V5ClientBaseTest { + + /** + * Two consumers, priority 0 (high) and priority 1 (low). With both subscribed + * before any traffic, every message produced should land on the high-priority + * consumer and none on the low-priority one — until the high consumer's + * prefetch is full, which we keep clear by acking each message. + */ + @Test + public void testHigherPriorityConsumerReceivesAllWhenPrefetchAvailable() throws Exception { + String topic = newScalableTopic(1); + + @Cleanup + QueueConsumer high = v5Client.newQueueConsumer(Schema.string()) + .topic(topic) + .subscriptionName("priority-sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST) + .priorityLevel(0) + .receiverQueueSize(50) + .consumerName("high") + .subscribe(); + @Cleanup + QueueConsumer low = v5Client.newQueueConsumer(Schema.string()) + .topic(topic) + .subscriptionName("priority-sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST) + .priorityLevel(1) + .receiverQueueSize(50) + .consumerName("low") + .subscribe(); + + @Cleanup + Producer producer = v5Client.newProducer(Schema.string()) + .topic(topic) + .create(); + + int n = 20; + Set sent = new HashSet<>(); + for (int i = 0; i < n; i++) { + String v = "msg-" + i; + producer.newMessage().key("k-" + i).value(v).send(); + sent.add(v); + } + + // High-priority consumer drains everything (acking as it goes so its + // prefetch never fills). The broker's priority dispatcher must hand all + // messages to the priority-0 consumer before considering priority-1. + Set received = new HashSet<>(); + for (int i = 0; i < n; i++) { + Message msg = high.receive(Duration.ofSeconds(5)); + assertNotNull(msg, "high-priority consumer must receive message #" + (i + 1)); + received.add(msg.value()); + high.acknowledge(msg.id()); + } + assertEquals(received, sent, "high-priority consumer must receive every message"); + + // Low-priority consumer must have seen nothing. + Message stragglers = low.receive(Duration.ofMillis(500)); + assertNull(stragglers, "low-priority consumer must not receive while high one is draining"); + } + + /** + * When the high-priority consumer pauses (no acks, prefetch fills), the + * broker overflows to the low-priority consumer for further dispatch. + */ + @Test + public void testLowerPriorityConsumerReceivesOverflow() throws Exception { + String topic = newScalableTopic(1); + int highReceiverQueue = 5; + + @Cleanup + QueueConsumer high = v5Client.newQueueConsumer(Schema.string()) + .topic(topic) + .subscriptionName("overflow-priority-sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST) + .priorityLevel(0) + .receiverQueueSize(highReceiverQueue) + .consumerName("high-paused") + .subscribe(); + @Cleanup + QueueConsumer low = v5Client.newQueueConsumer(Schema.string()) + .topic(topic) + .subscriptionName("overflow-priority-sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST) + .priorityLevel(1) + .receiverQueueSize(50) + .consumerName("low-active") + .subscribe(); + + @Cleanup + Producer producer = v5Client.newProducer(Schema.string()) + .topic(topic) + .create(); + + // Publish more than the high consumer's prefetch — once it's full, the + // remainder must spill to the low-priority consumer. + int total = highReceiverQueue * 4; + for (int i = 0; i < total; i++) { + producer.newMessage().key("k-" + i).value("v-" + i).send(); + } + + // Low-priority consumer must see the spillover (without "high" ever + // calling receive(), the broker treats it as having a full prefetch + // queue and routes onward). + Set lowSeen = new HashSet<>(); + long deadline = System.currentTimeMillis() + 10_000L; + while (lowSeen.size() < total - highReceiverQueue && System.currentTimeMillis() < deadline) { + Message msg = low.receive(Duration.ofMillis(500)); + if (msg != null) { + lowSeen.add(msg.value()); + low.acknowledge(msg.id()); + } + } + // Allow some tolerance — the boundary between "high prefetch full" and + // "broker spills to low" depends on broker timing. The minimum guarantee + // is that the low consumer receives strictly more than zero overflow. + assertEquals(lowSeen.size() >= total - highReceiverQueue * 2 - 1, true, + "low-priority consumer must see overflow once high consumer's prefetch fills, got: " + + lowSeen.size()); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConsumerReceiverQueueSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConsumerReceiverQueueSizeTest.java new file mode 100644 index 0000000000000..592e8369abcaf --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConsumerReceiverQueueSizeTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.v5; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import java.lang.reflect.Field; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import lombok.Cleanup; +import org.apache.pulsar.client.api.v5.schema.Schema; +import org.testng.annotations.Test; + +/** + * Coverage for {@link QueueConsumerBuilder#receiverQueueSize(int)}: the V5 builder + * must wire the user-supplied prefetch depth down to every per-segment v4 + * {@code ConsumerImpl}. Without this wiring, the V5 setting would be silently + * ignored — the v4 layer would default to its own receiver queue size and the + * caller would have no way of knowing. + * + *

Verified end-to-end by reaching into the V5 internals via reflection (the + * V5 {@link QueueConsumer} interface itself doesn't expose the v4 consumers, + * since end users never need them). + */ +public class V5ConsumerReceiverQueueSizeTest extends V5ClientBaseTest { + + @Test + public void testReceiverQueueSizePropagatesToV4Consumer() throws Exception { + String topic = newScalableTopic(1); + int requested = 17; + + @Cleanup + QueueConsumer consumer = v5Client.newQueueConsumer(Schema.string()) + .topic(topic) + .subscriptionName("rq-size-sub") + .receiverQueueSize(requested) + .subscribe(); + + int actual = readV4ReceiverQueueSize(consumer); + assertEquals(actual, requested, + "V5 receiverQueueSize must propagate to the per-segment v4 ConsumerImpl"); + } + + /** + * Multi-segment topic: every per-segment v4 consumer must inherit the same + * V5-configured prefetch depth, otherwise individual segments would buffer + * more than the user asked for. + */ + @Test + public void testReceiverQueueSizeAppliesToEverySegment() throws Exception { + String topic = newScalableTopic(3); + int requested = 9; + + @Cleanup + QueueConsumer consumer = v5Client.newQueueConsumer(Schema.string()) + .topic(topic) + .subscriptionName("rq-size-multi-sub") + .receiverQueueSize(requested) + .subscribe(); + + Map segmentConsumers = readSegmentConsumers(consumer); + assertEquals(segmentConsumers.size(), 3, "expected one v4 consumer per segment"); + for (CompletableFuture future : asConsumerFutures(segmentConsumers)) { + Object v4Consumer = future.get(); + int actual = invokeGetCurrentReceiverQueueSize(v4Consumer); + assertEquals(actual, requested, + "every segment's v4 ConsumerImpl must carry the same receiverQueueSize"); + } + } + + // --- Helpers --- + + private static int readV4ReceiverQueueSize(QueueConsumer consumer) throws Exception { + Map segmentConsumers = readSegmentConsumers(consumer); + assertEquals(segmentConsumers.size(), 1, "expected a single segment for this test"); + CompletableFuture future = asConsumerFutures(segmentConsumers).iterator().next(); + Object v4Consumer = future.get(); + return invokeGetCurrentReceiverQueueSize(v4Consumer); + } + + private static Map readSegmentConsumers(QueueConsumer consumer) throws Exception { + Field field = consumer.getClass().getDeclaredField("segmentConsumers"); + field.setAccessible(true); + Object map = field.get(consumer); + assertNotNull(map, "expected segmentConsumers map on V5 queue consumer"); + return (Map) map; + } + + @SuppressWarnings("unchecked") + private static Iterable> asConsumerFutures(Map segmentConsumers) { + return (Iterable>) (Iterable) segmentConsumers.values(); + } + + private static int invokeGetCurrentReceiverQueueSize(Object v4Consumer) throws Exception { + // Defined on org.apache.pulsar.client.impl.ConsumerBase — use reflection so the + // test doesn't have to import a non-API class. + var method = v4Consumer.getClass().getMethod("getCurrentReceiverQueueSize"); + return (int) method.invoke(v4Consumer); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerFlowControlTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerFlowControlTest.java new file mode 100644 index 0000000000000..d303c93a0318c --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerFlowControlTest.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.v5; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import java.lang.reflect.Field; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import lombok.Cleanup; +import org.apache.pulsar.client.api.v5.schema.Schema; +import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; +import org.testng.annotations.Test; + +/** + * Coverage for {@link ProducerBuilder#sendTimeout(Duration)} and + * {@link ProducerBuilder#blockIfQueueFull(boolean)}: the V5 builder must wire + * each user-supplied flow-control knob down to every per-segment v4 + * {@code ProducerImpl}. Without this wiring, the V5 setting would be silently + * ignored — the v4 layer would default and the caller would have no way of + * knowing. + * + *

Behavioural verification of the actual timeout-firing and block-on-full + * paths lives in the v4 test suite (e.g. {@code SimpleProducerConsumerTest + * .testSendTimeout}); those tests stop the broker mid-send to force the + * pending-queue overflow / timeout, which the in-process shared cluster used + * here cannot do. The plumbing test suffices as a regression guard for the V5 + * → v4 mapping. + */ +public class V5ProducerFlowControlTest extends V5ClientBaseTest { + + @Test + public void testSendTimeoutPropagatesToV4Producer() throws Exception { + String topic = newScalableTopic(1); + Duration requested = Duration.ofSeconds(7); + + @Cleanup + Producer producer = v5Client.newProducer(Schema.string()) + .topic(topic) + .sendTimeout(requested) + .create(); + // V5 segment producers are created lazily on first send — produce a + // message so the per-segment v4 ProducerImpl exists to inspect. + producer.newMessage().value("warm-up").send(); + + ProducerConfigurationData conf = readV4ProducerConf(producer); + assertEquals(conf.getSendTimeoutMs(), requested.toMillis(), + "V5 sendTimeout must propagate to the per-segment v4 ProducerImpl"); + } + + @Test + public void testBlockIfQueueFullTrue() throws Exception { + String topic = newScalableTopic(1); + + @Cleanup + Producer producer = v5Client.newProducer(Schema.string()) + .topic(topic) + .blockIfQueueFull(true) + .create(); + producer.newMessage().value("warm-up").send(); + + ProducerConfigurationData conf = readV4ProducerConf(producer); + assertTrue(conf.isBlockIfQueueFull(), + "blockIfQueueFull(true) must propagate to the v4 ProducerImpl"); + } + + @Test + public void testBlockIfQueueFullFalse() throws Exception { + String topic = newScalableTopic(1); + + @Cleanup + Producer producer = v5Client.newProducer(Schema.string()) + .topic(topic) + .blockIfQueueFull(false) + .create(); + producer.newMessage().value("warm-up").send(); + + ProducerConfigurationData conf = readV4ProducerConf(producer); + assertFalse(conf.isBlockIfQueueFull(), + "blockIfQueueFull(false) must propagate to the v4 ProducerImpl"); + } + + /** + * Multi-segment topic: every per-segment v4 producer must inherit the same + * V5-configured sendTimeout, otherwise individual segments would honor + * different deadlines than the user asked for. + */ + @Test + public void testSendTimeoutAppliesToEverySegment() throws Exception { + String topic = newScalableTopic(3); + Duration requested = Duration.ofMillis(2_500); + + @Cleanup + Producer producer = v5Client.newProducer(Schema.string()) + .topic(topic) + .sendTimeout(requested) + .create(); + // Send across keys so every segment lazily materializes its v4 producer. + for (int i = 0; i < 30; i++) { + producer.newMessage().key("k-" + i).value("warm-" + i).send(); + } + + Map segmentProducers = readSegmentProducers(producer); + assertEquals(segmentProducers.size(), 3, "expected one v4 producer per segment"); + for (CompletableFuture future : asProducerFutures(segmentProducers)) { + Object v4Producer = future.get(); + ProducerConfigurationData conf = readConfField(v4Producer); + assertEquals(conf.getSendTimeoutMs(), requested.toMillis(), + "every segment's v4 ProducerImpl must carry the same sendTimeout"); + } + } + + // --- Helpers --- + + private static ProducerConfigurationData readV4ProducerConf(Producer producer) throws Exception { + Map segmentProducers = readSegmentProducers(producer); + assertEquals(segmentProducers.size(), 1, "expected a single segment for this test"); + CompletableFuture future = asProducerFutures(segmentProducers).iterator().next(); + Object v4Producer = future.get(); + return readConfField(v4Producer); + } + + private static Map readSegmentProducers(Producer producer) throws Exception { + Field field = producer.getClass().getDeclaredField("segmentProducers"); + field.setAccessible(true); + Object map = field.get(producer); + assertNotNull(map, "expected segmentProducers map on V5 producer"); + return (Map) map; + } + + @SuppressWarnings("unchecked") + private static Iterable> asProducerFutures(Map segmentProducers) { + return (Iterable>) (Iterable) segmentProducers.values(); + } + + private static ProducerConfigurationData readConfField(Object v4Producer) throws Exception { + // ProducerBase#conf is protected; walk the class hierarchy to find it. + Class c = v4Producer.getClass(); + while (c != null) { + try { + Field f = c.getDeclaredField("conf"); + f.setAccessible(true); + return (ProducerConfigurationData) f.get(v4Producer); + } catch (NoSuchFieldException e) { + c = c.getSuperclass(); + } + } + throw new NoSuchFieldException("conf not found on " + v4Producer.getClass()); + } +}