From 840f15b21e2ec01524ae9883cb7a93bc6f943cd6 Mon Sep 17 00:00:00 2001 From: Serena Zamarripa Date: Thu, 19 Mar 2026 15:33:29 -0500 Subject: [PATCH] Fix Publisher metrics --- internal/consumer/consumer.go | 3 ++- internal/consumer/handler.go | 2 +- internal/publisher/publisher.go | 9 ++++++++- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/internal/consumer/consumer.go b/internal/consumer/consumer.go index fdfc5b5..8fcb6c3 100644 --- a/internal/consumer/consumer.go +++ b/internal/consumer/consumer.go @@ -271,7 +271,6 @@ func (c *KafkaConsumer) pollLoop() { "offset": record.Offset, }) } - c.handleOutcome(outcome, err, record) c.metricEmitter.Notify(metrics.Event{ Name: metrics.PublisherOutcomes, @@ -280,6 +279,8 @@ func (c *KafkaConsumer) pollLoop() { metrics.OutcomeLabel, outcome.String(), }, }) + + c.handleOutcome(outcome, err, record) }) } } diff --git a/internal/consumer/handler.go b/internal/consumer/handler.go index 5d3804b..739d9d0 100644 --- a/internal/consumer/handler.go +++ b/internal/consumer/handler.go @@ -161,7 +161,7 @@ func (h *WRPMessageHandler) HandleMessage(ctx context.Context, record *kgo.Recor } h.emitLog(log.LevelDebug, "successfully routed WRP message", map[string]any{ - "outcome": outcome, + "outcome": outcome.String(), }) return getOutcome(outcome), nil diff --git a/internal/publisher/publisher.go b/internal/publisher/publisher.go index 1e22b1f..15ebe80 100644 --- a/internal/publisher/publisher.go +++ b/internal/publisher/publisher.go @@ -202,7 +202,7 @@ func (p *KafkaPublisher) Produce(ctx context.Context, msg *wrp.Message) (wrpkafk metrics.ErrorTypeLabel, "not_started", }, }) - return 0, ErrPublisherNotStarted // Return 0 (which corresponds to wrpkafka.Accepted) as default + return 0, ErrPublisherNotStarted // Return 0 (which corresponds to wrpkafka.Attempted) as default } outcome, err := p.wrpPublisher.Produce(ctx, msg) @@ -213,6 +213,13 @@ func (p *KafkaPublisher) Produce(ctx context.Context, msg *wrp.Message) (wrpkafk "source": msg.Source, "destination": msg.Destination, })) + p.metricEmitter.Notify(metrics.Event{ + Name: metrics.PublisherErrorsCounter, + Value: 1, + Labels: []string{ + metrics.ErrorTypeLabel, "failed_to_produce_message", + }, + }) return outcome, fmt.Errorf("failed to produce message: %w", err) }