Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
36e0371
[core] library: env: add thread safe ARTIFACT_DIR
kpouget Apr 29, 2026
eb23d29
[core] library: run: add a multi-thread Parallel context
kpouget Apr 29, 2026
411428a
[core] dsl: make thread safe
kpouget Apr 29, 2026
7dad239
[core] dsl: runtime: add the ability to customize the command prefix/…
kpouget Apr 30, 2026
7c132a7
[core] library: config: allow creating top fields if they don't exist
kpouget Apr 30, 2026
eaa8d00
[core] ci_entrypoint: fournos_resolve: use the right FJOB_NAME name
kpouget Apr 30, 2026
c07959e
[core] library: export: give the correct subcommand name
kpouget Apr 30, 2026
f1df707
[core] library: export: export from ARTIFACT_BASE_DIR
kpouget Apr 30, 2026
af53ee3
[core] notifications: send: show pr_config.txt or variable_overrides.…
kpouget Apr 30, 2026
e3256db
[core] library: export: use the fjob name as run name
kpouget Apr 30, 2026
178a374
[fournos_launcher] orchestration: add parsing support for /parallel
kpouget Apr 29, 2026
20ffe97
[fournos_launcher] orchestration: submit: allow parallel launches
kpouget Apr 29, 2026
147497d
[fournos_launcher] toolbox: submit_and_wait: accept temporarily the a…
kpouget Apr 30, 2026
75e6c18
[llm_d_legacy] orchestration: ci: expose the resolve command
kpouget Apr 30, 2026
5d2a204
[skeleton] orchestration: ci: remove the explicit export after test
kpouget Apr 30, 2026
e402cde
[core] library: export: update the FJob.status.engineStatus
kpouget Apr 30, 2026
f4ef838
[fournos_launcher] toolbox: submit_and_wait: don't cleanup after the …
kpouget Apr 30, 2026
109bee4
[fournos_launcher] orchestration: submit: cleanup after all the tests
kpouget Apr 30, 2026
2208063
[fournos_launcher] toolbox: cleanup_fjob: cleanup job
kpouget Apr 30, 2026
bd79fee
[core] library: env: thread safe
kpouget Apr 30, 2026
0314955
[core] library: run: thread safe
kpouget Apr 30, 2026
60f17f0
[skeleton] orchestration: presets: update the preset names
kpouget Apr 30, 2026
da8b002
[core] ci_entrypoint: fournos: consume the fournos-prepare fjob
kpouget Apr 30, 2026
b56b35b
[llm_d_legacy] make it work with Forge launcher
kpouget Apr 30, 2026
fe14f38
[core] dsl: define the task ID as filename:lineno
kpouget May 4, 2026
d97d6dc
[core] library: config: move _create_first_parent_config_key to a ded…
kpouget May 4, 2026
2e4d685
[core] library: env: don't update ARTIFACT_DIR in the threads
kpouget May 4, 2026
ce8fabd
[core] library: export: use run.run
kpouget May 4, 2026
f89e4d3
[core] library: run_parallel: move the Parallel code to a dedicated file
kpouget May 4, 2026
d282d53
toolbox: simplify the entrypoint mechanism
kpouget May 4, 2026
1da5b7d
[fournos_launcher] orchestration: signal handlers
kpouget May 4, 2026
77fe447
[core] dsl: runtime: use a thread safe logging
kpouget May 4, 2026
6124bae
[core] library: env: make the ordering thread safe
kpouget May 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions projects/cluster/toolbox/build_image/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@

from projects.core.dsl import (
always,
entrypoint,
execute_tasks,
retry,
shell,
task,
template,
toolbox,
)

