From 3cc1da0d6576b7c1c59b710128cbd1d0a4398ce2 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Sun, 8 Mar 2026 10:51:48 -0700 Subject: [PATCH 01/13] Update push notification types and add ListTasks for A2A v1 spec - Flatten TaskPushNotificationConfig (absorb PushNotificationConfig fields) - Rename PushNotificationAuthenticationInfo to AuthenticationInfo (schemes -> scheme) - Add ListTasksParams, ListTasksResult, ListTasksRequest, ListTasksResponse - Add GetTaskPushNotificationConfigParams, ListTaskPushNotificationConfigResult - Update MessageSendConfiguration to use task_push_notification_config - Update A2ARequest/A2AResponse unions with new types --- fasta2a/schema.py | 187 +++++++++++++++++++++++++++++++--------------- 1 file changed, 127 insertions(+), 60 deletions(-) diff --git a/fasta2a/schema.py b/fasta2a/schema.py index 6aff5f0..db12a3f 100644 --- a/fasta2a/schema.py +++ b/fasta2a/schema.py @@ -396,63 +396,37 @@ class Artifact(TypedDict): """Array of extensions.""" -@pydantic.with_config({'alias_generator': to_camel}) -class PushNotificationConfig(TypedDict): - """Configuration for push notifications. - - A2A supports a secure notification mechanism whereby an agent can notify a client of an update - outside a connected session via a PushNotificationService. Within and across enterprises, - it is critical that the agent verifies the identity of the notification service, authenticates - itself with the service, and presents an identifier that ties the notification to the executing - Task. - - The target server of the PushNotificationService should be considered a separate service, and - is not guaranteed (or even expected) to be the client directly. This PushNotificationService is - responsible for authenticating and authorizing the agent and for proxying the verified notification - to the appropriate endpoint (which could be anything from a pub/sub queue, to an email inbox or - other service, etc.). - - For contrived scenarios with isolated client-agent pairs (e.g. local service mesh in a contained - VPC, etc.) or isolated environments without enterprise security concerns, the client may choose to - simply open a port and act as its own PushNotificationService. Any enterprise implementation will - likely have a centralized service that authenticates the remote agents with trusted notification - credentials and can handle online/offline scenarios. (This should be thought of similarly to a - mobile Push Notification Service). - """ +class AuthenticationInfo(TypedDict): + """Defines authentication details, used for push notifications.""" - id: NotRequired[str] - """Server-assigned identifier.""" - - url: str - """The URL to send push notifications to.""" - - token: NotRequired[str] - """Token unique to this task/session.""" + scheme: str + """HTTP Authentication Scheme (e.g., 'Bearer', 'Basic', 'Digest').""" - authentication: NotRequired[PushNotificationAuthenticationInfo] - """Authentication details for push notifications.""" + credentials: NotRequired[str] + """Push notification credentials. Format depends on the scheme (e.g., token for Bearer).""" @pydantic.with_config({'alias_generator': to_camel}) -class PushNotificationAuthenticationInfo(TypedDict): - """Authentication information for push notifications.""" +class TaskPushNotificationConfig(TypedDict): + """A container associating a push notification configuration with a specific task.""" - schemes: list[str] - """A list of supported authentication schemes (e.g., 'Basic', 'Bearer').""" + id: NotRequired[str] + """A unique identifier (e.g. UUID) for this push notification configuration.""" - credentials: NotRequired[str] - """Optional credentials required by the push notification endpoint.""" + task_id: NotRequired[str] + """The ID of the task this configuration is associated with.""" + tenant: NotRequired[str] + """Optional. Tenant ID.""" -@pydantic.with_config({'alias_generator': to_camel}) -class TaskPushNotificationConfig(TypedDict): - """Configuration for task push notifications.""" + url: str + """The URL where the notification should be sent.""" - id: str - """The task id.""" + token: NotRequired[str] + """A token unique for this task or session.""" - push_notification_config: PushNotificationConfig - """The push notification configuration.""" + authentication: NotRequired[AuthenticationInfo] + """Authentication information required to send the notification.""" @pydantic.with_config({'alias_generator': to_camel}) @@ -633,6 +607,52 @@ class TaskQueryParams(TaskIdParams): """Number of recent messages to be retrieved.""" +@pydantic.with_config({'alias_generator': to_camel}) +class ListTasksParams(TypedDict): + """Parameters for listing tasks with optional filtering criteria.""" + + context_id: NotRequired[str] + """Filter tasks by context ID.""" + + status: NotRequired[TaskState] + """Filter tasks by their current status state.""" + + status_timestamp_after: NotRequired[str] + """Filter tasks with status updated after this ISO 8601 timestamp.""" + + history_length: NotRequired[int] + """The maximum number of messages to include in each task's history.""" + + include_artifacts: NotRequired[bool] + """Whether to include artifacts in the returned tasks. Defaults to false.""" + + page_size: NotRequired[int] + """The maximum number of tasks to return (1-100, default 50).""" + + page_token: NotRequired[str] + """A page token from a previous `ListTasks` call for pagination.""" + + tenant: NotRequired[str] + """Optional. Tenant ID.""" + + +@pydantic.with_config({'alias_generator': to_camel}) +class ListTasksResult(TypedDict): + """Result for listing tasks with pagination information.""" + + tasks: list[Task] + """Array of tasks matching the specified criteria.""" + + next_page_token: NotRequired[str] + """A token to retrieve the next page of results.""" + + page_size: NotRequired[int] + """The page size used for this response.""" + + total_size: NotRequired[int] + """Total number of tasks available (before pagination).""" + + @pydantic.with_config({'alias_generator': to_camel}) class MessageSendConfiguration(TypedDict): """Configuration for the send message request.""" @@ -646,8 +666,8 @@ class MessageSendConfiguration(TypedDict): history_length: NotRequired[int] """Number of recent messages to be retrieved.""" - push_notification_config: NotRequired[PushNotificationConfig] - """Where the server should send notifications when disconnected.""" + task_push_notification_config: NotRequired[TaskPushNotificationConfig] + """Configuration for the agent to send push notifications for task updates.""" @pydantic.with_config({'alias_generator': to_camel}) @@ -689,28 +709,59 @@ class TaskSendParams(TypedDict): @pydantic.with_config({'alias_generator': to_camel}) -class ListTaskPushNotificationConfigParams(TypedDict): - """Parameters for getting list of pushNotificationConfigurations associated with a Task.""" +class GetTaskPushNotificationConfigParams(TypedDict): + """Parameters for getting a push notification configuration.""" id: str - """Task id.""" + """The resource ID of the configuration to retrieve.""" - metadata: NotRequired[dict[str, Any]] - """Extension metadata.""" + task_id: str + """The parent task resource ID.""" + + tenant: NotRequired[str] + """Optional. Tenant ID.""" + + +@pydantic.with_config({'alias_generator': to_camel}) +class ListTaskPushNotificationConfigParams(TypedDict): + """Parameters for listing push notification configurations associated with a task.""" + + task_id: str + """The parent task resource ID.""" + + page_size: NotRequired[int] + """The maximum number of configurations to return.""" + + page_token: NotRequired[str] + """A page token received from a previous request.""" + + tenant: NotRequired[str] + """Optional. Tenant ID.""" + + +@pydantic.with_config({'alias_generator': to_camel}) +class ListTaskPushNotificationConfigResult(TypedDict): + """Result for listing push notification configurations.""" + + configs: list[TaskPushNotificationConfig] + """The list of push notification configurations.""" + + next_page_token: NotRequired[str] + """A token to retrieve the next page of results.""" @pydantic.with_config({'alias_generator': to_camel}) class DeleteTaskPushNotificationConfigParams(TypedDict): - """Parameters for removing pushNotificationConfiguration associated with a Task.""" + """Parameters for removing a push notification configuration associated with a task.""" id: str - """Task id.""" + """The resource ID of the configuration to delete.""" - push_notification_config_id: str - """The push notification config id to delete.""" + task_id: str + """The parent task resource ID.""" - metadata: NotRequired[dict[str, Any]] - """Extension metadata.""" + tenant: NotRequired[str] + """Optional. Tenant ID.""" class JSONRPCMessage(TypedDict): @@ -854,7 +905,9 @@ class StreamResponse(TypedDict): SetTaskPushNotificationResponse = JSONRPCResponse[TaskPushNotificationConfig, PushNotificationNotSupportedError] """A JSON RPC response to set a task push notification.""" -GetTaskPushNotificationRequest = JSONRPCRequest[Literal['tasks/pushNotification/get'], TaskIdParams] +GetTaskPushNotificationRequest = JSONRPCRequest[ + Literal['tasks/pushNotification/get'], GetTaskPushNotificationConfigParams +] """A JSON RPC request to get a task push notification.""" GetTaskPushNotificationResponse = JSONRPCResponse[TaskPushNotificationConfig, PushNotificationNotSupportedError] @@ -868,11 +921,22 @@ class StreamResponse(TypedDict): ] """A JSON RPC request to list task push notification configs.""" +ListTaskPushNotificationConfigResponse = JSONRPCResponse[ + ListTaskPushNotificationConfigResult, PushNotificationNotSupportedError +] +"""A JSON RPC response to list task push notification configs.""" + DeleteTaskPushNotificationConfigRequest = JSONRPCRequest[ Literal['tasks/pushNotificationConfig/delete'], DeleteTaskPushNotificationConfigParams ] """A JSON RPC request to delete a task push notification config.""" +ListTasksRequest = JSONRPCRequest[Literal['tasks/list'], ListTasksParams] +"""A JSON RPC request to list tasks.""" + +ListTasksResponse = JSONRPCResponse[ListTasksResult, JSONRPCError[Any, Any]] +"""A JSON RPC response to list tasks.""" + A2ARequest = Annotated[ Union[ SendMessageRequest, @@ -884,6 +948,7 @@ class StreamResponse(TypedDict): ResubscribeTaskRequest, ListTaskPushNotificationConfigRequest, DeleteTaskPushNotificationConfigRequest, + ListTasksRequest, ], Discriminator('method'), ] @@ -896,6 +961,8 @@ class StreamResponse(TypedDict): CancelTaskResponse, SetTaskPushNotificationResponse, GetTaskPushNotificationResponse, + ListTaskPushNotificationConfigResponse, + ListTasksResponse, ] """A JSON RPC response from the A2A server.""" From 3e3f9d033c7fff62377290a1e2d4348f1d2eb767 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Sun, 8 Mar 2026 11:06:51 -0700 Subject: [PATCH 02/13] Remove `final` from TaskStatusUpdateEvent, add `tenant` to request params, add SendMessageResult wrapper - Remove `final` field from `TaskStatusUpdateEvent` (not in v1 spec) - Add `tenant` field to `TaskIdParams` and `MessageSendParams` - Add `SendMessageResult` wrapper type matching v1 spec shape - Update task_manager to wrap result in `SendMessageResult` --- fasta2a/schema.py | 23 +++++++++++++++++++---- fasta2a/task_manager.py | 4 +++- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/fasta2a/schema.py b/fasta2a/schema.py index db12a3f..8897b63 100644 --- a/fasta2a/schema.py +++ b/fasta2a/schema.py @@ -558,9 +558,6 @@ class TaskStatusUpdateEvent(TypedDict): status: TaskStatus """The status of the task.""" - final: bool - """Indicates the end of the event stream.""" - metadata: NotRequired[dict[str, Any]] """Extension metadata.""" @@ -595,6 +592,9 @@ class TaskIdParams(TypedDict): id: str """The unique identifier for the task.""" + tenant: NotRequired[str] + """Optional. Tenant ID.""" + metadata: NotRequired[dict[str, Any]] """Optional metadata associated with the request.""" @@ -680,6 +680,9 @@ class MessageSendParams(TypedDict): message: Message """The message being sent to the server.""" + tenant: NotRequired[str] + """Optional. Tenant ID.""" + metadata: NotRequired[dict[str, Any]] """Extension metadata.""" @@ -860,7 +863,19 @@ class JSONRPCResponse(JSONRPCMessage, Generic[ResultT, ErrorT]): SendMessageRequest = JSONRPCRequest[Literal['message/send'], MessageSendParams] """A JSON RPC request to send a message.""" -SendMessageResponse = JSONRPCResponse[Union[Task, Message], JSONRPCError[Any, Any]] + +@pydantic.with_config({'alias_generator': to_camel}) +class SendMessageResult(TypedDict): + """The result of a SendMessage request.""" + + message: NotRequired[Message] + """A message from the agent.""" + + task: NotRequired[Task] + """The task created or updated by the message.""" + + +SendMessageResponse = JSONRPCResponse[SendMessageResult, JSONRPCError[Any, Any]] """A JSON RPC response to send a message.""" StreamMessageRequest = JSONRPCRequest[Literal['message/stream'], MessageSendParams] diff --git a/fasta2a/task_manager.py b/fasta2a/task_manager.py index 54ab709..fc9f7d3 100644 --- a/fasta2a/task_manager.py +++ b/fasta2a/task_manager.py @@ -76,6 +76,7 @@ ResubscribeTaskRequest, SendMessageRequest, SendMessageResponse, + SendMessageResult, SetTaskPushNotificationRequest, SetTaskPushNotificationResponse, StreamMessageRequest, @@ -127,7 +128,8 @@ async def send_message(self, request: SendMessageRequest) -> SendMessageResponse broker_params['history_length'] = history_length await self.broker.run_task(broker_params) - return SendMessageResponse(jsonrpc='2.0', id=request_id, result=task) + result = SendMessageResult(task=task) + return SendMessageResponse(jsonrpc='2.0', id=request_id, result=result) async def get_task(self, request: GetTaskRequest) -> GetTaskResponse: """Get a task, and return it to the client. From 04be72f529697e343f73b09d8ac84f23b52d56fa Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Sun, 8 Mar 2026 11:15:36 -0700 Subject: [PATCH 03/13] Route all A2A methods and return proper JSON-RPC error responses - Route all JSON-RPC methods in _agent_run_endpoint - Return UnsupportedOperationError for message/stream and tasks/resubscribe - Return PushNotificationNotSupportedError for push notification methods - Return UnsupportedOperationError for tasks/list (not yet implemented) - Add DeleteTaskPushNotificationConfigResponse type - Remove TaskStatusUpdateEvent.final (not in v1 spec) - Add tenant field to TaskIdParams and MessageSendParams - Add SendMessageResult wrapper matching v1 spec --- fasta2a/applications.py | 61 +++++++++++++++++++++++++++++++++++------ fasta2a/schema.py | 4 +++ fasta2a/task_manager.py | 55 +++++++++++++++++++++++++++++-------- 3 files changed, 99 insertions(+), 21 deletions(-) diff --git a/fasta2a/applications.py b/fasta2a/applications.py index db95ab7..11b6fb6 100644 --- a/fasta2a/applications.py +++ b/fasta2a/applications.py @@ -3,7 +3,7 @@ from collections.abc import AsyncIterator, Sequence from contextlib import asynccontextmanager from pathlib import Path -from typing import Any +from typing import Any, cast from starlette.applications import Starlette from starlette.middleware import Middleware @@ -14,11 +14,23 @@ from .broker import Broker from .schema import ( + A2AResponse, AgentCapabilities, AgentCard, AgentInterface, AgentProvider, + CancelTaskRequest, + DeleteTaskPushNotificationConfigRequest, + GetTaskPushNotificationRequest, + GetTaskRequest, + ListTaskPushNotificationConfigRequest, + ListTasksRequest, + MethodNotFoundError, + SendMessageRequest, + SendMessageResponse, + SetTaskPushNotificationRequest, Skill, + UnsupportedOperationError, a2a_request_ta, a2a_response_ta, agent_card_ta, @@ -128,15 +140,46 @@ async def _agent_run_endpoint(self, request: Request) -> Response: """ data = await request.body() a2a_request = a2a_request_ta.validate_json(data) - - if a2a_request['method'] == 'message/send': - jsonrpc_response = await self.task_manager.send_message(a2a_request) - elif a2a_request['method'] == 'tasks/get': - jsonrpc_response = await self.task_manager.get_task(a2a_request) - elif a2a_request['method'] == 'tasks/cancel': - jsonrpc_response = await self.task_manager.cancel_task(a2a_request) + method = a2a_request['method'] + request_id = a2a_request['id'] + + jsonrpc_response: A2AResponse + if method == 'message/send': + jsonrpc_response = await self.task_manager.send_message(cast(SendMessageRequest, a2a_request)) + elif method == 'tasks/get': + jsonrpc_response = await self.task_manager.get_task(cast(GetTaskRequest, a2a_request)) + elif method == 'tasks/cancel': + jsonrpc_response = await self.task_manager.cancel_task(cast(CancelTaskRequest, a2a_request)) + elif method == 'tasks/pushNotification/set': + jsonrpc_response = await self.task_manager.set_task_push_notification( + cast(SetTaskPushNotificationRequest, a2a_request) + ) + elif method == 'tasks/pushNotification/get': + jsonrpc_response = await self.task_manager.get_task_push_notification( + cast(GetTaskPushNotificationRequest, a2a_request) + ) + elif method == 'tasks/pushNotificationConfig/list': + jsonrpc_response = await self.task_manager.list_task_push_notification_configs( + cast(ListTaskPushNotificationConfigRequest, a2a_request) + ) + elif method == 'tasks/pushNotificationConfig/delete': + jsonrpc_response = await self.task_manager.delete_task_push_notification_config( + cast(DeleteTaskPushNotificationConfigRequest, a2a_request) + ) + elif method == 'tasks/list': + jsonrpc_response = await self.task_manager.list_tasks(cast(ListTasksRequest, a2a_request)) + elif method in ('message/stream', 'tasks/resubscribe'): + jsonrpc_response = SendMessageResponse( + jsonrpc='2.0', + id=request_id, + error=UnsupportedOperationError(code=-32004, message='This operation is not supported'), + ) else: - raise NotImplementedError(f'Method {a2a_request["method"]} not implemented.') + jsonrpc_response = SendMessageResponse( + jsonrpc='2.0', + id=request_id, + error=MethodNotFoundError(code=-32601, message='Method not found'), + ) return Response( content=a2a_response_ta.dump_json(jsonrpc_response, by_alias=True), media_type='application/json' ) diff --git a/fasta2a/schema.py b/fasta2a/schema.py index 8897b63..84c42a7 100644 --- a/fasta2a/schema.py +++ b/fasta2a/schema.py @@ -946,6 +946,9 @@ class StreamResponse(TypedDict): ] """A JSON RPC request to delete a task push notification config.""" +DeleteTaskPushNotificationConfigResponse = JSONRPCResponse[None, PushNotificationNotSupportedError] +"""A JSON RPC response to delete a task push notification config.""" + ListTasksRequest = JSONRPCRequest[Literal['tasks/list'], ListTasksParams] """A JSON RPC request to list tasks.""" @@ -977,6 +980,7 @@ class StreamResponse(TypedDict): SetTaskPushNotificationResponse, GetTaskPushNotificationResponse, ListTaskPushNotificationConfigResponse, + DeleteTaskPushNotificationConfigResponse, ListTasksResponse, ] """A JSON RPC response from the A2A server.""" diff --git a/fasta2a/task_manager.py b/fasta2a/task_manager.py index fc9f7d3..0a2fee0 100644 --- a/fasta2a/task_manager.py +++ b/fasta2a/task_manager.py @@ -69,20 +69,25 @@ from .schema import ( CancelTaskRequest, CancelTaskResponse, + DeleteTaskPushNotificationConfigRequest, + DeleteTaskPushNotificationConfigResponse, GetTaskPushNotificationRequest, GetTaskPushNotificationResponse, GetTaskRequest, GetTaskResponse, - ResubscribeTaskRequest, + ListTaskPushNotificationConfigRequest, + ListTaskPushNotificationConfigResponse, + ListTasksRequest, + ListTasksResponse, + PushNotificationNotSupportedError, SendMessageRequest, SendMessageResponse, SendMessageResult, SetTaskPushNotificationRequest, SetTaskPushNotificationResponse, - StreamMessageRequest, - StreamMessageResponse, TaskNotFoundError, TaskSendParams, + UnsupportedOperationError, ) from .storage import Storage @@ -158,19 +163,45 @@ async def cancel_task(self, request: CancelTaskRequest) -> CancelTaskResponse: ) return CancelTaskResponse(jsonrpc='2.0', id=request['id'], result=task) - async def stream_message(self, request: StreamMessageRequest) -> StreamMessageResponse: - """Stream messages using Server-Sent Events.""" - raise NotImplementedError('message/stream method is not implemented yet.') - async def set_task_push_notification( self, request: SetTaskPushNotificationRequest ) -> SetTaskPushNotificationResponse: - raise NotImplementedError('SetTaskPushNotification is not implemented yet.') + return SetTaskPushNotificationResponse( + jsonrpc='2.0', + id=request['id'], + error=PushNotificationNotSupportedError(code=-32003, message='Push notification not supported'), + ) async def get_task_push_notification( self, request: GetTaskPushNotificationRequest ) -> GetTaskPushNotificationResponse: - raise NotImplementedError('GetTaskPushNotification is not implemented yet.') - - async def resubscribe_task(self, request: ResubscribeTaskRequest) -> None: - raise NotImplementedError('Resubscribe is not implemented yet.') + return GetTaskPushNotificationResponse( + jsonrpc='2.0', + id=request['id'], + error=PushNotificationNotSupportedError(code=-32003, message='Push notification not supported'), + ) + + async def list_task_push_notification_configs( + self, request: ListTaskPushNotificationConfigRequest + ) -> ListTaskPushNotificationConfigResponse: + return ListTaskPushNotificationConfigResponse( + jsonrpc='2.0', + id=request['id'], + error=PushNotificationNotSupportedError(code=-32003, message='Push notification not supported'), + ) + + async def delete_task_push_notification_config( + self, request: DeleteTaskPushNotificationConfigRequest + ) -> DeleteTaskPushNotificationConfigResponse: + return DeleteTaskPushNotificationConfigResponse( + jsonrpc='2.0', + id=request['id'], + error=PushNotificationNotSupportedError(code=-32003, message='Push notification not supported'), + ) + + async def list_tasks(self, request: ListTasksRequest) -> ListTasksResponse: + return ListTasksResponse( + jsonrpc='2.0', + id=request['id'], + error=UnsupportedOperationError(code=-32004, message='This operation is not supported'), + ) From 02fb21bfb1381a3a48fa9e4c90a5c1fa97206952 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Sun, 8 Mar 2026 11:17:41 -0700 Subject: [PATCH 04/13] Remove cast() by using direct discriminator access for type narrowing --- fasta2a/applications.py | 62 +++++++++++++++-------------------------- 1 file changed, 22 insertions(+), 40 deletions(-) diff --git a/fasta2a/applications.py b/fasta2a/applications.py index 11b6fb6..cba1c0d 100644 --- a/fasta2a/applications.py +++ b/fasta2a/applications.py @@ -3,7 +3,7 @@ from collections.abc import AsyncIterator, Sequence from contextlib import asynccontextmanager from pathlib import Path -from typing import Any, cast +from typing import Any from starlette.applications import Starlette from starlette.middleware import Middleware @@ -19,16 +19,7 @@ AgentCard, AgentInterface, AgentProvider, - CancelTaskRequest, - DeleteTaskPushNotificationConfigRequest, - GetTaskPushNotificationRequest, - GetTaskRequest, - ListTaskPushNotificationConfigRequest, - ListTasksRequest, - MethodNotFoundError, - SendMessageRequest, SendMessageResponse, - SetTaskPushNotificationRequest, Skill, UnsupportedOperationError, a2a_request_ta, @@ -140,45 +131,36 @@ async def _agent_run_endpoint(self, request: Request) -> Response: """ data = await request.body() a2a_request = a2a_request_ta.validate_json(data) - method = a2a_request['method'] - request_id = a2a_request['id'] jsonrpc_response: A2AResponse - if method == 'message/send': - jsonrpc_response = await self.task_manager.send_message(cast(SendMessageRequest, a2a_request)) - elif method == 'tasks/get': - jsonrpc_response = await self.task_manager.get_task(cast(GetTaskRequest, a2a_request)) - elif method == 'tasks/cancel': - jsonrpc_response = await self.task_manager.cancel_task(cast(CancelTaskRequest, a2a_request)) - elif method == 'tasks/pushNotification/set': - jsonrpc_response = await self.task_manager.set_task_push_notification( - cast(SetTaskPushNotificationRequest, a2a_request) - ) - elif method == 'tasks/pushNotification/get': - jsonrpc_response = await self.task_manager.get_task_push_notification( - cast(GetTaskPushNotificationRequest, a2a_request) - ) - elif method == 'tasks/pushNotificationConfig/list': - jsonrpc_response = await self.task_manager.list_task_push_notification_configs( - cast(ListTaskPushNotificationConfigRequest, a2a_request) - ) - elif method == 'tasks/pushNotificationConfig/delete': - jsonrpc_response = await self.task_manager.delete_task_push_notification_config( - cast(DeleteTaskPushNotificationConfigRequest, a2a_request) - ) - elif method == 'tasks/list': - jsonrpc_response = await self.task_manager.list_tasks(cast(ListTasksRequest, a2a_request)) - elif method in ('message/stream', 'tasks/resubscribe'): + if a2a_request['method'] == 'message/send': + jsonrpc_response = await self.task_manager.send_message(a2a_request) + elif a2a_request['method'] == 'tasks/get': + jsonrpc_response = await self.task_manager.get_task(a2a_request) + elif a2a_request['method'] == 'tasks/cancel': + jsonrpc_response = await self.task_manager.cancel_task(a2a_request) + elif a2a_request['method'] == 'tasks/pushNotification/set': + jsonrpc_response = await self.task_manager.set_task_push_notification(a2a_request) + elif a2a_request['method'] == 'tasks/pushNotification/get': + jsonrpc_response = await self.task_manager.get_task_push_notification(a2a_request) + elif a2a_request['method'] == 'tasks/pushNotificationConfig/list': + jsonrpc_response = await self.task_manager.list_task_push_notification_configs(a2a_request) + elif a2a_request['method'] == 'tasks/pushNotificationConfig/delete': + jsonrpc_response = await self.task_manager.delete_task_push_notification_config(a2a_request) + elif a2a_request['method'] == 'tasks/list': + jsonrpc_response = await self.task_manager.list_tasks(a2a_request) + elif a2a_request['method'] == 'message/stream': jsonrpc_response = SendMessageResponse( jsonrpc='2.0', - id=request_id, + id=a2a_request['id'], error=UnsupportedOperationError(code=-32004, message='This operation is not supported'), ) else: + assert a2a_request['method'] == 'tasks/resubscribe' jsonrpc_response = SendMessageResponse( jsonrpc='2.0', - id=request_id, - error=MethodNotFoundError(code=-32601, message='Method not found'), + id=a2a_request['id'], + error=UnsupportedOperationError(code=-32004, message='This operation is not supported'), ) return Response( content=a2a_response_ta.dump_json(jsonrpc_response, by_alias=True), media_type='application/json' From 604e3f3f168eb77d1369f8d38eb56c8d9c6aa9c6 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Sun, 8 Mar 2026 11:19:03 -0700 Subject: [PATCH 05/13] Raise ValueError for unknown methods instead of using assert --- fasta2a/applications.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/fasta2a/applications.py b/fasta2a/applications.py index cba1c0d..87b5c83 100644 --- a/fasta2a/applications.py +++ b/fasta2a/applications.py @@ -155,13 +155,14 @@ async def _agent_run_endpoint(self, request: Request) -> Response: id=a2a_request['id'], error=UnsupportedOperationError(code=-32004, message='This operation is not supported'), ) - else: - assert a2a_request['method'] == 'tasks/resubscribe' + elif a2a_request['method'] == 'tasks/resubscribe': jsonrpc_response = SendMessageResponse( jsonrpc='2.0', id=a2a_request['id'], error=UnsupportedOperationError(code=-32004, message='This operation is not supported'), ) + else: + raise ValueError(f'Unknown method: {a2a_request["method"]}') return Response( content=a2a_response_ta.dump_json(jsonrpc_response, by_alias=True), media_type='application/json' ) From 22020c5fe67edbe28b763087db2e156909ca41a4 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Sun, 8 Mar 2026 11:20:15 -0700 Subject: [PATCH 06/13] Use NotImplementedError for unknown methods --- fasta2a/applications.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fasta2a/applications.py b/fasta2a/applications.py index 87b5c83..7e96251 100644 --- a/fasta2a/applications.py +++ b/fasta2a/applications.py @@ -162,7 +162,7 @@ async def _agent_run_endpoint(self, request: Request) -> Response: error=UnsupportedOperationError(code=-32004, message='This operation is not supported'), ) else: - raise ValueError(f'Unknown method: {a2a_request["method"]}') + raise NotImplementedError(f'Method {a2a_request["method"]} not implemented.') return Response( content=a2a_response_ta.dump_json(jsonrpc_response, by_alias=True), media_type='application/json' ) From cb1fcd6970ee5a8d09e3d7f1dfbc69e21d953350 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Sun, 8 Mar 2026 11:25:09 -0700 Subject: [PATCH 07/13] Remove outdated v0.3.0 reference from docstring --- fasta2a/task_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fasta2a/task_manager.py b/fasta2a/task_manager.py index 0a2fee0..f937b2b 100644 --- a/fasta2a/task_manager.py +++ b/fasta2a/task_manager.py @@ -119,7 +119,7 @@ async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any): self._aexit_stack = None async def send_message(self, request: SendMessageRequest) -> SendMessageResponse: - """Send a message using the A2A v0.3.0 protocol.""" + """Send a message using the A2A protocol.""" request_id = request['id'] message = request['params']['message'] context_id = message.get('context_id', str(uuid.uuid4())) From 59530daebce00ddd7e8812c50030d1ae89c0a4bc Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Sun, 8 Mar 2026 11:31:34 -0700 Subject: [PATCH 08/13] Implement SSE streaming for message/stream and tasks/resubscribe - Add EventBus for pub/sub task events using anyio memory streams - Add stream_message() and resubscribe_task() to TaskManager - Route message/stream and tasks/resubscribe to StreamingResponse - Worker emits failed status events and closes the bus on exception - SSE events are formatted as `data: {json}\n\n` lines --- fasta2a/__init__.py | 3 +- fasta2a/applications.py | 18 +++++------ fasta2a/event_bus.py | 47 ++++++++++++++++++++++++++++ fasta2a/task_manager.py | 69 +++++++++++++++++++++++++++++++++++++++++ fasta2a/worker.py | 20 ++++++++++-- 5 files changed, 143 insertions(+), 14 deletions(-) create mode 100644 fasta2a/event_bus.py diff --git a/fasta2a/__init__.py b/fasta2a/__init__.py index 4a8b106..21b6cb7 100644 --- a/fasta2a/__init__.py +++ b/fasta2a/__init__.py @@ -1,7 +1,8 @@ from .applications import FastA2A from .broker import Broker +from .event_bus import EventBus from .schema import Skill from .storage import Storage from .worker import Worker -__all__ = ['FastA2A', 'Skill', 'Storage', 'Broker', 'Worker'] +__all__ = ['FastA2A', 'Skill', 'Storage', 'Broker', 'EventBus', 'Worker'] diff --git a/fasta2a/applications.py b/fasta2a/applications.py index 7e96251..b607ab2 100644 --- a/fasta2a/applications.py +++ b/fasta2a/applications.py @@ -8,7 +8,7 @@ from starlette.applications import Starlette from starlette.middleware import Middleware from starlette.requests import Request -from starlette.responses import FileResponse, Response +from starlette.responses import FileResponse, Response, StreamingResponse from starlette.routing import Route from starlette.types import ExceptionHandler, Lifespan, Receive, Scope, Send @@ -19,9 +19,7 @@ AgentCard, AgentInterface, AgentProvider, - SendMessageResponse, Skill, - UnsupportedOperationError, a2a_request_ta, a2a_response_ta, agent_card_ta, @@ -150,16 +148,14 @@ async def _agent_run_endpoint(self, request: Request) -> Response: elif a2a_request['method'] == 'tasks/list': jsonrpc_response = await self.task_manager.list_tasks(a2a_request) elif a2a_request['method'] == 'message/stream': - jsonrpc_response = SendMessageResponse( - jsonrpc='2.0', - id=a2a_request['id'], - error=UnsupportedOperationError(code=-32004, message='This operation is not supported'), + return StreamingResponse( + self.task_manager.stream_message(a2a_request), + media_type='text/event-stream', ) elif a2a_request['method'] == 'tasks/resubscribe': - jsonrpc_response = SendMessageResponse( - jsonrpc='2.0', - id=a2a_request['id'], - error=UnsupportedOperationError(code=-32004, message='This operation is not supported'), + return StreamingResponse( + self.task_manager.resubscribe_task(a2a_request), + media_type='text/event-stream', ) else: raise NotImplementedError(f'Method {a2a_request["method"]} not implemented.') diff --git a/fasta2a/event_bus.py b/fasta2a/event_bus.py new file mode 100644 index 0000000..b5b08e7 --- /dev/null +++ b/fasta2a/event_bus.py @@ -0,0 +1,47 @@ +"""Event bus for streaming task updates to SSE connections.""" + +from __future__ import annotations as _annotations + +from collections import defaultdict +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager + +import anyio +import anyio.abc + +from .schema import StreamResponse + + +class EventBus: + """A pub/sub event bus for streaming task events. + + Allows workers to emit events that are delivered to SSE connections. + Each subscription creates an anyio memory stream pair keyed by task ID. + """ + + def __init__(self) -> None: + self._subscribers: dict[str, list[anyio.abc.ObjectSendStream[StreamResponse]]] = defaultdict(list) + + @asynccontextmanager + async def subscribe(self, task_id: str) -> AsyncIterator[anyio.abc.ObjectReceiveStream[StreamResponse]]: + """Subscribe to events for a task. Yields a receive stream.""" + send_stream, receive_stream = anyio.create_memory_object_stream[StreamResponse]() + self._subscribers[task_id].append(send_stream) + try: + yield receive_stream + finally: + self._subscribers[task_id].remove(send_stream) + if not self._subscribers[task_id]: + del self._subscribers[task_id] + await send_stream.aclose() + await receive_stream.aclose() + + async def emit(self, task_id: str, event: StreamResponse) -> None: + """Emit an event to all subscribers for a task.""" + for send_stream in self._subscribers.get(task_id, []): + await send_stream.send(event) + + async def close(self, task_id: str) -> None: + """Close all subscriber streams for a task, signaling end of SSE.""" + for send_stream in self._subscribers.pop(task_id, []): + await send_stream.aclose() diff --git a/fasta2a/task_manager.py b/fasta2a/task_manager.py index f937b2b..9d2809c 100644 --- a/fasta2a/task_manager.py +++ b/fasta2a/task_manager.py @@ -61,11 +61,13 @@ from __future__ import annotations as _annotations import uuid +from collections.abc import AsyncIterator from contextlib import AsyncExitStack from dataclasses import dataclass, field from typing import Any from .broker import Broker +from .event_bus import EventBus from .schema import ( CancelTaskRequest, CancelTaskResponse, @@ -80,14 +82,19 @@ ListTasksRequest, ListTasksResponse, PushNotificationNotSupportedError, + ResubscribeTaskRequest, SendMessageRequest, SendMessageResponse, SendMessageResult, SetTaskPushNotificationRequest, SetTaskPushNotificationResponse, + StreamMessageRequest, + StreamMessageResponse, + StreamResponse, TaskNotFoundError, TaskSendParams, UnsupportedOperationError, + stream_message_response_ta, ) from .storage import Storage @@ -98,6 +105,7 @@ class TaskManager: broker: Broker storage: Storage[Any] + event_bus: EventBus = field(default_factory=EventBus) _aexit_stack: AsyncExitStack | None = field(default=None, init=False) @@ -163,6 +171,67 @@ async def cancel_task(self, request: CancelTaskRequest) -> CancelTaskResponse: ) return CancelTaskResponse(jsonrpc='2.0', id=request['id'], result=task) + async def stream_message(self, request: StreamMessageRequest) -> AsyncIterator[bytes]: + """Stream a message response as SSE events.""" + request_id = request['id'] + message = request['params']['message'] + context_id = message.get('context_id', str(uuid.uuid4())) + + task = await self.storage.submit_task(context_id, message) + task_id = task['id'] + + broker_params: TaskSendParams = {'id': task_id, 'context_id': context_id, 'message': message} + config = request['params'].get('configuration', {}) + history_length = config.get('history_length') + if history_length is not None: + broker_params['history_length'] = history_length + + async with self.event_bus.subscribe(task_id) as receive_stream: + await self.broker.run_task(broker_params) + + # Send initial task state + initial_response = StreamMessageResponse(jsonrpc='2.0', id=request_id, result=StreamResponse(task=task)) + yield self._format_sse_event(initial_response) + + async for event in receive_stream: + response = StreamMessageResponse(jsonrpc='2.0', id=request_id, result=event) + yield self._format_sse_event(response) + + async def resubscribe_task(self, request: ResubscribeTaskRequest) -> AsyncIterator[bytes]: + """Resubscribe to an existing task's event stream.""" + request_id = request['id'] + task_id = request['params']['id'] + + task = await self.storage.load_task(task_id) + if task is None: + error_response = StreamMessageResponse( + jsonrpc='2.0', + id=request_id, + error=TaskNotFoundError(code=-32001, message='Task not found'), + ) + yield self._format_sse_event(error_response) + return + + # Send current task state + initial_response = StreamMessageResponse(jsonrpc='2.0', id=request_id, result=StreamResponse(task=task)) + yield self._format_sse_event(initial_response) + + # If task is already in a terminal state, no need to subscribe + terminal_states = {'completed', 'canceled', 'failed', 'rejected'} + if task['status']['state'] in terminal_states: + return + + async with self.event_bus.subscribe(task_id) as receive_stream: + async for event in receive_stream: + response = StreamMessageResponse(jsonrpc='2.0', id=request_id, result=event) + yield self._format_sse_event(response) + + @staticmethod + def _format_sse_event(response: StreamMessageResponse) -> bytes: + """Format a StreamMessageResponse as an SSE event.""" + data = stream_message_response_ta.dump_json(response, by_alias=True) + return b'data: ' + data + b'\n\n' + async def set_task_push_notification( self, request: SetTaskPushNotificationRequest ) -> SetTaskPushNotificationResponse: diff --git a/fasta2a/worker.py b/fasta2a/worker.py index bcb0172..58501dc 100644 --- a/fasta2a/worker.py +++ b/fasta2a/worker.py @@ -3,13 +3,14 @@ from abc import ABC, abstractmethod from collections.abc import AsyncIterator from contextlib import asynccontextmanager -from dataclasses import dataclass +from dataclasses import dataclass, field from typing import TYPE_CHECKING, Any, Generic import anyio from opentelemetry.trace import get_tracer, use_span from typing_extensions import assert_never +from .event_bus import EventBus from .storage import ContextT, Storage if TYPE_CHECKING: @@ -25,6 +26,7 @@ class Worker(ABC, Generic[ContextT]): broker: Broker storage: Storage[ContextT] + event_bus: EventBus = field(default_factory=EventBus) @asynccontextmanager async def run(self) -> AsyncIterator[None]: @@ -54,7 +56,21 @@ async def _handle_task_operation(self, task_operation: TaskOperation) -> None: else: assert_never(task_operation) except Exception: - await self.storage.update_task(task_operation['params']['id'], state='failed') + task_id = task_operation['params']['id'] + task = await self.storage.update_task(task_id, state='failed') + from .schema import StreamResponse, TaskStatus, TaskStatusUpdateEvent + + await self.event_bus.emit( + task_id, + StreamResponse( + status_update=TaskStatusUpdateEvent( + task_id=task_id, + context_id=task['context_id'], + status=TaskStatus(state='failed'), + ) + ), + ) + await self.event_bus.close(task_id) @abstractmethod async def run_task(self, params: TaskSendParams) -> None: ... From 83d8715cc2086d8161b4cbc61a746d1220e1419f Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Sun, 8 Mar 2026 11:51:07 -0700 Subject: [PATCH 09/13] Address review feedback: share EventBus, set streaming=True, handle cleanup race - Accept event_bus parameter on FastA2A, pass to TaskManager - Set streaming=True in agent card capabilities - Handle subscribe() cleanup when close() already removed subscribers --- fasta2a/applications.py | 7 +++++-- fasta2a/event_bus.py | 11 ++++++++--- tests/test_applications.py | 2 +- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/fasta2a/applications.py b/fasta2a/applications.py index b607ab2..33834b4 100644 --- a/fasta2a/applications.py +++ b/fasta2a/applications.py @@ -13,6 +13,7 @@ from starlette.types import ExceptionHandler, Lifespan, Receive, Scope, Send from .broker import Broker +from .event_bus import EventBus from .schema import ( A2AResponse, AgentCapabilities, @@ -36,6 +37,7 @@ def __init__( *, storage: Storage, broker: Broker, + event_bus: EventBus | None = None, # Agent card name: str | None = None, url: str = 'http://localhost:8000', @@ -73,7 +75,8 @@ def __init__( self.default_input_modes = ['application/json'] self.default_output_modes = ['application/json'] - self.task_manager = TaskManager(broker=broker, storage=storage) + self.event_bus = event_bus or EventBus() + self.task_manager = TaskManager(broker=broker, storage=storage, event_bus=self.event_bus) # Setup self._agent_card_json_schema: bytes | None = None @@ -102,7 +105,7 @@ async def _agent_card_endpoint(self, request: Request) -> Response: skills=self.skills, default_input_modes=self.default_input_modes, default_output_modes=self.default_output_modes, - capabilities=AgentCapabilities(streaming=False, push_notifications=False), + capabilities=AgentCapabilities(streaming=True, push_notifications=False), ) if self.provider is not None: agent_card['provider'] = self.provider diff --git a/fasta2a/event_bus.py b/fasta2a/event_bus.py index b5b08e7..cf4588f 100644 --- a/fasta2a/event_bus.py +++ b/fasta2a/event_bus.py @@ -30,9 +30,14 @@ async def subscribe(self, task_id: str) -> AsyncIterator[anyio.abc.ObjectReceive try: yield receive_stream finally: - self._subscribers[task_id].remove(send_stream) - if not self._subscribers[task_id]: - del self._subscribers[task_id] + subscribers = self._subscribers.get(task_id) + if subscribers is not None: + try: + subscribers.remove(send_stream) + except ValueError: + pass + if not subscribers: + del self._subscribers[task_id] await send_stream.aclose() await receive_stream.aclose() diff --git a/tests/test_applications.py b/tests/test_applications.py index e5265ba..2385138 100644 --- a/tests/test_applications.py +++ b/tests/test_applications.py @@ -43,7 +43,7 @@ async def test_agent_card(): 'defaultInputModes': ['application/json'], 'defaultOutputModes': ['application/json'], 'capabilities': { - 'streaming': False, + 'streaming': True, 'pushNotifications': False, }, } From 5f1aaf1d4311d244bea5a3178ddcb5b39811c685 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Sun, 8 Mar 2026 12:21:30 -0700 Subject: [PATCH 10/13] Move EventBus onto Broker so it's automatically shared No more separate EventBus parameter - the broker owns it, and both TaskManager and Worker access it via self.broker.event_bus. --- fasta2a/__init__.py | 3 +-- fasta2a/applications.py | 5 +---- fasta2a/broker.py | 5 ++++- fasta2a/task_manager.py | 6 ++---- fasta2a/worker.py | 8 +++----- 5 files changed, 11 insertions(+), 16 deletions(-) diff --git a/fasta2a/__init__.py b/fasta2a/__init__.py index 21b6cb7..4a8b106 100644 --- a/fasta2a/__init__.py +++ b/fasta2a/__init__.py @@ -1,8 +1,7 @@ from .applications import FastA2A from .broker import Broker -from .event_bus import EventBus from .schema import Skill from .storage import Storage from .worker import Worker -__all__ = ['FastA2A', 'Skill', 'Storage', 'Broker', 'EventBus', 'Worker'] +__all__ = ['FastA2A', 'Skill', 'Storage', 'Broker', 'Worker'] diff --git a/fasta2a/applications.py b/fasta2a/applications.py index 33834b4..89a3cda 100644 --- a/fasta2a/applications.py +++ b/fasta2a/applications.py @@ -13,7 +13,6 @@ from starlette.types import ExceptionHandler, Lifespan, Receive, Scope, Send from .broker import Broker -from .event_bus import EventBus from .schema import ( A2AResponse, AgentCapabilities, @@ -37,7 +36,6 @@ def __init__( *, storage: Storage, broker: Broker, - event_bus: EventBus | None = None, # Agent card name: str | None = None, url: str = 'http://localhost:8000', @@ -75,8 +73,7 @@ def __init__( self.default_input_modes = ['application/json'] self.default_output_modes = ['application/json'] - self.event_bus = event_bus or EventBus() - self.task_manager = TaskManager(broker=broker, storage=storage, event_bus=self.event_bus) + self.task_manager = TaskManager(broker=broker, storage=storage) # Setup self._agent_card_json_schema: bytes | None = None diff --git a/fasta2a/broker.py b/fasta2a/broker.py index c84b738..22029db 100644 --- a/fasta2a/broker.py +++ b/fasta2a/broker.py @@ -3,7 +3,7 @@ from abc import ABC, abstractmethod from collections.abc import AsyncIterator from contextlib import AsyncExitStack -from dataclasses import dataclass +from dataclasses import dataclass, field from typing import Annotated, Any, Generic, Literal, TypeVar import anyio @@ -11,6 +11,7 @@ from pydantic import Discriminator from typing_extensions import Self, TypedDict +from .event_bus import EventBus from .schema import TaskIdParams, TaskSendParams tracer = get_tracer(__name__) @@ -27,6 +28,8 @@ class Broker(ABC): extended to support remote workers. """ + event_bus: EventBus = field(default_factory=EventBus) + @abstractmethod async def run_task(self, params: TaskSendParams) -> None: """Send a task to be executed by the worker.""" diff --git a/fasta2a/task_manager.py b/fasta2a/task_manager.py index 9d2809c..d0fecbc 100644 --- a/fasta2a/task_manager.py +++ b/fasta2a/task_manager.py @@ -67,7 +67,6 @@ from typing import Any from .broker import Broker -from .event_bus import EventBus from .schema import ( CancelTaskRequest, CancelTaskResponse, @@ -105,7 +104,6 @@ class TaskManager: broker: Broker storage: Storage[Any] - event_bus: EventBus = field(default_factory=EventBus) _aexit_stack: AsyncExitStack | None = field(default=None, init=False) @@ -186,7 +184,7 @@ async def stream_message(self, request: StreamMessageRequest) -> AsyncIterator[b if history_length is not None: broker_params['history_length'] = history_length - async with self.event_bus.subscribe(task_id) as receive_stream: + async with self.broker.event_bus.subscribe(task_id) as receive_stream: await self.broker.run_task(broker_params) # Send initial task state @@ -221,7 +219,7 @@ async def resubscribe_task(self, request: ResubscribeTaskRequest) -> AsyncIterat if task['status']['state'] in terminal_states: return - async with self.event_bus.subscribe(task_id) as receive_stream: + async with self.broker.event_bus.subscribe(task_id) as receive_stream: async for event in receive_stream: response = StreamMessageResponse(jsonrpc='2.0', id=request_id, result=event) yield self._format_sse_event(response) diff --git a/fasta2a/worker.py b/fasta2a/worker.py index 58501dc..0e22a80 100644 --- a/fasta2a/worker.py +++ b/fasta2a/worker.py @@ -3,14 +3,13 @@ from abc import ABC, abstractmethod from collections.abc import AsyncIterator from contextlib import asynccontextmanager -from dataclasses import dataclass, field +from dataclasses import dataclass from typing import TYPE_CHECKING, Any, Generic import anyio from opentelemetry.trace import get_tracer, use_span from typing_extensions import assert_never -from .event_bus import EventBus from .storage import ContextT, Storage if TYPE_CHECKING: @@ -26,7 +25,6 @@ class Worker(ABC, Generic[ContextT]): broker: Broker storage: Storage[ContextT] - event_bus: EventBus = field(default_factory=EventBus) @asynccontextmanager async def run(self) -> AsyncIterator[None]: @@ -60,7 +58,7 @@ async def _handle_task_operation(self, task_operation: TaskOperation) -> None: task = await self.storage.update_task(task_id, state='failed') from .schema import StreamResponse, TaskStatus, TaskStatusUpdateEvent - await self.event_bus.emit( + await self.broker.event_bus.emit( task_id, StreamResponse( status_update=TaskStatusUpdateEvent( @@ -70,7 +68,7 @@ async def _handle_task_operation(self, task_operation: TaskOperation) -> None: ) ), ) - await self.event_bus.close(task_id) + await self.broker.event_bus.close(task_id) @abstractmethod async def run_task(self, params: TaskSendParams) -> None: ... From f4b1e8daa928b82cc84300f6f1189855dfe62b90 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Sun, 8 Mar 2026 14:54:35 -0700 Subject: [PATCH 11/13] Make EventBus abstract with InMemoryEventBus implementation Matches the Broker/InMemoryBroker pattern so users can swap in a Redis/NATS-backed EventBus for multi-process deployments. --- fasta2a/__init__.py | 3 ++- fasta2a/broker.py | 4 ++-- fasta2a/event_bus.py | 22 ++++++++++++++++++++-- 3 files changed, 24 insertions(+), 5 deletions(-) diff --git a/fasta2a/__init__.py b/fasta2a/__init__.py index 4a8b106..a828d7b 100644 --- a/fasta2a/__init__.py +++ b/fasta2a/__init__.py @@ -1,7 +1,8 @@ from .applications import FastA2A from .broker import Broker +from .event_bus import EventBus, InMemoryEventBus from .schema import Skill from .storage import Storage from .worker import Worker -__all__ = ['FastA2A', 'Skill', 'Storage', 'Broker', 'Worker'] +__all__ = ['FastA2A', 'Skill', 'Storage', 'Broker', 'EventBus', 'InMemoryEventBus', 'Worker'] diff --git a/fasta2a/broker.py b/fasta2a/broker.py index 22029db..e91ffdd 100644 --- a/fasta2a/broker.py +++ b/fasta2a/broker.py @@ -11,7 +11,7 @@ from pydantic import Discriminator from typing_extensions import Self, TypedDict -from .event_bus import EventBus +from .event_bus import EventBus, InMemoryEventBus from .schema import TaskIdParams, TaskSendParams tracer = get_tracer(__name__) @@ -28,7 +28,7 @@ class Broker(ABC): extended to support remote workers. """ - event_bus: EventBus = field(default_factory=EventBus) + event_bus: EventBus = field(default_factory=InMemoryEventBus) @abstractmethod async def run_task(self, params: TaskSendParams) -> None: diff --git a/fasta2a/event_bus.py b/fasta2a/event_bus.py index cf4588f..a47ce31 100644 --- a/fasta2a/event_bus.py +++ b/fasta2a/event_bus.py @@ -2,6 +2,7 @@ from __future__ import annotations as _annotations +from abc import ABC, abstractmethod from collections import defaultdict from collections.abc import AsyncIterator from contextlib import asynccontextmanager @@ -12,13 +13,30 @@ from .schema import StreamResponse -class EventBus: +class EventBus(ABC): """A pub/sub event bus for streaming task events. Allows workers to emit events that are delivered to SSE connections. - Each subscription creates an anyio memory stream pair keyed by task ID. """ + @abstractmethod + @asynccontextmanager + async def subscribe(self, task_id: str) -> AsyncIterator[anyio.abc.ObjectReceiveStream[StreamResponse]]: + """Subscribe to events for a task. Yields a receive stream.""" + yield # type: ignore[misc] + + @abstractmethod + async def emit(self, task_id: str, event: StreamResponse) -> None: + """Emit an event to all subscribers for a task.""" + + @abstractmethod + async def close(self, task_id: str) -> None: + """Close all subscriber streams for a task, signaling end of SSE.""" + + +class InMemoryEventBus(EventBus): + """An in-memory event bus using anyio memory streams.""" + def __init__(self) -> None: self._subscribers: dict[str, list[anyio.abc.ObjectSendStream[StreamResponse]]] = defaultdict(list) From f338c1461890d71ac0304f199447c34a3b6e9436 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Sun, 8 Mar 2026 15:08:04 -0700 Subject: [PATCH 12/13] Add streaming support to A2AClient with integration test --- fasta2a/client.py | 33 ++++++++++ tests/test_streaming.py | 131 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 164 insertions(+) create mode 100644 tests/test_streaming.py diff --git a/fasta2a/client.py b/fasta2a/client.py index cd84499..e059c33 100644 --- a/fasta2a/client.py +++ b/fasta2a/client.py @@ -1,6 +1,7 @@ from __future__ import annotations as _annotations import uuid +from collections.abc import AsyncIterator from typing import Any import pydantic @@ -13,9 +14,13 @@ MessageSendParams, SendMessageRequest, SendMessageResponse, + StreamMessageRequest, + StreamMessageResponse, a2a_request_ta, send_message_request_ta, send_message_response_ta, + stream_message_request_ta, + stream_message_response_ta, ) get_task_response_ta = pydantic.TypeAdapter(GetTaskResponse) @@ -63,6 +68,34 @@ async def send_message( return send_message_response_ta.validate_json(response.content) + async def stream_message( + self, + message: Message, + *, + metadata: dict[str, Any] | None = None, + configuration: MessageSendConfiguration | None = None, + ) -> AsyncIterator[StreamMessageResponse]: + """Stream a message using SSE. + + Yields StreamMessageResponse objects as they arrive. + """ + params = MessageSendParams(message=message) + if metadata is not None: + params['metadata'] = metadata + if configuration is not None: + params['configuration'] = configuration + + request_id = str(uuid.uuid4()) + payload = StreamMessageRequest(jsonrpc='2.0', id=request_id, method='message/stream', params=params) + content = stream_message_request_ta.dump_json(payload, by_alias=True) + async with self.http_client.stream( + 'POST', '/', content=content, headers={'Content-Type': 'application/json'} + ) as response: + async for line in response.aiter_lines(): + if line.startswith('data: '): + data = line[6:] + yield stream_message_response_ta.validate_json(data) + async def get_task(self, task_id: str) -> GetTaskResponse: payload = GetTaskRequest(jsonrpc='2.0', id=None, method='tasks/get', params={'id': task_id}) content = a2a_request_ta.dump_json(payload, by_alias=True) diff --git a/tests/test_streaming.py b/tests/test_streaming.py new file mode 100644 index 0000000..c46b814 --- /dev/null +++ b/tests/test_streaming.py @@ -0,0 +1,131 @@ +from __future__ import annotations as _annotations + +import uuid +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager +from typing import Any + +import httpx +import pytest +from asgi_lifespan import LifespanManager + +from fasta2a.applications import FastA2A +from fasta2a.broker import InMemoryBroker +from fasta2a.client import A2AClient +from fasta2a.schema import ( + Artifact, + Message, + Part, + StreamResponse, + TaskSendParams, + TaskStatus, + TaskStatusUpdateEvent, +) +from fasta2a.storage import InMemoryStorage +from fasta2a.worker import Worker + +pytestmark = pytest.mark.anyio + + +class EchoWorker(Worker[Any]): + """A simple worker that echoes the input message as an artifact.""" + + async def run_task(self, params: TaskSendParams) -> None: + task_id = params['id'] + context_id = params['context_id'] + + # Emit a "working" status update + await self.broker.event_bus.emit( + task_id, + StreamResponse( + status_update=TaskStatusUpdateEvent( + task_id=task_id, + context_id=context_id, + status=TaskStatus(state='working'), + ) + ), + ) + + # Update storage to working + await self.storage.update_task(task_id, state='working') + + # Create an artifact echoing the input + input_parts = params['message']['parts'] + artifact = Artifact(artifact_id=str(uuid.uuid4()), parts=input_parts) + await self.storage.update_task(task_id, state='completed', new_artifacts=[artifact]) + + # Emit completed status + await self.broker.event_bus.emit( + task_id, + StreamResponse( + status_update=TaskStatusUpdateEvent( + task_id=task_id, + context_id=context_id, + status=TaskStatus(state='completed'), + ) + ), + ) + await self.broker.event_bus.close(task_id) + + async def cancel_task(self, params: Any) -> None: + pass + + def build_message_history(self, history: list[Message]) -> list[Any]: + return [] + + def build_artifacts(self, result: Any) -> list[Artifact]: + return [] + + +@asynccontextmanager +async def create_streaming_app() -> AsyncIterator[httpx.AsyncClient]: + broker = InMemoryBroker() + storage = InMemoryStorage() + worker = EchoWorker(broker=broker, storage=storage) + + app = FastA2A(storage=storage, broker=broker) + + @asynccontextmanager + async def lifespan(app: FastA2A) -> AsyncIterator[None]: + async with app.task_manager: + async with worker.run(): + yield + + app = FastA2A(storage=storage, broker=broker, lifespan=lifespan) + + async with LifespanManager(app=app) as manager: + transport = httpx.ASGITransport(app=manager.app) + async with httpx.AsyncClient(transport=transport, base_url='http://testclient') as client: + yield client + + +async def test_stream_message(): + async with create_streaming_app() as http_client: + client = A2AClient(http_client=http_client) + client.http_client.base_url = 'http://testclient' + + message = Message( + role='user', + parts=[Part(text='Hello, world!')], + message_id=str(uuid.uuid4()), + ) + + events: list[dict[str, Any]] = [] + async for response in client.stream_message(message): + if 'result' in response: + events.append(response['result']) + + # Should have: initial task, working status, completed status + assert len(events) == 3 + + # First event: initial task state (submitted) + assert 'task' in events[0] + assert events[0]['task']['status']['state'] == 'submitted' + + # Second event: working status update + assert 'status_update' in events[1] + assert events[1]['status_update']['status']['state'] == 'working' + + # Third event: completed status update + assert 'status_update' in events[2] + assert events[2]['status_update']['status']['state'] == 'completed' From e82dd17e61ef88973750e6a96311aa66870e5a1f Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Sun, 8 Mar 2026 15:23:29 -0700 Subject: [PATCH 13/13] Remove EventBus from public API and use correct type in streaming test --- fasta2a/__init__.py | 3 +-- tests/test_streaming.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/fasta2a/__init__.py b/fasta2a/__init__.py index a828d7b..4a8b106 100644 --- a/fasta2a/__init__.py +++ b/fasta2a/__init__.py @@ -1,8 +1,7 @@ from .applications import FastA2A from .broker import Broker -from .event_bus import EventBus, InMemoryEventBus from .schema import Skill from .storage import Storage from .worker import Worker -__all__ = ['FastA2A', 'Skill', 'Storage', 'Broker', 'EventBus', 'InMemoryEventBus', 'Worker'] +__all__ = ['FastA2A', 'Skill', 'Storage', 'Broker', 'Worker'] diff --git a/tests/test_streaming.py b/tests/test_streaming.py index c46b814..476a4e8 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -110,7 +110,7 @@ async def test_stream_message(): message_id=str(uuid.uuid4()), ) - events: list[dict[str, Any]] = [] + events: list[StreamResponse] = [] async for response in client.stream_message(message): if 'result' in response: events.append(response['result'])