Skip to content

Commit dbe4502

Browse files
grishaf and cursoragent committed
feat: support null value messages (tombstones) for compacted topics
Add support for sending and detecting null value messages, which are used as tombstones on compacted topics to delete entries for specific keys. This wraps the C++ client's MessageBuilder::setNullValue() and Message::hasNullValue() APIs added in pulsar-client-cpp#563.

Changes:
- Add pybind11 bindings for set_null_value and has_null_value
- Allow Producer.send(None) to produce a null value message
- Add Message.has_null_value() to detect tombstone messages
- Skip schema encoding when content is None (mirrors Java client)
- Add integration tests for null values, compaction, and table view

Requires pulsar-client-cpp >= 4.2.0 (not yet released).

Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 7e74cd6 commit dbe4502

4 files changed

Lines changed: 167 additions & 6 deletions

File tree

dependencies.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
# under the License.
1818
#
1919

20-
pulsar-cpp: 4.1.0
20+
pulsar-cpp: 4.2.0
2121
pybind11: 3.0.1
2222
# The OpenSSL dependency is only used when building Python from source
2323
openssl: 1.1.1q

pulsar/__init__.py

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,19 @@ def producer_name(self) -> str:
366366
"""
367367
return self._message.producer_name()
368368

369+
def has_null_value(self) -> bool:
    """
    Tell whether this message is a tombstone, i.e. carries a null value.

    On compacted topics, a null value message deletes the entry stored
    for its key.

    Returns
    -------
    bool
        ``True`` when the message has a null value, ``False`` otherwise.
    """
    is_tombstone = self._message.has_null_value()
    return is_tombstone
381+
369382
def encryption_context(self) -> EncryptionContext | None:
370383
"""
371384
Get the encryption context for this message or None if it's not encrypted.
@@ -1693,7 +1706,9 @@ def send(self, content,
16931706
----------
16941707
16951708
content:
1696-
A ``bytes`` object with the message payload.
1709+
A ``bytes`` object with the message payload, or ``None`` to send a null value
1710+
message (tombstone). Null value messages are used on compacted topics to delete
1711+
the entry for a specific key.
16971712
properties: optional
16981713
A dict of application-defined string properties.
16991714
partition_key: optional
@@ -1775,7 +1790,9 @@ def callback(res, msg_id):
17751790
----------
17761791
17771792
content
1778-
A `bytes` object with the message payload.
1793+
A ``bytes`` object with the message payload, or ``None`` to send a null value
1794+
message (tombstone). Null value messages are used on compacted topics to delete
1795+
the entry for a specific key.
17791796
callback
17801797
A callback that is invoked once the message has been acknowledged by the broker.
17811798
properties: optional
@@ -1823,9 +1840,12 @@ def close(self):
18231840
def _build_msg(self, content, properties, partition_key, ordering_key, sequence_id,
18241841
replication_clusters, disable_replication, event_timestamp,
18251842
deliver_at, deliver_after):
1826-
data = self._schema.encode(content)
1843+
if content is not None:
1844+
data = self._schema.encode(content)
1845+
_check_type(bytes, data, 'data')
1846+
else:
1847+
data = None
18271848

1828-
_check_type(bytes, data, 'data')
18291849
_check_type_or_none(dict, properties, 'properties')
18301850
_check_type_or_none(str, partition_key, 'partition_key')
18311851
_check_type_or_none(str, ordering_key, 'ordering_key')
@@ -1837,7 +1857,10 @@ def _build_msg(self, content, properties, partition_key, ordering_key, sequence_
18371857
_check_type_or_none(timedelta, deliver_after, 'deliver_after')
18381858

18391859
mb = _pulsar.MessageBuilder()
1840-
mb.content(data)
1860+
if data is not None:
1861+
mb.content(data)
1862+
else:
1863+
mb.set_null_value()
18411864
if properties:
18421865
for k, v in properties.items():
18431866
mb.property(k, v)

src/message.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ void export_message(py::module_& m) {
4646
.def("event_timestamp", &MessageBuilder::setEventTimestamp, return_value_policy::reference)
4747
.def("replication_clusters", &MessageBuilder::setReplicationClusters, return_value_policy::reference)
4848
.def("disable_replication", &MessageBuilder::disableReplication, return_value_policy::reference)
49+
.def("set_null_value", &MessageBuilder::setNullValue, return_value_policy::reference)
4950
.def("build", &MessageBuilder::build);
5051

5152
class_<MessageId>(m, "MessageId")
@@ -121,6 +122,7 @@ void export_message(py::module_& m) {
121122
.def("int_schema_version", &Message::getLongSchemaVersion)
122123
.def("schema_version", &Message::getSchemaVersion, return_value_policy::copy)
123124
.def("producer_name", &Message::getProducerName, return_value_policy::copy)
125+
.def("has_null_value", &Message::hasNullValue)
124126
.def("encryption_context", &Message::getEncryptionContext, return_value_policy::reference);
125127

126128
MessageBatch& (MessageBatch::*MessageBatchParseFromString)(const std::string& payload,

tests/pulsar_test.py

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2136,6 +2136,142 @@ def router(msg: pulsar.Message, num_partitions: int):
21362136

21372137
client.close()
21382138

2139+
def test_null_value_message(self):
    """A ``None`` payload sent between two regular payloads arrives as a null value message."""
    client = Client(self.serviceUrl)
    topic = "null-value-%s" % uuid.uuid4()
    producer = client.create_producer(topic, batching_enabled=False)
    consumer = client.subscribe(topic, "sub", initial_position=InitialPosition.Earliest)

    # (payload, partition key) in publish order; None produces the tombstone.
    outgoing = [
        (b"not null", "k1"),
        (None, "k2"),
        (b"also not null", "k3"),
    ]
    for payload, key in outgoing:
        producer.send(payload, partition_key=key)

    first = consumer.receive(TM)
    self.assertEqual(first.data(), b"not null")
    self.assertFalse(first.has_null_value())

    tombstone = consumer.receive(TM)
    self.assertTrue(tombstone.has_null_value())
    # The tombstone's data() is expected to be an empty byte string.
    self.assertEqual(tombstone.data(), b"")

    last = consumer.receive(TM)
    self.assertEqual(last.data(), b"also not null")
    self.assertFalse(last.has_null_value())

    for resource in (consumer, producer, client):
        resource.close()
2164+
2165+
def test_null_value_vs_empty_bytes(self):
    """An empty ``bytes`` payload must not be reported as a null value, while ``None`` must."""
    client = Client(self.serviceUrl)
    topic = "null-vs-empty-%s" % uuid.uuid4()
    producer = client.create_producer(topic, batching_enabled=False)
    consumer = client.subscribe(topic, "sub", initial_position=InitialPosition.Earliest)

    # Publish an empty payload first, then a genuine null value.
    for payload, key in ((b"", "k1"), (None, "k2")):
        producer.send(payload, partition_key=key)

    empty_msg = consumer.receive(TM)
    self.assertFalse(empty_msg.has_null_value())
    self.assertEqual(empty_msg.data(), b"")

    null_msg = consumer.receive(TM)
    self.assertTrue(null_msg.has_null_value())

    for resource in (consumer, producer, client):
        resource.close()
2184+
2185+
def test_null_value_compaction(self):
    """
    Verify that compaction removes keys whose latest message is a tombstone.

    Four keys are published; ``key1`` and ``key3`` are followed by a null
    value message, ``key2`` and ``key4`` are not. After compaction is
    triggered through the admin REST endpoint and reports success, a
    consumer reading the compacted view must see exactly the two surviving
    keys.
    """
    client = Client(self.serviceUrl)
    topic = "null-compact-%s" % uuid.uuid4()
    producer = client.create_producer(topic, batching_enabled=False)

    # Create the subscription before publishing and close it again; it is
    # re-opened below once compaction has finished.
    consumer = client.subscribe(topic, "my-sub1", is_read_compacted=True)
    consumer.close()

    # key1: value then tombstone -> removed after compaction
    producer.send(b"hello-1", partition_key="key1")
    producer.send(None, partition_key="key1")

    # key2: value only -> survives
    producer.send(b"hello-2", partition_key="key2")

    # key3: value then tombstone -> removed
    producer.send(b"hello-3", partition_key="key3")
    producer.send(None, partition_key="key3")

    # key4: value only -> survives
    producer.send(b"hello-4", partition_key="key4")
    producer.close()

    url = "%s/admin/v2/persistent/public/default/%s/compaction" % (self.adminUrl, topic)
    doHttpPut(url, "")

    # Bound the wait so a compaction stuck in RUNNING fails this test
    # instead of hanging the whole suite.
    timeout_seconds = 60
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = doHttpGet(url).decode("utf-8")
        if "RUNNING" not in status:
            self.assertIn("SUCCESS", status)
            break
        if time.monotonic() >= deadline:
            self.fail("compaction still RUNNING after %d seconds: %s" % (timeout_seconds, status))
        time.sleep(0.2)

    consumer = client.subscribe(topic, "my-sub1", is_read_compacted=True)
    messages = []
    # Drain the compacted view; a receive timeout marks the end of the data.
    while True:
        try:
            msg = consumer.receive(2000)
            messages.append(msg)
        except pulsar.Timeout:
            break

    keys = [m.partition_key() for m in messages]
    self.assertIn("key2", keys)
    self.assertIn("key4", keys)
    self.assertNotIn("key1", keys)
    self.assertNotIn("key3", keys)
    self.assertEqual(len(messages), 2)

    consumer.close()
    client.close()
2236+
2237+
def test_null_value_table_view(self):
    """A null value message published for a key makes the table view stop returning that key."""
    client = Client(self.serviceUrl)
    topic = "null-tv-%s" % uuid.uuid4()
    producer = client.create_producer(topic, batching_enabled=False)

    producer.send(b"hello", partition_key="key1")

    table_view = client.create_table_view(topic)
    self.assertEqual(table_view.get("key1"), b"hello")

    producer.send(None, partition_key="key1")

    # Poll the view at most 50 times, 0.1 s apart, until the key is gone.
    attempts_left = 50
    while attempts_left > 0 and table_view.get("key1") is not None:
        time.sleep(0.1)
        attempts_left -= 1
    self.assertIsNone(table_view.get("key1"))

    for resource in (table_view, producer, client):
        resource.close()
2257+
2258+
def test_null_value_with_properties(self):
    """A null value message keeps its properties and partition key."""
    client = Client(self.serviceUrl)
    topic = "null-props-%s" % uuid.uuid4()
    producer = client.create_producer(topic, batching_enabled=False)
    consumer = client.subscribe(topic, "sub", initial_position=InitialPosition.Earliest)

    tombstone_properties = {"action": "delete"}
    producer.send(None, partition_key="k1", properties=tombstone_properties)

    received = consumer.receive(TM)
    self.assertTrue(received.has_null_value())
    self.assertEqual(received.properties(), {"action": "delete"})
    self.assertEqual(received.partition_key(), "k1")

    for resource in (consumer, producer, client):
        resource.close()
2274+
21392275

21402276
if __name__ == "__main__":
21412277
main()

0 commit comments

Comments
 (0)