Skip to content

Commit 5b2d056

Browse files
committed
Add local-only State Integrity service boundary
1 parent b961fc9 commit 5b2d056

1 file changed

Lines changed: 236 additions & 0 deletions

File tree

src/sourceos_syncd/service.py

Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
"""Local-only HTTP service boundary for sourceos-syncd.
2+
3+
The service is standard-library-only and exposes read/preview endpoints for the
4+
current State Integrity Report implementation. The CLI remains the stable
5+
operator surface.
6+
"""
7+
8+
from __future__ import annotations
9+
10+
import argparse
11+
import json
12+
from dataclasses import dataclass
13+
from http import HTTPStatus
14+
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
15+
from typing import Any
16+
from urllib.parse import parse_qs, urlparse
17+
18+
from . import __version__
19+
from .local_store import LocalStateStore
20+
from .reports import repair_plan, snapshot, utc_now
21+
from .store_reports import snapshot_from_store
22+
23+
LOCAL_BIND_HOSTS = {"127.0.0.1", "localhost", "::1"}
24+
25+
26+
def json_bytes(payload: dict[str, Any] | list[Any]) -> bytes:
    """Serialize *payload* as pretty-printed JSON bytes.

    The document is indented by two spaces, keeps mapping keys in insertion
    order, ends with exactly one newline, and is encoded as UTF-8.
    """
    rendered = json.dumps(payload, indent=2)
    return f"{rendered}\n".encode("utf-8")
28+
29+
30+
def build_state_report(store_root: str | None = None) -> dict[str, Any]:
    """Build the State Integrity report served by the service.

    A truthy ``store_root`` is passed to ``snapshot_from_store``; ``None`` or
    an empty string falls back to ``snapshot()`` with no arguments.
    """
    if store_root:
        return snapshot_from_store(store_root)
    return snapshot()
32+
33+
34+
def readiness_from_report(report: dict[str, Any]) -> tuple[int, dict[str, Any]]:
    """Translate a state report into an HTTP status code and readiness payload.

    Rules, in order:

    * ``initialized`` being exactly ``False`` in the local-state section
      (top-level ``local_state`` first, then ``diagnosis.local_state``)
      yields 503 with status ``"uninitialized"``.
    * A diagnosis status of ``"healthy"`` or ``"degraded"`` yields 200 and
      ``ready=True``.
    * Any other status, including a missing one (reported as ``"unknown"``),
      yields 503 and ``ready=False``.

    Returns:
        ``(http_status_code, payload)`` where the payload carries ``ready``,
        ``status``, ``severity``, ``summary`` and ``generated_at``.
    """
    raw_diagnosis = report.get("diagnosis")
    diagnosis = raw_diagnosis if isinstance(raw_diagnosis, dict) else {}
    local_state = report.get("local_state") or diagnosis.get("local_state") or {}

    # Only an explicit False counts as uninitialized; a missing flag does not.
    if local_state.get("initialized") is False:
        uninitialized = {
            "ready": False,
            "status": "uninitialized",
            "severity": "warning",
            "summary": "Local state store is not initialized.",
            "generated_at": utc_now(),
        }
        return int(HTTPStatus.SERVICE_UNAVAILABLE), uninitialized

    status = diagnosis.get("status", "unknown")
    severity = diagnosis.get("severity", "unknown")

    is_ready = status in ("healthy", "degraded")
    http_status = HTTPStatus.OK if is_ready else HTTPStatus.SERVICE_UNAVAILABLE

    payload = {
        "ready": is_ready,
        "status": status,
        "severity": severity,
        "summary": diagnosis.get("summary", "State Integrity readiness is unknown."),
        "generated_at": utc_now(),
    }
    return int(http_status), payload
66+
67+
68+
def liveness() -> dict[str, Any]:
    """Return the liveness payload: component name, version and timestamp."""
    payload: dict[str, Any] = {"live": True, "component": "sourceos-syncd"}
    payload["version"] = __version__
    payload["generated_at"] = utc_now()
    return payload
70+
71+
72+
def events_from_store(store_root: str | None = None) -> list[dict[str, Any]]:
    """Return the events recorded in the local store at *store_root*.

    Args:
        store_root: Path of the local store; ``None`` or an empty string
            means no store is configured.

    Returns:
        A list of event dictionaries; empty when no store root is given.
    """
    if not store_root:
        return []
    # NOTE(review): the return type of LocalStateStore.iter_events() is not
    # visible from this module. Materializing it guarantees the annotated
    # ``list`` contract, which the /events route relies on because
    # json.dumps cannot encode a lazy iterator.
    return list(LocalStateStore(store_root).iter_events())
