Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 226 additions & 0 deletions src/agent_term/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
"""AgentTerm runtime configuration loading.

Configuration is intentionally declarative. It can describe local/default pipeline
posture and desired participant bindings, but it does not become authority for
agents, policy, Matrix, or any SourceOS plane.
"""

from __future__ import annotations

import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

from agent_term.pipeline import DispatchPipelineConfig
from agent_term.store import DEFAULT_DB_PATH


@dataclass(frozen=True)
class EventStoreConfig:
driver: str = "sqlite"
path: str = str(DEFAULT_DB_PATH)


@dataclass(frozen=True)
class MatrixConfig:
enabled: bool = False
homeserver_url: str | None = None
user_id: str | None = None
device_name: str | None = None
rooms: dict[str, str] = field(default_factory=dict)
require_encrypted_room_posture_for_sensitive_context: bool = True
preserve_bridge_metadata: bool = True
preserve_redactions: bool = True
preserve_membership_events: bool = True


@dataclass(frozen=True)
class AgentRegistrationConfig:
require_registered_participants: bool = True
fail_closed_when_registry_unavailable: bool = True
repository: str = "SocioProphet/agent-registry"
required_for: tuple[str, ...] = ()


@dataclass(frozen=True)
class ParticipantConfig:
key: str
enabled: bool = False
mode: str | None = None
require_agent_registry_resolution: bool = True
agent_registry_id: str | None = None
require_policy_approval_for_mutation: bool = False
require_policy_approval_for_side_effects: bool = False
disable_for_sensitive_context: bool = False
metadata: dict[str, object] = field(default_factory=dict)


@dataclass(frozen=True)
class PlaneConfig:
key: str
enabled: bool = False
repository: str | None = None
role: str | None = None
metadata: dict[str, object] = field(default_factory=dict)


@dataclass(frozen=True)
class LocalRuntimeFixture:
"""Local test/dev fixture for the in-memory runtime backends."""

registered_agents: tuple[str, ...] = ()
tool_grants: tuple[str, ...] = ()
allow_policies: tuple[str, ...] = ()
deny_policies: tuple[str, ...] = ()
pending_policies: tuple[str, ...] = ()


@dataclass(frozen=True)
class AgentTermConfig:
workspace: str = "sourceos"
default_channel: str = "!sourceos-ops"
event_store: EventStoreConfig = field(default_factory=EventStoreConfig)
matrix: MatrixConfig = field(default_factory=MatrixConfig)
agent_registration: AgentRegistrationConfig = field(default_factory=AgentRegistrationConfig)
planes: dict[str, PlaneConfig] = field(default_factory=dict)
participants: dict[str, ParticipantConfig] = field(default_factory=dict)
local_runtime: LocalRuntimeFixture = field(default_factory=LocalRuntimeFixture)
raw: dict[str, object] = field(default_factory=dict)

def pipeline_config(self) -> DispatchPipelineConfig:
return DispatchPipelineConfig(
require_matrix_posture_for_sensitive_context=(
self.matrix.require_encrypted_room_posture_for_sensitive_context
),
require_agent_registry_for_participants=(
self.agent_registration.require_registered_participants
),
require_policy_for_admitted_events=True,
)

def participant_agent_id(self, participant: str) -> str | None:
config = self.participants.get(participant)
return config.agent_registry_id if config else None



def load_config(path: Path | str | None) -> AgentTermConfig:
if path is None:
return AgentTermConfig()
config_path = Path(path)
with config_path.open("r", encoding="utf-8") as handle:
raw = json.load(handle)
if not isinstance(raw, dict):
raise ValueError("AgentTerm config must be a JSON object")
return config_from_dict(raw)


def config_from_dict(raw: dict[str, Any]) -> AgentTermConfig:
event_store_raw = _dict(raw.get("eventStore"))
matrix_raw = _dict(raw.get("matrix"))
registration_raw = _dict(raw.get("agentRegistration"))
participants_raw = _dict(raw.get("participants"))
planes_raw = _dict(raw.get("planes"))
local_runtime_raw = _dict(raw.get("localRuntime"))

participants = {
key: _participant_config(key, _dict(value)) for key, value in participants_raw.items()
}
planes = {key: _plane_config(key, _dict(value)) for key, value in planes_raw.items()}

