Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api.v5;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import java.time.Duration;
import java.util.HashSet;
import java.util.Set;
import lombok.Cleanup;
import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.v5.schema.Schema;
import org.testng.annotations.Test;

/**
* Coverage for {@link QueueConsumerBuilder#priorityLevel(int)}: lower priority
* values dispatch first within a Shared subscription. With a low-priority
* consumer's prefetch fully populated, higher-priority levels see no traffic
* until the lower-priority consumer either ack-flushes or blocks on its queue.
*
* <p>Single-segment topic to keep the dispatch deterministic — V5 broker-side
* priority is enforced per-segment, so a multi-segment topic can fan messages
* across segments and obscure the priority ordering.
*/
public class V5ConsumerPriorityLevelTest extends V5ClientBaseTest {

/**
* Two consumers, priority 0 (high) and priority 1 (low). With both subscribed
* before any traffic, every message produced should land on the high-priority
* consumer and none on the low-priority one — until the high consumer's
* prefetch is full, which we keep clear by acking each message.
*/
@Test
public void testHigherPriorityConsumerReceivesAllWhenPrefetchAvailable() throws Exception {
String topic = newScalableTopic(1);

@Cleanup
QueueConsumer<String> high = v5Client.newQueueConsumer(Schema.string())
.topic(topic)
.subscriptionName("priority-sub")
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
.priorityLevel(0)
.receiverQueueSize(50)
.consumerName("high")
.subscribe();
@Cleanup
QueueConsumer<String> low = v5Client.newQueueConsumer(Schema.string())
.topic(topic)
.subscriptionName("priority-sub")
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
.priorityLevel(1)
.receiverQueueSize(50)
.consumerName("low")
.subscribe();

@Cleanup
Producer<String> producer = v5Client.newProducer(Schema.string())
.topic(topic)
.create();

int n = 20;
Set<String> sent = new HashSet<>();
for (int i = 0; i < n; i++) {
String v = "msg-" + i;
producer.newMessage().key("k-" + i).value(v).send();
sent.add(v);
}

// High-priority consumer drains everything (acking as it goes so its
// prefetch never fills). The broker's priority dispatcher must hand all
// messages to the priority-0 consumer before considering priority-1.
Set<String> received = new HashSet<>();
for (int i = 0; i < n; i++) {
Message<String> msg = high.receive(Duration.ofSeconds(5));
assertNotNull(msg, "high-priority consumer must receive message #" + (i + 1));
received.add(msg.value());
high.acknowledge(msg.id());
}
assertEquals(received, sent, "high-priority consumer must receive every message");

// Low-priority consumer must have seen nothing.
Message<String> stragglers = low.receive(Duration.ofMillis(500));
assertNull(stragglers, "low-priority consumer must not receive while high one is draining");
}

/**
* When the high-priority consumer pauses (no acks, prefetch fills), the
* broker overflows to the low-priority consumer for further dispatch.
*/
@Test
public void testLowerPriorityConsumerReceivesOverflow() throws Exception {
String topic = newScalableTopic(1);
int highReceiverQueue = 5;

@Cleanup
QueueConsumer<String> high = v5Client.newQueueConsumer(Schema.string())
.topic(topic)
.subscriptionName("overflow-priority-sub")
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
.priorityLevel(0)
.receiverQueueSize(highReceiverQueue)
.consumerName("high-paused")
.subscribe();
@Cleanup
QueueConsumer<String> low = v5Client.newQueueConsumer(Schema.string())
.topic(topic)
.subscriptionName("overflow-priority-sub")
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
.priorityLevel(1)
.receiverQueueSize(50)
.consumerName("low-active")
.subscribe();

@Cleanup
Producer<String> producer = v5Client.newProducer(Schema.string())
.topic(topic)
.create();

// Publish more than the high consumer's prefetch — once it's full, the
// remainder must spill to the low-priority consumer.
int total = highReceiverQueue * 4;
for (int i = 0; i < total; i++) {
producer.newMessage().key("k-" + i).value("v-" + i).send();
}

// Low-priority consumer must see the spillover (without "high" ever
// calling receive(), the broker treats it as having a full prefetch
// queue and routes onward).
Set<String> lowSeen = new HashSet<>();
long deadline = System.currentTimeMillis() + 10_000L;
while (lowSeen.size() < total - highReceiverQueue && System.currentTimeMillis() < deadline) {
Message<String> msg = low.receive(Duration.ofMillis(500));
if (msg != null) {
lowSeen.add(msg.value());
low.acknowledge(msg.id());
}
}
// Allow some tolerance — the boundary between "high prefetch full" and
// "broker spills to low" depends on broker timing. The minimum guarantee
// is that the low consumer receives strictly more than zero overflow.
assertEquals(lowSeen.size() >= total - highReceiverQueue * 2 - 1, true,
"low-priority consumer must see overflow once high consumer's prefetch fills, got: "
+ lowSeen.size());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api.v5;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import lombok.Cleanup;
import org.apache.pulsar.client.api.v5.schema.Schema;
import org.testng.annotations.Test;

/**
* Coverage for {@link QueueConsumerBuilder#receiverQueueSize(int)}: the V5 builder
* must wire the user-supplied prefetch depth down to every per-segment v4
* {@code ConsumerImpl}. Without this wiring, the V5 setting would be silently
* ignored — the v4 layer would default to its own receiver queue size and the
* caller would have no way of knowing.
*
* <p>Verified end-to-end by reaching into the V5 internals via reflection (the
* V5 {@link QueueConsumer} interface itself doesn't expose the v4 consumers,
* since end users never need them).
*/
public class V5ConsumerReceiverQueueSizeTest extends V5ClientBaseTest {

@Test
public void testReceiverQueueSizePropagatesToV4Consumer() throws Exception {
String topic = newScalableTopic(1);
int requested = 17;

@Cleanup
QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
.topic(topic)
.subscriptionName("rq-size-sub")
.receiverQueueSize(requested)
.subscribe();

int actual = readV4ReceiverQueueSize(consumer);
assertEquals(actual, requested,
"V5 receiverQueueSize must propagate to the per-segment v4 ConsumerImpl");
}

/**
* Multi-segment topic: every per-segment v4 consumer must inherit the same
* V5-configured prefetch depth, otherwise individual segments would buffer
* more than the user asked for.
*/
@Test
public void testReceiverQueueSizeAppliesToEverySegment() throws Exception {
String topic = newScalableTopic(3);
int requested = 9;

@Cleanup
QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
.topic(topic)
.subscriptionName("rq-size-multi-sub")
.receiverQueueSize(requested)
.subscribe();

Map<Long, ?> segmentConsumers = readSegmentConsumers(consumer);
assertEquals(segmentConsumers.size(), 3, "expected one v4 consumer per segment");
for (CompletableFuture<?> future : asConsumerFutures(segmentConsumers)) {
Object v4Consumer = future.get();
int actual = invokeGetCurrentReceiverQueueSize(v4Consumer);
assertEquals(actual, requested,
"every segment's v4 ConsumerImpl must carry the same receiverQueueSize");
}
}

// --- Helpers ---

private static int readV4ReceiverQueueSize(QueueConsumer<?> consumer) throws Exception {
Map<Long, ?> segmentConsumers = readSegmentConsumers(consumer);
assertEquals(segmentConsumers.size(), 1, "expected a single segment for this test");
CompletableFuture<?> future = asConsumerFutures(segmentConsumers).iterator().next();
Object v4Consumer = future.get();
return invokeGetCurrentReceiverQueueSize(v4Consumer);
}

private static Map<Long, ?> readSegmentConsumers(QueueConsumer<?> consumer) throws Exception {
Field field = consumer.getClass().getDeclaredField("segmentConsumers");
field.setAccessible(true);
Object map = field.get(consumer);
assertNotNull(map, "expected segmentConsumers map on V5 queue consumer");
return (Map<Long, ?>) map;
}

@SuppressWarnings("unchecked")
private static Iterable<CompletableFuture<?>> asConsumerFutures(Map<Long, ?> segmentConsumers) {
return (Iterable<CompletableFuture<?>>) (Iterable<?>) segmentConsumers.values();
}

private static int invokeGetCurrentReceiverQueueSize(Object v4Consumer) throws Exception {
// Defined on org.apache.pulsar.client.impl.ConsumerBase — use reflection so the
// test doesn't have to import a non-API class.
var method = v4Consumer.getClass().getMethod("getCurrentReceiverQueueSize");
return (int) method.invoke(v4Consumer);
}
}
Loading
Loading