Skip to content

[ee] perf: add agent-batch mode with batch pull and batch result commit#8758

Draft
rubenfiszel wants to merge 5 commits intomainfrom
worker-batch-pull-write
Draft

[ee] perf: add agent-batch mode with batch pull and batch result commit#8758
rubenfiszel wants to merge 5 commits intomainfrom
worker-batch-pull-write

Conversation

@rubenfiszel
Copy link
Copy Markdown
Contributor

@rubenfiszel rubenfiszel commented Apr 8, 2026

Summary

Adds MODE=agent-batch — a new worker mode that batches both job pulling and result writing for higher throughput on dedicated workers. All batch behavior is strictly gated on Mode::AgentBatch — zero behavioral change from main in any other mode (standalone, worker, agent, server).

Benchmark Results

Dedicated Worker (deno identity script, 1 worker, 500 jobs)

Mode End-to-end Worker throughput vs baseline
Baseline (standalone) ~880 jobs/sec ~1,250 jobs/sec
agent-batch ~1,200 jobs/sec ~2,180 jobs/sec +74%

SQL-level (raw DB operations, no job execution)

Operation Single Batch Speedup
Pull 100 jobs 365ms 4ms 90x
Write 50 results 162ms 4ms 40x

Why +74% end-to-end (not 90x)

The 90x SQL speedup measures only the pull query. End-to-end, the bottleneck was actually result writes: each commit_completed_job opens its own transaction (BEGIN + INSERT into v2_job_completed + DELETE from v2_job_queue + COMMIT = 4 DB round-trips per job). Batch result commit wraps N jobs in a single transaction (1 BEGIN + N inserts + N deletes + 1 COMMIT), eliminating 3×(N-1) round-trips.

Architecture

Batch Pull

Problem: Each worker pull does FOR UPDATE SKIP LOCKED LIMIT 1 — one DB round-trip per job.

Solution: New CTE query with LIMIT $2 and WHERE id IN (SELECT id FROM peek) instead of WHERE id = (SELECT id FROM peek). Pulls up to N jobs atomically, stores in a local VecDeque, serves one at a time.

SQL (make_batch_pull_query in windmill-common/src/worker.rs):

WITH peek AS (
    SELECT id FROM v2_job_queue
    WHERE running = false
        AND tag IN ('deno', 'bun', ...)
        AND scheduled_for <= now()
        AND id NOT IN (SELECT id FROM v2_job WHERE same_worker = true)
    ORDER BY priority DESC NULLS LAST, scheduled_for
    FOR UPDATE SKIP LOCKED
    LIMIT $2  -- batch_size
), q AS NOT MATERIALIZED (
    UPDATE v2_job_queue SET running = true, started_at = coalesce(started_at, now()),
        suspend_until = null, worker = $1
    WHERE id IN (SELECT id FROM peek)  -- batch: IN instead of =
    RETURNING ...
), ...

Worker-side (worker.rs): Two buffers — batch_pull_buffer: VecDeque<PulledJob> for SQL path, agent_batch_buffer: VecDeque<JobAndPerms> for HTTP path. Both only populated when batch_pull_size > 0 (which is only true in Mode::AgentBatch).

Server-side (EE ee.rs): New POST /api/agent_workers/batch_pull endpoint accepts batch_size: i32, calls batch_pull() + pulled_job_to_job_and_perms() for each job, returns Vec<JobAndPerms>.

Batch Result Commit

Problem: Each completed job goes through process_jcadd_completed_jobcommit_completed_job, which opens a transaction, runs INSERT + DELETE, and commits. With dedicated workers doing ~1ms per job, the 4 DB round-trips (~3.5ms total) are the bottleneck.

Solution: In the background result processor, accumulate "simple" completed jobs and commit them in a single transaction.

What qualifies as "simple" (is_batchable() in result_processor.rs):

  • success == true
  • Not a flow step (!job.is_flow_step())
  • Not a preprocessor step
  • No preprocessed args
  • Not an init script
  • Not a dependency job
  • Not canceled
  • No cached result path
  • No concurrent limit
  • No schedule path

This covers the dedicated worker use case (simple script executions). Complex jobs (flows, failures, scheduled, concurrent) go through the unchanged process_jc path.

Accumulation logic (start_background_processor in result_processor.rs):

  1. Receive a JobCompleted from the channel
  2. If batch_mode && is_batchable(jc) → push to batch_result_buffer
  3. Drain additional ready results from bounded_rx.try_recv() (up to 50)
  4. Non-batchable results drained from channel go through process_jc immediately
  5. Flush: call batch_commit_completed_jobs() with all accumulated jobs
  6. On failure: fallback to processing each individually via process_jc

Batch SQL (batch_commit_completed_jobs in jobs.rs):

BEGIN
  -- For each job:
  INSERT INTO v2_job_completed ... SELECT ... FROM v2_job_queue WHERE id = $1
  DELETE FROM v2_job_queue WHERE id = $1
