fix: address test reliability, correctness, and silent stream failures#8
Merged
fix: address test reliability, correctness, and silent stream failures#8
Conversation
Copilot
AI
changed the title
[WIP] Add comprehensive unit tests for modules, steps, and triggers
fix: address test reliability, correctness, and silent stream failures
Feb 23, 2026
There was a problem hiding this comment.
Pull request overview
This PR improves the reliability and correctness of the Bento plugin test suite by replacing timing-dependent assertions with deadline polling, ensuring failure-paths are bounded by timeouts, and fixing silent stream/test failures caused by missing Bento component registration and invalid Bloblang usage in test configs.
Changes:
- Replace flaky
time.Sleep-based assertions with deadline polling in key async tests. - Fix tests to actually exercise intended error paths (callback error, invalid Bloblang) and bound failure paths with timeouts.
- Register Bento “pure” components in the test process and correct Bloblang
count()usage; rungo mod tidyupdates.
Reviewed changes
Copilot reviewed 9 out of 10 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| internal/trigger_test.go | Adds comprehensive trigger tests; includes polling improvements and callback error coverage. |
| internal/testmain_test.go | Side-effect import to register Bento pure components for tests. |
| internal/stream_module_test.go | Adds stream module tests covering init/start/stop and invalid configs. |
| internal/processor_step_test.go | Adds processor step tests (Bloblang, merges, invalid syntax, context cancel). |
| internal/plugin_test.go | Adds plugin interface/manifest/schema coverage and module/step/trigger creation tests. |
| internal/output_module_test.go | Adds output module tests with mock subscriber and subscription lifecycle checks. |
| internal/input_module_test.go | Adds input module tests with mock publisher and publish assertions using polling. |
| internal/broker_module_test.go | Adds broker module tests including concurrent ensureStream behavior. |
| go.mod | go mod tidy updates for newly required indirect dependencies. |
| go.sum | Corresponding checksum updates from go mod tidy. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
…riggers - Add lifecycle tests for stream, input, output, and broker modules - Add Bloblang processor step tests with valid/invalid mappings - Add trigger lifecycle tests with subscriptions and callbacks - Add plugin registration tests verifying all interfaces - Test error scenarios and config validation - Mock dependencies (MessagePublisher, MessageSubscriber) for isolated testing Note: Some integration tests that require actual Bento stream execution have known issues with YAML format expectations. Core functionality and interfaces are fully tested. Future work can refine Bento-specific YAML configs for full end-to-end integration tests. Closes #2 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- Replace time.Sleep with deadline-based polling in TestInputModule_PublishMessages and TestBentoTrigger_StartStop - Remove unused wantType field from TestBentoPlugin_CreateModule test table - Add context timeout to TestProcessorStep_ExecuteWithInvalidBloblang - Fix TestBentoTrigger_CallbackError to actually return a non-nil error and verify clean stop - Fix count() Bloblang calls to include required name argument - Add testmain_test.go to register Bento pure components for tests Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>
Contributor
|
@copilot apply changes based on the comments in this thread |
- broker_module_test.go: use generate transport (not memory) for ensureStream tests since memory is not a valid Bento input type; keep memory transport for Init/StartStop tests that don't call ensureStream; restore time.Sleep before Stop to allow goroutines to start running streams - processor_step_test.go: use mapping: YAML format (not - bloblang: list format) since AddProcessorYAML expects a single processor object, not a sequence; reduce invalid bloblang timeout from 5s to 2s; keep lenient error handling for construction-time failures - stream_module_test.go: restore errors import and timeout context for StopWithoutStart since Stop blocks on done channel when stream was never started - trigger_test.go: restore timeout context for StopWithoutStart since done channel is never closed when Start was not called Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
7b01fd7 to
7fdedba
Compare
- Replace time.Sleep(200ms) with deadline-based polling in TestBentoTrigger_MultipleSubscriptions - Strengthen TestBentoTrigger_CallbackError assertion to require at least 2 invocations - Change TestProcessorStep_ExecuteWithContextCancel from t.Logf to a hard errors.Is assertion Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>
Contributor
Author
Applied all three changes from the review in commit
|
intel352
added a commit
that referenced
this pull request
Feb 24, 2026
#19, #20) - #4 output_module: reinitialize done channel at Start for restart support - #6 broker_module: copy streams map under lock then release before calling stream.Stop to prevent potential deadlock in Stop() - #8 processor_step: use sync.Once in consumer func so fan-out (processor emitting multiple messages) does not block on the size-1 channel - #9 processor_step: deferred cleanup selects on ctx.Done() when draining streamDone so a cancelled parent context does not cause an unconditional block - #17 input_module: extract real transport type via inputTransportType() helper so LogStreamStart logs "kafka"/"generate"/etc. instead of "bento.input" - #18 output_module: extract real transport type via outputTransportType() helper so LogStreamStart logs the actual Bento output type - #19 bridge: mapToMessage now returns (*service.Message, error); json.Marshal errors are surfaced instead of silently ignored; updated all call sites - #20 tests: replace remaining time.Sleep(50ms) calls with polling loops in stream_module_test, output_module_test, and broker_module_test Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
intel352
added a commit
that referenced
this pull request
Feb 24, 2026
* feat: add structured logging and observability hooks (#3) - Add internal/logger.go: bentoLogger wrapping log/slog with structured fields (component, name) and helpers LogStreamStart, LogStreamStop, LogStreamError, LogMessageProcessed, LogTopicEvent, LogProcessingStart, LogProcessingComplete, LogProcessingError - Add internal/metrics.go: thread-safe StreamMetrics using sync/atomic for message-in/out and error counters, plus mutex-guarded start time and last-message-time; Snapshot() for point-in-time read - Add internal/health.go: healthTracker derives HealthStatus (healthy / degraded / unhealthy) from metrics and running state; HealthReport struct for per-stream health reporting - Update stream_module, input_module, output_module, broker_module: attach logger/metrics/health on construction; emit log lines at start, stop, error, and per-message events; expose Health() method - Update processor_step: log processing start, complete, and error events - Fix pre-existing lint issues: check MetaWalkMut return value, remove unused mapToMessage helper, nolint annotation on future-use ensureStream - Add internal/logger_test.go: 21 unit tests covering log output format, metrics counting, health state transitions, and concurrent metric updates Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: format bridge.go * chore: retrigger CI * fix: apply observability PR review feedback (#9) * Initial plan * fix: apply PR review feedback for observability improvements - Fix wrong comment on LogStreamStop: 'uptimeSeconds' -> 'messagesProcessed' - Use slog.Default() in newLogger to respect global slog configuration - Pass snap.Errors (not always-zero snap.MessagesIn) to LogStreamStop in stream_module - Remove unnecessary mutex lock/unlock in newStreamMetrics initialization Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> * fix: observability lifecycle correctness — health on unexpected exit, uptime semantics, subscribe ordering (#13) * Initial plan * fix: apply review feedback - lifecycle hooks, health on unexpected exit, subscribe ordering Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> * fix: output stream cleanup on subscribe error + Health() test coverage for all modules (#15) * Initial plan * fix: stop built stream on subscribe error and add Health() tests for all modules Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> * fix: hot-path atomics, debug log guard, goroutine cleanup, deterministic health tests (#17) * Initial plan * fix: hot-path atomics, debug guard, goroutine cleanup, deterministic tests Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> * fix: address remaining PR #7 review comments (#4, #6, #8, #9, #17, #18, #19, #20) - #4 output_module: reinitialize done channel at Start for restart support - #6 broker_module: copy streams map under lock then release before calling stream.Stop to prevent potential deadlock in Stop() - #8 processor_step: use sync.Once in consumer func so fan-out (processor emitting multiple messages) does not block on the size-1 channel - #9 processor_step: deferred cleanup selects on ctx.Done() when draining streamDone so a cancelled parent context does not cause an unconditional block - #17 input_module: extract real transport type via inputTransportType() helper so LogStreamStart logs "kafka"/"generate"/etc. instead of "bento.input" - #18 output_module: extract real transport type via outputTransportType() helper so LogStreamStart logs the actual Bento output type - #19 bridge: mapToMessage now returns (*service.Message, error); json.Marshal errors are surfaced instead of silently ignored; updated all call sites - #20 tests: replace remaining time.Sleep(50ms) calls with polling loops in stream_module_test, output_module_test, and broker_module_test Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com> Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
TestBentoTrigger_MultipleSubscriptions: replacetime.Sleep(200ms)with deadline-based pollingTestBentoTrigger_CallbackError: assertcount >= 2instead ofcount > 0TestProcessorStep_ExecuteWithContextCancel: useerrors.Is(err, context.Canceled)hard assertion instead oft.Logf✨ Let Copilot coding agent set things up for you — coding agent works faster and does higher quality work when set up for your repo.