Skip to content

Commit f2c0fec

Browse files
feat: expose replicated_from proto field to Message (#583)
1 parent 0dab2ba commit f2c0fec

6 files changed

Lines changed: 57 additions & 0 deletions

File tree

include/pulsar/Message.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,14 @@ class PULSAR_PUBLIC Message {
220220
*/
221221
const std::string& getProducerName() const noexcept;
222222

223+
/**
224+
* Get the source cluster from which the message was replicated.
225+
*
226+
* @return the optional pointer to the source cluster name if the message was replicated, the pointer is
227+
* valid as the Message instance is alive
228+
*/
229+
std::optional<const std::string*> getReplicatedFrom() const;
230+
223231
/**
224232
* @return the optional encryption context that is present when the message is encrypted, the pointer is
225233
* valid as the Message instance is alive

include/pulsar/c/message.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,15 @@ PULSAR_PUBLIC void pulsar_message_set_schema_version(pulsar_message_t *message,
230230
*/
231231
PULSAR_PUBLIC const char *pulsar_message_get_producer_name(pulsar_message_t *message);
232232

233+
/**
234+
* Get the source cluster from which the message was replicated.
235+
*
236+
* The pointer points to internal storage owned by the message wrapper, so the caller should not free it.
237+
*
238+
* @return the source cluster name, or NULL if the message is not replicated
239+
*/
240+
PULSAR_PUBLIC const char *pulsar_message_get_replicated_from(pulsar_message_t *message);
241+
233242
/**
234243
* Check if the message has a null value.
235244
*

lib/Message.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,13 @@ const std::string& Message::getProducerName() const noexcept {
239239
return impl_->metadata.producer_name();
240240
}
241241

242+
std::optional<const std::string*> Message::getReplicatedFrom() const {
243+
if (!impl_ || !impl_->metadata.has_replicated_from()) {
244+
return std::nullopt;
245+
}
246+
return &impl_->metadata.replicated_from();
247+
}
248+
242249
std::optional<const EncryptionContext*> Message::getEncryptionContext() const {
243250
if (!impl_ || !impl_->encryptionContext_.has_value()) {
244251
return std::nullopt;

lib/c/c_Message.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,4 +151,9 @@ const char *pulsar_message_get_producer_name(pulsar_message_t *message) {
151151
return message->message.getProducerName().c_str();
152152
}
153153

154+
const char *pulsar_message_get_replicated_from(pulsar_message_t *message) {
155+
const auto replicatedFrom = message->message.getReplicatedFrom();
156+
return replicatedFrom ? replicatedFrom.value()->c_str() : nullptr;
157+
}
158+
154159
int pulsar_message_has_null_value(pulsar_message_t *message) { return message->message.hasNullValue(); }

tests/MessageTest.cc

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
#include <string>
2424

25+
#include "PulsarFriend.h"
2526
#include "lib/MessageImpl.h"
2627

2728
using namespace pulsar;
@@ -154,6 +155,17 @@ TEST(MessageTest, testGetTopicNameOnProducerMessage) {
154155
ASSERT_TRUE(msg.getTopicName().empty());
155156
}
156157

158+
TEST(MessageTest, testReplicationMetadataAccessors) {
159+
auto msg = MessageBuilder().setContent("test").build();
160+
ASSERT_FALSE(msg.getReplicatedFrom().has_value());
161+
162+
PulsarFriend::getMessageMetadata(msg).set_replicated_from("us-west1");
163+
164+
const auto replicatedFrom = msg.getReplicatedFrom();
165+
ASSERT_TRUE(replicatedFrom.has_value());
166+
ASSERT_EQ(*replicatedFrom.value(), "us-west1");
167+
}
168+
157169
TEST(MessageTest, testNullValueMessage) {
158170
{
159171
auto msg = MessageBuilder().setContent("test").build();

tests/c/c_MessageTest.cc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
#include <lib/c/c_structs.h>
2121
#include <pulsar/c/message.h>
2222

23+
#include "../PulsarFriend.h"
24+
2325
TEST(c_MessageTest, MessageCopy) {
2426
pulsar_message_t *from = pulsar_message_create();
2527
pulsar_message_set_content(from, "hello", 5);
@@ -32,3 +34,17 @@ TEST(c_MessageTest, MessageCopy) {
3234
pulsar_message_free(from);
3335
pulsar_message_free(to);
3436
}
37+
38+
TEST(c_MessageTest, ReplicationMetadataAccessors) {
39+
pulsar_message_t *message = pulsar_message_create();
40+
pulsar_message_set_content(message, "hello", 5);
41+
message->message = message->builder.build();
42+
43+
ASSERT_EQ(nullptr, pulsar_message_get_replicated_from(message));
44+
45+
PulsarFriend::getMessageMetadata(message->message).set_replicated_from("us-west1");
46+
47+
ASSERT_STREQ("us-west1", pulsar_message_get_replicated_from(message));
48+
49+
pulsar_message_free(message);
50+
}

0 commit comments

Comments
 (0)