From 36e03716b43a80e0e5e45c650d59f5e101d9eebb Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Wed, 29 Apr 2026 15:14:58 +0200 Subject: [PATCH 01/33] [core] library: env: add thread safe ARTIFACT_DIR --- projects/core/library/env.py | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/projects/core/library/env.py b/projects/core/library/env.py index 7cd4c44f..7c34353e 100644 --- a/projects/core/library/env.py +++ b/projects/core/library/env.py @@ -1,12 +1,43 @@ import logging import os import pathlib +import threading import time ARTIFACT_DIR = None BASE_ARTIFACT_DIR = None # Immutable copy of the initial ARTIFACT_DIR FORGE_HOME = pathlib.Path(__file__).parents[3] +# Thread-local storage for ARTIFACT_DIR (thread-safe) +_tls_artifact_dir = threading.local() + + +def __getattr__(name): + """Support thread-local ARTIFACT_DIR access.""" + if name == "ARTIFACT_DIR": + # Try thread-local first, fallback to global + try: + return _tls_artifact_dir.val + except AttributeError: + # Thread-local not set, use global + return globals().get("ARTIFACT_DIR") + + return globals()[name] + + +def get_tls_artifact_dir(): + """Get thread-local artifact directory.""" + try: + return _tls_artifact_dir.val + except AttributeError: + # Thread-local not set, use global + return globals().get("ARTIFACT_DIR") + + +def _set_tls_artifact_dir(value): + """Set thread-local artifact directory (thread-safe).""" + _tls_artifact_dir.val = value + def _set_artifact_dir(value): global ARTIFACT_DIR @@ -51,6 +82,8 @@ def init(daily_artifact_dir=False): os.environ["FORGE_BASE_ARTIFACT_DIR"] = str(BASE_ARTIFACT_DIR) _set_artifact_dir(artifact_dir) + # Also set in thread-local storage for main thread + _set_tls_artifact_dir(artifact_dir) def NextArtifactDir(name, *, lock=None, counter_p=None): From eb23d29c05fa5e9074d3ad85eafb1b21dfa05c65 Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Wed, 29 Apr 2026 15:15:15 +0200 Subject: [PATCH 02/33] [core] library: run: add a multi-thread Parallel context --- projects/core/library/run.py | 134 +++++++++++++++++++++++++++++++++++ 1 file changed, 134 insertions(+) diff --git a/projects/core/library/run.py b/projects/core/library/run.py index 64116c22..e78a6ccf 100644 --- a/projects/core/library/run.py +++ b/projects/core/library/run.py @@ -2,6 +2,11 @@ import os import signal import subprocess +import time +import traceback +from concurrent.futures import ThreadPoolExecutor, as_completed + +from . import env logger = logging.getLogger(__name__) @@ -104,3 +109,132 @@ def run_and_catch(exc, fct, *args, **kwargs): logger.error(f"{e.__class__.__name__}: {e}") exc = exc or e return exc + + +class Parallel: + """ + Context manager for parallel execution of tasks with immediate cancellation on failure. + + Uses ThreadPoolExecutor for multithreaded execution with thread-safe artifact directory handling. + When any task fails, all remaining tasks are immediately cancelled rather than waiting + for completion. + + Each parallel task runs with its own dedicated artifact directory accessible via env.ARTIFACT_DIR, + allowing tasks to write artifacts without conflicts. 
The dedicated directory is created as: + env.ARTIFACT_DIR / f"{next_count:03d}__{name}" + + Usage: + with run.Parallel("task_name") as parallel: + parallel.delayed(function1, arg1, arg2) + parallel.delayed(function2, arg3, kwarg=value) + # Tasks execute in parallel when exiting the context + # Each task can access its dedicated directory via env.ARTIFACT_DIR + """ + + def __init__(self, name, exit_on_exception=True, dedicated_dir=True): + """ + Initialize parallel execution context. + + Args: + name: Name for the parallel execution (used for artifact directory) + exit_on_exception: If True, kill process group on exception + dedicated_dir: If True, create dedicated artifact directory for this parallel execution + """ + self.name = name + self.parallel_tasks = None + self.exit_on_exception = exit_on_exception + self.dedicated_dir = dedicated_dir + + def __enter__(self): + """Enter the parallel context.""" + self.parallel_tasks = [] + return self + + def delayed(self, function, *args, **kwargs): + """ + Add a function to be executed in parallel. + + Args: + function: Function to execute + *args: Positional arguments for the function + **kwargs: Keyword arguments for the function + """ + # Simple task container - no joblib needed + from collections import namedtuple + + DelayedTask = namedtuple("DelayedTask", ["func", "args", "keywords"]) + task = DelayedTask(func=function, args=args, keywords=kwargs) + self.parallel_tasks.append(task) + + def __exit__(self, ex_type, ex_value, exc_traceback): + """Execute all delayed tasks in parallel with immediate cancellation on failure.""" + if ex_value: + logger.warning( + f"An exception occured while preparing the '{self.name}' Parallel execution ..." + ) + return False + + if self.dedicated_dir: + # Create dedicated directory without modifying global ARTIFACT_DIR + # to avoid race conditions in multithreaded execution. + # Each parallel task will inherit the current ARTIFACT_DIR context. 
+ next_count = env.next_artifact_index() + parallel_dir = env.ARTIFACT_DIR / f"{next_count:03d}__{self.name}" + parallel_dir.mkdir(exist_ok=True) + logger.debug(f"Created parallel execution directory: {parallel_dir}") + else: + parallel_dir = None + + def _run_with_artifact_dir(func, artifact_dir, *args, **kwargs): + """Wrapper to run function with specific ARTIFACT_DIR for this thread.""" + if artifact_dir: + # Use thread-local storage (thread-safe) + original_artifact_dir = env.ARTIFACT_DIR + try: + env._set_tls_artifact_dir(artifact_dir) + return func(*args, **kwargs) + finally: + env._set_tls_artifact_dir(original_artifact_dir) + else: + # No dedicated directory, run normally + return func(*args, **kwargs) + + # Use ThreadPoolExecutor for better cancellation control + max_workers = min(len(self.parallel_tasks), os.cpu_count() or 1) + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + # Submit all tasks with artifact directory context + futures = [] + for delayed_func in self.parallel_tasks: + # Wrap each function to run with the dedicated artifact directory + future = executor.submit( + _run_with_artifact_dir, + delayed_func.func, + parallel_dir, + *delayed_func.args, + **delayed_func.keywords, + ) + futures.append(future) + + try: + # Wait for all tasks to complete + for future in as_completed(futures): + future.result() # This will raise any exception that occurred + + except Exception as e: + # Cancel all remaining tasks immediately + logger.error("Exception in parallel task, cancelling remaining tasks...") + for future in futures: + future.cancel() + + # Give a short time for cancellation to take effect + time.sleep(0.1) + + if not self.exit_on_exception: + raise e + + traceback.print_exc() + logger.error(f"Exception caught during the '{self.name}' Parallel execution.") + raise SystemExit(1) from e + + return False # If we returned True here, any exception would be suppressed! 
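A minimal usage sketch of how patches 01 and 02 are meant to combine, assuming env.init() has already been run by the CI entrypoint; the task body, node names, and file names below are illustrative assumptions rather than code from the patches.

from projects.core.library import env, run

def capture_metrics(node_name):
    # Inside a parallel task, env.ARTIFACT_DIR resolves through thread-local
    # storage, so it points at the dedicated "NNN__capture_nodes" directory
    # created for this Parallel block instead of the caller's global value.
    out_file = env.ARTIFACT_DIR / f"{node_name}.log"  # hypothetical artifact name
    out_file.write_text(f"collected metrics for {node_name}\n")

env.init()  # sets both the global and the main thread's ARTIFACT_DIR
with run.Parallel("capture_nodes") as parallel:
    for node in ("node-a", "node-b"):  # illustrative task arguments
        parallel.delayed(capture_metrics, node)
# The tasks run in a ThreadPoolExecutor when the context exits; a failure in one
# task cancels the remaining futures and exits unless exit_on_exception=False.
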
From 411428aa13f9cd626505cfc41c0a53e4412c5b47 Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Wed, 29 Apr 2026 16:23:34 +0200 Subject: [PATCH 03/33] [core] dsl: make thread safe --- projects/core/dsl/runtime.py | 7 ++++ projects/core/dsl/script_manager.py | 61 +++++++++++++++++++++++++---- 2 files changed, 60 insertions(+), 8 deletions(-) diff --git a/projects/core/dsl/runtime.py b/projects/core/dsl/runtime.py index d1afff31..dc297a65 100644 --- a/projects/core/dsl/runtime.py +++ b/projects/core/dsl/runtime.py @@ -109,6 +109,10 @@ def execute_tasks(function_args: dict = None): # Execute tasks only from the calling file script_manager = get_script_manager() + + # Start thread-local execution context for this execution + script_manager.start_execution_context(rel_filename) + file_tasks = list(script_manager.get_tasks_from_file(rel_filename)) if not file_tasks: @@ -195,7 +199,10 @@ def execute_tasks(function_args: dict = None): shared_context.__dict__["artifact_dir"] = args.artifact_dir return shared_context + finally: + # Clear thread-local execution context + script_manager.clear_execution_context() # Clean up the file handler to prevent leaks dsl_logger = logging.getLogger("DSL") dsl_logger.removeHandler(file_handler) diff --git a/projects/core/dsl/script_manager.py b/projects/core/dsl/script_manager.py index 28758be2..991477ba 100644 --- a/projects/core/dsl/script_manager.py +++ b/projects/core/dsl/script_manager.py @@ -5,6 +5,7 @@ """ import logging +import threading logger = logging.getLogger("DSL") @@ -40,6 +41,10 @@ def __init__(self): self._task_registry: dict[str, list[dict]] = {} # Task results organized by task name self._task_results: dict[str, TaskResult] = {} + # Thread-local storage for execution-specific task results + self._thread_local = threading.local() + # Lock for thread-safe registration + self._lock = threading.Lock() def register_task(self, task_info: dict, source_file: str) -> None: """ @@ -49,20 +54,26 @@ def register_task(self, task_info: dict, source_file: str) -> None: task_info: Dictionary containing task metadata source_file: Relative path to the source file defining the task """ - if source_file not in self._task_registry: - self._task_registry[source_file] = [] + with self._lock: + if source_file not in self._task_registry: + self._task_registry[source_file] = [] - self._task_registry[source_file].append(task_info) + self._task_registry[source_file].append(task_info) - # Create result container for this task - task_name = task_info["name"] - self._task_results[task_name] = TaskResult(task_name) + # Create result container for this task + task_name = task_info["name"] + self._task_results[task_name] = TaskResult(task_name) - logger.debug(f"Registered task '{task_name}' from {source_file}") + logger.debug(f"Registered task '{task_name}' from {source_file}") def get_task_result(self, task_name: str) -> TaskResult | None: """Get the result container for a specific task""" - return self._task_results.get(task_name) + # Check thread-local results (for current execution) + thread_results = getattr(self._thread_local, "task_results", None) + if thread_results and task_name in thread_results: + return thread_results[task_name] + + return None def get_tasks_from_file(self, source_file: str) -> list[dict]: """ @@ -125,6 +136,40 @@ def get_total_task_count(self) -> int: """Get the total number of registered tasks across all files""" return sum(len(tasks) for tasks in self._task_registry.values()) + def start_execution_context(self, source_file: str) -> None: + """ + 
Start a new execution context for the current thread. + + Creates thread-local task results for tasks from the specified file. + This allows parallel executions to have isolated task results. + + Args: + source_file: Source file being executed + """ + if not self.has_execution_context(): + self._thread_local.task_results = {} + + # Create thread-local task results for tasks in this file + file_tasks = self.get_tasks_from_file(source_file) + for task_info in file_tasks: + task_name = task_info["name"] + if task_name not in self._thread_local.task_results: + self._thread_local.task_results[task_name] = TaskResult(task_name) + + logger.debug( + f"Started execution context for {source_file} in thread {threading.current_thread().name}" + ) + + def clear_execution_context(self) -> None: + """Clear the thread-local execution context.""" + if self.has_execution_context(): + del self._thread_local.task_results + logger.debug(f"Cleared execution context in thread {threading.current_thread().name}") + + def has_execution_context(self) -> bool: + """Check if the current thread has an active execution context.""" + return hasattr(self._thread_local, "task_results") + # Global script manager instance # This provides the interface while keeping state encapsulated From 7dad23926fc35e320343da5f4a13ec910c7d1417 Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Thu, 30 Apr 2026 09:29:06 +0200 Subject: [PATCH 04/33] [core] dsl: runtime: add the ability to customize the command prefix/suffix --- projects/core/dsl/runtime.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/projects/core/dsl/runtime.py b/projects/core/dsl/runtime.py index dc297a65..6fd26049 100644 --- a/projects/core/dsl/runtime.py +++ b/projects/core/dsl/runtime.py @@ -76,6 +76,18 @@ def execute_tasks(function_args: dict = None): filename = caller_frame.f_code.co_filename command_name = _get_toolbox_function_name(filename) + # Prepend prefix to command name if provided + if prefix := function_args.get("artifact_dirname_prefix"): + command_name = f"{prefix}_{command_name}" + # Remove DSL framework parameter from function args + del function_args["artifact_dirname_prefix"] + + # Append suffix to command name if provided + if suffix := function_args.get("artifact_dirname_suffix"): + command_name = f"{command_name}_{suffix}" + # Remove DSL framework parameter from function args + del function_args["artifact_dirname_suffix"] + # Get relative filename to match task registration try: rel_filename = str(Path(filename).relative_to(env.FORGE_HOME)) From 7c132a705bdca51bd3234a4d35b94f3b4b05582d Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Thu, 30 Apr 2026 09:51:23 +0200 Subject: [PATCH 05/33] [core] library: config: allow creating top fields if they don't exist --- projects/core/library/config.py | 32 +++++++++++++++++++++++++++++--- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/projects/core/library/config.py b/projects/core/library/config.py index 55d3cb24..eb33faf7 100644 --- a/projects/core/library/config.py +++ b/projects/core/library/config.py @@ -149,9 +149,35 @@ def apply_config_overrides( if ignore_not_found: continue - raise ValueError( - f"Config key '{key}' does not exist, and cannot create it at the moment :/" - ) + # Try to create the key if parent exists and is a dict + key_parts = key.split(".") + if len(key_parts) <= 1: + raise ValueError( + f"Config key '{key}' does not exist, and cannot create it at the moment :/" + ) + + parent_key = ".".join(key_parts[:-1]) + try: + parent_value = self.get_config( + 
parent_key, print=False, warn=False, handled_secretly=True + ) + except Exception as e: + raise ValueError( + f"Config key '{key}' does not exist, and cannot create it at the moment :/" + ) from e + + if not isinstance(parent_value, dict): + raise ValueError( + f"Config key '{key}' does not exist, and cannot create it at the moment :/" + ) + + # Parent exists and is a dict, create the new key + child_key = key_parts[-1] + parent_value[child_key] = value + self.save_config() + if log: + logger.info(f"config override (new key): {key} --> {value}") + continue self.set_config(key, value, print=False) actual_value = self.get_config( From eaa8d00ef662b625cfb7e87bb5507235c036b69d Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Thu, 30 Apr 2026 10:01:00 +0200 Subject: [PATCH 06/33] [core] ci_entrypoint: fournos_resolve: use the right FJOB_NAME name --- projects/core/ci_entrypoint/fournos_resolve.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/projects/core/ci_entrypoint/fournos_resolve.py b/projects/core/ci_entrypoint/fournos_resolve.py index 3f5b4a18..b115a7b1 100644 --- a/projects/core/ci_entrypoint/fournos_resolve.py +++ b/projects/core/ci_entrypoint/fournos_resolve.py @@ -30,11 +30,11 @@ def fetch_fournos_job() -> tuple[str, str, dict]: RuntimeError: If fetch or parsing fails """ # Get environment variables - job_name = os.environ.get("FOURNOS_JOB_NAME") + job_name = os.environ.get("FJOB_NAME") namespace = os.environ.get("FOURNOS_NAMESPACE") if not job_name: - raise ValueError("FOURNOS_JOB_NAME environment variable is required") + raise ValueError("FJOB_NAME environment variable is required") if not namespace: raise ValueError("FOURNOS_NAMESPACE environment variable is required") @@ -197,8 +197,8 @@ def create_fournos_resolve_command( @click.command("resolve-fournos-config") @click.option( "--fjob-name", - help="FournosJob name (sets FOURNOS_JOB_NAME if provided)", - envvar="FOURNOS_JOB_NAME", + help="FournosJob name (sets FJOB_NAME if provided)", + envvar="FJOB_NAME", ) @click.option( "--namespace", @@ -216,7 +216,7 @@ def fournos_resolve_command(ctx, fjob_name, namespace, dry_run): """Resolve the FournosJob object configuration.""" if fjob_name: - os.environ["FOURNOS_JOB_NAME"] = fjob_name + os.environ["FJOB_NAME"] = fjob_name if namespace: os.environ["FOURNOS_NAMESPACE"] = namespace From c07959ec395137dd662bd0d4eaea49bd20c3aac8 Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Thu, 30 Apr 2026 10:42:00 +0200 Subject: [PATCH 07/33] [core] library: export: give the correct subcommand name --- projects/core/library/export.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/projects/core/library/export.py b/projects/core/library/export.py index 67804d17..1d7d846c 100644 --- a/projects/core/library/export.py +++ b/projects/core/library/export.py @@ -30,7 +30,7 @@ def run_caliper_orchestration_export(*, artifact_directory: Path | None): return run_from_orchestration_config(caliper_cfg) -@click.command("export") +@click.command("export-artifacts") @click.option( "--artifact-directory", "artifact_directory", From f1df707b81a9374764bd3533e70f01e06a6d34a9 Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Thu, 30 Apr 2026 10:51:44 +0200 Subject: [PATCH 08/33] [core] library: export: export from ARTIFACT_BASE_DIR --- projects/core/library/export.py | 1 + 1 file changed, 1 insertion(+) diff --git a/projects/core/library/export.py b/projects/core/library/export.py index 1d7d846c..48b01636 100644 --- a/projects/core/library/export.py +++ 
b/projects/core/library/export.py @@ -8,6 +8,7 @@ from __future__ import annotations import logging +import os from pathlib import Path import click From af53ee3a8a38a77c16a8dce3683168e2781b4107 Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Thu, 30 Apr 2026 11:06:30 +0200 Subject: [PATCH 09/33] [core] notifications: send: show pr_config.txt or variable_overrides.yaml if available --- projects/core/notifications/send.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/projects/core/notifications/send.py b/projects/core/notifications/send.py index f3b503c3..48911826 100644 --- a/projects/core/notifications/send.py +++ b/projects/core/notifications/send.py @@ -185,10 +185,22 @@ def get_common_message(finish_reason: str, status: str, get_link, get_italics, g ``` {f.read().strip()} ``` +""" + elif ( + var_over := pathlib.Path(os.environ.get("ARTIFACT_DIR", "")) + / "000__ci_metadata" + / "variable_overrides.yaml" + ).exists(): + with open(var_over) as f: + message += f""" +{get_bold("Test configuration")}: +``` +{f.read().strip()} +``` """ else: message += """ -• No test configuration (`variable_overrides.yaml`) available. +• No test configuration (`variable_overrides.yaml/pr_config.txt`) available. """ if (failures := pathlib.Path(os.environ.get("ARTIFACT_DIR", "")) / "FAILURES").exists(): From e3256dbd66d6c04c7244e40e264ad9ee4171f9cc Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Thu, 30 Apr 2026 11:13:33 +0200 Subject: [PATCH 10/33] [core] library: export: use the fjob name as run name --- projects/core/library/export.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/projects/core/library/export.py b/projects/core/library/export.py index 48b01636..0c50bc3d 100644 --- a/projects/core/library/export.py +++ b/projects/core/library/export.py @@ -23,9 +23,22 @@ def run_caliper_orchestration_export(*, artifact_directory: Path | None): """Set optional ``caliper.export.from`` and run orchestration export.""" + + if artifact_directory is None and "ARTIFACT_BASE_DIR" in os.environ: + artifact_directory = os.environ["ARTIFACT_BASE_DIR"] + if artifact_directory is not None: config.project.set_config("caliper.export.from", str(artifact_directory)) + # Use FJOB_NAME as fallback for mlflow run_name if not configured + run_name = config.project.get_config( + "caliper.export.backend.mlflow.config.run_name", None, print=False, warn=False + ) + if run_name is None and "FJOB_NAME" in os.environ: + config.project.set_config( + "caliper.export.backend.mlflow.config.run_name", os.environ["FJOB_NAME"], print=False + ) + caliper_cfg = config.project.get_config("caliper", print=False) return run_from_orchestration_config(caliper_cfg) From 178a37414d88d2acdf005d8345f45eace05372dc Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Wed, 29 Apr 2026 14:30:46 +0200 Subject: [PATCH 11/33] [fournos_launcher] orchestration: add parsing support for /parallel --- .../orchestration/config.yaml | 2 + .../fournos_launcher/orchestration/pr_args.py | 59 +++++++++++++++++++ 2 files changed, 61 insertions(+) diff --git a/projects/fournos_launcher/orchestration/config.yaml b/projects/fournos_launcher/orchestration/config.yaml index cd807b25..4ff135ed 100644 --- a/projects/fournos_launcher/orchestration/config.yaml +++ b/projects/fournos_launcher/orchestration/config.yaml @@ -10,6 +10,8 @@ ci_job: overrides: {} # don't touch this one directly, the config module will populate it extra_overrides: {} +fournos_launcher: + parallel_jobs: {} fournos: namespace: psap-automation diff 
--git a/projects/fournos_launcher/orchestration/pr_args.py b/projects/fournos_launcher/orchestration/pr_args.py index 72d534b7..a4485c55 100644 --- a/projects/fournos_launcher/orchestration/pr_args.py +++ b/projects/fournos_launcher/orchestration/pr_args.py @@ -49,6 +49,11 @@ def get_supported_fournos_directives() -> dict[str, str]: Example: /gpu h100 4 /gpu a100 8 Effect: Sets fournos.job.hardware.gpu_type and fournos.job.hardware.gpu_count.""", + "/parallel": """Set parallel job presets for specific indices. + Format: /parallel idx preset1 preset2 preset3... + Example: /parallel 1 gpu-basic cpu-intensive + /parallel 2 memory-heavy network-test + Effect: Sets fournos_launcher.parallel_jobs[idx] = [preset1, preset2, ...]""", "/help": """Show all supported FOURNOS directives. Format: /help Effect: Logs available directive information.""", @@ -196,6 +201,59 @@ def handle_help_directive(line: str) -> dict[str, str]: return result +def handle_parallel_directive(line: str) -> dict[str, str]: + """ + Handle /parallel directive for setting parallel job presets by index. + + Format: /parallel idx preset1 preset2 preset3... + + Args: + line: The directive line + + Returns: + Dictionary with parallel_jobs configuration + + Raises: + ValueError: If format is invalid or values are missing + """ + parallel_content = line.removeprefix("/parallel ").strip() + + if not parallel_content: + raise ValueError(f"Invalid /parallel directive: parameters cannot be empty in '{line}'") + + parts = parallel_content.split() + if len(parts) < 2: + raise ValueError( + f"Invalid /parallel directive: format must be '/parallel idx preset1 [preset2...]' in '{line}'" + ) + + idx_str = parts[0].strip() + presets = [p.strip() for p in parts[1:] if p.strip()] + + if not idx_str: + raise ValueError(f"Invalid /parallel directive: index cannot be empty in '{line}'") + + if not presets: + raise ValueError( + f"Invalid /parallel directive: at least one preset must be specified in '{line}'" + ) + + try: + idx = int(idx_str) + if idx < 0: + raise ValueError( + f"Invalid /parallel directive: index must be non-negative, got {idx} in '{line}'" + ) + except ValueError as e: + if "non-negative" in str(e): + raise + raise ValueError( + f"Invalid /parallel directive: index must be a number, got '{idx_str}' in '{line}'" + ) from None + + return {f"fournos_launcher.parallel_jobs.{idx}": presets} + + def get_fournos_directive_handlers() -> dict[str, callable]: """ Get a mapping of FOURNOS directive prefixes to their handler functions. 
@@ -208,6 +266,7 @@ def get_fournos_directive_handlers() -> dict[str, callable]: "/exclusive": handle_exclusive_directive, "/pipeline": handle_pipeline_directive, "/gpu": handle_gpu_directive, + "/parallel": handle_parallel_directive, "/help": handle_help_directive, } From 20ffe9776ba252522e55a4b981aa99564f4b3167 Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Wed, 29 Apr 2026 15:25:44 +0200 Subject: [PATCH 12/33] [fournos_launcher] orchestration: submit: allow parallel launches --- .../fournos_launcher/orchestration/submit.py | 116 +++++++++++++++--- 1 file changed, 99 insertions(+), 17 deletions(-) diff --git a/projects/fournos_launcher/orchestration/submit.py b/projects/fournos_launcher/orchestration/submit.py index 0862c9f5..f89e77af 100644 --- a/projects/fournos_launcher/orchestration/submit.py +++ b/projects/fournos_launcher/orchestration/submit.py @@ -2,8 +2,12 @@ import os import pathlib import signal +import threading +import traceback +from datetime import datetime from projects.core.library import config, env, run, vault +from projects.core.library.run_parallel import Parallel from projects.fournos_launcher.orchestration import job_management, pr_args from projects.fournos_launcher.toolbox.submit_and_wait.main import ( run as submit_and_wait, @@ -120,21 +124,99 @@ def submit_job(): f"or both must be null. Got gpu_count={gpu_count}, gpu_type={gpu_type}" ) - submit_and_wait( - cluster_name=cluster_name, - project=config.project.get_config("ci_job.project"), - args=config.project.get_config("ci_job.args"), - variables_overrides=overrides, - namespace=config.project.get_config("fournos.namespace"), - owner=config.project.get_config("fournos.job.owner"), - display_name=config.project.get_config("fournos.job.display_name"), - pipeline_name=config.project.get_config("fournos.job.pipeline_name"), - env=env_dict, - status_dest=env.ARTIFACT_DIR, - ci_label=config.project.get_config("fournos.job.ci_label"), - exclusive=config.project.get_config("fournos.job.exclusive"), - gpu_count=gpu_count, - gpu_type=gpu_type, - ) + # Check if parallel jobs are configured + parallel_jobs = config.project.get_config("fournos_launcher.parallel_jobs", {}, print=False) + parallel_job_configs = [] + + for idx, job_args_list in parallel_jobs.items(): + if job_args_list: # Non-empty list + parallel_job_configs.append((idx, job_args_list)) + + # Prepare common submit_and_wait arguments + submit_kwargs = { + "cluster_name": cluster_name, + "project": config.project.get_config("ci_job.project"), + "variables_overrides": overrides, + "namespace": config.project.get_config("fournos.namespace"), + "owner": config.project.get_config("fournos.job.owner"), + "pipeline_name": config.project.get_config("fournos.job.pipeline_name"), + "env": env_dict, + "status_dest": env.ARTIFACT_DIR, + "ci_label": config.project.get_config("fournos.job.ci_label"), + "exclusive": config.project.get_config("fournos.job.exclusive"), + "gpu_count": gpu_count, + "gpu_type": gpu_type, + } + + if parallel_job_configs: + logger.info( + f"Found {len(parallel_job_configs)} parallel job configurations, launching in parallel" + ) - return 0 + # Generate timestamp for parallel job names (shared across all parallel jobs) + timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") + raw_name = f"forge-{project_name}-{timestamp}" + + # Track failures across parallel jobs + failure_lock = threading.Lock() + has_failures = [False] # Use list for mutable reference + + def submit_parallel_job(job_index, job_args_list): + """Submit a single parallel job with 
specific args.""" + # Combine base ci_job.args with parallel job-specific args + base_args = config.project.get_config("ci_job.args") + combined_args = base_args + job_args_list + + logger.info(f"Submitting parallel job {job_index} with args: {combined_args}") + + # Create display name consistent with single job format + args_str = " ".join(combined_args) + parallel_display_name = f"{project_name} {args_str} (job {job_index})".strip() + + # Create unique job name with timestamp and index + unique_job_name = f"{raw_name}-{job_index}" + + try: + submit_and_wait( + **submit_kwargs, + args=combined_args, + display_name=parallel_display_name, + job_name=unique_job_name, + artifact_dirname_suffix=str(job_index), + ) + logger.info(f"Parallel job {job_index} completed successfully") + except Exception as e: + logger.error(f"Parallel job {job_index} failed: {e}") + traceback.print_exc() + + # Register failure in thread-safe way + with failure_lock: + has_failures[0] = True + + # Submit all parallel jobs with exit_on_exception=False to let others complete + with Parallel("parallel_jobs", exit_on_exception=False) as parallel: + for job_index, job_args_list in parallel_job_configs: + parallel.delayed(submit_parallel_job, job_index, job_args_list) + + # Check if any jobs failed + if has_failures[0]: + logger.error("One or more parallel jobs failed") + return 1 + else: + logger.info("All parallel jobs completed successfully") + return 0 + else: + # No parallel jobs configured, run single job as before + logger.info("No parallel jobs configured, running single job") + + # Generate unique job name for single job + timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") + single_job_name = f"forge-{project_name}-{timestamp}" + + submit_and_wait( + **submit_kwargs, + args=config.project.get_config("ci_job.args"), + display_name=config.project.get_config("fournos.job.display_name"), + job_name=single_job_name, + ) + return 0 From 147497d580b2b600937b262365ff181c32e6d0b4 Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Thu, 30 Apr 2026 10:01:26 +0200 Subject: [PATCH 13/33] [fournos_launcher] toolbox: submit_and_wait: accept temporarily the artifact_dirname_suffix arg --- projects/fournos_launcher/toolbox/submit_and_wait/main.py | 1 + 1 file changed, 1 insertion(+) diff --git a/projects/fournos_launcher/toolbox/submit_and_wait/main.py b/projects/fournos_launcher/toolbox/submit_and_wait/main.py index ce3942f2..eb8eb9d6 100755 --- a/projects/fournos_launcher/toolbox/submit_and_wait/main.py +++ b/projects/fournos_launcher/toolbox/submit_and_wait/main.py @@ -40,6 +40,7 @@ def run( exclusive: bool = True, gpu_count: int = None, gpu_type: str = None, + artifact_dirname_suffix: str = None, ): """ Submit a FOURNOS job and wait for completion From 75e6c1855c9f35448379d4018f0b222e6151a902 Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Thu, 30 Apr 2026 10:42:23 +0200 Subject: [PATCH 14/33] [llm_d_legacy] orchestration: ci: expose the resolve command --- projects/llm_d_legacy/orchestration/ci.py | 72 +++++++++++++++-------- projects/llm_d_legacy/testing/config.yaml | 11 ++++ 2 files changed, 59 insertions(+), 24 deletions(-) diff --git a/projects/llm_d_legacy/orchestration/ci.py b/projects/llm_d_legacy/orchestration/ci.py index 0577f46b..cbd73f8b 100755 --- a/projects/llm_d_legacy/orchestration/ci.py +++ b/projects/llm_d_legacy/orchestration/ci.py @@ -11,7 +11,9 @@ import click +from projects.core.ci_entrypoint.fournos_resolve import create_fournos_resolve_command from projects.core.library import env +from 
projects.core.library import config as forge_config from projects.core.library import ci as ci_lib from projects.core.library.export import caliper_export_command from projects.legacy.library import config @@ -21,17 +23,6 @@ logger = logging.getLogger(__name__) - -def _caliper_export_at_end() -> int: - """Set ``caliper.export.from`` to ``env.BASE_ARTIFACT_DIR`` and run orchestration export.""" - root = env.BASE_ARTIFACT_DIR - config.project.set_config("caliper.export.from", str(root), print=False) - caliper = config.project.get_config("caliper", print=False) - status = run_from_orchestration_config(caliper) - logger.info("Caliper export: %s", status) - return 0 - - # Add the testing directory to path for imports testing_dir = Path(__file__).parent.parent / "testing" if str(testing_dir) not in sys.path: @@ -60,14 +51,19 @@ def main(ctx): """Jump CI Project CI Operations for TOPSAIL-NG.""" ctx.ensure_object(dict) - +inited = False def init(): + global inited + if inited: + return + inited = True + env.init() legacy_env.init() testing_dir = Path(__file__).parent.parent / "testing" config.init(testing_dir) - + forge_config.init(testing_dir) presets = config.project.get_config("project.args") for preset in presets: config.project.apply_preset(preset) @@ -84,22 +80,50 @@ def test(ctx): """Test phase - Trigger the project's test method.""" log("Starting test phase...") - failed = True - try: - init() - failed = test_llmd.test() - finally: - logger.info("[llm-d] running the Caliper export") - try: - _caliper_export_at_end() - except Exception: - logger.exception("Caliper export failed") - failed = True + init() + failed = test_llmd.test() sys.exit(1 if failed else 0) main.add_command(caliper_export_command) +def resolve_hardware_request(hardware_spec: dict): + """ + Resolve hardware requirements for FournosJob based on skeleton project configuration. + + This is a stub implementation. Update spec.hardware based on project configuration. + + Args: + hardware_spec: The current spec.hardware dict from the FournosJob. This object should be updated. 
+ + """ + init() + + logger.info("Hardware resolution: stub implementation - no changes made") + + # Stub implementation - could be extended to: + # - Read hardware config from project config + # - Set hardware requirements based on workload needs + # - Handle different hardware profiles (GPU, CPU, memory requirements) + # - Example: return {"gpu": {"type": "nvidia-tesla-v100", "count": 1}, "memory": "32Gi"} + + hardware_spec["gpuType"] = "h200" + hardware_spec["gpuCount"] = 4 + + return hardware_spec + +def list_vaults(): + init() + return config.project.get_config("vaults") + +main.add_command( + create_fournos_resolve_command( + vault_list_func=list_vaults, + hardware_resolver_func=resolve_hardware_request, + ) +) + + if __name__ == "__main__": main() diff --git a/projects/llm_d_legacy/testing/config.yaml b/projects/llm_d_legacy/testing/config.yaml index dd5371b8..2afc7360 100644 --- a/projects/llm_d_legacy/testing/config.yaml +++ b/projects/llm_d_legacy/testing/config.yaml @@ -2,6 +2,17 @@ vaults: - psap-forge-notifications - psap-forge-mlflow-export +project: + name: null + args: [] +ci_job: + name: null + fjob: null + cluster: null + exclusive: null + hardware: null + owner: null + project: name: null args: [] From 5d2a2043101b532ca510b75dc9747bcd5fb6ec7d Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Thu, 30 Apr 2026 10:06:25 +0200 Subject: [PATCH 15/33] [skeleton] orchestration: ci: remove the explicit export after test --- projects/skeleton/orchestration/ci.py | 20 +++----------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/projects/skeleton/orchestration/ci.py b/projects/skeleton/orchestration/ci.py index 91bf25a6..9c152242 100755 --- a/projects/skeleton/orchestration/ci.py +++ b/projects/skeleton/orchestration/ci.py @@ -7,30 +7,18 @@ building your own projects. 
""" -import logging import types import click import prepare_skeleton import test_skeleton -from projects.caliper.orchestration.export import run_from_orchestration_config from projects.core.ci_entrypoint.fournos_resolve import create_fournos_resolve_command from projects.core.library import ci as ci_lib -from projects.core.library import config, env +from projects.core.library import config from projects.core.library.export import caliper_export_command -def _caliper_export_at_end() -> int: - """Set ``caliper.export.from`` to ``env.BASE_ARTIFACT_DIR`` and run orchestration export.""" - root = env.BASE_ARTIFACT_DIR - config.project.set_config("caliper.export.from", str(root), print=False) - caliper = config.project.get_config("caliper", print=False) - status = run_from_orchestration_config(caliper) - logging.info("Caliper export: %s", status) - return 0 - - @click.group() @click.pass_context @ci_lib.safe_ci_function @@ -57,10 +45,7 @@ def prepare(ctx): @ci_lib.safe_ci_command def test(ctx): """Test phase - Execute the main testing logic.""" - try: - return test_skeleton.test() - finally: - _caliper_export_at_end() + return test_skeleton.test() @main.command() @@ -72,6 +57,7 @@ def pre_cleanup(ctx): main.add_command(caliper_export_command) + main.add_command( create_fournos_resolve_command( vault_list_func=lambda: config.project.get_config("vaults"), From e402cde013841c0dbedda89b8e5ee7909e8c7cca Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Thu, 30 Apr 2026 11:21:21 +0200 Subject: [PATCH 16/33] [core] library: export: update the FJob.status.engineStatus --- projects/core/library/export.py | 61 +++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/projects/core/library/export.py b/projects/core/library/export.py index 0c50bc3d..9891c0c6 100644 --- a/projects/core/library/export.py +++ b/projects/core/library/export.py @@ -21,6 +21,63 @@ logger = logging.getLogger(__name__) +def _update_fjob_export_status(status: dict): + """Update FournosJob status with export artifacts status.""" + if os.environ.get("FOURNOS_CI") != "true": + return + + # Unset KUBECONFIG to use the pod SA access + original_kubeconfig = os.environ.get("KUBECONFIG") + if "KUBECONFIG" in os.environ: + del os.environ["KUBECONFIG"] + + try: + import json + import subprocess + + fjob_name = os.environ["FJOB_NAME"] + namespace = os.environ["FOURNOS_NAMESPACE"] + + # Get current fjob status + get_cmd = f"oc get fjob/{fjob_name} -n {namespace} -o json" + result = subprocess.run(get_cmd, shell=True, capture_output=True, text=True) + + if result.returncode == 0: + fjob_data = json.loads(result.stdout) + + # Initialize status.engine.status if it doesn't exist + if "status" not in fjob_data: + fjob_data["status"] = {} + if "engineStatus" not in fjob_data["status"]: + fjob_data["status"]["engineStatus"] = {} + if "status" not in fjob_data["status"]["engineStatus"]: + fjob_data["status"]["engineStatus"]["status"] = {} + + # Update with export-artifacts status + fjob_data["status"]["engineStatus"]["export-artifacts"] = status + + # Patch the fjob + patch_data = {"status": fjob_data["status"]} + patch_cmd = f"oc patch fjob/{fjob_name} -n {namespace} --type=merge --subresource=status -p '{json.dumps(patch_data)}'" + + patch_result = subprocess.run(patch_cmd, shell=True, capture_output=True, text=True) + if patch_result.returncode == 0: + logger.info(f"Updated fjob/{fjob_name} status with export artifacts status") + else: + logger.warning( + f"Failed to update fjob status: {patch_cmd} --> {patch_result.stderr}" 
+ ) + else: + logger.warning(f"Failed to get fjob/{fjob_name}: {result.stderr}") + + except Exception as e: + logger.warning(f"Failed to update fjob status: {e}") + finally: + # Restore KUBECONFIG if it was set + if original_kubeconfig is not None: + os.environ["KUBECONFIG"] = original_kubeconfig + + def run_caliper_orchestration_export(*, artifact_directory: Path | None): """Set optional ``caliper.export.from`` and run orchestration export.""" @@ -59,4 +116,8 @@ def caliper_export_command(_ctx, artifact_directory: Path | None): status = run_caliper_orchestration_export(artifact_directory=artifact_directory) logger.info("Export status:\n" + yaml.dump(status, indent=4)) + + # Update fjob status with export results + _update_fjob_export_status(status) + return 0 From f4ef838c1cd92aa144ab8c6cae06b0f6c97d82a3 Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Thu, 30 Apr 2026 11:37:12 +0200 Subject: [PATCH 17/33] [fournos_launcher] toolbox: submit_and_wait: don't cleanup after the test --- .../toolbox/submit_and_wait/main.py | 29 ++----------------- 1 file changed, 2 insertions(+), 27 deletions(-) diff --git a/projects/fournos_launcher/toolbox/submit_and_wait/main.py b/projects/fournos_launcher/toolbox/submit_and_wait/main.py index eb8eb9d6..15351f01 100755 --- a/projects/fournos_launcher/toolbox/submit_and_wait/main.py +++ b/projects/fournos_launcher/toolbox/submit_and_wait/main.py @@ -19,6 +19,7 @@ toolbox, ) from projects.core.dsl.utils.k8s import sanitize_k8s_name +from projects.core.library import env as env_mod logger = logging.getLogger(__name__) @@ -87,7 +88,7 @@ def run( if env is None: env = {} if status_dest is None: - status_dest = env.ARTIFACT_DIR / "artifacts" + status_dest = env_mod.ARTIFACT_DIR / "artifacts" else: status_dest = Path(status_dest) if not status_dest.exists(): @@ -354,32 +355,6 @@ def capture_pod_specs(args, ctx): return f"Wrote the pod spec file under {artifact_dir} (label {label})" -@always -@task -def cleanup_job(args, ctx): - """Clean up the job object""" - - # Guard: Check if job name was generated (might not exist if early validation failed) - if not hasattr(ctx, "final_job_name"): - return "No job name available - skipping job cleanup" - - # Check if shutdown has been requested - if so, preserve the job to let it export its artifacts - shutdown_result = shell.run( - f'oc get fournosjob {ctx.final_job_name} -n {args.namespace} -o jsonpath="{{.spec.shutdown}}"', - check=False, - ) - - if shutdown_result.success and shutdown_result.stdout.strip(): - shutdown_value = shutdown_result.stdout.strip() - return f"Job has spec.shutdown={shutdown_value} - preserving it to let it export its artifacts, skipping deletion" - - shell.run( - f"oc delete fournosjob {ctx.final_job_name} -n {args.namespace} --ignore-not-found", - ) - - return "Job cleanup completed" - - # Create the main function using the toolbox library main = toolbox.create_toolbox_main(run) From 109bee4a90ddb2d763605b7c18a9607daedf7ad4 Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Thu, 30 Apr 2026 11:37:42 +0200 Subject: [PATCH 18/33] [fournos_launcher] orchestration: submit: cleanup after all the tests --- .../fournos_launcher/orchestration/submit.py | 64 ++++++++++++++++--- 1 file changed, 55 insertions(+), 9 deletions(-) diff --git a/projects/fournos_launcher/orchestration/submit.py b/projects/fournos_launcher/orchestration/submit.py index f89e77af..203094d9 100644 --- a/projects/fournos_launcher/orchestration/submit.py +++ b/projects/fournos_launcher/orchestration/submit.py @@ -9,6 +9,9 @@ from 
projects.core.library import config, env, run, vault from projects.core.library.run_parallel import Parallel from projects.fournos_launcher.orchestration import job_management, pr_args +from projects.fournos_launcher.toolbox.cleanup_fjob.main import ( + run as cleanup_fjob, +) from projects.fournos_launcher.toolbox.submit_and_wait.main import ( run as submit_and_wait, ) @@ -141,7 +144,6 @@ def submit_job(): "owner": config.project.get_config("fournos.job.owner"), "pipeline_name": config.project.get_config("fournos.job.pipeline_name"), "env": env_dict, - "status_dest": env.ARTIFACT_DIR, "ci_label": config.project.get_config("fournos.job.ci_label"), "exclusive": config.project.get_config("fournos.job.exclusive"), "gpu_count": gpu_count, @@ -157,9 +159,11 @@ def submit_job(): timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") raw_name = f"forge-{project_name}-{timestamp}" - # Track failures across parallel jobs + # Track failures and job names across parallel jobs failure_lock = threading.Lock() has_failures = [False] # Use list for mutable reference + submitted_job_names = [] + submitted_job_lock = threading.Lock() def submit_parallel_job(job_index, job_args_list): """Submit a single parallel job with specific args.""" @@ -177,12 +181,21 @@ def submit_parallel_job(job_index, job_args_list): unique_job_name = f"{raw_name}-{job_index}" try: + # Track the job name for cleanup (job gets submitted even if waiting fails) + with submitted_job_lock: + submitted_job_names.append(unique_job_name) + + # Create job-specific status directory + job_status_dest = env.ARTIFACT_DIR / unique_job_name + job_status_dest.mkdir(parents=True, exist_ok=True) + submit_and_wait( **submit_kwargs, args=combined_args, display_name=parallel_display_name, job_name=unique_job_name, artifact_dirname_suffix=str(job_index), + status_dest=job_status_dest, ) logger.info(f"Parallel job {job_index} completed successfully") except Exception as e: @@ -198,6 +211,21 @@ def submit_parallel_job(job_index, job_args_list): for job_index, job_args_list in parallel_job_configs: parallel.delayed(submit_parallel_job, job_index, job_args_list) + # Cleanup all submitted jobs + if submitted_job_names: + logger.info( + f"Cleaning up {len(submitted_job_names)} parallel jobs: {', '.join(submitted_job_names)}" + ) + for job_name in submitted_job_names: + try: + cleanup_fjob( + job_name=job_name, + namespace=config.project.get_config("fournos.namespace"), + ) + logger.info(f"Cleaned up job: {job_name}") + except Exception as cleanup_e: + logger.warning(f"Failed to cleanup job {job_name}: {cleanup_e}") + # Check if any jobs failed if has_failures[0]: logger.error("One or more parallel jobs failed") @@ -213,10 +241,28 @@ def submit_parallel_job(job_index, job_args_list): timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") single_job_name = f"forge-{project_name}-{timestamp}" - submit_and_wait( - **submit_kwargs, - args=config.project.get_config("ci_job.args"), - display_name=config.project.get_config("fournos.job.display_name"), - job_name=single_job_name, - ) - return 0 + try: + submit_and_wait( + **submit_kwargs, + args=config.project.get_config("ci_job.args"), + display_name=config.project.get_config("fournos.job.display_name"), + job_name=single_job_name, + status_dest=env.ARTIFACT_DIR, + ) + logger.info("Single job completed successfully") + return_code = 0 + except Exception as e: + logger.error(f"Single job failed: {e}") + return_code = 1 + + # Cleanup the job + try: + cleanup_fjob( + job_name=single_job_name, + 
namespace=config.project.get_config("fournos.namespace"), + ) + logger.info(f"Cleaned up job: {single_job_name}") + except Exception as cleanup_e: + logger.warning(f"Failed to cleanup job {single_job_name}: {cleanup_e}") + + return return_code From 22080634dac564ccac801cd7c25811ff85900fd1 Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Thu, 30 Apr 2026 11:38:04 +0200 Subject: [PATCH 19/33] [fournos_launcher] toolbox: cleanup_fjob: cleanup job --- .../toolbox/cleanup_fjob/main.py | 132 ++++++++++++++++++ 1 file changed, 132 insertions(+) create mode 100644 projects/fournos_launcher/toolbox/cleanup_fjob/main.py diff --git a/projects/fournos_launcher/toolbox/cleanup_fjob/main.py b/projects/fournos_launcher/toolbox/cleanup_fjob/main.py new file mode 100644 index 00000000..f054c355 --- /dev/null +++ b/projects/fournos_launcher/toolbox/cleanup_fjob/main.py @@ -0,0 +1,132 @@ +#!/usr/bin/env python3 + +""" +FOURNOS job cleanup using task-based DSL +Cleans up a specific FOURNOS job +""" + +import logging + +from projects.core.dsl import ( + execute_tasks, + shell, + task, + toolbox, +) + +logger = logging.getLogger(__name__) + + +def run( + job_name: str, + namespace: str = "fournos-jobs", + skip_shutdown_check: bool = False, +): + """ + Clean up a FOURNOS job + + Args: + job_name: Name of the FOURNOS job to cleanup + namespace: Kubernetes namespace where the job is located (default: "fournos-jobs") + skip_shutdown_check: If True, skip checking for shutdown state and force cleanup (default: False) + + Examples: + # Basic cleanup: + run(job_name="forge-llm-d-20241203-143022") + + # Force cleanup regardless of shutdown state: + run(job_name="forge-llm-d-20241203-143022", skip_shutdown_check=True) + """ + # Execute all registered tasks in order + return execute_tasks(locals()) + + +@task +def validate_inputs(args, ctx): + """Validate input parameters""" + + if not args.job_name: + raise ValueError("job_name is required") + + if not args.namespace: + raise ValueError("namespace is required") + + return "Inputs validated" + + +@task +def ensure_oc(args, ctx): + """Ensure oc is available and connected""" + + shell.run("which oc || (echo 'oc not found in PATH' && exit 1)") + shell.run("oc whoami") + + +@task +def check_job_exists(args, ctx): + """Check if the job exists""" + + result = shell.run( + f"oc get fournosjob {args.job_name} -n {args.namespace}", + check=False, + ) + + if not result.success: + if "not found" in result.stderr.lower(): + ctx.job_exists = False + return f"Job {args.job_name} not found - nothing to cleanup" + else: + raise RuntimeError(f"Failed to check job existence: {result.stderr}") + + ctx.job_exists = True + return f"Job {args.job_name} exists" + + +@task +def check_shutdown_state(args, ctx): + """Check if shutdown has been requested""" + + if not ctx.job_exists: + return "Job doesn't exist - skipping shutdown check" + + if args.skip_shutdown_check: + return "Shutdown check skipped by request" + + shutdown_result = shell.run( + f'oc get fournosjob {args.job_name} -n {args.namespace} -o jsonpath="{{.spec.shutdown}}"', + check=False, + ) + + if shutdown_result.success and shutdown_result.stdout.strip(): + shutdown_value = shutdown_result.stdout.strip() + ctx.should_preserve = True + ctx.shutdown_reason = shutdown_value + return f"Job has spec.shutdown={shutdown_value} - should be preserved to let it export artifacts" + + ctx.should_preserve = False + return "No shutdown detected - safe to cleanup" + + +@task +def cleanup_fjob(args, ctx): + """Clean up the FOURNOS job""" + + if 
not ctx.job_exists: + return "Job doesn't exist - nothing to cleanup" + + if ctx.should_preserve: + return f"Job preserved due to shutdown state: {ctx.shutdown_reason}" + + shell.run( + f"oc delete fournosjob {args.job_name} -n {args.namespace} --ignore-not-found", + ) + + return f"Successfully cleaned up job: {args.job_name}" + + +# Create the main function using the toolbox library +main = toolbox.create_toolbox_main(run) + + +if __name__ == "__main__": + main() From bd79fee7bbd2542bb687485f11f5a45c298c931e Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Thu, 30 Apr 2026 11:53:05 +0200 Subject: [PATCH 20/33] [core] library: env: thread safe --- projects/core/library/env.py | 88 ++++++++++++++++++++++++++++++------ 1 file changed, 75 insertions(+), 13 deletions(-) diff --git a/projects/core/library/env.py b/projects/core/library/env.py index 7c34353e..8dda1e20 100644 --- a/projects/core/library/env.py +++ b/projects/core/library/env.py @@ -4,8 +4,9 @@ import threading import time -ARTIFACT_DIR = None +# ARTIFACT_DIR is accessed via __getattr__ for thread-local support BASE_ARTIFACT_DIR = None # Immutable copy of the initial ARTIFACT_DIR +_GLOBAL_ARTIFACT_DIR = None # Internal global fallback FORGE_HOME = pathlib.Path(__file__).parents[3] # Thread-local storage for ARTIFACT_DIR (thread-safe) @@ -15,12 +16,19 @@ def __getattr__(name): """Support thread-local ARTIFACT_DIR access.""" if name == "ARTIFACT_DIR": - # Try thread-local first, fallback to global + # Each thread (including main) gets its own copy try: return _tls_artifact_dir.val except AttributeError: - # Thread-local not set, use global - return globals().get("ARTIFACT_DIR") + # Thread-local not set - this should not happen after proper initialization + # Return global as emergency fallback but warn + import logging + + logger = logging.getLogger(__name__) + logger.warning( + f"Thread {threading.current_thread().name} accessing ARTIFACT_DIR without thread-local copy" + ) + return globals().get("_GLOBAL_ARTIFACT_DIR") return globals()[name] @@ -30,8 +38,12 @@ def get_tls_artifact_dir(): try: return _tls_artifact_dir.val except AttributeError: - # Thread-local not set, use global - return globals().get("ARTIFACT_DIR") + # Thread-local not set - this should not happen after proper initialization + import logging + + logger = logging.getLogger(__name__) + logger.warning(f"Thread {threading.current_thread().name} has no thread-local ARTIFACT_DIR") + return None def _set_tls_artifact_dir(value): @@ -39,16 +51,31 @@ def _set_tls_artifact_dir(value): _tls_artifact_dir.val = value +def ensure_thread_artifact_dir(): + """Ensure current thread has its own copy of ARTIFACT_DIR.""" + try: + # Thread already has its own copy + return _tls_artifact_dir.val + except AttributeError: + # Thread doesn't have a copy, inherit from global + global_artifact_dir = globals().get("_GLOBAL_ARTIFACT_DIR") + if global_artifact_dir is not None: + _set_tls_artifact_dir(global_artifact_dir) + return global_artifact_dir + else: + raise ValueError("No ARTIFACT_DIR available to copy to thread") from None + + def _set_artifact_dir(value): - global ARTIFACT_DIR - ARTIFACT_DIR = value + global _GLOBAL_ARTIFACT_DIR + _GLOBAL_ARTIFACT_DIR = value def reset_artifact_dir(): """Reset ARTIFACT_DIR to its original BASE_ARTIFACT_DIR value.""" - global ARTIFACT_DIR + global _GLOBAL_ARTIFACT_DIR if BASE_ARTIFACT_DIR is not None: - ARTIFACT_DIR = BASE_ARTIFACT_DIR + _GLOBAL_ARTIFACT_DIR = BASE_ARTIFACT_DIR os.environ["ARTIFACT_DIR"] = str(BASE_ARTIFACT_DIR) @@ -94,7 +121,18 @@ 
def NextArtifactDir(name, *, lock=None, counter_p=None): else: next_count = next_artifact_index() - dirname = ARTIFACT_DIR / f"{next_count:03d}__{name}" + # Use thread-local ARTIFACT_DIR for directory creation + current_artifact_dir = None + try: + current_artifact_dir = _tls_artifact_dir.val + except AttributeError: + # Fallback to global if thread-local not set + current_artifact_dir = globals().get("_GLOBAL_ARTIFACT_DIR") + + if current_artifact_dir is None: + raise ValueError("ARTIFACT_DIR not set in either thread-local or global scope") + + dirname = current_artifact_dir / f"{next_count:03d}__{name}" return TempArtifactDir(dirname) @@ -105,20 +143,44 @@ def __init__(self, dirname): self.previous_dirname = None def __enter__(self): - self.previous_dirname = ARTIFACT_DIR + # Store current thread-local ARTIFACT_DIR + try: + self.previous_dirname = _tls_artifact_dir.val + except AttributeError: + # Fallback to global if thread-local not set + self.previous_dirname = globals().get("_GLOBAL_ARTIFACT_DIR") + + # Update environment variable os.environ["ARTIFACT_DIR"] = str(self.dirname) self.dirname.mkdir(exist_ok=True) + # Set both global and thread-local (for compatibility) _set_artifact_dir(self.dirname) + _set_tls_artifact_dir(self.dirname) return True def __exit__(self, ex_type, ex_value, exc_traceback): + # Restore environment variable os.environ["ARTIFACT_DIR"] = str(self.previous_dirname) + + # Restore both global and thread-local _set_artifact_dir(self.previous_dirname) + _set_tls_artifact_dir(self.previous_dirname) return False # If we returned True here, any exception would be suppressed! def next_artifact_index(): - return len(list(ARTIFACT_DIR.glob("*__*"))) + # Use thread-local ARTIFACT_DIR for counting + current_artifact_dir = None + try: + current_artifact_dir = _tls_artifact_dir.val + except AttributeError: + # Fallback to global if thread-local not set + current_artifact_dir = globals().get("_GLOBAL_ARTIFACT_DIR") + + if current_artifact_dir is None: + return 0 + + return len(list(current_artifact_dir.glob("*__*"))) From 0314955ac9c85ca33cb2eb40491717363a111682 Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Thu, 30 Apr 2026 11:53:45 +0200 Subject: [PATCH 21/33] [core] library: run: thread safe --- projects/core/library/run.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/projects/core/library/run.py b/projects/core/library/run.py index e78a6ccf..cfea876d 100644 --- a/projects/core/library/run.py +++ b/projects/core/library/run.py @@ -188,15 +188,25 @@ def __exit__(self, ex_type, ex_value, exc_traceback): def _run_with_artifact_dir(func, artifact_dir, *args, **kwargs): """Wrapper to run function with specific ARTIFACT_DIR for this thread.""" if artifact_dir: - # Use thread-local storage (thread-safe) - original_artifact_dir = env.ARTIFACT_DIR + # Ensure thread has its own copy of ARTIFACT_DIR, then set it to dedicated directory + try: + original_artifact_dir = env.ensure_thread_artifact_dir() + except ValueError: + # No global ARTIFACT_DIR to inherit from + original_artifact_dir = None + try: env._set_tls_artifact_dir(artifact_dir) return func(*args, **kwargs) finally: - env._set_tls_artifact_dir(original_artifact_dir) + if original_artifact_dir is not None: + env._set_tls_artifact_dir(original_artifact_dir) else: - # No dedicated directory, run normally + # No dedicated directory, ensure thread has its own copy of main ARTIFACT_DIR + try: + env.ensure_thread_artifact_dir() + except ValueError: + pass # Continue without thread-local 
copy return func(*args, **kwargs) # Use ThreadPoolExecutor for better cancellation control From 60f17f0602911f08096cda32f674c050ce22c78c Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Thu, 30 Apr 2026 12:35:20 +0200 Subject: [PATCH 22/33] [skeleton] orchestration: presets: update the preset names --- projects/skeleton/orchestration/presets.d/presets.yaml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/projects/skeleton/orchestration/presets.d/presets.yaml b/projects/skeleton/orchestration/presets.d/presets.yaml index 453c4676..e49eca21 100644 --- a/projects/skeleton/orchestration/presets.d/presets.yaml +++ b/projects/skeleton/orchestration/presets.d/presets.yaml @@ -11,5 +11,8 @@ side_testing: quick_test: skeleton.test.duration_seconds: 10 # Quick 10 second test +medium_test: + skeleton.test.duration_seconds: 120 # Medium 2 minute test + long_test: - skeleton.test.duration_seconds: 600 # Long 10 minute test + skeleton.test.duration_seconds: 300 # Long 5 minute test From da8b00237f807487ac70698f50adc03d14eb39c2 Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Thu, 30 Apr 2026 12:44:40 +0200 Subject: [PATCH 23/33] [core] ci_entrypoint: fournos: consume the fournos-prepare fjob --- projects/core/ci_entrypoint/fournos.py | 52 ++++++++++++-------------- 1 file changed, 24 insertions(+), 28 deletions(-) diff --git a/projects/core/ci_entrypoint/fournos.py b/projects/core/ci_entrypoint/fournos.py index 06ffe918..f4b95c82 100644 --- a/projects/core/ci_entrypoint/fournos.py +++ b/projects/core/ci_entrypoint/fournos.py @@ -12,8 +12,6 @@ import yaml -from projects.core.library import run - logger = logging.getLogger(__name__) @@ -129,8 +127,8 @@ def parse_and_save_pr_arguments_fournos(): # Create CI metadata directory metadata_dir.mkdir(parents=True, exist_ok=True) - # Fetch and save FournosJob YAML - fjob, fjob_spec = fetch_and_save_fjob_yaml(metadata_dir / "fjob.yaml") + # Load FournosJob YAML + fjob, fjob_spec = load_fjob_yaml(metadata_dir.parent / "fournos_fjob.yaml") if not fjob_spec: raise ValueError("FournosJob YAML not found, cannot parse FOURNOS PR arguments") @@ -247,40 +245,38 @@ def process_forge_config(fjob_spec, metadata_dir, fjob): return variable_overrides -def fetch_and_save_fjob_yaml(fjob_yaml_dest): +def load_fjob_yaml(fjob_yaml_path): """ - Fetch FournosJob YAML and save to artifact directory. + Load FournosJob YAML from file. 
Args: - fjob_yaml_dest: Path where to save the FournosJob YAML + fjob_yaml_path: Path to the FournosJob YAML file Returns: Tuple of (full_fjob, fjob_spec), or (None, None) if not found """ - - namespace = "psap-automation-wip" - - result = run.run( - f"oc get fjobs -n {namespace} -oyaml", - capture_stdout=True, - ) - - fjobs_data = yaml.safe_load(result.stdout) - if not (fjobs_data and "items" in fjobs_data and fjobs_data["items"]): - logger.warning("No FournosJobs found") + if not fjob_yaml_path.exists(): + logger.warning(f"FournosJob YAML file not found: {fjob_yaml_path}") return None, None - # Take the last fjob from the list - fjob = fjobs_data["items"][-1] - try: - del fjob["status"] - except KeyError: - pass # ignore + with open(fjob_yaml_path) as f: + fjob = yaml.safe_load(f) - with open(fjob_yaml_dest, "w") as f: - yaml.dump(fjob, f, default_flow_style=False, sort_keys=False) + if not fjob: + logger.warning(f"Empty or invalid FournosJob YAML: {fjob_yaml_path}") + return None, None - logger.info(f"Saved FournosJob YAML to {fjob_yaml_dest}") + # Remove status if present + try: + del fjob["status"] + except KeyError: + pass # ignore - return fjob, fjob["spec"] + logger.info(f"Loaded FournosJob YAML from {fjob_yaml_path}") + + return fjob, fjob["spec"] + + except Exception as e: + logger.error(f"Failed to load FournosJob YAML from {fjob_yaml_path}: {e}") + return None, None From b56b35b47e863a26b29713d15e27b561214409b9 Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Thu, 30 Apr 2026 13:35:15 +0200 Subject: [PATCH 24/33] [llm_d_legacy] make it work with Forge launcher --- projects/llm_d_legacy/testing/config.yaml | 103 ----------------- .../testing/presets.d/presets.yaml | 105 ++++++++++++++++++ 2 files changed, 105 insertions(+), 103 deletions(-) create mode 100644 projects/llm_d_legacy/testing/presets.d/presets.yaml diff --git a/projects/llm_d_legacy/testing/config.yaml b/projects/llm_d_legacy/testing/config.yaml index 2afc7360..1bb657ad 100644 --- a/projects/llm_d_legacy/testing/config.yaml +++ b/projects/llm_d_legacy/testing/config.yaml @@ -25,109 +25,6 @@ ci_presets: local_config: null to_apply: [] - # Preset that automatically scales up GPU nodes if none exist, and scales down at the end - scale_up: - prepare.cluster.nodes.auto_scale: true - prepare.cluster.nodes.auto_scale_down_on_exit: true - - dev: - matbench.enabled: false - tests.llmd.inference_service.model: facebook-opt-125m - - opt-125m: - tests.llmd.inference_service.model: facebook-opt-125m - tests.llmd.inference_service.vllm_args[5]: "--max-model-len=2048" - - psap_h200: - extends: [llama-70b] - tests.capture_prom: false - tests.capture_prom_uwm: false - tests.llmd.skip_prepare: true - prepare.namespace.name: kpouget-dev - prepare.cluster.skip: true - tests.llmd.inference_service.gateway.name: gateway-internal - - llmisvc_pd: - tests.llmd.flavors: [pd-x2-ptp1-px4-dtp4] - - pvc_rwx: - prepare.pvc.name: storage-rwx - prepare.pvc.access_mode: ReadWriteMany - - azure: - security.run_as_root: true - prepare.preload.skip: true - prepare.operators.skip: true - prepare.cluster.skip: true - prepare.rhoai.skip: true - - tests.capture_prom: false - tests.capture_prom_uwm: false - tests.llmd.skip_prepare: false - - azure_light: - extends: [azure, opt-125m] - prepare.pvc.storage_class: managed-csi - - cks: - extends: [pvc_rwx, llama-70b] - - tests.capture_prom: false - tests.capture_prom_uwm: false - tests.llmd.inference_service.metrics.manual_capture: false - tests.llmd.inference_service.gateway.name: gateway-internal - 
tests.llmd.inference_service.image_pull_secrets: kpouget-pull-secret - - tests.llmd.skip_prepare: true - prepare.namespace.name: kpouget-dev - prepare.preload.node_selector_key: gpu.nvidia.com/class - prepare.preload.node_selector_value: "H200" - tests.llmd.inference_service.extra_properties: - spec.template.affinity: - nodeAffinity: - requiredDuringSchedulingIgnoredDuringExecution: - nodeSelectorTerms: - - matchExpressions: - - key: kubernetes.io/hostname - operator: NotIn - values: - - gf48e48 - - gf4334a - prepare.preload.extra_images: - vllm-cuda-rhel9: registry.redhat.io/rhaiis/vllm-cuda-rhel9@sha256:094db84a1da5e8a575d0c9eade114fa30f4a2061064a338e3e032f3578f8082a - llm-d-inference-scheduler: ghcr.io/opendatahub-io/rhaii-on-xks/llm-d-inference-scheduler:e6b5db0@sha256:43e8b8edc158f31535c8b23d77629f8cde111cc762a8f4ee5f2f884470566211 - guidellm: ghcr.io/vllm-project/guidellm:v0.5.4 - - baseline-flavors: - tests.llmd.flavors: [simple, simple-tp4, simple-tp2-x4] - - intelligentrouting-flavors: - tests.llmd.flavors: [intelligentrouting-tp2-x4] - - guidellm_light: - tests.llmd.benchmarks.guidellm.load_shape_name: lightweight - tests.llmd.benchmarks.guidellm.rate: "1,10,50" - tests.llmd.benchmarks.guidellm.args.data: prompt_tokens=256,output_tokens=128 - tests.llmd.benchmarks.guidellm.args.max_seconds: 30 - - guidellm_multiturn_eval: - tests.llmd.benchmarks.guidellm.load_shape_name: Multiturn - tests.llmd.benchmarks.guidellm.rate: [32, 64, 128, 256, 512] # keep as a list, multi-rate not supported by guidellm-multiturn - tests.llmd.benchmarks.guidellm.args.data: "prompt_tokens=128,output_tokens=128,turns=5,prefix_tokens=10000,prefix_count={2*rate}" - tests.llmd.benchmarks.guidellm.args.max_requests: "{10*rate}" - - guidellm_heterogeneous_eval: - tests.llmd.benchmarks.guidellm.load_shape_name: Heterogeneous - tests.llmd.benchmarks.guidellm.rate: "1,10,50,100,200,300" - tests.llmd.benchmarks.guidellm.args.data: prompt_tokens=8000,prompt_tokens_stdev=8500,prompt_tokens_min=50,prompt_tokens_max=30000,output_tokens=800,output_tokens_stdev=1500,output_tokens_min=20,output_tokens_max=8000 - tests.llmd.benchmarks.guidellm.args.max_seconds: 600 - - gpt-oss: - tests.llmd.inference_service.model: gpt-oss-120 - tests.llmd.benchmarks.guidellm.args.request_type: text_completions - - llama-70b: - tests.llmd.inference_service.model: llama3-3-70b clusters: cleanup_on_exit: false diff --git a/projects/llm_d_legacy/testing/presets.d/presets.yaml b/projects/llm_d_legacy/testing/presets.d/presets.yaml new file mode 100644 index 00000000..984e6126 --- /dev/null +++ b/projects/llm_d_legacy/testing/presets.d/presets.yaml @@ -0,0 +1,105 @@ +__multiple: true + +# Preset that automatically scales up GPU nodes if none exist, and scales down at the end +scale_up: + prepare.cluster.nodes.auto_scale: true + prepare.cluster.nodes.auto_scale_down_on_exit: true + +dev: + matbench.enabled: false + tests.llmd.inference_service.model: facebook-opt-125m + +opt-125m: + tests.llmd.inference_service.model: facebook-opt-125m + tests.llmd.inference_service.vllm_args[5]: "--max-model-len=2048" + +psap_h200: + extends: [llama-70b] + tests.capture_prom: false + tests.capture_prom_uwm: false + tests.llmd.skip_prepare: true + prepare.namespace.name: kpouget-dev + prepare.cluster.skip: true + tests.llmd.inference_service.gateway.name: gateway-internal + +llmisvc_pd: + tests.llmd.flavors: [pd-x2-ptp1-px4-dtp4] + +pvc_rwx: + prepare.pvc.name: storage-rwx + prepare.pvc.access_mode: ReadWriteMany + +azure: + security.run_as_root: true + 
prepare.preload.skip: true + prepare.operators.skip: true + prepare.cluster.skip: true + prepare.rhoai.skip: true + + tests.capture_prom: false + tests.capture_prom_uwm: false + tests.llmd.skip_prepare: false + +azure_light: + extends: [azure, opt-125m] + prepare.pvc.storage_class: managed-csi + +cks: + extends: [pvc_rwx, llama-70b] + + tests.capture_prom: false + tests.capture_prom_uwm: false + tests.llmd.inference_service.metrics.manual_capture: false + tests.llmd.inference_service.gateway.name: gateway-internal + tests.llmd.inference_service.image_pull_secrets: kpouget-pull-secret + + tests.llmd.skip_prepare: true + prepare.namespace.name: kpouget-dev + prepare.preload.node_selector_key: gpu.nvidia.com/class + prepare.preload.node_selector_value: "H200" + tests.llmd.inference_service.extra_properties: + spec.template.affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: kubernetes.io/hostname + operator: NotIn + values: + - gf48e48 + - gf4334a + prepare.preload.extra_images: + vllm-cuda-rhel9: registry.redhat.io/rhaiis/vllm-cuda-rhel9@sha256:094db84a1da5e8a575d0c9eade114fa30f4a2061064a338e3e032f3578f8082a + llm-d-inference-scheduler: ghcr.io/opendatahub-io/rhaii-on-xks/llm-d-inference-scheduler:e6b5db0@sha256:43e8b8edc158f31535c8b23d77629f8cde111cc762a8f4ee5f2f884470566211 + guidellm: ghcr.io/vllm-project/guidellm:v0.5.4 + +baseline-flavors: + tests.llmd.flavors: [simple, simple-tp4, simple-tp2-x4] + +intelligentrouting-flavors: + tests.llmd.flavors: [intelligentrouting-tp2-x4] + +guidellm_light: + tests.llmd.benchmarks.guidellm.load_shape_name: lightweight + tests.llmd.benchmarks.guidellm.rate: "1,10,50" + tests.llmd.benchmarks.guidellm.args.data: prompt_tokens=256,output_tokens=128 + tests.llmd.benchmarks.guidellm.args.max_seconds: 30 + +guidellm_multiturn_eval: + tests.llmd.benchmarks.guidellm.load_shape_name: Multiturn + tests.llmd.benchmarks.guidellm.rate: [32, 64, 128, 256, 512] # keep as a list, multi-rate not supported by guidellm-multiturn + tests.llmd.benchmarks.guidellm.args.data: "prompt_tokens=128,output_tokens=128,turns=5,prefix_tokens=10000,prefix_count={2*rate}" + tests.llmd.benchmarks.guidellm.args.max_requests: "{10*rate}" + +guidellm_heterogeneous_eval: + tests.llmd.benchmarks.guidellm.load_shape_name: Heterogeneous + tests.llmd.benchmarks.guidellm.rate: "1,10,50,100,200,300" + tests.llmd.benchmarks.guidellm.args.data: prompt_tokens=8000,prompt_tokens_stdev=8500,prompt_tokens_min=50,prompt_tokens_max=30000,output_tokens=800,output_tokens_stdev=1500,output_tokens_min=20,output_tokens_max=8000 + tests.llmd.benchmarks.guidellm.args.max_seconds: 600 + +gpt-oss: + tests.llmd.inference_service.model: gpt-oss-120 + tests.llmd.benchmarks.guidellm.args.request_type: text_completions + +llama-70b: + tests.llmd.inference_service.model: llama3-3-70b From fe14f386b7bea1b84564ccdb5dc4d79c4d260a64 Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Mon, 4 May 2026 10:36:33 +0200 Subject: [PATCH 25/33] [core] dsl: define the task ID as filename:lineno --- projects/core/dsl/script_manager.py | 28 ++++++++++++++-------------- projects/core/dsl/task.py | 6 ++++-- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/projects/core/dsl/script_manager.py b/projects/core/dsl/script_manager.py index 991477ba..dfa1cd89 100644 --- a/projects/core/dsl/script_manager.py +++ b/projects/core/dsl/script_manager.py @@ -13,8 +13,8 @@ class TaskResult: """Container for task results that can be referenced in conditions""" 
- def __init__(self, task_name: str): - self.task_name = task_name + def __init__(self, task_id: str): + self.task_id = task_id self._result = None self._executed = False @@ -61,17 +61,17 @@ def register_task(self, task_info: dict, source_file: str) -> None: self._task_registry[source_file].append(task_info) # Create result container for this task - task_name = task_info["name"] - self._task_results[task_name] = TaskResult(task_name) + task_id = task_info["id"] + self._task_results[task_id] = TaskResult(task_id) - logger.debug(f"Registered task '{task_name}' from {source_file}") + logger.debug(f"Registered task '{task_id}' from {source_file}") - def get_task_result(self, task_name: str) -> TaskResult | None: + def get_task_result(self, task_id: str) -> TaskResult | None: """Get the result container for a specific task""" # Check thread-local results (for current execution) thread_results = getattr(self._thread_local, "task_results", None) - if thread_results and task_name in thread_results: - return thread_results[task_name] + if thread_results and task_id in thread_results: + return thread_results[task_id] return None @@ -107,9 +107,9 @@ def clear_tasks(self, source_file: str | None = None) -> None: # Clear task results for tasks from this file for task_info in tasks_to_remove: - task_name = task_info["name"] - if task_name in self._task_results: - del self._task_results[task_name] + task_id = task_info["id"] + if task_id in self._task_results: + del self._task_results[task_id] # Remove tasks from this file del self._task_registry[source_file] @@ -152,9 +152,9 @@ def start_execution_context(self, source_file: str) -> None: # Create thread-local task results for tasks in this file file_tasks = self.get_tasks_from_file(source_file) for task_info in file_tasks: - task_name = task_info["name"] - if task_name not in self._thread_local.task_results: - self._thread_local.task_results[task_name] = TaskResult(task_name) + task_id = task_info["id"] + if task_id not in self._thread_local.task_results: + self._thread_local.task_results[task_id] = TaskResult(task_id) logger.debug( f"Started execution context for {source_file} in thread {threading.current_thread().name}" diff --git a/projects/core/dsl/task.py b/projects/core/dsl/task.py index d18e4cdb..a58b96c2 100644 --- a/projects/core/dsl/task.py +++ b/projects/core/dsl/task.py @@ -230,7 +230,8 @@ def wrapper(*args, **kwargs): result = func(*args, **kwargs) # Store result for conditional execution script_manager = get_script_manager() - task_result = script_manager.get_task_result(task_name) + task_id = wrapper._task_info["id"] + task_result = script_manager.get_task_result(task_id) if task_result: task_result._set_result(result) return result @@ -249,6 +250,7 @@ def wrapper(*args, **kwargs): # Register the task with the script manager task_info = { + "id": f"{rel_definition_filename}:{definition_line_no}", "name": func.__name__, "func": wrapper, "condition": getattr(func, "_when_condition", None), @@ -263,7 +265,7 @@ def wrapper(*args, **kwargs): wrapper._task_info = task_info # Make the result accessible as an attribute of the function - wrapper.status = script_manager.get_task_result(func.__name__) + wrapper.status = script_manager.get_task_result(task_info["id"]) return wrapper From d97d6dcf7a821bee470988e730295960fcfbdb94 Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Mon, 4 May 2026 10:45:48 +0200 Subject: [PATCH 26/33] [core] library: config: move _create_first_parent_config_key to a dedicated function --- projects/core/library/config.py | 
65 +++++++++++++++++++-------------- 1 file changed, 38 insertions(+), 27 deletions(-) diff --git a/projects/core/library/config.py b/projects/core/library/config.py index eb33faf7..d84005c4 100644 --- a/projects/core/library/config.py +++ b/projects/core/library/config.py @@ -114,6 +114,33 @@ def save_config_overrides(self): self.config["overrides"] = variable_overrides + def _create_first_parent_config_key(self, key: str, value) -> None: + """Create a new config key if its parent exists and is a dict.""" + key_parts = key.split(".") + if len(key_parts) <= 1: + raise ValueError( + f"Config key '{key}' does not exist, and cannot create it at the moment :/" + ) + + parent_key = ".".join(key_parts[:-1]) + try: + parent_value = self.get_config( + parent_key, print=False, warn=False, handled_secretly=True + ) + except Exception as e: + raise ValueError( + f"Config key '{key}' does not exist, and cannot create it at the moment :/" + ) from e + + if not isinstance(parent_value, dict): + raise ValueError( + f"Config key '{key}' does not exist, and cannot create it at the moment :/" + ) + + # Parent exists and is a dict, create the new key + child_key = key_parts[-1] + parent_value[child_key] = value + def apply_config_overrides( self, *, ignore_not_found=False, variable_overrides_path=None, log=True ): @@ -146,34 +173,18 @@ def apply_config_overrides( handled_secretly=handled_secretly, ) if current_value == MAGIC_DEFAULT_VALUE: - if ignore_not_found: - continue - - # Try to create the key if parent exists and is a dict - key_parts = key.split(".") - if len(key_parts) <= 1: - raise ValueError( - f"Config key '{key}' does not exist, and cannot create it at the moment :/" - ) - - parent_key = ".".join(key_parts[:-1]) try: - parent_value = self.get_config( - parent_key, print=False, warn=False, handled_secretly=True - ) - except Exception as e: - raise ValueError( - f"Config key '{key}' does not exist, and cannot create it at the moment :/" - ) from e - - if not isinstance(parent_value, dict): - raise ValueError( - f"Config key '{key}' does not exist, and cannot create it at the moment :/" - ) + # Try to create the key if parent exists and is a dict + self._create_first_parent_config_key(key, value) + self.save_config() + except ValueError: + if not ignore_not_found: + raise + + if log: + logger.info(f"config override IGNORED: {key} --> {value}") + continue - # Parent exists and is a dict, create the new key - child_key = key_parts[-1] - parent_value[child_key] = value self.save_config() if log: logger.info(f"config override (new key): {key} --> {value}") @@ -465,7 +476,7 @@ def init(orchestration_dir, *, apply_config_overrides=True): if not apply_config_overrides: logger.info( - "config.init: running with 'apply_config_overrides', " + "config.init: running with 'apply_config_overrides=False', " "skipping the overrides. Saving it as 'overrides' " "field in the project configuration." 
) From 2e4d685ca993de04fd2e2be00a6191909719d78a Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Mon, 4 May 2026 10:46:08 +0200 Subject: [PATCH 27/33] [core] library: env: don't update ARTIFACT_DIR in the threads --- projects/core/library/env.py | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/projects/core/library/env.py b/projects/core/library/env.py index 8dda1e20..425e2ca8 100644 --- a/projects/core/library/env.py +++ b/projects/core/library/env.py @@ -150,22 +150,26 @@ def __enter__(self): # Fallback to global if thread-local not set self.previous_dirname = globals().get("_GLOBAL_ARTIFACT_DIR") - # Update environment variable - os.environ["ARTIFACT_DIR"] = str(self.dirname) - self.dirname.mkdir(exist_ok=True) + # Only update environment variable in main thread to avoid parallel conflicts + if threading.current_thread() == threading.main_thread(): + os.environ["ARTIFACT_DIR"] = str(self.dirname) + # Set global for main thread compatibility + _set_artifact_dir(self.dirname) - # Set both global and thread-local (for compatibility) - _set_artifact_dir(self.dirname) + self.dirname.mkdir(exist_ok=True) + # Always set thread-local (each thread gets its own) _set_tls_artifact_dir(self.dirname) return True def __exit__(self, ex_type, ex_value, exc_traceback): - # Restore environment variable - os.environ["ARTIFACT_DIR"] = str(self.previous_dirname) + # Only restore environment variable in main thread to avoid parallel conflicts + if threading.current_thread() == threading.main_thread(): + os.environ["ARTIFACT_DIR"] = str(self.previous_dirname) + # Restore global for main thread compatibility + _set_artifact_dir(self.previous_dirname) - # Restore both global and thread-local - _set_artifact_dir(self.previous_dirname) + # Always restore thread-local (each thread manages its own) _set_tls_artifact_dir(self.previous_dirname) return False # If we returned True here, any exception would be suppressed! 
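Taken together, the thread-local storage and the main-thread guard above implement a simple rule: every thread reads its artifact directory from its own thread-local slot first and falls back to the process-wide value, while only the main thread may update the process-wide value (and the ARTIFACT_DIR environment variable). A minimal, self-contained sketch of that rule — the names _tls, GLOBAL_DIR, set_dir and current_dir are illustrative stand-ins, not the library's API:

    import threading

    _tls = threading.local()        # per-thread override
    GLOBAL_DIR = "/artifacts/main"  # process-wide fallback

    def current_dir():
        # Thread-local first, global fallback second
        return getattr(_tls, "val", GLOBAL_DIR)

    def set_dir(value):
        global GLOBAL_DIR
        _tls.val = value  # every thread updates its own slot
        if threading.current_thread() == threading.main_thread():
            GLOBAL_DIR = value  # shared state is touched by the main thread only

    def worker():
        set_dir("/artifacts/worker")
        assert current_dir() == "/artifacts/worker"

    t = threading.Thread(target=worker)
    t.start()
    t.join()
    assert current_dir() == "/artifacts/main"  # the worker did not leak its value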
From ce8fabd9f5dc90c625a89a0df2461ca17bcc65ac Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Mon, 4 May 2026 10:46:32 +0200 Subject: [PATCH 28/33] [core] library: export: use run.run --- projects/core/library/export.py | 60 ++++++++++++++++----------------- 1 file changed, 29 insertions(+), 31 deletions(-) diff --git a/projects/core/library/export.py b/projects/core/library/export.py index 9891c0c6..101ebc23 100644 --- a/projects/core/library/export.py +++ b/projects/core/library/export.py @@ -16,7 +16,7 @@ from projects.caliper.orchestration.export import run_from_orchestration_config from projects.core.library import ci as ci_lib -from projects.core.library import config +from projects.core.library import config, run logger = logging.getLogger(__name__) @@ -33,42 +33,40 @@ def _update_fjob_export_status(status: dict): try: import json - import subprocess fjob_name = os.environ["FJOB_NAME"] namespace = os.environ["FOURNOS_NAMESPACE"] # Get current fjob status - get_cmd = f"oc get fjob/{fjob_name} -n {namespace} -o json" - result = subprocess.run(get_cmd, shell=True, capture_output=True, text=True) - - if result.returncode == 0: - fjob_data = json.loads(result.stdout) - - # Initialize status.engine.status if it doesn't exist - if "status" not in fjob_data: - fjob_data["status"] = {} - if "engineStatus" not in fjob_data["status"]: - fjob_data["status"]["engineStatus"] = {} - if "status" not in fjob_data["status"]["engineStatus"]: - fjob_data["status"]["engineStatus"]["status"] = {} - - # Update with export-artifacts status - fjob_data["status"]["engineStatus"]["export-artifacts"] = status - - # Patch the fjob - patch_data = {"status": fjob_data["status"]} - patch_cmd = f"oc patch fjob/{fjob_name} -n {namespace} --type=merge --subresource=status -p '{json.dumps(patch_data)}'" - - patch_result = subprocess.run(patch_cmd, shell=True, capture_output=True, text=True) - if patch_result.returncode == 0: - logger.info(f"Updated fjob/{fjob_name} status with export artifacts status") - else: - logger.warning( - f"Failed to update fjob status: {patch_cmd} --> {patch_result.stderr}" - ) + get_cmd = f"oc get fjob/{fjob_name} -n {namespace} -ojson" + result = run.run(get_cmd, capture_stdout=True, check=False) + + if result.returncode != 0: + logger.warning(f"Failed to get fjob/{fjob_name}") + return + + fjob_data = json.loads(result.stdout) + + # Initialize status.engine.status if it doesn't exist + if "status" not in fjob_data: + fjob_data["status"] = {} + if "engineStatus" not in fjob_data["status"]: + fjob_data["status"]["engineStatus"] = {} + if "status" not in fjob_data["status"]["engineStatus"]: + fjob_data["status"]["engineStatus"]["status"] = {} + + # Update with export-artifacts status + fjob_data["status"]["engineStatus"]["export-artifacts"] = status + + # Patch the fjob + patch_data = {"status": fjob_data["status"]} + patch_cmd = f"oc patch fjob/{fjob_name} -n {namespace} --type=merge --subresource=status -p '{json.dumps(patch_data)}'" + + patch_result = run.run(patch_cmd, check=False) + if patch_result.returncode == 0: + logger.info(f"Updated fjob/{fjob_name} status with export artifacts status") else: - logger.warning(f"Failed to get fjob/{fjob_name}: {result.stderr}") + logger.warning(f"Failed to update fjob status: {patch_cmd}") except Exception as e: logger.warning(f"Failed to update fjob status: {e}") From f89e4d3194e437ecf9c020da5b48b6c0fbb2e9ad Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Mon, 4 May 2026 10:52:05 +0200 Subject: [PATCH 29/33] [core] library: run_parallel: move 
the Parallel code to a dedicate file --- projects/core/library/run.py | 144 ------------------------- projects/core/library/run_parallel.py | 148 ++++++++++++++++++++++++++ 2 files changed, 148 insertions(+), 144 deletions(-) create mode 100644 projects/core/library/run_parallel.py diff --git a/projects/core/library/run.py b/projects/core/library/run.py index cfea876d..64116c22 100644 --- a/projects/core/library/run.py +++ b/projects/core/library/run.py @@ -2,11 +2,6 @@ import os import signal import subprocess -import time -import traceback -from concurrent.futures import ThreadPoolExecutor, as_completed - -from . import env logger = logging.getLogger(__name__) @@ -109,142 +104,3 @@ def run_and_catch(exc, fct, *args, **kwargs): logger.error(f"{e.__class__.__name__}: {e}") exc = exc or e return exc - - -class Parallel: - """ - Context manager for parallel execution of tasks with immediate cancellation on failure. - - Uses ThreadPoolExecutor for multithreaded execution with thread-safe artifact directory handling. - When any task fails, all remaining tasks are immediately cancelled rather than waiting - for completion. - - Each parallel task runs with its own dedicated artifact directory accessible via env.ARTIFACT_DIR, - allowing tasks to write artifacts without conflicts. The dedicated directory is created as: - env.ARTIFACT_DIR / f"{next_count:03d}__{name}" - - Usage: - with run.Parallel("task_name") as parallel: - parallel.delayed(function1, arg1, arg2) - parallel.delayed(function2, arg3, kwarg=value) - # Tasks execute in parallel when exiting the context - # Each task can access its dedicated directory via env.ARTIFACT_DIR - """ - - def __init__(self, name, exit_on_exception=True, dedicated_dir=True): - """ - Initialize parallel execution context. - - Args: - name: Name for the parallel execution (used for artifact directory) - exit_on_exception: If True, kill process group on exception - dedicated_dir: If True, create dedicated artifact directory for this parallel execution - """ - self.name = name - self.parallel_tasks = None - self.exit_on_exception = exit_on_exception - self.dedicated_dir = dedicated_dir - - def __enter__(self): - """Enter the parallel context.""" - self.parallel_tasks = [] - return self - - def delayed(self, function, *args, **kwargs): - """ - Add a function to be executed in parallel. - - Args: - function: Function to execute - *args: Positional arguments for the function - **kwargs: Keyword arguments for the function - """ - # Simple task container - no joblib needed - from collections import namedtuple - - DelayedTask = namedtuple("DelayedTask", ["func", "args", "keywords"]) - task = DelayedTask(func=function, args=args, keywords=kwargs) - self.parallel_tasks.append(task) - - def __exit__(self, ex_type, ex_value, exc_traceback): - """Execute all delayed tasks in parallel with immediate cancellation on failure.""" - if ex_value: - logger.warning( - f"An exception occured while preparing the '{self.name}' Parallel execution ..." - ) - return False - - if self.dedicated_dir: - # Create dedicated directory without modifying global ARTIFACT_DIR - # to avoid race conditions in multithreaded execution. - # Each parallel task will inherit the current ARTIFACT_DIR context. 
- next_count = env.next_artifact_index() - parallel_dir = env.ARTIFACT_DIR / f"{next_count:03d}__{self.name}" - parallel_dir.mkdir(exist_ok=True) - logger.debug(f"Created parallel execution directory: {parallel_dir}") - else: - parallel_dir = None - - def _run_with_artifact_dir(func, artifact_dir, *args, **kwargs): - """Wrapper to run function with specific ARTIFACT_DIR for this thread.""" - if artifact_dir: - # Ensure thread has its own copy of ARTIFACT_DIR, then set it to dedicated directory - try: - original_artifact_dir = env.ensure_thread_artifact_dir() - except ValueError: - # No global ARTIFACT_DIR to inherit from - original_artifact_dir = None - - try: - env._set_tls_artifact_dir(artifact_dir) - return func(*args, **kwargs) - finally: - if original_artifact_dir is not None: - env._set_tls_artifact_dir(original_artifact_dir) - else: - # No dedicated directory, ensure thread has its own copy of main ARTIFACT_DIR - try: - env.ensure_thread_artifact_dir() - except ValueError: - pass # Continue without thread-local copy - return func(*args, **kwargs) - - # Use ThreadPoolExecutor for better cancellation control - max_workers = min(len(self.parallel_tasks), os.cpu_count() or 1) - - with ThreadPoolExecutor(max_workers=max_workers) as executor: - # Submit all tasks with artifact directory context - futures = [] - for delayed_func in self.parallel_tasks: - # Wrap each function to run with the dedicated artifact directory - future = executor.submit( - _run_with_artifact_dir, - delayed_func.func, - parallel_dir, - *delayed_func.args, - **delayed_func.keywords, - ) - futures.append(future) - - try: - # Wait for all tasks to complete - for future in as_completed(futures): - future.result() # This will raise any exception that occurred - - except Exception as e: - # Cancel all remaining tasks immediately - logger.error("Exception in parallel task, cancelling remaining tasks...") - for future in futures: - future.cancel() - - # Give a short time for cancellation to take effect - time.sleep(0.1) - - if not self.exit_on_exception: - raise e - - traceback.print_exc() - logger.error(f"Exception caught during the '{self.name}' Parallel execution.") - raise SystemExit(1) from e - - return False # If we returned True here, any exception would be suppressed! diff --git a/projects/core/library/run_parallel.py b/projects/core/library/run_parallel.py new file mode 100644 index 00000000..4b729831 --- /dev/null +++ b/projects/core/library/run_parallel.py @@ -0,0 +1,148 @@ +import logging +import os +import time +import traceback +from concurrent.futures import ThreadPoolExecutor, as_completed + +from . import env + +logger = logging.getLogger(__name__) + + +class Parallel: + """ + Context manager for parallel execution of tasks with immediate cancellation on failure. + + Uses ThreadPoolExecutor for multithreaded execution with thread-safe artifact directory handling. + When any task fails, all remaining tasks are immediately cancelled rather than waiting + for completion. + + Each parallel task runs with its own dedicated artifact directory accessible via env.ARTIFACT_DIR, + allowing tasks to write artifacts without conflicts. 
The dedicated directory is created as: + env.ARTIFACT_DIR / f"{next_count:03d}__{name}" + + Usage: + with run_parallel.Parallel("task_name") as parallel: + parallel.delayed(function1, arg1, arg2) + parallel.delayed(function2, arg3, kwarg=value) + # Tasks execute in parallel when exiting the context + # Each task can access its dedicated directory via env.ARTIFACT_DIR + """ + + def __init__(self, name, exit_on_exception=True, dedicated_dir=True): + """ + Initialize parallel execution context. + + Args: + name: Name for the parallel execution (used for artifact directory) + exit_on_exception: If True, kill process group on exception + dedicated_dir: If True, create dedicated artifact directory for this parallel execution + """ + self.name = name + self.parallel_tasks = None + self.exit_on_exception = exit_on_exception + self.dedicated_dir = dedicated_dir + + def __enter__(self): + """Enter the parallel context.""" + self.parallel_tasks = [] + return self + + def delayed(self, function, *args, **kwargs): + """ + Add a function to be executed in parallel. + + Args: + function: Function to execute + *args: Positional arguments for the function + **kwargs: Keyword arguments for the function + """ + # Simple task container - no joblib needed + from collections import namedtuple + + DelayedTask = namedtuple("DelayedTask", ["func", "args", "keywords"]) + task = DelayedTask(func=function, args=args, keywords=kwargs) + self.parallel_tasks.append(task) + + def __exit__(self, ex_type, ex_value, exc_traceback): + """Execute all delayed tasks in parallel with immediate cancellation on failure.""" + if ex_value: + logger.warning( + f"An exception occured while preparing the '{self.name}' Parallel execution ..." + ) + return False + + if self.dedicated_dir: + # Create dedicated directory without modifying global ARTIFACT_DIR + # to avoid race conditions in multithreaded execution. + # Each parallel task will inherit the current ARTIFACT_DIR context. 
+ next_count = env.next_artifact_index() + parallel_dir = env.ARTIFACT_DIR / f"{next_count:03d}__{self.name}" + parallel_dir.mkdir(exist_ok=True) + logger.debug(f"Created parallel execution directory: {parallel_dir}") + else: + parallel_dir = None + + def _run_with_artifact_dir(func, artifact_dir, *args, **kwargs): + """Wrapper to run function with specific ARTIFACT_DIR for this thread.""" + if artifact_dir: + # Ensure thread has its own copy of ARTIFACT_DIR, then set it to dedicated directory + try: + original_artifact_dir = env.ensure_thread_artifact_dir() + except ValueError: + # No global ARTIFACT_DIR to inherit from + original_artifact_dir = None + + try: + env._set_tls_artifact_dir(artifact_dir) + return func(*args, **kwargs) + finally: + if original_artifact_dir is not None: + env._set_tls_artifact_dir(original_artifact_dir) + else: + # No dedicated directory, ensure thread has its own copy of main ARTIFACT_DIR + try: + env.ensure_thread_artifact_dir() + except ValueError: + pass # Continue without thread-local copy + return func(*args, **kwargs) + + # Use ThreadPoolExecutor for better cancellation control + max_workers = min(len(self.parallel_tasks), os.cpu_count() or 1) + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + # Submit all tasks with artifact directory context + futures = [] + for delayed_func in self.parallel_tasks: + # Wrap each function to run with the dedicated artifact directory + future = executor.submit( + _run_with_artifact_dir, + delayed_func.func, + parallel_dir, + *delayed_func.args, + **delayed_func.keywords, + ) + futures.append(future) + + try: + # Wait for all tasks to complete + for future in as_completed(futures): + future.result() # This will raise any exception that occurred + + except Exception as e: + # Cancel all remaining tasks immediately + logger.error("Exception in parallel task, cancelling remaining tasks...") + for future in futures: + future.cancel() + + # Give a short time for cancellation to take effect + time.sleep(0.1) + + if not self.exit_on_exception: + raise e + + traceback.print_exc() + logger.error(f"Exception caught during the '{self.name}' Parallel execution.") + raise SystemExit(1) from e + + return False # If we returned True here, any exception would be suppressed! 
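With the Parallel class now living in its own module, a typical call site looks like the sketch below, based on the class docstring (the capture function and the node names are placeholders). All tasks of one Parallel block share the dedicated NNN__name directory, and each worker thread sees it through its thread-local env.ARTIFACT_DIR, so tasks only need to pick distinct file names:

    from projects.core.library import env, run_parallel

    def capture(node_name):
        # env.ARTIFACT_DIR resolves through thread-local storage to the
        # dedicated directory created for this Parallel block
        (env.ARTIFACT_DIR / f"{node_name}.log").write_text("captured\n")

    with run_parallel.Parallel("capture_nodes") as parallel:
        parallel.delayed(capture, "node-a")
        parallel.delayed(capture, "node-b")
    # The tasks run on context exit; with exit_on_exception=True (the
    # default), a failing task cancels the pending futures and raises
    # SystemExit(1).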
From d282d53154f98982fa2ace16f1f1498645015db3 Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Mon, 4 May 2026 11:25:16 +0200 Subject: [PATCH 30/33] toolbox: simplify the entrypoint mechanism --- projects/cluster/toolbox/build_image/main.py | 9 +-- .../cluster/toolbox/rebuild_image/main.py | 9 +-- projects/core/dsl/__init__.py | 3 +- projects/core/dsl/runtime.py | 31 ++++++-- projects/core/dsl/task.py | 75 +++++++++++++++++++ .../toolbox/cleanup_fjob/main.py | 9 +-- .../toolbox/shutdown_fjobs/main.py | 9 +-- .../toolbox/submit_and_wait/main.py | 10 +-- .../skeleton/toolbox/cluster_info/main.py | 9 +-- 9 files changed, 120 insertions(+), 44 deletions(-) diff --git a/projects/cluster/toolbox/build_image/main.py b/projects/cluster/toolbox/build_image/main.py index 713cab6e..ad2a4949 100755 --- a/projects/cluster/toolbox/build_image/main.py +++ b/projects/cluster/toolbox/build_image/main.py @@ -12,12 +12,12 @@ from projects.core.dsl import ( always, + entrypoint, execute_tasks, retry, shell, task, template, - toolbox, ) logger = logging.getLogger("TOOLBOX") @@ -92,6 +92,7 @@ def _capture_all_container_logs(buildrun_name: str, namespace: str, artifact_dir logger.debug(f"Captured init container logs for '{container_name}' to {log_file}") +@entrypoint def run( repo_name: str, commit: str, @@ -327,9 +328,5 @@ def generate_build_summary(args, ctx): logger.info(f"Target Image: {getattr(ctx, 'image_tag', 'unknown')}") -# Create the main function using the toolbox library -main = toolbox.create_toolbox_main(run) - - if __name__ == "__main__": - main() + run.main() diff --git a/projects/cluster/toolbox/rebuild_image/main.py b/projects/cluster/toolbox/rebuild_image/main.py index 12d56057..79938aec 100644 --- a/projects/cluster/toolbox/rebuild_image/main.py +++ b/projects/cluster/toolbox/rebuild_image/main.py @@ -12,12 +12,12 @@ from projects.core.dsl import ( always, + entrypoint, execute_tasks, retry, shell, task, template, - toolbox, ) logger = logging.getLogger("DSL") @@ -92,6 +92,7 @@ def _capture_all_container_logs(buildrun_name: str, namespace: str, artifact_dir logger.debug(f"Captured init container logs for '{container_name}' to {log_file}") +@entrypoint def run( build_name: str, *, @@ -269,9 +270,5 @@ def generate_rebuild_summary(args, ctx): logger.info(f"Timeout: {args.timeout_minutes} minutes") -# Create the main function using the toolbox library -main = toolbox.create_toolbox_main(run) - - if __name__ == "__main__": - main() + run.main() diff --git a/projects/core/dsl/__init__.py b/projects/core/dsl/__init__.py index a95ba693..2e016c73 100644 --- a/projects/core/dsl/__init__.py +++ b/projects/core/dsl/__init__.py @@ -5,12 +5,13 @@ from . 
import context, shell, template, toolbox from .runtime import clear_tasks, execute_tasks from .script_manager import get_script_manager, reset_script_manager -from .task import RetryFailure, always, retry, task, when +from .task import RetryFailure, always, entrypoint, retry, task, when __all__ = [ "always", "clear_tasks", "context", + "entrypoint", "execute_tasks", "get_script_manager", "reset_script_manager", diff --git a/projects/core/dsl/runtime.py b/projects/core/dsl/runtime.py index 6fd26049..6bf3f52c 100644 --- a/projects/core/dsl/runtime.py +++ b/projects/core/dsl/runtime.py @@ -76,17 +76,36 @@ def execute_tasks(function_args: dict = None): filename = caller_frame.f_code.co_filename command_name = _get_toolbox_function_name(filename) + # Get DSL runtime parameters from function args or wrapper attributes + prefix = function_args.pop("artifact_dirname_prefix", None) + suffix = function_args.pop("artifact_dirname_suffix", None) + + # Also check if they're stored in the calling function (from @entrypoint decorator) + try: + # Get the calling function from the frame + calling_func = caller_frame.f_globals.get(caller_frame.f_code.co_name) + if calling_func and hasattr(calling_func, "_dsl_runtime_params"): + runtime_params = calling_func._dsl_runtime_params + prefix = prefix or runtime_params.get("artifact_dirname_prefix") + suffix = suffix or runtime_params.get("artifact_dirname_suffix") + except (AttributeError, KeyError): + # If we can't get the calling function, that's fine - just continue + pass + + # Debug logging to see if parameters are found + if suffix or prefix: + logger.info(f"DSL runtime: found prefix='{prefix}', suffix='{suffix}'") + # Prepend prefix to command name if provided - if prefix := function_args.get("artifact_dirname_prefix"): + if prefix: command_name = f"{prefix}_{command_name}" - # Remove DSL framework parameter from function args - del function_args["artifact_dirname_prefix"] # Append suffix to command name if provided - if suffix := function_args.get("artifact_dirname_suffix"): + if suffix: command_name = f"{command_name}_{suffix}" - # Remove DSL framework parameter from function args - del function_args["artifact_dirname_suffix"] + + # Log the final command name for debugging + logger.info(f"DSL runtime: using command_name='{command_name}'") # Get relative filename to match task registration try: diff --git a/projects/core/dsl/task.py b/projects/core/dsl/task.py index a58b96c2..64172ac7 100644 --- a/projects/core/dsl/task.py +++ b/projects/core/dsl/task.py @@ -342,3 +342,78 @@ def decorator(func): return func return decorator + + +def entrypoint(func): + """ + Mark a function as a DSL entrypoint, automatically adding artifact directory parameters + and creating a main() function for CLI execution. + + Automatically injects artifact_dirname_suffix and artifact_dirname_prefix parameters + to the function signature and creates a main() function accessible as func.main(). 
+ + Usage: + @entrypoint + def run(project: str, cluster_name: str): + # Function will automatically accept artifact_dirname_suffix and artifact_dirname_prefix + pass + + if __name__ == "__main__": + run.main() + """ + # Get the original function signature + sig = inspect.signature(func) + + # Add the artifact directory parameters to the signature + new_params = list(sig.parameters.values()) + + # Add suffix parameter + suffix_param = inspect.Parameter( + "artifact_dirname_suffix", inspect.Parameter.KEYWORD_ONLY, default=None, annotation=str + ) + new_params.append(suffix_param) + + # Add prefix parameter + prefix_param = inspect.Parameter( + "artifact_dirname_prefix", inspect.Parameter.KEYWORD_ONLY, default=None, annotation=str + ) + new_params.append(prefix_param) + + # Create new signature + new_sig = sig.replace(parameters=new_params) + + @functools.wraps(func) + def wrapper(*args, **kwargs): + # Get the original function's parameter names + orig_sig = inspect.signature(func) + orig_param_names = set(orig_sig.parameters.keys()) + + # Split kwargs into original function params and DSL runtime params + func_kwargs = {} + dsl_kwargs = {} + + for key, value in kwargs.items(): + if key in orig_param_names: + func_kwargs[key] = value + else: + dsl_kwargs[key] = value + + # Store DSL parameters for runtime access + wrapper._dsl_runtime_params = dsl_kwargs + + return func(*args, **func_kwargs) + + # Set the new signature on the wrapper + wrapper.__signature__ = new_sig + + # Create main function for CLI execution + def main(): + """CLI entrypoint with dynamic argument discovery""" + from . import toolbox + + toolbox.run_toolbox_command(wrapper) + + # Attach main function to the wrapper + wrapper.main = main + + return wrapper diff --git a/projects/fournos_launcher/toolbox/cleanup_fjob/main.py b/projects/fournos_launcher/toolbox/cleanup_fjob/main.py index f054c355..4a7691c2 100644 --- a/projects/fournos_launcher/toolbox/cleanup_fjob/main.py +++ b/projects/fournos_launcher/toolbox/cleanup_fjob/main.py @@ -8,15 +8,16 @@ import logging from projects.core.dsl import ( + entrypoint, execute_tasks, shell, task, - toolbox, ) logger = logging.getLogger(__name__) +@entrypoint def run( job_name: str, namespace: str = "fournos-jobs", @@ -124,9 +125,5 @@ def cleanup_fjob(args, ctx): return f"Successfully cleaned up job: {args.job_name}" -# Create the main function using the toolbox library -main = toolbox.create_toolbox_main(run) - - if __name__ == "__main__": - main() + run.main() diff --git a/projects/fournos_launcher/toolbox/shutdown_fjobs/main.py b/projects/fournos_launcher/toolbox/shutdown_fjobs/main.py index b03b6645..7e7fc22e 100755 --- a/projects/fournos_launcher/toolbox/shutdown_fjobs/main.py +++ b/projects/fournos_launcher/toolbox/shutdown_fjobs/main.py @@ -10,15 +10,16 @@ import shlex from projects.core.dsl import ( + entrypoint, execute_tasks, shell, task, - toolbox, ) logger = logging.getLogger(__name__) +@entrypoint def run( namespace: str = "psap-automation", ci_label: str = None, @@ -231,9 +232,5 @@ def capture_final_status(args, ctx): return f"Captured status for {len(ctx.job_names)} jobs in artifacts/ directory" -# Create the main function using the toolbox library -main = toolbox.create_toolbox_main(run) - - if __name__ == "__main__": - main() + run.main() diff --git a/projects/fournos_launcher/toolbox/submit_and_wait/main.py b/projects/fournos_launcher/toolbox/submit_and_wait/main.py index 15351f01..6693ef42 100755 --- a/projects/fournos_launcher/toolbox/submit_and_wait/main.py +++ 
b/projects/fournos_launcher/toolbox/submit_and_wait/main.py @@ -11,12 +11,12 @@ from projects.core.dsl import ( always, + entrypoint, execute_tasks, retry, shell, task, template, - toolbox, ) from projects.core.dsl.utils.k8s import sanitize_k8s_name from projects.core.library import env as env_mod @@ -24,6 +24,7 @@ logger = logging.getLogger(__name__) +@entrypoint def run( cluster_name: str, project: str, @@ -41,7 +42,6 @@ def run( exclusive: bool = True, gpu_count: int = None, gpu_type: str = None, - artifact_dirname_suffix: str = None, ): """ Submit a FOURNOS job and wait for completion @@ -355,9 +355,5 @@ def capture_pod_specs(args, ctx): return f"Wrote the pod spec file under {artifact_dir} (label {label})" -# Create the main function using the toolbox library -main = toolbox.create_toolbox_main(run) - - if __name__ == "__main__": - main() + run.main() diff --git a/projects/skeleton/toolbox/cluster_info/main.py b/projects/skeleton/toolbox/cluster_info/main.py index 44f4a838..c7ba4adb 100755 --- a/projects/skeleton/toolbox/cluster_info/main.py +++ b/projects/skeleton/toolbox/cluster_info/main.py @@ -8,13 +8,14 @@ """ from projects.core.dsl import ( + entrypoint, execute_tasks, shell, task, - toolbox, ) +@entrypoint def run(*, output_format: str = "text"): """ Gather basic cluster and environment information @@ -73,9 +74,5 @@ def get_cluster_nodes(args, ctx): ) -# Create the main function using the toolbox library -main = toolbox.create_toolbox_main(run) - - if __name__ == "__main__": - main() + run.main() From 1da5b7d1e7eaf62cc9a2089b902f97ae25fa1c24 Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Mon, 4 May 2026 13:34:52 +0200 Subject: [PATCH 31/33] [fournos_launcher] orchestration: signal handlers --- projects/fournos_launcher/orchestration/job_management.py | 2 ++ projects/fournos_launcher/orchestration/submit.py | 5 +++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/projects/fournos_launcher/orchestration/job_management.py b/projects/fournos_launcher/orchestration/job_management.py index 82631b18..a82fe7b9 100644 --- a/projects/fournos_launcher/orchestration/job_management.py +++ b/projects/fournos_launcher/orchestration/job_management.py @@ -69,6 +69,7 @@ def shutdown_running_fjobs(): except Exception as e: logger.error(f"Exception while shutting down FournosJobs: {e}") + raise def shutdown_fjobs_on_interrupt(): @@ -81,3 +82,4 @@ def shutdown_fjobs_on_interrupt(): shutdown_running_fjobs() except Exception as e: logger.error(f"Failed to shutdown FournosJobs: {e}") + raise diff --git a/projects/fournos_launcher/orchestration/submit.py b/projects/fournos_launcher/orchestration/submit.py index 203094d9..2611f343 100644 --- a/projects/fournos_launcher/orchestration/submit.py +++ b/projects/fournos_launcher/orchestration/submit.py @@ -2,6 +2,7 @@ import os import pathlib import signal +import sys import threading import traceback from datetime import datetime @@ -24,7 +25,7 @@ def _signal_handler_sigint(sig, frame): print("\n🚫 FOURNOS launcher received SIGINT - shutting down jobs...") env.reset_artifact_dir() job_management.shutdown_fjobs_on_interrupt() - # Don't call sys.exit here - let the original handler handle it + sys.exit(137) def _signal_handler_sigterm(sig, frame): @@ -32,7 +33,7 @@ def _signal_handler_sigterm(sig, frame): print("\nšŸ›‘ FOURNOS launcher received SIGTERM - shutting down jobs...") env.reset_artifact_dir() job_management.shutdown_fjobs_on_interrupt() - # Don't call sys.exit here - let the original handler handle it + sys.exit(143) def 
_setup_signal_handlers(): From 77fe4472b9b592f06cb13af82aefa0c608e96d95 Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Mon, 4 May 2026 13:52:46 +0200 Subject: [PATCH 32/33] [core] dsl: runtime: use a thread safe logging --- projects/core/dsl/runtime.py | 50 +++++++++++++++++++++++++++++------- 1 file changed, 41 insertions(+), 9 deletions(-) diff --git a/projects/core/dsl/runtime.py b/projects/core/dsl/runtime.py index 6bf3f52c..d05d91ac 100644 --- a/projects/core/dsl/runtime.py +++ b/projects/core/dsl/runtime.py @@ -5,6 +5,7 @@ import inspect import logging import os +import threading import types from datetime import datetime from pathlib import Path @@ -234,10 +235,11 @@ def execute_tasks(function_args: dict = None): finally: # Clear thread-local execution context script_manager.clear_execution_context() - # Clean up the file handler to prevent leaks - dsl_logger = logging.getLogger("DSL") - dsl_logger.removeHandler(file_handler) - file_handler.close() + # Clean up the thread-local file handler to prevent leaks + if hasattr(_thread_local_handlers, "file_handler"): + _thread_local_handlers.file_handler.close() + # Remove the reference to prevent memory leaks + del _thread_local_handlers.file_handler def _execute_single_task(task_info, args, shared_context): @@ -386,20 +388,50 @@ def _generate_env_file(meta_dir): logger.debug(f"Generated environment file: {env_file}") +# Thread-local storage for DSL logger handlers +_thread_local_handlers = threading.local() + + +class ThreadLocalHandler(logging.Handler): + """A logging handler that routes messages to thread-specific files""" + + def __init__(self): + super().__init__() + + def emit(self, record): + # Only emit if we have a thread-local file handler for this thread + if hasattr(_thread_local_handlers, "file_handler"): + try: + _thread_local_handlers.file_handler.emit(record) + except Exception: + # Ignore errors in logging to avoid breaking execution + pass + + def _setup_execution_logging(artifact_dir): - """Setup file logging to capture all stdout/stderr during execution""" + """Setup thread-safe file logging to capture all stdout/stderr during execution""" log_file = artifact_dir / "task.log" - # Create file handler for the DSL logger + # Create file handler for this specific execution file_handler = logging.FileHandler(log_file, mode="w") file_handler.setLevel(logging.INFO) # Use same format as console output file_handler.setFormatter(logging.Formatter("%(message)s")) - # Add handler to the DSL logger so all DSL modules inherit it - dsl_logger = logging.getLogger("DSL") - dsl_logger.addHandler(file_handler) + # Store the file handler in thread-local storage + _thread_local_handlers.file_handler = file_handler + + # Add thread-local handler to main DSL logger only once (globally) + main_dsl_logger = logging.getLogger("DSL") + + # Check if our thread-local handler is already added + has_thread_handler = any(isinstance(h, ThreadLocalHandler) for h in main_dsl_logger.handlers) + + if not has_thread_handler: + thread_handler = ThreadLocalHandler() + thread_handler.setLevel(logging.INFO) + main_dsl_logger.addHandler(thread_handler) return log_file, file_handler From 6124bae86262d73e6917ae7315e98b73cca7c35e Mon Sep 17 00:00:00 2001 From: Kevin Pouget Date: Mon, 4 May 2026 13:55:55 +0200 Subject: [PATCH 33/33] [core] library: env: make the ordering thread safe --- projects/core/library/env.py | 44 +++++++++++++++++++++--------------- 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/projects/core/library/env.py 
b/projects/core/library/env.py index 425e2ca8..09f92da0 100644 --- a/projects/core/library/env.py +++ b/projects/core/library/env.py @@ -12,6 +12,9 @@ # Thread-local storage for ARTIFACT_DIR (thread-safe) _tls_artifact_dir = threading.local() +# Global lock for artifact directory numbering to ensure sequential numbering in parallel execution +_artifact_dir_lock = threading.Lock() + def __getattr__(name): """Support thread-local ARTIFACT_DIR access.""" @@ -114,33 +117,38 @@ def init(daily_artifact_dir=False): def NextArtifactDir(name, *, lock=None, counter_p=None): - if lock: - with lock: - next_count = counter_p[0] - counter_p[0] += 1 - else: - next_count = next_artifact_index() + # Use global lock to ensure sequential numbering in parallel execution + with _artifact_dir_lock: + if lock: + with lock: + next_count = counter_p[0] + counter_p[0] += 1 + else: + next_count = next_artifact_index() - # Use thread-local ARTIFACT_DIR for directory creation - current_artifact_dir = None - try: - current_artifact_dir = _tls_artifact_dir.val - except AttributeError: - # Fallback to global if thread-local not set - current_artifact_dir = globals().get("_GLOBAL_ARTIFACT_DIR") + # Use thread-local ARTIFACT_DIR for directory creation + current_artifact_dir = None + try: + current_artifact_dir = _tls_artifact_dir.val + except AttributeError: + # Fallback to global if thread-local not set + current_artifact_dir = globals().get("_GLOBAL_ARTIFACT_DIR") - if current_artifact_dir is None: - raise ValueError("ARTIFACT_DIR not set in either thread-local or global scope") + if current_artifact_dir is None: + raise ValueError("ARTIFACT_DIR not set in either thread-local or global scope") - dirname = current_artifact_dir / f"{next_count:03d}__{name}" + dirname = current_artifact_dir / f"{next_count:03d}__{name}" - return TempArtifactDir(dirname) + # Create the TempArtifactDir which will mkdir in __init__ + return TempArtifactDir(dirname) class TempArtifactDir: def __init__(self, dirname): self.dirname = pathlib.Path(dirname) self.previous_dirname = None + # Create directory immediately to ensure proper numbering sequence + self.dirname.mkdir(exist_ok=True) def __enter__(self): # Store current thread-local ARTIFACT_DIR @@ -156,8 +164,8 @@ def __enter__(self): # Set global for main thread compatibility _set_artifact_dir(self.dirname) - self.dirname.mkdir(exist_ok=True) # Always set thread-local (each thread gets its own) + # Note: directory is already created in __init__ _set_tls_artifact_dir(self.dirname) return True
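The eager mkdir in __init__ is what ties the locking scheme together: next_artifact_index() derives the counter by globbing the existing NNN__name entries, so the new directory must exist before _artifact_dir_lock is released, otherwise a second thread could compute the same index. Condensed, the critical section amounts to this (simplified sketch; the thread-local lookup and error handling are omitted):

    with _artifact_dir_lock:
        next_count = len(list(current_artifact_dir.glob("*__*")))
        dirname = current_artifact_dir / f"{next_count:03d}__{name}"
        dirname.mkdir(exist_ok=True)  # inside the lock, so the next caller
                                      # sees the new entry and gets next_count + 1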