Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 8 additions & 12 deletions fasta2a/applications.py
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.requests import Request
from starlette.responses import FileResponse, Response
from starlette.responses import FileResponse, Response, StreamingResponse
from starlette.routing import Route
from starlette.types import ExceptionHandler, Lifespan, Receive, Scope, Send

Expand All @@ -19,9 +19,7 @@
AgentCard,
AgentInterface,
AgentProvider,
SendMessageResponse,
Skill,
UnsupportedOperationError,
a2a_request_ta,
a2a_response_ta,
agent_card_ta,
Expand Down Expand Up @@ -104,7 +102,7 @@ async def _agent_card_endpoint(self, request: Request) -> Response:
skills=self.skills,
default_input_modes=self.default_input_modes,
default_output_modes=self.default_output_modes,
capabilities=AgentCapabilities(streaming=False, push_notifications=False),
capabilities=AgentCapabilities(streaming=True, push_notifications=False),

@devin-ai-integration devin-ai-integration Bot Mar 8, 2026

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 AgentCard streaming capability is now unconditionally True

At fasta2a/applications.py:108, streaming is hardcoded to True. This means the agent card always advertises streaming support regardless of whether the EventBus is actually wired up correctly or whether the broker implementation supports it. Previously this was False. Consider whether this should be conditional on whether an EventBus was explicitly provided (i.e., streaming=event_bus is not None when the parameter was passed, vs the default).

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

)
if self.provider is not None:
agent_card['provider'] = self.provider
Expand Down Expand Up @@ -150,16 +148,14 @@ async def _agent_run_endpoint(self, request: Request) -> Response:
elif a2a_request['method'] == 'tasks/list':
jsonrpc_response = await self.task_manager.list_tasks(a2a_request)
elif a2a_request['method'] == 'message/stream':
jsonrpc_response = SendMessageResponse(
jsonrpc='2.0',
id=a2a_request['id'],
error=UnsupportedOperationError(code=-32004, message='This operation is not supported'),
return StreamingResponse(
self.task_manager.stream_message(a2a_request),
media_type='text/event-stream',
)
elif a2a_request['method'] == 'tasks/resubscribe':
jsonrpc_response = SendMessageResponse(
jsonrpc='2.0',
id=a2a_request['id'],
error=UnsupportedOperationError(code=-32004, message='This operation is not supported'),
return StreamingResponse(
self.task_manager.resubscribe_task(a2a_request),
media_type='text/event-stream',
)
else:
raise NotImplementedError(f'Method {a2a_request["method"]} not implemented.')
Expand Down
5 changes: 4 additions & 1 deletion fasta2a/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
from abc import ABC, abstractmethod
from collections.abc import AsyncIterator
from contextlib import AsyncExitStack
from dataclasses import dataclass
from dataclasses import dataclass, field
from typing import Annotated, Any, Generic, Literal, TypeVar

import anyio
from opentelemetry.trace import Span, get_current_span, get_tracer
from pydantic import Discriminator
from typing_extensions import Self, TypedDict

from .event_bus import EventBus, InMemoryEventBus
from .schema import TaskIdParams, TaskSendParams

tracer = get_tracer(__name__)
Expand All @@ -27,6 +28,8 @@ class Broker(ABC):
extended to support remote workers.
"""

event_bus: EventBus = field(default_factory=InMemoryEventBus)

@abstractmethod
async def run_task(self, params: TaskSendParams) -> None:
"""Send a task to be executed by the worker."""
Expand Down
33 changes: 33 additions & 0 deletions fasta2a/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations as _annotations

import uuid
from collections.abc import AsyncIterator
from typing import Any

import pydantic
Expand All @@ -13,9 +14,13 @@
MessageSendParams,
SendMessageRequest,
SendMessageResponse,
StreamMessageRequest,
StreamMessageResponse,
a2a_request_ta,
send_message_request_ta,
send_message_response_ta,
stream_message_request_ta,
stream_message_response_ta,
)

get_task_response_ta = pydantic.TypeAdapter(GetTaskResponse)
Expand Down Expand Up @@ -63,6 +68,34 @@ async def send_message(

return send_message_response_ta.validate_json(response.content)

async def stream_message(
self,
message: Message,
*,
metadata: dict[str, Any] | None = None,
configuration: MessageSendConfiguration | None = None,
) -> AsyncIterator[StreamMessageResponse]:
"""Stream a message using SSE.

Yields StreamMessageResponse objects as they arrive.
"""
params = MessageSendParams(message=message)
if metadata is not None:
params['metadata'] = metadata
if configuration is not None:
params['configuration'] = configuration

request_id = str(uuid.uuid4())
payload = StreamMessageRequest(jsonrpc='2.0', id=request_id, method='message/stream', params=params)
content = stream_message_request_ta.dump_json(payload, by_alias=True)
async with self.http_client.stream(
'POST', '/', content=content, headers={'Content-Type': 'application/json'}
) as response:
async for line in response.aiter_lines():
if line.startswith('data: '):
data = line[6:]
yield stream_message_response_ta.validate_json(data)
Comment on lines +91 to +97

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Client stream_message silently ignores HTTP error status codes

The stream_message method in client.py does not check the HTTP response status code before iterating over SSE lines. Unlike send_message (which calls self._raise_for_status(response) at fasta2a/client.py:67), stream_message proceeds directly to parse lines from the response body. If the server returns an HTTP error (e.g., 400, 500), the client will silently iterate over the error response body, find no data: prefixed lines, and return without yielding anything — giving the caller no indication that an error occurred.

Suggested change
async with self.http_client.stream(
'POST', '/', content=content, headers={'Content-Type': 'application/json'}
) as response:
async for line in response.aiter_lines():
if line.startswith('data: '):
data = line[6:]
yield stream_message_response_ta.validate_json(data)
async with self.http_client.stream(
'POST', '/', content=content, headers={'Content-Type': 'application/json'}
) as response:
self._raise_for_status(response)
async for line in response.aiter_lines():
if line.startswith('data: '):
data = line[6:]
yield stream_message_response_ta.validate_json(data)
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


async def get_task(self, task_id: str) -> GetTaskResponse:
payload = GetTaskRequest(jsonrpc='2.0', id=None, method='tasks/get', params={'id': task_id})
content = a2a_request_ta.dump_json(payload, by_alias=True)
Expand Down
70 changes: 70 additions & 0 deletions fasta2a/event_bus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""Event bus for streaming task updates to SSE connections."""

from __future__ import annotations as _annotations

from abc import ABC, abstractmethod
from collections import defaultdict
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

import anyio
import anyio.abc

from .schema import StreamResponse


class EventBus(ABC):
"""A pub/sub event bus for streaming task events.

Allows workers to emit events that are delivered to SSE connections.
"""

@abstractmethod
@asynccontextmanager
async def subscribe(self, task_id: str) -> AsyncIterator[anyio.abc.ObjectReceiveStream[StreamResponse]]:
"""Subscribe to events for a task. Yields a receive stream."""
yield # type: ignore[misc]

@abstractmethod
async def emit(self, task_id: str, event: StreamResponse) -> None:
"""Emit an event to all subscribers for a task."""

@abstractmethod
async def close(self, task_id: str) -> None:
"""Close all subscriber streams for a task, signaling end of SSE."""


class InMemoryEventBus(EventBus):
"""An in-memory event bus using anyio memory streams."""

def __init__(self) -> None:
self._subscribers: dict[str, list[anyio.abc.ObjectSendStream[StreamResponse]]] = defaultdict(list)

@asynccontextmanager
async def subscribe(self, task_id: str) -> AsyncIterator[anyio.abc.ObjectReceiveStream[StreamResponse]]:
"""Subscribe to events for a task. Yields a receive stream."""
send_stream, receive_stream = anyio.create_memory_object_stream[StreamResponse]()

@devin-ai-integration devin-ai-integration Bot Mar 8, 2026

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 unbounded memory object streams could accumulate events if consumer is slow

In fasta2a/event_bus.py:28, anyio.create_memory_object_stream[StreamResponse]() is called without a max_buffer_size argument. The default buffer size for anyio memory streams is 0 (unbuffered, meaning sends block until a receiver is ready). This means if the worker emits events faster than the SSE consumer reads them, the worker's emit() call will block (apply backpressure). This is actually reasonable behavior for SSE — it prevents unbounded memory growth — but it means a slow client could block the worker from processing other tasks if the worker handles task operations sequentially (which it does, at fasta2a/worker.py:44).

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

self._subscribers[task_id].append(send_stream)
try:
yield receive_stream
finally:
subscribers = self._subscribers.get(task_id)
if subscribers is not None:
try:
subscribers.remove(send_stream)
except ValueError:
pass
if not subscribers:
del self._subscribers[task_id]
await send_stream.aclose()
await receive_stream.aclose()

async def emit(self, task_id: str, event: StreamResponse) -> None:
"""Emit an event to all subscribers for a task."""
for send_stream in self._subscribers.get(task_id, []):
await send_stream.send(event)
Comment on lines +62 to +65

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Iterating over live subscriber list in emit while close or subscribe cleanup can mutate it concurrently

In InMemoryEventBus.emit() at event_bus.py:64, the code iterates directly over the live list object self._subscribers.get(task_id, []). Each iteration calls await send_stream.send(event) which is a suspension point. During that suspension, a different task (e.g., an SSE handler whose client disconnected) could run the subscribe context manager's finally block, which calls subscribers.remove(send_stream) on the same list at event_bus.py:54. Mutating a list during iteration can cause elements to be skipped. Similarly, close() at event_bus.py:69 pops the list and closes all send streams, so a concurrent emit could call send on an already-closed stream, raising ClosedResourceError. This would crash the worker's task processing.

Suggested change
async def emit(self, task_id: str, event: StreamResponse) -> None:
"""Emit an event to all subscribers for a task."""
for send_stream in self._subscribers.get(task_id, []):
await send_stream.send(event)
async def emit(self, task_id: str, event: StreamResponse) -> None:
"""Emit an event to all subscribers for a task."""
for send_stream in list(self._subscribers.get(task_id, [])):
await send_stream.send(event)
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Comment on lines +62 to +65

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 InMemoryEventBus.emit iterates list while awaiting — mutation risk

In emit(), the code iterates directly over self._subscribers.get(task_id, []) and calls await send_stream.send(event) for each stream. During each await, the event loop could schedule other tasks that modify the subscriber list (e.g., a subscriber unsubscribing). In the current InMemoryBroker setup, the worker processes tasks sequentially in _loop (fasta2a/worker.py:40-42), making this unlikely. However, a custom broker or concurrent task handling could trigger a RuntimeError: list modified during iteration. A snapshot (list(...)) before iteration would be safer.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


async def close(self, task_id: str) -> None:
"""Close all subscriber streams for a task, signaling end of SSE."""
for send_stream in self._subscribers.pop(task_id, []):
await send_stream.aclose()
69 changes: 68 additions & 1 deletion fasta2a/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
from __future__ import annotations as _annotations

import uuid
from collections.abc import AsyncIterator
from contextlib import AsyncExitStack
from dataclasses import dataclass, field
from typing import Any
Expand All @@ -80,14 +81,19 @@
ListTasksRequest,
ListTasksResponse,
PushNotificationNotSupportedError,
ResubscribeTaskRequest,
SendMessageRequest,
SendMessageResponse,
SendMessageResult,
SetTaskPushNotificationRequest,
SetTaskPushNotificationResponse,
StreamMessageRequest,
StreamMessageResponse,
StreamResponse,
TaskNotFoundError,
TaskSendParams,
UnsupportedOperationError,
stream_message_response_ta,
)
from .storage import Storage

Expand Down Expand Up @@ -119,7 +125,7 @@ async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any):
self._aexit_stack = None

