From a3d8e4e985ce9fbbba1123276ae02a30ec77d1a3 Mon Sep 17 00:00:00 2001
From: shuaoz
Date: Mon, 24 Jan 2022 10:56:59 -0500
Subject: [PATCH 1/2] feat: support chunked msg. producer part

---
 pulsar/internal/batch_builder.go |  59 ++++++++++++++++++
 pulsar/internal/commands.go      |  14 +++++
 pulsar/producer.go               |   4 ++
 pulsar/producer_partition.go     | 109 ++++++++++++++++++++++++++++++-
 4 files changed, 184 insertions(+), 2 deletions(-)

diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 9d18f26ca6..a516af5dbc 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -51,6 +51,14 @@ type BatchBuilder interface {
 		callback interface{}, replicateTo []string, deliverAt time.Time,
 	) bool
 
+	// AddMessageMetaData will add a message to batch,
+	// currently it will only be used when sending chunked msg.
+	AddMessageMetaData(
+		metadata *pb.MessageMetadata, sequenceIDGenerator *uint64,
+		payload []byte,
+		callback interface{}, replicateTo []string, deliverAt time.Time,
+	) bool
+
 	// Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
 	Flush() (batchData Buffer, sequenceID uint64, callbacks []interface{}, err error)
 
@@ -207,6 +215,57 @@ func (bc *batchContainer) Add(
 	return true
 }
 