return AgentTermConfig(
workspace=str(raw.get("workspace") or "sourceos"),
default_channel=str(raw.get("defaultChannel") or "!sourceos-ops"),
event_store=EventStoreConfig(
driver=str(event_store_raw.get("driver") or "sqlite"),
path=str(event_store_raw.get("path") or DEFAULT_DB_PATH),
),
matrix=MatrixConfig(
enabled=bool(matrix_raw.get("enabled", False)),
homeserver_url=_optional_str(matrix_raw.get("homeserverUrl")),
user_id=_optional_str(matrix_raw.get("userId")),
device_name=_optional_str(matrix_raw.get("deviceName")),
rooms={str(key): str(value) for key, value in _dict(matrix_raw.get("rooms")).items()},
require_encrypted_room_posture_for_sensitive_context=bool(
matrix_raw.get("requireEncryptedRoomPostureForSensitiveContext", True)
),
preserve_bridge_metadata=bool(matrix_raw.get("preserveBridgeMetadata", True)),
preserve_redactions=bool(matrix_raw.get("preserveRedactions", True)),
preserve_membership_events=bool(matrix_raw.get("preserveMembershipEvents", True)),
),
agent_registration=AgentRegistrationConfig(
require_registered_participants=bool(
registration_raw.get("requireRegisteredParticipants", True)
),
fail_closed_when_registry_unavailable=bool(
registration_raw.get("failClosedWhenRegistryUnavailable", True)
),
repository=str(registration_raw.get("repository") or "SocioProphet/agent-registry"),
required_for=tuple(str(item) for item in _list(registration_raw.get("requiredFor"))),
),
planes=planes,
participants=participants,
local_runtime=LocalRuntimeFixture(
registered_agents=tuple(
str(item) for item in _list(local_runtime_raw.get("registeredAgents"))
),
tool_grants=tuple(str(item) for item in _list(local_runtime_raw.get("toolGrants"))),
allow_policies=tuple(str(item) for item in _list(local_runtime_raw.get("allowPolicies"))),
deny_policies=tuple(str(item) for item in _list(local_runtime_raw.get("denyPolicies"))),
pending_policies=tuple(
str(item) for item in _list(local_runtime_raw.get("pendingPolicies"))
),
),
raw=raw,
)


def _participant_config(key: str, raw: dict[str, Any]) -> ParticipantConfig:
known = {
"enabled",
"mode",
"requireAgentRegistryResolution",
"agentRegistryId",
"requirePolicyApprovalForMutation",
"requirePolicyApprovalForSideEffects",
"disableForSensitiveContext",
}
return ParticipantConfig(
key=key,
enabled=bool(raw.get("enabled", False)),
mode=_optional_str(raw.get("mode")),
require_agent_registry_resolution=bool(raw.get("requireAgentRegistryResolution", True)),
agent_registry_id=_optional_str(raw.get("agentRegistryId")),
require_policy_approval_for_mutation=bool(
raw.get("requirePolicyApprovalForMutation", False)
),
require_policy_approval_for_side_effects=bool(
raw.get("requirePolicyApprovalForSideEffects", False)
),
disable_for_sensitive_context=bool(raw.get("disableForSensitiveContext", False)),
metadata={key_: value for key_, value in raw.items() if key_ not in known},
)


def _plane_config(key: str, raw: dict[str, Any]) -> PlaneConfig:
known = {"enabled", "repository", "role"}
return PlaneConfig(
key=key,
enabled=bool(raw.get("enabled", False)),
repository=_optional_str(raw.get("repository")),
role=_optional_str(raw.get("role")),
metadata={key_: value for key_, value in raw.items() if key_ not in known},
)


def _dict(value: object) -> dict[str, Any]:
return value if isinstance(value, dict) else {}


def _list(value: object) -> list[object]:
return value if isinstance(value, list) else []


def _optional_str(value: object) -> str | None:
return str(value) if value is not None else None
55 changes: 36 additions & 19 deletions src/agent_term/dispatch_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from agent_term.agent_registry import InMemoryAgentRegistryBackend, ToolGrant
from agent_term.agentplane import AgentPlaneAdapter, InMemoryAgentPlaneBackend
from agent_term.cloudshell_fog import CloudShellFogAdapter, InMemoryCloudShellFogBackend
from agent_term.config import AgentTermConfig, load_config
from agent_term.events import AgentTermEvent
from agent_term.knowledge import (
HolmesAdapter,
Expand Down Expand Up @@ -56,7 +57,8 @@ def build_parser() -> argparse.ArgumentParser:
parser.add_argument("kind", help="Event kind, e.g. memory_recall, agent_message, context_pack.")
parser.add_argument("channel", help="Logical channel or Matrix room alias/ID.")
parser.add_argument("body", help="Event body.")
parser.add_argument("--db", default=str(DEFAULT_DB_PATH), help="Path to local AgentTerm SQLite event log.")
parser.add_argument("--config", help="Optional AgentTerm JSON config path.")
parser.add_argument("--db", help="Path to local AgentTerm SQLite event log.")
parser.add_argument("--sender", default="@operator")
parser.add_argument("--thread-id")
parser.add_argument("--metadata-json", default="{}")
Expand Down Expand Up @@ -86,10 +88,11 @@ def parse_metadata(metadata_json: str) -> dict[str, object]:
return value


def build_event(args: argparse.Namespace) -> AgentTermEvent:
def build_event(args: argparse.Namespace, config: AgentTermConfig) -> AgentTermEvent:
metadata = parse_metadata(args.metadata_json)
if args.agent_id:
metadata["agent_id"] = args.agent_id
agent_id = args.agent_id or config.participant_agent_id(args.source)
if agent_id:
metadata["agent_id"] = agent_id
if args.tool:
metadata["tool"] = args.tool
if args.policy_action:
Expand All @@ -111,10 +114,12 @@ def build_event(args: argparse.Namespace) -> AgentTermEvent:
)