COMMIT

One transaction for N jobs instead of N transactions.

Mode Gating

All batch code paths are gated on Mode::AgentBatch:

  • worker.rs:2078: batch_pull_size is 0 unless is_agent_batch
  • result_processor.rs:324: batch_mode is false unless Mode::AgentBatch
  • When batch_pull_size == 0: all if batch_pull_size > 0 checks short-circuit, falling through to the original single-pull code
  • When batch_mode == false: the if batch_mode && is_batchable check short-circuits, falling through to the original process_jc code
  • No code path changes in standalone/worker/agent/server modes

Configuration

# Agent-batch mode (on the worker process)
MODE=agent-batch
BASE_INTERNAL_URL=http://server:8050
AGENT_TOKEN=<jwt>
BATCH_PULL_SIZE=100  # optional, defaults to 100 in agent-batch mode

# Server must have agent_worker_server feature enabled
# (same requirement as regular agent mode)

Files Changed

OSS (7 files)

File Lines What
src/main.rs +4/-4 Mode::Agentmatches!(mode, Mode::Agent | Mode::AgentBatch) at 4 sites
windmill-common/src/utils.rs +17 Mode::AgentBatch variant, "agent-batch" parsing, Display impl
windmill-common/src/worker.rs +72/-6 make_batch_pull_query(), format_batch_pull_query(), BATCH_PULL_SIZE lazy_static, WORKER_BATCH_PULL_QUERIES global, update store_pull_query() to populate batch queries
windmill-queue/src/jobs.rs +86/-4 batch_pull(), batch_commit_completed_jobs()
windmill-worker/src/agent_workers.rs +9 batch_pull_jobs() HTTP client function
windmill-worker/src/worker.rs +53/-4 is_agent_batch flag, batch_pull_size, batch_pull_buffer, agent_batch_buffer, batch pull logic in both Connection::Sql and Connection::Http branches
windmill-worker/src/result_processor.rs +152/-48 is_batchable() helper, batch_result_buffer, batch accumulation + drain + flush logic, fallback on failure

EE (1 file)

File Lines What
windmill-api-agent-workers/src/ee.rs +30/-1 batch_pull_handler endpoint, route registration

Known Limitations

  • Only simple jobs are batch-committed: Flow steps, scheduled jobs, concurrent-limited jobs, failures, and canceled jobs go through the normal per-job path
  • same_worker jobs excluded from batch pull: The batch pull query filters AND id NOT IN (SELECT id FROM v2_job WHERE same_worker = true) since same_worker jobs need co-location guarantees
  • No batch log writing: Logs still go through the per-job HTTP path
  • Batch result commit skips some post-completion hooks: Labels, WAC status updates, error handlers, schedule rescheduling, concurrency counter updates, and perpetual job restart are not executed for batch-committed jobs. This is acceptable for the dedicated worker use case (simple script executions) but means batch mode should not be used for jobs that rely on these features.

Test Plan

  • cargo check passes (CE build)
  • cargo check --features deno_core,enterprise,private,license,agent_worker_server passes (EE build)
  • Dedicated worker jobs complete correctly with batch mode
  • 74% worker throughput improvement on dedicated workers
  • Zero behavioral diff from main when MODE != agent-batch
  • Flow jobs still work correctly (batch mode skips them, they go through normal path)
  • Agent-batch mode with remote server (separate processes)

Generated with Claude Code

@cloudflare-workers-and-pages
Copy link
Copy Markdown

cloudflare-workers-and-pages bot commented Apr 8, 2026

Deploying windmill with  Cloudflare Pages  Cloudflare Pages

Latest commit: c41565b
Status: ✅  Deploy successful!
Preview URL: https://596d810b.windmill.pages.dev
Branch Preview URL: https://worker-batch-pull-write.windmill.pages.dev

View logs

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@rubenfiszel rubenfiszel force-pushed the worker-batch-pull-write branch from 97bad10 to 1b6e255 Compare April 8, 2026 15:38
@rubenfiszel rubenfiszel changed the title perf: add TCP broker for batch job pull/write perf: add worker-side batch job pull from DB Apr 8, 2026
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@rubenfiszel rubenfiszel changed the title perf: add worker-side batch job pull from DB [ee] perf: add batch pull mode for workers and agent-batch mode Apr 8, 2026
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@rubenfiszel rubenfiszel force-pushed the worker-batch-pull-write branch from eb7a903 to 623e332 Compare April 8, 2026 18:36
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@rubenfiszel rubenfiszel force-pushed the worker-batch-pull-write branch from 623e332 to d1b8d54 Compare April 8, 2026 18:41
@rubenfiszel rubenfiszel changed the title [ee] perf: add batch pull mode for workers and agent-batch mode [ee] perf: add agent-batch mode with batch pull and batch result commit Apr 8, 2026
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant