Add subagent work units across providers #1199
juliusmarminge wants to merge 4 commits into split-stack/claude-sibling
- add decoding defaults in `AppSettingsSchema` so older persisted settings load safely
- export shared `Schema.Literals` types for `EnvMode` and `TimestampFormat`
- add a regression test covering pre-new-key settings hydration
Co-authored-by: codex <codex@users.noreply.github.com>
const ProjectionThreadWorkUnitDbRowSchema = ProjectionThreadWorkUnit.mapFields(
  Struct.assign({
    providerRefs: Schema.NullOr(
      Schema.fromJsonString(ProjectionThreadWorkUnit.fields.providerRefs),
    ),
  }),
);
🟡 Medium Layers/ProjectionSnapshotQuery.ts:67
`ProjectionThreadWorkUnitDbRowSchema` decodes `provider_refs_json` using `Schema.fromJsonString(ProjectionThreadWorkUnit.fields.providerRefs)`, but `providerRefs` is defined as `Schema.optional(OrchestrationWorkUnitProviderRefs)`. Passing the optional-wrapped schema to `fromJsonString` is semantically wrong because JSON cannot represent `undefined`, so the decoder will never produce a valid value for this field. Other fields like `attachments` use the correct pattern: `Schema.NullOr(Schema.fromJsonString(...))` with the underlying type schema. Consider passing `OrchestrationWorkUnitProviderRefs` directly to `fromJsonString`.
-const ProjectionThreadWorkUnitDbRowSchema = ProjectionThreadWorkUnit.mapFields(
- Struct.assign({
- providerRefs: Schema.NullOr(
- Schema.fromJsonString(ProjectionThreadWorkUnit.fields.providerRefs),
- ),
- }),
-);
Evidence trail:
apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts lines 67-72 (REVIEWED_COMMIT) - shows `Schema.fromJsonString(ProjectionThreadWorkUnit.fields.providerRefs)` usage
apps/server/src/persistence/Services/ProjectionThreadWorkUnits.ts line 27 (REVIEWED_COMMIT) - defines `providerRefs: Schema.optional(OrchestrationWorkUnitProviderRefs)`
apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts lines 50, 56, 63, 77 (REVIEWED_COMMIT) - shows other `fromJsonString` usages all using concrete types directly, not optional-wrapped schemas
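The constraint behind this comment can be illustrated without any schema library. The following is a minimal sketch in plain TypeScript, assuming a nullable text column; `ProviderRefs`, `isProviderRefs`, and `decodeNullableJsonColumn` are hypothetical names for illustration and are not taken from the codebase or from Effect's `Schema` API. Absence is modelled as SQL `NULL` on the column, while the JSON payload is always decoded against the concrete type.

```typescript
// Hypothetical stand-in for the decoded provider refs shape (assumption).
type ProviderRefs = Record<string, string>;

// Hypothetical guard playing the role of the concrete, non-optional schema.
function isProviderRefs(value: unknown): value is ProviderRefs {
  return (
    typeof value === "object" &&
    value !== null &&
    !Array.isArray(value) &&
    Object.values(value).every((entry) => typeof entry === "string")
  );
}

// NULL on the column means "no refs"; any non-null string must be valid JSON
// that matches the concrete shape.
function decodeNullableJsonColumn(raw: string | null): ProviderRefs | null {
  if (raw === null) return null;
  const parsed: unknown = JSON.parse(raw);
  if (!isProviderRefs(parsed)) {
    throw new Error("provider_refs_json does not match the expected shape");
  }
  return parsed;
}

// JSON has no encoding for undefined: stringify produces no string at all,
// and the literal text "undefined" is not valid JSON.
const encodedUndefined: string | undefined = JSON.stringify(undefined);
```

The design point is that optionality lives in exactly one place (the nullable column), so the JSON decoder never has to account for a value that JSON cannot express.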
function retainThreadWorkUnitsAfterRevert(
  workUnits: ReadonlyArray<OrchestrationThread["workUnits"][number]>,
  retainedTurnIds: ReadonlySet<string>,
): ReadonlyArray<OrchestrationThread["workUnits"][number]> {
  const retained = workUnits.filter((workUnit) => retainedTurnIds.has(workUnit.turnId));
  const retainedIds = new Set(retained.map((workUnit) => workUnit.id));
  return retained.filter(
    (workUnit) => workUnit.parentWorkUnitId === null || retainedIds.has(workUnit.parentWorkUnitId),
  );
}
🟢 Low orchestration/projector.ts:140
`retainThreadWorkUnitsAfterRevert` can drop a parent work unit while keeping its child, leaving an orphaned work unit with an invalid `parentWorkUnitId`. Given work units B (parent A) and C (parent B), where B and C belong to a retained turn and A does not, the parent filter removes B because A is gone, but C still passes because `retainedIds` was computed before that filter ran. C's `parentWorkUnitId` then points to a non-existent work unit. The parent filter needs to recursively remove descendants of any dropped work unit.
-function retainThreadWorkUnitsAfterRevert(
- workUnits: ReadonlyArray<OrchestrationThread["workUnits"][number]>,
- retainedTurnIds: ReadonlySet<string>,
-): ReadonlyArray<OrchestrationThread["workUnits"][number]> {
- const retained = workUnits.filter((workUnit) => retainedTurnIds.has(workUnit.turnId));
- const retainedIds = new Set(retained.map((workUnit) => workUnit.id));
- return retained.filter(
- (workUnit) => workUnit.parentWorkUnitId === null || retainedIds.has(workUnit.parentWorkUnitId),
- );
-}
Evidence trail:
apps/server/src/orchestration/projector.ts lines 140-147 (REVIEWED_COMMIT): The `retainThreadWorkUnitsAfterRevert` function computes `retainedIds` from the turn-filtered set, then applies parent filtering. When a work unit is removed by the parent filter, its children still pass because `retainedIds` was computed before the parent filtering pass.
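One way to remove descendants transitively is to repeat the parent check until a pass removes nothing. This is a sketch under stated assumptions, not the fix applied in the PR: `WorkUnitLike` and `retainWorkUnitsTransitively` are hypothetical names, and only the three fields used by the original function are modelled.

```typescript
// Minimal shape needed for pruning; the real work-unit type has more fields.
interface WorkUnitLike {
  readonly id: string;
  readonly turnId: string;
  readonly parentWorkUnitId: string | null;
}

function retainWorkUnitsTransitively<T extends WorkUnitLike>(
  workUnits: ReadonlyArray<T>,
  retainedTurnIds: ReadonlySet<string>,
): ReadonlyArray<T> {
  // First pass: keep only work units whose turn survives the revert.
  let retained = workUnits.filter((workUnit) => retainedTurnIds.has(workUnit.turnId));

  // Fixed-point loop: recompute the id set after every pass so that children
  // of a unit dropped in the previous pass are dropped in the next one.
  for (;;) {
    const retainedIds = new Set(retained.map((workUnit) => workUnit.id));
    const next = retained.filter(
      (workUnit) =>
        workUnit.parentWorkUnitId === null || retainedIds.has(workUnit.parentWorkUnitId),
    );
    if (next.length === retained.length) return next;
    retained = next;
  }
}
```

Each pass is linear in the number of retained units and the loop runs at most once per level of nesting, which should be acceptable for the shallow trees a single thread is likely to hold.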
if (eventTurnId !== undefined) {
  yield* ensureRootWorkUnit(thread, event, eventTurnId, now);
}
🟢 Low Layers/ProviderRuntimeIngestion.ts:1122
When a `turn.completed` event arrives, `ensureRootWorkUnit` (line 1124) dispatches a work unit with `state: "running"` before the `turn.completed` handler (lines 1143-1157) dispatches the correct completion state. This produces a transient incorrect state where a completed turn's work unit is momentarily marked as running. Consider removing the `ensureRootWorkUnit` call for `turn.completed` events, since the completion handler already handles the state transition.
if (eventTurnId !== undefined) {
- yield* ensureRootWorkUnit(thread, event, eventTurnId, now);
+ if (event.type !== "turn.completed") {
+ yield* ensureRootWorkUnit(thread, event, eventTurnId, now);
+ }
}
Evidence trail:
apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts lines 1123-1124 (ensureRootWorkUnit called for all events with eventTurnId), lines 978-997 (ensureRootWorkUnit always dispatches state: "running" at line 991), lines 1143-1157 (turn.completed handler dispatches correct state), lines 930-976 (upsertWorkUnit dispatches to orchestrationEngine.dispatch)
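The ordering issue can be modelled in isolation. The sketch below is a simplified stand-in, not the ingestion code: `dispatchedRootStates` is a hypothetical function that only records which root work-unit states would be dispatched for one event, and `"completed"` is an assumed name for the completion state (the review only names `"running"`).

```typescript
type RuntimeEventType = "turn.started" | "task.progress" | "turn.completed";
// "completed" is an assumed state name used for illustration.
type RootWorkUnitState = "running" | "completed";

// Returns the sequence of root work-unit states dispatched for a single event.
function dispatchedRootStates(
  eventType: RuntimeEventType,
  skipEnsureOnCompletion: boolean,
): RootWorkUnitState[] {
  const dispatched: RootWorkUnitState[] = [];

  // Stand-in for ensureRootWorkUnit, which always dispatches "running".
  if (!(skipEnsureOnCompletion && eventType === "turn.completed")) {
    dispatched.push("running");
  }

  // Stand-in for the turn.completed handler, which dispatches the final state.
  if (eventType === "turn.completed") {
    dispatched.push("completed");
  }

  return dispatched;
}
```

Without the guard, a `turn.completed` event yields `running` followed by `completed`; with it, only the final state is dispatched. The guard is only safe if the completion handler can create the root work unit when none exists yet, which this sketch does not verify.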
  return typeof value === "number" && Number.isFinite(value) ? value : undefined;
}

function toTurnId(value: string | undefined): TurnId | undefined {
🟢 Low Layers/CodexAdapter.ts:112
`toTurnId` checks `value?.trim()` for truthiness but then passes the original untrimmed `value` to `TurnId.makeUnsafe`. If the input contains leading/trailing whitespace (e.g., `" abc "`), the check passes but a `TurnId` containing whitespace is created. Consider using `TurnId.makeUnsafe(value.trim())` to match the validation logic.
-function toTurnId(value: string | undefined): TurnId | undefined {
- return value?.trim() ? TurnId.makeUnsafe(value) : undefined;
-}
Evidence trail:
apps/server/src/provider/Layers/CodexAdapter.ts lines 112-113 (at REVIEWED_COMMIT): `function toTurnId(value: string | undefined): TurnId | undefined { return value?.trim() ? TurnId.makeUnsafe(value) : undefined; }` - validates trimmed value but passes untrimmed value. packages/contracts/src/baseSchemas.ts lines 14-29 (at REVIEWED_COMMIT): TurnId is defined via `makeEntityId` using `TrimmedNonEmptyString` which includes `Schema.Trim`, but `makeUnsafe` bypasses schema transformations.
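A self-contained sketch of the suggested change follows. The branded `TurnId` type and `makeTurnIdUnsafe` are local stand-ins (assumptions) for the contract package's `TurnId.makeUnsafe`, which is not reproduced here; the point is only that the value that gets validated is the same value that gets branded.

```typescript
// Local stand-in for the branded id type from the contracts package.
type TurnId = string & { readonly __brand: "TurnId" };

// Stand-in for TurnId.makeUnsafe: brands the string without any validation.
const makeTurnIdUnsafe = (value: string): TurnId => value as TurnId;

function toTurnId(value: string | undefined): TurnId | undefined {
  // Trim once, then both test and brand the trimmed string.
  const trimmed = value?.trim();
  return trimmed ? makeTurnIdUnsafe(trimmed) : undefined;
}
```

Holding the trimmed string in a local also avoids calling `trim()` twice.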
This stacked draft PR builds on `split-stack/claude-sibling` and introduces a generic turn-scoped work-unit model for subagent execution.
Today our orchestration model treats a turn as the user-visible response boundary, but it has had no first-class way to represent delegated or nested execution inside that turn. That left us flattening provider-specific subagent behavior into raw activity text or, in Codex's case, risking child-conversation state being mistaken for top-level turn state. The user-visible consequence was that plan/progress state and future subagent UI work had no durable, provider-agnostic execution model to build on.
The core change in this PR is a new `WorkUnitId` domain entity and a new orchestration work-unit shape owned by a turn. Activities can now optionally reference a `workUnitId`, and threads now expose a `workUnits` collection alongside messages, plans, checkpoints, and activities. This keeps turns as the top-level interaction boundary while giving us a stable execution tree for delegated work. Historical compatibility is preserved by decoding missing `workUnits` and missing activity `workUnitId` values to empty/null defaults.
On the persistence side, this PR adds a dedicated `projection_thread_work_units` table plus a migration that backfills one root work unit per historical turn and links historical activities to that root. Projection write/read paths were updated so work units are projected, reverted, and hydrated as first-class read-model state instead of being reconstructed ad hoc from activity payloads.
On the runtime-ingestion side, provider runtime events now produce work units generically. A turn creates or updates a root `primary_agent` work unit, and `task.started` / `task.progress` / `task.completed` create or update delegated `delegated_agent` work units under that root. The delegated work unit keeps `runtimeTaskId`, and now also preserves the runtime item provenance when the provider can identify the spawning subagent tool item.
Both providers are now mapped into that generic shape. Claude task telemetry is correlated back to the `Task` tool's `tool_use_id`, so later Claude `task.*` events inherit the same subagent item identity. Codex child-conversation task events now inherit the parent collaboration tool item from the manager's receiver-thread routing, so flattened child task progress no longer loses the parent subagent provenance. This still keeps everything on the parent turn and thread timeline for now, but the data model is ready for richer subagent UI later.
I also updated the small fixture surfaces that construct orchestration threads/activities directly so they carry the new required fields, and added regression coverage across contracts, projector, projection pipeline, snapshot hydration, Codex manager routing, both adapters, and runtime ingestion.
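The runtime-ingestion behaviour described above can be sketched as a pure upsert over an in-memory map. This is an illustration under assumptions, not the PR's implementation: the field set is reduced, the `"completed"` state name is assumed, and the id helpers `rootIdFor` and `delegatedIdFor` are hypothetical (the PR does not state how work-unit ids are derived).

```typescript
type WorkUnitKind = "primary_agent" | "delegated_agent";
// "completed" is an assumed state name; the PR text does not list the states.
type WorkUnitState = "running" | "completed";

interface WorkUnit {
  readonly id: string;
  readonly turnId: string;
  readonly parentWorkUnitId: string | null;
  readonly kind: WorkUnitKind;
  readonly state: WorkUnitState;
  readonly runtimeTaskId: string | null;
}

interface TaskEvent {
  readonly type: "task.started" | "task.progress" | "task.completed";
  readonly turnId: string;
  readonly taskId: string;
}

// Hypothetical id derivation, chosen only to keep the sketch deterministic.
const rootIdFor = (turnId: string): string => `root:${turnId}`;
const delegatedIdFor = (turnId: string, taskId: string): string => `task:${turnId}:${taskId}`;

function applyTaskEvent(
  units: ReadonlyMap<string, WorkUnit>,
  event: TaskEvent,
): Map<string, WorkUnit> {
  const next = new Map(units);

  // Every turn owns exactly one root primary_agent work unit.
  const rootId = rootIdFor(event.turnId);
  if (!next.has(rootId)) {
    next.set(rootId, {
      id: rootId,
      turnId: event.turnId,
      parentWorkUnitId: null,
      kind: "primary_agent",
      state: "running",
      runtimeTaskId: null,
    });
  }

  // task.* events create or update a delegated_agent unit under that root.
  const delegatedId = delegatedIdFor(event.turnId, event.taskId);
  next.set(delegatedId, {
    id: delegatedId,
    turnId: event.turnId,
    parentWorkUnitId: rootId,
    kind: "delegated_agent",
    state: event.type === "task.completed" ? "completed" : "running",
    runtimeTaskId: event.taskId,
  });

  return next;
}
```

Because the delegated unit is keyed by turn and task, repeated `task.progress` events update one entry instead of appending new ones, which matches the "create or update" wording of the description.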
Validation run for this branch:
- `bun fmt`
- `bun lint`
- `bun typecheck`
- `cd packages/contracts && bun run test src/orchestration.test.ts`
- `cd apps/server && bun run test src/orchestration/projector.test.ts`
- `cd apps/server && bun run test src/orchestration/Layers/ProjectionPipeline.test.ts`
- `cd apps/server && bun run test src/orchestration/Layers/ProjectionSnapshotQuery.test.ts`
- `cd apps/server && bun run test src/orchestration/Layers/ProviderRuntimeIngestion.test.ts`
- `cd apps/server && bun run test src/codexAppServerManager.test.ts`
- `cd apps/server && bun run test src/provider/Layers/CodexAdapter.test.ts`
- `cd apps/server && bun run test src/provider/Layers/ClaudeAdapter.test.ts`
Note
Add subagent work units tracking across providers and projection pipeline
- Adds `OrchestrationWorkUnit` as a first-class contract entity with kinds (`primary_agent`, `delegated_agent`), states, and provider refs; adds a `thread.work-unit.upsert` command and `thread.work-unit-upserted` event to the orchestration system.
- Updates `ProviderRuntimeIngestion.ts` to create and update work units as provider runtime events are processed: a root `primary_agent` work unit per turn, and child `delegated_agent` work units for each `task.started` / `task.progress` / `task.completed` event.
- Adds a `projection_thread_work_units` table (migration 016) and a new projector in `ProjectionPipeline.ts` that persists work units and prunes them on `thread.reverted`.
- Adds a `work_unit_id` column on `projection_thread_activities`, populated during upsert and exposed in snapshot queries.
- Adds `itemId` / `providerRefs` linkage on Claude `task.*` and `tool.progress` events, and surfaces `ExitPlanMode` tool calls as `turn.proposed.completed` events (denied via `canUseTool`).
- `getSnapshot` now queries an additional table, and work unit `updatedAt` can advance the snapshot's overall `updatedAt` timestamp.
📊 Macroscope summarized e6d856b. 37 files reviewed, 9 issues evaluated, 1 issue filtered, 4 comments posted
🗂️ Filtered Issues
apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts — 0 comments posted, 1 evaluated, 1 filtered
The `let runtime` variable declared at line 64 is never assigned because `createHarness` declares a local `const runtime` at line 220 that shadows it. This means the `afterEach` cleanup at lines 76–79 (`if (runtime) { await runtime.dispose(); }`) will never execute, leaking the `ManagedRuntime` (and its underlying SQLite connections, fibers, etc.) after each test. While this is a pre-existing issue (not introduced by the diff), the addition of the new `ProjectionThreadWorkUnitRepository` and its projector in the pipeline means each leaked runtime now holds slightly more resources. [Out of scope]