diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConsumerPriorityLevelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConsumerPriorityLevelTest.java
new file mode 100644
index 0000000000000..448c26eaaa270
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConsumerPriorityLevelTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for {@link QueueConsumerBuilder#priorityLevel(int)}: lower priority
+ * values dispatch first within a Shared subscription. With a low-priority
+ * consumer's prefetch fully populated, higher-priority levels see no traffic
+ * until the lower-priority consumer either ack-flushes or blocks on its queue.
+ *
+ *
Single-segment topic to keep the dispatch deterministic — V5 broker-side
+ * priority is enforced per-segment, so a multi-segment topic can fan messages
+ * across segments and obscure the priority ordering.
+ */
+public class V5ConsumerPriorityLevelTest extends V5ClientBaseTest {
+
+ /**
+ * Two consumers, priority 0 (high) and priority 1 (low). With both subscribed
+ * before any traffic, every message produced should land on the high-priority
+ * consumer and none on the low-priority one — until the high consumer's
+ * prefetch is full, which we keep clear by acking each message.
+ */
+ @Test
+ public void testHigherPriorityConsumerReceivesAllWhenPrefetchAvailable() throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ QueueConsumer high = v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("priority-sub")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .priorityLevel(0)
+ .receiverQueueSize(50)
+ .consumerName("high")
+ .subscribe();
+ @Cleanup
+ QueueConsumer low = v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("priority-sub")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .priorityLevel(1)
+ .receiverQueueSize(50)
+ .consumerName("low")
+ .subscribe();
+
+ @Cleanup
+ Producer producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+
+ int n = 20;
+ Set sent = new HashSet<>();
+ for (int i = 0; i < n; i++) {
+ String v = "msg-" + i;
+ producer.newMessage().key("k-" + i).value(v).send();
+ sent.add(v);
+ }
+
+ // High-priority consumer drains everything (acking as it goes so its
+ // prefetch never fills). The broker's priority dispatcher must hand all
+ // messages to the priority-0 consumer before considering priority-1.
+ Set received = new HashSet<>();
+ for (int i = 0; i < n; i++) {
+ Message msg = high.receive(Duration.ofSeconds(5));
+ assertNotNull(msg, "high-priority consumer must receive message #" + (i + 1));
+ received.add(msg.value());
+ high.acknowledge(msg.id());
+ }
+ assertEquals(received, sent, "high-priority consumer must receive every message");
+
+ // Low-priority consumer must have seen nothing.
+ Message stragglers = low.receive(Duration.ofMillis(500));
+ assertNull(stragglers, "low-priority consumer must not receive while high one is draining");
+ }
+
+ /**
+ * When the high-priority consumer pauses (no acks, prefetch fills), the
+ * broker overflows to the low-priority consumer for further dispatch.
+ */
+ @Test
+ public void testLowerPriorityConsumerReceivesOverflow() throws Exception {
+ String topic = newScalableTopic(1);
+ int highReceiverQueue = 5;
+
+ @Cleanup
+ QueueConsumer high = v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("overflow-priority-sub")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .priorityLevel(0)
+ .receiverQueueSize(highReceiverQueue)
+ .consumerName("high-paused")
+ .subscribe();
+ @Cleanup
+ QueueConsumer low = v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("overflow-priority-sub")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .priorityLevel(1)
+ .receiverQueueSize(50)
+ .consumerName("low-active")
+ .subscribe();
+
+ @Cleanup
+ Producer producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+
+ // Publish more than the high consumer's prefetch — once it's full, the
+ // remainder must spill to the low-priority consumer.
+ int total = highReceiverQueue * 4;
+ for (int i = 0; i < total; i++) {
+ producer.newMessage().key("k-" + i).value("v-" + i).send();
+ }
+
+ // Low-priority consumer must see the spillover (without "high" ever
+ // calling receive(), the broker treats it as having a full prefetch
+ // queue and routes onward).
+ Set lowSeen = new HashSet<>();
+ long deadline = System.currentTimeMillis() + 10_000L;
+ while (lowSeen.size() < total - highReceiverQueue && System.currentTimeMillis() < deadline) {
+ Message msg = low.receive(Duration.ofMillis(500));
+ if (msg != null) {
+ lowSeen.add(msg.value());
+ low.acknowledge(msg.id());
+ }
+ }
+ // Allow some tolerance — the boundary between "high prefetch full" and
+ // "broker spills to low" depends on broker timing. The minimum guarantee
+ // is that the low consumer receives strictly more than zero overflow.
+ assertEquals(lowSeen.size() >= total - highReceiverQueue * 2 - 1, true,
+ "low-priority consumer must see overflow once high consumer's prefetch fills, got: "
+ + lowSeen.size());
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConsumerReceiverQueueSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConsumerReceiverQueueSizeTest.java
new file mode 100644
index 0000000000000..592e8369abcaf
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConsumerReceiverQueueSizeTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for {@link QueueConsumerBuilder#receiverQueueSize(int)}: the V5 builder
+ * must wire the user-supplied prefetch depth down to every per-segment v4
+ * {@code ConsumerImpl}. Without this wiring, the V5 setting would be silently
+ * ignored — the v4 layer would default to its own receiver queue size and the
+ * caller would have no way of knowing.
+ *
+ * Verified end-to-end by reaching into the V5 internals via reflection (the
+ * V5 {@link QueueConsumer} interface itself doesn't expose the v4 consumers,
+ * since end users never need them).
+ */
+public class V5ConsumerReceiverQueueSizeTest extends V5ClientBaseTest {
+
+ @Test
+ public void testReceiverQueueSizePropagatesToV4Consumer() throws Exception {
+ String topic = newScalableTopic(1);
+ int requested = 17;
+
+ @Cleanup
+ QueueConsumer consumer = v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("rq-size-sub")
+ .receiverQueueSize(requested)
+ .subscribe();
+
+ int actual = readV4ReceiverQueueSize(consumer);
+ assertEquals(actual, requested,
+ "V5 receiverQueueSize must propagate to the per-segment v4 ConsumerImpl");
+ }
+
+ /**
+ * Multi-segment topic: every per-segment v4 consumer must inherit the same
+ * V5-configured prefetch depth, otherwise individual segments would buffer
+ * more than the user asked for.
+ */
+ @Test
+ public void testReceiverQueueSizeAppliesToEverySegment() throws Exception {
+ String topic = newScalableTopic(3);
+ int requested = 9;
+
+ @Cleanup
+ QueueConsumer consumer = v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("rq-size-multi-sub")
+ .receiverQueueSize(requested)
+ .subscribe();
+
+ Map segmentConsumers = readSegmentConsumers(consumer);
+ assertEquals(segmentConsumers.size(), 3, "expected one v4 consumer per segment");
+ for (CompletableFuture> future : asConsumerFutures(segmentConsumers)) {
+ Object v4Consumer = future.get();
+ int actual = invokeGetCurrentReceiverQueueSize(v4Consumer);
+ assertEquals(actual, requested,
+ "every segment's v4 ConsumerImpl must carry the same receiverQueueSize");
+ }
+ }
+
+ // --- Helpers ---
+
+ private static int readV4ReceiverQueueSize(QueueConsumer> consumer) throws Exception {
+ Map segmentConsumers = readSegmentConsumers(consumer);
+ assertEquals(segmentConsumers.size(), 1, "expected a single segment for this test");
+ CompletableFuture> future = asConsumerFutures(segmentConsumers).iterator().next();
+ Object v4Consumer = future.get();
+ return invokeGetCurrentReceiverQueueSize(v4Consumer);
+ }
+
+ private static Map readSegmentConsumers(QueueConsumer> consumer) throws Exception {
+ Field field = consumer.getClass().getDeclaredField("segmentConsumers");
+ field.setAccessible(true);
+ Object map = field.get(consumer);
+ assertNotNull(map, "expected segmentConsumers map on V5 queue consumer");
+ return (Map) map;
+ }
+
+ @SuppressWarnings("unchecked")
+ private static Iterable> asConsumerFutures(Map segmentConsumers) {
+ return (Iterable>) (Iterable>) segmentConsumers.values();
+ }
+
+ private static int invokeGetCurrentReceiverQueueSize(Object v4Consumer) throws Exception {
+ // Defined on org.apache.pulsar.client.impl.ConsumerBase — use reflection so the
+ // test doesn't have to import a non-API class.
+ var method = v4Consumer.getClass().getMethod("getCurrentReceiverQueueSize");
+ return (int) method.invoke(v4Consumer);
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerFlowControlTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerFlowControlTest.java
new file mode 100644
index 0000000000000..d303c93a0318c
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerFlowControlTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for {@link ProducerBuilder#sendTimeout(Duration)} and
+ * {@link ProducerBuilder#blockIfQueueFull(boolean)}: the V5 builder must wire
+ * each user-supplied flow-control knob down to every per-segment v4
+ * {@code ProducerImpl}. Without this wiring, the V5 setting would be silently
+ * ignored — the v4 layer would default and the caller would have no way of
+ * knowing.
+ *
+ * Behavioural verification of the actual timeout-firing and block-on-full
+ * paths lives in the v4 test suite (e.g. {@code SimpleProducerConsumerTest
+ * .testSendTimeout}); those tests stop the broker mid-send to force the
+ * pending-queue overflow / timeout, which the in-process shared cluster used
+ * here cannot do. The plumbing test suffices as a regression guard for the V5
+ * → v4 mapping.
+ */
+public class V5ProducerFlowControlTest extends V5ClientBaseTest {
+
+ @Test
+ public void testSendTimeoutPropagatesToV4Producer() throws Exception {
+ String topic = newScalableTopic(1);
+ Duration requested = Duration.ofSeconds(7);
+
+ @Cleanup
+ Producer producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .sendTimeout(requested)
+ .create();
+ // V5 segment producers are created lazily on first send — produce a
+ // message so the per-segment v4 ProducerImpl exists to inspect.
+ producer.newMessage().value("warm-up").send();
+
+ ProducerConfigurationData conf = readV4ProducerConf(producer);
+ assertEquals(conf.getSendTimeoutMs(), requested.toMillis(),
+ "V5 sendTimeout must propagate to the per-segment v4 ProducerImpl");
+ }
+
+ @Test
+ public void testBlockIfQueueFullTrue() throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .blockIfQueueFull(true)
+ .create();
+ producer.newMessage().value("warm-up").send();
+
+ ProducerConfigurationData conf = readV4ProducerConf(producer);
+ assertTrue(conf.isBlockIfQueueFull(),
+ "blockIfQueueFull(true) must propagate to the v4 ProducerImpl");
+ }
+
+ @Test
+ public void testBlockIfQueueFullFalse() throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .blockIfQueueFull(false)
+ .create();
+ producer.newMessage().value("warm-up").send();
+
+ ProducerConfigurationData conf = readV4ProducerConf(producer);
+ assertFalse(conf.isBlockIfQueueFull(),
+ "blockIfQueueFull(false) must propagate to the v4 ProducerImpl");
+ }
+
+ /**
+ * Multi-segment topic: every per-segment v4 producer must inherit the same
+ * V5-configured sendTimeout, otherwise individual segments would honor
+ * different deadlines than the user asked for.
+ */
+ @Test
+ public void testSendTimeoutAppliesToEverySegment() throws Exception {
+ String topic = newScalableTopic(3);
+ Duration requested = Duration.ofMillis(2_500);
+
+ @Cleanup
+ Producer producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .sendTimeout(requested)
+ .create();
+ // Send across keys so every segment lazily materializes its v4 producer.
+ for (int i = 0; i < 30; i++) {
+ producer.newMessage().key("k-" + i).value("warm-" + i).send();
+ }
+
+ Map segmentProducers = readSegmentProducers(producer);
+ assertEquals(segmentProducers.size(), 3, "expected one v4 producer per segment");
+ for (CompletableFuture> future : asProducerFutures(segmentProducers)) {
+ Object v4Producer = future.get();
+ ProducerConfigurationData conf = readConfField(v4Producer);
+ assertEquals(conf.getSendTimeoutMs(), requested.toMillis(),
+ "every segment's v4 ProducerImpl must carry the same sendTimeout");
+ }
+ }
+
+ // --- Helpers ---
+
+ private static ProducerConfigurationData readV4ProducerConf(Producer> producer) throws Exception {
+ Map segmentProducers = readSegmentProducers(producer);
+ assertEquals(segmentProducers.size(), 1, "expected a single segment for this test");
+ CompletableFuture> future = asProducerFutures(segmentProducers).iterator().next();
+ Object v4Producer = future.get();
+ return readConfField(v4Producer);
+ }
+
+ private static Map readSegmentProducers(Producer> producer) throws Exception {
+ Field field = producer.getClass().getDeclaredField("segmentProducers");
+ field.setAccessible(true);
+ Object map = field.get(producer);
+ assertNotNull(map, "expected segmentProducers map on V5 producer");
+ return (Map) map;
+ }
+
+ @SuppressWarnings("unchecked")
+ private static Iterable> asProducerFutures(Map segmentProducers) {
+ return (Iterable>) (Iterable>) segmentProducers.values();
+ }
+
+ private static ProducerConfigurationData readConfField(Object v4Producer) throws Exception {
+ // ProducerBase#conf is protected; walk the class hierarchy to find it.
+ Class> c = v4Producer.getClass();
+ while (c != null) {
+ try {
+ Field f = c.getDeclaredField("conf");
+ f.setAccessible(true);
+ return (ProducerConfigurationData) f.get(v4Producer);
+ } catch (NoSuchFieldException e) {
+ c = c.getSuperclass();
+ }
+ }
+ throw new NoSuchFieldException("conf not found on " + v4Producer.getClass());
+ }
+}