PEN-118: add daemon dispatch lifecycle#81

Closed
wauputr4 wants to merge 1 commit into main from agent/dimas/c2a9a4f1

@wauputr4

Member

Summary

  • add dispatch_jobs storage and daemon job polling/complete/fail control-plane endpoints
  • route non-streaming auth_mode=daemon model routes through eligible healthy daemon nodes with timeout/failure handling
  • teach mizan-daemon to poll jobs, execute against its local OpenAI-compatible provider, and report normalized results
  • document daemon route setup and migration/index coverage

Verification

  • git diff --cached --check
  • git diff --check
  • Not run: cargo fmt / cargo check / cargo test, because cargo is not installed in this runtime

Co-authored-by: multica-agent <github@multica.ai>

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request implements a daemon dispatch lifecycle to route client traffic through daemon nodes, introducing a new dispatch module, database migrations for dispatch_jobs, and polling/execution logic in both the API gateway and the daemon. The review identifies several critical issues: the daemon's job polling is coupled with the heartbeat loop, causing up to 30-second latencies; jobs are processed sequentially, ignoring concurrency limits; frequent database updates within the busy-wait loop will cause severe write contention; and the gateway fails immediately when a selected node is busy instead of load-balancing to other healthy nodes.

Comment on lines +103 to 109
if let Err(error) = drain_dispatch_jobs(&client, &next_job_url, &token, &config).await {
    warn!(error = %error, "daemon dispatch polling failed");
}
sleep(Duration::from_secs(u64::from(
    config.heartbeat_interval_seconds.max(1),
)))
.await;

critical

The job polling loop (drain_dispatch_jobs) is executed within the same loop as the daemon heartbeat, which sleeps for heartbeat_interval_seconds (defaulting to 30 seconds).

As a result, once the daemon has drained its queue, it sleeps for the full heartbeat interval before polling again, so a newly enqueued job can wait up to 30 seconds before it is picked up. That delay is passed directly to client requests routed through the daemon.

The job polling mechanism should run independently of the heartbeat loop (e.g., in a separate tokio task) and poll much more frequently (or use long-polling) to ensure low-latency dispatching.
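
One way to picture the decoupling is a scheduler that keeps separate deadlines for heartbeats and job polls. The sketch below is a dependency-free illustration, not the daemon's code: `Timers`, `Tick`, and `next_tick` are hypothetical names, and the 1-second poll interval is an assumed value. In the daemon itself the same effect would more likely come from two separate tokio tasks, as suggested above.

```rust
use std::time::Duration;

/// Which periodic action is due next.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Tick {
    Heartbeat,
    Poll,
}

/// Two independent deadlines, measured as offsets from daemon start.
/// (Hypothetical type, used here only for illustration.)
pub struct Timers {
    heartbeat_every: Duration,
    poll_every: Duration,
    next_heartbeat: Duration,
    next_poll: Duration,
}

impl Timers {
    pub fn new(heartbeat_every: Duration, poll_every: Duration) -> Self {
        Self {
            heartbeat_every,
            poll_every,
            next_heartbeat: Duration::ZERO,
            next_poll: Duration::ZERO,
        }
    }

    /// Returns the offset at which the next action is due and which action
    /// it is, then reschedules only that action. Ties go to polling.
    pub fn next_tick(&mut self) -> (Duration, Tick) {
        if self.next_poll <= self.next_heartbeat {
            let due = self.next_poll;
            self.next_poll += self.poll_every;
            (due, Tick::Poll)
        } else {
            let due = self.next_heartbeat;
            self.next_heartbeat += self.heartbeat_every;
            (due, Tick::Heartbeat)
        }
    }
}
```

With a 30-second heartbeat and a 1-second poll interval, the daemon would poll about once per second between heartbeats, so the worst-case pickup delay drops from the heartbeat interval to the poll interval.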

Comment on lines +292 to +294
loop {
    expire_timed_out_jobs(database, database_backend).await?;
    let row = query_as::<_, DispatchJobRow>(&prepare_sql(

high

Calling expire_timed_out_jobs inside the busy-wait polling loop of wait_for_dispatch_result is wasteful. The loop runs every 50ms, so each waiting request issues a database write (UPDATE dispatch_jobs ...) 20 times per second. Under load, this is likely to cause severe write contention and lock timeouts, especially on SQLite.

Since the current job's timeout is already handled by the deadline check and mark_dispatch_timed_out (lines 334-337), there is no need to expire all other timed-out jobs on every iteration of this loop.

    loop {
        let row = query_as::<_, DispatchJobRow>(&prepare_sql(
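
To make the cost concrete, here is a minimal synchronous sketch of a wait loop that checks only its own job against a deadline, plus the arithmetic behind the "20 times per second" figure. It is an analogy under stated assumptions: the real wait_for_dispatch_result is async and reads from the database, while `wait_for_result`, `WaitError`, and `queries_per_second` are hypothetical names introduced here.

```rust
use std::thread;
use std::time::{Duration, Instant};

#[derive(Debug, PartialEq, Eq)]
pub enum WaitError {
    TimedOut,
}

/// Polls `check` until it yields a result or `timeout` elapses.
/// Only the caller's own job is inspected; no global expiry sweep runs here.
pub fn wait_for_result<T, F>(
    mut check: F,
    timeout: Duration,
    interval: Duration,
) -> Result<T, WaitError>
where
    F: FnMut() -> Option<T>,
{
    let deadline = Instant::now() + timeout;
    loop {
        if let Some(result) = check() {
            return Ok(result);
        }
        if Instant::now() >= deadline {
            return Err(WaitError::TimedOut);
        }
        thread::sleep(interval);
    }
}

/// Rough upper bound on queries per second issued by `waiters` loops
/// that each run one query per `interval`.
pub fn queries_per_second(waiters: u64, interval: Duration) -> u64 {
    waiters * (1_000 / interval.as_millis().max(1) as u64)
}
```

At a 50ms interval, 100 concurrent waiters would issue on the order of 2,000 queries per second from this loop alone, which is why keeping writes out of it matters. A global expiry sweep could instead run on its own, slower schedule.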

Comment on lines +119 to +128
loop {
    let next = request_next_job(client, next_job_url, token).await?;
    let Some(job) = next.job else {
        return Ok(());
    };

    if let Err(error) = execute_dispatch_job(client, token, config, job).await {
        warn!(error = %error, "dispatch job execution failed");
    }
}

high

In drain_dispatch_jobs, the daemon processes jobs sequentially by awaiting execute_dispatch_job inside the loop.

This completely serializes job execution, meaning the daemon can only process one job at a time. This ignores the configured max_concurrency and will cause severe bottlenecks if a job takes a long time to complete.

Consider spawning a new tokio task for each job execution (while respecting max_concurrency using a semaphore or FuturesUnordered) so that multiple jobs can be processed concurrently.
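
The bounded-concurrency idea can be sketched without any async runtime. The example below uses scoped std threads and a shared queue so that at most max_concurrency jobs run at once; `run_bounded` is a hypothetical helper, and this is an assumed stand-in for the tokio semaphore or FuturesUnordered approach suggested above, not the daemon's implementation.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;
use std::thread;

/// Runs `work` over `jobs` with at most `max_concurrency` jobs in flight.
/// Results are returned in the original job order.
pub fn run_bounded<J, R, F>(jobs: Vec<J>, max_concurrency: usize, work: F) -> Vec<R>
where
    J: Send,
    R: Send,
    F: Fn(J) -> R + Sync,
{
    let queue = Mutex::new(jobs.into_iter().enumerate().collect::<VecDeque<_>>());
    let results = Mutex::new(Vec::new());

    thread::scope(|scope| {
        // One worker per concurrency slot; each pulls jobs until the queue is empty.
        for _ in 0..max_concurrency.max(1) {
            scope.spawn(|| loop {
                // The queue lock is released at the end of this statement,
                // so it is not held while the job runs.
                let next = queue.lock().unwrap().pop_front();
                let Some((index, job)) = next else { break };
                let output = work(job);
                results.lock().unwrap().push((index, output));
            });
        }
    });

    let mut results = results.into_inner().unwrap();
    results.sort_by_key(|(index, _)| *index);
    results.into_iter().map(|(_, output)| output).collect()
}
```

The key property is that a slow job occupies only one slot while the remaining slots keep draining the queue, which is what the sequential await in drain_dispatch_jobs prevents.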

Comment on lines +757 to +784
let node = daemon_nodes::select_eligible_daemon_node(
    &state.database,
    state.database_backend(),
    &route.upstream_model,
    i64::from(state.config.daemon_stale_seconds),
)
.instrument(info_span!(
    "daemon_select",
    request_id = %context.request_id,
    trace_id = %context.trace_id,
    route = %public_model,
    model = %route.upstream_model,
))
.await?
.ok_or_else(|| AppError::provider("no eligible live daemon node for model"))?;

let active_count = dispatch::active_dispatch_count_for_node(
    &state.database,
    state.database_backend(),
    node.id,
)
.await?;
if active_count >= i64::from(node.max_concurrency) {
    return Err(AppError::LimitExceeded(format!(
        "daemon node concurrency exceeded for {}",
        node.id
    )));
}

medium

The current implementation of execute_daemon_dispatch selects a single eligible daemon node using select_eligible_daemon_node and then checks if it has exceeded its concurrency limit. If it has, the request immediately fails with a LimitExceeded error.

This approach breaks load balancing: if the first selected node is busy, the request fails even if there are other healthy, idle daemon nodes available.

Instead, the gateway should either:

  1. Query for all eligible nodes and filter/sort them by their active dispatch count.
  2. Or, if the selected node is busy, attempt to select and dispatch to another eligible node before failing.
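
The first option can be sketched as a pure selection step over candidates that already carry their active dispatch count. `NodeLoad` and `pick_node` are hypothetical names, and the sketch assumes the candidate list arrives in the existing eligibility order; how the counts are loaded (one query or several) is left open.

```rust
/// A candidate daemon node with its current load.
/// (Hypothetical shape; the real node row has more fields.)
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeLoad {
    pub id: i64,
    pub active: i64,
    pub max_concurrency: i64,
}

/// Picks the least-loaded node that still has spare capacity.
/// Returns `None` only when every candidate is at its limit.
/// On equal load, the earliest candidate in the slice wins, so the
/// caller's existing ordering acts as the tie-breaker.
pub fn pick_node(candidates: &[NodeLoad]) -> Option<&NodeLoad> {
    candidates
        .iter()
        .filter(|node| node.active < node.max_concurrency)
        .min_by_key(|node| node.active)
}
```

With this shape, a LimitExceeded error would be returned only when `pick_node` yields `None`, that is, when the whole set of eligible nodes is saturated rather than just the first one.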

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 6f4c12b3be

Comment on lines +169 to +171
let body: OpenAiChatCompletionResponse = response.json().await.map_err(|error| {
    AppError::infrastructure(format!("invalid local provider response: {error}"))
})?;

P2: Mark jobs failed when 200 responses cannot be decoded

When the local provider returns HTTP 200 with malformed or non-OpenAI JSON (or with fields this struct rejects, such as a null message content), the ? operator exits execute_dispatch_job, and drain_dispatch_jobs only logs the error. The dispatch job then stays in running until the gateway-side timeout expires. The non-2xx and transport branches already report /fail; decode failures should do the same, so that clients receive the real provider error promptly instead of waiting for a timeout.
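
A small sketch of the intended shape: every way the local provider call can end maps to an explicit report, so no branch can return early and leave the job in running. `ProviderOutcome`, `JobReport`, and `report_for` are hypothetical names, and the sketch assumes the daemon reports either a completion or a failure for each job.

```rust
/// Every way a local provider call can end. (Hypothetical type.)
#[derive(Debug, PartialEq, Eq)]
pub enum ProviderOutcome {
    Transport(String),
    Status { code: u16, body: String },
    Undecodable(String),
    Decoded { content: String },
}

/// What the daemon reports back for the job. (Hypothetical type.)
#[derive(Debug, PartialEq, Eq)]
pub enum JobReport {
    Complete { content: String },
    Fail { reason: String },
}

/// Maps each outcome to a report. The match is exhaustive, so a decode
/// failure cannot skip the failure report the way an early `?` return can.
pub fn report_for(outcome: ProviderOutcome) -> JobReport {
    match outcome {
        ProviderOutcome::Decoded { content } => JobReport::Complete { content },
        ProviderOutcome::Transport(error) => JobReport::Fail {
            reason: format!("local provider unreachable: {error}"),
        },
        ProviderOutcome::Status { code, body } => JobReport::Fail {
            reason: format!("local provider returned HTTP {code}: {body}"),
        },
        ProviderOutcome::Undecodable(error) => JobReport::Fail {
            reason: format!("invalid local provider response: {error}"),
        },
    }
}
```

Collapsing the branches into one value before reporting keeps the "always report something" rule in a single place.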

        return Ok(());
    };

    if let Err(error) = execute_dispatch_job(client, token, config, job).await {

P2: Run daemon jobs up to advertised concurrency

When max_concurrency is configured above 1, this loop still awaits each local provider call before polling the next job, so a daemon advertises capacity the worker never actually uses. The gateway can lease multiple jobs to that node up to the advertised limit, but all but the first remain queued behind this await and can time out under longer local completions instead of running concurrently.

Comment on lines +779 to +784
if active_count >= i64::from(node.max_concurrency) {
    return Err(AppError::LimitExceeded(format!(
        "daemon node concurrency exceeded for {}",
        node.id
    )));
}

P2: Try other daemon nodes before rejecting capacity

When multiple healthy daemons serve the same model and the node returned by select_eligible_daemon_node is already at max_concurrency, this branch returns 429 even if another eligible node is idle, because the selection query only orders by liveness/creation and does not include active dispatch count. In that multi-node context, route dispatch should continue to another eligible node or select with capacity included instead of rejecting available cluster capacity.

@wauputr4

Member Author

Closing as obsolete duplicate of #80 for PEN-118. PR #80 has the updated commit and green Rust CI.

@wauputr4 wauputr4 closed this Jun 11, 2026