logger = logging.getLogger("TOOLBOX")
Expand Down Expand Up @@ -92,6 +92,7 @@ def _capture_all_container_logs(buildrun_name: str, namespace: str, artifact_dir
logger.debug(f"Captured init container logs for '{container_name}' to {log_file}")


@entrypoint
def run(
repo_name: str,
commit: str,
Expand Down Expand Up @@ -327,9 +328,5 @@ def generate_build_summary(args, ctx):
logger.info(f"Target Image: {getattr(ctx, 'image_tag', 'unknown')}")


# Create the main function using the toolbox library
main = toolbox.create_toolbox_main(run)


if __name__ == "__main__":
main()
run.main()
9 changes: 3 additions & 6 deletions projects/cluster/toolbox/rebuild_image/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@

from projects.core.dsl import (
always,
entrypoint,
execute_tasks,
retry,
shell,
task,
template,
toolbox,
)

logger = logging.getLogger("DSL")
Expand Down Expand Up @@ -92,6 +92,7 @@ def _capture_all_container_logs(buildrun_name: str, namespace: str, artifact_dir
logger.debug(f"Captured init container logs for '{container_name}' to {log_file}")


@entrypoint
def run(
build_name: str,
*,
Expand Down Expand Up @@ -269,9 +270,5 @@ def generate_rebuild_summary(args, ctx):
logger.info(f"Timeout: {args.timeout_minutes} minutes")


# Create the main function using the toolbox library
main = toolbox.create_toolbox_main(run)


if __name__ == "__main__":
main()
run.main()
52 changes: 24 additions & 28 deletions projects/core/ci_entrypoint/fournos.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@

import yaml

from projects.core.library import run

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -129,8 +127,8 @@ def parse_and_save_pr_arguments_fournos():
# Create CI metadata directory
metadata_dir.mkdir(parents=True, exist_ok=True)

# Fetch and save FournosJob YAML
fjob, fjob_spec = fetch_and_save_fjob_yaml(metadata_dir / "fjob.yaml")
# Load FournosJob YAML
fjob, fjob_spec = load_fjob_yaml(metadata_dir.parent / "fournos_fjob.yaml")

Comment thread
coderabbitai[bot] marked this conversation as resolved.
if not fjob_spec:
raise ValueError("FournosJob YAML not found, cannot parse FOURNOS PR arguments")
Expand Down Expand Up @@ -247,40 +245,38 @@ def process_forge_config(fjob_spec, metadata_dir, fjob):
return variable_overrides


def fetch_and_save_fjob_yaml(fjob_yaml_dest):
def load_fjob_yaml(fjob_yaml_path):
"""
Fetch FournosJob YAML and save to artifact directory.
Load FournosJob YAML from file.

Args:
fjob_yaml_dest: Path where to save the FournosJob YAML
fjob_yaml_path: Path to the FournosJob YAML file

Returns:
Tuple of (full_fjob, fjob_spec), or (None, None) if not found
"""

namespace = "psap-automation-wip"

result = run.run(
f"oc get fjobs -n {namespace} -oyaml",
capture_stdout=True,
)

fjobs_data = yaml.safe_load(result.stdout)
if not (fjobs_data and "items" in fjobs_data and fjobs_data["items"]):
logger.warning("No FournosJobs found")
if not fjob_yaml_path.exists():
logger.warning(f"FournosJob YAML file not found: {fjob_yaml_path}")
return None, None

# Take the last fjob from the list
fjob = fjobs_data["items"][-1]

try:
del fjob["status"]
except KeyError:
pass # ignore
with open(fjob_yaml_path) as f:
fjob = yaml.safe_load(f)

with open(fjob_yaml_dest, "w") as f:
yaml.dump(fjob, f, default_flow_style=False, sort_keys=False)
if not fjob:
logger.warning(f"Empty or invalid FournosJob YAML: {fjob_yaml_path}")
return None, None

logger.info(f"Saved FournosJob YAML to {fjob_yaml_dest}")
# Remove status if present
try:
del fjob["status"]
except KeyError:
pass # ignore

return fjob, fjob["spec"]
logger.info(f"Loaded FournosJob YAML from {fjob_yaml_path}")

return fjob, fjob["spec"]

except Exception as e:
logger.error(f"Failed to load FournosJob YAML from {fjob_yaml_path}: {e}")
return None, None
Comment on lines +266 to +282
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Avoid collapsing malformed YAML into a “not found” outcome.

At Line 266-282, parse/shape errors and missing file all become (None, None), which later triggers a misleading “not found” message at Line 134. Add explicit structure checks (dict root + spec mapping) and log precise failure reasons.

Suggested fix
 def load_fjob_yaml(fjob_yaml_path):
@@
     try:
         with open(fjob_yaml_path) as f:
             fjob = yaml.safe_load(f)

-        if not fjob:
+        if not fjob:
             logger.warning(f"Empty or invalid FournosJob YAML: {fjob_yaml_path}")
             return None, None
+
+        if not isinstance(fjob, dict):
+            logger.error(f"Invalid FournosJob root type in {fjob_yaml_path}: expected mapping")
+            return None, None
+
+        fjob_spec = fjob.get("spec")
+        if not isinstance(fjob_spec, dict):
+            logger.error(f"Invalid or missing 'spec' section in {fjob_yaml_path}")
+            return None, None

         # Remove status if present
-        try:
-            del fjob["status"]
-        except KeyError:
-            pass  # ignore
+        fjob.pop("status", None)

         logger.info(f"Loaded FournosJob YAML from {fjob_yaml_path}")
-
-        return fjob, fjob["spec"]
+        return fjob, fjob_spec
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@projects/core/ci_entrypoint/fournos.py` around lines 266 - 282, The loader
currently collapses all failures into (None, None) and a generic error, so add
explicit checks and finer-grained exception handling around the YAML load: after
parsing (the fjob variable) verify isinstance(fjob, dict) and that "spec" in
fjob and isinstance(fjob["spec"], dict); if those checks fail, logger.error a
clear message like "malformed FournosJob: root must be mapping" or
"missing/invalid 'spec' mapping" and return (None, None). Also catch
FileNotFoundError separately and log "FournosJob YAML not found at
{fjob_yaml_path}", and catch YAML parsing errors (yaml.YAMLError or the parser
exception) and log "Failed to parse FournosJob YAML: <error>"; keep deleting
fjob["status"] as-is but only after the structure checks. Use the existing
logger, fjob, fjob_yaml_path symbols to locate and implement these checks.

10 changes: 5 additions & 5 deletions projects/core/ci_entrypoint/fournos_resolve.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ def fetch_fournos_job() -> tuple[str, str, dict]:
RuntimeError: If fetch or parsing fails
"""
# Get environment variables
job_name = os.environ.get("FOURNOS_JOB_NAME")
job_name = os.environ.get("FJOB_NAME")
namespace = os.environ.get("FOURNOS_NAMESPACE")

if not job_name:
raise ValueError("FOURNOS_JOB_NAME environment variable is required")
raise ValueError("FJOB_NAME environment variable is required")
if not namespace:
raise ValueError("FOURNOS_NAMESPACE environment variable is required")

Expand Down Expand Up @@ -197,8 +197,8 @@ def create_fournos_resolve_command(
@click.command("resolve-fournos-config")
@click.option(
"--fjob-name",
help="FournosJob name (sets FOURNOS_JOB_NAME if provided)",
envvar="FOURNOS_JOB_NAME",
help="FournosJob name (sets FJOB_NAME if provided)",
envvar="FJOB_NAME",
)
@click.option(
"--namespace",
Expand All @@ -216,7 +216,7 @@ def fournos_resolve_command(ctx, fjob_name, namespace, dry_run):
"""Resolve the FournosJob object configuration."""

if fjob_name:
os.environ["FOURNOS_JOB_NAME"] = fjob_name
os.environ["FJOB_NAME"] = fjob_name
if namespace:
os.environ["FOURNOS_NAMESPACE"] = namespace

Expand Down
3 changes: 2 additions & 1 deletion projects/core/dsl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
from . import context, shell, template, toolbox
from .runtime import clear_tasks, execute_tasks
from .script_manager import get_script_manager, reset_script_manager
from .task import RetryFailure, always, retry, task, when
from .task import RetryFailure, always, entrypoint, retry, task, when

__all__ = [
"always",
"clear_tasks",
"context",
"entrypoint",
"execute_tasks",
"get_script_manager",
"reset_script_manager",
Expand Down
88 changes: 79 additions & 9 deletions projects/core/dsl/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import inspect
import logging
import os
import threading
import types
from datetime import datetime
from pathlib import Path
Expand Down Expand Up @@ -76,6 +77,37 @@ def execute_tasks(function_args: dict = None):
filename = caller_frame.f_code.co_filename
command_name = _get_toolbox_function_name(filename)

# Get DSL runtime parameters from function args or wrapper attributes
prefix = function_args.pop("artifact_dirname_prefix", None)
suffix = function_args.pop("artifact_dirname_suffix", None)

# Also check if they're stored in the calling function (from @entrypoint decorator)
try:
# Get the calling function from the frame
calling_func = caller_frame.f_globals.get(caller_frame.f_code.co_name)
if calling_func and hasattr(calling_func, "_dsl_runtime_params"):
runtime_params = calling_func._dsl_runtime_params
prefix = prefix or runtime_params.get("artifact_dirname_prefix")
suffix = suffix or runtime_params.get("artifact_dirname_suffix")
except (AttributeError, KeyError):
# If we can't get the calling function, that's fine - just continue
pass

# Debug logging to see if parameters are found
if suffix or prefix:
logger.info(f"DSL runtime: found prefix='{prefix}', suffix='{suffix}'")

# Prepend prefix to command name if provided
if prefix:
command_name = f"{prefix}_{command_name}"

# Append suffix to command name if provided
if suffix:
command_name = f"{command_name}_{suffix}"

# Log the final command name for debugging
logger.info(f"DSL runtime: using command_name='{command_name}'")

# Get relative filename to match task registration
try:
rel_filename = str(Path(filename).relative_to(env.FORGE_HOME))
Expand Down Expand Up @@ -109,6 +141,10 @@ def execute_tasks(function_args: dict = None):

# Execute tasks only from the calling file
script_manager = get_script_manager()

# Start thread-local execution context for this execution
script_manager.start_execution_context(rel_filename)

file_tasks = list(script_manager.get_tasks_from_file(rel_filename))

if not file_tasks:
Expand Down Expand Up @@ -195,11 +231,15 @@ def execute_tasks(function_args: dict = None):
shared_context.__dict__["artifact_dir"] = args.artifact_dir

return shared_context

finally:
# Clean up the file handler to prevent leaks
dsl_logger = logging.getLogger("DSL")
dsl_logger.removeHandler(file_handler)
file_handler.close()
# Clear thread-local execution context
script_manager.clear_execution_context()
Comment thread
coderabbitai[bot] marked this conversation as resolved.
# Clean up the thread-local file handler to prevent leaks
if hasattr(_thread_local_handlers, "file_handler"):
_thread_local_handlers.file_handler.close()
# Remove the reference to prevent memory leaks
del _thread_local_handlers.file_handler


def _execute_single_task(task_info, args, shared_context):
Expand Down Expand Up @@ -348,20 +388,50 @@ def _generate_env_file(meta_dir):
logger.debug(f"Generated environment file: {env_file}")


# Thread-local storage for DSL logger handlers.
# Each executing thread registers its own FileHandler under the
# ``file_handler`` attribute so concurrent runs log to separate files.
_thread_local_handlers = threading.local()


class ThreadLocalHandler(logging.Handler):
    """A logging handler that routes records to thread-specific files.

    Installed once on the shared "DSL" logger. On ``emit`` it forwards the
    record to the ``FileHandler`` registered in ``_thread_local_handlers``
    for the *current* thread, if any; threads without a registered handler
    silently drop the record.
    """

    # NOTE: no __init__ override needed — logging.Handler.__init__ with its
    # defaults is exactly what we want (the original empty override was
    # redundant boilerplate).

    def emit(self, record):
        """Forward *record* to this thread's file handler, if one is set."""
        handler = getattr(_thread_local_handlers, "file_handler", None)
        if handler is None:
            # No per-thread handler registered for this thread: drop record.
            return
        try:
            handler.emit(record)
        except Exception:
            # Best-effort logging: never let a logging failure break the
            # task execution itself.
            pass


def _setup_execution_logging(artifact_dir):
"""Setup file logging to capture all stdout/stderr during execution"""
"""Setup thread-safe file logging to capture all stdout/stderr during execution"""
log_file = artifact_dir / "task.log"

# Create file handler for the DSL logger
# Create file handler for this specific execution
file_handler = logging.FileHandler(log_file, mode="w")
file_handler.setLevel(logging.INFO)

# Use same format as console output
file_handler.setFormatter(logging.Formatter("%(message)s"))

# Add handler to the DSL logger so all DSL modules inherit it
dsl_logger = logging.getLogger("DSL")
dsl_logger.addHandler(file_handler)
# Store the file handler in thread-local storage
_thread_local_handlers.file_handler = file_handler

# Add thread-local handler to main DSL logger only once (globally)
main_dsl_logger = logging.getLogger("DSL")

# Check if our thread-local handler is already added
has_thread_handler = any(isinstance(h, ThreadLocalHandler) for h in main_dsl_logger.handlers)

if not has_thread_handler:
thread_handler = ThreadLocalHandler()
thread_handler.setLevel(logging.INFO)
main_dsl_logger.addHandler(thread_handler)

return log_file, file_handler

Expand Down
Loading
Loading