From 8c6dfbeb41a4c9a2e6f154fb71705a98a2eb4a0b Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Thu, 10 Nov 2022 10:09:00 +0800 Subject: [PATCH 1/2] [feat] Support consumer batch receive. --- pulsar/__init__.py | 32 ++++++++++++++++++++++++++++++-- src/config.cc | 7 +++++++ src/consumer.cc | 10 ++++++++++ tests/pulsar_test.py | 23 ++++++++++++++++++++++- 4 files changed, 69 insertions(+), 3 deletions(-) diff --git a/pulsar/__init__.py b/pulsar/__init__.py index c1195de7..c88537bd 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -46,7 +46,7 @@ import _pulsar from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType, \ - LoggerLevel # noqa: F401 + LoggerLevel, BatchReceivePolicy # noqa: F401 from pulsar.exceptions import * @@ -657,7 +657,8 @@ def subscribe(self, topic, subscription_name, replicate_subscription_state_enabled=False, max_pending_chunked_message=10, auto_ack_oldest_chunked_message_on_queue_full=False, - start_message_id_inclusive=False + start_message_id_inclusive=False, + batch_receive_policy=None ): """ Subscribe to the given topic and subscription combination. @@ -740,6 +741,16 @@ def my_listener(consumer, message): if autoAckOldestChunkedMessageOnQueueFull is true else it marks them for redelivery. start_message_id_inclusive: bool, default=False Set the consumer to include the given position of any reset operation like Consumer::seek. + batch_receive_policy: class BatchReceivePolicy, Constructor parameters (in order): + : param maxNumMessage: Max num message, if less than 0, it means no limit. default: -1 + : param maxNumBytes: Max num bytes, if less than 0, it means no limit. default: 10 * 1024 * 1024 + : param timeoutMs: If less than 0, it means no limit. default: 100 + + Batch receive policy can limit the number and bytes of messages in a single batch, + and can specify a timeout for waiting for enough messages for this batch. 
+ + A batch receive action is completed as long as any one of the conditions (the batch has enough number + or size of messages, or the waiting timeout is passed) are met. """ _check_type(str, subscription_name, 'subscription_name') _check_type(ConsumerType, consumer_type, 'consumer_type') @@ -759,6 +770,7 @@ def my_listener(consumer, message): _check_type(int, max_pending_chunked_message, 'max_pending_chunked_message') _check_type(bool, auto_ack_oldest_chunked_message_on_queue_full, 'auto_ack_oldest_chunked_message_on_queue_full') _check_type(bool, start_message_id_inclusive, 'start_message_id_inclusive') + _check_type_or_none(BatchReceivePolicy, batch_receive_policy, 'batch_receive_policy') conf = _pulsar.ConsumerConfiguration() conf.consumer_type(consumer_type) @@ -788,6 +800,8 @@ def my_listener(consumer, message): conf.max_pending_chunked_message(max_pending_chunked_message) conf.auto_ack_oldest_chunked_message_on_queue_full(auto_ack_oldest_chunked_message_on_queue_full) conf.start_message_id_inclusive(start_message_id_inclusive) + if batch_receive_policy: + conf.batch_receive_policy(batch_receive_policy) c = Consumer() if isinstance(topic, str): @@ -1237,6 +1251,20 @@ def receive(self, timeout_millis=None): m._schema = self._schema return m + def batch_receive(self): + """ + Batch receiving messages. + + This calls blocks until has enough messages or wait timeout, more details to see {@link BatchReceivePolicy}. + """ + messages = [] + msgs = self._consumer.batch_receive() + for msg in msgs: + m = Message() + m._message = msg + messages.append(m) + return messages + def acknowledge(self, message): """ Acknowledge the reception of a single message. 
diff --git a/src/config.cc b/src/config.cc index d2ed1035..f2b71879 100644 --- a/src/config.cc +++ b/src/config.cc @@ -253,6 +253,11 @@ void export_config() { .def("encryption_key", &ProducerConfiguration::addEncryptionKey, return_self<>()) .def("crypto_key_reader", &ProducerConfiguration_setCryptoKeyReader, return_self<>()); + class_("BatchReceivePolicy", init()) + .def("getTimeoutMs", &BatchReceivePolicy::getTimeoutMs) + .def("getMaxNumMessages", &BatchReceivePolicy::getMaxNumMessages) + .def("getMaxNumBytes", &BatchReceivePolicy::getMaxNumBytes); + class_("ConsumerConfiguration") .def("consumer_type", &ConsumerConfiguration::getConsumerType) .def("consumer_type", &ConsumerConfiguration::setConsumerType, return_self<>()) @@ -267,6 +272,8 @@ void export_config() { &ConsumerConfiguration::setMaxTotalReceiverQueueSizeAcrossPartitions) .def("consumer_name", &ConsumerConfiguration::getConsumerName, return_value_policy()) + .def("batch_receive_policy", &ConsumerConfiguration::getBatchReceivePolicy, return_value_policy()) + .def("batch_receive_policy", &ConsumerConfiguration::setBatchReceivePolicy) .def("consumer_name", &ConsumerConfiguration::setConsumerName) .def("unacked_messages_timeout_ms", &ConsumerConfiguration::getUnAckedMessagesTimeoutMs) .def("unacked_messages_timeout_ms", &ConsumerConfiguration::setUnAckedMessagesTimeoutMs) diff --git a/src/consumer.cc b/src/consumer.cc index 811ceb3d..5298fae8 100644 --- a/src/consumer.cc +++ b/src/consumer.cc @@ -42,6 +42,15 @@ Message Consumer_receive_timeout(Consumer& consumer, int timeoutMs) { return msg; } +Messages Consumer_batch_receive(Consumer& consumer) { + Messages msgs; + Result res; + Py_BEGIN_ALLOW_THREADS res = consumer.batchReceive(msgs); + Py_END_ALLOW_THREADS + CHECK_RESULT(res); + return msgs; +} + void Consumer_acknowledge(Consumer& consumer, const Message& msg) { consumer.acknowledgeAsync(msg, nullptr); } void Consumer_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) { @@ -103,6 +112,7 
@@ void export_consumer() { .def("unsubscribe", &Consumer_unsubscribe) .def("receive", &Consumer_receive) .def("receive", &Consumer_receive_timeout) + .def("batch_receive", &Consumer_batch_receive) .def("acknowledge", &Consumer_acknowledge) .def("acknowledge", &Consumer_acknowledge_message_id) .def("acknowledge_cumulative", &Consumer_acknowledge_cumulative) diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py index d0f1ba05..ed365444 100755 --- a/tests/pulsar_test.py +++ b/tests/pulsar_test.py @@ -42,7 +42,7 @@ ) from pulsar.schema import JsonSchema, Record, Integer -from _pulsar import ProducerConfiguration, ConsumerConfiguration +from _pulsar import ProducerConfiguration, ConsumerConfiguration, BatchReceivePolicy from schema_test import * @@ -1064,6 +1064,27 @@ def test_topics_pattern_consumer(self): consumer.receive(100) client.close() + def test_batch_receive(self): + client = Client(self.serviceUrl) + topic = "my-python-topic-batch-receive-" + str(time.time()) + consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared, + start_message_id_inclusive=True, batch_receive_policy=BatchReceivePolicy(10, -1, -1)) + producer = client.create_producer(topic) + + + for i in range(10): + if i > 0: + time.sleep(0.02) + producer.send(b"hello-%d" % i) + + msgs = consumer.batch_receive() + i = 0 + for msg in msgs: + self.assertEqual(msg.data(), b"hello-%d" % i) + i += 1 + + client.close() + def test_message_id(self): s = MessageId.earliest.serialize() self.assertEqual(MessageId.deserialize(s), MessageId.earliest) From f5f4e852154e7a4bf6c2fba42dad8c7f01361c41 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Thu, 10 Nov 2022 23:22:31 +0800 Subject: [PATCH 2/2] Wrapper BatchReceivePolicy. 
--- pulsar/__init__.py | 42 ++++++++++++++++++++++++++++++------------ tests/pulsar_test.py | 5 +++-- 2 files changed, 33 insertions(+), 14 deletions(-) diff --git a/pulsar/__init__.py b/pulsar/__init__.py index c88537bd..0007e61d 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -741,16 +741,8 @@ def my_listener(consumer, message): if autoAckOldestChunkedMessageOnQueueFull is true else it marks them for redelivery. start_message_id_inclusive: bool, default=False Set the consumer to include the given position of any reset operation like Consumer::seek. - batch_receive_policy: class BatchReceivePolicy, Constructor parameters (in order): - : param maxNumMessage: Max num message, if less than 0, it means no limit. default: -1 - : param maxNumBytes: Max num bytes, if less than 0, it means no limit. default: 10 * 1024 * 1024 - : param timeoutMs: If less than 0, it means no limit. default: 100 - - Batch receive policy can limit the number and bytes of messages in a single batch, - and can specify a timeout for waiting for enough messages for this batch. - - A batch receive action is completed as long as any one of the conditions (the batch has enough number - or size of messages, or the waiting timeout is passed) are met. + batch_receive_policy: class ConsumerBatchReceivePolicy + Set the batch collection policy for batch receiving. 
""" _check_type(str, subscription_name, 'subscription_name') _check_type(ConsumerType, consumer_type, 'consumer_type') @@ -770,7 +762,7 @@ def my_listener(consumer, message): _check_type(int, max_pending_chunked_message, 'max_pending_chunked_message') _check_type(bool, auto_ack_oldest_chunked_message_on_queue_full, 'auto_ack_oldest_chunked_message_on_queue_full') _check_type(bool, start_message_id_inclusive, 'start_message_id_inclusive') - _check_type_or_none(BatchReceivePolicy, batch_receive_policy, 'batch_receive_policy') + _check_type_or_none(ConsumerBatchReceivePolicy, batch_receive_policy, 'batch_receive_policy') conf = _pulsar.ConsumerConfiguration() conf.consumer_type(consumer_type) @@ -801,7 +793,7 @@ def my_listener(consumer, message): conf.auto_ack_oldest_chunked_message_on_queue_full(auto_ack_oldest_chunked_message_on_queue_full) conf.start_message_id_inclusive(start_message_id_inclusive) if batch_receive_policy: - conf.batch_receive_policy(batch_receive_policy) + conf.batch_receive_policy(batch_receive_policy.policy()) c = Consumer() if isinstance(topic, str): @@ -1382,6 +1374,32 @@ def get_last_message_id(self): """ return self._consumer.get_last_message_id() +class ConsumerBatchReceivePolicy: + """ + Batch receive policy can limit the number and bytes of messages in a single batch, + and can specify a timeout for waiting for enough messages for this batch. + + A batch receive action is completed as long as any one of the conditions (the batch has enough number + or size of messages, or the waiting timeout is passed) are met. + """ + def __init__(self, max_num_message, max_num_bytes, timeout_ms): + """ + Wrapper BatchReceivePolicy. + + Parameters + ---------- + + max_num_message: Max num message, if less than 0, it means no limit. default: -1 + max_num_bytes: Max num bytes, if less than 0, it means no limit. default: 10 * 1024 * 1024 + timeout_ms: If less than 0, it means no limit. 
default: 100 + """ + self._policy = BatchReceivePolicy(max_num_message, max_num_bytes, timeout_ms) + + def policy(self): + """ + Returns the actual one BatchReceivePolicy. + """ + return self._policy class Reader: """ diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py index ed365444..30f451d9 100755 --- a/tests/pulsar_test.py +++ b/tests/pulsar_test.py @@ -39,10 +39,11 @@ AuthenticationToken, InitialPosition, CryptoKeyReader, + ConsumerBatchReceivePolicy, ) from pulsar.schema import JsonSchema, Record, Integer -from _pulsar import ProducerConfiguration, ConsumerConfiguration, BatchReceivePolicy +from _pulsar import ProducerConfiguration, ConsumerConfiguration from schema_test import * @@ -1068,7 +1069,7 @@ def test_batch_receive(self): client = Client(self.serviceUrl) topic = "my-python-topic-batch-receive-" + str(time.time()) consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared, - start_message_id_inclusive=True, batch_receive_policy=BatchReceivePolicy(10, -1, -1)) + start_message_id_inclusive=True, batch_receive_policy=ConsumerBatchReceivePolicy(10, -1, -1)) producer = client.create_producer(topic)