async def send_message(self, request: SendMessageRequest) -> SendMessageResponse:
"""Send a message using the A2A v0.3.0 protocol."""
"""Send a message using the A2A protocol."""
request_id = request['id']
message = request['params']['message']
context_id = message.get('context_id', str(uuid.uuid4()))
Expand Down Expand Up @@ -163,6 +169,67 @@ async def cancel_task(self, request: CancelTaskRequest) -> CancelTaskResponse:
)
return CancelTaskResponse(jsonrpc='2.0', id=request['id'], result=task)

async def stream_message(self, request: StreamMessageRequest) -> AsyncIterator[bytes]:
"""Stream a message response as SSE events."""
request_id = request['id']
message = request['params']['message']
context_id = message.get('context_id', str(uuid.uuid4()))

task = await self.storage.submit_task(context_id, message)
task_id = task['id']

broker_params: TaskSendParams = {'id': task_id, 'context_id': context_id, 'message': message}
config = request['params'].get('configuration', {})
history_length = config.get('history_length')
if history_length is not None:
broker_params['history_length'] = history_length

async with self.broker.event_bus.subscribe(task_id) as receive_stream:
await self.broker.run_task(broker_params)

# Send initial task state
initial_response = StreamMessageResponse(jsonrpc='2.0', id=request_id, result=StreamResponse(task=task))
yield self._format_sse_event(initial_response)

