From 7e6181d2000d953d0f0c392d577e4d3d3c682af1 Mon Sep 17 00:00:00 2001 From: Brodey Newman Date: Tue, 10 Mar 2026 22:18:05 -0400 Subject: [PATCH 1/3] chore: jobs --- sdk/python/src/p95/__init__.py | 12 + sdk/python/src/p95/cli.py | 32 ++- sdk/python/src/p95/client.py | 25 ++ sdk/python/src/p95/cloud_cli.py | 444 +++++++++++++++++++++++++++++++ sdk/python/src/p95/exceptions.py | 19 ++ sdk/python/src/p95/run.py | 85 +++++- sdk/python/src/p95/worker.py | 430 ++++++++++++++++++++++++++++++ 7 files changed, 1045 insertions(+), 2 deletions(-) create mode 100644 sdk/python/src/p95/cloud_cli.py create mode 100644 sdk/python/src/p95/worker.py diff --git a/sdk/python/src/p95/__init__.py b/sdk/python/src/p95/__init__.py index c62f4b7..b7597bd 100644 --- a/sdk/python/src/p95/__init__.py +++ b/sdk/python/src/p95/__init__.py @@ -32,6 +32,7 @@ from p95.exceptions import ( APIError, AuthenticationError, + EarlyStopException, ServerError, P95Error, ValidationError, @@ -39,22 +40,33 @@ from p95.run import Run, resume from p95.server import start_server, stop_server from p95.sweep import sweep, agent, should_prune, SweepConfig, ParameterSpec +from p95.worker import Worker, WorkerCapabilities, Job, start_worker __version__ = "0.1.0" __all__ = [ + # Core "Run", "resume", "configure", + # Server "start_server", "stop_server", + # Sweeps "sweep", "agent", "should_prune", "SweepConfig", "ParameterSpec", + # Workers (AI agent job execution) + "Worker", + "WorkerCapabilities", + "Job", + "start_worker", + # Exceptions "P95Error", "AuthenticationError", "APIError", "ValidationError", "ServerError", + "EarlyStopException", ] diff --git a/sdk/python/src/p95/cli.py b/sdk/python/src/p95/cli.py index 3dedd07..892b553 100644 --- a/sdk/python/src/p95/cli.py +++ b/sdk/python/src/p95/cli.py @@ -1,4 +1,9 @@ -"""Console entrypoint for the bundled pnf CLI.""" +"""Console entrypoint for the p95 CLI. + +Supports both local mode (Go binary) and cloud mode (Python commands). 
+Cloud commands: jobs, workers, runs (with cloud API), intervene +Local commands: tui, ls, show, serve +""" from __future__ import annotations @@ -10,6 +15,10 @@ from typing import Optional +# Cloud commands that are handled by Python +CLOUD_COMMANDS = {"jobs", "workers", "worker", "runs", "run"} + + def _platform_id() -> Optional[str]: system = platform.system().lower() machine = platform.machine().lower() @@ -52,7 +61,28 @@ def _find_binary() -> Optional[str]: return shutil.which(binary_name) +def _is_cloud_command() -> bool: + """Check if the command should be handled by cloud CLI.""" + if len(sys.argv) < 2: + return False + + cmd = sys.argv[1] + return cmd in CLOUD_COMMANDS + + def main() -> None: + """Main CLI entry point. + + Routes to cloud CLI for cloud commands (jobs, workers, runs intervene), + or to the Go binary for local commands (tui, ls, show, serve). + """ + # Check for cloud commands first + if _is_cloud_command(): + from p95.cloud_cli import main_cloud + main_cloud() + return + + # Fall back to Go binary for local commands binary = _find_binary() if not binary: print( diff --git a/sdk/python/src/p95/client.py b/sdk/python/src/p95/client.py index ea8a4c2..a7bbae8 100644 --- a/sdk/python/src/p95/client.py +++ b/sdk/python/src/p95/client.py @@ -234,3 +234,28 @@ def get_continuations(self, run_id: str) -> List[Dict[str, Any]]: """ response = self._request("GET", f"/runs/{run_id}/continuations") return response.get("continuations", []) + + def get_pending_intervention(self, run_id: str) -> Optional[Dict[str, Any]]: + """ + Get the pending intervention for a run, if any. + + Args: + run_id: The run ID + + Returns: + Intervention dictionary or None if no pending intervention + """ + response = self._request("GET", f"/runs/{run_id}/intervention/pending") + return response.get("intervention") + + def ack_intervention(self, intervention_id: str) -> Dict[str, Any]: + """ + Acknowledge and apply an intervention. 
+ + Args: + intervention_id: The intervention ID + + Returns: + Updated intervention dictionary + """ + return self._request("POST", f"/interventions/{intervention_id}/apply") diff --git a/sdk/python/src/p95/cloud_cli.py b/sdk/python/src/p95/cloud_cli.py new file mode 100644 index 0000000..793c966 --- /dev/null +++ b/sdk/python/src/p95/cloud_cli.py @@ -0,0 +1,444 @@ +"""Cloud CLI commands for p95 AI-driven ML training. + +This module provides CLI commands for interacting with the p95 cloud API, +enabling Claude Code and other AI agents to query runs, create jobs, +and intervene in running experiments. +""" + +from __future__ import annotations + +import argparse +import json +import os +import sys +from typing import Any, Dict, List, Optional + +from p95.client import P95Client +from p95.config import SDKConfig, get_config + + +def _get_client() -> P95Client: + """Get a configured P95 cloud client.""" + config = get_config() + if not config.api_key: + api_key = os.environ.get("P95_API_KEY") + if not api_key: + _error("P95_API_KEY environment variable is required") + config.api_key = api_key + if not config.base_url: + config.base_url = os.environ.get("P95_URL", "https://api.p95.dev") + return P95Client(config) + + +def _output(data: Any, as_json: bool = True) -> None: + """Output data in JSON or human-readable format.""" + if as_json: + print(json.dumps({"success": True, "data": data, "error": None}, indent=2)) + else: + print(data) + + +def _error(message: str, as_json: bool = True) -> None: + """Output error message and exit.""" + if as_json: + print(json.dumps({"success": False, "data": None, "error": message})) + else: + print(f"Error: {message}", file=sys.stderr) + sys.exit(1) + + +def _parse_project(project: str) -> tuple[str, str]: + """Parse project string into team and app slugs.""" + parts = project.split("/") + if len(parts) != 2: + _error(f"Invalid project format: {project}. 
Expected 'team/app'") + return parts[0], parts[1] + + +# =========================================== +# Runs Commands +# =========================================== + +def runs_list(args: argparse.Namespace) -> None: + """List runs in a project.""" + client = _get_client() + team_slug, app_slug = _parse_project(args.project) + + params: Dict[str, Any] = {"limit": args.limit} + if args.status: + params["status"] = args.status + if args.offset: + params["offset"] = args.offset + + try: + response = client._request( + "GET", + f"/teams/{team_slug}/apps/{app_slug}/runs", + params=params, + ) + _output(response, args.json) + except Exception as e: + _error(str(e), args.json) + + +def runs_get(args: argparse.Namespace) -> None: + """Get run details by ID.""" + client = _get_client() + + try: + response = client._request("GET", f"/runs/{args.run_id}") + _output(response, args.json) + except Exception as e: + _error(str(e), args.json) + + +def runs_metrics(args: argparse.Namespace) -> None: + """Get metrics for a run.""" + client = _get_client() + + try: + if args.metric_name: + # Get specific metric series + params: Dict[str, Any] = {} + if args.since_step is not None: + params["min_step"] = args.since_step + response = client._request( + "GET", + f"/runs/{args.run_id}/metrics/{args.metric_name}", + params=params if params else None, + ) + else: + # Get all metrics summary + response = client._request("GET", f"/runs/{args.run_id}/metrics/latest") + _output(response, args.json) + except Exception as e: + _error(str(e), args.json) + + +def runs_intervene(args: argparse.Namespace) -> None: + """Create an intervention for a running experiment.""" + client = _get_client() + + data: Dict[str, Any] = { + "type": args.action, + "rationale": args.rationale, + } + + if args.config: + try: + data["config_delta"] = json.loads(args.config) + except json.JSONDecodeError as e: + _error(f"Invalid JSON in --config: {e}", args.json) + + if args.step is not None: + data["step"] = args.step 
+ + try: + response = client._request( + "POST", + f"/runs/{args.run_id}/intervene", + data=data, + ) + _output(response, args.json) + except Exception as e: + _error(str(e), args.json) + + +def runs_interventions(args: argparse.Namespace) -> None: + """List interventions for a run.""" + client = _get_client() + + try: + response = client._request( + "GET", + f"/runs/{args.run_id}/interventions", + params={"limit": args.limit}, + ) + _output(response, args.json) + except Exception as e: + _error(str(e), args.json) + + +# =========================================== +# Jobs Commands +# =========================================== + +def jobs_create(args: argparse.Namespace) -> None: + """Create a new job.""" + client = _get_client() + team_slug, app_slug = _parse_project(args.project) + + data: Dict[str, Any] = { + "type": args.type or "training", + } + + if args.script: + # Read script content from file + try: + with open(args.script, "r") as f: + data["script"] = f.read() + except FileNotFoundError: + _error(f"Script file not found: {args.script}", args.json) + + if args.command: + data["command"] = args.command + + if args.config: + try: + data["config"] = json.loads(args.config) + except json.JSONDecodeError as e: + _error(f"Invalid JSON in --config: {e}", args.json) + + if args.rationale: + data["ai_rationale"] = args.rationale + data["created_by"] = "ai:claude" + + if args.priority is not None: + data["priority"] = args.priority + + try: + response = client._request( + "POST", + f"/teams/{team_slug}/apps/{app_slug}/jobs", + data=data, + ) + _output(response, args.json) + except Exception as e: + _error(str(e), args.json) + + +def jobs_get(args: argparse.Namespace) -> None: + """Get job details by ID.""" + client = _get_client() + + try: + response = client._request("GET", f"/jobs/{args.job_id}") + _output(response, args.json) + except Exception as e: + _error(str(e), args.json) + + +def jobs_list(args: argparse.Namespace) -> None: + """List jobs in a project.""" 
+ client = _get_client() + team_slug, app_slug = _parse_project(args.project) + + params: Dict[str, Any] = {"limit": args.limit} + if args.status: + params["status"] = args.status + + try: + response = client._request( + "GET", + f"/teams/{team_slug}/apps/{app_slug}/jobs", + params=params, + ) + _output(response, args.json) + except Exception as e: + _error(str(e), args.json) + + +def jobs_cancel(args: argparse.Namespace) -> None: + """Cancel a job.""" + client = _get_client() + + try: + response = client._request("POST", f"/jobs/{args.job_id}/cancel") + _output(response, args.json) + except Exception as e: + _error(str(e), args.json) + + +# =========================================== +# Workers Commands +# =========================================== + +def workers_list(args: argparse.Namespace) -> None: + """List workers in a project.""" + client = _get_client() + team_slug, app_slug = _parse_project(args.project) + + params: Dict[str, Any] = {"limit": args.limit} + if args.status: + params["status"] = args.status + + try: + response = client._request( + "GET", + f"/teams/{team_slug}/apps/{app_slug}/workers", + params=params, + ) + _output(response, args.json) + except Exception as e: + _error(str(e), args.json) + + +def workers_start(args: argparse.Namespace) -> None: + """Start a worker (runs the worker loop).""" + from p95.worker import Worker + + try: + tags = args.tags.split(",") if args.tags else [] + worker = Worker( + project=args.project, + tags=tags, + ) + worker.run() + except KeyboardInterrupt: + print("\nWorker stopped.") + except Exception as e: + _error(str(e), args.json) + + +# =========================================== +# Main CLI Entry Point +# =========================================== + +def create_parser() -> argparse.ArgumentParser: + """Create the argument parser for cloud CLI commands.""" + parser = argparse.ArgumentParser( + prog="p95", + description="p95 ML experiment tracking CLI", + ) + parser.add_argument("--json", 
action="store_true", default=True, + help="Output in JSON format (default)") + + subparsers = parser.add_subparsers(dest="command", help="Available commands") + + # ----------------------------------------- + # runs commands + # ----------------------------------------- + runs_parser = subparsers.add_parser("runs", help="Run operations") + runs_sub = runs_parser.add_subparsers(dest="runs_command") + + # runs list + runs_list_parser = runs_sub.add_parser("list", help="List runs") + runs_list_parser.add_argument("--project", "-p", required=True, + help="Project in format 'team/app'") + runs_list_parser.add_argument("--status", "-s", help="Filter by status") + runs_list_parser.add_argument("--limit", type=int, default=20) + runs_list_parser.add_argument("--offset", type=int, default=0) + runs_list_parser.add_argument("--json", action="store_true", default=True) + runs_list_parser.set_defaults(func=runs_list) + + # runs get + runs_get_parser = runs_sub.add_parser("get", help="Get run details") + runs_get_parser.add_argument("run_id", help="Run ID") + runs_get_parser.add_argument("--json", action="store_true", default=True) + runs_get_parser.set_defaults(func=runs_get) + + # runs metrics + runs_metrics_parser = runs_sub.add_parser("metrics", help="Get run metrics") + runs_metrics_parser.add_argument("run_id", help="Run ID") + runs_metrics_parser.add_argument("metric_name", nargs="?", help="Specific metric name") + runs_metrics_parser.add_argument("--since-step", type=int, help="Get metrics since step") + runs_metrics_parser.add_argument("--json", action="store_true", default=True) + runs_metrics_parser.set_defaults(func=runs_metrics) + + # runs intervene + runs_intervene_parser = runs_sub.add_parser("intervene", help="Create an intervention") + runs_intervene_parser.add_argument("run_id", help="Run ID") + runs_intervene_parser.add_argument("--action", "-a", required=True, + choices=["adjust_config", "early_stop", "pause", "resume"], + help="Intervention action") + 
runs_intervene_parser.add_argument("--config", "-c", help="Config changes as JSON") + runs_intervene_parser.add_argument("--rationale", "-r", required=True, + help="Explanation for the intervention") + runs_intervene_parser.add_argument("--step", type=int, help="Current step") + runs_intervene_parser.add_argument("--json", action="store_true", default=True) + runs_intervene_parser.set_defaults(func=runs_intervene) + + # runs interventions + runs_interventions_parser = runs_sub.add_parser("interventions", help="List interventions") + runs_interventions_parser.add_argument("run_id", help="Run ID") + runs_interventions_parser.add_argument("--limit", type=int, default=50) + runs_interventions_parser.add_argument("--json", action="store_true", default=True) + runs_interventions_parser.set_defaults(func=runs_interventions) + + # ----------------------------------------- + # jobs commands + # ----------------------------------------- + jobs_parser = subparsers.add_parser("jobs", help="Job queue operations") + jobs_sub = jobs_parser.add_subparsers(dest="jobs_command") + + # jobs create + jobs_create_parser = jobs_sub.add_parser("create", help="Create a job") + jobs_create_parser.add_argument("--project", "-p", required=True, + help="Project in format 'team/app'") + jobs_create_parser.add_argument("--script", help="Path to Python script") + jobs_create_parser.add_argument("--command", help="Command to execute") + jobs_create_parser.add_argument("--config", "-c", help="Config as JSON") + jobs_create_parser.add_argument("--type", "-t", default="training", + choices=["training", "sweep_trial", "evaluation", "custom"]) + jobs_create_parser.add_argument("--rationale", "-r", + help="AI rationale for creating this job") + jobs_create_parser.add_argument("--priority", type=int, default=0) + jobs_create_parser.add_argument("--json", action="store_true", default=True) + jobs_create_parser.set_defaults(func=jobs_create) + + # jobs get + jobs_get_parser = jobs_sub.add_parser("get", 
help="Get job details") + jobs_get_parser.add_argument("job_id", help="Job ID") + jobs_get_parser.add_argument("--json", action="store_true", default=True) + jobs_get_parser.set_defaults(func=jobs_get) + + # jobs list + jobs_list_parser = jobs_sub.add_parser("list", help="List jobs") + jobs_list_parser.add_argument("--project", "-p", required=True, + help="Project in format 'team/app'") + jobs_list_parser.add_argument("--status", "-s", help="Filter by status") + jobs_list_parser.add_argument("--limit", type=int, default=20) + jobs_list_parser.add_argument("--json", action="store_true", default=True) + jobs_list_parser.set_defaults(func=jobs_list) + + # jobs cancel + jobs_cancel_parser = jobs_sub.add_parser("cancel", help="Cancel a job") + jobs_cancel_parser.add_argument("job_id", help="Job ID") + jobs_cancel_parser.add_argument("--json", action="store_true", default=True) + jobs_cancel_parser.set_defaults(func=jobs_cancel) + + # ----------------------------------------- + # workers commands + # ----------------------------------------- + workers_parser = subparsers.add_parser("workers", help="Worker operations") + workers_sub = workers_parser.add_subparsers(dest="workers_command") + + # workers list + workers_list_parser = workers_sub.add_parser("list", help="List workers") + workers_list_parser.add_argument("--project", "-p", required=True, + help="Project in format 'team/app'") + workers_list_parser.add_argument("--status", "-s", help="Filter by status") + workers_list_parser.add_argument("--limit", type=int, default=50) + workers_list_parser.add_argument("--json", action="store_true", default=True) + workers_list_parser.set_defaults(func=workers_list) + + # ----------------------------------------- + # worker command (singular - starts a worker) + # ----------------------------------------- + worker_parser = subparsers.add_parser("worker", help="Worker daemon") + worker_sub = worker_parser.add_subparsers(dest="worker_command") + + worker_start_parser = 
worker_sub.add_parser("start", help="Start a worker") + worker_start_parser.add_argument("--project", "-p", required=True, + help="Project in format 'team/app'") + worker_start_parser.add_argument("--tags", help="Comma-separated worker tags") + worker_start_parser.add_argument("--json", action="store_true", default=True) + worker_start_parser.set_defaults(func=workers_start) + + return parser + + +def main_cloud() -> None: + """Main entry point for cloud CLI commands.""" + parser = create_parser() + args = parser.parse_args() + + if not hasattr(args, "func"): + parser.print_help() + sys.exit(1) + + args.func(args) + + +if __name__ == "__main__": + main_cloud() diff --git a/sdk/python/src/p95/exceptions.py b/sdk/python/src/p95/exceptions.py index 016520f..bc82ba5 100644 --- a/sdk/python/src/p95/exceptions.py +++ b/sdk/python/src/p95/exceptions.py @@ -38,3 +38,22 @@ class ServerError(P95Error): """Raised when server management fails (e.g., binary not found, failed to start).""" pass + + +class EarlyStopException(P95Error): + """Raised when an early stop intervention is applied. + + Training loops should catch this exception to gracefully exit. 
+ + Example: + try: + for epoch in range(100): + train_step() + run.apply_intervention(run.check_intervention()) + except EarlyStopException as e: + print(f"Early stop: {e.rationale}") + """ + + def __init__(self, rationale: str): + super().__init__(f"Early stop requested: {rationale}") + self.rationale = rationale diff --git a/sdk/python/src/p95/run.py b/sdk/python/src/p95/run.py index 39304cc..b2df0d9 100644 --- a/sdk/python/src/p95/run.py +++ b/sdk/python/src/p95/run.py @@ -10,7 +10,7 @@ from typing import Any, Dict, List, Optional, TYPE_CHECKING from p95.config import SDKConfig, get_config -from p95.exceptions import ValidationError +from p95.exceptions import EarlyStopException, ValidationError from p95.utils import generate_run_name, get_git_info, get_system_info if TYPE_CHECKING: @@ -380,6 +380,89 @@ def add_tags(self, tags: List[str]) -> None: else: self._remote_client.add_run_tags(self._run_id, tags) + def check_intervention(self) -> Optional[Dict[str, Any]]: + """ + Check for a pending intervention from an AI agent. + + Call this periodically in your training loop to check if an AI + or human has requested a config change, early stop, or pause. + + Returns: + Intervention dictionary if pending, None otherwise. + The dictionary contains: + - id: Intervention ID + - type: "adjust_config", "early_stop", "pause", "resume" + - config_delta: Config changes to apply (for adjust_config) + - rationale: Explanation for the intervention + + Example: + for epoch in range(100): + train_step() + intervention = run.check_intervention() + if intervention: + run.apply_intervention(intervention) + """ + if self._config.mode == "local": + # Local mode doesn't support interventions + return None + + return self._remote_client.get_pending_intervention(self._run_id) + + def apply_intervention(self, intervention: Optional[Dict[str, Any]]) -> None: + """ + Apply and acknowledge an intervention. + + This method: + 1. Updates the run's config if type is "adjust_config" + 2. 
Raises EarlyStopException if type is "early_stop" + 3. Marks the intervention as applied on the server + + Args: + intervention: Intervention dictionary from check_intervention() + + Raises: + EarlyStopException: If the intervention is an early stop request + + Example: + try: + for epoch in range(100): + train_step() + intervention = run.check_intervention() + if intervention: + run.apply_intervention(intervention) + except EarlyStopException as e: + print(f"Training stopped early: {e.rationale}") + """ + if intervention is None: + return + + if self._config.mode == "local": + # Local mode doesn't support interventions + return + + intervention_type = intervention.get("type") + rationale = intervention.get("rationale", "") + + if intervention_type == "adjust_config": + config_delta = intervention.get("config_delta", {}) + if config_delta: + # Update config on the server + self._remote_client.update_run_config(self._run_id, config_delta) + # Update local config tracking if exists + if self._initial_config is not None: + self._initial_config.update(config_delta) + + # Acknowledge the intervention + self._remote_client.ack_intervention(intervention["id"]) + + if intervention_type == "early_stop": + raise EarlyStopException(rationale) + + @property + def config(self) -> Optional[Dict[str, Any]]: + """Return the current run configuration.""" + return self._initial_config + def complete(self) -> None: """Mark the run as completed successfully.""" self._finalize("completed") diff --git a/sdk/python/src/p95/worker.py b/sdk/python/src/p95/worker.py new file mode 100644 index 0000000..8abe7e9 --- /dev/null +++ b/sdk/python/src/p95/worker.py @@ -0,0 +1,430 @@ +"""Worker for distributed job execution. 
+ +Example usage: + from p95 import Worker + + worker = Worker(project="team/app", tags=["gpu", "a100"]) + worker.run() + +Or via CLI: + p95 worker start --project team/app --tags gpu,a100 +""" + +from __future__ import annotations + +import logging +import os +import platform +import signal +import subprocess +import sys +import time +import uuid +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional + +from p95.client import P95Client +from p95.config import SDKConfig, get_config +from p95.exceptions import APIError + + +logger = logging.getLogger(__name__) + + +@dataclass +class WorkerCapabilities: + """Worker hardware capabilities.""" + gpu_count: int = 0 + gpu_memory_gb: float = 0.0 + gpu_model: str = "" + cpu_count: int = 0 + memory_gb: float = 0.0 + disk_gb: float = 0.0 + + +@dataclass +class Job: + """Represents a job to be executed.""" + id: str + type: str + status: str + script: Optional[str] = None + command: Optional[str] = None + config: Dict[str, Any] = field(default_factory=dict) + environment: Dict[str, str] = field(default_factory=dict) + ai_rationale: Optional[str] = None + + +class Worker: + """Distributed worker that claims and executes jobs. + + The worker connects to the p95 API, registers itself, claims available + jobs, and executes them. It sends heartbeats to indicate it's alive + and reports job completion/failure. + """ + + def __init__( + self, + project: str, + worker_id: Optional[str] = None, + tags: Optional[List[str]] = None, + capabilities: Optional[WorkerCapabilities] = None, + heartbeat_interval: int = 30, + poll_interval: int = 5, + ): + """Initialize the worker. 
+ + Args: + project: Project in format 'team/app' + worker_id: Unique worker ID (auto-generated if not provided) + tags: Worker tags for capability matching + capabilities: Hardware capabilities (auto-detected if not provided) + heartbeat_interval: Seconds between heartbeats + poll_interval: Seconds between job polling + """ + self.project = project + parts = project.split("/") + if len(parts) != 2: + raise ValueError(f"Invalid project format: {project}. Expected 'team/app'") + self.team_slug, self.app_slug = parts + + self.worker_id = worker_id or self._generate_worker_id() + self.tags = tags or [] + self.capabilities = capabilities or self._detect_capabilities() + self.heartbeat_interval = heartbeat_interval + self.poll_interval = poll_interval + + self._running = False + self._current_job: Optional[Job] = None + self._client = self._create_client() + + def _generate_worker_id(self) -> str: + """Generate a unique worker ID.""" + hostname = platform.node() or "unknown" + short_uuid = str(uuid.uuid4())[:8] + return f"{hostname}-{short_uuid}" + + def _detect_capabilities(self) -> WorkerCapabilities: + """Auto-detect hardware capabilities.""" + import os + + caps = WorkerCapabilities() + + # CPU count + try: + caps.cpu_count = os.cpu_count() or 1 + except Exception: + caps.cpu_count = 1 + + # Memory (basic detection) + try: + if sys.platform == "darwin": + import subprocess + result = subprocess.run( + ["sysctl", "-n", "hw.memsize"], + capture_output=True, text=True + ) + caps.memory_gb = int(result.stdout.strip()) / (1024 ** 3) + elif sys.platform == "linux": + with open("/proc/meminfo") as f: + for line in f: + if line.startswith("MemTotal:"): + # Value is in kB + caps.memory_gb = int(line.split()[1]) / (1024 ** 2) + break + except Exception: + pass + + # GPU detection (NVIDIA only for now) + try: + result = subprocess.run( + ["nvidia-smi", "--query-gpu=count,memory.total,name", "--format=csv,noheader,nounits"], + capture_output=True, text=True + ) + if 
result.returncode == 0: + lines = result.stdout.strip().split("\n") + caps.gpu_count = len(lines) + if lines: + parts = lines[0].split(",") + if len(parts) >= 2: + caps.gpu_memory_gb = float(parts[1].strip()) / 1024 + if len(parts) >= 3: + caps.gpu_model = parts[2].strip() + except Exception: + pass + + return caps + + def _create_client(self) -> P95Client: + """Create the API client.""" + config = get_config() + if not config.api_key: + api_key = os.environ.get("P95_API_KEY") + if not api_key: + raise ValueError("P95_API_KEY environment variable is required") + config.api_key = api_key + if not config.base_url: + config.base_url = os.environ.get("P95_URL", "https://api.p95.dev") + return P95Client(config) + + def _register(self) -> None: + """Register the worker with the API.""" + data = { + "id": self.worker_id, + "capabilities": { + "gpu_count": self.capabilities.gpu_count, + "gpu_memory_gb": self.capabilities.gpu_memory_gb, + "gpu_model": self.capabilities.gpu_model, + "cpu_count": self.capabilities.cpu_count, + "memory_gb": self.capabilities.memory_gb, + }, + "tags": self.tags, + "hostname": platform.node(), + "system_info": { + "os": platform.system(), + "arch": platform.machine(), + "python": platform.python_version(), + }, + } + + try: + self._client._request( + "POST", + f"/teams/{self.team_slug}/apps/{self.app_slug}/workers", + data=data, + ) + logger.info(f"Worker {self.worker_id} registered successfully") + except APIError as e: + logger.error(f"Failed to register worker: {e}") + raise + + def _heartbeat(self) -> None: + """Send a heartbeat to indicate the worker is alive.""" + try: + status = "busy" if self._current_job else "online" + data = {"status": status} + if self._current_job: + data["current_job_id"] = self._current_job.id + + self._client._request( + "POST", + f"/workers/{self.worker_id}/heartbeat", + data=data, + ) + except APIError as e: + logger.warning(f"Heartbeat failed: {e}") + + def _claim_job(self) -> Optional[Job]: + """Try to claim 
an available job.""" + try: + # Get available jobs + response = self._client._request( + "GET", + f"/workers/{self.worker_id}/jobs", + params={"limit": 1}, + ) + + jobs = response.get("jobs", []) + if not jobs: + return None + + job_data = jobs[0] + job_id = job_data["id"] + + # Try to claim it + response = self._client._request( + "POST", + f"/jobs/{job_id}/claim", + data={"worker_id": self.worker_id}, + ) + + return Job( + id=response["id"], + type=response.get("type", "training"), + status=response.get("status", "queued"), + script=response.get("script"), + command=response.get("command"), + config=response.get("config", {}), + environment=response.get("environment", {}), + ai_rationale=response.get("ai_rationale"), + ) + except APIError as e: + if "not available" in str(e).lower(): + # Race condition - another worker claimed it + return None + logger.error(f"Failed to claim job: {e}") + return None + + def _execute_job(self, job: Job) -> int: + """Execute a job and return exit code.""" + logger.info(f"Executing job {job.id} (type: {job.type})") + + if job.ai_rationale: + logger.info(f"AI Rationale: {job.ai_rationale}") + + # Notify job started + try: + self._client._request( + "PUT", + f"/jobs/{job.id}/status", + data={"status": "running"}, + ) + except APIError: + pass + + # Build environment + env = os.environ.copy() + env.update(job.environment or {}) + env["P95_JOB_ID"] = job.id + env["P95_PROJECT"] = self.project + + # Add config as environment variables + for key, value in (job.config or {}).items(): + env[f"P95_CONFIG_{key.upper()}"] = str(value) + + try: + if job.command: + # Run command directly + result = subprocess.run( + job.command, + shell=True, + env=env, + capture_output=False, + ) + return result.returncode + elif job.script: + # Write script to temp file and run + import tempfile + with tempfile.NamedTemporaryFile( + mode="w", suffix=".py", delete=False + ) as f: + f.write(job.script) + script_path = f.name + + try: + result = subprocess.run( 
+ [sys.executable, script_path], + env=env, + capture_output=False, + ) + return result.returncode + finally: + os.unlink(script_path) + else: + logger.error("Job has no command or script") + return 1 + except Exception as e: + logger.error(f"Job execution failed: {e}") + return 1 + + def _report_completion(self, job: Job, exit_code: int) -> None: + """Report job completion or failure.""" + try: + if exit_code == 0: + self._client._request( + "POST", + f"/jobs/{job.id}/complete", + data={"worker_id": self.worker_id, "exit_code": exit_code}, + ) + logger.info(f"Job {job.id} completed successfully") + else: + self._client._request( + "POST", + f"/jobs/{job.id}/fail", + data={ + "worker_id": self.worker_id, + "error_message": f"Exit code: {exit_code}", + }, + ) + logger.info(f"Job {job.id} failed with exit code {exit_code}") + except APIError as e: + logger.error(f"Failed to report job completion: {e}") + + def _unregister(self) -> None: + """Unregister the worker.""" + try: + self._client._request("DELETE", f"/workers/{self.worker_id}") + logger.info(f"Worker {self.worker_id} unregistered") + except APIError as e: + logger.warning(f"Failed to unregister worker: {e}") + + def run(self) -> None: + """Run the worker loop. + + The worker will: + 1. Register with the API + 2. Poll for available jobs + 3. Claim and execute jobs + 4. Send heartbeats + 5. Report job completion/failure + + Handles SIGINT/SIGTERM for graceful shutdown. 
+ """ + self._running = True + + # Set up signal handlers + def handle_signal(signum, frame): + logger.info("Received shutdown signal") + self._running = False + + signal.signal(signal.SIGINT, handle_signal) + signal.signal(signal.SIGTERM, handle_signal) + + logger.info(f"Starting worker {self.worker_id}") + logger.info(f"Project: {self.project}") + logger.info(f"Tags: {self.tags}") + logger.info(f"Capabilities: GPU={self.capabilities.gpu_count}, " + f"CPU={self.capabilities.cpu_count}, " + f"Memory={self.capabilities.memory_gb:.1f}GB") + + # Register + self._register() + + last_heartbeat = 0.0 + last_poll = 0.0 + + try: + while self._running: + now = time.time() + + # Send heartbeat if needed + if now - last_heartbeat >= self.heartbeat_interval: + self._heartbeat() + last_heartbeat = now + + # Poll for jobs if not currently executing + if self._current_job is None and now - last_poll >= self.poll_interval: + job = self._claim_job() + if job: + self._current_job = job + exit_code = self._execute_job(job) + self._report_completion(job, exit_code) + self._current_job = None + last_poll = now + + # Sleep briefly to avoid busy-waiting + time.sleep(1) + + finally: + self._unregister() + logger.info("Worker stopped") + + +def start_worker( + project: str, + worker_id: Optional[str] = None, + tags: Optional[List[str]] = None, +) -> Worker: + """Start a worker and return it. 
+ + Args: + project: Project in format 'team/app' + worker_id: Unique worker ID (auto-generated if not provided) + tags: Worker tags for capability matching + + Returns: + The running Worker instance + """ + worker = Worker(project=project, worker_id=worker_id, tags=tags) + worker.run() + return worker From d39dfb6d7d54f2f26e010cdc4240a38a3f6dfda7 Mon Sep 17 00:00:00 2001 From: Brodey Newman Date: Wed, 11 Mar 2026 00:07:56 -0400 Subject: [PATCH 2/3] chore: jobs --- sdk/python/examples/train_mlp.py | 352 +++++++++++++++++++++++++++++++ sdk/python/pyproject.toml | 1 + sdk/python/src/p95/client.py | 16 ++ sdk/python/src/p95/run.py | 10 + 4 files changed, 379 insertions(+) create mode 100644 sdk/python/examples/train_mlp.py diff --git a/sdk/python/examples/train_mlp.py b/sdk/python/examples/train_mlp.py new file mode 100644 index 0000000..610bcb6 --- /dev/null +++ b/sdk/python/examples/train_mlp.py @@ -0,0 +1,352 @@ +#!/usr/bin/env python3 +"""Train a simple MLP on synthetic classification data. + +This example demonstrates using p95 to track a real PyTorch training loop. +It uses a synthetic dataset so no downloads are required. 
# Check if torch is available, use numpy fallback if not
try:
    import torch
    import torch.nn as nn
    import torch.optim as optim

    HAS_TORCH = True
except ImportError:
    HAS_TORCH = False
    print("PyTorch not found, using numpy fallback")

import numpy as np


def make_synthetic_data(
    n_samples=1000, n_features=20, n_classes=3, seed=42, center_seed=0
):
    """Generate synthetic classification data as one Gaussian blob per class.

    The class centers are drawn from their own generator, seeded with
    ``center_seed``, so that datasets created with different ``seed`` values
    (for example a training and a validation split) describe the same
    classification problem. Only the samples and labels depend on ``seed``.

    Args:
        n_samples: Number of samples to generate.
        n_features: Dimensionality of each sample.
        n_classes: Number of classes (and cluster centers).
        seed: Seed for the samples and labels. It is also applied to the
            global torch / numpy RNG, as before.
        center_seed: Seed for the class centers; keep it identical across
            splits that belong together.

    Returns:
        ``(X, y)`` as torch tensors when PyTorch is available, otherwise as
        numpy arrays (``X`` float32). ``X`` has shape
        ``(n_samples, n_features)`` and ``y`` has shape ``(n_samples,)``.
    """
    if HAS_TORCH:
        center_gen = torch.Generator().manual_seed(center_seed)
        centers = torch.randn(n_classes, n_features, generator=center_gen) * 2
        torch.manual_seed(seed)
        X = torch.randn(n_samples, n_features)
        y = torch.randint(0, n_classes, (n_samples,))
        # Shift every sample by the center of its class
        X += centers[y]
        return X, y

    center_rng = np.random.RandomState(center_seed)
    centers = center_rng.randn(n_classes, n_features).astype(np.float32) * 2
    np.random.seed(seed)
    X = np.random.randn(n_samples, n_features).astype(np.float32)
    y = np.random.randint(0, n_classes, n_samples)
    # Shift every sample by the center of its class
    X += centers[y]
    return X, y


if HAS_TORCH:

    class MLP(nn.Module):
        """Simple MLP classifier: two hidden ReLU layers with dropout."""

        def __init__(self, n_features, n_hidden, n_classes, dropout=0.2):
            super().__init__()
            self.net = nn.Sequential(
                nn.Linear(n_features, n_hidden),
                nn.ReLU(),
                nn.Dropout(dropout),
                nn.Linear(n_hidden, n_hidden),
                nn.ReLU(),
                nn.Dropout(dropout),
                nn.Linear(n_hidden, n_classes),
            )

        def forward(self, x):
            """Return the class logits for a batch ``x``."""
            return self.net(x)


class NumpyMLP:
    """Simple numpy-based MLP for when PyTorch isn't available.

    Two hidden ReLU layers followed by a softmax output, trained with plain
    gradient descent on the cross-entropy loss.
    """

    def __init__(self, n_features, n_hidden, n_classes, lr=0.01):
        self.lr = lr
        # He (Kaiming) initialization: std = sqrt(2 / fan_in), suited to ReLU
        self.W1 = np.random.randn(n_features, n_hidden).astype(np.float32) * np.sqrt(
            2.0 / n_features
        )
        self.b1 = np.zeros(n_hidden, dtype=np.float32)
        self.W2 = np.random.randn(n_hidden, n_hidden).astype(np.float32) * np.sqrt(
            2.0 / n_hidden
        )
        self.b2 = np.zeros(n_hidden, dtype=np.float32)
        self.W3 = np.random.randn(n_hidden, n_classes).astype(np.float32) * np.sqrt(
            2.0 / n_hidden
        )
        self.b3 = np.zeros(n_classes, dtype=np.float32)

    def relu(self, x):
        """Element-wise ReLU."""
        return np.maximum(0, x)

    def softmax(self, x):
        """Row-wise softmax; the row maximum is subtracted for stability."""
        exp_x = np.exp(x - np.max(x, axis=1, keepdims=True))
        return exp_x / np.sum(exp_x, axis=1, keepdims=True)

    def forward(self, X):
        """Return class probabilities for ``X``.

        The intermediate activations are stored on the instance because
        ``backward`` reads them.
        """
        self.z1 = X @ self.W1 + self.b1
        self.a1 = self.relu(self.z1)
        self.z2 = self.a1 @ self.W2 + self.b2
        self.a2 = self.relu(self.z2)
        self.z3 = self.a2 @ self.W3 + self.b3
        return self.softmax(self.z3)

    def loss(self, probs, y):
        """Mean cross-entropy of ``probs`` against integer labels ``y``."""
        n = len(y)
        # The small epsilon keeps log() finite when a probability is 0
        log_probs = -np.log(probs[np.arange(n), y] + 1e-8)
        return np.mean(log_probs)

    def accuracy(self, probs, y):
        """Fraction of rows whose arg-max class equals the label."""
        preds = np.argmax(probs, axis=1)
        return np.mean(preds == y)

    def backward(self, X, y, probs):
        """Run one gradient-descent step.

        Must be called after ``forward(X)`` on the same batch, because it
        uses the activations cached there.
        """
        n = len(y)
        # Gradient of cross-entropy loss w.r.t. the logits: softmax - one_hot
        dz3 = probs.copy()
        dz3[np.arange(n), y] -= 1
        dz3 /= n

        dW3 = self.a2.T @ dz3
        db3 = np.sum(dz3, axis=0)

        da2 = dz3 @ self.W3.T
        dz2 = da2 * (self.z2 > 0)

        dW2 = self.a1.T @ dz2
        db2 = np.sum(dz2, axis=0)

        da1 = dz2 @ self.W2.T
        dz1 = da1 * (self.z1 > 0)

        dW1 = X.T @ dz1
        db1 = np.sum(dz1, axis=0)

        # Update weights
        self.W3 -= self.lr * dW3
        self.b3 -= self.lr * db3
        self.W2 -= self.lr * dW2
        self.b2 -= self.lr * db2
        self.W1 -= self.lr * dW1
        self.b1 -= self.lr * db1
seed=123, + ) + + # Create model + model = MLP( + n_features=config["n_features"], + n_hidden=config["n_hidden"], + n_classes=config["n_classes"], + dropout=config["dropout"], + ) + + criterion = nn.CrossEntropyLoss() + optimizer = optim.Adam(model.parameters(), lr=config["lr"]) + + batch_size = config["batch_size"] + n_batches = len(X_train) // batch_size + + for epoch in range(config["epochs"]): + model.train() + epoch_loss = 0.0 + correct = 0 + total = 0 + + # Shuffle data + perm = torch.randperm(len(X_train)) + X_train = X_train[perm] + y_train = y_train[perm] + + for batch_idx in range(n_batches): + start = batch_idx * batch_size + end = start + batch_size + X_batch = X_train[start:end] + y_batch = y_train[start:end] + + optimizer.zero_grad() + outputs = model(X_batch) + loss = criterion(outputs, y_batch) + loss.backward() + optimizer.step() + + epoch_loss += loss.item() + _, predicted = outputs.max(1) + total += y_batch.size(0) + correct += predicted.eq(y_batch).sum().item() + + train_loss = epoch_loss / n_batches + train_acc = correct / total + + # Validation + model.eval() + with torch.no_grad(): + val_outputs = model(X_val) + val_loss = criterion(val_outputs, y_val).item() + _, val_predicted = val_outputs.max(1) + val_acc = val_predicted.eq(y_val).sum().item() / len(y_val) + + # Log metrics + run.log_metrics({ + "train/loss": train_loss, + "train/accuracy": train_acc, + "val/loss": val_loss, + "val/accuracy": val_acc, + }, step=epoch) + + # Check for interventions + intervention = run.check_intervention() + if intervention: + print(f"Received intervention: {intervention['type']}") + run.apply_intervention(intervention) + # Update config if adjusted + if intervention["type"] == "adjust_config": + for key, value in intervention.get("config_delta", {}).items(): + if key == "lr": + for param_group in optimizer.param_groups: + param_group["lr"] = value + print(f"Updated learning rate to {value}") + + print(f"Epoch {epoch+1}/{config['epochs']} - " + 
f"train_loss: {train_loss:.4f}, train_acc: {train_acc:.4f}, " + f"val_loss: {val_loss:.4f}, val_acc: {val_acc:.4f}") + + +def train_numpy(run, config): + """Train using numpy fallback.""" + # Generate data + X_train, y_train = make_synthetic_data( + n_samples=config["n_samples"], + n_features=config["n_features"], + n_classes=config["n_classes"], + ) + X_val, y_val = make_synthetic_data( + n_samples=config["n_samples"] // 5, + n_features=config["n_features"], + n_classes=config["n_classes"], + seed=123, + ) + + model = NumpyMLP( + n_features=config["n_features"], + n_hidden=config["n_hidden"], + n_classes=config["n_classes"], + lr=config["lr"], + ) + + batch_size = config["batch_size"] + n_batches = len(X_train) // batch_size + + for epoch in range(config["epochs"]): + epoch_loss = 0.0 + epoch_acc = 0.0 + + # Shuffle data + perm = np.random.permutation(len(X_train)) + X_train = X_train[perm] + y_train = y_train[perm] + + for batch_idx in range(n_batches): + start = batch_idx * batch_size + end = start + batch_size + X_batch = X_train[start:end] + y_batch = y_train[start:end] + + probs = model.forward(X_batch) + loss = model.loss(probs, y_batch) + acc = model.accuracy(probs, y_batch) + model.backward(X_batch, y_batch, probs) + + epoch_loss += loss + epoch_acc += acc + + train_loss = epoch_loss / n_batches + train_acc = epoch_acc / n_batches + + # Validation + val_probs = model.forward(X_val) + val_loss = model.loss(val_probs, y_val) + val_acc = model.accuracy(val_probs, y_val) + + # Log metrics + run.log_metrics({ + "train/loss": train_loss, + "train/accuracy": train_acc, + "val/loss": val_loss, + "val/accuracy": val_acc, + }, step=epoch) + + # Check for interventions + intervention = run.check_intervention() + if intervention: + print(f"Received intervention: {intervention['type']}") + run.apply_intervention(intervention) + if intervention["type"] == "adjust_config": + for key, value in intervention.get("config_delta", {}).items(): + if key == "lr": + model.lr = 
value + print(f"Updated learning rate to {value}") + + print(f"Epoch {epoch+1}/{config['epochs']} - " + f"train_loss: {train_loss:.4f}, train_acc: {train_acc:.4f}, " + f"val_loss: {val_loss:.4f}, val_acc: {val_acc:.4f}") + + +def main(): + # Get config from environment or use defaults + config = { + "epochs": int(os.environ.get("P95_CONFIG_EPOCHS", "50")), + "lr": float(os.environ.get("P95_CONFIG_LR", "0.001")), + "batch_size": int(os.environ.get("P95_CONFIG_BATCH_SIZE", "32")), + "n_hidden": int(os.environ.get("P95_CONFIG_N_HIDDEN", "64")), + "n_samples": int(os.environ.get("P95_CONFIG_N_SAMPLES", "1000")), + "n_features": int(os.environ.get("P95_CONFIG_N_FEATURES", "20")), + "n_classes": int(os.environ.get("P95_CONFIG_N_CLASSES", "3")), + "dropout": float(os.environ.get("P95_CONFIG_DROPOUT", "0.2")), + } + + # Determine project - use env var if set (for remote mode) + project = os.environ.get("P95_PROJECT", "mlp-training") + + print(f"Training MLP classifier") + print(f"Config: {config}") + print(f"Backend: {'PyTorch' if HAS_TORCH else 'NumPy'}") + + with p95.Run(project=project, config=config) as run: + print(f"Run ID: {run.id}") + + if HAS_TORCH: + train_torch(run, config) + else: + train_numpy(run, config) + + print("Training complete!") + + +if __name__ == "__main__": + main() diff --git a/sdk/python/pyproject.toml b/sdk/python/pyproject.toml index b348f25..b055275 100644 --- a/sdk/python/pyproject.toml +++ b/sdk/python/pyproject.toml @@ -51,6 +51,7 @@ Repository = "https://github.com/numerataz/p95" [project.scripts] pnf = "p95.cli:main" +p95 = "p95.cli:main" [tool.setuptools] package-dir = {"" = "src"} diff --git a/sdk/python/src/p95/client.py b/sdk/python/src/p95/client.py index a7bbae8..08da812 100644 --- a/sdk/python/src/p95/client.py +++ b/sdk/python/src/p95/client.py @@ -259,3 +259,19 @@ def ack_intervention(self, intervention_id: str) -> Dict[str, Any]: Updated intervention dictionary """ return self._request("POST", 
f"/interventions/{intervention_id}/apply") + + def link_run_to_job(self, job_id: str, run_id: str) -> Dict[str, Any]: + """ + Link a run to a job. + + Called automatically when a run is created within a job context + (when P95_JOB_ID environment variable is set). + + Args: + job_id: The job ID + run_id: The run ID + + Returns: + Updated job dictionary + """ + return self._request("POST", f"/jobs/{job_id}/link-run", data={"run_id": run_id}) diff --git a/sdk/python/src/p95/run.py b/sdk/python/src/p95/run.py index b2df0d9..d907b37 100644 --- a/sdk/python/src/p95/run.py +++ b/sdk/python/src/p95/run.py @@ -3,6 +3,7 @@ from __future__ import annotations import atexit +import os import signal import sys import threading @@ -214,6 +215,15 @@ def _init_remote_mode(self) -> None: system_info=self._system_info, ) + # Auto-link to job if running within a job context + job_id = os.environ.get("P95_JOB_ID") + if job_id: + try: + self._remote_client.link_run_to_job(job_id, self._run_id) + except Exception as e: + # Log but don't fail - the run was created successfully + print(f"p95: Warning: Failed to link run to job {job_id}: {e}") + self._remote_batcher = MetricsBatcher( client=self._remote_client, run_id=self._run_id, From 6d2bc5497e756a3d27ab077ef6236ef4fe3187b9 Mon Sep 17 00:00:00 2001 From: Brodey Newman Date: Mon, 16 Mar 2026 21:51:14 -0400 Subject: [PATCH 3/3] fix: build + format --- sdk/python/examples/test_requirements.py | 7 + sdk/python/examples/train_mlp.py | 67 ++-- sdk/python/src/p95/cli.py | 1 + sdk/python/src/p95/client.py | 4 +- sdk/python/src/p95/cloud_cli.py | 274 ++++++++++++-- sdk/python/src/p95/sweep.py | 4 +- sdk/python/src/p95/worker.py | 135 +++++-- sdk/python/tests/__init__.py | 1 + sdk/python/tests/test_cloud_cli.py | 440 +++++++++++++++++++++++ sdk/python/tests/test_worker.py | 322 +++++++++++++++++ 10 files changed, 1175 insertions(+), 80 deletions(-) create mode 100644 sdk/python/examples/test_requirements.py create mode 100644 
sdk/python/tests/__init__.py create mode 100644 sdk/python/tests/test_cloud_cli.py create mode 100644 sdk/python/tests/test_worker.py diff --git a/sdk/python/examples/test_requirements.py b/sdk/python/examples/test_requirements.py new file mode 100644 index 0000000..8c91f83 --- /dev/null +++ b/sdk/python/examples/test_requirements.py @@ -0,0 +1,7 @@ +"""Test script that requires cowsay package.""" + +import cowsay + +print("Testing requirements installation...") +cowsay.cow("p95 requirements work!") +print("SUCCESS: cowsay package was installed and imported correctly!") diff --git a/sdk/python/examples/train_mlp.py b/sdk/python/examples/train_mlp.py index 610bcb6..b5880dc 100644 --- a/sdk/python/examples/train_mlp.py +++ b/sdk/python/examples/train_mlp.py @@ -16,15 +16,13 @@ """ import os -import time -import math -import random # Check if torch is available, use numpy fallback if not try: import torch import torch.nn as nn import torch.optim as optim + HAS_TORCH = True except ImportError: HAS_TORCH = False @@ -58,8 +56,10 @@ def make_synthetic_data(n_samples=1000, n_features=20, n_classes=3, seed=42): if HAS_TORCH: + class MLP(nn.Module): """Simple MLP classifier.""" + def __init__(self, n_features, n_hidden, n_classes, dropout=0.2): super().__init__() self.net = nn.Sequential( @@ -78,14 +78,21 @@ def forward(self, x): class NumpyMLP: """Simple numpy-based MLP for when PyTorch isn't available.""" + def __init__(self, n_features, n_hidden, n_classes, lr=0.01): self.lr = lr # Xavier initialization - self.W1 = np.random.randn(n_features, n_hidden).astype(np.float32) * np.sqrt(2.0 / n_features) + self.W1 = np.random.randn(n_features, n_hidden).astype(np.float32) * np.sqrt( + 2.0 / n_features + ) self.b1 = np.zeros(n_hidden, dtype=np.float32) - self.W2 = np.random.randn(n_hidden, n_hidden).astype(np.float32) * np.sqrt(2.0 / n_hidden) + self.W2 = np.random.randn(n_hidden, n_hidden).astype(np.float32) * np.sqrt( + 2.0 / n_hidden + ) self.b2 = np.zeros(n_hidden, 
dtype=np.float32) - self.W3 = np.random.randn(n_hidden, n_classes).astype(np.float32) * np.sqrt(2.0 / n_hidden) + self.W3 = np.random.randn(n_hidden, n_classes).astype(np.float32) * np.sqrt( + 2.0 / n_hidden + ) self.b3 = np.zeros(n_classes, dtype=np.float32) def relu(self, x): @@ -212,12 +219,15 @@ def train_torch(run, config): val_acc = val_predicted.eq(y_val).sum().item() / len(y_val) # Log metrics - run.log_metrics({ - "train/loss": train_loss, - "train/accuracy": train_acc, - "val/loss": val_loss, - "val/accuracy": val_acc, - }, step=epoch) + run.log_metrics( + { + "train/loss": train_loss, + "train/accuracy": train_acc, + "val/loss": val_loss, + "val/accuracy": val_acc, + }, + step=epoch, + ) # Check for interventions intervention = run.check_intervention() @@ -232,9 +242,11 @@ def train_torch(run, config): param_group["lr"] = value print(f"Updated learning rate to {value}") - print(f"Epoch {epoch+1}/{config['epochs']} - " - f"train_loss: {train_loss:.4f}, train_acc: {train_acc:.4f}, " - f"val_loss: {val_loss:.4f}, val_acc: {val_acc:.4f}") + print( + f"Epoch {epoch + 1}/{config['epochs']} - " + f"train_loss: {train_loss:.4f}, train_acc: {train_acc:.4f}, " + f"val_loss: {val_loss:.4f}, val_acc: {val_acc:.4f}" + ) def train_numpy(run, config): @@ -294,12 +306,15 @@ def train_numpy(run, config): val_acc = model.accuracy(val_probs, y_val) # Log metrics - run.log_metrics({ - "train/loss": train_loss, - "train/accuracy": train_acc, - "val/loss": val_loss, - "val/accuracy": val_acc, - }, step=epoch) + run.log_metrics( + { + "train/loss": train_loss, + "train/accuracy": train_acc, + "val/loss": val_loss, + "val/accuracy": val_acc, + }, + step=epoch, + ) # Check for interventions intervention = run.check_intervention() @@ -312,9 +327,11 @@ def train_numpy(run, config): model.lr = value print(f"Updated learning rate to {value}") - print(f"Epoch {epoch+1}/{config['epochs']} - " - f"train_loss: {train_loss:.4f}, train_acc: {train_acc:.4f}, " - f"val_loss: {val_loss:.4f}, 
val_acc: {val_acc:.4f}") + print( + f"Epoch {epoch + 1}/{config['epochs']} - " + f"train_loss: {train_loss:.4f}, train_acc: {train_acc:.4f}, " + f"val_loss: {val_loss:.4f}, val_acc: {val_acc:.4f}" + ) def main(): @@ -333,7 +350,7 @@ def main(): # Determine project - use env var if set (for remote mode) project = os.environ.get("P95_PROJECT", "mlp-training") - print(f"Training MLP classifier") + print("Training MLP classifier") print(f"Config: {config}") print(f"Backend: {'PyTorch' if HAS_TORCH else 'NumPy'}") diff --git a/sdk/python/src/p95/cli.py b/sdk/python/src/p95/cli.py index 892b553..c9afbc7 100644 --- a/sdk/python/src/p95/cli.py +++ b/sdk/python/src/p95/cli.py @@ -79,6 +79,7 @@ def main() -> None: # Check for cloud commands first if _is_cloud_command(): from p95.cloud_cli import main_cloud + main_cloud() return diff --git a/sdk/python/src/p95/client.py b/sdk/python/src/p95/client.py index 08da812..8b1c33f 100644 --- a/sdk/python/src/p95/client.py +++ b/sdk/python/src/p95/client.py @@ -274,4 +274,6 @@ def link_run_to_job(self, job_id: str, run_id: str) -> Dict[str, Any]: Returns: Updated job dictionary """ - return self._request("POST", f"/jobs/{job_id}/link-run", data={"run_id": run_id}) + return self._request( + "POST", f"/jobs/{job_id}/link-run", data={"run_id": run_id} + ) diff --git a/sdk/python/src/p95/cloud_cli.py b/sdk/python/src/p95/cloud_cli.py index 793c966..af63304 100644 --- a/sdk/python/src/p95/cloud_cli.py +++ b/sdk/python/src/p95/cloud_cli.py @@ -10,11 +10,13 @@ import argparse import json import os +import subprocess import sys -from typing import Any, Dict, List, Optional +import tempfile +from typing import Any, Dict, Optional from p95.client import P95Client -from p95.config import SDKConfig, get_config +from p95.config import get_config def _get_client() -> P95Client: @@ -55,10 +57,126 @@ def _parse_project(project: str) -> tuple[str, str]: return parts[0], parts[1] +# =========================================== +# Local Execution Helpers 
+# =========================================== + + +def _install_requirements(requirements: str, env: dict) -> tuple[bool, str]: + """Install Python requirements using uv (fast) or pip (fallback). + + Returns: + Tuple of (success, log_output) + """ + # Parse requirements: "torch,transformers>=4.0" -> ["torch", "transformers>=4.0"] + reqs = [r.strip() for r in requirements.split(",") if r.strip()] + if not reqs: + return True, "" + + print(f"Installing requirements: {reqs}", file=sys.stderr) + + # Try uv first (much faster), fall back to pip + for installer in ["uv pip install", f"{sys.executable} -m pip install"]: + try: + cmd = installer.split() + reqs + result = subprocess.run( + cmd, + env=env, + capture_output=True, + text=True, + timeout=300, # 5 minute timeout for installs + ) + output = result.stdout + result.stderr + if result.returncode == 0: + print( + f"Requirements installed with {installer.split()[0]}", + file=sys.stderr, + ) + return True, output + except FileNotFoundError: + continue # Try next installer + except subprocess.TimeoutExpired: + return False, "Requirement installation timed out after 5 minutes" + except Exception: + continue + + return False, "Failed to install requirements with both uv and pip" + + +def _execute_job_locally( + job_id: str, + project: str, + script: Optional[str], + command: Optional[str], + config: Dict[str, Any], + python_requirements: Optional[str], +) -> tuple[int, str]: + """Execute a job locally and return (exit_code, logs).""" + logs = [] + + # Build environment + env = os.environ.copy() + env["P95_JOB_ID"] = job_id + env["P95_PROJECT"] = project + + # Add config as environment variables + for key, value in (config or {}).items(): + env[f"P95_CONFIG_{key.upper()}"] = str(value) + + # Install requirements if specified + if python_requirements: + success, install_logs = _install_requirements(python_requirements, env) + logs.append(f"=== Installing requirements ===\n{install_logs}") + if not success: + 
logs.append(f"\nFailed to install requirements: {python_requirements}") + return 1, "\n".join(logs) + + try: + if command: + # Run command directly + logs.append(f"=== Running command ===\n$ {command}\n") + result = subprocess.run( + command, + shell=True, + env=env, + capture_output=True, + text=True, + ) + logs.append(result.stdout) + if result.stderr: + logs.append(f"\n=== stderr ===\n{result.stderr}") + return result.returncode, "\n".join(logs) + elif script: + # Write script to temp file and run + with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f: + f.write(script) + script_path = f.name + + logs.append("=== Running script ===\n") + try: + result = subprocess.run( + [sys.executable, script_path], + env=env, + capture_output=True, + text=True, + ) + logs.append(result.stdout) + if result.stderr: + logs.append(f"\n=== stderr ===\n{result.stderr}") + return result.returncode, "\n".join(logs) + finally: + os.unlink(script_path) + else: + return 1, "Job has no command or script" + except Exception as e: + return 1, f"Job execution failed: {e}" + + # =========================================== # Runs Commands # =========================================== + def runs_list(args: argparse.Namespace) -> None: """List runs in a project.""" client = _get_client() @@ -163,6 +281,7 @@ def runs_interventions(args: argparse.Namespace) -> None: # Jobs Commands # =========================================== + def jobs_create(args: argparse.Namespace) -> None: """Create a new job.""" client = _get_client() @@ -172,23 +291,30 @@ def jobs_create(args: argparse.Namespace) -> None: "type": args.type or "training", } + script_content: Optional[str] = None if args.script: # Read script content from file try: with open(args.script, "r") as f: - data["script"] = f.read() + script_content = f.read() + data["script"] = script_content except FileNotFoundError: _error(f"Script file not found: {args.script}", args.json) if args.command: data["command"] = args.command 
+ config: Dict[str, Any] = {} if args.config: try: - data["config"] = json.loads(args.config) + config = json.loads(args.config) + data["config"] = config except json.JSONDecodeError as e: _error(f"Invalid JSON in --config: {e}", args.json) + if args.requirements: + data["python_requirements"] = args.requirements + if args.rationale: data["ai_rationale"] = args.rationale data["created_by"] = "ai:claude" @@ -196,12 +322,66 @@ def jobs_create(args: argparse.Namespace) -> None: if args.priority is not None: data["priority"] = args.priority + # If --now flag is set, tell the API to create with status=running + # so workers don't try to claim it + if getattr(args, "now", False): + data["run_locally"] = True + try: + # Create the job response = client._request( "POST", f"/teams/{team_slug}/apps/{app_slug}/jobs", data=data, ) + job_id = response["id"] + + # If --now flag is set, execute the job immediately + if getattr(args, "now", False): + print(f"Executing job {job_id} locally...", file=sys.stderr) + + # Execute locally (no need to claim - we'll update status directly) + exit_code, logs = _execute_job_locally( + job_id=job_id, + project=args.project, + script=script_content, + command=args.command, + config=config, + python_requirements=args.requirements, + ) + + # Print logs in real-time feel + if logs: + print(logs, file=sys.stderr) + + # Report completion using the local endpoint (no worker required) + try: + # Truncate logs if too large (max 1MB) + max_log_size = 1024 * 1024 + if len(logs) > max_log_size: + logs = logs[:max_log_size] + "\n... 
[truncated]" + + client._request( + "POST", + f"/jobs/{job_id}/complete-local", + data={ + "exit_code": exit_code, + "logs": logs, + }, + ) + except Exception as e: + print(f"Warning: Could not report completion: {e}", file=sys.stderr) + + # Fetch updated job with run_id + try: + response = client._request("GET", f"/jobs/{job_id}") + except Exception: + pass + + # Add execution info to response + response["_executed_locally"] = True + response["_exit_code"] = exit_code + _output(response, args.json) except Exception as e: _error(str(e), args.json) @@ -253,6 +433,7 @@ def jobs_cancel(args: argparse.Namespace) -> None: # Workers Commands # =========================================== + def workers_list(args: argparse.Namespace) -> None: """List workers in a project.""" client = _get_client() @@ -294,14 +475,19 @@ def workers_start(args: argparse.Namespace) -> None: # Main CLI Entry Point # =========================================== + def create_parser() -> argparse.ArgumentParser: """Create the argument parser for cloud CLI commands.""" parser = argparse.ArgumentParser( prog="p95", description="p95 ML experiment tracking CLI", ) - parser.add_argument("--json", action="store_true", default=True, - help="Output in JSON format (default)") + parser.add_argument( + "--json", + action="store_true", + default=True, + help="Output in JSON format (default)", + ) subparsers = parser.add_subparsers(dest="command", help="Available commands") @@ -313,8 +499,9 @@ def create_parser() -> argparse.ArgumentParser: # runs list runs_list_parser = runs_sub.add_parser("list", help="List runs") - runs_list_parser.add_argument("--project", "-p", required=True, - help="Project in format 'team/app'") + runs_list_parser.add_argument( + "--project", "-p", required=True, help="Project in format 'team/app'" + ) runs_list_parser.add_argument("--status", "-s", help="Filter by status") runs_list_parser.add_argument("--limit", type=int, default=20) runs_list_parser.add_argument("--offset", type=int, 
default=0) @@ -330,26 +517,39 @@ def create_parser() -> argparse.ArgumentParser: # runs metrics runs_metrics_parser = runs_sub.add_parser("metrics", help="Get run metrics") runs_metrics_parser.add_argument("run_id", help="Run ID") - runs_metrics_parser.add_argument("metric_name", nargs="?", help="Specific metric name") - runs_metrics_parser.add_argument("--since-step", type=int, help="Get metrics since step") + runs_metrics_parser.add_argument( + "metric_name", nargs="?", help="Specific metric name" + ) + runs_metrics_parser.add_argument( + "--since-step", type=int, help="Get metrics since step" + ) runs_metrics_parser.add_argument("--json", action="store_true", default=True) runs_metrics_parser.set_defaults(func=runs_metrics) # runs intervene - runs_intervene_parser = runs_sub.add_parser("intervene", help="Create an intervention") + runs_intervene_parser = runs_sub.add_parser( + "intervene", help="Create an intervention" + ) runs_intervene_parser.add_argument("run_id", help="Run ID") - runs_intervene_parser.add_argument("--action", "-a", required=True, - choices=["adjust_config", "early_stop", "pause", "resume"], - help="Intervention action") + runs_intervene_parser.add_argument( + "--action", + "-a", + required=True, + choices=["adjust_config", "early_stop", "pause", "resume"], + help="Intervention action", + ) runs_intervene_parser.add_argument("--config", "-c", help="Config changes as JSON") - runs_intervene_parser.add_argument("--rationale", "-r", required=True, - help="Explanation for the intervention") + runs_intervene_parser.add_argument( + "--rationale", "-r", required=True, help="Explanation for the intervention" + ) runs_intervene_parser.add_argument("--step", type=int, help="Current step") runs_intervene_parser.add_argument("--json", action="store_true", default=True) runs_intervene_parser.set_defaults(func=runs_intervene) # runs interventions - runs_interventions_parser = runs_sub.add_parser("interventions", help="List interventions") + 
runs_interventions_parser = runs_sub.add_parser( + "interventions", help="List interventions" + ) runs_interventions_parser.add_argument("run_id", help="Run ID") runs_interventions_parser.add_argument("--limit", type=int, default=50) runs_interventions_parser.add_argument("--json", action="store_true", default=True) @@ -363,16 +563,29 @@ def create_parser() -> argparse.ArgumentParser: # jobs create jobs_create_parser = jobs_sub.add_parser("create", help="Create a job") - jobs_create_parser.add_argument("--project", "-p", required=True, - help="Project in format 'team/app'") + jobs_create_parser.add_argument( + "--project", "-p", required=True, help="Project in format 'team/app'" + ) jobs_create_parser.add_argument("--script", help="Path to Python script") jobs_create_parser.add_argument("--command", help="Command to execute") jobs_create_parser.add_argument("--config", "-c", help="Config as JSON") - jobs_create_parser.add_argument("--type", "-t", default="training", - choices=["training", "sweep_trial", "evaluation", "custom"]) - jobs_create_parser.add_argument("--rationale", "-r", - help="AI rationale for creating this job") + jobs_create_parser.add_argument( + "--requirements", + help="Python packages to install (e.g., 'torch,transformers>=4.0')", + ) + jobs_create_parser.add_argument( + "--type", + "-t", + default="training", + choices=["training", "sweep_trial", "evaluation", "custom"], + ) + jobs_create_parser.add_argument( + "--rationale", "-r", help="AI rationale for creating this job" + ) jobs_create_parser.add_argument("--priority", type=int, default=0) + jobs_create_parser.add_argument( + "--now", action="store_true", help="Execute the job immediately (locally)" + ) jobs_create_parser.add_argument("--json", action="store_true", default=True) jobs_create_parser.set_defaults(func=jobs_create) @@ -384,8 +597,9 @@ def create_parser() -> argparse.ArgumentParser: # jobs list jobs_list_parser = jobs_sub.add_parser("list", help="List jobs") - 
jobs_list_parser.add_argument("--project", "-p", required=True, - help="Project in format 'team/app'") + jobs_list_parser.add_argument( + "--project", "-p", required=True, help="Project in format 'team/app'" + ) jobs_list_parser.add_argument("--status", "-s", help="Filter by status") jobs_list_parser.add_argument("--limit", type=int, default=20) jobs_list_parser.add_argument("--json", action="store_true", default=True) @@ -405,8 +619,9 @@ def create_parser() -> argparse.ArgumentParser: # workers list workers_list_parser = workers_sub.add_parser("list", help="List workers") - workers_list_parser.add_argument("--project", "-p", required=True, - help="Project in format 'team/app'") + workers_list_parser.add_argument( + "--project", "-p", required=True, help="Project in format 'team/app'" + ) workers_list_parser.add_argument("--status", "-s", help="Filter by status") workers_list_parser.add_argument("--limit", type=int, default=50) workers_list_parser.add_argument("--json", action="store_true", default=True) @@ -419,8 +634,9 @@ def create_parser() -> argparse.ArgumentParser: worker_sub = worker_parser.add_subparsers(dest="worker_command") worker_start_parser = worker_sub.add_parser("start", help="Start a worker") - worker_start_parser.add_argument("--project", "-p", required=True, - help="Project in format 'team/app'") + worker_start_parser.add_argument( + "--project", "-p", required=True, help="Project in format 'team/app'" + ) worker_start_parser.add_argument("--tags", help="Comma-separated worker tags") worker_start_parser.add_argument("--json", action="store_true", default=True) worker_start_parser.set_defaults(func=workers_start) diff --git a/sdk/python/src/p95/sweep.py b/sdk/python/src/p95/sweep.py index 51d05af..8844b92 100644 --- a/sdk/python/src/p95/sweep.py +++ b/sdk/python/src/p95/sweep.py @@ -387,7 +387,7 @@ def _run_remote_agent( sweep_data=sweep, is_local=False, ) - token = _set_sweep_context(ctx) + _token = _set_sweep_context(ctx) # noqa: F841 try: # 
Call the training function @@ -478,7 +478,7 @@ def _run_local_agent( project=project, is_local=True, ) - token = _set_sweep_context(ctx) + _token = _set_sweep_context(ctx) # noqa: F841 try: # Call the training function - it will create its own Run diff --git a/sdk/python/src/p95/worker.py b/sdk/python/src/p95/worker.py index 8abe7e9..c3a2510 100644 --- a/sdk/python/src/p95/worker.py +++ b/sdk/python/src/p95/worker.py @@ -24,7 +24,7 @@ from typing import Any, Dict, List, Optional from p95.client import P95Client -from p95.config import SDKConfig, get_config +from p95.config import get_config from p95.exceptions import APIError @@ -34,6 +34,7 @@ @dataclass class WorkerCapabilities: """Worker hardware capabilities.""" + gpu_count: int = 0 gpu_memory_gb: float = 0.0 gpu_model: str = "" @@ -45,6 +46,7 @@ class WorkerCapabilities: @dataclass class Job: """Represents a job to be executed.""" + id: str type: str status: str @@ -52,6 +54,7 @@ class Job: command: Optional[str] = None config: Dict[str, Any] = field(default_factory=dict) environment: Dict[str, str] = field(default_factory=dict) + python_requirements: Optional[str] = None # e.g., "torch,transformers>=4.0" ai_rationale: Optional[str] = None @@ -120,17 +123,17 @@ def _detect_capabilities(self) -> WorkerCapabilities: try: if sys.platform == "darwin": import subprocess + result = subprocess.run( - ["sysctl", "-n", "hw.memsize"], - capture_output=True, text=True + ["sysctl", "-n", "hw.memsize"], capture_output=True, text=True ) - caps.memory_gb = int(result.stdout.strip()) / (1024 ** 3) + caps.memory_gb = int(result.stdout.strip()) / (1024**3) elif sys.platform == "linux": with open("/proc/meminfo") as f: for line in f: if line.startswith("MemTotal:"): # Value is in kB - caps.memory_gb = int(line.split()[1]) / (1024 ** 2) + caps.memory_gb = int(line.split()[1]) / (1024**2) break except Exception: pass @@ -138,8 +141,13 @@ def _detect_capabilities(self) -> WorkerCapabilities: # GPU detection (NVIDIA only for now) 
try: result = subprocess.run( - ["nvidia-smi", "--query-gpu=count,memory.total,name", "--format=csv,noheader,nounits"], - capture_output=True, text=True + [ + "nvidia-smi", + "--query-gpu=count,memory.total,name", + "--format=csv,noheader,nounits", + ], + capture_output=True, + text=True, ) if result.returncode == 0: lines = result.stdout.strip().split("\n") @@ -246,6 +254,7 @@ def _claim_job(self) -> Optional[Job]: command=response.get("command"), config=response.get("config", {}), environment=response.get("environment", {}), + python_requirements=response.get("python_requirements"), ai_rationale=response.get("ai_rationale"), ) except APIError as e: @@ -255,9 +264,54 @@ def _claim_job(self) -> Optional[Job]: logger.error(f"Failed to claim job: {e}") return None - def _execute_job(self, job: Job) -> int: - """Execute a job and return exit code.""" + def _install_requirements(self, requirements: str, env: dict) -> tuple[bool, str]: + """Install Python requirements using uv (fast) or pip (fallback). 
+ + Returns: + Tuple of (success, log_output) + """ + # Parse requirements: "torch,transformers>=4.0" -> ["torch", "transformers>=4.0"] + reqs = [r.strip() for r in requirements.split(",") if r.strip()] + if not reqs: + return True, "" + + logger.info(f"Installing requirements: {reqs}") + + # Try uv first (much faster), fall back to pip + for installer in ["uv pip install", f"{sys.executable} -m pip install"]: + try: + cmd = installer.split() + reqs + result = subprocess.run( + cmd, + env=env, + capture_output=True, + text=True, + timeout=300, # 5 minute timeout for installs + ) + output = result.stdout + result.stderr + if result.returncode == 0: + logger.info( + f"Requirements installed successfully with {installer.split()[0]}" + ) + return True, output + else: + logger.warning( + f"Install failed with {installer.split()[0]}: {result.stderr}" + ) + except FileNotFoundError: + continue # Try next installer + except subprocess.TimeoutExpired: + return False, "Requirement installation timed out after 5 minutes" + except Exception as e: + logger.warning(f"Install error with {installer.split()[0]}: {e}") + continue + + return False, "Failed to install requirements with both uv and pip" + + def _execute_job(self, job: Job) -> tuple[int, str]: + """Execute a job and return (exit_code, logs).""" logger.info(f"Executing job {job.id} (type: {job.type})") + logs = [] if job.ai_rationale: logger.info(f"AI Rationale: {job.ai_rationale}") @@ -282,49 +336,81 @@ def _execute_job(self, job: Job) -> int: for key, value in (job.config or {}).items(): env[f"P95_CONFIG_{key.upper()}"] = str(value) + # Install requirements if specified + if job.python_requirements: + success, install_logs = self._install_requirements( + job.python_requirements, env + ) + logs.append(f"=== Installing requirements ===\n{install_logs}") + if not success: + logs.append( + f"\nFailed to install requirements: {job.python_requirements}" + ) + return 1, "\n".join(logs) + try: if job.command: # Run command 
directly + logs.append(f"=== Running command ===\n$ {job.command}\n") result = subprocess.run( job.command, shell=True, env=env, - capture_output=False, + capture_output=True, + text=True, ) - return result.returncode + logs.append(result.stdout) + if result.stderr: + logs.append(f"\n=== stderr ===\n{result.stderr}") + return result.returncode, "\n".join(logs) elif job.script: # Write script to temp file and run import tempfile + with tempfile.NamedTemporaryFile( mode="w", suffix=".py", delete=False ) as f: f.write(job.script) script_path = f.name + logs.append("=== Running script ===\n") try: result = subprocess.run( [sys.executable, script_path], env=env, - capture_output=False, + capture_output=True, + text=True, ) - return result.returncode + logs.append(result.stdout) + if result.stderr: + logs.append(f"\n=== stderr ===\n{result.stderr}") + return result.returncode, "\n".join(logs) finally: os.unlink(script_path) else: logger.error("Job has no command or script") - return 1 + return 1, "Job has no command or script" except Exception as e: logger.error(f"Job execution failed: {e}") - return 1 + return 1, f"Job execution failed: {e}" + + def _report_completion(self, job: Job, exit_code: int, logs: str = "") -> None: + """Report job completion or failure with logs.""" + # Truncate logs if too large (max 1MB) + max_log_size = 1024 * 1024 + if len(logs) > max_log_size: + logs = logs[:max_log_size] + "\n... 
[truncated]" - def _report_completion(self, job: Job, exit_code: int) -> None: - """Report job completion or failure.""" try: if exit_code == 0: self._client._request( "POST", f"/jobs/{job.id}/complete", - data={"worker_id": self.worker_id, "exit_code": exit_code}, + data={ + "worker_id": self.worker_id, + "exit_code": exit_code, + "logs": logs, + }, ) logger.info(f"Job {job.id} completed successfully") else: @@ -334,6 +420,7 @@ def _report_completion(self, job: Job, exit_code: int) -> None: data={ "worker_id": self.worker_id, "error_message": f"Exit code: {exit_code}", + "logs": logs, }, ) logger.info(f"Job {job.id} failed with exit code {exit_code}") @@ -373,9 +460,11 @@ def handle_signal(signum, frame): logger.info(f"Starting worker {self.worker_id}") logger.info(f"Project: {self.project}") logger.info(f"Tags: {self.tags}") - logger.info(f"Capabilities: GPU={self.capabilities.gpu_count}, " - f"CPU={self.capabilities.cpu_count}, " - f"Memory={self.capabilities.memory_gb:.1f}GB") + logger.info( + f"Capabilities: GPU={self.capabilities.gpu_count}, " + f"CPU={self.capabilities.cpu_count}, " + f"Memory={self.capabilities.memory_gb:.1f}GB" + ) # Register self._register() @@ -397,8 +486,8 @@ def handle_signal(signum, frame): job = self._claim_job() if job: self._current_job = job - exit_code = self._execute_job(job) - self._report_completion(job, exit_code) + exit_code, logs = self._execute_job(job) + self._report_completion(job, exit_code, logs) self._current_job = None last_poll = now diff --git a/sdk/python/tests/__init__.py b/sdk/python/tests/__init__.py new file mode 100644 index 0000000..9e01808 --- /dev/null +++ b/sdk/python/tests/__init__.py @@ -0,0 +1 @@ +# p95 SDK tests diff --git a/sdk/python/tests/test_cloud_cli.py b/sdk/python/tests/test_cloud_cli.py new file mode 100644 index 0000000..fd3ffad --- /dev/null +++ b/sdk/python/tests/test_cloud_cli.py @@ -0,0 +1,440 @@ +"""Tests for p95 cloud CLI commands.""" + +import json +import os + +import pytest + +from 
p95.cloud_cli import ( + _execute_job_locally, + _install_requirements, + _parse_project, + create_parser, +) + + +class TestParseProject: + """Tests for project string parsing.""" + + def test_valid_project(self): + """Test parsing a valid project string.""" + team, app = _parse_project("my-team/my-app") + assert team == "my-team" + assert app == "my-app" + + def test_project_with_hyphens(self): + """Test parsing a project with hyphens.""" + team, app = _parse_project("team-name-123/app-name-456") + assert team == "team-name-123" + assert app == "app-name-456" + + def test_invalid_project_no_slash(self): + """Test that invalid project format raises error.""" + with pytest.raises(SystemExit): + _parse_project("invalid-format") + + def test_invalid_project_too_many_slashes(self): + """Test that too many slashes raises error.""" + with pytest.raises(SystemExit): + _parse_project("a/b/c") + + +class TestCreateParser: + """Tests for CLI argument parser.""" + + def test_parser_creation(self): + """Test that parser is created successfully.""" + parser = create_parser() + assert parser is not None + assert parser.prog == "p95" + + def test_jobs_create_args(self): + """Test jobs create command arguments.""" + parser = create_parser() + args = parser.parse_args( + [ + "jobs", + "create", + "--project", + "team/app", + "--script", + "train.py", + "--requirements", + "numpy,torch", + "--config", + '{"lr": 0.001}', + "--rationale", + "Testing something", + "--now", + ] + ) + + assert args.project == "team/app" + assert args.script == "train.py" + assert args.requirements == "numpy,torch" + assert args.config == '{"lr": 0.001}' + assert args.rationale == "Testing something" + assert args.now is True + + def test_jobs_create_without_now(self): + """Test jobs create without --now flag.""" + parser = create_parser() + args = parser.parse_args( + [ + "jobs", + "create", + "--project", + "team/app", + "--script", + "train.py", + ] + ) + + assert args.now is False + + def 
test_jobs_list_args(self): + """Test jobs list command arguments.""" + parser = create_parser() + args = parser.parse_args( + [ + "jobs", + "list", + "--project", + "team/app", + "--status", + "pending", + "--limit", + "50", + ] + ) + + assert args.project == "team/app" + assert args.status == "pending" + assert args.limit == 50 + + def test_jobs_get_args(self): + """Test jobs get command arguments.""" + parser = create_parser() + args = parser.parse_args( + [ + "jobs", + "get", + "abc123-job-id", + ] + ) + + assert args.job_id == "abc123-job-id" + + def test_worker_start_args(self): + """Test worker start command arguments.""" + parser = create_parser() + args = parser.parse_args( + [ + "worker", + "start", + "--project", + "team/app", + "--tags", + "gpu,a100", + ] + ) + + assert args.project == "team/app" + assert args.tags == "gpu,a100" + + +class TestInstallRequirements: + """Tests for requirements installation.""" + + def test_empty_requirements(self): + """Test with empty requirements string.""" + success, logs = _install_requirements("", os.environ.copy()) + assert success is True + assert logs == "" + + def test_whitespace_requirements(self): + """Test with whitespace-only requirements.""" + success, logs = _install_requirements(" , , ", os.environ.copy()) + assert success is True + assert logs == "" + + def test_parse_requirements(self): + """Test requirements parsing.""" + # This tests the parsing logic without actually installing + requirements = "numpy,torch>=2.0,transformers" + reqs = [r.strip() for r in requirements.split(",") if r.strip()] + + assert len(reqs) == 3 + assert reqs[0] == "numpy" + assert reqs[1] == "torch>=2.0" + assert reqs[2] == "transformers" + + +class TestExecuteJobLocally: + """Tests for local job execution.""" + + def test_execute_script(self): + """Test executing a simple script.""" + script = "print('Hello, World!')" + exit_code, logs = _execute_job_locally( + job_id="test-123", + project="team/app", + script=script, + 
command=None, + config={}, + python_requirements=None, + ) + + assert exit_code == 0 + assert "Hello, World!" in logs + + def test_execute_script_with_config(self): + """Test executing a script with config environment variables.""" + script = """ +import os +print(f"EPOCHS={os.environ.get('P95_CONFIG_EPOCHS', 'not set')}") +print(f"LR={os.environ.get('P95_CONFIG_LR', 'not set')}") +""" + exit_code, logs = _execute_job_locally( + job_id="test-123", + project="team/app", + script=script, + command=None, + config={"epochs": 10, "lr": 0.001}, + python_requirements=None, + ) + + assert exit_code == 0 + assert "EPOCHS=10" in logs + assert "LR=0.001" in logs + + def test_execute_script_failure(self): + """Test executing a script that fails.""" + script = "raise Exception('Test error')" + exit_code, logs = _execute_job_locally( + job_id="test-123", + project="team/app", + script=script, + command=None, + config={}, + python_requirements=None, + ) + + assert exit_code != 0 + assert "Test error" in logs or "Exception" in logs + + def test_execute_command(self): + """Test executing a command.""" + exit_code, logs = _execute_job_locally( + job_id="test-123", + project="team/app", + script=None, + command="echo 'Command executed'", + config={}, + python_requirements=None, + ) + + assert exit_code == 0 + assert "Command executed" in logs + + def test_execute_no_script_or_command(self): + """Test that missing script and command returns error.""" + exit_code, logs = _execute_job_locally( + job_id="test-123", + project="team/app", + script=None, + command=None, + config={}, + python_requirements=None, + ) + + assert exit_code == 1 + assert "no command or script" in logs.lower() + + def test_job_id_in_environment(self): + """Test that job ID is available in environment.""" + script = """ +import os +print(f"JOB_ID={os.environ.get('P95_JOB_ID', 'not set')}") +""" + exit_code, logs = _execute_job_locally( + job_id="test-job-12345", + project="team/app", + script=script, + 
command=None, + config={}, + python_requirements=None, + ) + + assert exit_code == 0 + assert "JOB_ID=test-job-12345" in logs + + def test_project_in_environment(self): + """Test that project is available in environment.""" + script = """ +import os +print(f"PROJECT={os.environ.get('P95_PROJECT', 'not set')}") +""" + exit_code, logs = _execute_job_locally( + job_id="test-123", + project="my-team/my-app", + script=script, + command=None, + config={}, + python_requirements=None, + ) + + assert exit_code == 0 + assert "PROJECT=my-team/my-app" in logs + + +class TestJobCreateData: + """Tests for job create data construction.""" + + def test_run_locally_flag_in_data(self): + """Test that run_locally flag is set when --now is used.""" + parser = create_parser() + args = parser.parse_args( + [ + "jobs", + "create", + "--project", + "team/app", + "--script", + "train.py", + "--now", + ] + ) + + # Simulate the data construction logic from jobs_create + data = {"type": "training"} + if getattr(args, "now", False): + data["run_locally"] = True + + assert data["run_locally"] is True + + def test_no_run_locally_without_now(self): + """Test that run_locally is not set without --now.""" + parser = create_parser() + args = parser.parse_args( + [ + "jobs", + "create", + "--project", + "team/app", + "--script", + "train.py", + ] + ) + + data = {"type": "training"} + if getattr(args, "now", False): + data["run_locally"] = True + + assert "run_locally" not in data + + +class TestWorkerTags: + """Tests for worker tag parsing.""" + + def test_single_tag(self): + """Test parsing a single tag.""" + parser = create_parser() + args = parser.parse_args( + [ + "worker", + "start", + "--project", + "team/app", + "--tags", + "gpu", + ] + ) + + tags = args.tags.split(",") if args.tags else [] + assert tags == ["gpu"] + + def test_multiple_tags(self): + """Test parsing multiple tags.""" + parser = create_parser() + args = parser.parse_args( + [ + "worker", + "start", + "--project", + "team/app", 
+ "--tags", + "gpu,a100,high-memory", + ] + ) + + tags = args.tags.split(",") if args.tags else [] + assert tags == ["gpu", "a100", "high-memory"] + + def test_no_tags(self): + """Test worker start without tags.""" + parser = create_parser() + args = parser.parse_args( + [ + "worker", + "start", + "--project", + "team/app", + ] + ) + + tags = args.tags.split(",") if args.tags else [] + assert tags == [] + + +class TestConfigParsing: + """Tests for config JSON parsing.""" + + def test_valid_config(self): + """Test parsing valid JSON config.""" + config_str = '{"epochs": 10, "lr": 0.001, "batch_size": 32}' + config = json.loads(config_str) + + assert config["epochs"] == 10 + assert config["lr"] == 0.001 + assert config["batch_size"] == 32 + + def test_nested_config(self): + """Test parsing nested JSON config.""" + config_str = '{"model": {"hidden_size": 128}, "training": {"epochs": 10}}' + config = json.loads(config_str) + + assert config["model"]["hidden_size"] == 128 + assert config["training"]["epochs"] == 10 + + def test_invalid_config(self): + """Test that invalid JSON raises error.""" + config_str = "not valid json" + with pytest.raises(json.JSONDecodeError): + json.loads(config_str) + + +class TestLogsTruncation: + """Tests for log truncation logic.""" + + def test_short_logs_not_truncated(self): + """Test that short logs are not truncated.""" + logs = "Short log message" + max_log_size = 1024 * 1024 # 1MB + + if len(logs) > max_log_size: + logs = logs[:max_log_size] + "\n... [truncated]" + + assert logs == "Short log message" + assert "[truncated]" not in logs + + def test_long_logs_truncated(self): + """Test that long logs are truncated.""" + logs = "x" * (2 * 1024 * 1024) # 2MB + max_log_size = 1024 * 1024 # 1MB + + if len(logs) > max_log_size: + logs = logs[:max_log_size] + "\n... 
[truncated]" + + assert len(logs) < 2 * 1024 * 1024 + assert "[truncated]" in logs diff --git a/sdk/python/tests/test_worker.py b/sdk/python/tests/test_worker.py new file mode 100644 index 0000000..137224e --- /dev/null +++ b/sdk/python/tests/test_worker.py @@ -0,0 +1,322 @@ +"""Tests for p95 worker module.""" + +import os +import platform +from unittest import mock + +import pytest + +from p95.worker import Job, Worker, WorkerCapabilities + + +class TestWorkerCapabilities: + """Tests for WorkerCapabilities dataclass.""" + + def test_default_capabilities(self): + """Test default capability values.""" + caps = WorkerCapabilities() + + assert caps.gpu_count == 0 + assert caps.gpu_memory_gb == 0.0 + assert caps.gpu_model == "" + assert caps.cpu_count == 0 + assert caps.memory_gb == 0.0 + assert caps.disk_gb == 0.0 + + def test_custom_capabilities(self): + """Test setting custom capabilities.""" + caps = WorkerCapabilities( + gpu_count=4, + gpu_memory_gb=80.0, + gpu_model="A100", + cpu_count=64, + memory_gb=512.0, + disk_gb=2000.0, + ) + + assert caps.gpu_count == 4 + assert caps.gpu_memory_gb == 80.0 + assert caps.gpu_model == "A100" + assert caps.cpu_count == 64 + assert caps.memory_gb == 512.0 + assert caps.disk_gb == 2000.0 + + +class TestJob: + """Tests for Job dataclass.""" + + def test_job_creation(self): + """Test creating a job.""" + job = Job( + id="job-123", + type="training", + status="pending", + script="print('hello')", + ) + + assert job.id == "job-123" + assert job.type == "training" + assert job.status == "pending" + assert job.script == "print('hello')" + + def test_job_with_config(self): + """Test job with config.""" + job = Job( + id="job-123", + type="training", + status="running", + config={"epochs": 10, "lr": 0.001}, + ) + + assert job.config["epochs"] == 10 + assert job.config["lr"] == 0.001 + + def test_job_with_requirements(self): + """Test job with Python requirements.""" + job = Job( + id="job-123", + type="training", + status="pending", + 
python_requirements="numpy,torch>=2.0", + ) + + assert job.python_requirements == "numpy,torch>=2.0" + + def test_job_with_rationale(self): + """Test job with AI rationale.""" + job = Job( + id="job-123", + type="training", + status="pending", + ai_rationale="Reducing learning rate due to loss plateau", + ) + + assert job.ai_rationale == "Reducing learning rate due to loss plateau" + + +class TestWorkerInit: + """Tests for Worker initialization.""" + + def test_worker_id_generation(self): + """Test that worker ID is generated correctly.""" + # Worker ID should be hostname-uuid format + worker = Worker.__new__(Worker) + worker_id = worker._generate_worker_id() + + assert "-" in worker_id + hostname = platform.node() or "unknown" + assert worker_id.startswith(hostname) + + def test_worker_id_uniqueness(self): + """Test that generated worker IDs are unique.""" + worker = Worker.__new__(Worker) + id1 = worker._generate_worker_id() + id2 = worker._generate_worker_id() + + assert id1 != id2 + + def test_project_parsing(self): + """Test that project is parsed correctly.""" + # Valid format + project = "team-name/app-name" + parts = project.split("/") + + assert len(parts) == 2 + assert parts[0] == "team-name" + assert parts[1] == "app-name" + + def test_invalid_project_format(self): + """Test that invalid project format raises error.""" + with pytest.raises(ValueError) as exc: + Worker(project="invalid-format") + + assert "Invalid project format" in str(exc.value) + + +class TestCapabilityDetection: + """Tests for automatic capability detection.""" + + def test_cpu_count_detection(self): + """Test CPU count detection.""" + caps = WorkerCapabilities() + caps.cpu_count = os.cpu_count() or 1 + + assert caps.cpu_count >= 1 + + @mock.patch("platform.system", return_value="Darwin") + def test_macos_memory_detection_format(self, mock_system): + """Test that macOS memory detection uses correct command.""" + # This tests the logic path, not actual execution + assert 
platform.system() == "Darwin" + + +class TestJobExecution: + """Tests for job execution logic.""" + + def test_environment_setup(self): + """Test that environment variables are set correctly for job.""" + job = Job( + id="job-abc123", + type="training", + status="running", + config={"epochs": 10, "lr": 0.001}, + environment={"CUSTOM_VAR": "custom_value"}, + ) + + # Simulate environment setup + env = os.environ.copy() + env.update(job.environment or {}) + env["P95_JOB_ID"] = job.id + env["P95_PROJECT"] = "test/app" + + for key, value in (job.config or {}).items(): + env[f"P95_CONFIG_{key.upper()}"] = str(value) + + assert env["P95_JOB_ID"] == "job-abc123" + assert env["P95_PROJECT"] == "test/app" + assert env["CUSTOM_VAR"] == "custom_value" + assert env["P95_CONFIG_EPOCHS"] == "10" + assert env["P95_CONFIG_LR"] == "0.001" + + def test_config_to_env_conversion(self): + """Test that config values are converted to uppercase env vars.""" + config = { + "epochs": 10, + "learning_rate": 0.001, + "batch_size": 32, + } + + env = {} + for key, value in config.items(): + env[f"P95_CONFIG_{key.upper()}"] = str(value) + + assert env["P95_CONFIG_EPOCHS"] == "10" + assert env["P95_CONFIG_LEARNING_RATE"] == "0.001" + assert env["P95_CONFIG_BATCH_SIZE"] == "32" + + +class TestRequirementsInstallation: + """Tests for Python requirements installation.""" + + def test_parse_single_requirement(self): + """Test parsing a single requirement.""" + requirements = "numpy" + reqs = [r.strip() for r in requirements.split(",") if r.strip()] + + assert reqs == ["numpy"] + + def test_parse_multiple_requirements(self): + """Test parsing multiple requirements.""" + requirements = "numpy,torch,transformers" + reqs = [r.strip() for r in requirements.split(",") if r.strip()] + + assert reqs == ["numpy", "torch", "transformers"] + + def test_parse_versioned_requirements(self): + """Test parsing versioned requirements.""" + requirements = "torch>=2.0,transformers>=4.0,numpy==1.24" + reqs = [r.strip() 
for r in requirements.split(",") if r.strip()] + + assert reqs == ["torch>=2.0", "transformers>=4.0", "numpy==1.24"] + + def test_parse_with_whitespace(self): + """Test parsing requirements with extra whitespace.""" + requirements = " numpy , torch , transformers " + reqs = [r.strip() for r in requirements.split(",") if r.strip()] + + assert reqs == ["numpy", "torch", "transformers"] + + def test_parse_empty_requirements(self): + """Test parsing empty requirements.""" + requirements = "" + reqs = [r.strip() for r in requirements.split(",") if r.strip()] + + assert reqs == [] + + +class TestLogCapture: + """Tests for log capture during job execution.""" + + def test_log_format(self): + """Test that logs include expected sections.""" + logs = [] + logs.append("=== Installing requirements ===\nInstalled: numpy") + logs.append("=== Running script ===\nHello, World!") + + full_logs = "\n".join(logs) + + assert "=== Installing requirements ===" in full_logs + assert "=== Running script ===" in full_logs + + def test_stderr_capture(self): + """Test that stderr is included in logs.""" + logs = [] + logs.append("stdout output") + logs.append("\n=== stderr ===\nWarning: deprecated function") + + full_logs = "\n".join(logs) + + assert "stdout output" in full_logs + assert "=== stderr ===" in full_logs + assert "Warning:" in full_logs + + +class TestWorkerStatus: + """Tests for worker status management.""" + + def test_status_values(self): + """Test valid worker status values.""" + valid_statuses = ["online", "busy", "offline"] + + for status in valid_statuses: + assert status in ["online", "busy", "offline"] + + def test_status_when_executing(self): + """Test that status is 'busy' when executing a job.""" + # Simulate the heartbeat logic + current_job = Job(id="job-123", type="training", status="running") + status = "busy" if current_job else "online" + + assert status == "busy" + + def test_status_when_idle(self): + """Test that status is 'online' when idle.""" + current_job 
= None + status = "busy" if current_job else "online" + + assert status == "online" + + +class TestLogTruncation: + """Tests for log truncation to prevent oversized payloads.""" + + def test_small_logs_not_truncated(self): + """Test that small logs are not truncated.""" + logs = "Small log output" + max_log_size = 1024 * 1024 # 1MB + + if len(logs) > max_log_size: + logs = logs[:max_log_size] + "\n... [truncated]" + + assert logs == "Small log output" + + def test_large_logs_truncated(self): + """Test that large logs are truncated.""" + logs = "x" * (2 * 1024 * 1024) # 2MB + max_log_size = 1024 * 1024 # 1MB + + if len(logs) > max_log_size: + logs = logs[:max_log_size] + "\n... [truncated]" + + assert len(logs) <= max_log_size + 20 # +20 for truncation message + assert logs.endswith("[truncated]") + + def test_truncation_preserves_start(self): + """Test that truncation preserves the start of logs.""" + logs = "START_MARKER" + "x" * (2 * 1024 * 1024) + max_log_size = 1024 * 1024 + + if len(logs) > max_log_size: + logs = logs[:max_log_size] + "\n... [truncated]" + + assert logs.startswith("START_MARKER")