diff --git a/.coverage b/.coverage new file mode 100644 index 0000000..41ac1f4 Binary files /dev/null and b/.coverage differ diff --git a/.coverage.coder-rob-rob-web-97d3fbcc6330-6bd87969d4-wl7xh.1933182.XnWOPWYx b/.coverage.coder-rob-rob-web-97d3fbcc6330-6bd87969d4-wl7xh.1933182.XnWOPWYx new file mode 100644 index 0000000..7ba98c1 Binary files /dev/null and b/.coverage.coder-rob-rob-web-97d3fbcc6330-6bd87969d4-wl7xh.1933182.XnWOPWYx differ diff --git a/.coverage.coder-rob-rob-web-97d3fbcc6330-6bd87969d4-wl7xh.1935624.XKqLCknx b/.coverage.coder-rob-rob-web-97d3fbcc6330-6bd87969d4-wl7xh.1935624.XKqLCknx new file mode 100644 index 0000000..7ba98c1 Binary files /dev/null and b/.coverage.coder-rob-rob-web-97d3fbcc6330-6bd87969d4-wl7xh.1935624.XKqLCknx differ diff --git a/.coverage.coder-rob-rob-web-97d3fbcc6330-6bd87969d4-wl7xh.1937271.XsCrUXhx b/.coverage.coder-rob-rob-web-97d3fbcc6330-6bd87969d4-wl7xh.1937271.XsCrUXhx new file mode 100644 index 0000000..c614c01 Binary files /dev/null and b/.coverage.coder-rob-rob-web-97d3fbcc6330-6bd87969d4-wl7xh.1937271.XsCrUXhx differ diff --git a/STREAMING_SUMMARY.md b/STREAMING_SUMMARY.md new file mode 100644 index 0000000..21018dc --- /dev/null +++ b/STREAMING_SUMMARY.md @@ -0,0 +1,90 @@ +# SSE Streaming Implementation Summary + +## Overview +Added real-time streaming support to FastA2A via Server-Sent Events (SSE), enabling agents to stream responses as they are generated, making the framework fully compliant with the A2A specification v0.2.5. + +## Key Changes + +### 1. Broker Layer (fasta2a/broker.py) +- Added abstract methods `send_stream_event()` and `subscribe_to_stream()` to the Broker interface +- Implemented pub/sub infrastructure in InMemoryBroker: + - Thread-safe subscriber management with locks + - Automatic cleanup of disconnected subscribers + - Support for streaming Task, Message, TaskStatusUpdateEvent, and TaskArtifactUpdateEvent + +### 2. Application Layer (fasta2a/applications.py) +- Added configurable `streaming` parameter to FastA2A constructor +- Implemented `message/stream` endpoint that returns EventSourceResponse +- Updated agent capabilities to correctly report streaming support + +### 3. Task Manager (fasta2a/task_manager.py) +- Added `stream_message()` async generator method +- Yields initial task, then streams all subsequent events from broker +- Handles task execution in background while streaming events + +### 4. Worker Integration +- Workers can emit streaming events using `broker.send_stream_event()` +- No changes to Worker base class needed - direct broker usage is cleaner + +### 5. Dependencies +- Added `sse-starlette>=2.0.0` for SSE response handling +- Added `httpx-sse` to dev dependencies for testing + +## Testing +- Comprehensive unit tests for broker pub/sub (7 tests) +- Integration tests for streaming endpoint (8 tests) +- Unit tests for task manager streaming (12 tests) +- Unit tests for agent card functionality (5 tests) +- Total: 32 tests, 90.18% coverage +- Tests use proper async synchronization instead of sleep-based timing + +## Usage Example + +```python +# Enable streaming in FastA2A +app = FastA2A( + storage=storage, + broker=broker, + streaming=True # Enable SSE streaming +) + +# Workers emit events during execution +async def run_task(self, params: TaskSendParams): + task_id = params['id'] + + # Emit status updates + await self.broker.send_stream_event(task_id, { + 'kind': 'status-update', + 'task_id': task_id, + 'status': {'state': 'working'}, + 'final': False + }) + + # Emit artifact chunks + await self.broker.send_stream_event(task_id, { + 'kind': 'artifact-update', + 'task_id': task_id, + 'artifact': {'parts': [{'kind': 'text', 'text': 'Chunk 1'}]}, + 'append': False + }) +``` + +## Client Usage + +```python +# Using httpx-sse client +async with aconnect_sse(client, 'POST', '/', json={ + 'jsonrpc': '2.0', + 'method': 'message/stream', + 'params': {'message': {...}} +}) as event_source: + async for sse in event_source.aiter_sse(): + event = json.loads(sse.data) + # Process streaming event +``` + +## Benefits +- Real-time response streaming reduces perceived latency +- Supports incremental artifact updates +- Fully compliant with A2A specification +- Backwards compatible - streaming is optional \ No newline at end of file diff --git a/a2a.md b/a2a.md new file mode 100644 index 0000000..85d9755 --- /dev/null +++ b/a2a.md @@ -0,0 +1,231 @@ +──────────────────────────────────────────────────────── +Agent‑to‑Agent Protocol (A2A) — Comprehensive Reference +Spec version : 0.2.3 • Publish date : 2025‑06‑14 +Homepage : https://a2aproject.github.io/A2A/latest/ +──────────────────────────────────────────────────────── + +1. PURPOSE & SCOPE +──────────────────────────────────────────────────────── +A2A defines **how independent AI agents communicate as peers**, with +built‑in discovery, capability negotiation, long‑running task management, +streaming, push notifications and enterprise‑grade security. Its goals are: + +• Interoperability between heterogeneous agent stacks +• Dynamic discovery of skills & auth requirements +• Secure, opaque collaboration (no internal state leaks) +• First‑class support for async / streaming & human‑in‑loop workflows +• Basing everything on well‑known Web standards (HTTP, JSON‑RPC 2.0, SSE) :contentReference[oaicite:0]{index=0} + +2. CORE MODEL +──────────────────────────────────────────────────────── +Concept | Description +-------------------- | ----------------------------------------------------- +A2A Client | Initiator acting for a user / upstream agent +A2A Server | Remote agent exposing an A2A endpoint +Task | Long‑lived stateful unit of work +Message | One conversational turn; contains **Part[]** +Artifact | Durable output produced by a task +Agent Card | JSON metadata document describing identity, skills, + | endpoint URL & security requirements :contentReference[oaicite:1]{index=1} + +3. TRANSPORT & ENCODING +──────────────────────────────────────────────────────── +Transport : **HTTPS** (TLS 1.2+) POST to Agent Card .url +Payload format : **JSON‑RPC 2.0** (`Content‑Type: application/json`) +Streaming channel : **Server‑Sent Events** (`text/event-stream`) used by + `message/stream` and `tasks/resubscribe`. +SSE payload : each event’s *data* field is a full JSON‑RPC Response. :contentReference[oaicite:2]{index=2} + +4. SECURITY MODEL +──────────────────────────────────────────────────────── +Layered security = transport TLS + declared auth schemes. + +• Auth schemes are declared in `AgentCard.securitySchemes` + – mirrors OpenAPI: `apiKey`, `http` (Basic / Bearer), `oauth2`, + `openIdConnect`. +• Required scheme combinations are listed in `AgentCard.security` +• Missing / invalid credentials → HTTP 401/403 with `WWW‑Authenticate`. +• Optional **secondary (in‑task) auth** — task transitions to + `auth-required` until satisfied. :contentReference[oaicite:3]{index=3} + +5. DISCOVERY – THE AGENT CARD +──────────────────────────────────────────────────────── +Recommended URL : `https://{host}/.well‑known/agent.json` +Minimal top‑level schema (TS‑style): + +```ts +interface AgentCard { + name: string; + description: string; + url: string; // base A2A endpoint + provider?: AgentProvider; // org info + version: string; // agent impl version + iconUrl?: string; // 64×64+ PNG/SVG + documentationUrl?: string; + capabilities: AgentCapabilities; + securitySchemes?: { [name: string]: SecurityScheme }; + security?: { [name: string]: string[] }[]; + defaultInputModes: string[]; // e.g. ["text/plain","application/json"] + defaultOutputModes: string[]; // e.g. ["application/json"] + skills: AgentSkill[]; + supportsAuthenticatedExtendedCard?: boolean; // if true: expose +} +``` :contentReference[oaicite:4]{index=4} + +Key sub‑objects +• **AgentCapabilities** → `{ streaming?: bool, pushNotifications?: bool, + stateTransitionHistory?: bool, extensions?: AgentExtension[] }` +• **AgentSkill** → `{ id, name, description, tags[], examples?[], + inputModes?, outputModes? }` +• **SecurityScheme** → union of OpenAPI schemes (API Key, HTTP, OAuth2, + OpenID Connect). :contentReference[oaicite:5]{index=5} + +6. DATA OBJECTS +──────────────────────────────────────────────────────── +6.1 Task +```ts +interface Task { + id: string; + contextId: string; + status: TaskStatus; + artifacts?: Artifact[]; + history?: Message[]; + metadata?: Record; + kind: "task"; +} +``` :contentReference[oaicite:6]{index=6} + +6.2 TaskStatus & TaskState enum +States: `submitted | working | input‑required | completed | canceled | +failed | rejected | auth‑required | unknown`. +`completed, canceled, failed, rejected` are **terminal**. :contentReference[oaicite:7]{index=7} + +6.3 Message & Part +```ts +type Part = TextPart | FilePart | DataPart; + +interface Message { + role: "user" | "agent"; + parts: Part[]; + messageId: string; + contextId?: string; + taskId?: string; + referenceTaskIds?: string[]; + metadata?: Record; + extensions?: string[]; + kind: "message"; +} +``` :contentReference[oaicite:8]{index=8} + +• **TextPart**→ `{ kind:"text", text }` +• **FilePart**→ `{ kind:"file", file: FileWithBytes | FileWithUri }` +• **DataPart**→ `{ kind:"data", data: {...} }` +• **Artifact**→ `{ artifactId, parts:Part[], name?, description?, metadata?, extensions? }` :contentReference[oaicite:9]{index=9} + +6.4 PushNotificationConfig +```ts +interface PushNotificationConfig { + id?: string; // server‑assigned + url: string; // HTTPS webhook + token?: string; // opaque HMAC/shared secret + authentication?: PushNotificationAuthenticationInfo; +} +``` :contentReference[oaicite:10]{index=10} + +6.5 JSON‑RPC Structures +`JSONRPCRequest`, `JSONRPCResponse`, `JSONRPCError` – standard 2.0 +shapes, plus A2A‑specific `result` payloads. :contentReference[oaicite:11]{index=11} + +7. RPC METHOD CATALOG +──────────────────────────────────────────────────────── +All calls are **POST** `AgentCard.url` with a JSON‑RPC 2.0 request. + +Method | Params → Result (success) | Notes +---------------------------- | --------------------------------------------- | ------------------------------------------------------ +`message/send` | **MessageSendParams** → `Task | Message` | Sync request; client polls `tasks/get` or SSE +`message/stream` | MessageSendParams → *SSE* stream of `Message` \| `Task` \| `TaskStatusUpdateEvent` \| `TaskArtifactUpdateEvent` | Requires `capabilities.streaming=true` +`tasks/get` | **/* TaskQueryParams** → `Task` | Poll task status / history +`tasks/cancel` | **TaskIdParams** → `Task` | Attempt cancellation +`tasks/pushNotificationConfig/set` | **TaskPushNotificationConfig** → same | Requires `capabilities.pushNotifications=true` +`tasks/pushNotificationConfig/get` | GetTaskPushNotificationConfigParams → TaskPushNotificationConfig | +`tasks/pushNotificationConfig/list`| ListTaskPushNotificationConfigParams → TaskPushNotificationConfig[] | +`tasks/pushNotificationConfig/delete`| TaskIdParams + configId → (void) | +`tasks/resubscribe` | TaskIdParams → *SSE* | Re‑attach after network drop +`agent/authenticatedExtendedCard` | AuthenticatedExtendedCardParams → AgentCard | Returns richer card after auth :contentReference[oaicite:12]{index=12} + +Essential parameter types +```ts +interface MessageSendParams { + message: Message; + configuration?: { + acceptedOutputModes: string[]; + historyLength?: number; + pushNotificationConfig?: PushNotificationConfig; + blocking?: boolean; // if true: HTTP held open until terminal + }; + metadata?: Record; +} + +interface TaskQueryParams { id: string; historyLength?: number } +interface TaskIdParams { id: string } +interface TaskPushNotificationConfig { taskId: string; pushNotificationConfig: PushNotificationConfig } +``` :contentReference[oaicite:13]{index=13} + +7.1 Streaming events +• **TaskStatusUpdateEvent** → `{ taskId, contextId, kind:"status‑update", status:TaskStatus, final:boolean }` +• **TaskArtifactUpdateEvent** → `{ taskId, contextId, kind:"artifact-update", artifact:Artifact, append?, lastChunk? }` :contentReference[oaicite:14]{index=14} + +8. ERROR HANDLING +──────────────────────────────────────────────────────── +A2A reuses JSON‑RPC codes and reserves **‑32000…‑32099** for protocol‑specific +errors. + +Code | Name | Typical meaning +------ | --------------------------------- | ---------------------------------------- +‑32001 | TaskNotFoundError | Unknown/expired task ID +‑32002 | TaskNotCancelableError | Task is not in cancelable state +‑32003 | PushNotificationNotSupportedError | Server lacks push‑notification capability +‑32004 | UnsupportedOperationError | Feature or parameter not supported +‑32005 | ContentTypeNotSupportedError | Unaccepted MIME type in parts/artifacts +‑32006 | InvalidAgentResponseError | Server produced invalid response shape :contentReference[oaicite:15]{index=15} +Standard codes ‑32700…‑32603 follow JSON‑RPC 2.0. Include explanatory +`message` and optional structured `data`. + +9. TYPICAL WORKFLOWS (NON‑NORMATIVE) +──────────────────────────────────────────────────────── +**Synchronous / Polling** + +1. Fetch Agent Card → pick skill → form first `Message` +2. `message/send` → server returns `Task { state:working }` +3. Client polls `tasks/get` until terminal state +4. On `completed` → download artifacts/messages as needed + +**Streaming** + +1. `message/stream` → HTTP 200 + SSE +2. Parse stream events; optionally `tasks/resubscribe` if dropped +3. Close stream when final status update or artifact chunk with `lastChunk:true` + +**Input‑Required turn** + +*Task* enters `input‑required` → client collects user input → call +`message/send` with same `taskId` (inside Message.taskId). Task continues. + +**Push‑notifications** + +• Register via `tasks/pushNotificationConfig/set` → Server POSTs JSON‑RPC +envelopes to `url`, signed per `token` and `authentication`. :contentReference[oaicite:16]{index=16} + +10. EXTENSIBILITY +──────────────────────────────────────────────────────── +• **Extensions** declared in `AgentCapabilities.extensions` and echoed in +messages/artifacts via the `extensions` string array. +• New RPC methods MAY be added using `namespace/verb` naming, provided +clients fail gracefully on `‑32601 Method not found`. +• New TaskState values SHOULD reserve lower‑case, dash‑separated strings and +be treated as non‑terminal unless explicitly documented. + +──────────────────────────────────────────────────────── +END OF SPEC (A2A v0.2.3) +──────────────────────────────────────────────────────── */ + diff --git a/fasta2a/applications.py b/fasta2a/applications.py index c5a5b5d..2da742b 100644 --- a/fasta2a/applications.py +++ b/fasta2a/applications.py @@ -1,10 +1,12 @@ from __future__ import annotations as _annotations +import json from collections.abc import AsyncIterator, Sequence from contextlib import asynccontextmanager from pathlib import Path from typing import Any +from sse_starlette import EventSourceResponse from starlette.applications import Starlette from starlette.middleware import Middleware from starlette.requests import Request @@ -21,6 +23,8 @@ a2a_request_ta, a2a_response_ta, agent_card_ta, + stream_event_ta, + stream_message_request_ta, ) from .storage import Storage from .task_manager import TaskManager @@ -41,6 +45,7 @@ def __init__( description: str | None = None, provider: AgentProvider | None = None, skills: list[Skill] | None = None, + streaming: bool = False, # Starlette debug: bool = False, routes: Sequence[Route] | None = None, @@ -65,6 +70,7 @@ def __init__( self.description = description self.provider = provider self.skills = skills or [] + self.streaming = streaming # NOTE: For now, I don't think there's any reason to support any other input/output modes. self.default_input_modes = ['application/json'] self.default_output_modes = ['application/json'] @@ -94,7 +100,7 @@ async def _agent_card_endpoint(self, request: Request) -> Response: default_input_modes=self.default_input_modes, default_output_modes=self.default_output_modes, capabilities=AgentCapabilities( - streaming=False, push_notifications=False, state_transition_history=False + streaming=self.streaming, push_notifications=False, state_transition_history=False ), ) if self.provider is not None: @@ -125,6 +131,25 @@ async def _agent_run_endpoint(self, request: Request) -> Response: if a2a_request['method'] == 'message/send': jsonrpc_response = await self.task_manager.send_message(a2a_request) + elif a2a_request['method'] == 'message/stream': + # Parse the streaming request + stream_request = stream_message_request_ta.validate_json(data) + + # Create an async generator wrapper that formats events as JSON-RPC responses + async def sse_generator(): + request_id = stream_request.get('id') + async for event in self.task_manager.stream_message(stream_request): + # Serialize event to ensure proper camelCase conversion + event_dict = stream_event_ta.dump_python(event, mode='json', by_alias=True) + + # Wrap in JSON-RPC response + jsonrpc_response = {'jsonrpc': '2.0', 'id': request_id, 'result': event_dict} + + # Convert to JSON string + yield json.dumps(jsonrpc_response) + + # Return SSE response + return EventSourceResponse(sse_generator()) elif a2a_request['method'] == 'tasks/get': jsonrpc_response = await self.task_manager.get_task(a2a_request) elif a2a_request['method'] == 'tasks/cancel': diff --git a/fasta2a/broker.py b/fasta2a/broker.py index c84b738..e535500 100644 --- a/fasta2a/broker.py +++ b/fasta2a/broker.py @@ -4,17 +4,21 @@ from collections.abc import AsyncIterator from contextlib import AsyncExitStack from dataclasses import dataclass -from typing import Annotated, Any, Generic, Literal, TypeVar +from typing import Annotated, Any, Generic, Literal, TypeVar, Union import anyio +from anyio.streams.memory import MemoryObjectSendStream from opentelemetry.trace import Span, get_current_span, get_tracer from pydantic import Discriminator from typing_extensions import Self, TypedDict -from .schema import TaskIdParams, TaskSendParams +from .schema import Message, Task, TaskArtifactUpdateEvent, TaskIdParams, TaskSendParams, TaskStatusUpdateEvent tracer = get_tracer(__name__) +StreamEvent = Union[Task, Message, TaskStatusUpdateEvent, TaskArtifactUpdateEvent] +"""Type alias for all events that can be streamed.""" + @dataclass class Broker(ABC): @@ -30,12 +34,32 @@ class Broker(ABC): @abstractmethod async def run_task(self, params: TaskSendParams) -> None: """Send a task to be executed by the worker.""" - raise NotImplementedError('send_run_task is not implemented yet.') + ... @abstractmethod async def cancel_task(self, params: TaskIdParams) -> None: """Cancel a task.""" - raise NotImplementedError('send_cancel_task is not implemented yet.') + ... + + @abstractmethod + async def send_stream_event(self, task_id: str, event: StreamEvent) -> None: + """Send a streaming event from worker to subscribers. + + This is used by workers to publish status updates, messages, and artifacts + during task execution. Events are forwarded to all active subscribers of + the given task_id. + """ + ... + + @abstractmethod + def subscribe_to_stream(self, task_id: str) -> AsyncIterator[StreamEvent]: + """Subscribe to streaming events for a specific task. + + Returns an async iterator that yields events published by workers for the + given task_id. The iterator completes when a TaskStatusUpdateEvent with + final=True is received or the subscription is cancelled. + """ + ... @abstractmethod async def __aenter__(self) -> Self: ... @@ -73,6 +97,10 @@ class _TaskOperation(TypedDict, Generic[OperationT, ParamsT]): class InMemoryBroker(Broker): """A broker that schedules tasks in memory.""" + def __init__(self): + self._event_subscribers: dict[str, list[MemoryObjectSendStream[StreamEvent]]] = {} + self._subscriber_lock: anyio.Lock | None = None + async def __aenter__(self): self.aexit_stack = AsyncExitStack() await self.aexit_stack.__aenter__() @@ -81,6 +109,8 @@ async def __aenter__(self): await self.aexit_stack.enter_async_context(self._read_stream) await self.aexit_stack.enter_async_context(self._write_stream) + self._subscriber_lock = anyio.Lock() + return self async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any): @@ -96,3 +126,65 @@ async def receive_task_operations(self) -> AsyncIterator[TaskOperation]: """Receive task operations from the broker.""" async for task_operation in self._read_stream: yield task_operation + + async def send_stream_event(self, task_id: str, event: StreamEvent) -> None: + """Send a streaming event from worker to subscribers.""" + assert self._subscriber_lock is not None, 'Broker not initialized' + + async with self._subscriber_lock: + subscribers = self._event_subscribers.get(task_id, []) + if not subscribers: + return + + # Send event to all subscribers, removing closed streams + active_subscribers: list[MemoryObjectSendStream[StreamEvent]] = [] + for stream in subscribers: + try: + await stream.send(event) + active_subscribers.append(stream) + except (anyio.ClosedResourceError, anyio.BrokenResourceError): + # Subscriber disconnected, remove from list + pass + + # Update subscriber list with only active ones + if active_subscribers: + self._event_subscribers[task_id] = active_subscribers + else: + # No active subscribers left, clean up + del self._event_subscribers[task_id] + + async def subscribe_to_stream(self, task_id: str) -> AsyncIterator[StreamEvent]: + """Subscribe to streaming events for a specific task.""" + assert self._subscriber_lock is not None, 'Broker not initialized' + + # Create a new stream for this subscriber + send_stream, receive_stream = anyio.create_memory_object_stream[StreamEvent](max_buffer_size=100) + + # Register the subscriber + async with self._subscriber_lock: + if task_id not in self._event_subscribers: + self._event_subscribers[task_id] = [] + self._event_subscribers[task_id].append(send_stream) + + try: + async with receive_stream: + async for event in receive_stream: + yield event + + # Check if this is a final status update + if isinstance(event, dict) and event.get('kind') == 'status-update' and event.get('final', False): + break + finally: + # Clean up subscription on exit + async with self._subscriber_lock: + if task_id in self._event_subscribers: + try: + self._event_subscribers[task_id].remove(send_stream) + if not self._event_subscribers[task_id]: + del self._event_subscribers[task_id] + except ValueError: + # Already removed + pass + + # Close the send stream + await send_stream.aclose() diff --git a/fasta2a/schema.py b/fasta2a/schema.py index f500be4..d1bdbbd 100644 --- a/fasta2a/schema.py +++ b/fasta2a/schema.py @@ -797,3 +797,7 @@ class JSONRPCResponse(JSONRPCMessage, Generic[ResultT, ErrorT]): send_message_response_ta: TypeAdapter[SendMessageResponse] = TypeAdapter(SendMessageResponse) stream_message_request_ta: TypeAdapter[StreamMessageRequest] = TypeAdapter(StreamMessageRequest) stream_message_response_ta: TypeAdapter[StreamMessageResponse] = TypeAdapter(StreamMessageResponse) + +# Type adapter for streaming events +StreamEvent = Union[Task, Message, TaskStatusUpdateEvent, TaskArtifactUpdateEvent] +stream_event_ta: TypeAdapter[StreamEvent] = TypeAdapter(StreamEvent) diff --git a/fasta2a/task_manager.py b/fasta2a/task_manager.py index df06e62..8aa156f 100644 --- a/fasta2a/task_manager.py +++ b/fasta2a/task_manager.py @@ -60,12 +60,14 @@ from __future__ import annotations as _annotations +import asyncio import uuid +from collections.abc import AsyncGenerator from contextlib import AsyncExitStack from dataclasses import dataclass, field from typing import Any -from .broker import Broker +from .broker import Broker, StreamEvent from .schema import ( CancelTaskRequest, CancelTaskResponse, @@ -79,7 +81,6 @@ SetTaskPushNotificationRequest, SetTaskPushNotificationResponse, StreamMessageRequest, - StreamMessageResponse, TaskNotFoundError, TaskSendParams, ) @@ -126,6 +127,10 @@ async def send_message(self, request: SendMessageRequest) -> SendMessageResponse if history_length is not None: broker_params['history_length'] = history_length + metadata = request['params'].get('metadata') + if metadata is not None: + broker_params['metadata'] = metadata + await self.broker.run_task(broker_params) return SendMessageResponse(jsonrpc='2.0', id=request_id, result=task) @@ -156,9 +161,44 @@ async def cancel_task(self, request: CancelTaskRequest) -> CancelTaskResponse: ) return CancelTaskResponse(jsonrpc='2.0', id=request['id'], result=task) - async def stream_message(self, request: StreamMessageRequest) -> StreamMessageResponse: - """Stream messages using Server-Sent Events.""" - raise NotImplementedError('message/stream method is not implemented yet.') + async def stream_message(self, request: StreamMessageRequest) -> AsyncGenerator[StreamEvent, None]: + """Handle a streaming message request. + + This method: + 1. Creates and submits a new task + 2. Yields the initial task object + 3. Subscribes to the broker's event stream + 4. Starts task execution asynchronously + 5. Streams all events until completion + """ + # Extract parameters + params = request['params'] + message = params['message'] + context_id = message.get('context_id', str(uuid.uuid4())) + + # Create and submit the task + task = await self.storage.submit_task(context_id, message) + + # Yield the initial task + yield task + + # Prepare broker params + broker_params: TaskSendParams = {'id': task['id'], 'context_id': context_id, 'message': message} + config = params.get('configuration', {}) + history_length = config.get('history_length') + if history_length is not None: + broker_params['history_length'] = history_length + + metadata = params.get('metadata') + if metadata is not None: + broker_params['metadata'] = metadata + + # Start task execution in background + asyncio.create_task(self.broker.run_task(broker_params)) + + # Stream events from broker + async for event in self.broker.subscribe_to_stream(task['id']): + yield event async def set_task_push_notification( self, request: SetTaskPushNotificationRequest diff --git a/fasta2a/worker.py b/fasta2a/worker.py index bcb0172..9127f10 100644 --- a/fasta2a/worker.py +++ b/fasta2a/worker.py @@ -14,7 +14,12 @@ if TYPE_CHECKING: from .broker import Broker, TaskOperation - from .schema import Artifact, Message, TaskIdParams, TaskSendParams + from .schema import ( + Artifact, + Message, + TaskIdParams, + TaskSendParams, + ) tracer = get_tracer(__name__) diff --git a/pyproject.toml b/pyproject.toml index 2af3b7d..d1018e8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,6 +48,7 @@ dependencies = [ "pydantic>=2.10", "opentelemetry-api>=1.28.0", "eval_type_backport>=0.2.2; python_version <= '3.9'", + "sse-starlette>=2.0.0", ] [project.optional-dependencies] @@ -58,8 +59,11 @@ dev = [ "asgi-lifespan", "coverage", "httpx", + "httpx-sse", "inline-snapshot", "pytest", + "pytest-asyncio", + "pytest-mock", "ruff", "pyright", ] @@ -120,3 +124,24 @@ include = [ "fasta2a", "tests", ] + +[tool.coverage.run] +branch = true +source = ["fasta2a"] +omit = ["fasta2a/static/*"] + +[tool.coverage.report] +skip_covered = true +show_missing = true +precision = 2 +exclude_lines = [ + # Standard marker for code that's not covered + 'pragma: no cover', + # Don't complain about abstract methods and ellipsis + '@abstractmethod', + '^\s*\.\.\.\s*$', + # Don't complain about TYPE_CHECKING blocks + 'if TYPE_CHECKING:', + # Don't complain about NotImplementedError + 'raise NotImplementedError', +] diff --git a/tests/test_applications.py b/tests/test_applications.py index 6c7440d..d4d6630 100644 --- a/tests/test_applications.py +++ b/tests/test_applications.py @@ -44,3 +44,91 @@ async def test_agent_card(): }, } ) + + +async def test_agent_card_with_all_params(): + """Test agent card with all parameters specified.""" + app = FastA2A( + storage=InMemoryStorage(), + broker=InMemoryBroker(), + name='Test Agent', + url='https://example.com', + version='2.0.0', + description='A test agent', + provider={'organization': 'Test Provider', 'url': 'https://example.com'}, + skills=[ + { + 'id': 'skill1', + 'name': 'Skill 1', + 'description': 'First test skill', + 'tags': ['test', 'skill1'], + 'input_modes': ['application/json'], + 'output_modes': ['application/json'], + }, + { + 'id': 'skill2', + 'name': 'Skill 2', + 'description': 'Second test skill', + 'tags': ['test', 'skill2'], + 'input_modes': ['application/json'], + 'output_modes': ['application/json'], + }, + ], + streaming=True, + ) + async with create_test_client(app) as client: + response = await client.get('/.well-known/agent.json') + assert response.status_code == 200 + data = response.json() + assert data['name'] == 'Test Agent' + assert data['url'] == 'https://example.com' + assert data['version'] == '2.0.0' + assert data['description'] == 'A test agent' + assert data['provider']['organization'] == 'Test Provider' + assert len(data['skills']) == 2 + assert data['skills'][0]['id'] == 'skill1' + assert data['skills'][1]['id'] == 'skill2' + assert data['capabilities']['streaming'] is True + + +async def test_agent_card_head_and_options(): + """Test HEAD and OPTIONS methods for agent card.""" + app = FastA2A(storage=InMemoryStorage(), broker=InMemoryBroker()) + async with create_test_client(app) as client: + # Test HEAD + head_response = await client.head('/.well-known/agent.json') + assert head_response.status_code == 200 + + # Test OPTIONS + options_response = await client.options('/.well-known/agent.json') + assert options_response.status_code == 200 + + +async def test_agent_card_caching(): + """Test that agent card is cached after first generation.""" + app = FastA2A(storage=InMemoryStorage(), broker=InMemoryBroker(), name='Original') + async with create_test_client(app) as client: + # First request + response1 = await client.get('/.well-known/agent.json') + assert response1.status_code == 200 + data1 = response1.json() + assert data1['name'] == 'Original' + + # Modify app (shouldn't affect cached response) + app.name = 'Modified' + + # Second request should return cached + response2 = await client.get('/.well-known/agent.json') + assert response2.status_code == 200 + data2 = response2.json() + assert data2['name'] == 'Original' + + +async def test_docs_endpoint(): + """Test the /docs endpoint.""" + app = FastA2A(storage=InMemoryStorage(), broker=InMemoryBroker()) + async with create_test_client(app) as client: + response = await client.get('/docs') + assert response.status_code == 200 + assert response.headers['content-type'] == 'text/html; charset=utf-8' + assert b'' in response.content or b' TaskStatusUpdateEvent: + """Create a TaskStatusUpdateEvent with common defaults.""" + event: TaskStatusUpdateEvent = { + 'kind': 'status-update', + 'task_id': task_id, + 'context_id': 'test-context', + 'status': {'state': state}, + 'final': final, + } + if metadata: + event['metadata'] = metadata + return event + + +def make_artifact_event( + task_id: str, text: str, append: bool = False, last_chunk: bool = False +) -> TaskArtifactUpdateEvent: + """Create a TaskArtifactUpdateEvent with common defaults.""" + return { + 'kind': 'artifact-update', + 'task_id': task_id, + 'context_id': 'test-context', + 'artifact': {'artifact_id': 'artifact-1', 'parts': [{'kind': 'text', 'text': text}]}, + 'append': append, + 'last_chunk': last_chunk, + } + + +def is_final_status_event(event: StreamEvent) -> bool: + """Check if event is a final status update.""" + return isinstance(event, dict) and event.get('kind') == 'status-update' and bool(event.get('final')) + + +@pytest.mark.asyncio +async def test_broker_pub_sub_single_subscriber(): + """Test basic pub/sub with a single subscriber.""" + async with InMemoryBroker() as broker: + task_id = 'test-task-123' + events_received: list[StreamEvent] = [] + + # Create events to track subscriber lifecycle + subscriber_ready = asyncio.Event() + subscriber_done = asyncio.Event() + + async def subscriber(): + # Set ready event to signal we're about to start listening + subscriber_ready.set() + async for event in broker.subscribe_to_stream(task_id): + events_received.append(event) + # Check for final event + if is_final_status_event(event): + break + subscriber_done.set() + + # Start subscriber in background + subscriber_task = asyncio.create_task(subscriber()) + + # Wait for subscriber to be ready + await subscriber_ready.wait() + + # Send some events + test_task: Task = { + 'id': task_id, + 'context_id': 'test-context', + 'kind': 'task', + 'status': {'state': 'submitted'}, + } + await broker.send_stream_event(task_id, test_task) + + status_update = make_status_event(task_id, 'working') + await broker.send_stream_event(task_id, status_update) + + final_update = make_status_event(task_id, 'completed', final=True) + await broker.send_stream_event(task_id, final_update) + + # Wait for subscriber to complete + await subscriber_done.wait() + await subscriber_task + + # Verify events were received + assert len(events_received) == 3 + assert events_received[0] == test_task + assert events_received[1] == status_update + assert events_received[2] == final_update + + +@pytest.mark.asyncio +async def test_broker_pub_sub_multiple_subscribers(): + """Test pub/sub with multiple subscribers to the same task.""" + async with InMemoryBroker() as broker: + task_id = 'test-task-456' + events_sub1: list[StreamEvent] = [] + events_sub2: list[StreamEvent] = [] + + # Create ready and completion events + sub1_ready = asyncio.Event() + sub2_ready = asyncio.Event() + sub1_done = asyncio.Event() + sub2_done = asyncio.Event() + + async def subscriber1(): + sub1_ready.set() + async for event in broker.subscribe_to_stream(task_id): + events_sub1.append(event) + if is_final_status_event(event): + break + sub1_done.set() + + async def subscriber2(): + sub2_ready.set() + async for event in broker.subscribe_to_stream(task_id): + events_sub2.append(event) + if is_final_status_event(event): + break + sub2_done.set() + + # Start both subscribers + sub1_task = asyncio.create_task(subscriber1()) + sub2_task = asyncio.create_task(subscriber2()) + + # Wait for both subscribers to be ready + await sub1_ready.wait() + await sub2_ready.wait() + + # Send event + test_event = make_status_event(task_id, 'completed', final=True) + await broker.send_stream_event(task_id, test_event) + + # Wait for both subscribers + await sub1_done.wait() + await sub2_done.wait() + await sub1_task + await sub2_task + + # Both should receive the event + assert len(events_sub1) == 1 + assert len(events_sub2) == 1 + assert events_sub1[0] == test_event + assert events_sub2[0] == test_event + + +@pytest.mark.asyncio +async def test_broker_no_subscribers(): + """Test sending events when there are no subscribers.""" + async with InMemoryBroker() as broker: + task_id = 'test-task-789' + + # This should not raise an error even with no subscribers + test_event = make_status_event(task_id, 'working') + await broker.send_stream_event(task_id, test_event) + + # Test passes if no error is raised when sending with no subscribers + + +@pytest.mark.asyncio +async def test_broker_subscriber_cleanup_on_disconnect(): + """Test that broker continues to work correctly after subscribers disconnect.""" + async with InMemoryBroker() as broker: + task_id = 'test-task-cleanup' + first_subscriber_events: list[StreamEvent] = [] + + # Create ready event for first subscriber + first_subscriber_ready = asyncio.Event() + + # Create a subscriber that exits early + async def early_exit_subscriber(): + first_subscriber_ready.set() + async for event in broker.subscribe_to_stream(task_id): + first_subscriber_events.append(event) + # Exit after first event + break + + # Start subscriber + subscriber_task = asyncio.create_task(early_exit_subscriber()) + + # Wait for subscriber to be ready + await first_subscriber_ready.wait() + + # Send first event + await broker.send_stream_event(task_id, make_status_event(task_id, 'working')) + + # Wait for subscriber to exit + await subscriber_task + + # Verify first subscriber received the event before disconnecting + assert len(first_subscriber_events) == 1 + + # Add a second subscriber to verify system continues working + events_received: list[StreamEvent] = [] + second_subscriber_ready = asyncio.Event() + complete = asyncio.Event() + + async def second_subscriber(): + second_subscriber_ready.set() + async for event in broker.subscribe_to_stream(task_id): + events_received.append(event) + if isinstance(event, dict) and event.get('final'): + break + complete.set() + + sub2_task = asyncio.create_task(second_subscriber()) + await second_subscriber_ready.wait() + + # Send another event - verifies broker works after first subscriber disconnected + await broker.send_stream_event(task_id, make_status_event(task_id, 'completed', final=True)) + + # Wait for second subscriber + await complete.wait() + await sub2_task + + # Verify the second subscriber got only the new event (not the old one) + assert len(events_received) == 1 + event = events_received[0] + assert isinstance(event, dict) and event.get('kind') == 'status-update' + # Type narrow to TaskStatusUpdateEvent + status_event = cast(TaskStatusUpdateEvent, event) + assert status_event['status']['state'] == 'completed' + + +@pytest.mark.asyncio +async def test_broker_artifact_streaming(): + """Test streaming artifact update events.""" + async with InMemoryBroker() as broker: + task_id = 'test-task-artifacts' + events_received: list[StreamEvent] = [] + + subscriber_ready = asyncio.Event() + complete = asyncio.Event() + + async def subscriber(): + subscriber_ready.set() + async for event in broker.subscribe_to_stream(task_id): + events_received.append(event) + if isinstance(event, dict) and event.get('kind') == 'artifact-update' and event.get('last_chunk'): + break + complete.set() + + # Start subscriber + subscriber_task = asyncio.create_task(subscriber()) + await subscriber_ready.wait() + + # Send artifact updates + artifact1 = make_artifact_event(task_id, 'Hello') + await broker.send_stream_event(task_id, artifact1) + + artifact2 = make_artifact_event(task_id, ' World', append=True, last_chunk=True) + await broker.send_stream_event(task_id, artifact2) + + # Wait for completion + await complete.wait() + await subscriber_task + + # Verify both artifacts received + assert len(events_received) == 2 + + # Check first artifact + event0 = events_received[0] + assert isinstance(event0, dict) and event0.get('kind') == 'artifact-update' + # Type narrow to TaskArtifactUpdateEvent + artifact_event0 = cast(TaskArtifactUpdateEvent, event0) + artifact_parts = artifact_event0['artifact']['parts'] + assert len(artifact_parts) > 0 + part = artifact_parts[0] + assert part['kind'] == 'text' and 'text' in part and part['text'] == 'Hello' + + # Check second artifact + event1 = events_received[1] + assert isinstance(event1, dict) and event1.get('kind') == 'artifact-update' + # Type narrow to TaskArtifactUpdateEvent + artifact_event1 = cast(TaskArtifactUpdateEvent, event1) + artifact_parts = artifact_event1['artifact']['parts'] + assert len(artifact_parts) > 0 + part = artifact_parts[0] + assert part['kind'] == 'text' and 'text' in part and part['text'] == ' World' + assert artifact_event1.get('append') is True + assert artifact_event1.get('last_chunk') is True + + +@pytest.mark.asyncio +async def test_broker_concurrent_operations(): + """Test concurrent pub/sub operations.""" + async with InMemoryBroker() as broker: + num_tasks = 5 + num_events_per_task = 10 + + results: dict[str, list[StreamEvent]] = {f'task-{i}': [] for i in range(num_tasks)} + + async def subscriber(task_id: str): + async for event in broker.subscribe_to_stream(task_id): + results[task_id].append(event) + if isinstance(event, dict) and event.get('final'): + break + + async def publisher(task_id: str): + for i in range(num_events_per_task - 1): + event = make_status_event(task_id, 'working', metadata={'message_num': i}) + await broker.send_stream_event(task_id, event) + + # Send final event + final_event = make_status_event( + task_id, 'completed', final=True, metadata={'message_num': num_events_per_task - 1} + ) + await broker.send_stream_event(task_id, final_event) + + # Start all subscribers and publishers concurrently + async with anyio.create_task_group() as tg: + for i in range(num_tasks): + task_id = f'task-{i}' + tg.start_soon(subscriber, task_id) + tg.start_soon(publisher, task_id) + + # Verify all events were received in order + for i in range(num_tasks): + task_id = f'task-{i}' + assert len(results[task_id]) == num_events_per_task + for j in range(num_events_per_task): + event = results[task_id][j] + assert isinstance(event, dict) and 'metadata' in event + assert event['metadata']['message_num'] == j + + +@pytest.mark.asyncio +async def test_broker_early_final_event(): + """Test that subscription stops on receiving a final event.""" + async with InMemoryBroker() as broker: + task_id = 'test-task-early-final' + events_received: list[StreamEvent] = [] + + subscriber_ready = asyncio.Event() + + async def subscriber(): + subscriber_ready.set() + async for event in broker.subscribe_to_stream(task_id): + events_received.append(event) + + # Start subscriber + subscriber_task = asyncio.create_task(subscriber()) + await subscriber_ready.wait() + + # Send a non-final event + await broker.send_stream_event(task_id, make_status_event(task_id, 'working')) + + # Send a final event - this should cause the subscriber to exit + await broker.send_stream_event(task_id, make_status_event(task_id, 'completed', final=True)) + + # Wait for subscriber to complete + await subscriber_task + + # Send another event after subscriber has exited - no subscribers should receive this + await broker.send_stream_event(task_id, make_status_event(task_id, 'working')) + + # Should have received only 2 events (not the third) + assert len(events_received) == 2 + # Check the second event is final + event = events_received[1] + assert isinstance(event, dict) and event.get('kind') == 'status-update' + status_event = cast(TaskStatusUpdateEvent, event) + assert status_event['final'] is True diff --git a/tests/test_streaming_integration.py b/tests/test_streaming_integration.py new file mode 100644 index 0000000..52d38f4 --- /dev/null +++ b/tests/test_streaming_integration.py @@ -0,0 +1,414 @@ +"""Integration tests for the streaming endpoint.""" + +from __future__ import annotations + +import asyncio +import json +from collections.abc import AsyncIterator, Callable +from contextlib import asynccontextmanager +from typing import Any + +import httpx +import pytest +import pytest_asyncio +from asgi_lifespan import LifespanManager +from httpx_sse import aconnect_sse +from sse_starlette.sse import AppStatus + +from fasta2a import FastA2A, Worker +from fasta2a.broker import InMemoryBroker +from fasta2a.schema import Artifact, Message, TaskIdParams, TaskSendParams +from fasta2a.storage import InMemoryStorage + + +def make_stream_request( + text: str, request_id: str | int | None = 'test-req', message_id: str = 'test-msg' +) -> dict[str, Any]: + """Create a JSON-RPC streaming request with defaults.""" + return { + 'jsonrpc': '2.0', + 'id': request_id, + 'method': 'message/stream', + 'params': { + 'message': { + 'role': 'user', + 'parts': [{'kind': 'text', 'text': text}], + 'messageId': message_id, + 'kind': 'message', + } + }, + } + + +def make_send_request( + text: str, request_id: str | int | None = 'test-req', message_id: str = 'test-msg' +) -> dict[str, Any]: + """Create a JSON-RPC send request with defaults.""" + return { + 'jsonrpc': '2.0', + 'id': request_id, + 'method': 'message/send', + 'params': { + 'message': { + 'role': 'user', + 'parts': [{'kind': 'text', 'text': text}], + 'messageId': message_id, + 'kind': 'message', + } + }, + } + + +def make_get_task_request(task_id: str, request_id: str | int | None = 'test-req') -> dict[str, Any]: + """Create a JSON-RPC get task request with defaults.""" + return {'jsonrpc': '2.0', 'id': request_id, 'method': 'tasks/get', 'params': {'id': task_id}} + + +async def collect_sse_events( + client: httpx.AsyncClient, + request_data: dict[str, Any], + stop_condition: Callable[[dict[str, Any]], bool] | None = None, +) -> list[dict[str, Any]]: + """Collect all SSE events from a streaming request.""" + events: list[dict[str, Any]] = [] + async with aconnect_sse(client, 'POST', '/', json=request_data) as event_source: + async for sse in event_source.aiter_sse(): + event_data = json.loads(sse.data) + events.append(event_data) + if stop_condition and stop_condition(event_data): + break + return events + + +def get_status_updates(events: list[dict[str, Any]]) -> list[dict[str, Any]]: + """Extract status update events.""" + return [e for e in events if e.get('result', {}).get('kind') == 'status-update'] + + +def get_artifact_updates(events: list[dict[str, Any]]) -> list[dict[str, Any]]: + """Extract artifact update events.""" + return [e for e in events if e.get('result', {}).get('kind') == 'artifact-update'] + + +def get_tasks(events: list[dict[str, Any]]) -> list[dict[str, Any]]: + """Extract task events.""" + return [e for e in events if e.get('result', {}).get('kind') == 'task'] + + +Context = list[Message] +"""The shape of the context you store in the storage.""" + + +@pytest.fixture(autouse=True) +def reset_sse_app_status(): + """Reset SSE global state between tests.""" + AppStatus.should_exit_event = None + yield + AppStatus.should_exit_event = None + + +class StreamingWorker(Worker[Context]): + """A test worker that emits streaming events.""" + + async def run_task(self, params: TaskSendParams) -> None: + task_id = params['id'] + context_id = params['context_id'] + + # Load the task + task = await self.storage.load_task(task_id) + assert task is not None + + # Update status to working + await self.storage.update_task(task_id, state='working') + + # Emit initial status update + await self.broker.send_stream_event( + task_id, + { + 'kind': 'status-update', + 'task_id': task_id, + 'context_id': context_id, + 'status': {'state': 'working'}, + 'final': False, + }, + ) + + # Simulate some work with incremental updates + result_parts = ['Hello', ' from', ' streaming', ' worker!'] + + for i, part in enumerate(result_parts): + # Create an artifact part + artifact: Artifact = {'artifact_id': 'result-1', 'parts': [{'kind': 'text', 'text': part}]} + + # Emit artifact update + await self.broker.send_stream_event( + task_id, + { + 'kind': 'artifact-update', + 'task_id': task_id, + 'context_id': context_id, + 'artifact': artifact, + 'append': i > 0, # Append after first part + 'last_chunk': i == len(result_parts) - 1, + }, + ) + + # Store the complete artifact + complete_artifact: Artifact = { + 'artifact_id': 'result-1', + 'parts': [{'kind': 'text', 'text': 'Hello from streaming worker!'}], + } + + # Update task with final status + await self.storage.update_task(task_id, state='completed', new_artifacts=[complete_artifact]) + + # Emit final status update + await self.broker.send_stream_event( + task_id, + { + 'kind': 'status-update', + 'task_id': task_id, + 'context_id': context_id, + 'status': {'state': 'completed'}, + 'final': True, + }, + ) + + async def cancel_task(self, params: TaskIdParams) -> None: + await self.storage.update_task(params['id'], state='canceled') + + def build_message_history(self, history: list[Message]) -> list[Any]: + return history + + def build_artifacts(self, result: Any) -> list[Artifact]: + return [] + + +@pytest_asyncio.fixture(scope='function') +async def streaming_app() -> FastA2A: + """Create a FastA2A app with streaming enabled and a streaming worker.""" + storage = InMemoryStorage() + broker = InMemoryBroker() + worker = StreamingWorker(storage=storage, broker=broker) + + @asynccontextmanager + async def lifespan(app: FastA2A) -> AsyncIterator[None]: + async with app.task_manager: + async with worker.run(): + yield + + app = FastA2A( + storage=storage, + broker=broker, + streaming=True, # Enable streaming + lifespan=lifespan, + ) + + return app + + +# ===== Basic Streaming Functionality ===== + + +@pytest.mark.asyncio +async def test_streaming_endpoint_basic(streaming_app: FastA2A) -> None: + """Test basic streaming functionality.""" + async with LifespanManager(streaming_app): + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=streaming_app), base_url='http://test' + ) as client: + # Send a streaming request + request_data = make_stream_request('Test streaming', 'test-1', 'msg-1') + events_received = await collect_sse_events(client, request_data) + + # Verify we received events + assert len(events_received) > 0 + + # First event should be the task + first_event = events_received[0] + assert first_event['jsonrpc'] == '2.0' + assert first_event['id'] == 'test-1' + assert 'result' in first_event + assert first_event['result']['kind'] == 'task' + assert first_event['result']['status']['state'] == 'submitted' + + # Should have status updates + status_updates = get_status_updates(events_received) + assert len(status_updates) >= 2 # At least working and completed + + # Should have artifact updates + artifact_updates = get_artifact_updates(events_received) + assert len(artifact_updates) == 4 # 4 parts + + # Last status update should be final + last_status = status_updates[-1] + assert last_status['result']['status']['state'] == 'completed' + assert last_status['result']['final'] is True + + +# ===== Artifact Streaming ===== + + +@pytest.mark.asyncio +async def test_streaming_endpoint_incremental_artifacts(streaming_app: FastA2A) -> None: + """Test that artifacts are streamed incrementally.""" + async with LifespanManager(streaming_app): + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=streaming_app), base_url='http://test' + ) as client: + request_data = make_stream_request('Test artifacts', 'test-2', 'msg-2') + events = await collect_sse_events(client, request_data) + artifact_events = [e['result'] for e in get_artifact_updates(events)] + + # Verify artifact streaming + assert len(artifact_events) == 4 + + # First artifact should not append + assert 'append' not in artifact_events[0] or artifact_events[0]['append'] is False + assert artifact_events[0]['artifact']['parts'][0]['text'] == 'Hello' + + # Subsequent artifacts should append + assert artifact_events[1]['append'] is True + assert artifact_events[1]['artifact']['parts'][0]['text'] == ' from' + + assert artifact_events[2]['append'] is True + assert artifact_events[2]['artifact']['parts'][0]['text'] == ' streaming' + + # Last artifact should be marked as last chunk + assert artifact_events[3]['append'] is True + assert artifact_events[3]['artifact']['parts'][0]['text'] == ' worker!' + assert artifact_events[3]['lastChunk'] is True + + +# ===== Streaming vs Non-Streaming Comparison ===== + + +@pytest.mark.asyncio +async def test_streaming_vs_non_streaming_endpoints(streaming_app: FastA2A) -> None: + """Test that both streaming and non-streaming endpoints work.""" + async with LifespanManager(streaming_app): + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=streaming_app), base_url='http://test' + ) as client: + # First, test non-streaming endpoint + non_streaming_request = make_send_request('Non-streaming test', 'test-3', 'msg-3') + + # Non-streaming should return immediately with task + response = await client.post('/', json=non_streaming_request) + assert response.status_code == 200 + data = response.json() + assert data['result']['kind'] == 'task' + assert data['result']['status']['state'] == 'submitted' + task_id = data['result']['id'] + + # Check task status with minimal polling + get_task_request = make_get_task_request(task_id, 'test-4') + + # Poll for completion with short timeout + for _ in range(10): # Try up to 10 times (1 second max) + response = await client.post('/', json=get_task_request) + assert response.status_code == 200 + data = response.json() + if data['result']['status']['state'] == 'completed': + break + await asyncio.sleep(0.1) # Small delay between polls + else: + pytest.fail(f'Task did not complete, final state: {data["result"]["status"]["state"]}') + + assert len(data['result'].get('artifacts', [])) == 1 + + +# ===== Agent Card and Capabilities ===== + + +@pytest.mark.asyncio +async def test_agent_card_shows_streaming_capability(streaming_app: FastA2A) -> None: + """Test that agent card correctly reports streaming capability.""" + async with LifespanManager(streaming_app): + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=streaming_app), base_url='http://test' + ) as client: + response = await client.get('/.well-known/agent.json') + assert response.status_code == 200 + + agent_card = response.json() + assert agent_card['capabilities']['streaming'] is True + assert agent_card['capabilities']['pushNotifications'] is False + assert agent_card['capabilities']['stateTransitionHistory'] is False + + +@pytest.mark.asyncio +async def test_non_streaming_app(): + """Test app with streaming disabled.""" + storage = InMemoryStorage() + broker = InMemoryBroker() + + app = FastA2A( + storage=storage, + broker=broker, + streaming=False, # Streaming disabled + ) + + async with LifespanManager(app): + async with httpx.AsyncClient(transport=httpx.ASGITransport(app=app), base_url='http://test') as client: + # Check agent card + response = await client.get('/.well-known/agent.json') + assert response.status_code == 200 + + agent_card = response.json() + assert agent_card['capabilities']['streaming'] is False + + +# ===== ID Validation Tests ===== + + +@pytest.mark.asyncio +async def test_streaming_null_id_accepted(streaming_app: FastA2A) -> None: + """Test streaming endpoint with explicit null ID - should work.""" + async with LifespanManager(streaming_app): + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=streaming_app), base_url='http://test' + ) as client: + # Request with explicit null ID + request_data = make_stream_request('Test', None, 'msg-null') + events_received = await collect_sse_events(client, request_data, lambda _: True) # Stop after first event + + # Verify the event has null ID + assert len(events_received) > 0 + assert events_received[0]['id'] is None + + +@pytest.mark.asyncio +async def test_streaming_numeric_id_accepted(streaming_app: FastA2A) -> None: + """Test streaming endpoint with numeric ID - should work per JSON-RPC spec.""" + async with LifespanManager(streaming_app): + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=streaming_app), base_url='http://test' + ) as client: + # Request with numeric ID + request_data = make_stream_request('Test', 12345, 'msg-numeric') + events_received = await collect_sse_events(client, request_data, lambda _: True) # Stop after first event + + # Verify the event has numeric ID + assert len(events_received) > 0 + assert events_received[0]['id'] == 12345 + + +@pytest.mark.asyncio +async def test_streaming_large_message(streaming_app: FastA2A) -> None: + """Test streaming with a large message payload.""" + async with LifespanManager(streaming_app): + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=streaming_app), base_url='http://test' + ) as client: + # Create a large message + large_text = 'x' * 10000 # 10KB of text + request_data = make_stream_request(large_text, 'test-large', 'msg-large') + events_received = await collect_sse_events(client, request_data) + + # Should still process successfully + assert len(events_received) > 0 + # Check we got a completed status + status_updates = get_status_updates(events_received) + last_status = status_updates[-1] + assert last_status['result']['status']['state'] == 'completed' diff --git a/tests/test_task_manager.py b/tests/test_task_manager.py new file mode 100644 index 0000000..2ad78d1 --- /dev/null +++ b/tests/test_task_manager.py @@ -0,0 +1,537 @@ +"""Tests for TaskManager class.""" + +from __future__ import annotations + +import asyncio +from collections.abc import AsyncIterator +from typing import Any, Literal +from unittest.mock import AsyncMock + +import pytest +import pytest_asyncio +from pytest_mock import MockerFixture + +from fasta2a.broker import InMemoryBroker +from fasta2a.schema import ( + CancelTaskRequest, + GetTaskRequest, + Message, + MessageSendConfiguration, + MessageSendParams, + SendMessageRequest, + StreamMessageRequest, + TaskQueryParams, + TaskSendParams, + TaskStatus, + TaskStatusUpdateEvent, + TextPart, +) +from fasta2a.storage import InMemoryStorage +from fasta2a.task_manager import TaskManager + +# ============================================================================ +# Fixtures +# ============================================================================ + + +@pytest.fixture +def storage() -> InMemoryStorage: + """Create an InMemoryStorage instance.""" + return InMemoryStorage() + + +@pytest.fixture +def broker() -> InMemoryBroker: + """Create an InMemoryBroker instance.""" + return InMemoryBroker() + + +@pytest.fixture +def task_manager_factory(broker: InMemoryBroker, storage: InMemoryStorage) -> TaskManager: + """Create a TaskManager instance without entering contexts.""" + return TaskManager(broker=broker, storage=storage) + + +@pytest_asyncio.fixture +async def task_manager( + broker: InMemoryBroker, storage: InMemoryStorage +) -> AsyncIterator[tuple[TaskManager, InMemoryBroker, InMemoryStorage]]: + """Create and enter TaskManager with broker contexts. + + Yields: + tuple: (task_manager, broker, storage) for tests that need access to all components + """ + tm = TaskManager(broker=broker, storage=storage) + async with broker: + async with tm: + yield tm, broker, storage + + +# ============================================================================ +# Helper Functions +# ============================================================================ + + +def send_message_request( + message: Message, + req_id: str = 'req-1', + configuration: MessageSendConfiguration | None = None, + metadata: dict[str, Any] | None = None, +) -> SendMessageRequest: + """Build a SendMessageRequest.""" + params: MessageSendParams = {'message': message} + if configuration: + params['configuration'] = configuration + if metadata: + params['metadata'] = metadata + return { + 'jsonrpc': '2.0', + 'id': req_id, + 'method': 'message/send', + 'params': params, + } + + +def stream_message_request( + message: Message, + req_id: str = 'req-1', + configuration: MessageSendConfiguration | None = None, + metadata: dict[str, Any] | None = None, +) -> StreamMessageRequest: + """Build a StreamMessageRequest.""" + params: MessageSendParams = {'message': message} + if configuration: + params['configuration'] = configuration + if metadata: + params['metadata'] = metadata + return { + 'jsonrpc': '2.0', + 'id': req_id, + 'method': 'message/stream', + 'params': params, + } + + +def get_task_request(task_id: str, req_id: str = 'req-1', history_length: int | None = None) -> GetTaskRequest: + """Build a GetTaskRequest.""" + params: TaskQueryParams = {'id': task_id} + if history_length is not None: + params['history_length'] = history_length + return { + 'jsonrpc': '2.0', + 'id': req_id, + 'method': 'tasks/get', + 'params': params, + } + + +def cancel_task_request(task_id: str, req_id: str = 'req-1') -> CancelTaskRequest: + """Build a CancelTaskRequest.""" + return { + 'jsonrpc': '2.0', + 'id': req_id, + 'method': 'tasks/cancel', + 'params': {'id': task_id}, + } + + +def create_test_message( + role: Literal['user', 'agent'] = 'user', + text: str = 'Hello', + message_id: str = 'msg-1', + context_id: str | None = None, +) -> Message: + """Helper to create a properly typed Message.""" + text_part: TextPart = { + 'kind': 'text', + 'text': text, + } + message: Message = { + 'role': role, # No cast needed now! + 'parts': [text_part], + 'message_id': message_id, + 'kind': 'message', + } + if context_id is not None: + message['context_id'] = context_id + return message + + +# ============================================================================ +# Tests +# ============================================================================ + + +@pytest.mark.asyncio +async def test_task_manager_context_manager(task_manager_factory: TaskManager): + """Test TaskManager as async context manager.""" + # Not running before entering context + assert not task_manager_factory.is_running + + async with task_manager_factory: + # Should be running inside context + assert task_manager_factory.is_running + + # Not running after exiting context + assert not task_manager_factory.is_running + + +@pytest.mark.asyncio +async def test_task_manager_exit_without_enter(task_manager_factory: TaskManager): + """Test exiting TaskManager without entering raises error.""" + with pytest.raises(RuntimeError, match='TaskManager was not properly initialized'): + await task_manager_factory.__aexit__(None, None, None) + + +@pytest.mark.asyncio +async def test_send_message( + task_manager: tuple[TaskManager, InMemoryBroker, InMemoryStorage], mocker: MockerFixture +) -> None: + """Test send_message method.""" + tm, broker, _storage = task_manager + + # Mock broker.run_task + mock_run_task = mocker.patch.object(broker, 'run_task', new_callable=AsyncMock) + + # Create and send message with metadata + message = create_test_message(text='Hello', message_id='msg-1') + metadata = {'user_id': '123'} + request = send_message_request(message, req_id='req-1', metadata=metadata) + + response = await tm.send_message(request) + + # Verify response + assert response['jsonrpc'] == '2.0' + assert response['id'] == 'req-1' + assert 'result' in response + + task = response['result'] + assert task['kind'] == 'task' + assert task['status']['state'] == 'submitted' + assert 'id' in task + assert 'context_id' in task + + # Verify broker.run_task was called with metadata + mock_run_task.assert_called_once() + call_args = mock_run_task.call_args[0][0] + assert call_args.get('metadata') == metadata + + +@pytest.mark.asyncio +async def test_send_message_with_context_id( + task_manager: tuple[TaskManager, InMemoryBroker, InMemoryStorage], mocker: MockerFixture +) -> None: + """Test send_message with explicit context_id.""" + tm, broker, _storage = task_manager + + # Mock broker.run_task + mocker.patch.object(broker, 'run_task', new_callable=AsyncMock) + + # Create message with context_id + message = create_test_message(text='Hello', message_id='msg-2', context_id='custom-context-123') + request = send_message_request(message, req_id='req-2') + + response = await tm.send_message(request) + + assert 'result' in response + task = response['result'] + assert 'context_id' in task + assert task['context_id'] == 'custom-context-123' + + +@pytest.mark.asyncio +async def test_send_message_with_history_length( + task_manager: tuple[TaskManager, InMemoryBroker, InMemoryStorage], mocker: MockerFixture +) -> None: + """Test send_message with history_length configuration.""" + tm, broker, _storage = task_manager + + # Mock broker.run_task + mock_run_task = mocker.patch.object(broker, 'run_task', new_callable=AsyncMock) + + # Create message with configuration + message = create_test_message(text='Hello', message_id='msg-3') + configuration: MessageSendConfiguration = { + 'history_length': 10, + 'accepted_output_modes': ['text/plain'], + } + request = send_message_request(message, req_id='req-3', configuration=configuration) + + await tm.send_message(request) + + # Verify history_length was passed to broker + mock_run_task.assert_called_once() + call_args = mock_run_task.call_args[0][0] + assert 'history_length' in call_args + assert call_args['history_length'] == 10 + + +@pytest.mark.asyncio +async def test_get_task( + task_manager: tuple[TaskManager, InMemoryBroker, InMemoryStorage], mocker: MockerFixture +) -> None: + """Test get_task method.""" + tm, broker, _storage = task_manager + + # Mock broker.run_task + mocker.patch.object(broker, 'run_task', new_callable=AsyncMock) + + # First create a task + message = create_test_message(text='Hello', message_id='msg-4') + send_request = send_message_request(message, req_id='req-4') + + send_response = await tm.send_message(send_request) + assert 'result' in send_response + assert 'id' in send_response['result'] + task_id = send_response['result']['id'] + + # Now get the task + get_request = get_task_request(task_id, req_id='req-5') + get_response = await tm.get_task(get_request) + + assert get_response['jsonrpc'] == '2.0' + assert get_response['id'] == 'req-5' + assert 'result' in get_response + assert get_response['result']['id'] == task_id + + +@pytest.mark.asyncio +async def test_get_task_not_found(task_manager: tuple[TaskManager, InMemoryBroker, InMemoryStorage]) -> None: + """Test get_task with non-existent task.""" + tm, _broker, _storage = task_manager + + get_request = get_task_request('non-existent-task', req_id='req-6') + get_response = await tm.get_task(get_request) + + assert get_response['jsonrpc'] == '2.0' + assert get_response['id'] == 'req-6' + assert 'error' in get_response + assert get_response['error']['code'] == -32001 + assert get_response['error']['message'] == 'Task not found' + + +@pytest.mark.asyncio +async def test_get_task_with_history_length( + task_manager: tuple[TaskManager, InMemoryBroker, InMemoryStorage], mocker: MockerFixture +) -> None: + """Test get_task with history_length parameter.""" + tm, broker, storage = task_manager + + # Mock broker.run_task + mocker.patch.object(broker, 'run_task', new_callable=AsyncMock) + + # Mock storage.load_task to track calls + original_load_task = storage.load_task + mock_load_task = mocker.patch.object(storage, 'load_task', new_callable=AsyncMock) + # Make it return a valid task by calling the original + mock_load_task.side_effect = original_load_task + + # Create a task first + message = create_test_message(text='Hello', message_id='msg-7') + send_request = send_message_request(message, req_id='req-7') + + send_response = await tm.send_message(send_request) + assert 'result' in send_response + assert 'id' in send_response['result'] + task_id = send_response['result']['id'] + + # Get task with history_length + get_request = get_task_request(task_id, req_id='req-8', history_length=5) + await tm.get_task(get_request) + + # Verify history_length was passed + mock_load_task.assert_called_with(task_id, 5) + + +@pytest.mark.asyncio +async def test_cancel_task( + task_manager: tuple[TaskManager, InMemoryBroker, InMemoryStorage], mocker: MockerFixture +) -> None: + """Test cancel_task method.""" + tm, broker, _storage = task_manager + + # Mock broker methods + mocker.patch.object(broker, 'run_task', new_callable=AsyncMock) + mock_cancel_task = mocker.patch.object(broker, 'cancel_task', new_callable=AsyncMock) + + # Create a task first + message = create_test_message(text='Hello', message_id='msg-9') + send_request = send_message_request(message, req_id='req-9') + + send_response = await tm.send_message(send_request) + assert 'result' in send_response + assert 'id' in send_response['result'] + task_id = send_response['result']['id'] + + # Cancel the task + cancel_request = cancel_task_request(task_id, req_id='req-10') + cancel_response = await tm.cancel_task(cancel_request) + + assert cancel_response['jsonrpc'] == '2.0' + assert cancel_response['id'] == 'req-10' + assert 'result' in cancel_response + assert cancel_response['result']['id'] == task_id + + # Verify broker.cancel_task was called + mock_cancel_task.assert_called_once() + assert mock_cancel_task.call_args[0][0]['id'] == task_id + + +@pytest.mark.asyncio +async def test_cancel_task_not_found( + task_manager: tuple[TaskManager, InMemoryBroker, InMemoryStorage], mocker: MockerFixture +) -> None: + """Test cancel_task with non-existent task.""" + tm, broker, _storage = task_manager + + # Mock broker.cancel_task + mocker.patch.object(broker, 'cancel_task', new_callable=AsyncMock) + + cancel_request = cancel_task_request('non-existent-task', req_id='req-11') + cancel_response = await tm.cancel_task(cancel_request) + + assert cancel_response['jsonrpc'] == '2.0' + assert cancel_response['id'] == 'req-11' + assert 'error' in cancel_response + assert cancel_response['error']['code'] == -32001 + assert cancel_response['error']['message'] == 'Task not found' + + +@pytest.mark.asyncio +async def test_stream_message( + task_manager: tuple[TaskManager, InMemoryBroker, InMemoryStorage], mocker: MockerFixture +) -> None: + """Test stream_message method.""" + tm, broker, _storage = task_manager + + # Track the task info for our simulated worker + task_info: dict[str, str | None] = {'task_id': None, 'context_id': None} + + # Create side effect for simulating worker + async def simulate_worker(params: TaskSendParams): + # Capture task info + task_info['task_id'] = params['id'] + task_info['context_id'] = params['context_id'] + + # Simulate worker behavior in background + async def worker_task(): + # Small delay to ensure subscriber is ready + await asyncio.sleep(0.1) + + # Send working status + if task_info['task_id'] is not None and task_info['context_id'] is not None: + status: TaskStatus = {'state': 'working'} + event: TaskStatusUpdateEvent = { + 'kind': 'status-update', + 'task_id': task_info['task_id'], + 'context_id': task_info['context_id'], + 'status': status, + 'final': False, + } + await broker.send_stream_event(task_info['task_id'], event) + + # Small delay for processing + await asyncio.sleep(0.1) + + # Send completion status + if task_info['task_id'] is not None and task_info['context_id'] is not None: + status: TaskStatus = {'state': 'completed'} + event: TaskStatusUpdateEvent = { + 'kind': 'status-update', + 'task_id': task_info['task_id'], + 'context_id': task_info['context_id'], + 'status': status, + 'final': True, + } + await broker.send_stream_event(task_info['task_id'], event) + + # Start worker simulation in background + asyncio.create_task(worker_task()) + + # Mock broker.run_task with side effect + mock_run_task = mocker.patch.object(broker, 'run_task', new_callable=AsyncMock, side_effect=simulate_worker) + + # Create stream request with metadata + message = create_test_message(text='Hello streaming', message_id='msg-12') + metadata = {'session_id': 'stream-123'} + request = stream_message_request(message, req_id='req-12', metadata=metadata) + + # Collect events + events: list[Any] = [] + async for event in tm.stream_message(request): + events.append(event) + + # Should have received the task and status updates + assert len(events) == 3 # task, working status, completed status + assert events[0]['kind'] == 'task' + assert events[1]['kind'] == 'status-update' + assert events[1]['status']['state'] == 'working' + assert events[2]['kind'] == 'status-update' + assert events[2]['status']['state'] == 'completed' + assert events[2]['final'] is True + + # Verify metadata was passed to broker + mock_run_task.assert_called_once() + call_args = mock_run_task.call_args[0][0] + assert call_args.get('metadata') == metadata + + +@pytest.mark.asyncio +async def test_stream_message_with_context_and_history( + task_manager: tuple[TaskManager, InMemoryBroker, InMemoryStorage], mocker: MockerFixture +) -> None: + """Test stream_message with context_id and history_length.""" + tm, broker, _storage = task_manager + + # Create side effect for quick completion + async def simulate_quick_completion(params: TaskSendParams): + # Simulate worker behavior in background + async def worker_task(): + # Small delay to ensure subscriber is ready + await asyncio.sleep(0.1) + + # Send completion status immediately for this test + status: TaskStatus = {'state': 'completed'} + event: TaskStatusUpdateEvent = { + 'kind': 'status-update', + 'task_id': params['id'], + 'context_id': params['context_id'], + 'status': status, + 'final': True, + } + await broker.send_stream_event(params['id'], event) + + # Start worker simulation in background + asyncio.create_task(worker_task()) + + # Mock broker.run_task with side effect + mock_run_task = mocker.patch.object( + broker, 'run_task', new_callable=AsyncMock, side_effect=simulate_quick_completion + ) + + # Create message with context and configuration + message = create_test_message(text='Hello', message_id='msg-13', context_id='stream-context-123') + configuration: MessageSendConfiguration = { + 'history_length': 15, + 'accepted_output_modes': ['text/plain'], + } + request = stream_message_request(message, req_id='req-13', configuration=configuration) + + # Collect all events + events: list[Any] = [] + async for event in tm.stream_message(request): + events.append(event) + + # Verify we got the task and completion status + assert len(events) == 2 + assert events[0]['kind'] == 'task' + assert events[0]['context_id'] == 'stream-context-123' + assert events[1]['kind'] == 'status-update' + assert events[1]['final'] is True + + # Verify params passed to broker + mock_run_task.assert_called_once() + call_args = mock_run_task.call_args[0][0] + assert call_args['context_id'] == 'stream-context-123' + assert 'history_length' in call_args + assert call_args['history_length'] == 15 diff --git a/uv.lock b/uv.lock index a77fc88..4e88f72 100644 --- a/uv.lock +++ b/uv.lock @@ -60,6 +60,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b7/b8/3fe70c75fe32afc4bb507f75563d39bc5642255d1d94f1f23604725780bf/babel-2.17.0-py3-none-any.whl", hash = "sha256:4d0b53093fdfb4b21c92b5213dba5a1b23885afa8383709427046b21c366e5f2", size = 10182537, upload-time = "2025-02-01T15:17:37.39Z" }, ] +[[package]] +name = "backports-asyncio-runner" +version = "1.2.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/8e/ff/70dca7d7cb1cbc0edb2c6cc0c38b65cba36cccc491eca64cabd5fe7f8670/backports_asyncio_runner-1.2.0.tar.gz", hash = "sha256:a5aa7b2b7d8f8bfcaa2b57313f70792df84e32a2a746f585213373f900b42162", size = 69893, upload-time = "2025-07-02T02:27:15.685Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/a0/59/76ab57e3fe74484f48a53f8e337171b4a2349e506eabe136d7e01d059086/backports_asyncio_runner-1.2.0-py3-none-any.whl", hash = "sha256:0da0a936a8aeb554eccb426dc55af3ba63bcdc69fa1a600b5bb305413a4477b5", size = 12313, upload-time = "2025-07-02T02:27:14.263Z" }, +] + [[package]] name = "backrefs" version = "5.9" @@ -439,6 +448,7 @@ dependencies = [ { name = "eval-type-backport", marker = "python_full_version < '3.10'" }, { name = "opentelemetry-api" }, { name = "pydantic" }, + { name = "sse-starlette" }, { name = "starlette" }, ] @@ -452,9 +462,12 @@ dev = [ { name = "asgi-lifespan" }, { name = "coverage" }, { name = "httpx" }, + { name = "httpx-sse" }, { name = "inline-snapshot" }, { name = "pyright" }, { name = "pytest" }, + { name = "pytest-asyncio" }, + { name = "pytest-mock" }, { name = "ruff" }, ] docs = [ @@ -470,6 +483,7 @@ requires-dist = [ { name = "logfire", marker = "extra == 'logfire'", specifier = ">=2.3" }, { name = "opentelemetry-api", specifier = ">=1.28.0" }, { name = "pydantic", specifier = ">=2.10" }, + { name = "sse-starlette", specifier = ">=2.0.0" }, { name = "starlette", specifier = ">0.29.0" }, ] provides-extras = ["logfire"] @@ -479,9 +493,12 @@ dev = [ { name = "asgi-lifespan" }, { name = "coverage" }, { name = "httpx" }, + { name = "httpx-sse" }, { name = "inline-snapshot" }, { name = "pyright" }, { name = "pytest" }, + { name = "pytest-asyncio" }, + { name = "pytest-mock" }, { name = "ruff" }, ] docs = [ @@ -564,6 +581,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/2a/39/e50c7c3a983047577ee07d2a9e53faf5a69493943ec3f6a384bdc792deb2/httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad", size = 73517, upload-time = "2024-12-06T15:37:21.509Z" }, ] +[[package]] +name = "httpx-sse" +version = "0.4.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/6e/fa/66bd985dd0b7c109a3bcb89272ee0bfb7e2b4d06309ad7b38ff866734b2a/httpx_sse-0.4.1.tar.gz", hash = "sha256:8f44d34414bc7b21bf3602713005c5df4917884f76072479b21f68befa4ea26e", size = 12998, upload-time = "2025-06-24T13:21:05.71Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/25/0a/6269e3473b09aed2dab8aa1a600c70f31f00ae1349bee30658f7e358a159/httpx_sse-0.4.1-py3-none-any.whl", hash = "sha256:cba42174344c3a5b06f255ce65b350880f962d99ead85e776f23c6618a377a37", size = 8054, upload-time = "2025-06-24T13:21:04.772Z" }, +] + [[package]] name = "idna" version = "3.10" @@ -1356,6 +1382,32 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/29/16/c8a903f4c4dffe7a12843191437d7cd8e32751d5de349d45d3fe69544e87/pytest-8.4.1-py3-none-any.whl", hash = "sha256:539c70ba6fcead8e78eebbf1115e8b589e7565830d7d006a8723f19ac8a0afb7", size = 365474, upload-time = "2025-06-18T05:48:03.955Z" }, ] +[[package]] +name = "pytest-asyncio" +version = "1.1.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "backports-asyncio-runner", marker = "python_full_version < '3.11'" }, + { name = "pytest" }, + { name = "typing-extensions", marker = "python_full_version < '3.10'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/4e/51/f8794af39eeb870e87a8c8068642fc07bce0c854d6865d7dd0f2a9d338c2/pytest_asyncio-1.1.0.tar.gz", hash = "sha256:796aa822981e01b68c12e4827b8697108f7205020f24b5793b3c41555dab68ea", size = 46652, upload-time = "2025-07-16T04:29:26.393Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c7/9d/bf86eddabf8c6c9cb1ea9a869d6873b46f105a5d292d3a6f7071f5b07935/pytest_asyncio-1.1.0-py3-none-any.whl", hash = "sha256:5fe2d69607b0bd75c656d1211f969cadba035030156745ee09e7d71740e58ecf", size = 15157, upload-time = "2025-07-16T04:29:24.929Z" }, +] + +[[package]] +name = "pytest-mock" +version = "3.14.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pytest" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/71/28/67172c96ba684058a4d24ffe144d64783d2a270d0af0d9e792737bddc75c/pytest_mock-3.14.1.tar.gz", hash = "sha256:159e9edac4c451ce77a5cdb9fc5d1100708d2dd4ba3c3df572f14097351af80e", size = 33241, upload-time = "2025-05-26T13:58:45.167Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b2/05/77b60e520511c53d1c1ca75f1930c7dd8e971d0c4379b7f4b3f9644685ba/pytest_mock-3.14.1-py3-none-any.whl", hash = "sha256:178aefcd11307d874b4cd3100344e7e2d888d9791a6a1d9bfe90fbc1b74fd1d0", size = 9923, upload-time = "2025-05-26T13:58:43.487Z" }, +] + [[package]] name = "python-dateutil" version = "2.9.0.post0" @@ -1514,6 +1566,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/e7/9c/0e6afc12c269578be5c0c1c9f4b49a8d32770a080260c333ac04cc1c832d/soupsieve-2.7-py3-none-any.whl", hash = "sha256:6e60cc5c1ffaf1cebcc12e8188320b72071e922c2e897f737cadce79ad5d30c4", size = 36677, upload-time = "2025-04-20T18:50:07.196Z" }, ] +[[package]] +name = "sse-starlette" +version = "2.4.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "anyio" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/07/3e/eae74d8d33e3262bae0a7e023bb43d8bdd27980aa3557333f4632611151f/sse_starlette-2.4.1.tar.gz", hash = "sha256:7c8a800a1ca343e9165fc06bbda45c78e4c6166320707ae30b416c42da070926", size = 18635, upload-time = "2025-07-06T09:41:33.631Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e4/f1/6c7eaa8187ba789a6dd6d74430307478d2a91c23a5452ab339b6fbe15a08/sse_starlette-2.4.1-py3-none-any.whl", hash = "sha256:08b77ea898ab1a13a428b2b6f73cfe6d0e607a7b4e15b9bb23e4a37b087fd39a", size = 10824, upload-time = "2025-07-06T09:41:32.321Z" }, +] + [[package]] name = "starlette" version = "0.47.1"