async for event in receive_stream:
response = StreamMessageResponse(jsonrpc='2.0', id=request_id, result=event)
yield self._format_sse_event(response)

async def resubscribe_task(self, request: ResubscribeTaskRequest) -> AsyncIterator[bytes]:
"""Resubscribe to an existing task's event stream."""
request_id = request['id']
task_id = request['params']['id']

task = await self.storage.load_task(task_id)
if task is None:
error_response = StreamMessageResponse(
jsonrpc='2.0',
id=request_id,
error=TaskNotFoundError(code=-32001, message='Task not found'),
)
yield self._format_sse_event(error_response)
return

# Send current task state
initial_response = StreamMessageResponse(jsonrpc='2.0', id=request_id, result=StreamResponse(task=task))
yield self._format_sse_event(initial_response)

# If task is already in a terminal state, no need to subscribe
terminal_states = {'completed', 'canceled', 'failed', 'rejected'}
if task['status']['state'] in terminal_states:
return

async with self.broker.event_bus.subscribe(task_id) as receive_stream:
async for event in receive_stream:
response = StreamMessageResponse(jsonrpc='2.0', id=request_id, result=event)
yield self._format_sse_event(response)
Comment on lines +198 to +225

@devin-ai-integration devin-ai-integration Bot Mar 8, 2026

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Race condition in resubscribe_task between terminal state check and subscribe is safe for InMemory but fragile for distributed implementations

