
Add --job-threads flag for parallel job execution#763

Merged
SimonHeybrock merged 4 commits into main from worktree-job-threads
Mar 10, 2026

Conversation


@SimonHeybrock SimonHeybrock commented Mar 5, 2026

Summary

  • Add --job-threads N CLI flag to DataServiceRunner that parallelizes per-job accumulation and finalization using a ThreadPoolExecutor
  • When N=1 (default), behavior is unchanged — no thread pool is created
  • Complements --sync-scheduler from the base branch: the sync scheduler eliminates dask's internal GIL contention, while --job-threads adds outer parallelism across independent jobs
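
The N=1 semantics above can be illustrated with a small sketch (the helper name `make_job_executor` is hypothetical; the real `DataServiceRunner` wiring differs):

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Optional

def make_job_executor(job_threads: int) -> Optional[ThreadPoolExecutor]:
    """Create a pool only when parallelism is requested.

    With --job-threads 1 (the default) no thread pool is created at all,
    so the original single-threaded code path is preserved exactly.
    """
    if job_threads <= 1:
        return None
    return ThreadPoolExecutor(max_workers=job_threads,
                              thread_name_prefix="job")
```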

Motivation

Services like LOKI's detector_data run 18 simultaneous workflows (9 XY projection + 9 tube view). All job work (accumulate + finalize) was sequential. With --sync-scheduler --job-threads 5, the expensive per-job operations now run concurrently, limited mainly by GIL contention over scipp's C++ layer.

Observed improvement (local run, Kafka in Docker, LOKI with 18 detector jobs): dashboard-reported lag dropped from 1.0–1.5s to ~0.8–0.9s (with occasional spikes above 1s).

Mechanism

The new process_jobs method on JobManager combines push_data and compute_results into a single fan-out/fan-in pass over active jobs:

  1. Prepare (sequential, cheap): _advance_to_time() activates scheduled jobs, then each job's data is filtered from the incoming WorkflowData. Jobs with new data or pending primary data from a prior failed finalize are collected into work items.

  2. Execute (parallel via _map): Each work item runs _process_single_job which does both job.add() (accumulate) and job.get() (finalize) for a single job. The finalize decision is local — a job finalizes if this push delivered primary data successfully, or if it had pending primary data from a previous cycle. Since each job owns its own workflow processor instance, there is no shared mutable state between jobs.

  3. Bookkeep (sequential, cheap): Replies and results are collected back on the main thread. Job state tracking (_job_states, _job_error_messages, _jobs_with_primary_data, etc.) is updated sequentially.

The _map helper dispatches to ThreadPoolExecutor.map when job_threads > 1, and falls back to a plain list comprehension otherwise. push_data and compute_results remain available as sequential methods used by unit tests.
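
A minimal sketch of the three-phase pattern and the _map dispatch (Job, _map, process_jobs, and _process_single_job are simplified stand-ins here, not the real JobManager API):

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field

def _map(func, items, job_threads):
    # Thread pool only when parallelism is requested; otherwise a plain
    # list comprehension keeps the single-threaded path unchanged.
    if job_threads > 1:
        with ThreadPoolExecutor(max_workers=job_threads) as pool:
            return list(pool.map(func, items))
    return [func(item) for item in items]

@dataclass
class Job:
    # Toy stand-in: each job owns its own accumulator, so tasks share
    # no mutable state and are safe to run on separate worker threads.
    name: str
    _acc: list = field(default_factory=list)

    def add(self, data):          # accumulate
        self._acc.append(data)

    def get(self):                # finalize
        return self.name, sum(self._acc)

def process_jobs(jobs, incoming, job_threads=1):
    # 1. Prepare (sequential, cheap): filter each job's slice of the data.
    work = [(job, incoming[job.name]) for job in jobs if job.name in incoming]

    # 2. Execute (parallel via _map): push and finalize run as one task
    #    per job, so both happen on the same worker thread.
    def _process_single_job(item):
        job, data = item
        job.add(data)
        return job.get()

    results = _map(_process_single_job, work, job_threads)

    # 3. Bookkeep (sequential): collect results back on the main thread.
    return dict(results)
```

In this toy, `process_jobs([Job("a"), Job("b")], {"a": 1, "b": 2}, job_threads=2)` returns `{"a": 1, "b": 2}`; the real method additionally handles pending primary data, replies, and error bookkeeping.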

OrchestratingProcessor.process() calls process_jobs instead of the previous push_data + compute_results pair.

Test plan

  • All 2544 existing tests pass
  • New tests verify: worker threads used when job_threads > 1, calling thread used when job_threads = 1, push and get run on the same thread per job, threaded results match sequential, error tracking propagates correctly
  • Manual: run LOKI detector_data service with --sync-scheduler --job-threads 5 and verify lag improvement in dashboard
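
The "push and get run on the same thread per job" property can be demonstrated with a standalone check like the following (illustrative only, not the project's actual test code):

```python
import threading
from concurrent.futures import ThreadPoolExecutor

def push_then_get(_):
    # Record the thread identity at both steps of one submitted task.
    push_thread = threading.get_ident()   # stands in for job.add()
    get_thread = threading.get_ident()    # stands in for job.get()
    return push_thread, get_thread

with ThreadPoolExecutor(max_workers=3) as pool:
    outcomes = list(pool.map(push_then_get, range(10)))

# Because push and get execute inside a single task, they always
# observe the same worker thread for a given job.
assert all(push == get for push, get in outcomes)
```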

🤖 Generated with Claude Code

@SimonHeybrock SimonHeybrock marked this pull request as draft March 5, 2026 12:24
Base automatically changed from sync-scheduler-flag to main March 5, 2026 13:15
@SimonHeybrock SimonHeybrock force-pushed the worktree-job-threads branch from d52de2a to 9ab8bf9 on March 5, 2026 13:17
@SimonHeybrock SimonHeybrock marked this pull request as ready for review March 5, 2026 13:18
@SimonHeybrock SimonHeybrock requested review from MridulS March 5, 2026 13:18
When --job-threads N is passed with N > 1, job accumulation (push_data)
and finalization (compute_results) are executed in a ThreadPoolExecutor.
Each job's work is independent, so the expensive per-job operations
(workflow accumulate and finalize) can run concurrently.

Combined with --sync-scheduler, this roughly halves wall-clock time for
services with many simultaneous workflows (e.g., LOKI detector_data
with 18 workflows).

Ref #756

Replace the two separate _map calls (one for push_data, one for
compute_results) with a single process_jobs method that runs both
accumulation and finalization as one task per job. This avoids two
fan-out/fan-in cycles and ensures each job's push and get run on the
same worker thread.

push_data and compute_results remain as sequential methods for
test convenience. The production path through OrchestratingProcessor
uses process_jobs.

Replace the _JobWorkItem/_JobWorkOutcome harness in job_manager with a
Job.process() method that encapsulates the push+finalize-if-primary
logic. This puts the decision of whether to finalize where it belongs —
on Job itself — and reduces job_manager's _process_job to a one-liner.
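
A hedged sketch of what such a Job.process() might look like; the pending-primary handling is inferred from the descriptions above, not copied from the real code:

```python
class Job:
    """Toy model: accumulate data, finalize when primary data is available."""

    def __init__(self):
        self._acc = []
        self._pending_primary = False

    def add(self, data):
        self._acc.append(data)

    def get(self):
        return list(self._acc)

    def process(self, data, *, is_primary):
        # Push, then decide locally whether to finalize: either this push
        # delivered primary data, or primary data is still pending from a
        # previous cycle whose finalize did not complete.
        if data is not None:
            self.add(data)
        if is_primary:
            self._pending_primary = True
        if self._pending_primary:
            self._pending_primary = False
            return self.get()     # finalize
        return None               # accumulate only, nothing to emit
```

This keeps the finalize decision on Job itself, so the manager-side _process_job reduces to dispatching process() per work item.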
@SimonHeybrock SimonHeybrock force-pushed the worktree-job-threads branch from 598b212 to 68445fc on March 9, 2026 03:55
@SimonHeybrock SimonHeybrock merged commit 6afa992 into main Mar 10, 2026
4 checks passed
@SimonHeybrock SimonHeybrock deleted the worktree-job-threads branch March 10, 2026 07:09
