diff --git a/projects/cluster/toolbox/build_image/main.py b/projects/cluster/toolbox/build_image/main.py index 713cab6e..ad2a4949 100755 --- a/projects/cluster/toolbox/build_image/main.py +++ b/projects/cluster/toolbox/build_image/main.py @@ -12,12 +12,12 @@ from projects.core.dsl import ( always, + entrypoint, execute_tasks, retry, shell, task, template, - toolbox, ) logger = logging.getLogger("TOOLBOX") @@ -92,6 +92,7 @@ def _capture_all_container_logs(buildrun_name: str, namespace: str, artifact_dir logger.debug(f"Captured init container logs for '{container_name}' to {log_file}") +@entrypoint def run( repo_name: str, commit: str, @@ -327,9 +328,5 @@ def generate_build_summary(args, ctx): logger.info(f"Target Image: {getattr(ctx, 'image_tag', 'unknown')}") -# Create the main function using the toolbox library -main = toolbox.create_toolbox_main(run) - - if __name__ == "__main__": - main() + run.main() diff --git a/projects/cluster/toolbox/rebuild_image/main.py b/projects/cluster/toolbox/rebuild_image/main.py index 12d56057..79938aec 100644 --- a/projects/cluster/toolbox/rebuild_image/main.py +++ b/projects/cluster/toolbox/rebuild_image/main.py @@ -12,12 +12,12 @@ from projects.core.dsl import ( always, + entrypoint, execute_tasks, retry, shell, task, template, - toolbox, ) logger = logging.getLogger("DSL") @@ -92,6 +92,7 @@ def _capture_all_container_logs(buildrun_name: str, namespace: str, artifact_dir logger.debug(f"Captured init container logs for '{container_name}' to {log_file}") +@entrypoint def run( build_name: str, *, @@ -269,9 +270,5 @@ def generate_rebuild_summary(args, ctx): logger.info(f"Timeout: {args.timeout_minutes} minutes") -# Create the main function using the toolbox library -main = toolbox.create_toolbox_main(run) - - if __name__ == "__main__": - main() + run.main() diff --git a/projects/core/ci_entrypoint/fournos.py b/projects/core/ci_entrypoint/fournos.py index 06ffe918..f4b95c82 100644 --- a/projects/core/ci_entrypoint/fournos.py +++ 
b/projects/core/ci_entrypoint/fournos.py @@ -12,8 +12,6 @@ import yaml -from projects.core.library import run - logger = logging.getLogger(__name__) @@ -129,8 +127,8 @@ def parse_and_save_pr_arguments_fournos(): # Create CI metadata directory metadata_dir.mkdir(parents=True, exist_ok=True) - # Fetch and save FournosJob YAML - fjob, fjob_spec = fetch_and_save_fjob_yaml(metadata_dir / "fjob.yaml") + # Load FournosJob YAML + fjob, fjob_spec = load_fjob_yaml(metadata_dir.parent / "fournos_fjob.yaml") if not fjob_spec: raise ValueError("FournosJob YAML not found, cannot parse FOURNOS PR arguments") @@ -247,40 +245,38 @@ def process_forge_config(fjob_spec, metadata_dir, fjob): return variable_overrides -def fetch_and_save_fjob_yaml(fjob_yaml_dest): +def load_fjob_yaml(fjob_yaml_path): """ - Fetch FournosJob YAML and save to artifact directory. + Load FournosJob YAML from file. Args: - fjob_yaml_dest: Path where to save the FournosJob YAML + fjob_yaml_path: Path to the FournosJob YAML file Returns: Tuple of (full_fjob, fjob_spec), or (None, None) if not found """ - - namespace = "psap-automation-wip" - - result = run.run( - f"oc get fjobs -n {namespace} -oyaml", - capture_stdout=True, - ) - - fjobs_data = yaml.safe_load(result.stdout) - if not (fjobs_data and "items" in fjobs_data and fjobs_data["items"]): - logger.warning("No FournosJobs found") + if not fjob_yaml_path.exists(): + logger.warning(f"FournosJob YAML file not found: {fjob_yaml_path}") return None, None - # Take the last fjob from the list - fjob = fjobs_data["items"][-1] - try: - del fjob["status"] - except KeyError: - pass # ignore + with open(fjob_yaml_path) as f: + fjob = yaml.safe_load(f) - with open(fjob_yaml_dest, "w") as f: - yaml.dump(fjob, f, default_flow_style=False, sort_keys=False) + if not fjob: + logger.warning(f"Empty or invalid FournosJob YAML: {fjob_yaml_path}") + return None, None - logger.info(f"Saved FournosJob YAML to {fjob_yaml_dest}") + # Remove status if present + try: + del 
fjob["status"] + except KeyError: + pass # ignore - return fjob, fjob["spec"] + logger.info(f"Loaded FournosJob YAML from {fjob_yaml_path}") + + return fjob, fjob["spec"] + + except Exception as e: + logger.error(f"Failed to load FournosJob YAML from {fjob_yaml_path}: {e}") + return None, None diff --git a/projects/core/ci_entrypoint/fournos_resolve.py b/projects/core/ci_entrypoint/fournos_resolve.py index 3f5b4a18..b115a7b1 100644 --- a/projects/core/ci_entrypoint/fournos_resolve.py +++ b/projects/core/ci_entrypoint/fournos_resolve.py @@ -30,11 +30,11 @@ def fetch_fournos_job() -> tuple[str, str, dict]: RuntimeError: If fetch or parsing fails """ # Get environment variables - job_name = os.environ.get("FOURNOS_JOB_NAME") + job_name = os.environ.get("FJOB_NAME") namespace = os.environ.get("FOURNOS_NAMESPACE") if not job_name: - raise ValueError("FOURNOS_JOB_NAME environment variable is required") + raise ValueError("FJOB_NAME environment variable is required") if not namespace: raise ValueError("FOURNOS_NAMESPACE environment variable is required") @@ -197,8 +197,8 @@ def create_fournos_resolve_command( @click.command("resolve-fournos-config") @click.option( "--fjob-name", - help="FournosJob name (sets FOURNOS_JOB_NAME if provided)", - envvar="FOURNOS_JOB_NAME", + help="FournosJob name (sets FJOB_NAME if provided)", + envvar="FJOB_NAME", ) @click.option( "--namespace", @@ -216,7 +216,7 @@ def fournos_resolve_command(ctx, fjob_name, namespace, dry_run): """Resolve the FournosJob object configuration.""" if fjob_name: - os.environ["FOURNOS_JOB_NAME"] = fjob_name + os.environ["FJOB_NAME"] = fjob_name if namespace: os.environ["FOURNOS_NAMESPACE"] = namespace diff --git a/projects/core/dsl/__init__.py b/projects/core/dsl/__init__.py index a95ba693..2e016c73 100644 --- a/projects/core/dsl/__init__.py +++ b/projects/core/dsl/__init__.py @@ -5,12 +5,13 @@ from . 
import context, shell, template, toolbox from .runtime import clear_tasks, execute_tasks from .script_manager import get_script_manager, reset_script_manager -from .task import RetryFailure, always, retry, task, when +from .task import RetryFailure, always, entrypoint, retry, task, when __all__ = [ "always", "clear_tasks", "context", + "entrypoint", "execute_tasks", "get_script_manager", "reset_script_manager", diff --git a/projects/core/dsl/runtime.py b/projects/core/dsl/runtime.py index d1afff31..d05d91ac 100644 --- a/projects/core/dsl/runtime.py +++ b/projects/core/dsl/runtime.py @@ -5,6 +5,7 @@ import inspect import logging import os +import threading import types from datetime import datetime from pathlib import Path @@ -76,6 +77,37 @@ def execute_tasks(function_args: dict = None): filename = caller_frame.f_code.co_filename command_name = _get_toolbox_function_name(filename) + # Get DSL runtime parameters from function args or wrapper attributes + prefix = function_args.pop("artifact_dirname_prefix", None) + suffix = function_args.pop("artifact_dirname_suffix", None) + + # Also check if they're stored in the calling function (from @entrypoint decorator) + try: + # Get the calling function from the frame + calling_func = caller_frame.f_globals.get(caller_frame.f_code.co_name) + if calling_func and hasattr(calling_func, "_dsl_runtime_params"): + runtime_params = calling_func._dsl_runtime_params + prefix = prefix or runtime_params.get("artifact_dirname_prefix") + suffix = suffix or runtime_params.get("artifact_dirname_suffix") + except (AttributeError, KeyError): + # If we can't get the calling function, that's fine - just continue + pass + + # Debug logging to see if parameters are found + if suffix or prefix: + logger.info(f"DSL runtime: found prefix='{prefix}', suffix='{suffix}'") + + # Prepend prefix to command name if provided + if prefix: + command_name = f"{prefix}_{command_name}" + + # Append suffix to command name if provided + if suffix: + command_name 
= f"{command_name}_{suffix}" + + # Log the final command name for debugging + logger.info(f"DSL runtime: using command_name='{command_name}'") + # Get relative filename to match task registration try: rel_filename = str(Path(filename).relative_to(env.FORGE_HOME)) @@ -109,6 +141,10 @@ def execute_tasks(function_args: dict = None): # Execute tasks only from the calling file script_manager = get_script_manager() + + # Start thread-local execution context for this execution + script_manager.start_execution_context(rel_filename) + file_tasks = list(script_manager.get_tasks_from_file(rel_filename)) if not file_tasks: @@ -195,11 +231,15 @@ def execute_tasks(function_args: dict = None): shared_context.__dict__["artifact_dir"] = args.artifact_dir return shared_context + finally: - # Clean up the file handler to prevent leaks - dsl_logger = logging.getLogger("DSL") - dsl_logger.removeHandler(file_handler) - file_handler.close() + # Clear thread-local execution context + script_manager.clear_execution_context() + # Clean up the thread-local file handler to prevent leaks + if hasattr(_thread_local_handlers, "file_handler"): + _thread_local_handlers.file_handler.close() + # Remove the reference to prevent memory leaks + del _thread_local_handlers.file_handler def _execute_single_task(task_info, args, shared_context): @@ -348,20 +388,50 @@ def _generate_env_file(meta_dir): logger.debug(f"Generated environment file: {env_file}") +# Thread-local storage for DSL logger handlers +_thread_local_handlers = threading.local() + + +class ThreadLocalHandler(logging.Handler): + """A logging handler that routes messages to thread-specific files""" + + def __init__(self): + super().__init__() + + def emit(self, record): + # Only emit if we have a thread-local file handler for this thread + if hasattr(_thread_local_handlers, "file_handler"): + try: + _thread_local_handlers.file_handler.emit(record) + except Exception: + # Ignore errors in logging to avoid breaking execution + pass + + def 
_setup_execution_logging(artifact_dir): - """Setup file logging to capture all stdout/stderr during execution""" + """Setup thread-safe file logging to capture all stdout/stderr during execution""" log_file = artifact_dir / "task.log" - # Create file handler for the DSL logger + # Create file handler for this specific execution file_handler = logging.FileHandler(log_file, mode="w") file_handler.setLevel(logging.INFO) # Use same format as console output file_handler.setFormatter(logging.Formatter("%(message)s")) - # Add handler to the DSL logger so all DSL modules inherit it - dsl_logger = logging.getLogger("DSL") - dsl_logger.addHandler(file_handler) + # Store the file handler in thread-local storage + _thread_local_handlers.file_handler = file_handler + + # Add thread-local handler to main DSL logger only once (globally) + main_dsl_logger = logging.getLogger("DSL") + + # Check if our thread-local handler is already added + has_thread_handler = any(isinstance(h, ThreadLocalHandler) for h in main_dsl_logger.handlers) + + if not has_thread_handler: + thread_handler = ThreadLocalHandler() + thread_handler.setLevel(logging.INFO) + main_dsl_logger.addHandler(thread_handler) return log_file, file_handler diff --git a/projects/core/dsl/script_manager.py b/projects/core/dsl/script_manager.py index 28758be2..dfa1cd89 100644 --- a/projects/core/dsl/script_manager.py +++ b/projects/core/dsl/script_manager.py @@ -5,6 +5,7 @@ """ import logging +import threading logger = logging.getLogger("DSL") @@ -12,8 +13,8 @@ class TaskResult: """Container for task results that can be referenced in conditions""" - def __init__(self, task_name: str): - self.task_name = task_name + def __init__(self, task_id: str): + self.task_id = task_id self._result = None self._executed = False @@ -40,6 +41,10 @@ def __init__(self): self._task_registry: dict[str, list[dict]] = {} # Task results organized by task name self._task_results: dict[str, TaskResult] = {} + # Thread-local storage for 
execution-specific task results + self._thread_local = threading.local() + # Lock for thread-safe registration + self._lock = threading.Lock() def register_task(self, task_info: dict, source_file: str) -> None: """ @@ -49,20 +54,26 @@ def register_task(self, task_info: dict, source_file: str) -> None: task_info: Dictionary containing task metadata source_file: Relative path to the source file defining the task """ - if source_file not in self._task_registry: - self._task_registry[source_file] = [] + with self._lock: + if source_file not in self._task_registry: + self._task_registry[source_file] = [] - self._task_registry[source_file].append(task_info) + self._task_registry[source_file].append(task_info) - # Create result container for this task - task_name = task_info["name"] - self._task_results[task_name] = TaskResult(task_name) + # Create result container for this task + task_id = task_info["id"] + self._task_results[task_id] = TaskResult(task_id) - logger.debug(f"Registered task '{task_name}' from {source_file}") + logger.debug(f"Registered task '{task_id}' from {source_file}") - def get_task_result(self, task_name: str) -> TaskResult | None: + def get_task_result(self, task_id: str) -> TaskResult | None: """Get the result container for a specific task""" - return self._task_results.get(task_name) + # Check thread-local results (for current execution) + thread_results = getattr(self._thread_local, "task_results", None) + if thread_results and task_id in thread_results: + return thread_results[task_id] + + return None def get_tasks_from_file(self, source_file: str) -> list[dict]: """ @@ -96,9 +107,9 @@ def clear_tasks(self, source_file: str | None = None) -> None: # Clear task results for tasks from this file for task_info in tasks_to_remove: - task_name = task_info["name"] - if task_name in self._task_results: - del self._task_results[task_name] + task_id = task_info["id"] + if task_id in self._task_results: + del self._task_results[task_id] # Remove tasks from 
this file del self._task_registry[source_file] @@ -125,6 +136,40 @@ def get_total_task_count(self) -> int: """Get the total number of registered tasks across all files""" return sum(len(tasks) for tasks in self._task_registry.values()) + def start_execution_context(self, source_file: str) -> None: + """ + Start a new execution context for the current thread. + + Creates thread-local task results for tasks from the specified file. + This allows parallel executions to have isolated task results. + + Args: + source_file: Source file being executed + """ + if not self.has_execution_context(): + self._thread_local.task_results = {} + + # Create thread-local task results for tasks in this file + file_tasks = self.get_tasks_from_file(source_file) + for task_info in file_tasks: + task_id = task_info["id"] + if task_id not in self._thread_local.task_results: + self._thread_local.task_results[task_id] = TaskResult(task_id) + + logger.debug( + f"Started execution context for {source_file} in thread {threading.current_thread().name}" + ) + + def clear_execution_context(self) -> None: + """Clear the thread-local execution context.""" + if self.has_execution_context(): + del self._thread_local.task_results + logger.debug(f"Cleared execution context in thread {threading.current_thread().name}") + + def has_execution_context(self) -> bool: + """Check if the current thread has an active execution context.""" + return hasattr(self._thread_local, "task_results") + # Global script manager instance # This provides the interface while keeping state encapsulated diff --git a/projects/core/dsl/task.py b/projects/core/dsl/task.py index d18e4cdb..64172ac7 100644 --- a/projects/core/dsl/task.py +++ b/projects/core/dsl/task.py @@ -230,7 +230,8 @@ def wrapper(*args, **kwargs): result = func(*args, **kwargs) # Store result for conditional execution script_manager = get_script_manager() - task_result = script_manager.get_task_result(task_name) + task_id = wrapper._task_info["id"] + task_result = 
script_manager.get_task_result(task_id) if task_result: task_result._set_result(result) return result @@ -249,6 +250,7 @@ def wrapper(*args, **kwargs): # Register the task with the script manager task_info = { + "id": f"{rel_definition_filename}:{definition_line_no}", "name": func.__name__, "func": wrapper, "condition": getattr(func, "_when_condition", None), @@ -263,7 +265,7 @@ def wrapper(*args, **kwargs): wrapper._task_info = task_info # Make the result accessible as an attribute of the function - wrapper.status = script_manager.get_task_result(func.__name__) + wrapper.status = script_manager.get_task_result(task_info["id"]) return wrapper @@ -340,3 +342,78 @@ def decorator(func): return func return decorator + + +def entrypoint(func): + """ + Mark a function as a DSL entrypoint, automatically adding artifact directory parameters + and creating a main() function for CLI execution. + + Automatically injects artifact_dirname_suffix and artifact_dirname_prefix parameters + to the function signature and creates a main() function accessible as func.main(). 
+ + Usage: + @entrypoint + def run(project: str, cluster_name: str): + # Function will automatically accept artifact_dirname_suffix and artifact_dirname_prefix + pass + + if __name__ == "__main__": + run.main() + """ + # Get the original function signature + sig = inspect.signature(func) + + # Add the artifact directory parameters to the signature + new_params = list(sig.parameters.values()) + + # Add suffix parameter + suffix_param = inspect.Parameter( + "artifact_dirname_suffix", inspect.Parameter.KEYWORD_ONLY, default=None, annotation=str + ) + new_params.append(suffix_param) + + # Add prefix parameter + prefix_param = inspect.Parameter( + "artifact_dirname_prefix", inspect.Parameter.KEYWORD_ONLY, default=None, annotation=str + ) + new_params.append(prefix_param) + + # Create new signature + new_sig = sig.replace(parameters=new_params) + + @functools.wraps(func) + def wrapper(*args, **kwargs): + # Get the original function's parameter names + orig_sig = inspect.signature(func) + orig_param_names = set(orig_sig.parameters.keys()) + + # Split kwargs into original function params and DSL runtime params + func_kwargs = {} + dsl_kwargs = {} + + for key, value in kwargs.items(): + if key in orig_param_names: + func_kwargs[key] = value + else: + dsl_kwargs[key] = value + + # Store DSL parameters for runtime access + wrapper._dsl_runtime_params = dsl_kwargs + + return func(*args, **func_kwargs) + + # Set the new signature on the wrapper + wrapper.__signature__ = new_sig + + # Create main function for CLI execution + def main(): + """CLI entrypoint with dynamic argument discovery""" + from . 
import toolbox + + toolbox.run_toolbox_command(wrapper) + + # Attach main function to the wrapper + wrapper.main = main + + return wrapper diff --git a/projects/core/library/config.py b/projects/core/library/config.py index 55d3cb24..d84005c4 100644 --- a/projects/core/library/config.py +++ b/projects/core/library/config.py @@ -114,6 +114,33 @@ def save_config_overrides(self): self.config["overrides"] = variable_overrides + def _create_first_parent_config_key(self, key: str, value) -> None: + """Create a new config key if its parent exists and is a dict.""" + key_parts = key.split(".") + if len(key_parts) <= 1: + raise ValueError( + f"Config key '{key}' does not exist, and cannot create it at the moment :/" + ) + + parent_key = ".".join(key_parts[:-1]) + try: + parent_value = self.get_config( + parent_key, print=False, warn=False, handled_secretly=True + ) + except Exception as e: + raise ValueError( + f"Config key '{key}' does not exist, and cannot create it at the moment :/" + ) from e + + if not isinstance(parent_value, dict): + raise ValueError( + f"Config key '{key}' does not exist, and cannot create it at the moment :/" + ) + + # Parent exists and is a dict, create the new key + child_key = key_parts[-1] + parent_value[child_key] = value + def apply_config_overrides( self, *, ignore_not_found=False, variable_overrides_path=None, log=True ): @@ -146,12 +173,22 @@ def apply_config_overrides( handled_secretly=handled_secretly, ) if current_value == MAGIC_DEFAULT_VALUE: - if ignore_not_found: + try: + # Try to create the key if parent exists and is a dict + self._create_first_parent_config_key(key, value) + self.save_config() + except ValueError: + if not ignore_not_found: + raise + + if log: + logger.info(f"config override IGNORED: {key} --> {value}") continue - raise ValueError( - f"Config key '{key}' does not exist, and cannot create it at the moment :/" - ) + self.save_config() + if log: + logger.info(f"config override (new key): {key} --> {value}") + 
continue self.set_config(key, value, print=False) actual_value = self.get_config( @@ -439,7 +476,7 @@ def init(orchestration_dir, *, apply_config_overrides=True): if not apply_config_overrides: logger.info( - "config.init: running with 'apply_config_overrides', " + "config.init: running with 'apply_config_overrides=False', " "skipping the overrides. Saving it as 'overrides' " "field in the project configuration." ) diff --git a/projects/core/library/env.py b/projects/core/library/env.py index 7cd4c44f..09f92da0 100644 --- a/projects/core/library/env.py +++ b/projects/core/library/env.py @@ -1,23 +1,84 @@ import logging import os import pathlib +import threading import time -ARTIFACT_DIR = None +# ARTIFACT_DIR is accessed via __getattr__ for thread-local support BASE_ARTIFACT_DIR = None # Immutable copy of the initial ARTIFACT_DIR +_GLOBAL_ARTIFACT_DIR = None # Internal global fallback FORGE_HOME = pathlib.Path(__file__).parents[3] +# Thread-local storage for ARTIFACT_DIR (thread-safe) +_tls_artifact_dir = threading.local() + +# Global lock for artifact directory numbering to ensure sequential numbering in parallel execution +_artifact_dir_lock = threading.Lock() + + +def __getattr__(name): + """Support thread-local ARTIFACT_DIR access.""" + if name == "ARTIFACT_DIR": + # Each thread (including main) gets its own copy + try: + return _tls_artifact_dir.val + except AttributeError: + # Thread-local not set - this should not happen after proper initialization + # Return global as emergency fallback but warn + import logging + + logger = logging.getLogger(__name__) + logger.warning( + f"Thread {threading.current_thread().name} accessing ARTIFACT_DIR without thread-local copy" + ) + return globals().get("_GLOBAL_ARTIFACT_DIR") + + return globals()[name] + + +def get_tls_artifact_dir(): + """Get thread-local artifact directory.""" + try: + return _tls_artifact_dir.val + except AttributeError: + # Thread-local not set - this should not happen after proper initialization + 
import logging + + logger = logging.getLogger(__name__) + logger.warning(f"Thread {threading.current_thread().name} has no thread-local ARTIFACT_DIR") + return None + + +def _set_tls_artifact_dir(value): + """Set thread-local artifact directory (thread-safe).""" + _tls_artifact_dir.val = value + + +def ensure_thread_artifact_dir(): + """Ensure current thread has its own copy of ARTIFACT_DIR.""" + try: + # Thread already has its own copy + return _tls_artifact_dir.val + except AttributeError: + # Thread doesn't have a copy, inherit from global + global_artifact_dir = globals().get("_GLOBAL_ARTIFACT_DIR") + if global_artifact_dir is not None: + _set_tls_artifact_dir(global_artifact_dir) + return global_artifact_dir + else: + raise ValueError("No ARTIFACT_DIR available to copy to thread") from None + def _set_artifact_dir(value): - global ARTIFACT_DIR - ARTIFACT_DIR = value + global _GLOBAL_ARTIFACT_DIR + _GLOBAL_ARTIFACT_DIR = value def reset_artifact_dir(): """Reset ARTIFACT_DIR to its original BASE_ARTIFACT_DIR value.""" - global ARTIFACT_DIR + global _GLOBAL_ARTIFACT_DIR if BASE_ARTIFACT_DIR is not None: - ARTIFACT_DIR = BASE_ARTIFACT_DIR + _GLOBAL_ARTIFACT_DIR = BASE_ARTIFACT_DIR os.environ["ARTIFACT_DIR"] = str(BASE_ARTIFACT_DIR) @@ -51,41 +112,87 @@ def init(daily_artifact_dir=False): os.environ["FORGE_BASE_ARTIFACT_DIR"] = str(BASE_ARTIFACT_DIR) _set_artifact_dir(artifact_dir) + # Also set in thread-local storage for main thread + _set_tls_artifact_dir(artifact_dir) def NextArtifactDir(name, *, lock=None, counter_p=None): - if lock: - with lock: - next_count = counter_p[0] - counter_p[0] += 1 - else: - next_count = next_artifact_index() + # Use global lock to ensure sequential numbering in parallel execution + with _artifact_dir_lock: + if lock: + with lock: + next_count = counter_p[0] + counter_p[0] += 1 + else: + next_count = next_artifact_index() + + # Use thread-local ARTIFACT_DIR for directory creation + current_artifact_dir = None + try: + 
current_artifact_dir = _tls_artifact_dir.val + except AttributeError: + # Fallback to global if thread-local not set + current_artifact_dir = globals().get("_GLOBAL_ARTIFACT_DIR") - dirname = ARTIFACT_DIR / f"{next_count:03d}__{name}" + if current_artifact_dir is None: + raise ValueError("ARTIFACT_DIR not set in either thread-local or global scope") - return TempArtifactDir(dirname) + dirname = current_artifact_dir / f"{next_count:03d}__{name}" + + # Create the TempArtifactDir which will mkdir in __init__ + return TempArtifactDir(dirname) class TempArtifactDir: def __init__(self, dirname): self.dirname = pathlib.Path(dirname) self.previous_dirname = None - - def __enter__(self): - self.previous_dirname = ARTIFACT_DIR - os.environ["ARTIFACT_DIR"] = str(self.dirname) + # Create directory immediately to ensure proper numbering sequence self.dirname.mkdir(exist_ok=True) - _set_artifact_dir(self.dirname) + def __enter__(self): + # Store current thread-local ARTIFACT_DIR + try: + self.previous_dirname = _tls_artifact_dir.val + except AttributeError: + # Fallback to global if thread-local not set + self.previous_dirname = globals().get("_GLOBAL_ARTIFACT_DIR") + + # Only update environment variable in main thread to avoid parallel conflicts + if threading.current_thread() == threading.main_thread(): + os.environ["ARTIFACT_DIR"] = str(self.dirname) + # Set global for main thread compatibility + _set_artifact_dir(self.dirname) + + # Always set thread-local (each thread gets its own) + # Note: directory is already created in __init__ + _set_tls_artifact_dir(self.dirname) return True def __exit__(self, ex_type, ex_value, exc_traceback): - os.environ["ARTIFACT_DIR"] = str(self.previous_dirname) - _set_artifact_dir(self.previous_dirname) + # Only restore environment variable in main thread to avoid parallel conflicts + if threading.current_thread() == threading.main_thread(): + os.environ["ARTIFACT_DIR"] = str(self.previous_dirname) + # Restore global for main thread 
compatibility + _set_artifact_dir(self.previous_dirname) + + # Always restore thread-local (each thread manages its own) + _set_tls_artifact_dir(self.previous_dirname) return False # If we returned True here, any exception would be suppressed! def next_artifact_index(): - return len(list(ARTIFACT_DIR.glob("*__*"))) + # Use thread-local ARTIFACT_DIR for counting + current_artifact_dir = None + try: + current_artifact_dir = _tls_artifact_dir.val + except AttributeError: + # Fallback to global if thread-local not set + current_artifact_dir = globals().get("_GLOBAL_ARTIFACT_DIR") + + if current_artifact_dir is None: + return 0 + + return len(list(current_artifact_dir.glob("*__*"))) diff --git a/projects/core/library/export.py b/projects/core/library/export.py index 67804d17..101ebc23 100644 --- a/projects/core/library/export.py +++ b/projects/core/library/export.py @@ -8,6 +8,7 @@ from __future__ import annotations import logging +import os from pathlib import Path import click @@ -15,22 +16,90 @@ from projects.caliper.orchestration.export import run_from_orchestration_config from projects.core.library import ci as ci_lib -from projects.core.library import config +from projects.core.library import config, run logger = logging.getLogger(__name__) +def _update_fjob_export_status(status: dict): + """Update FournosJob status with export artifacts status.""" + if os.environ.get("FOURNOS_CI") != "true": + return + + # Unset KUBECONFIG to use the pod SA access + original_kubeconfig = os.environ.get("KUBECONFIG") + if "KUBECONFIG" in os.environ: + del os.environ["KUBECONFIG"] + + try: + import json + + fjob_name = os.environ["FJOB_NAME"] + namespace = os.environ["FOURNOS_NAMESPACE"] + + # Get current fjob status + get_cmd = f"oc get fjob/{fjob_name} -n {namespace} -ojson" + result = run.run(get_cmd, capture_stdout=True, check=False) + + if result.returncode != 0: + logger.warning(f"Failed to get fjob/{fjob_name}") + return + + fjob_data = json.loads(result.stdout) + + # 
Initialize status.engine.status if it doesn't exist + if "status" not in fjob_data: + fjob_data["status"] = {} + if "engineStatus" not in fjob_data["status"]: + fjob_data["status"]["engineStatus"] = {} + if "status" not in fjob_data["status"]["engineStatus"]: + fjob_data["status"]["engineStatus"]["status"] = {} + + # Update with export-artifacts status + fjob_data["status"]["engineStatus"]["export-artifacts"] = status + + # Patch the fjob + patch_data = {"status": fjob_data["status"]} + patch_cmd = f"oc patch fjob/{fjob_name} -n {namespace} --type=merge --subresource=status -p '{json.dumps(patch_data)}'" + + patch_result = run.run(patch_cmd, check=False) + if patch_result.returncode == 0: + logger.info(f"Updated fjob/{fjob_name} status with export artifacts status") + else: + logger.warning(f"Failed to update fjob status: {patch_cmd}") + + except Exception as e: + logger.warning(f"Failed to update fjob status: {e}") + finally: + # Restore KUBECONFIG if it was set + if original_kubeconfig is not None: + os.environ["KUBECONFIG"] = original_kubeconfig + + def run_caliper_orchestration_export(*, artifact_directory: Path | None): """Set optional ``caliper.export.from`` and run orchestration export.""" + + if artifact_directory is None and "ARTIFACT_BASE_DIR" in os.environ: + artifact_directory = os.environ["ARTIFACT_BASE_DIR"] + if artifact_directory is not None: config.project.set_config("caliper.export.from", str(artifact_directory)) + # Use FJOB_NAME as fallback for mlflow run_name if not configured + run_name = config.project.get_config( + "caliper.export.backend.mlflow.config.run_name", None, print=False, warn=False + ) + if run_name is None and "FJOB_NAME" in os.environ: + config.project.set_config( + "caliper.export.backend.mlflow.config.run_name", os.environ["FJOB_NAME"], print=False + ) + caliper_cfg = config.project.get_config("caliper", print=False) return run_from_orchestration_config(caliper_cfg) -@click.command("export") +@click.command("export-artifacts") 
@click.option( "--artifact-directory", "artifact_directory", @@ -45,4 +114,8 @@ def caliper_export_command(_ctx, artifact_directory: Path | None): status = run_caliper_orchestration_export(artifact_directory=artifact_directory) logger.info("Export status:\n" + yaml.dump(status, indent=4)) + + # Update fjob status with export results + _update_fjob_export_status(status) + return 0 diff --git a/projects/core/library/run_parallel.py b/projects/core/library/run_parallel.py new file mode 100644 index 00000000..4b729831 --- /dev/null +++ b/projects/core/library/run_parallel.py @@ -0,0 +1,148 @@ +import logging +import os +import time +import traceback +from concurrent.futures import ThreadPoolExecutor, as_completed + +from . import env + +logger = logging.getLogger(__name__) + + +class Parallel: + """ + Context manager for parallel execution of tasks with immediate cancellation on failure. + + Uses ThreadPoolExecutor for multithreaded execution with thread-safe artifact directory handling. + When any task fails, all remaining tasks are immediately cancelled rather than waiting + for completion. + + Each parallel task runs with its own dedicated artifact directory accessible via env.ARTIFACT_DIR, + allowing tasks to write artifacts without conflicts. The dedicated directory is created as: + env.ARTIFACT_DIR / f"{next_count:03d}__{name}" + + Usage: + with run_parallel.Parallel("task_name") as parallel: + parallel.delayed(function1, arg1, arg2) + parallel.delayed(function2, arg3, kwarg=value) + # Tasks execute in parallel when exiting the context + # Each task can access its dedicated directory via env.ARTIFACT_DIR + """ + + def __init__(self, name, exit_on_exception=True, dedicated_dir=True): + """ + Initialize parallel execution context. 
+ + Args: + name: Name for the parallel execution (used for artifact directory) + exit_on_exception: If True, kill process group on exception + dedicated_dir: If True, create dedicated artifact directory for this parallel execution + """ + self.name = name + self.parallel_tasks = None + self.exit_on_exception = exit_on_exception + self.dedicated_dir = dedicated_dir + + def __enter__(self): + """Enter the parallel context.""" + self.parallel_tasks = [] + return self + + def delayed(self, function, *args, **kwargs): + """ + Add a function to be executed in parallel. + + Args: + function: Function to execute + *args: Positional arguments for the function + **kwargs: Keyword arguments for the function + """ + # Simple task container - no joblib needed + from collections import namedtuple + + DelayedTask = namedtuple("DelayedTask", ["func", "args", "keywords"]) + task = DelayedTask(func=function, args=args, keywords=kwargs) + self.parallel_tasks.append(task) + + def __exit__(self, ex_type, ex_value, exc_traceback): + """Execute all delayed tasks in parallel with immediate cancellation on failure.""" + if ex_value: + logger.warning( + f"An exception occurred while preparing the '{self.name}' Parallel execution ..." + ) + return False + + if self.dedicated_dir: + # Create dedicated directory without modifying global ARTIFACT_DIR + # to avoid race conditions in multithreaded execution. + # Each parallel task will inherit the current ARTIFACT_DIR context. 
+ next_count = env.next_artifact_index() + parallel_dir = env.ARTIFACT_DIR / f"{next_count:03d}__{self.name}" + parallel_dir.mkdir(exist_ok=True) + logger.debug(f"Created parallel execution directory: {parallel_dir}") + else: + parallel_dir = None + + def _run_with_artifact_dir(func, artifact_dir, *args, **kwargs): + """Wrapper to run function with specific ARTIFACT_DIR for this thread.""" + if artifact_dir: + # Ensure thread has its own copy of ARTIFACT_DIR, then set it to dedicated directory + try: + original_artifact_dir = env.ensure_thread_artifact_dir() + except ValueError: + # No global ARTIFACT_DIR to inherit from + original_artifact_dir = None + + try: + env._set_tls_artifact_dir(artifact_dir) + return func(*args, **kwargs) + finally: + if original_artifact_dir is not None: + env._set_tls_artifact_dir(original_artifact_dir) + else: + # No dedicated directory, ensure thread has its own copy of main ARTIFACT_DIR + try: + env.ensure_thread_artifact_dir() + except ValueError: + pass # Continue without thread-local copy + return func(*args, **kwargs) + + # Use ThreadPoolExecutor for better cancellation control + max_workers = min(len(self.parallel_tasks), os.cpu_count() or 1) + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + # Submit all tasks with artifact directory context + futures = [] + for delayed_func in self.parallel_tasks: + # Wrap each function to run with the dedicated artifact directory + future = executor.submit( + _run_with_artifact_dir, + delayed_func.func, + parallel_dir, + *delayed_func.args, + **delayed_func.keywords, + ) + futures.append(future) + + try: + # Wait for all tasks to complete + for future in as_completed(futures): + future.result() # This will raise any exception that occurred + + except Exception as e: + # Cancel all remaining tasks immediately + logger.error("Exception in parallel task, cancelling remaining tasks...") + for future in futures: + future.cancel() + + # Give a short time for cancellation to take 
effect + time.sleep(0.1) + + if not self.exit_on_exception: + raise e + + traceback.print_exc() + logger.error(f"Exception caught during the '{self.name}' Parallel execution.") + raise SystemExit(1) from e + + return False # If we returned True here, any exception would be suppressed! diff --git a/projects/core/notifications/send.py b/projects/core/notifications/send.py index f3b503c3..48911826 100644 --- a/projects/core/notifications/send.py +++ b/projects/core/notifications/send.py @@ -185,10 +185,22 @@ def get_common_message(finish_reason: str, status: str, get_link, get_italics, g ``` {f.read().strip()} ``` +""" + elif ( + var_over := pathlib.Path(os.environ.get("ARTIFACT_DIR", "")) + / "000__ci_metadata" + / "variable_overrides.yaml" + ).exists(): + with open(var_over) as f: + message += f""" +{get_bold("Test configuration")}: +``` +{f.read().strip()} +``` """ else: message += """ -• No test configuration (`variable_overrides.yaml`) available. +• No test configuration (`variable_overrides.yaml/pr_config.txt`) available. 
""" if (failures := pathlib.Path(os.environ.get("ARTIFACT_DIR", "")) / "FAILURES").exists(): diff --git a/projects/fournos_launcher/orchestration/config.yaml b/projects/fournos_launcher/orchestration/config.yaml index cd807b25..4ff135ed 100644 --- a/projects/fournos_launcher/orchestration/config.yaml +++ b/projects/fournos_launcher/orchestration/config.yaml @@ -10,6 +10,8 @@ ci_job: overrides: {} # don't touch this one directly, the config module will populate it extra_overrides: {} +fournos_launcher: + parallel_jobs: {} fournos: namespace: psap-automation diff --git a/projects/fournos_launcher/orchestration/job_management.py b/projects/fournos_launcher/orchestration/job_management.py index 82631b18..a82fe7b9 100644 --- a/projects/fournos_launcher/orchestration/job_management.py +++ b/projects/fournos_launcher/orchestration/job_management.py @@ -69,6 +69,7 @@ def shutdown_running_fjobs(): except Exception as e: logger.error(f"Exception while shutting down FournosJobs: {e}") + raise def shutdown_fjobs_on_interrupt(): @@ -81,3 +82,4 @@ def shutdown_fjobs_on_interrupt(): shutdown_running_fjobs() except Exception as e: logger.error(f"Failed to shutdown FournosJobs: {e}") + raise diff --git a/projects/fournos_launcher/orchestration/pr_args.py b/projects/fournos_launcher/orchestration/pr_args.py index 72d534b7..a4485c55 100644 --- a/projects/fournos_launcher/orchestration/pr_args.py +++ b/projects/fournos_launcher/orchestration/pr_args.py @@ -49,6 +49,11 @@ def get_supported_fournos_directives() -> dict[str, str]: Example: /gpu h100 4 /gpu a100 8 Effect: Sets fournos.job.hardware.gpu_type and fournos.job.hardware.gpu_count.""", + "/parallel": """Set parallel job presets for specific indices. + Format: /parallel idx preset1 preset2 preset3... + Example: /parallel 1 gpu-basic cpu-intensive + /parallel 2 memory-heavy network-test + Effect: Sets fournos_launcher.parallel_jobs[idx] = [preset1, preset2, ...]""", "/help": """Show all supported FOURNOS directives. 
Format: /help Effect: Logs available directive information.""", @@ -196,6 +201,59 @@ def handle_help_directive(line: str) -> dict[str, str]: return result +def handle_parallel_directive(line: str) -> dict[str, str]: + """ + Handle /parallel directive for setting parallel job presets by index. + + Format: /parallel idx preset1 preset2 preset3... + + Args: + line: The directive line + + Returns: + Dictionary with parallel_jobs configuration + + Raises: + ValueError: If format is invalid or values are missing + """ + parallel_content = line.removeprefix("/parallel ").strip() + + if not parallel_content: + raise ValueError(f"Invalid /parallel directive: parameters cannot be empty in '{line}'") + + parts = parallel_content.split() + if len(parts) < 2: + raise ValueError( + f"Invalid /parallel directive: format must be '/parallel idx preset1 [preset2...]' in '{line}'" + ) + + idx_str = parts[0].strip() + presets = [p.strip() for p in parts[1:] if p.strip()] + + if not idx_str: + raise ValueError(f"Invalid /parallel directive: index cannot be empty in '{line}'") + + if not presets: + raise ValueError( + f"Invalid /parallel directive: at least one preset must be specified in '{line}'" + ) + + try: + idx = int(idx_str) + if idx < 0: + raise ValueError( + f"Invalid /parallel directive: index must be non-negative, got {idx} in '{line}'" + ) + except ValueError as e: + if "non-negative" in str(e): + raise + raise ValueError( + f"Invalid /parallel directive: index must be a number, got '{idx_str}' in '{line}'" + ) from None + + return {f"fournos_launcher.parallel_jobs.{idx}": presets} + + def get_fournos_directive_handlers() -> dict[str, callable]: """ Get a mapping of FOURNOS directive prefixes to their handler functions. 
@@ -208,6 +266,7 @@ def get_fournos_directive_handlers() -> dict[str, callable]: "/exclusive": handle_exclusive_directive, "/pipeline": handle_pipeline_directive, "/gpu": handle_gpu_directive, + "/parallel": handle_parallel_directive, "/help": handle_help_directive, } diff --git a/projects/fournos_launcher/orchestration/submit.py b/projects/fournos_launcher/orchestration/submit.py index 0862c9f5..2611f343 100644 --- a/projects/fournos_launcher/orchestration/submit.py +++ b/projects/fournos_launcher/orchestration/submit.py @@ -2,9 +2,17 @@ import os import pathlib import signal +import sys +import threading +import traceback +from datetime import datetime from projects.core.library import config, env, run, vault +from projects.core.library.run_parallel import Parallel from projects.fournos_launcher.orchestration import job_management, pr_args +from projects.fournos_launcher.toolbox.cleanup_fjob.main import ( + run as cleanup_fjob, +) from projects.fournos_launcher.toolbox.submit_and_wait.main import ( run as submit_and_wait, ) @@ -17,7 +25,7 @@ def _signal_handler_sigint(sig, frame): print("\n🚫 FOURNOS launcher received SIGINT - shutting down jobs...") env.reset_artifact_dir() job_management.shutdown_fjobs_on_interrupt() - # Don't call sys.exit here - let the original handler handle it + sys.exit(137) def _signal_handler_sigterm(sig, frame): @@ -25,7 +33,7 @@ def _signal_handler_sigterm(sig, frame): print("\nšŸ›‘ FOURNOS launcher received SIGTERM - shutting down jobs...") env.reset_artifact_dir() job_management.shutdown_fjobs_on_interrupt() - # Don't call sys.exit here - let the original handler handle it + sys.exit(143) def _setup_signal_handlers(): @@ -120,21 +128,142 @@ def submit_job(): f"or both must be null. 
Got gpu_count={gpu_count}, gpu_type={gpu_type}" ) - submit_and_wait( - cluster_name=cluster_name, - project=config.project.get_config("ci_job.project"), - args=config.project.get_config("ci_job.args"), - variables_overrides=overrides, - namespace=config.project.get_config("fournos.namespace"), - owner=config.project.get_config("fournos.job.owner"), - display_name=config.project.get_config("fournos.job.display_name"), - pipeline_name=config.project.get_config("fournos.job.pipeline_name"), - env=env_dict, - status_dest=env.ARTIFACT_DIR, - ci_label=config.project.get_config("fournos.job.ci_label"), - exclusive=config.project.get_config("fournos.job.exclusive"), - gpu_count=gpu_count, - gpu_type=gpu_type, - ) + # Check if parallel jobs are configured + parallel_jobs = config.project.get_config("fournos_launcher.parallel_jobs", {}, print=False) + parallel_job_configs = [] + + for idx, job_args_list in parallel_jobs.items(): + if job_args_list: # Non-empty list + parallel_job_configs.append((idx, job_args_list)) + + # Prepare common submit_and_wait arguments + submit_kwargs = { + "cluster_name": cluster_name, + "project": config.project.get_config("ci_job.project"), + "variables_overrides": overrides, + "namespace": config.project.get_config("fournos.namespace"), + "owner": config.project.get_config("fournos.job.owner"), + "pipeline_name": config.project.get_config("fournos.job.pipeline_name"), + "env": env_dict, + "ci_label": config.project.get_config("fournos.job.ci_label"), + "exclusive": config.project.get_config("fournos.job.exclusive"), + "gpu_count": gpu_count, + "gpu_type": gpu_type, + } + + if parallel_job_configs: + logger.info( + f"Found {len(parallel_job_configs)} parallel job configurations, launching in parallel" + ) + + # Generate timestamp for parallel job names (shared across all parallel jobs) + timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") + raw_name = f"forge-{project_name}-{timestamp}" + + # Track failures and job names across parallel jobs + 
failure_lock = threading.Lock() + has_failures = [False] # Use list for mutable reference + submitted_job_names = [] + submitted_job_lock = threading.Lock() + + def submit_parallel_job(job_index, job_args_list): + """Submit a single parallel job with specific args.""" + # Combine base ci_job.args with parallel job-specific args + base_args = config.project.get_config("ci_job.args") + combined_args = base_args + job_args_list + + logger.info(f"Submitting parallel job {job_index} with args: {combined_args}") + + # Create display name consistent with single job format + args_str = " ".join(combined_args) + parallel_display_name = f"{project_name} {args_str} (job {job_index})".strip() + + # Create unique job name with timestamp and index + unique_job_name = f"{raw_name}-{job_index}" + + try: + # Track the job name for cleanup (job gets submitted even if waiting fails) + with submitted_job_lock: + submitted_job_names.append(unique_job_name) + + # Create job-specific status directory + job_status_dest = env.ARTIFACT_DIR / unique_job_name + job_status_dest.mkdir(parents=True, exist_ok=True) + + submit_and_wait( + **submit_kwargs, + args=combined_args, + display_name=parallel_display_name, + job_name=unique_job_name, + artifact_dirname_suffix=str(job_index), + status_dest=job_status_dest, + ) + logger.info(f"Parallel job {job_index} completed successfully") + except Exception as e: + logger.error(f"Parallel job {job_index} failed: {e}") + traceback.print_exc() + + # Register failure in thread-safe way + with failure_lock: + has_failures[0] = True + + # Submit all parallel jobs with exit_on_exception=False to let others complete + with Parallel("parallel_jobs", exit_on_exception=False) as parallel: + for job_index, job_args_list in parallel_job_configs: + parallel.delayed(submit_parallel_job, job_index, job_args_list) + + # Cleanup all submitted jobs + if submitted_job_names: + logger.info( + f"Cleaning up {len(submitted_job_names)} parallel jobs: {', 
'.join(submitted_job_names)}" + ) + for job_name in submitted_job_names: + try: + cleanup_fjob( + job_name=job_name, + namespace=config.project.get_config("fournos.namespace"), + ) + logger.info(f"Cleaned up job: {job_name}") + except Exception as cleanup_e: + logger.warning(f"Failed to cleanup job {job_name}: {cleanup_e}") + + # Check if any jobs failed + if has_failures[0]: + logger.error("One or more parallel jobs failed") + return 1 + else: + logger.info("All parallel jobs completed successfully") + return 0 + else: + # No parallel jobs configured, run single job as before + logger.info("No parallel jobs configured, running single job") + + # Generate unique job name for single job + timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") + single_job_name = f"forge-{project_name}-{timestamp}" + + try: + submit_and_wait( + **submit_kwargs, + args=config.project.get_config("ci_job.args"), + display_name=config.project.get_config("fournos.job.display_name"), + job_name=single_job_name, + status_dest=env.ARTIFACT_DIR, + ) + logger.info("Single job completed successfully") + return_code = 0 + except Exception as e: + logger.error(f"Single job failed: {e}") + return_code = 1 + + # Cleanup the job + try: + cleanup_fjob( + job_name=single_job_name, + namespace=config.project.get_config("fournos.namespace"), + ) + logger.info(f"Cleaned up job: {single_job_name}") + except Exception as cleanup_e: + logger.warning(f"Failed to cleanup job {single_job_name}: {cleanup_e}") - return 0 + return return_code diff --git a/projects/fournos_launcher/toolbox/cleanup_fjob/main.py b/projects/fournos_launcher/toolbox/cleanup_fjob/main.py new file mode 100644 index 00000000..4a7691c2 --- /dev/null +++ b/projects/fournos_launcher/toolbox/cleanup_fjob/main.py @@ -0,0 +1,129 @@ +#!/usr/bin/env python3 + +""" +FOURNOS job cleanup using task-based DSL +Cleans up a specific FOURNOS job +""" + +import logging + +from projects.core.dsl import ( + entrypoint, + execute_tasks, + shell, + task, +) + 
+logger = logging.getLogger(__name__) + + +@entrypoint +def run( + job_name: str, + namespace: str = "fournos-jobs", + skip_shutdown_check: bool = False, +): + """ + Clean up a FOURNOS job + + Args: + job_name: Name of the FOURNOS job to cleanup + namespace: Kubernetes namespace where the job is located (default: "fournos-jobs") + skip_shutdown_check: If True, skip checking for shutdown state and force cleanup (default: False) + + Examples: + # Basic cleanup: + run(job_name="forge-llm-d-20241203-143022") + + # Force cleanup regardless of shutdown state: + run(job_name="forge-llm-d-20241203-143022", skip_shutdown_check=True) + """ + # Execute all registered tasks in order + return execute_tasks(locals()) + + +@task +def validate_inputs(args, ctx): + """Validate input parameters""" + + if not args.job_name: + raise ValueError("job_name is required") + + if not args.namespace: + raise ValueError("namespace is required") + + return "Inputs validated" + + +@task +def ensure_oc(args, ctx): + """Ensure oc is available and connected""" + + shell.run("which oc || (echo 'oc not found in PATH' && exit 1)") + shell.run("oc whoami") + + +@task +def check_job_exists(args, ctx): + """Check if the job exists""" + + result = shell.run( + f"oc get fournosjob {args.job_name} -n {args.namespace}", + check=False, + ) + + if not result.success: + if "not found" in result.stderr.lower(): + ctx.job_exists = False + return f"Job {args.job_name} not found - nothing to cleanup" + else: + raise RuntimeError(f"Failed to check job existence: {result.stderr}") + + ctx.job_exists = True + return f"Job {args.job_name} exists" + + +@task +def check_shutdown_state(args, ctx): + """Check if shutdown has been requested""" + + if not ctx.job_exists: + return "Job doesn't exist - skipping shutdown check" + + if args.skip_shutdown_check: + return "Shutdown check skipped by request" + + shutdown_result = shell.run( + f'oc get fournosjob {args.job_name} -n {args.namespace} -o 
jsonpath="{{.spec.shutdown}}"', + check=False, + ) + + if shutdown_result.success and shutdown_result.stdout.strip(): + shutdown_value = shutdown_result.stdout.strip() + ctx.should_preserve = True + ctx.shutdown_reason = shutdown_value + return f"Job has spec.shutdown={shutdown_value} - should be preserved to let it export artifacts" + + ctx.should_preserve = False + return "No shutdown detected - safe to cleanup" + + +@task +def cleanup_fjob(args, ctx): + """Clean up the FOURNOS job""" + + if not ctx.job_exists: + return "Job doesn't exist - nothing to cleanup" + + if ctx.should_preserve: + return f"Job preserved due to shutdown state: {ctx.shutdown_reason}" + + shell.run( + f"oc delete fournosjob {args.job_name} -n {args.namespace} --ignore-not-found", + ) + + return f"Successfully cleaned up job: {args.job_name}" + + +if __name__ == "__main__": + run.main() diff --git a/projects/fournos_launcher/toolbox/shutdown_fjobs/main.py b/projects/fournos_launcher/toolbox/shutdown_fjobs/main.py index b03b6645..7e7fc22e 100755 --- a/projects/fournos_launcher/toolbox/shutdown_fjobs/main.py +++ b/projects/fournos_launcher/toolbox/shutdown_fjobs/main.py @@ -10,15 +10,16 @@ import shlex from projects.core.dsl import ( + entrypoint, execute_tasks, shell, task, - toolbox, ) logger = logging.getLogger(__name__) +@entrypoint def run( namespace: str = "psap-automation", ci_label: str = None, @@ -231,9 +232,5 @@ def capture_final_status(args, ctx): return f"Captured status for {len(ctx.job_names)} jobs in artifacts/ directory" -# Create the main function using the toolbox library -main = toolbox.create_toolbox_main(run) - - if __name__ == "__main__": - main() + run.main() diff --git a/projects/fournos_launcher/toolbox/submit_and_wait/main.py b/projects/fournos_launcher/toolbox/submit_and_wait/main.py index ce3942f2..6693ef42 100755 --- a/projects/fournos_launcher/toolbox/submit_and_wait/main.py +++ b/projects/fournos_launcher/toolbox/submit_and_wait/main.py @@ -11,18 +11,20 @@ from 
projects.core.dsl import ( always, + entrypoint, execute_tasks, retry, shell, task, template, - toolbox, ) from projects.core.dsl.utils.k8s import sanitize_k8s_name +from projects.core.library import env as env_mod logger = logging.getLogger(__name__) +@entrypoint def run( cluster_name: str, project: str, @@ -86,7 +88,7 @@ def run( if env is None: env = {} if status_dest is None: - status_dest = env.ARTIFACT_DIR / "artifacts" + status_dest = env_mod.ARTIFACT_DIR / "artifacts" else: status_dest = Path(status_dest) if not status_dest.exists(): @@ -353,35 +355,5 @@ def capture_pod_specs(args, ctx): return f"Wrote the pod spec file under {artifact_dir} (label {label})" -@always -@task -def cleanup_job(args, ctx): - """Clean up the job object""" - - # Guard: Check if job name was generated (might not exist if early validation failed) - if not hasattr(ctx, "final_job_name"): - return "No job name available - skipping job cleanup" - - # Check if shutdown has been requested - if so, preserve the job to let it export its artifacts - shutdown_result = shell.run( - f'oc get fournosjob {ctx.final_job_name} -n {args.namespace} -o jsonpath="{{.spec.shutdown}}"', - check=False, - ) - - if shutdown_result.success and shutdown_result.stdout.strip(): - shutdown_value = shutdown_result.stdout.strip() - return f"Job has spec.shutdown={shutdown_value} - preserving it to let it export its artifacts, skipping deletion" - - shell.run( - f"oc delete fournosjob {ctx.final_job_name} -n {args.namespace} --ignore-not-found", - ) - - return "Job cleanup completed" - - -# Create the main function using the toolbox library -main = toolbox.create_toolbox_main(run) - - if __name__ == "__main__": - main() + run.main() diff --git a/projects/llm_d_legacy/orchestration/ci.py b/projects/llm_d_legacy/orchestration/ci.py index 0577f46b..cbd73f8b 100755 --- a/projects/llm_d_legacy/orchestration/ci.py +++ b/projects/llm_d_legacy/orchestration/ci.py @@ -11,7 +11,9 @@ import click +from 
projects.core.ci_entrypoint.fournos_resolve import create_fournos_resolve_command from projects.core.library import env +from projects.core.library import config as forge_config from projects.core.library import ci as ci_lib from projects.core.library.export import caliper_export_command from projects.legacy.library import config @@ -21,17 +23,6 @@ logger = logging.getLogger(__name__) - -def _caliper_export_at_end() -> int: - """Set ``caliper.export.from`` to ``env.BASE_ARTIFACT_DIR`` and run orchestration export.""" - root = env.BASE_ARTIFACT_DIR - config.project.set_config("caliper.export.from", str(root), print=False) - caliper = config.project.get_config("caliper", print=False) - status = run_from_orchestration_config(caliper) - logger.info("Caliper export: %s", status) - return 0 - - # Add the testing directory to path for imports testing_dir = Path(__file__).parent.parent / "testing" if str(testing_dir) not in sys.path: @@ -60,14 +51,19 @@ def main(ctx): """Jump CI Project CI Operations for TOPSAIL-NG.""" ctx.ensure_object(dict) - +inited = False def init(): + global inited + if inited: + return + inited = True + env.init() legacy_env.init() testing_dir = Path(__file__).parent.parent / "testing" config.init(testing_dir) - + forge_config.init(testing_dir) presets = config.project.get_config("project.args") for preset in presets: config.project.apply_preset(preset) @@ -84,22 +80,50 @@ def test(ctx): """Test phase - Trigger the project's test method.""" log("Starting test phase...") - failed = True - try: - init() - failed = test_llmd.test() - finally: - logger.info("[llm-d] running the Caliper export") - try: - _caliper_export_at_end() - except Exception: - logger.exception("Caliper export failed") - failed = True + init() + failed = test_llmd.test() sys.exit(1 if failed else 0) main.add_command(caliper_export_command) +def resolve_hardware_request(hardware_spec: dict): + """ + Resolve hardware requirements for FournosJob based on skeleton project 
configuration. + + This is a stub implementation. Update spec.hardware based on project configuration. + + Args: + hardware_spec: The current spec.hardware dict from the FournosJob. This object should be updated. + + """ + init() + + logger.info("Hardware resolution: stub implementation - no changes made") + + # Stub implementation - could be extended to: + # - Read hardware config from project config + # - Set hardware requirements based on workload needs + # - Handle different hardware profiles (GPU, CPU, memory requirements) + # - Example: return {"gpu": {"type": "nvidia-tesla-v100", "count": 1}, "memory": "32Gi"} + + hardware_spec["gpuType"] = "h200" + hardware_spec["gpuCount"] = 4 + + return hardware_spec + +def list_vaults(): + init() + return config.project.get_config("vaults") + +main.add_command( + create_fournos_resolve_command( + vault_list_func=list_vaults, + hardware_resolver_func=resolve_hardware_request, + ) +) + + if __name__ == "__main__": main() diff --git a/projects/llm_d_legacy/testing/config.yaml b/projects/llm_d_legacy/testing/config.yaml index dd5371b8..1bb657ad 100644 --- a/projects/llm_d_legacy/testing/config.yaml +++ b/projects/llm_d_legacy/testing/config.yaml @@ -2,6 +2,17 @@ vaults: - psap-forge-notifications - psap-forge-mlflow-export +project: + name: null + args: [] +ci_job: + name: null + fjob: null + cluster: null + exclusive: null + hardware: null + owner: null + project: name: null args: [] @@ -14,109 +25,6 @@ ci_presets: local_config: null to_apply: [] - # Preset that automatically scales up GPU nodes if none exist, and scales down at the end - scale_up: - prepare.cluster.nodes.auto_scale: true - prepare.cluster.nodes.auto_scale_down_on_exit: true - - dev: - matbench.enabled: false - tests.llmd.inference_service.model: facebook-opt-125m - - opt-125m: - tests.llmd.inference_service.model: facebook-opt-125m - tests.llmd.inference_service.vllm_args[5]: "--max-model-len=2048" - - psap_h200: - extends: [llama-70b] - 
tests.capture_prom: false - tests.capture_prom_uwm: false - tests.llmd.skip_prepare: true - prepare.namespace.name: kpouget-dev - prepare.cluster.skip: true - tests.llmd.inference_service.gateway.name: gateway-internal - - llmisvc_pd: - tests.llmd.flavors: [pd-x2-ptp1-px4-dtp4] - - pvc_rwx: - prepare.pvc.name: storage-rwx - prepare.pvc.access_mode: ReadWriteMany - - azure: - security.run_as_root: true - prepare.preload.skip: true - prepare.operators.skip: true - prepare.cluster.skip: true - prepare.rhoai.skip: true - - tests.capture_prom: false - tests.capture_prom_uwm: false - tests.llmd.skip_prepare: false - - azure_light: - extends: [azure, opt-125m] - prepare.pvc.storage_class: managed-csi - - cks: - extends: [pvc_rwx, llama-70b] - - tests.capture_prom: false - tests.capture_prom_uwm: false - tests.llmd.inference_service.metrics.manual_capture: false - tests.llmd.inference_service.gateway.name: gateway-internal - tests.llmd.inference_service.image_pull_secrets: kpouget-pull-secret - - tests.llmd.skip_prepare: true - prepare.namespace.name: kpouget-dev - prepare.preload.node_selector_key: gpu.nvidia.com/class - prepare.preload.node_selector_value: "H200" - tests.llmd.inference_service.extra_properties: - spec.template.affinity: - nodeAffinity: - requiredDuringSchedulingIgnoredDuringExecution: - nodeSelectorTerms: - - matchExpressions: - - key: kubernetes.io/hostname - operator: NotIn - values: - - gf48e48 - - gf4334a - prepare.preload.extra_images: - vllm-cuda-rhel9: registry.redhat.io/rhaiis/vllm-cuda-rhel9@sha256:094db84a1da5e8a575d0c9eade114fa30f4a2061064a338e3e032f3578f8082a - llm-d-inference-scheduler: ghcr.io/opendatahub-io/rhaii-on-xks/llm-d-inference-scheduler:e6b5db0@sha256:43e8b8edc158f31535c8b23d77629f8cde111cc762a8f4ee5f2f884470566211 - guidellm: ghcr.io/vllm-project/guidellm:v0.5.4 - - baseline-flavors: - tests.llmd.flavors: [simple, simple-tp4, simple-tp2-x4] - - intelligentrouting-flavors: - tests.llmd.flavors: [intelligentrouting-tp2-x4] - - 
guidellm_light: - tests.llmd.benchmarks.guidellm.load_shape_name: lightweight - tests.llmd.benchmarks.guidellm.rate: "1,10,50" - tests.llmd.benchmarks.guidellm.args.data: prompt_tokens=256,output_tokens=128 - tests.llmd.benchmarks.guidellm.args.max_seconds: 30 - - guidellm_multiturn_eval: - tests.llmd.benchmarks.guidellm.load_shape_name: Multiturn - tests.llmd.benchmarks.guidellm.rate: [32, 64, 128, 256, 512] # keep as a list, multi-rate not supported by guidellm-multiturn - tests.llmd.benchmarks.guidellm.args.data: "prompt_tokens=128,output_tokens=128,turns=5,prefix_tokens=10000,prefix_count={2*rate}" - tests.llmd.benchmarks.guidellm.args.max_requests: "{10*rate}" - - guidellm_heterogeneous_eval: - tests.llmd.benchmarks.guidellm.load_shape_name: Heterogeneous - tests.llmd.benchmarks.guidellm.rate: "1,10,50,100,200,300" - tests.llmd.benchmarks.guidellm.args.data: prompt_tokens=8000,prompt_tokens_stdev=8500,prompt_tokens_min=50,prompt_tokens_max=30000,output_tokens=800,output_tokens_stdev=1500,output_tokens_min=20,output_tokens_max=8000 - tests.llmd.benchmarks.guidellm.args.max_seconds: 600 - - gpt-oss: - tests.llmd.inference_service.model: gpt-oss-120 - tests.llmd.benchmarks.guidellm.args.request_type: text_completions - - llama-70b: - tests.llmd.inference_service.model: llama3-3-70b clusters: cleanup_on_exit: false diff --git a/projects/llm_d_legacy/testing/presets.d/presets.yaml b/projects/llm_d_legacy/testing/presets.d/presets.yaml new file mode 100644 index 00000000..984e6126 --- /dev/null +++ b/projects/llm_d_legacy/testing/presets.d/presets.yaml @@ -0,0 +1,105 @@ +__multiple: true + +# Preset that automatically scales up GPU nodes if none exist, and scales down at the end +scale_up: + prepare.cluster.nodes.auto_scale: true + prepare.cluster.nodes.auto_scale_down_on_exit: true + +dev: + matbench.enabled: false + tests.llmd.inference_service.model: facebook-opt-125m + +opt-125m: + tests.llmd.inference_service.model: facebook-opt-125m + 
tests.llmd.inference_service.vllm_args[5]: "--max-model-len=2048" + +psap_h200: + extends: [llama-70b] + tests.capture_prom: false + tests.capture_prom_uwm: false + tests.llmd.skip_prepare: true + prepare.namespace.name: kpouget-dev + prepare.cluster.skip: true + tests.llmd.inference_service.gateway.name: gateway-internal + +llmisvc_pd: + tests.llmd.flavors: [pd-x2-ptp1-px4-dtp4] + +pvc_rwx: + prepare.pvc.name: storage-rwx + prepare.pvc.access_mode: ReadWriteMany + +azure: + security.run_as_root: true + prepare.preload.skip: true + prepare.operators.skip: true + prepare.cluster.skip: true + prepare.rhoai.skip: true + + tests.capture_prom: false + tests.capture_prom_uwm: false + tests.llmd.skip_prepare: false + +azure_light: + extends: [azure, opt-125m] + prepare.pvc.storage_class: managed-csi + +cks: + extends: [pvc_rwx, llama-70b] + + tests.capture_prom: false + tests.capture_prom_uwm: false + tests.llmd.inference_service.metrics.manual_capture: false + tests.llmd.inference_service.gateway.name: gateway-internal + tests.llmd.inference_service.image_pull_secrets: kpouget-pull-secret + + tests.llmd.skip_prepare: true + prepare.namespace.name: kpouget-dev + prepare.preload.node_selector_key: gpu.nvidia.com/class + prepare.preload.node_selector_value: "H200" + tests.llmd.inference_service.extra_properties: + spec.template.affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: kubernetes.io/hostname + operator: NotIn + values: + - gf48e48 + - gf4334a + prepare.preload.extra_images: + vllm-cuda-rhel9: registry.redhat.io/rhaiis/vllm-cuda-rhel9@sha256:094db84a1da5e8a575d0c9eade114fa30f4a2061064a338e3e032f3578f8082a + llm-d-inference-scheduler: ghcr.io/opendatahub-io/rhaii-on-xks/llm-d-inference-scheduler:e6b5db0@sha256:43e8b8edc158f31535c8b23d77629f8cde111cc762a8f4ee5f2f884470566211 + guidellm: ghcr.io/vllm-project/guidellm:v0.5.4 + +baseline-flavors: + tests.llmd.flavors: [simple, simple-tp4, 
simple-tp2-x4] + +intelligentrouting-flavors: + tests.llmd.flavors: [intelligentrouting-tp2-x4] + +guidellm_light: + tests.llmd.benchmarks.guidellm.load_shape_name: lightweight + tests.llmd.benchmarks.guidellm.rate: "1,10,50" + tests.llmd.benchmarks.guidellm.args.data: prompt_tokens=256,output_tokens=128 + tests.llmd.benchmarks.guidellm.args.max_seconds: 30 + +guidellm_multiturn_eval: + tests.llmd.benchmarks.guidellm.load_shape_name: Multiturn + tests.llmd.benchmarks.guidellm.rate: [32, 64, 128, 256, 512] # keep as a list, multi-rate not supported by guidellm-multiturn + tests.llmd.benchmarks.guidellm.args.data: "prompt_tokens=128,output_tokens=128,turns=5,prefix_tokens=10000,prefix_count={2*rate}" + tests.llmd.benchmarks.guidellm.args.max_requests: "{10*rate}" + +guidellm_heterogeneous_eval: + tests.llmd.benchmarks.guidellm.load_shape_name: Heterogeneous + tests.llmd.benchmarks.guidellm.rate: "1,10,50,100,200,300" + tests.llmd.benchmarks.guidellm.args.data: prompt_tokens=8000,prompt_tokens_stdev=8500,prompt_tokens_min=50,prompt_tokens_max=30000,output_tokens=800,output_tokens_stdev=1500,output_tokens_min=20,output_tokens_max=8000 + tests.llmd.benchmarks.guidellm.args.max_seconds: 600 + +gpt-oss: + tests.llmd.inference_service.model: gpt-oss-120 + tests.llmd.benchmarks.guidellm.args.request_type: text_completions + +llama-70b: + tests.llmd.inference_service.model: llama3-3-70b diff --git a/projects/skeleton/orchestration/ci.py b/projects/skeleton/orchestration/ci.py index 91bf25a6..9c152242 100755 --- a/projects/skeleton/orchestration/ci.py +++ b/projects/skeleton/orchestration/ci.py @@ -7,30 +7,18 @@ building your own projects. 
""" -import logging import types import click import prepare_skeleton import test_skeleton -from projects.caliper.orchestration.export import run_from_orchestration_config from projects.core.ci_entrypoint.fournos_resolve import create_fournos_resolve_command from projects.core.library import ci as ci_lib -from projects.core.library import config, env +from projects.core.library import config from projects.core.library.export import caliper_export_command -def _caliper_export_at_end() -> int: - """Set ``caliper.export.from`` to ``env.BASE_ARTIFACT_DIR`` and run orchestration export.""" - root = env.BASE_ARTIFACT_DIR - config.project.set_config("caliper.export.from", str(root), print=False) - caliper = config.project.get_config("caliper", print=False) - status = run_from_orchestration_config(caliper) - logging.info("Caliper export: %s", status) - return 0 - - @click.group() @click.pass_context @ci_lib.safe_ci_function @@ -57,10 +45,7 @@ def prepare(ctx): @ci_lib.safe_ci_command def test(ctx): """Test phase - Execute the main testing logic.""" - try: - return test_skeleton.test() - finally: - _caliper_export_at_end() + return test_skeleton.test() @main.command() @@ -72,6 +57,7 @@ def pre_cleanup(ctx): main.add_command(caliper_export_command) + main.add_command( create_fournos_resolve_command( vault_list_func=lambda: config.project.get_config("vaults"), diff --git a/projects/skeleton/orchestration/presets.d/presets.yaml b/projects/skeleton/orchestration/presets.d/presets.yaml index 453c4676..e49eca21 100644 --- a/projects/skeleton/orchestration/presets.d/presets.yaml +++ b/projects/skeleton/orchestration/presets.d/presets.yaml @@ -11,5 +11,8 @@ side_testing: quick_test: skeleton.test.duration_seconds: 10 # Quick 10 second test +medium_test: + skeleton.test.duration_seconds: 120 # Medium 2 minute test + long_test: - skeleton.test.duration_seconds: 600 # Long 10 minute test + skeleton.test.duration_seconds: 300 # Long 5 minute test diff --git 
a/projects/skeleton/toolbox/cluster_info/main.py b/projects/skeleton/toolbox/cluster_info/main.py index 44f4a838..c7ba4adb 100755 --- a/projects/skeleton/toolbox/cluster_info/main.py +++ b/projects/skeleton/toolbox/cluster_info/main.py @@ -8,13 +8,14 @@ """ from projects.core.dsl import ( + entrypoint, execute_tasks, shell, task, - toolbox, ) +@entrypoint def run(*, output_format: str = "text"): """ Gather basic cluster and environment information @@ -73,9 +74,5 @@ def get_cluster_nodes(args, ctx): ) -# Create the main function using the toolbox library -main = toolbox.create_toolbox_main(run) - - if __name__ == "__main__": - main() + run.main()