Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions lib/event/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,12 @@ func (consumer *Consumer) listenOnce(f func(b []byte) error) error {

consumer.logger.Debug("started rabbitmq consumer.", "tag", tag, "exchange", consumer.exchangeName, "queue", q.Name)

// Handle connection close notification
connCloseChan := consumer.connManager.conn.NotifyClose(make(chan *amqp.Error, 1))
// Handle connection close notification — use GetConnection() to avoid data race
conn, err := consumer.connManager.GetConnection()
if err != nil {
return err
}
connCloseChan := conn.NotifyClose(make(chan *amqp.Error, 1))
chCloseChan := ch.NotifyClose(make(chan *amqp.Error, 1))

for {
Expand Down
63 changes: 51 additions & 12 deletions lib/event/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package event

import (
"context"
"sync"

amqp "github.com/rabbitmq/amqp091-go"
"log/slog"
)
Expand All @@ -11,34 +13,58 @@ type Emitter struct {
connManager *ConnectionManager
config ConnectionConfig
logger *slog.Logger
mu sync.Mutex
ch *amqp.Channel
exchanges map[string]bool // tracks declared exchanges on current channel
}

func (e *Emitter) setup() error {
channel, err := e.connManager.GetChannel()
// Validate that the connection manager can provide a channel
ch, err := e.connManager.GetChannel()
if err != nil {
return err
}
defer channel.Close()
e.ch = ch
return nil
}

return declareExchange(channel, "logs_topic")
// getChannel returns the persistent channel, creating a new one if needed.
func (e *Emitter) getChannel() (*amqp.Channel, error) {
if e.ch != nil && !e.ch.IsClosed() {
return e.ch, nil
}
// Channel is nil or closed — get a fresh one
ch, err := e.connManager.GetChannel()
if err != nil {
return nil, err
}
e.ch = ch
e.exchanges = make(map[string]bool) // reset declared exchanges
return ch, nil
}

// Push (Publish) a specified message to the AMQP exchange with retry logic
// Publish a message to the AMQP exchange with retry logic.
func (e *Emitter) Publish(exchange string, topic string, message string) error {
return RetryWithBackoff(context.Background(), e.config, e.logger, func() error {
channel, err := e.connManager.GetChannel()
e.mu.Lock()
defer e.mu.Unlock()

ch, err := e.getChannel()
if err != nil {
return err
}
defer channel.Close()

// Ensure exchange exists
err = declareExchange(channel, exchange)
if err != nil {
return err
// Declare exchange once per channel lifetime
if !e.exchanges[exchange] {
if err := declareExchange(ch, exchange); err != nil {
// Channel may be broken — force reset on next call
e.ch = nil
return err
}
e.exchanges[exchange] = true
}

return channel.PublishWithContext(
err = ch.PublishWithContext(
context.Background(),
exchange,
topic,
Expand All @@ -50,6 +76,12 @@ func (e *Emitter) Publish(exchange string, topic string, message string) error {
DeliveryMode: 2,
},
)
if err != nil {
// Channel may be broken — force reset on next call
e.ch = nil
return err
}
return nil
})
}

Expand All @@ -60,6 +92,7 @@ func NewEventEmitter(connManager *ConnectionManager) (Emitter, error) {
connManager: connManager,
config: DefaultConnectionConfig(),
logger: slog.Default(),
exchanges: make(map[string]bool),
}

err := emitter.setup()
Expand All @@ -80,6 +113,7 @@ func NewEventEmitterWithConfig(connManager *ConnectionManager, config Connection
connManager: connManager,
config: config,
logger: logger,
exchanges: make(map[string]bool),
}

err := emitter.setup()
Expand All @@ -90,8 +124,13 @@ func NewEventEmitterWithConfig(connManager *ConnectionManager, config Connection
return emitter, nil
}

// Close closes the emitter and its connection manager
// Close closes the emitter's channel and its connection manager
func (e *Emitter) Close() error {
e.mu.Lock()
if e.ch != nil && !e.ch.IsClosed() {
e.ch.Close()
}
e.mu.Unlock()
return e.connManager.Close()
}

Expand Down
4 changes: 0 additions & 4 deletions lib/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@ import (
"log/slog"
)

func getExchangeName() string {
return "logs_topic"
}

func declareQueue(ch *amqp.Channel, name string, durable bool) (amqp.Queue, error) {
return ch.QueueDeclare(
name, // name
Expand Down