Add --job-threads flag for parallel job execution #763
Merged
SimonHeybrock merged 4 commits into main on Mar 10, 2026
When --job-threads N is passed with N > 1, job accumulation (push_data) and finalization (compute_results) are executed in a ThreadPoolExecutor. Each job's work is independent, so the expensive per-job operations (workflow accumulate and finalize) can run concurrently. Combined with --sync-scheduler, this roughly halves wall-clock time for services with many simultaneous workflows (e.g., LOKI detector_data with 18 workflows). Ref #756
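A minimal sketch of the two-phase fan-out described above, assuming a simplified `Job` with `push_data`/`compute_results` methods as stand-ins for the real per-job accumulate and finalize calls (the class and function names here are illustrative, not the actual implementation):

```python
from concurrent.futures import ThreadPoolExecutor

class Job:
    """Stand-in for a real job; each job's work is independent."""
    def __init__(self):
        self.total = 0

    def push_data(self, data):    # accumulate
        self.total += data

    def compute_results(self):    # finalize
        return self.total

def process(jobs, data, job_threads=1):
    """Fan out push_data and compute_results over a thread pool when job_threads > 1."""
    if job_threads > 1:
        with ThreadPoolExecutor(max_workers=job_threads) as pool:
            # Two fan-out/fan-in cycles: one for accumulation, one for finalization.
            list(pool.map(lambda j: j.push_data(data), jobs))
            return list(pool.map(lambda j: j.compute_results(), jobs))
    for job in jobs:
        job.push_data(data)
    return [job.compute_results() for job in jobs]
```

With `job_threads=1` no pool is created and the work runs inline on the calling thread, matching the flag's default behavior.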
Replace the two separate _map calls (one for push_data, one for compute_results) with a single process_jobs method that runs both accumulation and finalization as one task per job. This avoids two fan-out/fan-in cycles and ensures each job's push and get run on the same worker thread. push_data and compute_results remain as sequential methods for test convenience. The production path through OrchestratingProcessor uses process_jobs.
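The combined shape can be sketched as follows; `StubJob` and the exact signature of `process_jobs` are assumptions for illustration, but the key point from the commit is real: each task does both the push and the get, so they run on the same worker thread.

```python
from concurrent.futures import ThreadPoolExecutor

class StubJob:
    """Minimal stand-in: push_data accumulates, compute_results finalizes."""
    def __init__(self):
        self.total = 0

    def push_data(self, data):
        self.total += data

    def compute_results(self):
        return self.total

def process_jobs(jobs, data, job_threads=1):
    """Single fan-out/fan-in: one task per job runs both phases."""
    def one_job(job):
        job.push_data(data)             # accumulate
        return job.compute_results()    # finalize, on the same worker thread
    if job_threads > 1:
        with ThreadPoolExecutor(max_workers=job_threads) as pool:
            return list(pool.map(one_job, jobs))
    return [one_job(job) for job in jobs]
```

Compared with two separate map passes, this halves the fan-out/fan-in overhead and keeps a job's state accesses confined to one thread per cycle.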
Replace the _JobWorkItem/_JobWorkOutcome harness in job_manager with a Job.process() method that encapsulates the push+finalize-if-primary logic. This puts the decision of whether to finalize where it belongs — on Job itself — and reduces job_manager's _process_job to a one-liner.
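A sketch of the push+finalize-if-primary logic living on `Job` itself; `StubWorkflow`, `accumulate`, and the boolean return convention are hypothetical stand-ins for the real workflow-processor interface:

```python
class StubWorkflow:
    """Stand-in for a job's workflow processor (hypothetical interface)."""
    def __init__(self):
        self.data = []
        self.fail_next_finalize = False

    def accumulate(self, data):
        """Return True if this push delivered primary data."""
        if data is None:
            return False
        self.data.append(data)
        return True

    def finalize(self):
        if self.fail_next_finalize:
            self.fail_next_finalize = False
            raise RuntimeError("finalize failed")
        return sum(self.data)

class Job:
    """Sketch: the Job itself decides whether to finalize after a push."""
    def __init__(self, workflow):
        self._workflow = workflow
        self._pending_primary = False  # primary data left over from a failed finalize

    def process(self, data):
        got_primary = self._workflow.accumulate(data)
        if not (got_primary or self._pending_primary):
            return None  # no primary data this cycle: nothing to finalize
        try:
            result = self._workflow.finalize()
            self._pending_primary = False
            return result
        except Exception:
            self._pending_primary = True  # remember, so the next cycle retries
            raise
```

Putting the decision on `Job` means the manager's per-job worker reduces to a call to `job.process(data)` plus bookkeeping of the outcome.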
MridulS approved these changes on Mar 9, 2026
Summary
- Adds a `--job-threads N` CLI flag to `DataServiceRunner` that parallelizes per-job accumulation and finalization using a `ThreadPoolExecutor`.
- With `N=1` (the default), behavior is unchanged: no thread pool is created.
- Composes with `--sync-scheduler` from the base branch: the sync scheduler eliminates dask's internal GIL contention, while `--job-threads` adds outer parallelism across independent jobs.

Motivation
Services like LOKI's `detector_data` run 18 simultaneous workflows (9 XY projection + 9 tube view). Previously, all job work (accumulate + finalize) was sequential. With `--sync-scheduler --job-threads 5`, the expensive per-job operations now run concurrently, limited mainly by GIL contention over scipp's C++ layer.

Observed improvement (local run, Kafka in Docker, LOKI with 18 detector jobs): dashboard-reported lag dropped from 1.0–1.5 s to ~0.8–0.9 s, with occasional spikes above 1 s.
Mechanism
The new `process_jobs` method on `JobManager` combines `push_data` and `compute_results` into a single fan-out/fan-in pass over active jobs:

1. **Prepare** (sequential, cheap): `_advance_to_time()` activates scheduled jobs, then each job's data is filtered from the incoming `WorkflowData`. Jobs with new data, or with pending primary data from a prior failed finalize, are collected into work items.
2. **Execute** (parallel via `_map`): each work item runs `_process_single_job`, which does both `job.add()` (accumulate) and `job.get()` (finalize) for a single job. The finalize decision is local: a job finalizes if this push delivered primary data successfully, or if it had pending primary data from a previous cycle. Since each job owns its own workflow processor instance, there is no shared mutable state between jobs.
3. **Bookkeep** (sequential, cheap): replies and results are collected back on the main thread. Job state tracking (`_job_states`, `_job_error_messages`, `_jobs_with_primary_data`, etc.) is updated sequentially.

The `_map` helper dispatches to `ThreadPoolExecutor.map` when `job_threads > 1` and falls back to a plain list comprehension otherwise. `push_data` and `compute_results` remain as sequential methods used by unit tests. `OrchestratingProcessor.process()` calls `process_jobs` instead of the previous `push_data` + `compute_results` pair.

Test plan
- Unit tests: thread pool used when `job_threads > 1`, calling thread used when `job_threads = 1`, push and get run on the same thread per job, threaded results match sequential, error tracking propagates correctly.
- Manual: run with `--sync-scheduler --job-threads 5` and verify the lag improvement in the dashboard.

🤖 Generated with Claude Code