In fasta2a/task_manager.py:217-225, the task state is checked for terminal status at line 219, and then the event bus subscription happens at line 222. For InMemoryStorage, the task variable returned by load_task is the same mutable dict stored internally (fasta2a/storage.py:78), so the check at line 219 sees real-time state updates from the worker. Additionally, between line 219 (the if check) and line 222 (async with subscribe), there are no await points, so no context switch can occur. However, if Storage were a remote/distributed implementation (e.g., Redis, database) where load_task returns a copy, the check at line 219 would use stale data and a race between the check and the subscribe could cause the SSE stream to hang forever. Consider subscribing BEFORE checking terminal state to close this window for future implementations.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Comment on lines +217 to +225

@devin-ai-integration devin-ai-integration Bot Mar 8, 2026

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 TOCTOU race in resubscribe_task causes SSE connection to hang forever

In resubscribe_task, the task state is loaded at line 203, the initial response is yielded at line 215 (context switch), and then the terminal state is checked at line 219 using the stale task dict before subscribing at line 222. Between the yield at line 215 and the subscribe at line 222, the worker (running in a separate task) can complete processing, call emit() (no subscribers yet, so events are silently dropped at event_bus.py:64), and call close() (no subscribers to close at event_bus.py:69). With any non-shared-dict Storage implementation (i.e., anything other than InMemoryStorage), the terminal state check at line 219 uses a stale snapshot and won't detect the completion, so the code proceeds to subscribe at line 222. The new subscriber will then block forever in async for event in receive_stream at line 223, since the worker has already finished and will never emit or close again.

Why InMemoryStorage accidentally masks this bug

With InMemoryStorage, load_task returns a reference to the same mutable dict object stored internally. When the worker updates the task state via update_task (which replaces task['status']), the task variable in resubscribe_task reflects the updated state because it points to the same dict. So the terminal check at line 219 would catch the completion. However, this relies on a coincidental implementation detail of InMemoryStorage and breaks with any database-backed or copy-returning Storage implementation.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


@staticmethod
def _format_sse_event(response: StreamMessageResponse) -> bytes:
"""Format a StreamMessageResponse as an SSE event."""
data = stream_message_response_ta.dump_json(response, by_alias=True)
return b'data: ' + data + b'\n\n'

async def set_task_push_notification(
self, request: SetTaskPushNotificationRequest
) -> SetTaskPushNotificationResponse:
Expand Down
16 changes: 15 additions & 1 deletion fasta2a/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,21 @@ async def _handle_task_operation(self, task_operation: TaskOperation) -> None:
else:
assert_never(task_operation)
except Exception:
await self.storage.update_task(task_operation['params']['id'], state='failed')
task_id = task_operation['params']['id']
task = await self.storage.update_task(task_id, state='failed')
from .schema import StreamResponse, TaskStatus, TaskStatusUpdateEvent

await self.broker.event_bus.emit(
task_id,
StreamResponse(
status_update=TaskStatusUpdateEvent(
task_id=task_id,
context_id=task['context_id'],
status=TaskStatus(state='failed'),
)
),
)
await self.broker.event_bus.close(task_id)

@abstractmethod
async def run_task(self, params: TaskSendParams) -> None: ...
Comment on lines 73 to 74

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Worker implementations must emit their own streaming events for intermediate updates

The Worker base class (fasta2a/worker.py:44-71) only emits events via the event bus in the error path. For intermediate updates (e.g., 'working' status, artifact updates during streaming), each concrete Worker.run_task implementation must call self.broker.event_bus.emit() directly. This contract is not documented anywhere in the base class or README. The README example worker at README.md:54-76 doesn't use event_bus at all, which means it won't produce any streaming events even for the message/stream endpoint. This is a documentation/design gap that could confuse implementors.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Expand Down
2 changes: 1 addition & 1 deletion tests/test_applications.py
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async def test_agent_card():
'defaultInputModes': ['application/json'],
'defaultOutputModes': ['application/json'],
'capabilities': {
'streaming': False,
'streaming': True,
'pushNotifications': False,
},
}
Expand Down
Loading
Loading