76+
77+
78+
def find_event(event_id: str, store_root: str | None = None) -> dict[str, Any] | None:
    """Return the first stored event whose ``event_id`` equals *event_id*.

    Events come from ``events_from_store(store_root)``; ``None`` is returned
    when no event matches.
    """
    matches = (
        candidate
        for candidate in events_from_store(store_root)
        if candidate.get("event_id") == event_id
    )
    return next(matches, None)
83+
84+
85+
def explain_event(event: dict[str, Any]) -> dict[str, Any]:
    """Summarize *event* as a small explanation payload.

    Missing keys become ``None``, except the lane, which reads ``"unknown"``
    in the summary sentence. ``producer`` is preferred over ``actor_id`` and
    ``created_at`` over ``occurred_at``; the fallback applies whenever the
    preferred value is falsy.
    """
    event_id = event.get("event_id")
    event_type = event.get("event_type")
    lane = event.get("lane", "unknown")

    actor = event.get("producer") or event.get("actor_id")
    timestamp = event.get("created_at") or event.get("occurred_at")

    return {
        "event_id": event_id,
        "event_type": event_type,
        "summary": f"Event {event_id} recorded {event_type} on lane {lane}.",
        "actor_or_producer": actor,
        "object_id": event.get("object_id"),
        "created_at": timestamp,
    }
94+
95+
96+
def planning_preview(store_root: str | None = None) -> dict[str, Any]:
    """Return the repair plan for the current state report, marked preview-only.

    The plan produced by ``repair_plan`` gets ``service.preview_only = True``;
    the ``service`` section is created when the plan has none.
    """
    plan = repair_plan(build_state_report(store_root))
    service_section = plan.setdefault("service", {})
    service_section["preview_only"] = True
    return plan
101+
102+
103+
def _escape_label_value(value: Any) -> str:
    """Escape *value* for use inside a double-quoted metric label value."""
    # Backslash must be escaped first so later escapes are not doubled.
    return str(value).replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")


def _sum_lane_field(lanes: Any, section: str, field: str) -> int:
    """Sum ``lane[section][field]`` over every well-formed lane entry."""
    total = 0
    for lane in lanes or ():
        if not isinstance(lane, dict):
            continue
        bucket = lane.get(section)
        # A missing or null section contributes nothing instead of raising.
        if isinstance(bucket, dict):
            total += int(bucket.get(field, 0) or 0)
    return total


def metrics_text(report: dict[str, Any]) -> str:
    """Render *report* as text-exposition metrics (``# HELP`` / ``# TYPE`` lines).

    Emitted gauges:

    * ``sourceos_syncd_ready{status="..."}`` - 1 when the readiness payload
      says ready, else 0.
    * ``sourceos_syncd_initialized`` - 0 only when ``initialized`` is exactly
      ``False`` in the local-state section, else 1.
    * ``sourceos_syncd_replay_lag_events`` - sum of
      ``journal.replay_lag_events`` across lanes.
    * ``sourceos_syncd_corrupted_objects`` - sum of ``objects.corrupted``
      across lanes.
    * ``sourceos_syncd_ready_http_code`` - status code chosen by
      ``readiness_from_report``.

    Args:
        report: State report dictionary.

    Returns:
        The metrics document, terminated by a newline.
    """
    raw_diagnosis = report.get("diagnosis")
    diagnosis = raw_diagnosis if isinstance(raw_diagnosis, dict) else {}
    status = diagnosis.get("status", "unknown")

    local_state = report.get("local_state") or diagnosis.get("local_state") or {}
    initialized = 1 if local_state.get("initialized", True) is not False else 0

    lanes = report.get("lanes")
    corrupted = _sum_lane_field(lanes, "objects", "corrupted")
    replay_lag = _sum_lane_field(lanes, "journal", "replay_lag_events")

    ready_code, ready = readiness_from_report(report)
    ready_value = 1 if ready.get("ready") else 0

    # The status comes from report data, so escape it before embedding it in
    # a quoted label; an unescaped quote, backslash or newline would corrupt
    # the whole metrics document.
    status_label = _escape_label_value(status)

    lines = [
        "# HELP sourceos_syncd_ready SourceOS syncd readiness, 1 when ready.",
        "# TYPE sourceos_syncd_ready gauge",
        f'sourceos_syncd_ready{{status="{status_label}"}} {ready_value}',
        "# HELP sourceos_syncd_initialized Local state initialization status.",
        "# TYPE sourceos_syncd_initialized gauge",
        f"sourceos_syncd_initialized {initialized}",
        "# HELP sourceos_syncd_replay_lag_events Total replay lag events across lanes.",
        "# TYPE sourceos_syncd_replay_lag_events gauge",
        f"sourceos_syncd_replay_lag_events {replay_lag}",
        "# HELP sourceos_syncd_corrupted_objects Corrupted object count across lanes.",
        "# TYPE sourceos_syncd_corrupted_objects gauge",
        f"sourceos_syncd_corrupted_objects {corrupted}",
        "# HELP sourceos_syncd_ready_http_code HTTP code selected by readiness computation.",
        "# TYPE sourceos_syncd_ready_http_code gauge",
        f"sourceos_syncd_ready_http_code {ready_code}",
        "",  # trailing empty item makes the joined text end with a newline
    ]
    return "\n".join(lines)
