Skip to content

feat(stream): add jobTrigger and kaiAgent sink types#2573

Draft
Matovidlo wants to merge 6 commits intomainfrom
feat/job-trigger-sink
Draft

feat(stream): add jobTrigger and kaiAgent sink types#2573
Matovidlo wants to merge 6 commits intomainfrom
feat/job-trigger-sink

Conversation

@Matovidlo
Copy link
Copy Markdown
Contributor

@Matovidlo Matovidlo commented Apr 6, 2026

Release Notes

  • Adds jobTrigger sink type: each incoming HTTP webhook directly triggers a Keboola Queue job — configurable componentId/configId/branchId plus optional configDataTemplate Jsonnet field to extract runtime parameters from the webhook body
  • Adds kaiAgent sink type: each incoming webhook record is forwarded directly to kai-agent.keboola.com, either as a user chat message (POST /api/chat, fire-and-forget SSE) or as a suggestions request (POST /api/suggestions, JSON response)
  • Both new sink types use optional Jsonnet templates to shape the outgoing payload from the incoming request body
  • Service URL for kai-agent is derived automatically from StorageAPIHost (connection.Xkai-agent.X); no extra config needed
  • Job trigger stats (triggered/failed counts, timestamps) and kai-agent stats (sent/failed) are persisted in etcd and exposed via the sink statistics API

Plans for customer communication

None. Both are new optional sink types; existing table sinks and all other service behaviour are unaffected.

Impact analysis

  • internal/pkg/service/stream/definition: new SinkTypeJobTrigger, SinkTypeKaiAgent constants; JobTriggerSink and KaiAgentSink structs; sink_base.go adds both optional fields
  • internal/pkg/service/stream/sink/type/jobtriggersink: Bridge (SAPI token + stats in etcd), Pipeline (WriteRecord fires Queue job via SDK), SinkStats
  • internal/pkg/service/stream/sink/type/kaiagentsink: Bridge (SAPI token + stats in etcd, URL derivation), Pipeline (WriteRecord POSTs to kai-agent /api/chat or /api/suggestions), SinkStats
  • internal/pkg/service/stream/api/mapper: sink_request.go and sink_response.go extended with jobTrigger and kaiAgent cases; Jsonnet template validation on create/update
  • internal/pkg/service/stream/api/service/sink.go: SinkStatisticsTotal returns job trigger stats for jobTrigger sinks
  • api/stream/design.go + generated OpenAPI/server code: new Goa types for both sink types; SinkType enum extended
  • internal/pkg/service/stream/dependencies: new JobTriggerBridge() and KaiAgentBridge() on ServiceScope; both openers registered in source.go
  • No breaking changes; etcd key prefixes created on demand on first sink activation

Change type

Feature — New jobTrigger and kaiAgent sink types for the Stream service

Justification

Users receiving webhooks via HTTP sources currently can only write data to Keboola Storage tables. These two sink types enable event-driven patterns without a separate orchestration layer: jobTrigger turns any webhook into a Queue job invocation; kaiAgent turns any webhook into an AI chat message or contextual suggestion request, enabling enrichment of streaming data on intake via the kai-agent service.

Deployment

Merge & automatic deploy. No database migrations. New etcd key prefixes (stream/job-trigger/token/, stream/job-trigger/stats/, stream/kai-agent/token/, stream/kai-agent/stats/) are created on demand.

Rollback plan

Revert of this PR. Any sinks of either new type created before rollback will remain in etcd but the service will reject them (unknown type). Token/stats keys can be cleaned up manually under the respective etcd prefixes.

Post release support plan

Monitor logs for "job trigger failed" (Error) and "kai-agent chat POST failed" / "kai-agent suggestions POST failed" (Error) after first sinks of these types go live.

Matovidlo and others added 2 commits April 6, 2026 11:05
Introduces a new sink type that fires a Keboola Queue job directly on
every received webhook record. No local storage or buffering — each
HTTP record triggers one Queue API call.

Key changes:
- definition.SinkTypeJobTrigger + JobTriggerSink struct (componentId,
  configId, branchId, optional configDataTemplate Jsonnet field)
- jobtriggersink package: Bridge (stores SAPI token in etcd at sink
  activation), Pipeline (evaluates optional Jsonnet configDataTemplate,
  calls Queue API fire-and-forget)
- Token lifecycle: stored in etcd under stream/job-trigger/token/<sinkKey>
  via OnSinkActivation/OnSinkDeactivation hooks; read back by the pipeline
  opener when processing each record
- Goa DSL updated with JobTrigger* types; API regenerated (OpenAPI, types,
  encode/decode)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- API mappers: add jobTrigger case to NewSinkEntity, UpdateSinkEntity,
  NewSinkResponse so the type can be created/updated/read via API
