diff --git a/go.mod b/go.mod index a3ae414..ac73d18 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,7 @@ require ( github.com/CrisisTextLine/modular/modules/eventbus v1.6.0 // indirect github.com/DataDog/datadog-go/v5 v5.4.0 // indirect github.com/GoCodeAlone/go-plugin v0.0.0-20260220090904-b4c35f0e4271 // indirect - github.com/GoCodeAlone/yaegi v0.17.0 // indirect + github.com/GoCodeAlone/yaegi v0.17.1 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.30.0 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.55.0 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.55.0 // indirect diff --git a/go.sum b/go.sum index 8ec7245..ba04770 100644 --- a/go.sum +++ b/go.sum @@ -36,6 +36,12 @@ github.com/CrisisTextLine/modular/modules/eventbus v1.6.0 h1:40H5/mrhPw3Jzi9wntg github.com/CrisisTextLine/modular/modules/eventbus v1.6.0/go.mod h1:I1tGf3DmadwyMP2NE2m6XHYl9ebXB9wBc/KZLywTR4c= github.com/DataDog/datadog-go/v5 v5.4.0 h1:Ea3eXUVwrVV28F/fo3Dr3aa+TL/Z7Xi6SUPKW8L99aI= github.com/DataDog/datadog-go/v5 v5.4.0/go.mod h1:K9kcYBlxkcPP8tvvjZZKs/m1edNAUFzBbdpTUKfCsuw= +github.com/GoCodeAlone/go-plugin v0.0.0-20260220090904-b4c35f0e4271 h1:/oxxpYJ41BuK+/5Gp9c+0PHybyNFWeBHyCzkSVLCoMk= +github.com/GoCodeAlone/go-plugin v0.0.0-20260220090904-b4c35f0e4271/go.mod h1:HbGQRZUIa+jbDfjsaZIMJYvrz+LnxL0mJpggfynSTMk= +github.com/GoCodeAlone/workflow v0.1.5-0.20260222204709-54fb9ff5d076 h1:ahZ9r2GmZviE8M6tQhf/WeHrNcNPbGMrhOq440ukKIY= +github.com/GoCodeAlone/workflow v0.1.5-0.20260222204709-54fb9ff5d076/go.mod h1:VfNIuF0HZO5oZGRwhA0oKDyhE1Nn2Pa3SeTWw+7HntA= +github.com/GoCodeAlone/yaegi v0.17.1 h1:aPAwU29L9cGceRAff02c5pjQcT5KapDB4fWFZK9tElE= +github.com/GoCodeAlone/yaegi v0.17.1/go.mod h1:z5Pr6Wse6QJcQvpgxTxzMAevFarH0N37TG88Y9dprx0= github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.30.0 h1:sBEjpZlNHzK1voKq9695PJSX2o5NEXl7/OL3coiIY0c= github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.30.0/go.mod h1:P4WPRUkOhJC13W//jWpyfJNDAIpvRbAUIYLX/4jtlE0= github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.55.0 h1:UnDZ/zFfG1JhH/DqxIZYU/1CUAlTUScoXD/LcM2Ykk8= diff --git a/internal/broker_module.go b/internal/broker_module.go index db5a974..8568858 100644 --- a/internal/broker_module.go +++ b/internal/broker_module.go @@ -3,6 +3,7 @@ package internal import ( "context" "fmt" + "log/slog" "sync" sdk "github.com/GoCodeAlone/workflow/plugin/external/sdk" @@ -136,7 +137,7 @@ func (m *brokerModule) ensureStream(ctx context.Context, topic string) (*service go func() { if err := stream.Run(ctx); err != nil && ctx.Err() == nil { - _ = err + slog.Error("bento broker stream runtime error", "name", m.name, "topic", topic, "error", err) } }() diff --git a/internal/input_module.go b/internal/input_module.go index 5eadfa1..4505e00 100644 --- a/internal/input_module.go +++ b/internal/input_module.go @@ -3,6 +3,7 @@ package internal import ( "context" "fmt" + "log/slog" sdk "github.com/GoCodeAlone/workflow/plugin/external/sdk" "github.com/warpstreamlabs/bento/v4/public/service" @@ -118,7 +119,7 @@ func (m *inputModule) Start(ctx context.Context) error { go func() { defer close(m.done) if err := stream.Run(runCtx); err != nil && runCtx.Err() == nil { - _ = err + slog.Error("bento input stream runtime error", "name", m.name, "error", err) } }() diff --git a/internal/output_module.go b/internal/output_module.go index 564b515..f239573 100644 --- a/internal/output_module.go +++ b/internal/output_module.go @@ -3,6 +3,7 @@ package internal import ( "context" "fmt" + "log/slog" sdk "github.com/GoCodeAlone/workflow/plugin/external/sdk" "github.com/warpstreamlabs/bento/v4/public/service" @@ -100,7 +101,7 @@ func (m *outputModule) Start(ctx context.Context) error { go func() { defer close(m.done) if err := stream.Run(runCtx); err != nil && runCtx.Err() == nil { - _ = err + slog.Error("bento output stream runtime error", "name", m.name, "error", err) } }() diff --git a/internal/stream_module.go b/internal/stream_module.go index bebb7b2..3fd01e8 100644 --- a/internal/stream_module.go +++ b/internal/stream_module.go @@ -3,6 +3,7 @@ package internal import ( "context" "fmt" + "log/slog" "github.com/warpstreamlabs/bento/v4/public/service" ) @@ -56,8 +57,7 @@ func (m *streamModule) Start(ctx context.Context) error { go func() { defer close(m.done) if err := stream.Run(runCtx); err != nil && runCtx.Err() == nil { - // Log error but don't panic — stream may have been stopped intentionally. - _ = err + slog.Error("bento stream runtime error", "name", m.name, "error", err) } }() diff --git a/internal/trigger.go b/internal/trigger.go index 7963842..5ce528c 100644 --- a/internal/trigger.go +++ b/internal/trigger.go @@ -3,6 +3,7 @@ package internal import ( "context" "fmt" + "log/slog" "sync" sdk "github.com/GoCodeAlone/workflow/plugin/external/sdk" @@ -147,7 +148,10 @@ func (t *bentoTrigger) Start(ctx context.Context) error { go func() { defer wg.Done() if err := stream.Run(runCtx); err != nil && runCtx.Err() == nil { - _ = err + slog.Error("bento trigger stream runtime error", "workflow", workflow, "action", action, "error", err) + if cbErr := cb("stream_error", map[string]any{"error": err.Error(), "workflow": workflow, "action": action}); cbErr != nil { + slog.Error("bento trigger stream_error callback error", "workflow", workflow, "action", action, "callback_error", cbErr) + } } }() }