diff --git a/.gitignore b/.gitignore index 32ab46e..34e02a7 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,5 @@ .env .env.* !.env.example +__pycache__/ +*.py[cod] diff --git a/docs/contracts.md b/docs/contracts.md index 6fcad6c..1e0915a 100644 --- a/docs/contracts.md +++ b/docs/contracts.md @@ -2,7 +2,7 @@ Canonical architecture: `SourceOS-Linux/sourceos-spec/docs/architecture/sourceos-state-integrity-layer.md` -This repository implements the first executable contract for SourceOS State Integrity. The current implementation lane is Python and standard-library-first. It provides State Integrity Report generation, diagnosis, verification, non-destructive repair planning, a filesystem-backed local store prototype, an append-only JSONL event log, registry persistence, and contract models for actors, schemas, objects, policy decisions, conflicts, events, profiles, devices, sync plans, and agent object transactions. +This repository implements the first executable contract for SourceOS State Integrity. The current implementation lane is Python and standard-library-first. It provides State Integrity Report generation, diagnosis, verification, non-destructive repair planning, a filesystem-backed local store prototype, an append-only JSONL event log, registry persistence, and contract models for actors, schemas, objects, policy decisions, conflicts, events, profiles, devices, sync plans, workspace operations, operation tasks, and agent object transactions. The next daemon phases should extend this baseline into full event-log, policy, profile, repair, service, and adapter subsystems without breaking the current JSON report contracts. @@ -114,6 +114,8 @@ sourceos.conflict-record/v1alpha1 sourceos.policy-decision/v1alpha1 sourceos.integrity-event/v1alpha1 sourceos.agent-object-transaction/v1alpha1 +sourceos.workspace-operation/v1alpha1 +sourceos.operation-task/v1alpha1 ``` Use `validate_contract(record)` to validate a JSON object by its `schema` field. Use the specific dataclass `from_dict()` helpers when callers know the expected type. @@ -139,6 +141,8 @@ conflict.alpha.json policy.decision-review.json event.object-alpha-created.json agent-transaction.alpha.json +workspace-operation.alpha.json +operation-task.alpha.json ``` The contract test suite loads every fixture, validates it through the model registry, and round-trips each fixture through its dataclass model. @@ -223,6 +227,18 @@ Agent transaction statuses: draft, proposed, approved, applied, rejected, reverted ``` +Workspace operation types: + +```text +sync.operation.enqueue, sync.operation.replay, sync.operation.reconcile, sync.conflict.detect, sync.conflict.resolve, sync.repair.apply, sync.tombstone.apply, sync.checkpoint.write +``` + +Decision card options: + +```text +merge, fork, skip +``` + Device states: ```text diff --git a/examples/contracts/conflict.alpha.json b/examples/contracts/conflict.alpha.json index eaa1c73..324fc8f 100644 --- a/examples/contracts/conflict.alpha.json +++ b/examples/contracts/conflict.alpha.json @@ -10,6 +10,15 @@ "schema_version": "v1", "severity": "review_required", "merge_policy": "human_review_required", + "decision_card": { + "card_id": "decision-card:conflict-alpha-001", + "options": ["merge", "fork", "skip"], + "recommended_option": "merge", + "selected_option": "fork", + "rationale": "Payload lineage diverged after remote replay; preserve both versions for review.", + "selected_by": "actor:local-user", + "selected_at": "2026-05-05T00:07:30Z" + }, "user_explanation": "Two scoped actors produced competing updates for the same object fixture.", "created_at": "2026-05-05T00:07:00Z" } diff --git a/examples/contracts/operation-task.alpha.json b/examples/contracts/operation-task.alpha.json new file mode 100644 index 0000000..7322ea1 --- /dev/null +++ b/examples/contracts/operation-task.alpha.json @@ -0,0 +1,19 @@ +{ + "schema": "sourceos.operation-task/v1alpha1", + "task_id": "operation-task:alpha-0001", + "operation_id": "workspace-operation:alpha-001", + "operation_type": "sync.operation.replay", + "status": "succeeded", + "attempt": 1, + "idempotency_key": "idem-replay-alpha-0001", + "policy_decision_ref": "policy-decision:review-alpha", + "evidence_refs": ["evidence:operation-task-0001"], + "created_at": "2026-05-05T00:10:01Z", + "updated_at": "2026-05-05T00:10:06Z", + "started_at": "2026-05-05T00:10:02Z", + "completed_at": "2026-05-05T00:10:06Z", + "remote_operation_id": "remote-op:9931", + "conflict_id": "conflict:alpha-001", + "checkpoint_ref": "checkpoint:workspace-default-0001", + "replay_safe": true +} diff --git a/examples/contracts/workspace-operation.alpha.json b/examples/contracts/workspace-operation.alpha.json new file mode 100644 index 0000000..3eac530 --- /dev/null +++ b/examples/contracts/workspace-operation.alpha.json @@ -0,0 +1,32 @@ +{ + "schema": "sourceos.workspace-operation/v1alpha1", + "operation_id": "workspace-operation:alpha-001", + "local_operation_id": "local-op:0001", + "remote_operation_id": "remote-op:9931", + "operation_type": "sync.operation.reconcile", + "status": "reconciled", + "workspace_id": "workspace:default", + "profile_id": "profile:local-dev", + "actor_id": "actor:agent-one", + "device_id": "device:local", + "encryption_profile_ref": "encryption-profile:local-default", + "local_log_ref": "journal:evt-00000001", + "checkpoint_ref": "checkpoint:workspace-default-0001", + "causal_parent_ids": ["local-op:0000"], + "conflict_ids": ["conflict:alpha-001"], + "provisional_artifact_ids": { + "artifact:temp-1": "object:alpha" + }, + "artifact_reconciliation": { + "artifact:temp-1": "artifact:final-44" + }, + "policy_decision_ref": "policy-decision:review-alpha", + "artifact_admission_ref": "artifact-admission:alpha-allow", + "ledger_evidence_refs": ["evidence:operation-0001"], + "replay_safe": true, + "stale_task": false, + "lease_expired": false, + "replay_idempotency_key": "idem-replay-alpha-0001", + "created_at": "2026-05-05T00:10:00Z", + "updated_at": "2026-05-05T00:10:05Z" +} diff --git a/src/sourceos_syncd/contracts.py b/src/sourceos_syncd/contracts.py index 9c63ca6..5ddcbcc 100644 --- a/src/sourceos_syncd/contracts.py +++ b/src/sourceos_syncd/contracts.py @@ -53,11 +53,24 @@ def require_mapping(record: dict[str, Any], field_name: str, contract_name: str) SYNC_OPERATION_CLASSES = {"replicate", "import", "export", "repair", "migrate", "delete", "restore"} SYNC_PLAN_STATUSES = {"planned", "blocked", "running", "failed", "completed", "cancelled"} CONFLICT_SEVERITIES = {"info", "warning", "review_required", "blocking"} +DECISION_CARD_OPTIONS = {"merge", "fork", "skip"} POLICY_EFFECTS = {"allow", "deny", "review_required"} TRANSACTION_OPERATIONS = {"create", "update", "delete", "merge", "repair", "migrate"} TRANSACTION_STATUSES = {"draft", "proposed", "approved", "applied", "rejected", "reverted"} DEVICE_STATES = {"trusted", "untrusted", "revoked", "quarantined"} PROFILE_CLASSES = {"personal", "work", "client", "air_gapped", "lab", "public_open_source"} +WORKSPACE_OPERATION_TYPES = { + "sync.operation.enqueue", + "sync.operation.replay", + "sync.operation.reconcile", + "sync.conflict.detect", + "sync.conflict.resolve", + "sync.repair.apply", + "sync.tombstone.apply", + "sync.checkpoint.write", +} +WORKSPACE_OPERATION_STATUSES = {"queued", "running", "replayed", "reconciled", "blocked", "completed", "failed", "cancelled"} +OPERATION_TASK_STATUSES = {"pending", "running", "succeeded", "failed", "cancelled"} @dataclass(slots=True) @@ -158,7 +171,7 @@ class ActorRecord(JsonContract): @classmethod def validate_dict(cls, record: dict[str, Any]) -> dict[str, Any]: - super().validate_dict(record) + super(ActorRecord, cls).validate_dict(record) invalid = sorted(set(record.get("capabilities", [])) - ACTOR_CAPABILITIES) if invalid: raise ContractError(f"ActorRecord.capabilities contains invalid values: {invalid}") @@ -168,7 +181,7 @@ def validate_dict(cls, record: dict[str, Any]) -> dict[str, Any]: @dataclass(slots=True) class SchemaContractRecord(JsonContract): schema: ClassVar[str] = "sourceos.schema-record/v1alpha1" - required_fields: ClassVar[set[str]] = { + schema_required_keys: ClassVar[set[str]] = { "schema_id", "object_type", "schema_version", @@ -187,7 +200,7 @@ class SchemaContractRecord(JsonContract): schema_id: str object_type: str schema_version: str - required_fields: list[str] + required_field_names: list[str] field_owners: dict[str, str] migration_policy: str conflict_policy: str @@ -195,6 +208,33 @@ class SchemaContractRecord(JsonContract): sync_visibility: str retention_class: str + @classmethod + def validate_dict(cls, record: dict[str, Any]) -> dict[str, Any]: + if not isinstance(record, dict): + raise ContractError(f"{cls.__name__} must be a JSON object") + require_fields(record, cls.schema_required_keys, cls.__name__) + for field_name, allowed in cls.controlled_fields.items(): + require_allowed(str(record.get(field_name)), allowed, field_name, cls.__name__) + for field_name in cls.list_fields: + require_list(record, field_name, cls.__name__) + for field_name in cls.mapping_fields: + require_mapping(record, field_name, cls.__name__) + return record + + def to_dict(self) -> dict[str, Any]: + data = super(SchemaContractRecord, self).to_dict() + data["required_fields"] = data.pop("required_field_names") + return data + + @classmethod + def from_dict(cls, record: dict[str, Any]) -> JsonContract: + cls.validate_dict(record) + converted = dict(record) + converted["required_field_names"] = converted.pop("required_fields") + allowed = {field.name for field in dataclasses.fields(cls)} + values = {key: value for key, value in converted.items() if key in allowed} + return cls(**values) # type: ignore[arg-type] + @dataclass(slots=True) class SourceObjectRecord(JsonContract): @@ -301,11 +341,13 @@ class ConflictRecord(JsonContract): "schema_version", "severity", "merge_policy", + "decision_card", "user_explanation", "created_at", } controlled_fields: ClassVar[dict[str, set[str]]] = {"severity": CONFLICT_SEVERITIES} list_fields: ClassVar[set[str]] = {"actors", "devices"} + mapping_fields: ClassVar[set[str]] = {"decision_card"} conflict_id: str object_id: str @@ -317,9 +359,31 @@ class ConflictRecord(JsonContract): schema_version: str severity: str merge_policy: str + decision_card: dict[str, Any] user_explanation: str created_at: str + @classmethod + def validate_dict(cls, record: dict[str, Any]) -> dict[str, Any]: + super(ConflictRecord, cls).validate_dict(record) + card = record.get("decision_card") + if not isinstance(card, dict): + raise ContractError("ConflictRecord.decision_card must be an object") + require_fields(card, {"card_id", "options", "recommended_option", "selected_option", "rationale", "selected_by", "selected_at"}, "ConflictRecord.decision_card") + if not isinstance(card.get("options"), list): + raise ContractError("ConflictRecord.decision_card.options must be a list") + options = {str(option) for option in card.get("options", [])} + invalid_options = sorted(options - DECISION_CARD_OPTIONS) + if invalid_options: + raise ContractError(f"ConflictRecord.decision_card.options has invalid values: {invalid_options}") + recommended = str(card.get("recommended_option")) + selected = str(card.get("selected_option")) + if recommended not in options: + raise ContractError("ConflictRecord.decision_card.recommended_option must be present in options") + if selected not in options: + raise ContractError("ConflictRecord.decision_card.selected_option must be present in options") + return record + @dataclass(slots=True) class PolicyDecisionRecord(JsonContract): @@ -397,6 +461,101 @@ class AgentObjectTransactionRecord(JsonContract): updated_at: str +@dataclass(slots=True) +class WorkspaceOperationRecord(JsonContract): + schema: ClassVar[str] = "sourceos.workspace-operation/v1alpha1" + required_fields: ClassVar[set[str]] = { + "operation_id", + "local_operation_id", + "operation_type", + "status", + "workspace_id", + "profile_id", + "actor_id", + "device_id", + "encryption_profile_ref", + "local_log_ref", + "checkpoint_ref", + "causal_parent_ids", + "conflict_ids", + "provisional_artifact_ids", + "artifact_reconciliation", + "policy_decision_ref", + "artifact_admission_ref", + "ledger_evidence_refs", + "replay_safe", + "stale_task", + "lease_expired", + "created_at", + "updated_at", + } + controlled_fields: ClassVar[dict[str, set[str]]] = {"operation_type": WORKSPACE_OPERATION_TYPES, "status": WORKSPACE_OPERATION_STATUSES} + list_fields: ClassVar[set[str]] = {"causal_parent_ids", "conflict_ids", "ledger_evidence_refs"} + mapping_fields: ClassVar[set[str]] = {"provisional_artifact_ids", "artifact_reconciliation"} + + operation_id: str + local_operation_id: str + operation_type: str + status: str + workspace_id: str + profile_id: str + actor_id: str + device_id: str + encryption_profile_ref: str + local_log_ref: str + checkpoint_ref: str + causal_parent_ids: list[str] + conflict_ids: list[str] + provisional_artifact_ids: dict[str, str] + artifact_reconciliation: dict[str, str] + policy_decision_ref: str + artifact_admission_ref: str + ledger_evidence_refs: list[str] + replay_safe: bool + stale_task: bool + lease_expired: bool + created_at: str + updated_at: str + remote_operation_id: str | None = None + replay_idempotency_key: str | None = None + + +@dataclass(slots=True) +class OperationTaskRecord(JsonContract): + schema: ClassVar[str] = "sourceos.operation-task/v1alpha1" + required_fields: ClassVar[set[str]] = { + "task_id", + "operation_id", + "operation_type", + "status", + "attempt", + "idempotency_key", + "policy_decision_ref", + "evidence_refs", + "created_at", + "updated_at", + } + controlled_fields: ClassVar[dict[str, set[str]]] = {"operation_type": WORKSPACE_OPERATION_TYPES, "status": OPERATION_TASK_STATUSES} + list_fields: ClassVar[set[str]] = {"evidence_refs"} + + task_id: str + operation_id: str + operation_type: str + status: str + attempt: int + idempotency_key: str + policy_decision_ref: str + evidence_refs: list[str] + created_at: str + updated_at: str + started_at: str | None = None + completed_at: str | None = None + remote_operation_id: str | None = None + conflict_id: str | None = None + checkpoint_ref: str | None = None + replay_safe: bool = True + + CONTRACT_TYPES: dict[str, type[JsonContract]] = { ProfileRecord.schema: ProfileRecord, DeviceTrustRecord.schema: DeviceTrustRecord, @@ -408,6 +567,8 @@ class AgentObjectTransactionRecord(JsonContract): PolicyDecisionRecord.schema: PolicyDecisionRecord, IntegrityEventRecord.schema: IntegrityEventRecord, AgentObjectTransactionRecord.schema: AgentObjectTransactionRecord, + WorkspaceOperationRecord.schema: WorkspaceOperationRecord, + OperationTaskRecord.schema: OperationTaskRecord, } diff --git a/tests/test_contracts.py b/tests/test_contracts.py index 79b136c..0fa5450 100644 --- a/tests/test_contracts.py +++ b/tests/test_contracts.py @@ -12,11 +12,13 @@ ContractError, DeviceTrustRecord, IntegrityEventRecord, + OperationTaskRecord, PolicyDecisionRecord, ProfileRecord, SchemaContractRecord, SourceObjectRecord, SyncPlanRecord, + WorkspaceOperationRecord, to_json_dict, validate_contract, ) @@ -34,6 +36,8 @@ "sourceos.policy-decision/v1alpha1", "sourceos.integrity-event/v1alpha1", "sourceos.agent-object-transaction/v1alpha1", + "sourceos.workspace-operation/v1alpha1", + "sourceos.operation-task/v1alpha1", } @@ -84,6 +88,8 @@ def test_each_contract_type_round_trips_from_fixture() -> None: "policy.decision-review.json": PolicyDecisionRecord, "event.object-alpha-created.json": IntegrityEventRecord, "agent-transaction.alpha.json": AgentObjectTransactionRecord, + "workspace-operation.alpha.json": WorkspaceOperationRecord, + "operation-task.alpha.json": OperationTaskRecord, } for fixture, contract_type in mapping.items(): @@ -132,6 +138,22 @@ def test_conflict_severity_validation_rejects_unknown_severity() -> None: validate_contract(record) +def test_workspace_operation_validation_rejects_unknown_operation_type() -> None: + record = load_example("workspace-operation.alpha.json") + record["operation_type"] = "sync.operation.unknown" + + with pytest.raises(ContractError, match="operation_type"): + validate_contract(record) + + +def test_conflict_decision_card_validation_rejects_unknown_option() -> None: + record = load_example("conflict.alpha.json") + record["decision_card"]["options"] = ["merge", "escalate"] + + with pytest.raises(ContractError, match="decision_card.options"): + validate_contract(record) + + def test_unsupported_schema_is_rejected() -> None: record = {"schema": "sourceos.unknown/v1alpha1"}