Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions backend/internal/adapters/telemetry/fanout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package telemetry

import (
"context"
"errors"

"github.com/aoagents/agent-orchestrator/backend/internal/ports"
)

// FanoutSink emits each event to multiple sinks.
type FanoutSink struct {
sinks []ports.EventSink
}

// NewFanoutSink builds a sink that forwards each event to every non-nil sink.
func NewFanoutSink(sinks ...ports.EventSink) *FanoutSink {
filtered := make([]ports.EventSink, 0, len(sinks))
for _, sink := range sinks {
if sink != nil {
filtered = append(filtered, sink)
}
}
return &FanoutSink{sinks: filtered}
}

// Emit forwards the event to each configured sink.
func (s *FanoutSink) Emit(ctx context.Context, ev ports.TelemetryEvent) {
for _, sink := range s.sinks {
sink.Emit(ctx, ev)
}
}

// Close closes every configured sink and joins any returned errors.
func (s *FanoutSink) Close(ctx context.Context) error {
var errs []error
for _, sink := range s.sinks {
if err := sink.Close(ctx); err != nil {
errs = append(errs, err)
}
}
return errors.Join(errs...)
}
124 changes: 124 additions & 0 deletions backend/internal/adapters/telemetry/localsqlite.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package telemetry

import (
"context"
"encoding/json"
"log/slog"
"sync"
"time"

"github.com/google/uuid"

"github.com/aoagents/agent-orchestrator/backend/internal/ports"
sqlitestore "github.com/aoagents/agent-orchestrator/backend/internal/storage/sqlite/store"
)

const (
localBufferSize = 128
localRetention = 30 * 24 * time.Hour
localPruneEvery = time.Hour
localPruneBatchLimit = int64(1000)
)

type localStore interface {
CreateTelemetryEvent(ctx context.Context, rec sqlitestore.TelemetryEventRecord) error
PruneTelemetryEventsBefore(ctx context.Context, before time.Time, limit int64) (int64, error)
}

// LocalSQLiteSink persists telemetry events into the daemon's SQLite database
// behind a small buffered worker so event emission stays best-effort.
type LocalSQLiteSink struct {
store localStore
log *slog.Logger
ch chan ports.TelemetryEvent
wg sync.WaitGroup
closeOnce sync.Once
now func() time.Time
newID func() string

pruneMu sync.Mutex
lastPrune time.Time
}

// NewLocalSQLiteSink starts a buffered SQLite-backed telemetry sink.
func NewLocalSQLiteSink(store localStore, log *slog.Logger) *LocalSQLiteSink {
s := &LocalSQLiteSink{
store: store,
log: log,
ch: make(chan ports.TelemetryEvent, localBufferSize),
now: time.Now,
newID: func() string { return "tev_" + uuid.NewString() },
}
s.wg.Add(1)
go s.loop()
return s
}

// Emit enqueues an event for best-effort persistence.
func (s *LocalSQLiteSink) Emit(_ context.Context, ev ports.TelemetryEvent) {
select {
case s.ch <- ev:
default:
s.log.Warn("telemetry local sink buffer full; dropping event", "name", ev.Name, "source", ev.Source)
}
}

// Close drains the worker until completion or context cancellation.
func (s *LocalSQLiteSink) Close(ctx context.Context) error {
s.closeOnce.Do(func() { close(s.ch) })
done := make(chan struct{})
go func() {
defer close(done)
s.wg.Wait()
}()
select {
case <-ctx.Done():
return ctx.Err()
case <-done:
return nil
}
}

func (s *LocalSQLiteSink) loop() {
defer s.wg.Done()
for ev := range s.ch {
s.persist(ev)
}
}

func (s *LocalSQLiteSink) persist(ev ports.TelemetryEvent) {
payloadJSON, err := json.Marshal(ev.Payload)
if err != nil {
s.log.Warn("telemetry payload marshal failed", "name", ev.Name, "error", err)
return
}
rec := sqlitestore.TelemetryEventRecord{
ID: s.newID(),
OccurredAt: ev.OccurredAt.UTC(),
Name: ev.Name,
Source: ev.Source,
Level: string(ev.Level),
ProjectID: ev.ProjectID,
SessionID: ev.SessionID,
RequestID: ev.RequestID,
PayloadJSON: string(payloadJSON),
}
if err := s.store.CreateTelemetryEvent(context.Background(), rec); err != nil {
s.log.Warn("telemetry local sink write failed", "name", ev.Name, "error", err)
return
}
s.maybePrune()
}

func (s *LocalSQLiteSink) maybePrune() {
s.pruneMu.Lock()
defer s.pruneMu.Unlock()
now := s.now().UTC()
if !s.lastPrune.IsZero() && now.Sub(s.lastPrune) < localPruneEvery {
return
}
s.lastPrune = now
if _, err := s.store.PruneTelemetryEventsBefore(context.Background(), now.Add(-localRetention), localPruneBatchLimit); err != nil {
s.log.Warn("telemetry local sink prune failed", "error", err)
}
}
16 changes: 16 additions & 0 deletions backend/internal/adapters/telemetry/noop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package telemetry

import (
"context"

"github.com/aoagents/agent-orchestrator/backend/internal/ports"
)

// NoopSink discards every event.
type NoopSink struct{}

// Emit discards the event.
func (NoopSink) Emit(context.Context, ports.TelemetryEvent) {}

// Close is a no-op.
func (NoopSink) Close(context.Context) error { return nil }
Loading
Loading