[fournos_launcher] Allow launching multiple jobs concurrently #49

kpouget merged 33 commits into openshift-psap:main
Conversation
[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by: — the full list of commands accepted by this bot can be found here.

Details: needs approval from an approver in each of these files. Approvers can indicate their approval by writing
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the review settings. Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 Walkthrough

Introduces thread-local script execution context and per-thread ARTIFACT_DIR handling, a Parallel context manager for concurrent tasks, a /parallel FOURNOS directive and parallel submission flow with per-job artifact suffixes, a new cleanup CLI, config/preset changes, and several CLI/env renames and export updates.

Changes
Sequence Diagram

```mermaid
sequenceDiagram
    participant CLI as User/CLI
    participant Parser as Directive Parser
    participant Config as Config Manager
    participant Submit as Submit Orchestrator
    participant Script as ScriptManager
    participant Worker as Job Worker Thread

    CLI->>Parser: submit with /parallel directive
    Parser->>Config: write fournos_launcher.parallel_jobs[index]=[presets]
    CLI->>Submit: trigger submission
    Submit->>Config: read fournos_launcher.parallel_jobs
    alt parallel entries present
        Submit->>Submit: build shared timestamp base name
        loop per job
            Submit->>Worker: start thread with job args (+artifact_dirname_suffix)
            Worker->>Script: start_execution_context(source_file)
            Worker->>Worker: set thread-local ARTIFACT_DIR (if dedicated)
            Worker->>Worker: call submit_and_wait(...)
            Worker-->>Script: clear_execution_context() (finally)
        end
        Submit->>Submit: aggregate failures
        Submit->>Submit: call cleanup_fjob per submitted job
    else no parallel config
        Submit->>Worker: single submit_and_wait (normal flow)
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs
🚥 Pre-merge checks: ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

✅ Passed checks (4 passed)

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches: 🧪 Generate unit tests (beta)
/test fournos skeleton

1 similar comment

/test fournos skeleton
Actionable comments posted: 10
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@projects/core/dsl/runtime.py`:
- Around line 203-205: The finally block calls
script_manager.clear_execution_context() but script_manager is only assigned
inside the try, so if earlier operations raise an exception the finally will
raise UnboundLocalError and hide the original error; to fix, initialize
script_manager = None (or a sentinel) before the try and in the finally guard
the call (e.g., if script_manager is not None:
script_manager.clear_execution_context()), referencing the script_manager
variable and its clear_execution_context() method to locate where to add the
initialization and the conditional cleanup.
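A minimal sketch of the suggested sentinel-guard pattern; the `ScriptManager` stand-in and `run_script` wrapper below are illustrative, not the real code in `projects/core/dsl/runtime.py`:

```python
# Hypothetical stand-in for the real ScriptManager in projects/core/dsl.
class ScriptManager:
    def __init__(self):
        self.source_file = None

    def start_execution_context(self, source_file):
        self.source_file = source_file

    def clear_execution_context(self):
        self.source_file = None


def run_script(source_file, fail_early=False):
    script_manager = None  # sentinel assigned before the try block
    try:
        if fail_early:
            # Simulates a failure before script_manager is assigned.
            raise RuntimeError("setup failed before script_manager existed")
        script_manager = ScriptManager()
        script_manager.start_execution_context(source_file)
        # ... execute the script here ...
    finally:
        # Guarded call: an early failure no longer raises
        # UnboundLocalError in the finally block and masks the
        # original exception.
        if script_manager is not None:
            script_manager.clear_execution_context()
```

With the guard in place, an exception raised before the assignment propagates unchanged instead of being replaced by an `UnboundLocalError`.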
In `@projects/core/library/env.py`:
- Around line 37-39: The TLS setter _set_tls_artifact_dir currently has no
effect because NextArtifactDir, TempArtifactDir, and next_artifact_index still
read/write the module-global ARTIFACT_DIR/ARTIFACT_INDEX; update those helpers
to prefer thread-local storage (read _tls_artifact_dir.val and a thread-local
index like _tls_artifact_index.val) and only fall back to the module-global
ARTIFACT_DIR/ARTIFACT_INDEX when the TLS value is None, and ensure writes
(incrementing index or setting current dir) update the TLS slot so each worker
thread gets isolated artifact dirs and indexes; change references inside
NextArtifactDir, TempArtifactDir, and next_artifact_index accordingly.
- Around line 15-25: The module-level ARTIFACT_DIR currently exists so Python
never calls __getattr__ for env.ARTIFACT_DIR; remove or stop defining the
top-level ARTIFACT_DIR constant so attribute access will fall through to
__getattr__ (which checks _tls_artifact_dir and globals().get("ARTIFACT_DIR") as
a fallback). Locate and delete the module-level ARTIFACT_DIR definition (or
ensure it is removed from globals() during import) so __getattr__ can intercept
reads and return thread-local values using _tls_artifact_dir.val.
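A sketch of the thread-local-with-global-fallback lookup described above; the helper names and paths are assumptions, and the real module would expose the read via a module-level `__getattr__` (PEP 562) once the top-level `ARTIFACT_DIR` constant is removed:

```python
import threading

_tls = threading.local()
_GLOBAL_ARTIFACT_DIR = "/srv/artifacts"  # module-global fallback (hypothetical path)


def set_tls_artifact_dir(path):
    _tls.val = path


def current_artifact_dir():
    # Thread-local value wins; fall back to the module global when unset.
    return getattr(_tls, "val", None) or _GLOBAL_ARTIFACT_DIR


# Each worker thread sees its own directory; other threads are unaffected.
seen = {}

def worker(name):
    set_tls_artifact_dir(f"/srv/artifacts/{name}")
    seen[name] = current_artifact_dir()

threads = [threading.Thread(target=worker, args=(n,)) for n in ("job0", "job1")]
for t in threads:
    t.start()
for t in threads:
    t.join()
```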
In `@projects/core/library/run.py`:
- Around line 199-202: Parallel blocks with no tasks cause
ThreadPoolExecutor(max_workers=0) to raise; update run.Parallel handling by
checking self.parallel_tasks before creating the executor and treat an empty
list as a no-op. Specifically, before computing max_workers or entering
ThreadPoolExecutor, return/exit early (or skip executor creation) when
self.parallel_tasks is empty; alternatively ensure max_workers is computed as
max(1, min(len(self.parallel_tasks), os.cpu_count() or 1)). Change the logic
around the max_workers variable and ThreadPoolExecutor usage so
run.Parallel(...) with no tasks does not instantiate ThreadPoolExecutor with 0
workers.
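A sketch of the empty-task guard, assuming tasks are stored as `(function, args, kwargs)` tuples; `ThreadPoolExecutor(max_workers=0)` raises `ValueError`, so an empty list is treated as a no-op:

```python
import os
from concurrent.futures import ThreadPoolExecutor


def run_parallel(tasks):
    """tasks: list of (function, args, kwargs) tuples."""
    if not tasks:
        # No-op instead of ThreadPoolExecutor(max_workers=0), which raises.
        return []
    max_workers = max(1, min(len(tasks), os.cpu_count() or 1))
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(fn, *args, **kwargs) for fn, args, kwargs in tasks]
        return [f.result() for f in futures]
```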
- Around line 174-213: The bug is that a single parallel_dir is created once and
shared by all futures so downstream env.next_artifact_index() calls still race;
to fix, generate a unique artifact directory per task before submitting: inside
the loop over self.parallel_tasks call env.next_artifact_index() (use
env.next_artifact_index()) for each delayed_func, build and mkdir a
task-specific parallel_dir, then submit _run_with_artifact_dir with that unique
parallel_dir (instead of the shared parallel_dir); keep use of
_run_with_artifact_dir, mkdir(exist_ok=True) and pass the created dir into
executor.submit so each worker gets its own non-colliding subtree.
- Line 9: The file imports joblib unconditionally via the top-level import
joblib which will raise ModuleNotFoundError if joblib is not declared as a
dependency; either add "joblib" to the project's dependencies in pyproject.toml,
or change the import to the safe pattern used in projects/legacy/library/run.py:
wrap import joblib in try/except ImportError, set a sentinel (e.g., joblib =
None) and guard any usage of joblib (functions/classes that call joblib) so
behavior degrades gracefully; reference the top-level import joblib and the
EXTRA_PACKAGES = ["joblib"] note in run_ci.py when choosing the fix.
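The safe-import pattern referenced from `projects/legacy/library/run.py` can be sketched as follows; `parallel_map` is an illustrative helper, not an existing function:

```python
# Treat joblib as an optional dependency and degrade gracefully.
try:
    import joblib
except ImportError:
    joblib = None  # sentinel: joblib not installed


def parallel_map(fn, items, n_jobs=2):
    if joblib is None:
        # Sequential fallback when the optional dependency is missing.
        return [fn(item) for item in items]
    return joblib.Parallel(n_jobs=n_jobs)(joblib.delayed(fn)(item) for item in items)
```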
- Around line 164-165: The code incorrectly uses joblib.delayed(function)(*args,
**kwargs) which returns a tuple-like delayed object and later code expects
attributes .func/.args/.keywords; change the append site
(self.parallel_tasks.append(...)) to store the raw tuple (function, args,
kwargs) instead of using joblib.delayed, then update the execution path that
reads those entries (the logic around the lines that access .func, .args,
.keywords) to unpack the tuple as (func, args, kwargs) and call func(*args,
**kwargs); also remove the unnecessary joblib.delayed usage/import if no longer
needed so only the plain tuple is used when scheduling and running tasks.
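A sketch of the tuple-based scheduling suggested above; the class mirrors the shape of `run.Parallel` but is a simplified stand-in:

```python
class Parallel:
    def __init__(self):
        self.parallel_tasks = []

    def delayed(self, function, *args, **kwargs):
        # Store the raw (function, args, kwargs) tuple; no joblib.delayed
        # wrapper, so no .func/.args/.keywords attribute mismatch later.
        self.parallel_tasks.append((function, args, kwargs))

    def run_all(self):
        results = []
        for func, args, kwargs in self.parallel_tasks:
            results.append(func(*args, **kwargs))
        return results
```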
In `@projects/fournos_launcher/orchestration/pr_args.py`:
- Around line 241-254: The parser currently accepts any non-negative integer for
the /parallel index and returns {f"fournos_launcher.parallel_jobs.{idx}":
presets}, but config.yaml only defines slots 0–9 so indices >=10 will be ignored
later; after converting idx (and after the non-negative check) validate that idx
is within the allowed range (e.g. 0 <= idx < 10) and raise a ValueError with a
clear message like "Invalid /parallel directive: index out of range, must be
0..9, got {idx} in '{line}'"; implement this check near the existing idx parsing
block (or introduce a MAX_PARALLEL_JOBS constant) to prevent silently dropped
configs.
In `@projects/fournos_launcher/orchestration/submit.py`:
- Around line 135-148: The submit_kwargs dict in submit.py captures status_dest
as env.ARTIFACT_DIR too early, causing all parallel submissions to share the
same artifact dir; update submit_kwargs (the dict built in submit.py) to not
freeze status_dest to env.ARTIFACT_DIR — either remove the "status_dest" key or
set it to None so submit_and_wait can use the thread-local/artifact-dir chosen
by the worker/run.Parallel; ensure submit_and_wait still accepts and handles a
None status_dest fallback to the per-worker env.ARTIFACT_DIR/"artifacts".
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: ac92e4f5-2162-41d1-b95e-cda672e7a047
📒 Files selected for processing (7)
- projects/core/dsl/runtime.py
- projects/core/dsl/script_manager.py
- projects/core/library/env.py
- projects/core/library/run.py
- projects/fournos_launcher/orchestration/config.yaml
- projects/fournos_launcher/orchestration/pr_args.py
- projects/fournos_launcher/orchestration/submit.py
```python
try:
    idx = int(idx_str)
    if idx < 0:
        raise ValueError(
            f"Invalid /parallel directive: index must be non-negative, got {idx} in '{line}'"
        )
except ValueError as e:
    if "non-negative" in str(e):
        raise
    raise ValueError(
        f"Invalid /parallel directive: index must be a number, got '{idx_str}' in '{line}'"
    ) from None

return {f"fournos_launcher.parallel_jobs.{idx}": presets}
```
Validate /parallel indices against the configured slots.
config.yaml only declares fournos_launcher.parallel_jobs.0 through .9. A directive like /parallel 10 ... passes parsing here, but submit.py only reads the declared map, so that job config can be silently dropped.
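The suggested range check can be sketched with a `MAX_PARALLEL_JOBS` constant; 10 mirrors the slots 0–9 declared in config.yaml, and the function name is illustrative:

```python
MAX_PARALLEL_JOBS = 10  # config.yaml declares parallel_jobs.0 through .9


def parse_parallel_index(idx_str, line):
    try:
        idx = int(idx_str)
    except ValueError:
        raise ValueError(
            f"Invalid /parallel directive: index must be a number, got '{idx_str}' in '{line}'"
        ) from None
    if not 0 <= idx < MAX_PARALLEL_JOBS:
        # Reject indices that config.yaml would silently drop.
        raise ValueError(
            f"Invalid /parallel directive: index out of range, must be "
            f"0..{MAX_PARALLEL_JOBS - 1}, got {idx} in '{line}'"
        )
    return idx
```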
/test fournos skeleton

2 similar comments

/test fournos skeleton

/test fournos skeleton

🔴 Test of 'fournos_launcher submit' failed after 00 hours 00 minutes 02 seconds 🔴 • Link to the test results. • No reports index generated... • Test configuration: • Failure indicator: Empty.
Actionable comments posted: 3
🧹 Nitpick comments (3)

projects/core/dsl/script_manager.py (1)

98-116: `clear_tasks` lacks thread-safety synchronization. While `register_task` is protected by `self._lock`, `clear_tasks` mutates both `_task_registry` and `_task_results` without acquiring the lock. This could cause race conditions if tasks are being registered concurrently with a clear operation.

🔒 Proposed fix to add lock protection (wrap the method body in `with self._lock:`):

```python
def clear_tasks(self, source_file: str | None = None) -> None:
    """
    Clear tasks from registry

    Args:
        source_file: If specified, only clear tasks from this file.
            If None, clear all tasks from all files.
    """
    with self._lock:
        if source_file is None:
            # Clear all tasks from all files
            logger.debug("Clearing all tasks from script manager")
            self._task_registry.clear()
            self._task_results.clear()
        else:
            # Clear tasks from specific file
            if source_file in self._task_registry:
                tasks_to_remove = self._task_registry[source_file]

                # Clear task results for tasks from this file
                for task_info in tasks_to_remove:
                    task_name = task_info["name"]
                    if task_name in self._task_results:
                        del self._task_results[task_name]

                # Remove tasks from this file
                del self._task_registry[source_file]
                logger.debug(f"Cleared {len(tasks_to_remove)} tasks from {source_file}")
```

projects/fournos_launcher/toolbox/submit_fjob/main.py (1)

179-183: Consider moving the import to module level. The `yaml` import inside the task function works but is unconventional. Moving it to the top of the file with the other imports improves readability and makes dependencies explicit.

♻️ Proposed refactor — at the top of the file:

```diff
 import logging
 from datetime import datetime
+
+import yaml

 from projects.core.dsl import (
```

Then remove the inline import at line 179:

```diff
-    import yaml
-
     job_info_file = args.artifact_dir / "artifacts" / f"{ctx.final_job_name}-submission-info.yaml"
```

projects/fournos_launcher/orchestration/submit.py (1)

138-151: `status_dest` in `submit_kwargs` is unused by `submit_fjob`. Looking at the `submit_fjob` function signature (context snippet 1), it doesn't accept a `status_dest` parameter, so this key in `submit_kwargs` is ignored when unpacked with `**submit_kwargs` at line 181. This isn't a bug, since the `wait_fjobs` calls pass `status_dest` explicitly, but the unused key adds confusion.

♻️ Proposed fix to remove the unused key:

```diff
 submit_kwargs = {
     "cluster_name": cluster_name,
     "project": config.project.get_config("ci_job.project"),
     "variables_overrides": overrides,
     "namespace": config.project.get_config("fournos.namespace"),
     "owner": config.project.get_config("fournos.job.owner"),
     "pipeline_name": config.project.get_config("fournos.job.pipeline_name"),
     "env": env_dict,
-    "status_dest": env.ARTIFACT_DIR,
     "ci_label": config.project.get_config("fournos.job.ci_label"),
     "exclusive": config.project.get_config("fournos.job.exclusive"),
     "gpu_count": gpu_count,
     "gpu_type": gpu_type,
 }
```
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@projects/fournos_launcher/toolbox/wait_fjobs/main.py`:
- Around line 334-340: The check for command output uses yaml_list.stdout which
will be empty when stdout was redirected via stdout_dest; instead, after running
shell.run(...) keep the yaml_list.success (or returncode) check and verify the
artifact file (artifact_dir / f"{job_name}-pods.yaml") exists and is non-empty
(e.g., .exists() and .stat().st_size > 0) before incrementing captured_count;
update the condition that references yaml_list.stdout to check the file on disk
and leave yaml_list (the shell.run result) for success/returncode checks.
- Around line 306-314: The code currently checks both yaml_list.success and
yaml_list.stdout and yaml_list.stdout.strip() before counting captured
PipelineRun specs; update the condition to only check yaml_list.success (as
other similar code paths do when stdout_dest is used) so replace the combined
stdout checks with a single yaml_list.success in the function that builds the
message returning "Captured PipelineRun specs..." (look for yaml_list and
ctx.target_jobs) and apply the identical simplification in the
capture_all_pod_specs location where the same triple-check appears.
- Around line 122-123: The retry decorator currently hardcodes
`@retry`(attempts=120, delay=10, backoff=1.0) which yields a 20-minute limit;
update this to be configurable instead of fixed: read attempts, delay and
backoff from environment variables or function parameters (e.g., RETRY_ATTEMPTS,
RETRY_DELAY, RETRY_BACKOFF) and apply them to the retry decorator or construct
the decorator dynamically so longer (or overridable) timeouts are possible for
FOURNOS model jobs; alternatively, increase default values to reflect multi-hour
ML runs but ensure the new values are settable at runtime.
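A sketch of an environment-configurable retry decorator; the variable names `RETRY_ATTEMPTS` / `RETRY_DELAY` / `RETRY_BACKOFF` are the suggestion from the comment above, not an existing interface:

```python
import os
import time
from functools import wraps


def retry(attempts=120, delay=10, backoff=1.0):
    # Environment overrides win over the hardcoded defaults, so long
    # FOURNOS model jobs can raise the timeout at runtime.
    attempts = int(os.environ.get("RETRY_ATTEMPTS", attempts))
    delay = float(os.environ.get("RETRY_DELAY", delay))
    backoff = float(os.environ.get("RETRY_BACKOFF", backoff))

    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            wait = delay
            for attempt in range(1, attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == attempts:
                        raise  # out of attempts: propagate the last error
                    time.sleep(wait)
                    wait *= backoff
        return wrapper
    return decorator
```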
---
Nitpick comments:
In `@projects/core/dsl/script_manager.py`:
- Around line 98-116: clear_tasks mutates shared state (_task_registry and
_task_results) without acquiring the instance lock, creating a race with
register_task which uses self._lock; wrap the entire clear_tasks implementation
in the same lock used by register_task (acquire self._lock at method start and
release at end, including both the branch that clears all tasks and the branch
that clears a specific source_file) so all reads/writes to _task_registry and
_task_results are synchronized with other methods using self._lock.
In `@projects/fournos_launcher/orchestration/submit.py`:
- Around line 138-151: submit_kwargs contains a "status_dest" key that
submit_fjob does not accept and will be ignored when **submit_kwargs is used;
remove the "status_dest": env.ARTIFACT_DIR entry from the submit_kwargs dict (in
projects/fournos_launcher/orchestration/submit.py) and instead pass
env.ARTIFACT_DIR explicitly to wait_fjobs (which already expects status_dest) or
add status_dest as an explicit argument where needed—do not change submit_fjob
signature unless you intend for it to consume status_dest.
In `@projects/fournos_launcher/toolbox/submit_fjob/main.py`:
- Around line 179-183: Move the local "import yaml" out of the task function and
add it to the module-level imports at the top of the file so dependencies are
explicit; then remove the inline import on the block that writes job_info_file
(the code that constructs job_info_file using args.artifact_dir and
ctx.final_job_name and calls yaml.dump(job_info, ...)) so the function simply
uses the module-level yaml reference.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 0e2b6019-d8bc-4f91-9835-6fdf68c88041
📒 Files selected for processing (8)
- projects/core/ci_entrypoint/run_ci.py
- projects/core/dsl/runtime.py
- projects/core/dsl/script_manager.py
- projects/fournos_launcher/orchestration/config.yaml
- projects/fournos_launcher/orchestration/pr_args.py
- projects/fournos_launcher/orchestration/submit.py
- projects/fournos_launcher/toolbox/submit_fjob/main.py
- projects/fournos_launcher/toolbox/wait_fjobs/main.py
✅ Files skipped from review due to trivial changes (2)
- projects/core/ci_entrypoint/run_ci.py
- projects/fournos_launcher/orchestration/config.yaml
🚧 Files skipped from review as they are similar to previous changes (2)
- projects/core/dsl/runtime.py
- projects/fournos_launcher/orchestration/pr_args.py
Force-pushed 2a9c539 to 0cfb72a (Compare)
/test fournos skeleton

🔴 Test of 'fournos_launcher submit' failed after 00 hours 00 minutes 53 seconds 🔴 • Link to the test results. • No reports index generated... • Test configuration:

/test fournos skeleton

🔴 Test of 'fournos_launcher submit' failed after 00 hours 00 minutes 20 seconds 🔴 • Link to the test results. • No reports index generated... • Test configuration:

/test fournos skeleton

🔴 Test of 'fournos_launcher submit' failed after 00 hours 01 minutes 04 seconds 🔴 • Link to the test results. • No reports index generated... • Test configuration:

/test fournos skeleton

🔴 Test of 'fournos_launcher submit' failed after 00 hours 00 minutes 02 seconds 🔴 • Link to the test results. • No reports index generated... • Test configuration:

/test fournos skeleton

🔴 Test of 'fournos_launcher submit' failed after 00 hours 00 minutes 04 seconds 🔴 • Link to the test results. • No reports index generated... • Test configuration: • Failure indicator: Empty.

/test fournos skeleton quick_test

🔴 Test of 'fournos_launcher submit' failed after 00 hours 00 minutes 20 seconds 🔴 • Link to the test results. • No reports index generated... • Test configuration:

/test fournos skeleton quick_test

🔴 Test of 'fournos_launcher submit' failed after 00 hours 01 minutes 06 seconds 🔴 • Link to the test results. • No reports index generated... • Test configuration:

🟢 Test of 'fournos_launcher submit' succeeded after 00 hours 06 minutes 32 seconds 🟢 • Link to the test results. • No reports index generated... • Test configuration:

/test fournos skeleton

🟢 Test of 'skeleton test' succeeded after 00 hours 00 minutes 10 seconds 🟢 • Link to the test results. • No reports index generated... • Test configuration:

🟢 Test of 'skeleton test' succeeded after 00 hours 00 minutes 10 seconds 🟢 • Link to the test results. • No reports index generated... • Test configuration:

🟢 Test of 'fournos_launcher submit' succeeded after 00 hours 02 minutes 37 seconds 🟢 • Link to the test results. • No reports index generated... • Test configuration:

/test fournos skeleton

🟢 Test of 'skeleton test' succeeded after 00 hours 00 minutes 10 seconds 🟢 • Link to the test results. • No reports index generated... • Test configuration:

🟢 Test of 'skeleton test' succeeded after 00 hours 00 minutes 10 seconds 🟢 • Link to the test results. • No reports index generated... • Test configuration:

🟢 Test of 'fournos_launcher submit' succeeded after 00 hours 02 minutes 33 seconds 🟢 • Link to the test results. • No reports index generated... • Test configuration:

/test fournos skeleton

🟢 Test of 'skeleton test' succeeded after 00 hours 02 minutes 00 seconds 🟢 • Link to the test results. • No reports index generated... • Test configuration:

good, merging this version!

/test fournos skeleton
Summary by CodeRabbit
New Features
Changes