- Stats tracking: SinkStats struct stored in etcd under
  stream/job-trigger/stats/<sinkKey>; Bridge.AddStats() does read-modify-write
  flushed from pipeline.Close(); Bridge.Stats() queried by the API
- SinkStatisticsTotal handler extended: jobTrigger sinks return triggered/
  failed counts via Target.RecordsCount instead of slice-based storage stats
- Pipeline: log triggered job ID at Info level; accumulate stats atomically
  in memory and flush to etcd on Close
- E2E tests: fix create-005 enum error message; add create-016 (happy path)
  and create-017 (missing jobTrigger body 400) fixtures

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@Matovidlo
Copy link
Copy Markdown
Contributor Author

@claude review

- create-017: add missing trailing period to expected error message
- SinkStatisticsTotal: restore sinkMustExist check before Sink().Get() so
  that a missing source returns sourceNotFound (not sinkNotFound)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Comment thread internal/pkg/service/stream/sink/type/jobtriggersink/bridge.go
Comment thread internal/pkg/service/stream/sink/type/jobtriggersink/bridge.go Outdated
Comment thread internal/pkg/service/stream/sink/type/jobtriggersink/pipeline.go
Comment thread internal/pkg/service/stream/api/mapper/sink_request.go Outdated
Comment thread internal/pkg/service/stream/api/mapper/sink_response.go
Copy link
Copy Markdown

@claude claude Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review: feat(stream) add jobTrigger sink type

The overall design is clean and well-scoped. The architecture (Bridge/Pipeline/Stats separation, plugin lifecycle hooks, etcd-backed token storage) follows existing patterns correctly. A few issues need attention before merge.

Bugs

Nil pointer dereference in UpdateSinkEntity default branch (see inline on sink_request.go:109)
The switch is on entity.Type but the error message calls payload.Type.String(). payload.Type is optional on updates and will be nil when omitted, causing a panic rather than a 400.

Behavioural gaps — pre-existing handlers not updated for the new type

Two existing handlers now silently misbehave for jobTrigger sinks:

  • SinkStatisticsClear (sink.go:384): calls StatisticsRepository().ResetSinkStats() which has no effect on job-trigger stats (those live in stream/job-trigger/stats/<sinkKey> via the bridge). The API returns HTTP 200 while the actual triggered/failed counters remain unchanged.

  • SinkStatisticsFiles (sink.go:320): runs against the storage file list, which is always empty for a jobTrigger sink. Returns a successful empty array with no indication that the endpoint is inapplicable to this sink type.

Both should either handle the new type explicitly or return a clear error.

Design issues

Non-atomic stats read-modify-write (inline on bridge.go:140-165)
AddStats does a plain read → mutate → write without a transaction. Two nodes flushing concurrently can silently overwrite each other's updates, losing one batch of stats entirely — not just slight drift. An etcd compare-and-swap / transaction loop is the standard fix.

Orphaned stats on deactivation (inline on bridge.go:90-96)
OnSinkDeactivation deletes the token but leaves the stats key. Deleted-then-undeleted sinks accumulate stats on top of stale values; permanently deleted sinks leave orphaned keys in etcd indefinitely.

ConfigDataTemplate not validated at creation time (inline on pipeline.go:59-76)
Table sink column templates are validated eagerly via vm.Validate() in the mapper. ConfigDataTemplate is only validated at webhook-dispatch time, so an invalid Jsonnet expression is accepted silently at sink creation and surfaces as a per-request RecordError under live traffic.

Silent nil JobTrigger in response (inline on sink_response.go:41-44)
If entity.JobTrigger is nil for a jobTrigger-typed sink (e.g. after a version rollback), the response omits the field with HTTP 200. An explicit error is preferable to a silently malformed response.

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces a new jobTrigger sink type to the Stream service, allowing each incoming webhook record to synchronously trigger a Keboola Queue job (optionally deriving configData via Jsonnet), and exposes basic job-trigger statistics via the existing sink statistics API.

Changes:

  • Adds jobTrigger sink definition/schema and pipeline implementation that triggers Queue jobs per record.
  • Persists a per-sink token and job-trigger counters in etcd and maps them into the sink statistics API.
  • Extends API (Goa design + generated server/OpenAPI) and adds API test fixtures for create/validation scenarios.

Reviewed changes

