Skip to content

fix: address test reliability, correctness, and silent stream failures#8

Merged
intel352 merged 5 commits intomainfrom
copilot/sub-pr-6
Feb 23, 2026
Merged

fix: address test reliability, correctness, and silent stream failures#8
intel352 merged 5 commits intomainfrom
copilot/sub-pr-6

Conversation

Copy link
Contributor

Copilot AI commented Feb 23, 2026

  • Review comments from PR fix: address test reliability, correctness, and silent stream failures #8
  • Fix TestBentoTrigger_MultipleSubscriptions: replace time.Sleep(200ms) with deadline-based polling
  • Fix TestBentoTrigger_CallbackError: assert count >= 2 instead of count > 0
  • Fix TestProcessorStep_ExecuteWithContextCancel: use errors.Is(err, context.Canceled) hard assertion instead of t.Logf
  • Code review and security scan

✨ Let Copilot coding agent set things up for you — coding agent works faster and does higher quality work when set up for your repo.

Copilot AI changed the title [WIP] Add comprehensive unit tests for modules, steps, and triggers fix: address test reliability, correctness, and silent stream failures Feb 23, 2026
Copilot AI requested a review from intel352 February 23, 2026 10:56
Base automatically changed from feat/issue-2-tests to main February 23, 2026 13:50
@intel352 intel352 marked this pull request as ready for review February 23, 2026 13:51
Copilot AI review requested due to automatic review settings February 23, 2026 13:51
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR improves the reliability and correctness of the Bento plugin test suite by replacing timing-dependent assertions with deadline polling, ensuring failure-paths are bounded by timeouts, and fixing silent stream/test failures caused by missing Bento component registration and invalid Bloblang usage in test configs.

Changes:

  • Replace flaky time.Sleep-based assertions with deadline polling in key async tests.
  • Fix tests to actually exercise intended error paths (callback error, invalid Bloblang) and bound failure paths with timeouts.
  • Register Bento “pure” components in the test process and correct Bloblang count() usage; run go mod tidy updates.

Reviewed changes

Copilot reviewed 9 out of 10 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
internal/trigger_test.go Adds comprehensive trigger tests; includes polling improvements and callback error coverage.
internal/testmain_test.go Side-effect import to register Bento pure components for tests.
internal/stream_module_test.go Adds stream module tests covering init/start/stop and invalid configs.
internal/processor_step_test.go Adds processor step tests (Bloblang, merges, invalid syntax, context cancel).
internal/plugin_test.go Adds plugin interface/manifest/schema coverage and module/step/trigger creation tests.
internal/output_module_test.go Adds output module tests with mock subscriber and subscription lifecycle checks.
internal/input_module_test.go Adds input module tests with mock publisher and publish assertions using polling.
internal/broker_module_test.go Adds broker module tests including concurrent ensureStream behavior.
go.mod go mod tidy updates for newly required indirect dependencies.
go.sum Corresponding checksum updates from go mod tidy.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

intel352 and others added 3 commits February 23, 2026 09:09
…riggers

- Add lifecycle tests for stream, input, output, and broker modules
- Add Bloblang processor step tests with valid/invalid mappings
- Add trigger lifecycle tests with subscriptions and callbacks
- Add plugin registration tests verifying all interfaces
- Test error scenarios and config validation
- Mock dependencies (MessagePublisher, MessageSubscriber) for isolated testing

Note: Some integration tests that require actual Bento stream execution
have known issues with YAML format expectations. Core functionality and
interfaces are fully tested. Future work can refine Bento-specific YAML
configs for full end-to-end integration tests.

Closes #2

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- Replace time.Sleep with deadline-based polling in TestInputModule_PublishMessages and TestBentoTrigger_StartStop
- Remove unused wantType field from TestBentoPlugin_CreateModule test table
- Add context timeout to TestProcessorStep_ExecuteWithInvalidBloblang
- Fix TestBentoTrigger_CallbackError to actually return a non-nil error and verify clean stop
- Fix count() Bloblang calls to include required name argument
- Add testmain_test.go to register Bento pure components for tests

Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>
@intel352
Copy link
Contributor

@copilot apply changes based on the comments in this thread

- broker_module_test.go: use generate transport (not memory) for
  ensureStream tests since memory is not a valid Bento input type;
  keep memory transport for Init/StartStop tests that don't call
  ensureStream; restore time.Sleep before Stop to allow goroutines
  to start running streams
- processor_step_test.go: use mapping: YAML format (not - bloblang:
  list format) since AddProcessorYAML expects a single processor
  object, not a sequence; reduce invalid bloblang timeout from 5s
  to 2s; keep lenient error handling for construction-time failures
- stream_module_test.go: restore errors import and timeout context
  for StopWithoutStart since Stop blocks on done channel when stream
  was never started
- trigger_test.go: restore timeout context for StopWithoutStart
  since done channel is never closed when Start was not called

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Replace time.Sleep(200ms) with deadline-based polling in TestBentoTrigger_MultipleSubscriptions
- Strengthen TestBentoTrigger_CallbackError assertion to require at least 2 invocations
- Change TestProcessorStep_ExecuteWithContextCancel from t.Logf to a hard errors.Is assertion

Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>
Copy link
Contributor Author

Copilot AI commented Feb 23, 2026

@copilot apply changes based on the comments in this thread