130+
131+
132+
@dataclass(frozen=True)
class ServiceConfig:
    """Immutable settings for the local HTTP service."""

    # Bind host; validate() accepts only members of LOCAL_BIND_HOSTS.
    host: str = "127.0.0.1"
    # TCP port to listen on.
    port: int = 8765
    # Default store root for requests that do not supply their own.
    store_root: str | None = None

    def validate(self) -> None:
        """Raise ``ValueError`` unless ``host`` is an allowed local bind host."""
        if self.host in LOCAL_BIND_HOSTS:
            return
        raise ValueError(f"sourceos-syncd service is local-only; refusing bind host {self.host!r}")
141+
142+
143+
def make_handler(config: ServiceConfig) -> type[BaseHTTPRequestHandler]:
    """Create the GET-only request handler class bound to *config*.

    ``config.validate()`` runs first, so a disallowed bind host raises
    ``ValueError`` before any handler class is built.

    Routes (a trailing ``/`` on the path is ignored):

    * ``/healthz`` - liveness payload.
    * ``/readyz`` - readiness payload, sent with the status code it selects.
    * ``/statez`` - the full state report.
    * ``/events`` - ``{"events": [...]}`` read from the store.
    * ``/events/<id>`` and ``/events/<id>/explain`` - one event, or its
      explanation payload.
    * ``/metrics`` - plain-text metrics.
    * ``/repairz`` - repair plan preview.

    Any other path answers 404. An exception raised while serving a route is
    reported as a 500 JSON body carrying the exception type and message.

    Args:
        config: Service settings; ``store_root`` is the default store for
            requests that carry no ``store_root`` query parameter.

    Returns:
        A ``BaseHTTPRequestHandler`` subclass closed over *config*.
    """
    # Imported locally because the module header only pulls parse_qs and
    # urlparse from urllib.parse.
    from urllib.parse import unquote

    config.validate()

    class SourceOSSyncdHandler(BaseHTTPRequestHandler):
        server_version = "sourceos-syncd/" + __version__

        def do_GET(self) -> None:  # noqa: N802
            """Dispatch a GET request to the matching route."""
            parsed = urlparse(self.path)
            path = parsed.path.rstrip("/") or "/"
            query = parse_qs(parsed.query)
            # NOTE(review): any client able to reach the socket can point the
            # service at an arbitrary path through ?store_root=...; confirm
            # this per-request override is intended.
            store_root = query.get("store_root", [config.store_root])[0]
            try:
                if path == "/healthz":
                    self.write_json(HTTPStatus.OK, liveness())
                elif path == "/readyz":
                    code, payload = readiness_from_report(build_state_report(store_root))
                    self.write_json(code, payload)
                elif path == "/statez":
                    self.write_json(HTTPStatus.OK, build_state_report(store_root))
                elif path == "/events":
                    self.write_json(HTTPStatus.OK, {"events": events_from_store(store_root)})
                elif path.startswith("/events/"):
                    self.handle_event_path(path, store_root)
                elif path == "/metrics":
                    self.write_text(HTTPStatus.OK, metrics_text(build_state_report(store_root)), "text/plain; charset=utf-8")
                elif path == "/repairz":
                    self.write_json(HTTPStatus.OK, planning_preview(store_root))
                else:
                    self.write_json(HTTPStatus.NOT_FOUND, {"error": "not_found", "path": path})
            except Exception as exc:  # noqa: BLE001
                # Top-level boundary: turn any route failure into a 500 body.
                self.write_json(HTTPStatus.INTERNAL_SERVER_ERROR, {"error": type(exc).__name__, "message": str(exc)})

        def handle_event_path(self, path: str, store_root: str | None) -> None:
            """Serve ``/events/<id>`` and ``/events/<id>/explain``."""
            parts = path.strip("/").split("/")
            # urlparse leaves the path percent-encoded, so decode the id
            # segment; otherwise ids containing reserved characters
            # (requested as e.g. "evt%3A1") could never match a stored event.
            event_id = unquote(parts[1]) if len(parts) >= 2 else ""
            event = find_event(event_id, store_root)
            if event is None:
                self.write_json(HTTPStatus.NOT_FOUND, {"error": "not_found", "event_id": event_id})
            elif len(parts) == 3 and parts[2] == "explain":
                self.write_json(HTTPStatus.OK, explain_event(event))
            elif len(parts) == 2:
                self.write_json(HTTPStatus.OK, event)
            else:
                self.write_json(HTTPStatus.NOT_FOUND, {"error": "not_found", "path": path})

        def log_message(self, format: str, *args: Any) -> None:  # noqa: A002
            """Discard the base class's per-request log output."""
            return

        def write_json(self, status: int | HTTPStatus, payload: dict[str, Any] | list[Any]) -> None:
            """Send *payload* as a JSON response with the given status."""
            self._send_body(status, json_bytes(payload), "application/json; charset=utf-8")

        def write_text(self, status: int | HTTPStatus, payload: str, content_type: str) -> None:
            """Send *payload* as UTF-8 text with the given content type."""
            self._send_body(status, payload.encode("utf-8"), content_type)

        def _send_body(self, status: int | HTTPStatus, body: bytes, content_type: str) -> None:
            """Write status line, headers and *body*; responses are never cached."""
            self.send_response(int(status))
            self.send_header("Content-Type", content_type)
            self.send_header("Cache-Control", "no-store")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

    return SourceOSSyncdHandler
210+
211+
212+
def run_service(config: ServiceConfig) -> None:
    """Validate *config*, bind the HTTP server and serve until interrupted.

    Args:
        config: Bind host, port and default store root.

    Raises:
        ValueError: If ``config.host`` is not an allowed local bind host.
        OSError: If the address cannot be bound.
    """
    # Imported locally: the module header does not import socket.
    import socket

    config.validate()

    class _LocalServer(ThreadingHTTPServer):
        # The stock server class binds with AF_INET, so the allowed host
        # "::1" could never be bound; select AF_INET6 for IPv6 literals.
        address_family = socket.AF_INET6 if ":" in config.host else socket.AF_INET

    server = _LocalServer((config.host, config.port), make_handler(config))
    try:
        server.serve_forever()
    finally:
        # Release the listening socket even when serving is interrupted.
        server.server_close()
219+
220+
221+
def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the service entry point.

    Options: ``--host`` (default ``127.0.0.1``), ``--port`` (int, default
    ``8765``) and ``--store-root`` (optional, default ``None``).
    """
    parser = argparse.ArgumentParser(
        prog="python -m sourceos_syncd.service",
        description="Run local-only sourceos-syncd HTTP service.",
    )
    option_specs = (
        ("--host", {"default": "127.0.0.1", "help": "local bind host; defaults to 127.0.0.1"}),
        ("--port", {"type": int, "default": 8765, "help": "local bind port"}),
        ("--store-root", {"help": "optional local store root"}),
    )
    for flag, settings in option_specs:
        parser.add_argument(flag, **settings)
    return parser
227+
228+
229+
def main(argv: list[str] | None = None) -> int:
    """Parse *argv*, run the service, and return exit status 0 when it stops.

    ``argv=None`` lets argparse read the process arguments.
    """
    namespace = build_parser().parse_args(argv)
    service_config = ServiceConfig(
        host=namespace.host,
        port=namespace.port,
        store_root=namespace.store_root,
    )
    run_service(service_config)
    return 0
233+
234+
235+
if __name__ == "__main__":
236+
raise SystemExit(main())

0 commit comments

Comments
 (0)