Copilot reviewed 44 out of 46 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
internal/pkg/service/stream/definition/sink_job_trigger.go Adds SinkTypeJobTrigger and JobTriggerSink config struct.
internal/pkg/service/stream/definition/sink_base.go Extends sink base schema with JobTrigger *JobTriggerSink.
internal/pkg/service/stream/sink/type/jobtriggersink/bridge.go Adds etcd-backed token storage + stats bridge and lifecycle hooks.
internal/pkg/service/stream/sink/type/jobtriggersink/pipeline.go Implements pipeline that evaluates Jsonnet template and triggers Queue jobs.
internal/pkg/service/stream/sink/type/jobtriggersink/stats.go Defines etcd schema and struct for job-trigger stats.
internal/pkg/service/stream/dependencies/dependencies.go Adds KeboolaTokenCtxKey and exposes JobTriggerBridge() in ServiceScope.
internal/pkg/service/stream/dependencies/service.go Instantiates JobTriggerBridge and wires token extraction from request context.
internal/pkg/service/stream/dependencies/source.go Registers the jobTrigger sink pipeline opener in the plugins system.
internal/pkg/service/stream/api/service/auth.go Stores Storage API token into request context for lifecycle hooks.
internal/pkg/service/stream/api/service/sink.go Returns jobTrigger stats via stats API for jobTrigger sinks.
internal/pkg/service/stream/api/mapper/sink_request.go Maps create/update payloads into JobTriggerSink entity config.
internal/pkg/service/stream/api/mapper/sink_response.go Maps JobTriggerSink entity config into API response types.
api/stream/design.go Extends Goa design with jobTrigger sink type + types and enum value.
internal/pkg/service/stream/api/gen/stream/service.go Regenerates Goa types to include JobTriggerSink*.
internal/pkg/service/stream/api/gen/http/stream/server/types.go Regenerates HTTP server request/response structs + validators for jobTrigger.
internal/pkg/service/stream/api/gen/http/stream/server/encode_decode.go Regenerates marshal/unmarshal helpers for jobTrigger.
internal/pkg/service/stream/api/openapi/openapi3.yaml Updates OpenAPI 3 spec to include jobTrigger schema/fields/examples.
internal/pkg/service/stream/api/openapi/openapi.yaml Updates OpenAPI 2 spec to include jobTrigger schema/fields/examples.
internal/pkg/service/stream/api/openapi/openapi.json Updates Swagger JSON to include jobTrigger schema/fields/examples.
test/stream/api/sink/create-016-job-trigger-ok/** Adds API test fixture for successful jobTrigger sink creation (+ expected etcd KVs).
test/stream/api/sink/create-017-job-trigger-missing-body-400/** Adds API test fixture for missing jobTrigger body validation.
test/stream/api/sink/create-005-invalid-type-400/003-create-sink/expected-response.json Updates invalid-enum error message to include jobTrigger.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread internal/pkg/service/stream/sink/type/jobtriggersink/bridge.go
Comment thread internal/pkg/service/stream/sink/type/jobtriggersink/pipeline.go
Comment thread internal/pkg/service/stream/sink/type/jobtriggersink/pipeline.go
Comment thread api/stream/design.go
Comment thread internal/pkg/service/stream/sink/type/jobtriggersink/pipeline.go
Matovidlo and others added 3 commits April 7, 2026 09:56
- sink_request.go: fix nil pointer in UpdateSinkEntity default case (entity.Type vs payload.Type)
- sink_request.go: validate ConfigDataTemplate Jsonnet syntax eagerly at create/update time
- sink_response.go: return error when jobTrigger sink has nil JobTrigger config instead of silently omitting the field
- pipeline.go: add explicit nil check after Jsonnet configDataTemplate evaluation (null output → error)
- bridge.go: encrypt SAPI token at rest using cloudencrypt.GenericEncryptor[string]; add Encryptor() to bridgeDeps
- bridge.go: delete stats key on sink deactivation alongside token to prevent orphaned stats
- test fixture: update expected etcd token format to encryptedToken

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…lexity

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
… integration

Each incoming webhook record is forwarded to kai-agent via one of two modes:
- chat: POST /api/chat with fire-and-forget SSE (server processes asynchronously)
- suggestions: POST /api/suggestions, JSON response logged

Token lifecycle mirrors jobTrigger: SAPI token stored encrypted in etcd at
sink activation, removed on deactivation. Service URL derived automatically
from StorageAPIHost (connection.X → kai-agent.X).

Both modes support optional Jsonnet templates for customising the outgoing
payload from the incoming webhook body (messageTemplate / dataTemplate).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@Matovidlo Matovidlo changed the title feat(stream): add jobTrigger sink type feat(stream): add jobTrigger and kaiAgent sink types May 6, 2026
@Matovidlo Matovidlo requested a review from Copilot May 6, 2026 16:28
@Matovidlo
Copy link
Copy Markdown
Contributor Author

@claude review

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants