diff --git a/pulsar/__init__.py b/pulsar/__init__.py index c1195de7..0007e61d 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -46,7 +46,7 @@ import _pulsar from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType, \ - LoggerLevel # noqa: F401 + LoggerLevel, BatchReceivePolicy # noqa: F401 from pulsar.exceptions import * @@ -657,7 +657,8 @@ def subscribe(self, topic, subscription_name, replicate_subscription_state_enabled=False, max_pending_chunked_message=10, auto_ack_oldest_chunked_message_on_queue_full=False, - start_message_id_inclusive=False + start_message_id_inclusive=False, + batch_receive_policy=None ): """ Subscribe to the given topic and subscription combination. @@ -740,6 +741,8 @@ def my_listener(consumer, message): if autoAckOldestChunkedMessageOnQueueFull is true else it marks them for redelivery. start_message_id_inclusive: bool, default=False Set the consumer to include the given position of any reset operation like Consumer::seek. + batch_receive_policy: class ConsumerBatchReceivePolicy + Set the batch collection policy for batch receiving. """ _check_type(str, subscription_name, 'subscription_name') _check_type(ConsumerType, consumer_type, 'consumer_type') @@ -759,6 +762,7 @@ def my_listener(consumer, message): _check_type(int, max_pending_chunked_message, 'max_pending_chunked_message') _check_type(bool, auto_ack_oldest_chunked_message_on_queue_full, 'auto_ack_oldest_chunked_message_on_queue_full') _check_type(bool, start_message_id_inclusive, 'start_message_id_inclusive') + _check_type_or_none(ConsumerBatchReceivePolicy, batch_receive_policy, 'batch_receive_policy') conf = _pulsar.ConsumerConfiguration() conf.consumer_type(consumer_type) @@ -788,6 +792,8 @@ def my_listener(consumer, message): conf.max_pending_chunked_message(max_pending_chunked_message) conf.auto_ack_oldest_chunked_message_on_queue_full(auto_ack_oldest_chunked_message_on_queue_full) conf.start_message_id_inclusive(start_message_id_inclusive) + if batch_receive_policy: + conf.batch_receive_policy(batch_receive_policy.policy()) c = Consumer() if isinstance(topic, str): @@ -1237,6 +1243,20 @@ def receive(self, timeout_millis=None): m._schema = self._schema return m + def batch_receive(self): + """ + Batch receiving messages. + + This calls blocks until has enough messages or wait timeout, more details to see {@link BatchReceivePolicy}. + """ + messages = [] + msgs = self._consumer.batch_receive() + for msg in msgs: + m = Message() + m._message = msg + messages.append(m) + return messages + def acknowledge(self, message): """ Acknowledge the reception of a single message. @@ -1354,6 +1374,32 @@ def get_last_message_id(self): """ return self._consumer.get_last_message_id() +class ConsumerBatchReceivePolicy: + """ + Batch receive policy can limit the number and bytes of messages in a single batch, + and can specify a timeout for waiting for enough messages for this batch. + + A batch receive action is completed as long as any one of the conditions (the batch has enough number + or size of messages, or the waiting timeout is passed) are met. + """ + def __init__(self, max_num_message, max_num_bytes, timeout_ms): + """ + Wrapper BatchReceivePolicy. + + Parameters + ---------- + + max_num_message: Max num message, if less than 0, it means no limit. default: -1 + max_num_bytes: Max num bytes, if less than 0, it means no limit. default: 10 * 1024 * 1024 + timeout_ms: If less than 0, it means no limit. default: 100 + """ + self._policy = BatchReceivePolicy(max_num_message, max_num_bytes, timeout_ms) + + def policy(self): + """ + Returns the actual one BatchReceivePolicy. + """ + return self._policy class Reader: """ diff --git a/src/config.cc b/src/config.cc index d2ed1035..f2b71879 100644 --- a/src/config.cc +++ b/src/config.cc @@ -253,6 +253,11 @@ void export_config() { .def("encryption_key", &ProducerConfiguration::addEncryptionKey, return_self<>()) .def("crypto_key_reader", &ProducerConfiguration_setCryptoKeyReader, return_self<>()); + class_("BatchReceivePolicy", init()) + .def("getTimeoutMs", &BatchReceivePolicy::getTimeoutMs) + .def("getMaxNumMessages", &BatchReceivePolicy::getMaxNumMessages) + .def("getMaxNumBytes", &BatchReceivePolicy::getMaxNumBytes); + class_("ConsumerConfiguration") .def("consumer_type", &ConsumerConfiguration::getConsumerType) .def("consumer_type", &ConsumerConfiguration::setConsumerType, return_self<>()) @@ -267,6 +272,8 @@ void export_config() { &ConsumerConfiguration::setMaxTotalReceiverQueueSizeAcrossPartitions) .def("consumer_name", &ConsumerConfiguration::getConsumerName, return_value_policy()) + .def("batch_receive_policy", &ConsumerConfiguration::getBatchReceivePolicy, return_value_policy()) + .def("batch_receive_policy", &ConsumerConfiguration::setBatchReceivePolicy) .def("consumer_name", &ConsumerConfiguration::setConsumerName) .def("unacked_messages_timeout_ms", &ConsumerConfiguration::getUnAckedMessagesTimeoutMs) .def("unacked_messages_timeout_ms", &ConsumerConfiguration::setUnAckedMessagesTimeoutMs) diff --git a/src/consumer.cc b/src/consumer.cc index 811ceb3d..5298fae8 100644 --- a/src/consumer.cc +++ b/src/consumer.cc @@ -42,6 +42,15 @@ Message Consumer_receive_timeout(Consumer& consumer, int timeoutMs) { return msg; } +Messages Consumer_batch_receive(Consumer& consumer) { + Messages msgs; + Result res; + Py_BEGIN_ALLOW_THREADS res = consumer.batchReceive(msgs); + Py_END_ALLOW_THREADS + CHECK_RESULT(res); + return msgs; +} + void Consumer_acknowledge(Consumer& consumer, const Message& msg) { consumer.acknowledgeAsync(msg, nullptr); } void Consumer_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) { @@ -103,6 +112,7 @@ void export_consumer() { .def("unsubscribe", &Consumer_unsubscribe) .def("receive", &Consumer_receive) .def("receive", &Consumer_receive_timeout) + .def("batch_receive", &Consumer_batch_receive) .def("acknowledge", &Consumer_acknowledge) .def("acknowledge", &Consumer_acknowledge_message_id) .def("acknowledge_cumulative", &Consumer_acknowledge_cumulative) diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py index d0f1ba05..30f451d9 100755 --- a/tests/pulsar_test.py +++ b/tests/pulsar_test.py @@ -39,6 +39,7 @@ AuthenticationToken, InitialPosition, CryptoKeyReader, + ConsumerBatchReceivePolicy, ) from pulsar.schema import JsonSchema, Record, Integer @@ -1064,6 +1065,27 @@ def test_topics_pattern_consumer(self): consumer.receive(100) client.close() + def test_batch_receive(self): + client = Client(self.serviceUrl) + topic = "my-python-topic-batch-receive-" + str(time.time()) + consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared, + start_message_id_inclusive=True, batch_receive_policy=ConsumerBatchReceivePolicy(10, -1, -1)) + producer = client.create_producer(topic) + + + for i in range(10): + if i > 0: + time.sleep(0.02) + producer.send(b"hello-%d" % i) + + msgs = consumer.batch_receive() + i = 0 + for msg in msgs: + self.assertEqual(msg.data(), b"hello-%d" % i) + i += 1 + + client.close() + def test_message_id(self): s = MessageId.earliest.serialize() self.assertEqual(MessageId.deserialize(s), MessageId.earliest)