diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a3fe25c..0dac4eb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -20,8 +20,8 @@ jobs: uses: actions/setup-python@v5 with: python-version: "3.11" - - name: Validate examples - run: python src/sourceos_boot/validate_boot_release_set.py examples/*.json + - name: Validate native sourceos-boot example + run: python src/sourceos_boot/validate_boot_release_set.py examples/boot-release-set.example.json - name: Run tests run: | python -m pip install --upgrade pip pytest diff --git a/examples/control-plane-boot-release-set.example.json b/examples/control-plane-boot-release-set.example.json new file mode 100644 index 0000000..72de974 --- /dev/null +++ b/examples/control-plane-boot-release-set.example.json @@ -0,0 +1,74 @@ +{ + "boot_release_set_id": "urn:srcos:boot-release-set:m2-demo-recovery-2026-04-26", + "base_release_set_ref": "urn:srcos:release-set:m2-demo-2026-04-26", + "boot_mode": "recovery", + "boot_channel": "rescue", + "status": "ready", + "platform_entrypoints": [ + { + "platform": "apple-silicon", + "entrypoint_kind": "asahi-installer-entry", + "entrypoint_ref": "urn:srcos:boot-entry:m2-demo-sourceos-recovery-asahi-installer", + "requires_network": true, + "notes": "Models the SourceOS Recovery Environment as an Apple Silicon boot-picker compatible installer/recovery entry." + }, + { + "platform": "uefi-ipxe", + "entrypoint_kind": "ipxe-menu-entry", + "entrypoint_ref": "urn:srcos:boot-entry:generic-ipxe-sourceos-recovery", + "requires_network": true, + "notes": "Portable PXE-like entrypoint for later PC/Purism/generic UEFI targets." + } + ], + "artifacts": { + "manifest_ref": "urn:srcos:artifact:m2-demo-recovery-manifest-sha256-6e6f74626f6f74", + "kernel_ref": "urn:srcos:artifact:m2-demo-recovery-kernel-sha256-0f3b6d7f", + "initrd_ref": "urn:srcos:artifact:m2-demo-recovery-initrd-sha256-08f4c82e", + "rootfs_ref": "urn:srcos:artifact:m2-demo-recovery-rootfs-sha256-5f4dcc3b", + "bootloader_ref": "urn:srcos:artifact:m2-demo-m1n1-uboot-chain-sha256-7a38f2d1", + "installer_metadata_ref": "urn:srcos:artifact:m2-demo-asahi-installer-data-sha256-3f9c1f44" + }, + "policy_ref": "urn:srcos:policy:boot-recovery-m2-demo-v1", + "signing": { + "signature_ref": "urn:srcos:signature:m2-demo-recovery-manifest-rsa-pss-sha256", + "signer_ref": "urn:srcos:key:sourceos-release-root", + "signature_algorithm": "rsa-pss-sha256", + "manifest_digest": { + "algorithm": "sha256", + "value": "sha256:6e6f74626f6f742d6d322d7265636f766572792d64656d6f" + } + }, + "boot_capabilities": { + "disk_write": "recovery-scoped", + "network_required": true, + "kexec_allowed": false, + "recovery_actions": [ + "fetch-release-set", + "rollback-system", + "repair-user-plane", + "repair-agent-plane", + "rotate-keys", + "report-proof" + ] + }, + "proof_reporting": { + "required": true, + "reports": [ + "device-claim", + "environment-fingerprint", + "manifest-digest", + "artifact-hash-manifest", + "policy-decision", + "rollback-result" + ], + "endpoint_ref": "urn:srcos:endpoint:control-plane-boot-proof-report" + }, + "offline_fallback": { + "enabled": true, + "strategy": "last-known-good-signed-boot-release-set", + "requires_signature_verification": true, + "allows_unsigned_artifacts": false + }, + "created_at": "2026-04-26T14:30:00Z", + "notes": "SourceOS Recovery Environment for the M2 local-first demo." +} diff --git a/src/sourceos_boot/cli.py b/src/sourceos_boot/cli.py index 0bc8a9b..4ba1db7 100644 --- a/src/sourceos_boot/cli.py +++ b/src/sourceos_boot/cli.py @@ -11,6 +11,7 @@ from typing import Any from .adapter import DeviceClaim, SourceOSBootAdapter +from .control_plane import build_control_plane_boot_plan def load_json(path: Path) -> dict[str, Any]: @@ -57,6 +58,18 @@ def adapt_nlboot(args: argparse.Namespace) -> int: return 0 +def plan_control_plane(args: argparse.Namespace) -> int: + boot_release_set_doc = load_json(args.boot_release_set) + plan = build_control_plane_boot_plan(boot_release_set_doc) + output = { + "apiVersion": "sourceos.dev/v1", + "kind": "ControlPlaneBootPlan", + "plan": plan.to_dict(), + } + print(json.dumps(output, indent=2, sort_keys=True)) + return 0 + + def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="SourceOS Boot helpers") subparsers = parser.add_subparsers(dest="command", required=True) @@ -71,6 +84,13 @@ def build_parser() -> argparse.ArgumentParser: adapt.add_argument("--correlation-id", required=True) adapt.add_argument("--verification-result", choices=["pass", "fail", "unknown"], default="pass") adapt.set_defaults(func=adapt_nlboot) + + plan = subparsers.add_parser( + "plan-control-plane", + help="Build a safe, non-mutating boot plan from a sourceos-spec control-plane BootReleaseSet", + ) + plan.add_argument("--boot-release-set", type=Path, required=True) + plan.set_defaults(func=plan_control_plane) return parser diff --git a/src/sourceos_boot/control_plane.py b/src/sourceos_boot/control_plane.py new file mode 100644 index 0000000..c9836cc --- /dev/null +++ b/src/sourceos_boot/control_plane.py @@ -0,0 +1,223 @@ +"""Control-plane BootReleaseSet planner. + +This module consumes the canonical SourceOS control-plane BootReleaseSet shape +from `SourceOS-Linux/sourceos-spec` and turns it into a safe, non-mutating boot +plan for sourceos-boot. + +It deliberately performs no network, disk, kexec, install, rollback, or key +rotation side effects. It validates the boot intent and emits the operations a +future executor would need to perform, together with the proof reports required +by policy. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + + +SAFE_DIGEST_PREFIXES = ("sha256:", "sha384:", "sha512:") +BOOT_ACTION_BY_CHANNEL = { + "live": "boot-live-environment", + "installer": "plan-install", + "rescue": "plan-rescue", + "rollback": "plan-rollback", + "bootstrap": "plan-bootstrap", +} + + +@dataclass(frozen=True) +class ControlPlaneBootPlan: + """Side-effect-free plan derived from a control-plane BootReleaseSet.""" + + boot_release_set_id: str + base_release_set_ref: str + boot_mode: str + boot_channel: str + action: str + status: str + policy_ref: str + platform_entrypoints: list[dict[str, Any]] + artifact_refs: dict[str, str | None] + signing: dict[str, Any] + boot_capabilities: dict[str, Any] + proof_reports: list[str] + offline_fallback: dict[str, Any] + verification_gates: list[str] + execute: bool = False + + def to_dict(self) -> dict[str, Any]: + return { + "boot_release_set_id": self.boot_release_set_id, + "base_release_set_ref": self.base_release_set_ref, + "boot_mode": self.boot_mode, + "boot_channel": self.boot_channel, + "action": self.action, + "status": self.status, + "policy_ref": self.policy_ref, + "platform_entrypoints": self.platform_entrypoints, + "artifact_refs": self.artifact_refs, + "signing": self.signing, + "boot_capabilities": self.boot_capabilities, + "proof_reports": self.proof_reports, + "offline_fallback": self.offline_fallback, + "verification_gates": self.verification_gates, + "execute": self.execute, + } + + +class ControlPlaneBootReleaseSetError(ValueError): + """Raised when a control-plane BootReleaseSet is unsafe or malformed.""" + + +def _require_str(data: dict[str, Any], key: str) -> str: + value = data.get(key) + if not isinstance(value, str) or not value.strip(): + raise ControlPlaneBootReleaseSetError(f"{key} must be a non-empty string") + return value + + +def _require_dict(data: dict[str, Any], key: str) -> dict[str, Any]: + value = data.get(key) + if not isinstance(value, dict): + raise ControlPlaneBootReleaseSetError(f"{key} must be an object") + return value + + +def _require_list(data: dict[str, Any], key: str) -> list[Any]: + value = data.get(key) + if not isinstance(value, list): + raise ControlPlaneBootReleaseSetError(f"{key} must be a list") + return value + + +def _artifact_refs(artifacts: dict[str, Any]) -> dict[str, str | None]: + required = ["manifest_ref", "kernel_ref", "initrd_ref", "rootfs_ref"] + refs: dict[str, str | None] = {} + for key in required: + refs[key] = _require_str(artifacts, key) + for key in ["bootloader_ref", "installer_metadata_ref"]: + value = artifacts.get(key) + if value is not None and not isinstance(value, str): + raise ControlPlaneBootReleaseSetError(f"artifacts.{key} must be a string or null") + refs[key] = value + return refs + + +def _validate_signing(signing: dict[str, Any]) -> None: + signature_ref = _require_str(signing, "signature_ref") + signer_ref = _require_str(signing, "signer_ref") + signature_algorithm = _require_str(signing, "signature_algorithm") + manifest_digest = _require_dict(signing, "manifest_digest") + digest_value = _require_str(manifest_digest, "value") + if not signature_ref.startswith("urn:srcos:signature:"): + raise ControlPlaneBootReleaseSetError("signing.signature_ref must be a SourceOS signature URN") + if not signer_ref.startswith("urn:srcos:key:"): + raise ControlPlaneBootReleaseSetError("signing.signer_ref must be a SourceOS key URN") + if signature_algorithm not in {"rsa-pss-sha256", "ed25519", "ecdsa-p256-sha256"}: + raise ControlPlaneBootReleaseSetError("signing.signature_algorithm is unsupported") + if not digest_value.startswith(SAFE_DIGEST_PREFIXES): + raise ControlPlaneBootReleaseSetError("signing.manifest_digest.value must be sha256/sha384/sha512 prefixed") + + +def _verification_gates(plan: ControlPlaneBootPlan) -> list[str]: + gates = [ + "verify-boot-release-set-status-ready", + "verify-manifest-signature", + "verify-artifact-refs-present", + "verify-policy-ref-present", + "verify-proof-reporting-required", + ] + if plan.offline_fallback.get("enabled"): + gates.append("verify-offline-fallback-signature-required") + if plan.boot_capabilities.get("kexec_allowed") is False: + gates.append("verify-kexec-denied") + if plan.boot_capabilities.get("disk_write") in {"installer-scoped", "recovery-scoped"}: + gates.append("verify-disk-write-scope") + return gates + + +def build_control_plane_boot_plan(doc: dict[str, Any]) -> ControlPlaneBootPlan: + """Build a side-effect-free plan from canonical control-plane BootReleaseSet JSON.""" + + boot_release_set_id = _require_str(doc, "boot_release_set_id") + base_release_set_ref = _require_str(doc, "base_release_set_ref") + boot_mode = _require_str(doc, "boot_mode") + boot_channel = _require_str(doc, "boot_channel") + status = _require_str(doc, "status") + policy_ref = _require_str(doc, "policy_ref") + + if status != "ready": + raise ControlPlaneBootReleaseSetError("BootReleaseSet status must be ready before planning") + if boot_channel not in BOOT_ACTION_BY_CHANNEL: + raise ControlPlaneBootReleaseSetError(f"unsupported boot_channel={boot_channel!r}") + if boot_mode not in {"installer", "recovery", "ephemeral", "bootstrap"}: + raise ControlPlaneBootReleaseSetError(f"unsupported boot_mode={boot_mode!r}") + if not boot_release_set_id.startswith("urn:srcos:boot-release-set:"): + raise ControlPlaneBootReleaseSetError("boot_release_set_id must be a SourceOS BootReleaseSet URN") + if not base_release_set_ref.startswith("urn:srcos:release-set:"): + raise ControlPlaneBootReleaseSetError("base_release_set_ref must be a SourceOS ReleaseSet URN") + if not policy_ref.startswith("urn:srcos:policy:"): + raise ControlPlaneBootReleaseSetError("policy_ref must be a SourceOS policy URN") + + platform_entrypoints = _require_list(doc, "platform_entrypoints") + if not platform_entrypoints: + raise ControlPlaneBootReleaseSetError("platform_entrypoints must not be empty") + for index, entrypoint in enumerate(platform_entrypoints): + if not isinstance(entrypoint, dict): + raise ControlPlaneBootReleaseSetError(f"platform_entrypoints[{index}] must be an object") + _require_str(entrypoint, "platform") + _require_str(entrypoint, "entrypoint_kind") + _require_str(entrypoint, "entrypoint_ref") + + artifacts = _artifact_refs(_require_dict(doc, "artifacts")) + signing = _require_dict(doc, "signing") + _validate_signing(signing) + + boot_capabilities = _require_dict(doc, "boot_capabilities") + offline_fallback = _require_dict(doc, "offline_fallback") + proof_reporting = _require_dict(doc, "proof_reporting") + proof_reports = proof_reporting.get("reports") + if proof_reporting.get("required") is not True: + raise ControlPlaneBootReleaseSetError("proof_reporting.required must be true") + if not isinstance(proof_reports, list) or not proof_reports: + raise ControlPlaneBootReleaseSetError("proof_reporting.reports must be a non-empty list") + if offline_fallback.get("enabled") and offline_fallback.get("requires_signature_verification") is not True: + raise ControlPlaneBootReleaseSetError("enabled offline fallback must require signature verification") + if offline_fallback.get("allows_unsigned_artifacts") is not False: + raise ControlPlaneBootReleaseSetError("offline fallback must not allow unsigned artifacts") + + plan = ControlPlaneBootPlan( + boot_release_set_id=boot_release_set_id, + base_release_set_ref=base_release_set_ref, + boot_mode=boot_mode, + boot_channel=boot_channel, + action=BOOT_ACTION_BY_CHANNEL[boot_channel], + status=status, + policy_ref=policy_ref, + platform_entrypoints=platform_entrypoints, + artifact_refs=artifacts, + signing=signing, + boot_capabilities=boot_capabilities, + proof_reports=[str(item) for item in proof_reports], + offline_fallback=offline_fallback, + verification_gates=[], + execute=False, + ) + return ControlPlaneBootPlan( + boot_release_set_id=plan.boot_release_set_id, + base_release_set_ref=plan.base_release_set_ref, + boot_mode=plan.boot_mode, + boot_channel=plan.boot_channel, + action=plan.action, + status=plan.status, + policy_ref=plan.policy_ref, + platform_entrypoints=plan.platform_entrypoints, + artifact_refs=plan.artifact_refs, + signing=plan.signing, + boot_capabilities=plan.boot_capabilities, + proof_reports=plan.proof_reports, + offline_fallback=plan.offline_fallback, + verification_gates=_verification_gates(plan), + execute=False, + ) diff --git a/tests/test_control_plane.py b/tests/test_control_plane.py new file mode 100644 index 0000000..ca6cefb --- /dev/null +++ b/tests/test_control_plane.py @@ -0,0 +1,96 @@ +from __future__ import annotations + +import json +import os +import subprocess +import sys +from pathlib import Path + +import pytest + +from sourceos_boot.control_plane import ControlPlaneBootReleaseSetError, build_control_plane_boot_plan + +ROOT = Path(__file__).resolve().parents[1] +FIXTURE = ROOT / "examples" / "control-plane-boot-release-set.example.json" + + +def load_fixture() -> dict: + return json.loads(FIXTURE.read_text(encoding="utf-8")) + + +def test_builds_non_mutating_plan_from_control_plane_boot_release_set() -> None: + plan = build_control_plane_boot_plan(load_fixture()) + + payload = plan.to_dict() + assert payload["boot_release_set_id"] == "urn:srcos:boot-release-set:m2-demo-recovery-2026-04-26" + assert payload["base_release_set_ref"] == "urn:srcos:release-set:m2-demo-2026-04-26" + assert payload["boot_mode"] == "recovery" + assert payload["boot_channel"] == "rescue" + assert payload["action"] == "plan-rescue" + assert payload["execute"] is False + assert payload["policy_ref"] == "urn:srcos:policy:boot-recovery-m2-demo-v1" + assert payload["artifact_refs"]["manifest_ref"].startswith("urn:srcos:artifact:") + assert payload["boot_capabilities"]["disk_write"] == "recovery-scoped" + assert payload["boot_capabilities"]["kexec_allowed"] is False + assert payload["offline_fallback"] == { + "enabled": True, + "strategy": "last-known-good-signed-boot-release-set", + "requires_signature_verification": True, + "allows_unsigned_artifacts": False, + } + assert payload["proof_reports"] == [ + "device-claim", + "environment-fingerprint", + "manifest-digest", + "artifact-hash-manifest", + "policy-decision", + "rollback-result", + ] + assert payload["verification_gates"] == [ + "verify-boot-release-set-status-ready", + "verify-manifest-signature", + "verify-artifact-refs-present", + "verify-policy-ref-present", + "verify-proof-reporting-required", + "verify-offline-fallback-signature-required", + "verify-kexec-denied", + "verify-disk-write-scope", + ] + + +def test_rejects_unsigned_offline_fallback() -> None: + doc = load_fixture() + doc["offline_fallback"]["allows_unsigned_artifacts"] = True + with pytest.raises(ControlPlaneBootReleaseSetError, match="must not allow unsigned artifacts"): + build_control_plane_boot_plan(doc) + + +def test_rejects_non_ready_boot_release_set() -> None: + doc = load_fixture() + doc["status"] = "draft" + with pytest.raises(ControlPlaneBootReleaseSetError, match="status must be ready"): + build_control_plane_boot_plan(doc) + + +def test_cli_plans_control_plane_boot_release_set() -> None: + env = os.environ.copy() + env["PYTHONPATH"] = str(ROOT / "src") + result = subprocess.run( + [ + sys.executable, + "-m", + "sourceos_boot.cli", + "plan-control-plane", + "--boot-release-set", + str(FIXTURE), + ], + cwd=ROOT, + env=env, + text=True, + capture_output=True, + check=True, + ) + payload = json.loads(result.stdout) + assert payload["kind"] == "ControlPlaneBootPlan" + assert payload["plan"]["action"] == "plan-rescue" + assert payload["plan"]["execute"] is False