From 231ce43f4e4288f3ba1e143d71c88e08e19099f9 Mon Sep 17 00:00:00 2001 From: George-iam Date: Sun, 1 Mar 2026 16:21:11 +0000 Subject: [PATCH] feat: add stream-first intent observation helpers Introduce send/list/resolve intent lifecycle helpers plus observe/wait_for semantics with SSE-first delivery and polling fallback so SDK consumers can implement continuation delivery without RPC callbacks. Made-with: Cursor --- README.md | 17 ++++ axme_sdk/client.py | 213 ++++++++++++++++++++++++++++++++++++++++++- tests/test_client.py | 170 ++++++++++++++++++++++++++++++++++ 3 files changed, 399 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index fa3f65e..0d9aa0a 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,23 @@ with AxmeClient(config) as client: ) print(result) print(client.get_intent(result["intent_id"])["intent"]["status"]) + sent_intent_id = client.send_intent( + { + "intent_type": "notify.message.v1", + "from_agent": "agent://example/sender", + "to_agent": "agent://example/receiver", + "payload": {"text": "hello again"}, + }, + idempotency_key="send-intent-001", + ) + print(sent_intent_id) + print(client.list_intent_events(sent_intent_id, since=0)) + for event in client.observe(sent_intent_id, since=0, wait_seconds=10): + print(event["event_type"], event["status"]) + if event["status"] in {"COMPLETED", "FAILED", "CANCELED"}: + break + terminal = client.wait_for(sent_intent_id, timeout_seconds=30) + print(terminal["status"]) inbox = client.list_inbox(owner_agent="agent://example/receiver", trace_id="trace-inbox-001") print(inbox) thread = client.get_inbox_thread("11111111-1111-4111-8111-111111111111", owner_agent="agent://example/receiver") diff --git a/axme_sdk/client.py b/axme_sdk/client.py index d2f739d..bfa90af 100644 --- a/axme_sdk/client.py +++ b/axme_sdk/client.py @@ -1,8 +1,9 @@ from __future__ import annotations from dataclasses import dataclass +import json import time -from typing import Any, Callable +from typing import Any, Callable, Iterator from uuid import uuid4 import httpx @@ -83,6 +84,154 @@ def create_intent( def get_intent(self, intent_id: str, *, trace_id: str | None = None) -> dict[str, Any]: return self._request_json("GET", f"/v1/intents/{intent_id}", trace_id=trace_id, retryable=True) + def send_intent( + self, + payload: dict[str, Any], + *, + correlation_id: str | None = None, + idempotency_key: str | None = None, + trace_id: str | None = None, + ) -> str: + created = self.create_intent( + payload, + correlation_id=correlation_id or str(uuid4()), + idempotency_key=idempotency_key, + trace_id=trace_id, + ) + intent_id = created.get("intent_id") + if not isinstance(intent_id, str) or not intent_id: + raise ValueError("create_intent response does not include string intent_id") + return intent_id + + def list_intent_events( + self, + intent_id: str, + *, + since: int | None = None, + trace_id: str | None = None, + ) -> dict[str, Any]: + params: dict[str, str] | None = None + if since is not None: + if since < 0: + raise ValueError("since must be >= 0") + params = {"since": str(since)} + return self._request_json( + "GET", + f"/v1/intents/{intent_id}/events", + params=params, + trace_id=trace_id, + retryable=True, + ) + + def resolve_intent( + self, + intent_id: str, + payload: dict[str, Any], + *, + idempotency_key: str | None = None, + trace_id: str | None = None, + ) -> dict[str, Any]: + return self._request_json( + "POST", + f"/v1/intents/{intent_id}/resolve", + json_body=payload, + idempotency_key=idempotency_key, + trace_id=trace_id, + retryable=idempotency_key is not None, + ) + + def observe( + self, + intent_id: str, + *, + since: int = 0, + wait_seconds: int = 15, + poll_interval_seconds: float = 1.0, + timeout_seconds: float | None = None, + trace_id: str | None = None, + ) -> Iterator[dict[str, Any]]: + if since < 0: + raise ValueError("since must be >= 0") + if wait_seconds < 1: + raise ValueError("wait_seconds must be >= 1") + if poll_interval_seconds < 0: + raise ValueError("poll_interval_seconds must be >= 0") + if timeout_seconds is not None and timeout_seconds <= 0: + raise ValueError("timeout_seconds must be > 0 when provided") + + deadline = (time.monotonic() + timeout_seconds) if timeout_seconds is not None else None + next_since = since + + while True: + if deadline is not None and time.monotonic() >= deadline: + raise TimeoutError(f"timed out while observing intent {intent_id}") + + stream_wait_seconds = wait_seconds + if deadline is not None: + seconds_left = max(0.0, deadline - time.monotonic()) + if seconds_left <= 0: + raise TimeoutError(f"timed out while observing intent {intent_id}") + stream_wait_seconds = max(1, min(wait_seconds, int(seconds_left))) + + try: + for event in self._iter_intent_events_stream( + intent_id=intent_id, + since=next_since, + wait_seconds=stream_wait_seconds, + trace_id=trace_id, + ): + next_since = _max_seen_seq(next_since=next_since, event=event) + yield event + if _is_terminal_intent_event(event): + return + except AxmeHttpError as exc: + if exc.status_code not in {404, 405, 501}: + raise + + polled = self.list_intent_events( + intent_id, + since=next_since if next_since > 0 else None, + trace_id=trace_id, + ) + events = polled.get("events") + if not isinstance(events, list): + raise AxmeHttpError(502, "invalid intent events payload: events must be list", body=polled) + if not events: + if deadline is not None and time.monotonic() >= deadline: + raise TimeoutError(f"timed out while observing intent {intent_id}") + time.sleep(poll_interval_seconds) + continue + + for event in events: + if not isinstance(event, dict): + continue + next_since = _max_seen_seq(next_since=next_since, event=event) + yield event + if _is_terminal_intent_event(event): + return + + def wait_for( + self, + intent_id: str, + *, + since: int = 0, + wait_seconds: int = 15, + poll_interval_seconds: float = 1.0, + timeout_seconds: float | None = None, + trace_id: str | None = None, + ) -> dict[str, Any]: + for event in self.observe( + intent_id, + since=since, + wait_seconds=wait_seconds, + poll_interval_seconds=poll_interval_seconds, + timeout_seconds=timeout_seconds, + trace_id=trace_id, + ): + if _is_terminal_intent_event(event): + return event + raise RuntimeError(f"intent observation finished without terminal event for {intent_id}") + def list_inbox(self, *, owner_agent: str | None = None, trace_id: str | None = None) -> dict[str, Any]: params: dict[str, str] | None = None if owner_agent is not None: @@ -598,6 +747,53 @@ def _request_json( raise RuntimeError("unreachable retry loop state") + def _iter_intent_events_stream( + self, + *, + intent_id: str, + since: int, + wait_seconds: int, + trace_id: str | None, + ) -> Iterator[dict[str, Any]]: + headers: dict[str, str] | None = None + normalized_trace_id = self._normalize_trace_id(trace_id) + if normalized_trace_id is not None: + headers = {"X-Trace-Id": normalized_trace_id} + + with self._http.stream( + "GET", + f"/v1/intents/{intent_id}/events/stream", + params={"since": str(since), "wait_seconds": str(wait_seconds)}, + headers=headers, + ) as response: + if response.status_code >= 400: + self._raise_http_error(response) + + current_event: str | None = None + data_lines: list[str] = [] + for line in response.iter_lines(): + if line == "": + if current_event == "stream.timeout": + return + if current_event and data_lines: + try: + payload = json.loads("\n".join(data_lines)) + except ValueError: + payload = None + if isinstance(payload, dict) and current_event.startswith("intent."): + yield payload + current_event = None + data_lines = [] + continue + if line.startswith(":"): + continue + if line.startswith("event:"): + current_event = line.partition(":")[2].strip() + continue + if line.startswith("data:"): + data_lines.append(line.partition(":")[2].lstrip()) + continue + def _mcp_request( self, *, @@ -780,3 +976,18 @@ def _matches_json_type(*, value: Any, accepted_types: list[str]) -> bool: if type_name == "array" and isinstance(value, list): return True return False + + +def _max_seen_seq(*, next_since: int, event: dict[str, Any]) -> int: + raw_seq = event.get("seq") + if isinstance(raw_seq, int) and raw_seq >= 0: + return max(next_since, raw_seq) + return next_since + + +def _is_terminal_intent_event(event: dict[str, Any]) -> bool: + status = event.get("status") + if isinstance(status, str) and status in {"COMPLETED", "FAILED", "CANCELED"}: + return True + event_type = event.get("event_type") + return isinstance(event_type, str) and event_type in {"intent.completed", "intent.failed", "intent.canceled"} diff --git a/tests/test_client.py b/tests/test_client.py index ce3c965..a68d138 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -182,6 +182,176 @@ def handler(request: httpx.Request) -> httpx.Response: assert client.get_intent(intent_id)["intent"]["intent_id"] == intent_id +def test_send_intent_returns_intent_id() -> None: + def handler(request: httpx.Request) -> httpx.Response: + assert request.method == "POST" + assert request.url.path == "/v1/intents" + body = json.loads(request.read().decode("utf-8")) + assert isinstance(body["correlation_id"], str) + return httpx.Response(200, json={"intent_id": "33333333-3333-4333-8333-333333333333"}) + + client = _client(handler) + intent_id = client.send_intent( + { + "intent_type": "notify.message.v1", + "to_agent": "agent://x", + "from_agent": "agent://y", + "payload": {"text": "hello"}, + }, + idempotency_key="send-1", + ) + assert intent_id == "33333333-3333-4333-8333-333333333333" + + +def test_send_intent_requires_response_intent_id() -> None: + client = _client(lambda request: httpx.Response(200, json={"ok": True})) + + with pytest.raises(ValueError, match="intent_id"): + client.send_intent( + { + "intent_type": "notify.message.v1", + "to_agent": "agent://x", + "from_agent": "agent://y", + "payload": {}, + } + ) + + +def test_list_intent_events_success() -> None: + intent_id = "22222222-2222-4222-8222-222222222222" + + def handler(request: httpx.Request) -> httpx.Response: + assert request.method == "GET" + assert request.url.path == f"/v1/intents/{intent_id}/events" + assert request.url.params.get("since") == "2" + return httpx.Response( + 200, + json={ + "ok": True, + "events": [ + { + "intent_id": intent_id, + "seq": 3, + "event_type": "intent.completed", + "status": "COMPLETED", + "at": "2026-02-28T00:00:10Z", + } + ], + }, + ) + + client = _client(handler) + response = client.list_intent_events(intent_id, since=2) + assert response["ok"] is True + assert response["events"][0]["seq"] == 3 + + +def test_resolve_intent_success() -> None: + intent_id = "22222222-2222-4222-8222-222222222222" + + def handler(request: httpx.Request) -> httpx.Response: + assert request.method == "POST" + assert request.url.path == f"/v1/intents/{intent_id}/resolve" + assert request.headers["idempotency-key"] == "resolve-1" + body = json.loads(request.read().decode("utf-8")) + assert body["status"] == "COMPLETED" + return httpx.Response( + 200, + json={ + "ok": True, + "intent": {"intent_id": intent_id, "status": "done"}, + "event": {"intent_id": intent_id, "seq": 3, "event_type": "intent.completed", "status": "COMPLETED"}, + "completion_delivery": {"delivered": False, "reason": "reply_to_not_set"}, + }, + ) + + client = _client(handler) + response = client.resolve_intent( + intent_id, + {"status": "COMPLETED", "result": {"answer": "done"}}, + idempotency_key="resolve-1", + ) + assert response["event"]["event_type"] == "intent.completed" + + +def test_observe_prefers_stream_and_yields_terminal_event() -> None: + intent_id = "22222222-2222-4222-8222-222222222222" + + def handler(request: httpx.Request) -> httpx.Response: + assert request.method == "GET" + assert request.url.path == f"/v1/intents/{intent_id}/events/stream" + assert request.url.params.get("since") == "1" + assert request.url.params.get("wait_seconds") == "5" + sse_payload = ( + "id: 2\n" + "event: intent.submitted\n" + "data: {\"intent_id\":\"22222222-2222-4222-8222-222222222222\",\"seq\":2," + "\"event_type\":\"intent.submitted\",\"status\":\"SUBMITTED\",\"at\":\"2026-02-28T00:00:01Z\"}\n\n" + "id: 3\n" + "event: intent.completed\n" + "data: {\"intent_id\":\"22222222-2222-4222-8222-222222222222\",\"seq\":3," + "\"event_type\":\"intent.completed\",\"status\":\"COMPLETED\",\"at\":\"2026-02-28T00:00:10Z\"}\n\n" + ) + return httpx.Response(200, text=sse_payload) + + client = _client(handler) + observed = list(client.observe(intent_id, since=1, wait_seconds=5, poll_interval_seconds=0)) + assert [item["event_type"] for item in observed] == ["intent.submitted", "intent.completed"] + + +def test_observe_falls_back_to_polling_when_stream_unavailable() -> None: + intent_id = "22222222-2222-4222-8222-222222222222" + calls = {"stream": 0, "poll": 0} + + def handler(request: httpx.Request) -> httpx.Response: + if request.url.path.endswith("/events/stream"): + calls["stream"] += 1 + return httpx.Response(404, json={"error": "not found"}) + assert request.url.path.endswith("/events") + calls["poll"] += 1 + return httpx.Response( + 200, + json={ + "ok": True, + "events": [ + { + "intent_id": intent_id, + "seq": 1, + "event_type": "intent.created", + "status": "CREATED", + "at": "2026-02-28T00:00:00Z", + }, + { + "intent_id": intent_id, + "seq": 2, + "event_type": "intent.completed", + "status": "COMPLETED", + "at": "2026-02-28T00:00:10Z", + }, + ], + }, + ) + + client = _client(handler) + observed = list(client.observe(intent_id, poll_interval_seconds=0)) + assert [item["event_type"] for item in observed] == ["intent.created", "intent.completed"] + assert calls["stream"] == 1 + assert calls["poll"] == 1 + + +def test_wait_for_raises_timeout_without_terminal_event() -> None: + intent_id = "22222222-2222-4222-8222-222222222222" + + def handler(request: httpx.Request) -> httpx.Response: + if request.url.path.endswith("/events/stream"): + return httpx.Response(404, json={"error": "not found"}) + return httpx.Response(200, json={"ok": True, "events": []}) + + client = _client(handler) + with pytest.raises(TimeoutError): + client.wait_for(intent_id, timeout_seconds=0.01, poll_interval_seconds=0) + + def test_list_inbox_success() -> None: thread = _thread_payload()