+func (bc *batchContainer) AddMessageMetaData(
+	metadata *pb.MessageMetadata, sequenceIDGenerator *uint64,
+	payload []byte,
+	callback interface{}, replicateTo []string, deliverAt time.Time,
+) bool {
+	if replicateTo != nil && bc.numMessages != 0 {
+		// If the current batch is not empty and we're trying to set the replication clusters,
+		// then we need to force the current batch to flush and send the message individually
+		return false
+	} else if bc.msgMetadata.ReplicateTo != nil {
+		// There's already a message with cluster replication list. need to flush before next
+		// message can be sent
+		return false
+	} else if !bc.hasSpace(payload) {
+		// The current batch is full. The producer has to Flush() it before adding more.
+		return false
+	}
+
+	if bc.numMessages == 0 {
+		var sequenceID uint64
+		if metadata.SequenceId != nil {
+			sequenceID = *metadata.SequenceId
+		} else {
+			sequenceID = GetAndAdd(sequenceIDGenerator, 1)
+		}
+		bc.msgMetadata.SequenceId = proto.Uint64(sequenceID)
+		bc.msgMetadata.PublishTime = proto.Uint64(TimestampMillis(time.Now()))
+		bc.msgMetadata.ProducerName = &bc.producerName
+		bc.msgMetadata.ReplicateTo = replicateTo
+		bc.msgMetadata.PartitionKey = metadata.PartitionKey
+		bc.msgMetadata.Properties = metadata.Properties
+
+		// Special field for chunks
+		bc.msgMetadata.Uuid = metadata.Uuid
+		bc.msgMetadata.NumChunksFromMsg = metadata.NumChunksFromMsg
+		bc.msgMetadata.TotalChunkMsgSize = metadata.TotalChunkMsgSize
+		bc.msgMetadata.ChunkId = metadata.ChunkId
+
+		if deliverAt.UnixNano() > 0 {
+			bc.msgMetadata.DeliverAtTime = proto.Int64(int64(TimestampMillis(deliverAt)))
+		}
+
+		bc.cmdSend.Send.SequenceId = proto.Uint64(sequenceID)
+	}
+	addMessageToBatch(bc.buffer, metadata, payload)
+
+	bc.numMessages++
+	bc.callbacks = append(bc.callbacks, callback)
+	return true
+}
+
 func (bc *batchContainer) reset() {
 	bc.numMessages = 0
 	bc.buffer.Clear()
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index b91c0b6341..9bb21ce493 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -218,6 +218,20 @@ func addSingleMessageToBatch(wb Buffer, smm *pb.SingleMessageMetadata, payload [
 	wb.Write(payload)
 }
 
+func addMessageToBatch(wb Buffer, mm *pb.MessageMetadata, payload []byte) {
+	metadataSize := uint32(mm.Size())
+	wb.WriteUint32(metadataSize)
+
+	wb.ResizeIfNeeded(metadataSize)
+	_, err := mm.MarshalToSizedBuffer(wb.WritableSlice()[:metadataSize])
+	if err != nil {
+		panic(fmt.Sprintf("Protobuf serialization error: %v", err))
+	}
+
+	wb.WrittenBytes(metadataSize)
+	wb.Write(payload)
+}
+
 func serializeBatch(wb Buffer,
 	cmdSend *pb.BaseCommand,
 	msgMetadata *pb.MessageMetadata,
diff --git a/pulsar/producer.go b/pulsar/producer.go
index d9b2307a33..489396fdb1 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -168,6 +168,10 @@ type ProducerOptions struct {
 
 	// Encryption specifies the fields required to encrypt a message
 	Encryption *ProducerEncryptionInfo
+
+	// ChunkingEnabled specifies if Producer will split the original message into chunks and publish
+	// them with chunked metadata when message payload size is larger than broker can support.
+	ChunkingEnabled bool
 }
 
 // Producer is used to publish messages on a topic
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 3f1e54b6a5..e0869aaa74 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -19,6 +19,7 @@ package pulsar
 
 import (
 	"context"
+	uuidGen "github.com/google/uuid"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -411,8 +412,8 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 		payload = schemaPayload
 	}
 
-	// if msg is too large
-	if len(payload) > int(p._getConn().GetMaxMessageSize()) {
+	// if msg is too large and chunked msg not enabled
+	if len(payload) > int(p._getConn().GetMaxMessageSize()) && !p.options.ChunkingEnabled {
 		p.publishSemaphore.Release()
 		request.callback(nil, request.msg, errMessageTooLarge)
 		p.log.WithError(errMessageTooLarge).
@@ -423,6 +424,16 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 		return
 	}
 
+	// if msg is too large and chunked msg enabled, send chunked msg
+	if len(payload) > int(p._getConn().GetMaxMessageSize()) && p.options.ChunkingEnabled {
+		p.log.
+			WithField("size", len(payload)).
+			WithField("MaxMessageSize", int(p._getConn().GetMaxMessageSize())).
+			Info("Size exceed limit, send with chunks")
+		p.internalSendWithTrunks(request, payload)
+		return
+	}
+
 	deliverAt := msg.DeliverAt
 	if msg.DeliverAfter.Nanoseconds() > 0 {
 		deliverAt = time.Now().Add(msg.DeliverAfter)
@@ -496,6 +507,100 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 	}
 }
 
+// internalSendWithTrunks splits payload into consecutive chunks of at most the
+// connection's max message size and passes each one to internalSendSingleChunk.
+// Every chunk carries the same UUID, the chunk count and the total payload size.
+func (p *partitionProducer) internalSendWithTrunks(request *sendRequest, payload []byte) {
+	chunkSize := int(p._getConn().GetMaxMessageSize())
+	totalChunks := (len(payload) + chunkSize - 1) / chunkSize
+	uuid := uuidGen.New().String()
+
+	for chunkId := 0; chunkId < totalChunks; chunkId++ {
+		left := chunkId * chunkSize
+		right := left + chunkSize
+		if right > len(payload) {
+			right = len(payload)
+		}
+		// payload[left:right] is half-open, so clamping to len(payload) keeps the last byte
+		p.internalSendSingleChunk(request, payload[left:right], uuid, totalChunks, len(payload), chunkId)
+	}
+}
+
+// internalSendSingleChunk builds the MessageMetadata for one chunk (uuid, chunk id,
+// number of chunks, total message size) and sends that chunk as its own message.
+func (p *partitionProducer) internalSendSingleChunk(request *sendRequest, payload []byte,
+	uuid string, totalChunks int, totalSize int, chunkId int) {
+
+	msg := request.msg
+	deliverAt := msg.DeliverAt
+	if msg.DeliverAfter.Nanoseconds() > 0 {
+		deliverAt = time.Now().Add(msg.DeliverAfter)
+	}
+
+	mm := &pb.MessageMetadata{}
+
+	if msg.EventTime.UnixNano() != 0 {
+		mm.EventTime = proto.Uint64(internal.TimestampMillis(msg.EventTime))
+	}
+
+	if msg.Key != "" {
+		mm.PartitionKey = proto.String(msg.Key)
+	}
+
+	if len(msg.OrderingKey) != 0 {
+		mm.OrderingKey = []byte(msg.OrderingKey)
+	}
+
+	if msg.Properties != nil {
+		mm.Properties = internal.ConvertFromStringMap(msg.Properties)
+	}
+
+	if msg.SequenceID != nil {
+		sequenceID := uint64(*msg.SequenceID)
+		mm.SequenceId = proto.Uint64(sequenceID)
+	}
+
+	// Fields required for chunked data
+	mm.Uuid = proto.String(uuid)
+	mm.NumChunksFromMsg = proto.Int(totalChunks)
+	mm.TotalChunkMsgSize = proto.Int(totalSize)
+	mm.ChunkId = proto.Int(chunkId)
+
+	p.internalFlushCurrentBatch()
+
+	if msg.DisableReplication {
+		msg.ReplicationClusters = []string{"__local__"}
+	}
+
+	added := p.batchBuilder.AddMessageMetaData(mm, p.sequenceIDGenerator, payload, request,
+		msg.ReplicationClusters, deliverAt)
+	if !added {
+		// The current batch is full.. flush it and retry
+		if p.batchBuilder.IsMultiBatches() {
+			p.internalFlushCurrentBatches()
+		} else {
+			p.internalFlushCurrentBatch()
+		}
+
+		// after flushing try again to add the current payload
+		if ok := p.batchBuilder.AddMessageMetaData(mm, p.sequenceIDGenerator, payload, request,
+			msg.ReplicationClusters, deliverAt); !ok {
+			p.publishSemaphore.Release()
+			request.callback(nil, request.msg, errFailAddToBatch)
+			p.log.WithField("size", len(payload)).
+				WithField("properties", msg.Properties).
+				Error("unable to add message to batch")
+			return
+		}
+	}
+
+	if p.batchBuilder.IsMultiBatches() {
+		p.internalFlushCurrentBatches()
+	} else {
+		p.internalFlushCurrentBatch()
+	}
+}
+
 type pendingItem struct {
 	sync.Mutex
 	batchData    internal.Buffer

From c403885c5377bcc90402bc5e02bd8f0591ef097f Mon Sep 17 00:00:00 2001
From: shuaoz
Date: Tue, 25 Jan 2022 16:49:21 -0500
Subject: [PATCH 2/2] send msg without batchbuilder

---
 pulsar/internal/commands.go  | 16 +++++++++++
 pulsar/producer_partition.go | 55 +++++++++++-------------------------
 2 files changed, 38 insertions(+), 33 deletions(-)

diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 9bb21ce493..acdb437f39 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -232,6 +232,22 @@ func addMessageToBatch(wb Buffer, mm *pb.MessageMetadata, payload []byte) {
 	wb.Write(payload)
 }
 
+// ConstructBufferFromMessage appends one message to wb: the metadata size as a uint32,
+// the serialized MessageMetadata, then the payload. It panics if marshaling fails.
+func ConstructBufferFromMessage(wb Buffer, mm *pb.MessageMetadata, payload []byte) {
+	metadataSize := uint32(mm.Size())
+	wb.WriteUint32(metadataSize)
+
+	wb.ResizeIfNeeded(metadataSize)
+	_, err := mm.MarshalToSizedBuffer(wb.WritableSlice()[:metadataSize])
+	if err != nil {
+		panic(fmt.Sprintf("Protobuf serialization error: %v", err))
+	}
+
+	wb.WrittenBytes(metadataSize)
+	wb.Write(payload)
+}
+
 func serializeBatch(wb Buffer,
 	cmdSend *pb.BaseCommand,
 	msgMetadata *pb.MessageMetadata,
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index e0869aaa74..dea07e62af 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -532,13 +532,14 @@ func (p *partitionProducer) internalSendSingleChunk(request *sendRequest, payloa
 	uuid string, totalChunks int, totalSize int, chunkId int) {
 
 	msg := request.msg
+	mm := &pb.MessageMetadata{}
+
 	deliverAt := msg.DeliverAt
 	if msg.DeliverAfter.Nanoseconds() > 0 {
 		deliverAt = time.Now().Add(msg.DeliverAfter)
+		mm.DeliverAtTime = proto.Int64(int64(internal.TimestampMillis(deliverAt)))
 	}
 
-	mm := &pb.MessageMetadata{}
-
 	if msg.EventTime.UnixNano() != 0 {
 		mm.EventTime = proto.Uint64(internal.TimestampMillis(msg.EventTime))
 	}
@@ -566,39 +567,27 @@ func (p *partitionProducer) internalSendSingleChunk(request *sendRequest, payloa
 	mm.TotalChunkMsgSize = proto.Int(totalSize)
 	mm.ChunkId = proto.Int(chunkId)
 
-	p.internalFlushCurrentBatch()
-
-	if msg.DisableReplication {
-		msg.ReplicationClusters = []string{"__local__"}
-	}
-
-	added := p.batchBuilder.AddMessageMetaData(mm, p.sequenceIDGenerator, payload, request,
-		msg.ReplicationClusters, deliverAt)
-	if !added {
-		// The current batch is full.. flush it and retry
-		if p.batchBuilder.IsMultiBatches() {
-			p.internalFlushCurrentBatches()
-		} else {
-			p.internalFlushCurrentBatch()
-		}
+	// msg.SequenceID is optional: fall back to the producer's generator instead of
+	// dereferencing a nil pointer when the caller did not set one.
+	// NOTE(review): confirm whether all chunks of a message should share one sequence ID.
+	if mm.SequenceId == nil {
+		mm.SequenceId = proto.Uint64(internal.GetAndAdd(p.sequenceIDGenerator, 1))
+	}
+
+	// Directly construct a buffer and put it to the pending queue
+	newBuffer := p.GetBuffer()
+	internal.ConstructBufferFromMessage(newBuffer, mm, payload)
 
-		// after flushing try again to add the current payload
-		if ok := p.batchBuilder.AddMessageMetaData(mm, p.sequenceIDGenerator, payload, request,
-			msg.ReplicationClusters, deliverAt); !ok {
-			p.publishSemaphore.Release()
-			request.callback(nil, request.msg, errFailAddToBatch)
-			p.log.WithField("size", len(payload)).
-				WithField("properties", msg.Properties).
-				Error("unable to add message to batch")
-			return
-		}
-	}
+	callbacks := make([]interface{}, 1)
+	callbacks[0] = request.callback
 
-	if p.batchBuilder.IsMultiBatches() {
-		p.internalFlushCurrentBatches()
-	} else {
-		p.internalFlushCurrentBatch()
-	}
+	p.pendingQueue.Put(&pendingItem{
+		sentAt:       time.Now(),
+		batchData:    newBuffer,
+		sequenceID:   *mm.SequenceId,
+		sendRequests: callbacks,
+	})
+	p._getConn().WriteData(newBuffer)
 }
 
 type pendingItem struct {