Skip to content

Commit 7ea4f2b

Browse files
committed
feat: add Lampstand-compatible evidence writer
1 parent fd8702d commit 7ea4f2b

1 file changed

Lines changed: 129 additions & 0 deletions

File tree

src/sourceos_syncd/evidence.py

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
"""Lampstand-compatible evidence writer stubs for sourceos-syncd.
2+
3+
The real Lampstand service will own durable evidence storage and signing. This
4+
module provides a deterministic local envelope and writer so sourceos-syncd can
5+
produce audit-grade artifacts today without taking a dependency on the service.
6+
"""
7+
8+
from __future__ import annotations
9+
10+
import hashlib
11+
import json
12+
from dataclasses import dataclass, field
13+
from datetime import datetime, timezone
14+
from pathlib import Path
15+
from typing import Any
16+
17+
EVIDENCE_SCHEMA = "sourceos.lampstand-evidence/v1alpha1"
18+
DEFAULT_WRITER = "sourceos-syncd-local-evidence-writer"
19+
20+
21+
def utc_now() -> str:
22+
return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
23+
24+
25+
def canonical_json(value: Any) -> str:
26+
return json.dumps(value, sort_keys=True, separators=(",", ":"))
27+
28+
29+
def digest_json(value: Any) -> str:
30+
return "sha256:" + hashlib.sha256(canonical_json(value).encode("utf-8")).hexdigest()
31+
32+
33+
@dataclass(frozen=True)
34+
class EvidenceEnvelope:
35+
evidence_id: str
36+
evidence_type: str
37+
subject: str
38+
artifact_digest: str
39+
artifact: dict[str, Any]
40+
writer: str = DEFAULT_WRITER
41+
schema: str = EVIDENCE_SCHEMA
42+
generated_at: str = field(default_factory=utc_now)
43+
signed: bool = False
44+
signature: dict[str, Any] | None = None
45+
46+
def as_dict(self) -> dict[str, Any]:
47+
return {
48+
"schema": self.schema,
49+
"evidence_id": self.evidence_id,
50+
"generated_at": self.generated_at,
51+
"writer": self.writer,
52+
"evidence_type": self.evidence_type,
53+
"subject": self.subject,
54+
"artifact_digest": self.artifact_digest,
55+
"artifact": self.artifact,
56+
"attestation": {
57+
"signed": self.signed,
58+
"signature": self.signature,
59+
"reason": "local-stub-unsigned" if not self.signed else "signed",
60+
},
61+
}
62+
63+
64+
def make_evidence(artifact: dict[str, Any], evidence_type: str, subject: str, writer: str = DEFAULT_WRITER) -> dict[str, Any]:
65+
artifact_digest = digest_json(artifact)
66+
seed = {
67+
"artifact_digest": artifact_digest,
68+
"evidence_type": evidence_type,
69+
"subject": subject,
70+
"writer": writer,
71+
}
72+
evidence_id = "evidence-" + hashlib.sha256(canonical_json(seed).encode("utf-8")).hexdigest()[:20]
73+
return EvidenceEnvelope(
74+
evidence_id=evidence_id,
75+
evidence_type=evidence_type,
76+
subject=subject,
77+
artifact_digest=artifact_digest,
78+
artifact=artifact,
79+
writer=writer,
80+
).as_dict()
81+
82+
83+
def validate_evidence(envelope: dict[str, Any]) -> list[str]:
84+
errors: list[str] = []
85+
required = {
86+
"schema",
87+
"evidence_id",
88+
"generated_at",
89+
"writer",
90+
"evidence_type",
91+
"subject",
92+
"artifact_digest",
93+
"artifact",
94+
"attestation",
95+
}
96+
missing = sorted(required - set(envelope))
97+
if missing:
98+
errors.append(f"missing evidence keys: {', '.join(missing)}")
99+
if envelope.get("schema") != EVIDENCE_SCHEMA:
100+
errors.append(f"unsupported evidence schema: {envelope.get('schema')!r}")
101+
if not str(envelope.get("artifact_digest", "")).startswith("sha256:"):
102+
errors.append("artifact_digest must start with sha256:")
103+
artifact = envelope.get("artifact")
104+
if isinstance(artifact, dict) and envelope.get("artifact_digest") != digest_json(artifact):
105+
errors.append("artifact_digest does not match artifact")
106+
if not isinstance(envelope.get("attestation"), dict):
107+
errors.append("attestation must be an object")
108+
return errors
109+
110+
111+
def write_evidence_file(envelope: dict[str, Any], output_dir: str | Path) -> Path:
112+
errors = validate_evidence(envelope)
113+
if errors:
114+
raise ValueError("invalid evidence envelope: " + "; ".join(errors))
115+
target_dir = Path(output_dir).expanduser().resolve()
116+
target_dir.mkdir(parents=True, exist_ok=True)
117+
target = target_dir / f"{envelope['evidence_id']}.json"
118+
tmp = target.with_suffix(".json.tmp")
119+
tmp.write_text(json.dumps(envelope, indent=2, sort_keys=False) + "\n", encoding="utf-8")
120+
tmp.replace(target)
121+
return target
122+
123+
124+
def load_json_file(path: str | Path) -> dict[str, Any]:
125+
with Path(path).expanduser().open("r", encoding="utf-8") as handle:
126+
data = json.load(handle)
127+
if not isinstance(data, dict):
128+
raise ValueError("evidence artifacts must be JSON objects")
129+
return data

0 commit comments

Comments
 (0)