Applied all three changes from the review in commit 11e3758:

  • TestBentoTrigger_MultipleSubscriptions: replaced time.Sleep(200ms) with a deadline-based polling loop (same pattern as TestBentoTrigger_StartStop) that fails explicitly with the observed count if the deadline is reached.
  • TestBentoTrigger_CallbackError: strengthened assertion from count == 0 to count < 2, requiring at least 2 invocations (1 error + 1 success) as stated in the test comment.
  • TestProcessorStep_ExecuteWithContextCancel: replaced the t.Logf non-assertion with a hard errors.Is(err, context.Canceled) check, so the test fails if the wrong error type is returned.

@intel352 intel352 merged commit 4d32676 into main Feb 23, 2026
8 checks passed
@intel352 intel352 deleted the copilot/sub-pr-6 branch February 23, 2026 20:35
intel352 added a commit that referenced this pull request Feb 24, 2026
#19, #20)

- #4  output_module: reinitialize done channel at Start for restart support
- #6  broker_module: copy streams map under lock then release before calling
      stream.Stop to prevent potential deadlock in Stop()
- #8  processor_step: use sync.Once in consumer func so fan-out (processor
      emitting multiple messages) does not block on the size-1 channel
- #9  processor_step: deferred cleanup selects on ctx.Done() when draining
      streamDone so a cancelled parent context does not cause an unconditional
      block
- #17 input_module: extract real transport type via inputTransportType() helper
      so LogStreamStart logs "kafka"/"generate"/etc. instead of "bento.input"
- #18 output_module: extract real transport type via outputTransportType()
      helper so LogStreamStart logs the actual Bento output type
- #19 bridge: mapToMessage now returns (*service.Message, error); json.Marshal
      errors are surfaced instead of silently ignored; updated all call sites
- #20 tests: replace remaining time.Sleep(50ms) calls with polling loops in
      stream_module_test, output_module_test, and broker_module_test

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
intel352 added a commit that referenced this pull request Feb 24, 2026
* feat: add structured logging and observability hooks (#3)

- Add internal/logger.go: bentoLogger wrapping log/slog with structured
  fields (component, name) and helpers LogStreamStart, LogStreamStop,
  LogStreamError, LogMessageProcessed, LogTopicEvent, LogProcessingStart,
  LogProcessingComplete, LogProcessingError
- Add internal/metrics.go: thread-safe StreamMetrics using sync/atomic
  for message-in/out and error counters, plus mutex-guarded start time and
  last-message-time; Snapshot() for point-in-time read
- Add internal/health.go: healthTracker derives HealthStatus (healthy /
  degraded / unhealthy) from metrics and running state; HealthReport struct
  for per-stream health reporting
- Update stream_module, input_module, output_module, broker_module:
  attach logger/metrics/health on construction; emit log lines at start,
  stop, error, and per-message events; expose Health() method
- Update processor_step: log processing start, complete, and error events
- Fix pre-existing lint issues: check MetaWalkMut return value, remove
  unused mapToMessage helper, nolint annotation on future-use ensureStream
- Add internal/logger_test.go: 21 unit tests covering log output format,
  metrics counting, health state transitions, and concurrent metric updates

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: format bridge.go

* chore: retrigger CI

* fix: apply observability PR review feedback (#9)

* Initial plan

* fix: apply PR review feedback for observability improvements

- Fix wrong comment on LogStreamStop: 'uptimeSeconds' -> 'messagesProcessed'
- Use slog.Default() in newLogger to respect global slog configuration
- Pass snap.Errors (not always-zero snap.MessagesIn) to LogStreamStop in stream_module
- Remove unnecessary mutex lock/unlock in newStreamMetrics initialization

Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>

* fix: observability lifecycle correctness — health on unexpected exit, uptime semantics, subscribe ordering (#13)

* Initial plan

* fix: apply review feedback - lifecycle hooks, health on unexpected exit, subscribe ordering

Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>

* fix: output stream cleanup on subscribe error + Health() test coverage for all modules (#15)

* Initial plan

* fix: stop built stream on subscribe error and add Health() tests for all modules

Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>

* fix: hot-path atomics, debug log guard, goroutine cleanup, deterministic health tests (#17)

* Initial plan

* fix: hot-path atomics, debug guard, goroutine cleanup, deterministic tests

Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>

* fix: address remaining PR #7 review comments (#4, #6, #8, #9, #17, #18, #19, #20)

- #4  output_module: reinitialize done channel at Start for restart support
- #6  broker_module: copy streams map under lock then release before calling
      stream.Stop to prevent potential deadlock in Stop()
- #8  processor_step: use sync.Once in consumer func so fan-out (processor
      emitting multiple messages) does not block on the size-1 channel
- #9  processor_step: deferred cleanup selects on ctx.Done() when draining
      streamDone so a cancelled parent context does not cause an unconditional
      block
- #17 input_module: extract real transport type via inputTransportType() helper
      so LogStreamStart logs "kafka"/"generate"/etc. instead of "bento.input"
- #18 output_module: extract real transport type via outputTransportType()
      helper so LogStreamStart logs the actual Bento output type
- #19 bridge: mapToMessage now returns (*service.Message, error); json.Marshal
      errors are surfaced instead of silently ignored; updated all call sites
- #20 tests: replace remaining time.Sleep(50ms) calls with polling loops in
      stream_module_test, output_module_test, and broker_module_test

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants