def _multirun_with_cfg(
    *,
    cfg: DictConfig,
    k8sctx: K8sContext,
    workload: Workload,
    script_args: list[str],
) -> None:
    """
    Submit the workload for one Hydra job, passing its config as an argument.

    The resolved config is serialized to YAML and base64-encoded so that it
    travels as a single command-line argument of the workload.

    Args:
        cfg (DictConfig): Hydra configuration of the current job
        k8sctx (K8sContext): Context used to submit the workload
        workload (Workload): Workload created from the entry script
        script_args (list[str]): Additional CLI arguments (currently unused)
    """
    # NOTE(review): script_args is accepted but never forwarded - confirm
    # whether it should reach the workload or be given to Hydra as overrides.
    cfg_str = OmegaConf.to_yaml(cfg=cfg, resolve=True)
    b64_cfg = base64.b64encode(cfg_str.encode("utf-8")).decode("ascii")
    workload.set_args([b64_cfg])

    k8sctx.execute_workload(workload=workload, submit=True)


@app.command(
    context_settings={
        "allow_extra_args": True,
        "ignore_unknown_options": True,
    }
)
def multirun(
    file: Annotated[Path, typer.Argument(help="Python file to be executed")],
    config: Annotated[Path, typer.Option(help="Hydra config file")] = None,
    target: Annotated[
        Target, typer.Option(help="Execution target", case_sensitive=False)
    ] = Target.gpu,
    kubeconfig: Annotated[
        Path, typer.Option(help="Kubernetes configuration", envvar="KUBECONFIG")
    ] = None,
    image: Annotated[str, typer.Option(help="Docker image")] = None,
    registry_pat: Annotated[
        str,
        typer.Option(
            help="Registry personal access token (PAT)",
            envvar="REGISTRY_PAT",
        ),
    ] = None,
    args: Annotated[list[str], typer.Argument(help="Additional arguments")] = None,
):
    """Submit one job per Hydra multirun configuration of a Python file."""
    # The docstring above is kept to one line on purpose: typer shows it as
    # the command help text.

    # config.stem is needed below; fail with a readable message instead of an
    # AttributeError when the option was omitted.
    if config is None:
        typer.echo("Hydra config file not provided")
        raise typer.Exit(code=1)

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        TimeElapsedColumn(),
        expand=True,
    ) as progress:
        task_project = progress.add_task(
            description="[cyan]Loading project...", total=1
        )
        project = Project()
        progress.advance(task_project)

        if image is None:
            image = project.cached_images(target.value)

        if kubeconfig is None:
            kubeconfig = project.kubeconfig

        # The None check must come before any attribute access on kubeconfig.
        if kubeconfig is None:
            typer.echo("KUBECONFIG not set")
            raise typer.Exit(code=1)

        if not kubeconfig.exists():
            typer.echo(f"kubeconfig file {kubeconfig} does not exist")
            raise typer.Exit(code=1)

        k8s_context = K8sContext(kubeconfig.as_posix(), progress=progress)
        k8s_context.set_target(target)
        k8s_context.set_registry_pat(registry_pat)
        k8s_context.set_container_image(image)

        workload = Workload.from_entry_script(entry_script=file)

        # Search for the config next to the file the user pointed at; for a
        # bare file name this is the current working directory.
        config_dir = str(config.parent.resolve())

        @hydra.main(
            version_base=None,
            config_path=config_dir,
            config_name=config.stem,
        )
        def _hydra_app(cfg: DictConfig):
            _multirun_with_cfg(
                cfg=cfg,
                k8sctx=k8s_context,
                workload=workload,
                script_args=args or [],
            )

        # Hydra parses sys.argv itself, so swap in the flags it should see and
        # always restore the real argv afterwards. The original value is saved
        # before the try so the finally clause can never hit an unbound name.
        old_argv = sys.argv
        sys.argv = [
            "q8sctl",
            "-m",
            "hydra.job.chdir=False",
            "hydra.run.dir=.",
        ]
        try:
            _hydra_app()
        finally:
            sys.argv = old_argv
JobTemplatePluginSpec +from q8s.project import Project from q8s.utils import extract_non_none_value from .workload import Workload @@ -135,11 +136,10 @@ def __init__(self, kubeconfig: str, logger=None, progress: Progress = None): self.core_api_instance = client.CoreV1Api() self.batch_api_instance = client.BatchV1Api() - self.name = f"qubernetes-job-{K8sContext.get_id()}" - self.__env = load_env() self.jupyter_logger = logger + self.project = Project() @staticmethod def get_id(): @@ -206,7 +206,11 @@ def __create_job_object_from_workload(self, workload: Workload) -> client.V1Job: metadata=client.V1ObjectMeta( name=self.name, namespace=self.namespace, - labels={"qubernetes.dev/job.type": "jupyter"}, + labels={ + # "qubernetes.dev/job.type": "jupyter", + "qubernetes.dev/created.by": "q8sctl", + "qubernetes.dev/project.name": self.project.name, + }, ), spec=spec, ) @@ -490,6 +494,8 @@ def execute_workload( Execute the given workload. """ + self.name = f"qubernetes-job-{K8sContext.get_id()}" + try: self.__create_job_object_from_workload(workload=workload) diff --git a/src/q8s/plugins/cpu_job.py b/src/q8s/plugins/cpu_job.py index 12807d8..60a14dc 100644 --- a/src/q8s/plugins/cpu_job.py +++ b/src/q8s/plugins/cpu_job.py @@ -26,12 +26,12 @@ def makejob( volume_name = f"app-volume-{name}" - env_var = list(env) + env_var = self.patch_environment(list(env)) + env_var.append(client.V1EnvVar(name="Q8S_JOB_NAME", value=name)) + if workload.is_src_project: env_var.append(client.V1EnvVar(name="PYTHONPATH", value=f"{WORKSPACE}/src")) - self.patch_environment_with_git_info(env_var) - container = client.V1Container( name="quantum-routine", image=container_image, diff --git a/src/q8s/plugins/cuda_job.py b/src/q8s/plugins/cuda_job.py index b68f25d..40331f8 100644 --- a/src/q8s/plugins/cuda_job.py +++ b/src/q8s/plugins/cuda_job.py @@ -33,12 +33,12 @@ def makejob( volume_name = f"app-volume-{name}" - env_var = list(env) + env_var = self.patch_environment(list(env)) + 
def patch_environment(self, env: list[client.V1EnvVar]) -> list[client.V1EnvVar]:
    """
    Add the Git and Project environment variables to a job environment.

    Args:
        env (list[client.V1EnvVar]): Environment variables to extend
    Returns:
        list[client.V1EnvVar]: Environment after both patch steps ran
    """

    # Git information first, project information second.
    with_git_info = self.patch_environment_with_git_info(env)
    return self.patch_environment_with_project_info(with_git_info)


def patch_environment_with_project_info(
    self, env: list[client.V1EnvVar]
) -> list[client.V1EnvVar]:
    """
    Add the project name to a job environment as Q8S_PROJECT_NAME.

    The variable is appended to ``env`` in place and the same list is
    returned.

    Args:
        env (list[client.V1EnvVar]): Environment variables to extend
    Returns:
        list[client.V1EnvVar]: The extended environment variables
    """

    project_name = Project().name
    project_var = client.V1EnvVar(name="Q8S_PROJECT_NAME", value=project_name)
    env.append(project_var)

    return env
T = TypeVar("T")


def with_app_config(config_cls: Type[T]):
    """
    Build a decorator that injects the job configuration into a function.

    When the decorated function is called, the wrapper:
    - reads the base64-encoded config from sys.argv[1]
    - merges it over a structured OmegaConf config built from ``config_cls``
    - resolves the merged config
    - calls ``func(cfg, trace, *args, **kwargs)``

    Errors raised while decoding, merging or resolving are not propagated.
    The formatted traceback is passed to the function as ``trace`` instead
    (``None`` on success), so the function decides how to report the failure.
    When ``trace`` is set, ``cfg`` is ``None`` or an unresolved config and
    must not be relied on.

    Args:
        config_cls (Type[T]): Dataclass describing the expected configuration
    Returns:
        Callable: Decorator to apply to the function receiving the config
    Raises:
        TypeError: If ``config_cls`` is not a dataclass
        RuntimeError: From the wrapper, when sys.argv[1] is missing
    """

    if not is_dataclass(config_cls):
        raise TypeError("config_cls must be a dataclass")

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            cfg = None
            trace: str | None = None

            if len(sys.argv) < 2:
                raise RuntimeError(
                    "Missing base64-encoded config argument (expected sys.argv[1])"
                )

            try:
                raw_input = base64.b64decode(sys.argv[1]).decode("utf-8")

                cfg = OmegaConf.merge(
                    OmegaConf.structured(config_cls),
                    OmegaConf.create(raw_input),
                )
                OmegaConf.resolve(cfg)
            except Exception:
                # Deliberately broad: any failure to load the config is handed
                # to the wrapped function as a traceback rather than raised.
                trace = traceback.format_exc()

            return func(cfg, trace, *args, **kwargs)

        return wrapper

    return decorator
q8s.plugins.cuda_job import CUDAJobTemplatePlugin +from tests.test_project import mocked_configuration class TestCPUandGPUJobTemplatePlugin(unittest.TestCase): + @unittest.mock.patch("q8s.project.load", return_value=mocked_configuration) @patch("q8s.plugins.job_template_spec.client.V1Container") @patch("q8s.plugins.job_template_spec.client.V1PodTemplateSpec") @patch("q8s.plugins.job_template_spec.client.V1PodSpec") @@ -30,6 +32,7 @@ def test_makejob_cpu( mock_v1_pod_spec, mock_v1_pod_template_spec, mock_v1_container, + mock_load_project, ): plugin = CPUJobTemplatePlugin() name = "test-job" @@ -64,6 +67,7 @@ def test_makejob_cpu( mock_v1_volume.assert_called_once() mock_v1_config_map_volume_source.assert_called_once() + @unittest.mock.patch("q8s.project.load", return_value=mocked_configuration) @patch("q8s.plugins.job_template_spec.client.V1Container") @patch("q8s.plugins.job_template_spec.client.V1PodTemplateSpec") @patch("q8s.plugins.job_template_spec.client.V1PodSpec") @@ -84,6 +88,7 @@ def test_makejob_gpu( mock_v1_pod_spec, mock_v1_pod_template_spec, mock_v1_container, + mock_load_project, ): plugin = CUDAJobTemplatePlugin() name = "test-job"