Skip to content

Commit 9ae69fd

Browse files
committed
fix #531
1 parent bda51d6 commit 9ae69fd

3 files changed

Lines changed: 54 additions & 11 deletions

File tree

lib/Commands.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -902,7 +902,9 @@ uint64_t Commands::serializeSingleMessagesToBatchPayload(SharedBuffer& batchPayl
902902
batchPayload.write(payload.data(), payload.readableBytes());
903903
}
904904

905-
return messages.back().impl_->metadata.sequence_id();
905+
// Use the first message's sequence_id so that ackReceived can compute
906+
// lastSequenceIdPublished_ = sequenceId + messagesCount - 1 correctly.
907+
return messages.front().impl_->metadata.sequence_id();
906908
}
907909

908910
Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex,

lib/ProducerImpl.cc

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -933,19 +933,23 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
933933
return false;
934934
}
935935

936-
uint64_t expectedSequenceId = op.sendArgs->sequenceId;
937-
if (sequenceId > expectedSequenceId) {
938-
LOG_WARN(getName() << "Got ack for msg " << sequenceId //
939-
<< " expecting: " << expectedSequenceId << " queue size=" //
940-
<< pendingMessagesQueue_.size() << " producer: " << producerId_);
936+
const uint64_t expectedFirstSequenceId = op.sendArgs->sequenceId;
937+
const uint64_t expectedLastSequenceId = expectedFirstSequenceId + op.messagesCount - 1;
938+
// Broker may ack with either the first or the last sequence id of the batch.
939+
if (sequenceId > expectedLastSequenceId) {
940+
LOG_WARN(getName() << "Got ack for msg " << sequenceId << " expecting last: " << expectedLastSequenceId
941+
<< " queue size=" << pendingMessagesQueue_.size() << " producer: " << producerId_);
941942
return false;
942-
} else if (sequenceId < expectedSequenceId) {
943+
}
944+
if (sequenceId < expectedFirstSequenceId) {
943945
// Ignoring the ack since it's referring to a message that has already timed out.
944-
LOG_DEBUG(getName() << "Got ack for timed out msg " << sequenceId //
945-
<< " -- MessageId - " << messageId << " last-seq: " << expectedSequenceId
946-
<< " producer: " << producerId_);
946+
LOG_DEBUG(getName() << "Got ack for timed out msg " << sequenceId << " -- MessageId - " << messageId
947+
<< " first-seq: " << expectedFirstSequenceId << " producer: " << producerId_);
947948
return true;
948949
}
950+
// sequenceId is in [expectedFirstSequenceId, expectedLastSequenceId]; accept as matching this op.
951+
const bool brokerSentFirst = (sequenceId == expectedFirstSequenceId);
952+
lastSequenceIdPublished_ = brokerSentFirst ? expectedLastSequenceId : sequenceId;
949953

950954
// Message was persisted correctly
951955
LOG_DEBUG(getName() << "Received ack for msg " << sequenceId);
@@ -960,7 +964,6 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
960964
}
961965

962966
releaseSemaphoreForSendOp(op);
963-
lastSequenceIdPublished_ = sequenceId + op.messagesCount - 1;
964967

965968
std::unique_ptr<OpSendMsg> opSendMsg{pendingMessagesQueue_.front().release()};
966969
pendingMessagesQueue_.pop_front();

tests/ProducerTest.cc

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,44 @@ TEST_P(ProducerTest, testFlushNoBatch) {
441441
client.close();
442442
}
443443

444+
// Verifies that getLastSequenceId() is correct after sendAsync + flush when batching is enabled.
445+
// Previously the batch used the last message's sequence_id, causing lastSequenceIdPublished_ to be
446+
// doubled (e.g. 3 messages yielded 4 instead of 2). The batch must use the first message's
447+
// sequence_id so that lastSequenceIdPublished_ = sequenceId + messagesCount - 1 is correct.
448+
TEST(ProducerTest, testGetLastSequenceIdAfterBatchFlush) {
449+
Client client(serviceUrl);
450+
451+
const std::string topicName =
452+
"persistent://public/default/testGetLastSequenceIdAfterBatchFlush-" + std::to_string(time(nullptr));
453+
454+
ProducerConfiguration producerConfiguration;
455+
producerConfiguration.setBatchingEnabled(true);
456+
producerConfiguration.setBatchingMaxMessages(10);
457+
producerConfiguration.setBatchingMaxPublishDelayMs(60000);
458+
459+
Producer producer;
460+
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration, producer));
461+
462+
// Send 3 messages in a batch, then flush. Sequence ids are [0, 1, 2], so getLastSequenceId() must be 2.
463+
for (int i = 0; i < 3; i++) {
464+
Message msg = MessageBuilder().setContent("content").build();
465+
producer.sendAsync(msg, nullptr);
466+
}
467+
ASSERT_EQ(ResultOk, producer.flush());
468+
ASSERT_EQ(producer.getLastSequenceId(), 2) << "After 3 messages, last sequence id should be 2";
469+
470+
// Send 2 more (total 5), flush. Sequence ids for these are [3, 4], so getLastSequenceId() must be 4.
471+
for (int i = 0; i < 2; i++) {
472+
Message msg = MessageBuilder().setContent("content").build();
473+
producer.sendAsync(msg, nullptr);
474+
}
475+
ASSERT_EQ(ResultOk, producer.flush());
476+
ASSERT_EQ(producer.getLastSequenceId(), 4) << "After 5 messages total, last sequence id should be 4";
477+
478+
producer.close();
479+
client.close();
480+
}
481+
444482
TEST_P(ProducerTest, testFlushBatch) {
445483
Client client(serviceUrl);
446484

0 commit comments

Comments
 (0)