PEN-118: add daemon dispatch lifecycle#81
Conversation
Co-authored-by: multica-agent <github@multica.ai>
There was a problem hiding this comment.
Code Review
This pull request implements a daemon dispatch lifecycle to route client traffic through daemon nodes, introducing a new dispatch module, database migrations for dispatch_jobs, and polling/execution logic in both the API gateway and the daemon. The review identifies several critical issues: the daemon's job polling is coupled with the heartbeat loop, causing up to 30-second latencies; jobs are processed sequentially, ignoring concurrency limits; frequent database updates within the busy-wait loop will cause severe write contention; and the gateway fails immediately when a selected node is busy instead of load-balancing to other healthy nodes.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| if let Err(error) = drain_dispatch_jobs(&client, &next_job_url, &token, &config).await { | ||
| warn!(error = %error, "daemon dispatch polling failed"); | ||
| } | ||
| sleep(Duration::from_secs(u64::from( | ||
| config.heartbeat_interval_seconds.max(1), | ||
| ))) | ||
| .await; |
There was a problem hiding this comment.
The job polling loop (drain_dispatch_jobs) is executed within the same loop as the daemon heartbeat, which sleeps for heartbeat_interval_seconds (defaulting to 30 seconds).
This means that once the daemon finishes processing any active jobs, it will sleep for up to 30 seconds before polling for new jobs again. This introduces massive, unacceptable latency for client requests routed through the daemon.
The job polling mechanism should run independently of the heartbeat loop (e.g., in a separate tokio task) and poll much more frequently (or use long-polling) to ensure low-latency dispatching.
| loop { | ||
| expire_timed_out_jobs(database, database_backend).await?; | ||
| let row = query_as::<_, DispatchJobRow>(&prepare_sql( |
There was a problem hiding this comment.
Calling expire_timed_out_jobs inside the busy-wait polling loop of wait_for_dispatch_result is highly inefficient. Since this loop runs every 50ms, it will execute database write queries (UPDATE dispatch_jobs ...) 20 times per second per waiting request. Under load, this will cause severe database write contention and lockups, especially on SQLite.
Since the current job's timeout is already handled by the deadline check and mark_dispatch_timed_out (lines 334-337), there is no need to expire all other timed-out jobs on every iteration of this loop.
loop {
let row = query_as::<_, DispatchJobRow>(&prepare_sql(| loop { | ||
| let next = request_next_job(client, next_job_url, token).await?; | ||
| let Some(job) = next.job else { | ||
| return Ok(()); | ||
| }; | ||
|
|
||
| if let Err(error) = execute_dispatch_job(client, token, config, job).await { | ||
| warn!(error = %error, "dispatch job execution failed"); | ||
| } | ||
| } |
There was a problem hiding this comment.
In drain_dispatch_jobs, the daemon processes jobs sequentially by awaiting execute_dispatch_job inside the loop.
This completely serializes job execution, meaning the daemon can only process one job at a time. This ignores the configured max_concurrency and will cause severe bottlenecks if a job takes a long time to complete.
Consider spawning a new tokio task for each job execution (while respecting max_concurrency using a semaphore or FuturesUnordered) so that multiple jobs can be processed concurrently.
| let node = daemon_nodes::select_eligible_daemon_node( | ||
| &state.database, | ||
| state.database_backend(), | ||
| &route.upstream_model, | ||
| i64::from(state.config.daemon_stale_seconds), | ||
| ) | ||
| .instrument(info_span!( | ||
| "daemon_select", | ||
| request_id = %context.request_id, | ||
| trace_id = %context.trace_id, | ||
| route = %public_model, | ||
| model = %route.upstream_model, | ||
| )) | ||
| .await? | ||
| .ok_or_else(|| AppError::provider("no eligible live daemon node for model"))?; | ||
|
|
||
| let active_count = dispatch::active_dispatch_count_for_node( | ||
| &state.database, | ||
| state.database_backend(), | ||
| node.id, | ||
| ) | ||
| .await?; | ||
| if active_count >= i64::from(node.max_concurrency) { | ||
| return Err(AppError::LimitExceeded(format!( | ||
| "daemon node concurrency exceeded for {}", | ||
| node.id | ||
| ))); | ||
| } |
There was a problem hiding this comment.
The current implementation of execute_daemon_dispatch selects a single eligible daemon node using select_eligible_daemon_node and then checks if it has exceeded its concurrency limit. If it has, the request immediately fails with a LimitExceeded error.
This approach breaks load balancing: if the first selected node is busy, the request fails even if there are other healthy, idle daemon nodes available.
Instead, the gateway should either:
- Query for all eligible nodes and filter/sort them by their active dispatch count.
- Or, if the selected node is busy, attempt to select and dispatch to another eligible node before failing.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 6f4c12b3be
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| let body: OpenAiChatCompletionResponse = response.json().await.map_err(|error| { | ||
| AppError::infrastructure(format!("invalid local provider response: {error}")) | ||
| })?; |
There was a problem hiding this comment.
Mark jobs failed when 200 responses cannot be decoded
When the local provider returns HTTP 200 with malformed/non-OpenAI JSON (or fields this struct rejects, such as a null message content), this ? exits execute_dispatch_job and drain_dispatch_jobs only logs the error, leaving the dispatch job in running until the gateway-side timeout expires. The non-2xx and transport branches report /fail, so decode failures should do the same to avoid making clients wait for a timeout instead of receiving the real provider error promptly.
Useful? React with 👍 / 👎.
| return Ok(()); | ||
| }; | ||
|
|
||
| if let Err(error) = execute_dispatch_job(client, token, config, job).await { |
There was a problem hiding this comment.
Run daemon jobs up to advertised concurrency
When max_concurrency is configured above 1, this loop still awaits each local provider call before polling the next job, so a daemon advertises capacity the worker never actually uses. The gateway can lease multiple jobs to that node up to the advertised limit, but all but the first remain queued behind this await and can time out under longer local completions instead of running concurrently.
Useful? React with 👍 / 👎.
| if active_count >= i64::from(node.max_concurrency) { | ||
| return Err(AppError::LimitExceeded(format!( | ||
| "daemon node concurrency exceeded for {}", | ||
| node.id | ||
| ))); | ||
| } |
There was a problem hiding this comment.
Try other daemon nodes before rejecting capacity
When multiple healthy daemons serve the same model and the node returned by select_eligible_daemon_node is already at max_concurrency, this branch returns 429 even if another eligible node is idle, because the selection query only orders by liveness/creation and does not include active dispatch count. In that multi-node context, route dispatch should continue to another eligible node or select with capacity included instead of rejecting available cluster capacity.
Useful? React with 👍 / 👎.
Summary
Verification