def build_registry_backend(args: argparse.Namespace) -> InMemoryAgentRegistryBackend:
agent_ids = set(args.register_agent)
if args.agent_id:
agent_ids.add(args.agent_id)
def build_registry_backend(args: argparse.Namespace, config: AgentTermConfig) -> InMemoryAgentRegistryBackend:
agent_ids = set(config.local_runtime.registered_agents)
agent_ids.update(args.register_agent)
agent_id = args.agent_id or config.participant_agent_id(args.source)
if agent_id:
agent_ids.add(agent_id)

agents = [
AgentRegistration(
Expand All @@ -125,7 +130,7 @@ def build_registry_backend(args: argparse.Namespace) -> InMemoryAgentRegistryBac
)
for agent_id in sorted(agent_ids)
]
grants = [_parse_grant(raw) for raw in args.grant]
grants = [_parse_grant(raw) for raw in (*config.local_runtime.tool_grants, *args.grant)]
return InMemoryAgentRegistryBackend(agents=agents, grants=grants)


Expand All @@ -138,13 +143,17 @@ def _parse_grant(raw: str) -> ToolGrant:
return ToolGrant(grant_id=grant_id, agent_id=agent_id, tool=tool)


def build_policy_backend(args: argparse.Namespace, event: AgentTermEvent) -> InMemoryPolicyFabricBackend:
def build_policy_backend(
args: argparse.Namespace,
event: AgentTermEvent,
config: AgentTermConfig,
) -> InMemoryPolicyFabricBackend:
decisions: list[PolicyDecision] = []
for action in args.allow_policy:
for action in (*config.local_runtime.allow_policies, *args.allow_policy):
decisions.append(_decision(action, ALLOW, args.policy_ref))
for action in args.deny_policy:
for action in (*config.local_runtime.deny_policies, *args.deny_policy):
decisions.append(_decision(action, DENY, args.policy_ref, reason="denied by dispatch CLI"))
for action in args.pending_policy:
for action in (*config.local_runtime.pending_policies, *args.pending_policy):
decisions.append(_decision(action, PENDING, args.policy_ref))

if args.policy_action and args.policy_action not in {decision.action for decision in decisions}:
Expand All @@ -165,9 +174,14 @@ def _decision(action: str, status: str, policy_ref: str, reason: str | None = No
)


def build_pipeline(args: argparse.Namespace, event: AgentTermEvent, store: EventStore) -> OperatorDispatchPipeline:
registry_backend = build_registry_backend(args)
policy_backend = build_policy_backend(args, event)
def build_pipeline(
args: argparse.Namespace,
event: AgentTermEvent,
store: EventStore,
config: AgentTermConfig,
) -> OperatorDispatchPipeline:
registry_backend = build_registry_backend(args, config)
policy_backend = build_policy_backend(args, event, config)
participant_backend = InMemoryParticipantBackend()

adapters = (
Expand All @@ -191,15 +205,18 @@ def build_pipeline(args: argparse.Namespace, event: AgentTermEvent, store: Event
agent_registry_adapter=AgentRegistryAdapter(registry_backend),
policy_fabric_adapter=PolicyFabricAdapter(policy_backend),
adapters=adapters,
config=config.pipeline_config(),
)


def main(argv: list[str] | None = None) -> int:
args = build_parser().parse_args(argv)
event = build_event(args)
store = EventStore(Path(args.db))
config = load_config(args.config)
event = build_event(args, config)
db_path = Path(args.db or config.event_store.path or DEFAULT_DB_PATH)
store = EventStore(db_path)
try:
outcome = build_pipeline(args, event, store).dispatch(event)
outcome = build_pipeline(args, event, store, config).dispatch(event)
status = "ok" if outcome.ok else "blocked"
print(f"dispatch_status={status}")
if outcome.adapter_key:
Expand Down
Loading
Loading