feat(stream): add jobTrigger and kaiAgent sink types #2573
Introduces a new sink type that fires a Keboola Queue job directly on every received webhook record. No local storage or buffering: each HTTP record triggers one Queue API call.

Key changes:
- definition.SinkTypeJobTrigger + JobTriggerSink struct (componentId, configId, branchId, optional configDataTemplate Jsonnet field)
- jobtriggersink package: Bridge (stores SAPI token in etcd at sink activation), Pipeline (evaluates optional Jsonnet configDataTemplate, calls Queue API fire-and-forget)
- Token lifecycle: stored in etcd under stream/job-trigger/token/<sinkKey> via OnSinkActivation/OnSinkDeactivation hooks; read back by the pipeline opener when processing each record
- Goa DSL updated with JobTrigger* types; API regenerated (OpenAPI, types, encode/decode)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- API mappers: add jobTrigger case to NewSinkEntity, UpdateSinkEntity, NewSinkResponse so the type can be created/updated/read via API
- Stats tracking: SinkStats struct stored in etcd under stream/job-trigger/stats/<sinkKey>; Bridge.AddStats() does read-modify-write flushed from pipeline.Close(); Bridge.Stats() queried by the API
- SinkStatisticsTotal handler extended: jobTrigger sinks return triggered/failed counts via Target.RecordsCount instead of slice-based storage stats
- Pipeline: log triggered job ID at Info level; accumulate stats atomically in memory and flush to etcd on Close
- E2E tests: fix create-005 enum error message; add create-016 (happy path) and create-017 (missing jobTrigger body 400) fixtures

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
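The "accumulate in memory, flush on Close" behaviour from this commit could look roughly like the sketch below. It is not the PR's code: `statsPipeline`, `record`, and the `flush` callback (standing in for Bridge.AddStats, whose real signature is not shown here) are hypothetical names chosen for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// errQueue is a sample failure used only by this sketch.
var errQueue = errors.New("queue API unavailable")

// statsPipeline is a hypothetical pipeline that counts outcomes in memory.
type statsPipeline struct {
	triggered atomic.Uint64
	failed    atomic.Uint64
	// flush stands in for Bridge.AddStats; its signature is an assumption.
	flush func(triggered, failed uint64) error
}

// record counts one processed webhook record without touching etcd.
func (p *statsPipeline) record(err error) {
	if err != nil {
		p.failed.Add(1)
		return
	}
	p.triggered.Add(1)
}

// Close drains both counters and hands the totals to flush in one call.
func (p *statsPipeline) Close() error {
	return p.flush(p.triggered.Swap(0), p.failed.Swap(0))
}

func main() {
	p := &statsPipeline{flush: func(triggered, failed uint64) error {
		fmt.Printf("flushing triggered=%d failed=%d\n", triggered, failed)
		return nil
	}}
	p.record(nil)
	p.record(errQueue)
	if err := p.Close(); err != nil {
		fmt.Println("flush failed:", err)
	}
}
```

Swapping the counters to zero on Close means a second Close flushes nothing twice, at the cost of losing the batch if the flush itself fails.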
@claude review
- create-017: add missing trailing period to expected error message
- SinkStatisticsTotal: restore sinkMustExist check before Sink().Get() so that a missing source returns sourceNotFound (not sinkNotFound)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Review: feat(stream) add jobTrigger sink type
The overall design is clean and well-scoped. The architecture (Bridge/Pipeline/Stats separation, plugin lifecycle hooks, etcd-backed token storage) follows existing patterns correctly. A few issues need attention before merge.
Bugs
Nil pointer dereference in UpdateSinkEntity default branch (see inline on sink_request.go:109)
The switch is on entity.Type but the error message calls payload.Type.String(). payload.Type is optional on updates and will be nil when omitted, causing a panic rather than a 400.
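A minimal sketch of the safer shape, assuming simplified stand-in types: `SinkType`, `updatePayload`, `sinkEntity`, and `applyUpdate` are hypothetical and only mirror the structure described above. The optional payload field is read behind a nil check, and the default branch builds its message from the entity, whose type is always set.

```go
package main

import "fmt"

// SinkType is a hypothetical stand-in for the API's sink type enum.
type SinkType string

func (t SinkType) String() string { return string(t) }

// updatePayload mimics an update request in which Type is optional.
type updatePayload struct {
	Type *SinkType
}

// sinkEntity mimics the stored entity, whose Type is never nil.
type sinkEntity struct {
	Type SinkType
}

// applyUpdate copies the optional type (an assumption of this sketch) and
// reports unsupported types using entity.Type, never payload.Type.
func applyUpdate(entity *sinkEntity, payload updatePayload) error {
	if payload.Type != nil {
		entity.Type = *payload.Type
	}
	switch entity.Type {
	case "table", "jobTrigger":
		return nil
	default:
		// Calling payload.Type.String() here would panic when Type is omitted.
		return fmt.Errorf("unexpected sink type %q", entity.Type.String())
	}
}

func main() {
	entity := &sinkEntity{Type: "legacy"}
	// Type is omitted from the payload, as it may be on updates.
	if err := applyUpdate(entity, updatePayload{}); err != nil {
		fmt.Println("rejected without panic:", err)
	}
}
```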
Behavioural gaps — pre-existing handlers not updated for the new type
Two existing handlers now silently misbehave for jobTrigger sinks:
- SinkStatisticsClear (sink.go:384): calls StatisticsRepository().ResetSinkStats(), which has no effect on job-trigger stats (those live in stream/job-trigger/stats/<sinkKey> via the bridge). The API returns HTTP 200 while the actual triggered/failed counters remain unchanged.
- SinkStatisticsFiles (sink.go:320): runs against the storage file list, which is always empty for a jobTrigger sink. Returns a successful empty array with no indication that the endpoint is inapplicable to this sink type.
Both should either handle the new type explicitly or return a clear error.
Design issues
Non-atomic stats read-modify-write (inline on bridge.go:140-165)
AddStats does a plain read → mutate → write without a transaction. Two nodes flushing concurrently can silently overwrite each other's updates, losing one batch of stats entirely — not just slight drift. An etcd compare-and-swap / transaction loop is the standard fix.
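To illustrate the compare-and-swap loop, here is a self-contained sketch that uses an in-memory `versionedStore` in place of etcd. The store, `addStats`, and `addConcurrently` are hypothetical names. With a real etcd client the same shape would typically be a transaction that compares the key's mod revision before writing, but the bridge's actual API is not assumed here.

```go
package main

import (
	"fmt"
	"sync"
)

// SinkStats mirrors the triggered/failed counters discussed in the review.
type SinkStats struct {
	Triggered uint64
	Failed    uint64
}

// versionedStore is a hypothetical in-memory stand-in for one etcd key:
// every successful write bumps the revision.
type versionedStore struct {
	mu       sync.Mutex
	value    SinkStats
	revision int64
}

// Get returns the current value together with its revision.
func (s *versionedStore) Get() (SinkStats, int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.value, s.revision
}

// CompareAndSwap writes next only if nobody wrote since expected was read.
func (s *versionedStore) CompareAndSwap(expected int64, next SinkStats) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.revision != expected {
		return false
	}
	s.value = next
	s.revision++
	return true
}

// addStats re-reads and retries until its write wins, so no batch is lost.
func addStats(s *versionedStore, delta SinkStats) {
	for {
		current, rev := s.Get()
		current.Triggered += delta.Triggered
		current.Failed += delta.Failed
		if s.CompareAndSwap(rev, current) {
			return
		}
	}
}

// addConcurrently simulates n nodes flushing the same delta at once.
func addConcurrently(s *versionedStore, n int, delta SinkStats) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			addStats(s, delta)
		}()
	}
	wg.Wait()
}

func main() {
	store := &versionedStore{}
	addConcurrently(store, 50, SinkStats{Triggered: 2, Failed: 1})
	stats, rev := store.Get()
	fmt.Println(stats.Triggered, stats.Failed, rev)
}
```

A plain read followed by an unconditional write would pass the same test only by luck; the revision check is what turns a lost update into a retry.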
Orphaned stats on deactivation (inline on bridge.go:90-96)
OnSinkDeactivation deletes the token but leaves the stats key. Deleted-then-undeleted sinks accumulate stats on top of stale values; permanently deleted sinks leave orphaned keys in etcd indefinitely.
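One way to avoid the orphaned key is to remove both entries in the deactivation hook. The sketch below uses a plain map as a stand-in for etcd; `memoryKV`, the lower-case hook names, and the sample sink key are hypothetical, while the two prefixes are the ones named in this PR.

```go
package main

import "fmt"

const (
	tokenPrefix = "stream/job-trigger/token/"
	statsPrefix = "stream/job-trigger/stats/"
)

// memoryKV is a hypothetical in-memory stand-in for etcd.
type memoryKV map[string]string

// onSinkActivation stores the (already encrypted) token for the sink.
func onSinkActivation(kv memoryKV, sinkKey, encryptedToken string) {
	kv[tokenPrefix+sinkKey] = encryptedToken
}

// onSinkDeactivation removes the token and the stats key together, so a
// later re-activation starts from zero and nothing is left behind.
func onSinkDeactivation(kv memoryKV, sinkKey string) {
	delete(kv, tokenPrefix+sinkKey)
	delete(kv, statsPrefix+sinkKey)
}

func main() {
	kv := memoryKV{}
	sinkKey := "123/456/my-source/my-sink" // illustrative key only
	onSinkActivation(kv, sinkKey, "encrypted-token")
	kv[statsPrefix+sinkKey] = `{"triggered":3,"failed":1}`
	onSinkDeactivation(kv, sinkKey)
	fmt.Println("keys left after deactivation:", len(kv))
}
```

Against real etcd the two deletes would ideally go into one transaction so that a crash between them cannot leave a half-cleaned sink.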
ConfigDataTemplate not validated at creation time (inline on pipeline.go:59-76)
Table sink column templates are validated eagerly via vm.Validate() in the mapper. ConfigDataTemplate is only validated at webhook-dispatch time, so an invalid Jsonnet expression is accepted silently at sink creation and surfaces as a per-request RecordError under live traffic.
Silent nil JobTrigger in response (inline on sink_response.go:41-44)
If entity.JobTrigger is nil for a jobTrigger-typed sink (e.g. after a version rollback), the response omits the field with HTTP 200. An explicit error is preferable to a silently malformed response.
Pull request overview
This PR introduces a new jobTrigger sink type to the Stream service, allowing each incoming webhook record to synchronously trigger a Keboola Queue job (optionally deriving configData via Jsonnet), and exposes basic job-trigger statistics via the existing sink statistics API.
Changes:
- Adds jobTrigger sink definition/schema and pipeline implementation that triggers Queue jobs per record.
- Persists a per-sink token and job-trigger counters in etcd and maps them into the sink statistics API.
- Extends API (Goa design + generated server/OpenAPI) and adds API test fixtures for create/validation scenarios.
Reviewed changes
Copilot reviewed 44 out of 46 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| internal/pkg/service/stream/definition/sink_job_trigger.go | Adds SinkTypeJobTrigger and JobTriggerSink config struct. |
| internal/pkg/service/stream/definition/sink_base.go | Extends sink base schema with JobTrigger *JobTriggerSink. |
| internal/pkg/service/stream/sink/type/jobtriggersink/bridge.go | Adds etcd-backed token storage + stats bridge and lifecycle hooks. |
| internal/pkg/service/stream/sink/type/jobtriggersink/pipeline.go | Implements pipeline that evaluates Jsonnet template and triggers Queue jobs. |
| internal/pkg/service/stream/sink/type/jobtriggersink/stats.go | Defines etcd schema and struct for job-trigger stats. |
| internal/pkg/service/stream/dependencies/dependencies.go | Adds KeboolaTokenCtxKey and exposes JobTriggerBridge() in ServiceScope. |
| internal/pkg/service/stream/dependencies/service.go | Instantiates JobTriggerBridge and wires token extraction from request context. |
| internal/pkg/service/stream/dependencies/source.go | Registers the jobTrigger sink pipeline opener in the plugins system. |
| internal/pkg/service/stream/api/service/auth.go | Stores Storage API token into request context for lifecycle hooks. |
| internal/pkg/service/stream/api/service/sink.go | Returns jobTrigger stats via stats API for jobTrigger sinks. |
| internal/pkg/service/stream/api/mapper/sink_request.go | Maps create/update payloads into JobTriggerSink entity config. |
| internal/pkg/service/stream/api/mapper/sink_response.go | Maps JobTriggerSink entity config into API response types. |
| api/stream/design.go | Extends Goa design with jobTrigger sink type + types and enum value. |
| internal/pkg/service/stream/api/gen/stream/service.go | Regenerates Goa types to include JobTriggerSink*. |
| internal/pkg/service/stream/api/gen/http/stream/server/types.go | Regenerates HTTP server request/response structs + validators for jobTrigger. |
| internal/pkg/service/stream/api/gen/http/stream/server/encode_decode.go | Regenerates marshal/unmarshal helpers for jobTrigger. |
| internal/pkg/service/stream/api/openapi/openapi3.yaml | Updates OpenAPI 3 spec to include jobTrigger schema/fields/examples. |
| internal/pkg/service/stream/api/openapi/openapi.yaml | Updates OpenAPI 2 spec to include jobTrigger schema/fields/examples. |
| internal/pkg/service/stream/api/openapi/openapi.json | Updates Swagger JSON to include jobTrigger schema/fields/examples. |
| test/stream/api/sink/create-016-job-trigger-ok/** | Adds API test fixture for successful jobTrigger sink creation (+ expected etcd KVs). |
| test/stream/api/sink/create-017-job-trigger-missing-body-400/** | Adds API test fixture for missing jobTrigger body validation. |
| test/stream/api/sink/create-005-invalid-type-400/003-create-sink/expected-response.json | Updates invalid-enum error message to include jobTrigger. |
- sink_request.go: fix nil pointer in UpdateSinkEntity default case (entity.Type vs payload.Type)
- sink_request.go: validate ConfigDataTemplate Jsonnet syntax eagerly at create/update time
- sink_response.go: return error when jobTrigger sink has nil JobTrigger config instead of silently omitting the field
- pipeline.go: add explicit nil check after Jsonnet configDataTemplate evaluation (null output → error)
- bridge.go: encrypt SAPI token at rest using cloudencrypt.GenericEncryptor[string]; add Encryptor() to bridgeDeps
- bridge.go: delete stats key on sink deactivation alongside token to prevent orphaned stats
- test fixture: update expected etcd token format to encryptedToken

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…lexity Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
… integration

Each incoming webhook record is forwarded to kai-agent via one of two modes:
- chat: POST /api/chat with fire-and-forget SSE (server processes asynchronously)
- suggestions: POST /api/suggestions, JSON response logged

Token lifecycle mirrors jobTrigger: SAPI token stored encrypted in etcd at sink activation, removed on deactivation. Service URL derived automatically from StorageAPIHost (connection.X → kai-agent.X).

Both modes support optional Jsonnet templates for customising the outgoing payload from the incoming webhook body (messageTemplate / dataTemplate).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
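The connection.X → kai-agent.X derivation mentioned in this commit might be sketched as follows. `kaiAgentHost` is a hypothetical helper, and rejecting hosts without the "connection." prefix is an assumption of this sketch rather than documented behaviour.

```go
package main

import (
	"fmt"
	"strings"
)

// kaiAgentHost swaps the "connection." prefix of the Storage API host for
// "kai-agent.". Hosts without that prefix are rejected (sketch assumption).
func kaiAgentHost(storageAPIHost string) (string, error) {
	const prefix = "connection."
	rest := strings.TrimPrefix(storageAPIHost, prefix)
	if !strings.HasPrefix(storageAPIHost, prefix) || rest == "" {
		return "", fmt.Errorf("storage API host %q does not start with %q", storageAPIHost, prefix)
	}
	return "kai-agent." + rest, nil
}

func main() {
	host, err := kaiAgentHost("connection.keboola.com")
	if err != nil {
		fmt.Println("cannot derive kai-agent host:", err)
		return
	}
	fmt.Println(host)
}
```

Returning an error for an unexpected host keeps a misconfigured stack from silently posting webhook payloads to an unintended domain.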
@claude review
Release Notes
- jobTrigger sink type: each incoming HTTP webhook directly triggers a Keboola Queue job; configurable componentId/configId/branchId plus optional configDataTemplate Jsonnet field to extract runtime parameters from the webhook body
- kaiAgent sink type: each incoming webhook record is forwarded directly to kai-agent.keboola.com, either as a user chat message (POST /api/chat, fire-and-forget SSE) or as a suggestions request (POST /api/suggestions, JSON response)
- kai-agent service URL is derived automatically from StorageAPIHost (connection.X → kai-agent.X); no extra config needed

Plans for customer communication
None. Both are new optional sink types; existing table sinks and all other service behaviour are unaffected.
Impact analysis
- internal/pkg/service/stream/definition: new SinkTypeJobTrigger, SinkTypeKaiAgent constants; JobTriggerSink and KaiAgentSink structs; sink_base.go adds both optional fields
- internal/pkg/service/stream/sink/type/jobtriggersink: Bridge (SAPI token + stats in etcd), Pipeline (WriteRecord fires Queue job via SDK), SinkStats
- internal/pkg/service/stream/sink/type/kaiagentsink: Bridge (SAPI token + stats in etcd, URL derivation), Pipeline (WriteRecord POSTs to kai-agent /api/chat or /api/suggestions), SinkStats
- internal/pkg/service/stream/api/mapper: sink_request.go and sink_response.go extended with jobTrigger and kaiAgent cases; Jsonnet template validation on create/update
- internal/pkg/service/stream/api/service/sink.go: SinkStatisticsTotal returns job trigger stats for jobTrigger sinks
- api/stream/design.go + generated OpenAPI/server code: new Goa types for both sink types; SinkType enum extended
- internal/pkg/service/stream/dependencies: new JobTriggerBridge() and KaiAgentBridge() on ServiceScope; both openers registered in source.go

Change type
Feature: new jobTrigger and kaiAgent sink types for the Stream service
Justification
Users receiving webhooks via HTTP sources currently can only write data to Keboola Storage tables. These two sink types enable event-driven patterns without a separate orchestration layer: jobTrigger turns any webhook into a Queue job invocation; kaiAgent turns any webhook into an AI chat message or contextual suggestion request, enabling enrichment of streaming data on intake via the kai-agent service.
Deployment
Merge & automatic deploy. No database migrations. New etcd key prefixes (stream/job-trigger/token/, stream/job-trigger/stats/, stream/kai-agent/token/, stream/kai-agent/stats/) are created on demand.
Rollback plan
Revert of this PR. Any sinks of either new type created before rollback will remain in etcd but the service will reject them (unknown type). Token/stats keys can be cleaned up manually under the respective etcd prefixes.
Post release support plan
Monitor logs for "job trigger failed" (Error) and "kai-agent chat POST failed" / "kai-agent suggestions POST failed" (Error) after the first sinks of these types go live.