Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.cache
__pycache__
.coverage*
4 changes: 2 additions & 2 deletions fasta2a/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from .applications import FastA2A
from .broker import Broker
from .schema import Skill
from .schema import Skill, StreamEvent
from .storage import Storage
from .worker import Worker

__all__ = ['FastA2A', 'Skill', 'Storage', 'Broker', 'Worker']
__all__ = ['FastA2A', 'Skill', 'Storage', 'Broker', 'Worker', 'StreamEvent']
14 changes: 8 additions & 6 deletions fasta2a/applications.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def __init__(
provider: AgentProvider | None = None,
skills: list[Skill] | None = None,
docs_url: str | None = '/docs',
streaming: bool = True,
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
# Starlette
debug: bool = False,
routes: Sequence[Route] | None = None,
Expand All @@ -69,6 +70,7 @@ def __init__(
self.provider = provider
self.skills = skills or []
self.docs_url = docs_url
self.streaming = streaming
# NOTE: For now, I don't think there's any reason to support any other input/output modes.
self.default_input_modes = ['application/json']
self.default_output_modes = ['application/json']
Expand Down Expand Up @@ -102,7 +104,7 @@ async def _agent_card_endpoint(self, request: Request) -> Response:
skills=self.skills,
default_input_modes=self.default_input_modes,
default_output_modes=self.default_output_modes,
capabilities=AgentCapabilities(streaming=True, push_notifications=False),
capabilities=AgentCapabilities(streaming=self.streaming, push_notifications=False),
)
if self.provider is not None:
agent_card['provider'] = self.provider
Expand Down Expand Up @@ -133,6 +135,11 @@ async def _agent_run_endpoint(self, request: Request) -> Response:
jsonrpc_response: A2AResponse
if a2a_request['method'] == 'message/send':
jsonrpc_response = await self.task_manager.send_message(a2a_request)
elif a2a_request['method'] == 'message/stream':
return StreamingResponse(
self.task_manager.stream_message(a2a_request),
media_type='text/event-stream',
)
elif a2a_request['method'] == 'tasks/get':
jsonrpc_response = await self.task_manager.get_task(a2a_request)
elif a2a_request['method'] == 'tasks/cancel':
Expand All @@ -147,11 +154,6 @@ async def _agent_run_endpoint(self, request: Request) -> Response:
jsonrpc_response = await self.task_manager.delete_task_push_notification_config(a2a_request)
elif a2a_request['method'] == 'tasks/list':
jsonrpc_response = await self.task_manager.list_tasks(a2a_request)
elif a2a_request['method'] == 'message/stream':
return StreamingResponse(
self.task_manager.stream_message(a2a_request),
media_type='text/event-stream',
)
elif a2a_request['method'] == 'tasks/resubscribe':
return StreamingResponse(
self.task_manager.resubscribe_task(a2a_request),
Expand Down
6 changes: 6 additions & 0 deletions fasta2a/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -992,3 +992,9 @@ class StreamResponse(TypedDict):
send_message_response_ta: TypeAdapter[SendMessageResponse] = TypeAdapter(SendMessageResponse)
stream_message_request_ta: TypeAdapter[StreamMessageRequest] = TypeAdapter(StreamMessageRequest)
stream_message_response_ta: TypeAdapter[StreamMessageResponse] = TypeAdapter(StreamMessageResponse)

# Type for streaming events (used by broker and task manager)
StreamEvent = Union[Task, Message, TaskStatusUpdateEvent, TaskArtifactUpdateEvent]
"""A streaming event that can be sent during message/stream requests."""

stream_event_ta: TypeAdapter[StreamEvent] = TypeAdapter(StreamEvent)
Comment on lines +997 to +1000

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 StreamEvent type alias may not be useful without discriminator

The new StreamEvent = Union[Task, Message, TaskStatusUpdateEvent, TaskArtifactUpdateEvent] at fasta2a/schema.py:997 and its TypeAdapter are exported publicly but never used internally. These TypedDicts share overlapping field names (e.g., task_id, context_id) which could make Pydantic's union discrimination unreliable without an explicit Discriminator. This may cause unexpected validation behavior when deserializing ambiguous payloads. Worth verifying the intended use case for this type.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

8 changes: 7 additions & 1 deletion fasta2a/storage.py

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 InMemoryStorage.load_task mutates the stored task's history in place

Pre-existing issue: InMemoryStorage.load_task at fasta2a/storage.py:85-86 does task['history'] = task['history'][-history_length:], which mutates the stored task dict in place, permanently truncating the history. This means subsequent calls to load_task without history_length will still see the truncated history. This is a pre-existing bug, not introduced by this PR, but relevant context since streaming relies on storage state.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@

from typing_extensions import TypeVar

from .schema import Artifact, Message, Task, TaskState, TaskStatus
from .schema import (
Artifact,
Message,
Task,
TaskState,
TaskStatus,
)

ContextT = TypeVar('ContextT', default=Any)

Expand Down
67 changes: 65 additions & 2 deletions fasta2a/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@
from opentelemetry.trace import get_tracer, use_span
from typing_extensions import assert_never

from .schema import StreamResponse, TaskArtifactUpdateEvent, TaskStatusUpdateEvent
from .storage import ContextT, Storage

if TYPE_CHECKING:
from .broker import Broker, TaskOperation
from .schema import Artifact, Message, TaskIdParams, TaskSendParams
from .schema import Artifact, Message, TaskIdParams, TaskSendParams, TaskState

tracer = get_tracer(__name__)

Expand Down Expand Up @@ -56,7 +57,7 @@ async def _handle_task_operation(self, task_operation: TaskOperation) -> None:
except Exception:
task_id = task_operation['params']['id']
task = await self.storage.update_task(task_id, state='failed')
from .schema import StreamResponse, TaskStatus, TaskStatusUpdateEvent
from .schema import TaskStatus

await self.broker.event_bus.emit(
task_id,
Expand All @@ -70,6 +71,68 @@ async def _handle_task_operation(self, task_operation: TaskOperation) -> None:
)
await self.broker.event_bus.close(task_id)

async def update_task(
self,
task_id: str,
state: TaskState,
new_artifacts: list[Artifact] | None = None,
new_messages: list[Message] | None = None,
) -> None:
"""Update a task's state in storage and publish streaming events to the broker.

This is the primary method workers should use to update task state. It handles
both persisting the update and notifying any stream subscribers.
"""
task = await self.storage.update_task(task_id, state, new_artifacts, new_messages)

final = state in ('completed', 'failed', 'canceled')

# For non-final updates, publish status first
if not final:
await self.broker.event_bus.emit(
task_id,
StreamResponse(
status_update=TaskStatusUpdateEvent(
task_id=task_id,
context_id=task['context_id'],
status=task['status'],
),
),
)

# Publish message events before final status so subscribers receive them
if new_messages:
for message in new_messages:
await self.broker.event_bus.emit(task_id, StreamResponse(message=message))

# Publish artifact events
if new_artifacts:
for artifact in new_artifacts:
await self.broker.event_bus.emit(
task_id,
StreamResponse(
artifact_update=TaskArtifactUpdateEvent(
task_id=task_id,
context_id=task['context_id'],
artifact=artifact,
),
),
)

# For final updates, publish status last (after messages and artifacts)
if final:
await self.broker.event_bus.emit(
task_id,
StreamResponse(
status_update=TaskStatusUpdateEvent(
task_id=task_id,
context_id=task['context_id'],
status=task['status'],
),
),
)
await self.broker.event_bus.close(task_id)
Comment on lines +74 to +134

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Test still uses direct storage/event_bus calls instead of new Worker.update_task helper

The test in tests/test_streaming.py:30-68 (EchoWorker) manually calls self.storage.update_task() and self.broker.event_bus.emit()/close() directly, rather than using the new Worker.update_task() helper method introduced in this PR. While not a bug, this means the new Worker.update_task method has no test coverage. If the intent is for workers to use the helper, the test should be updated to validate the helper's behavior (including the ordering of status/message/artifact events).

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


@abstractmethod
async def run_task(self, params: TaskSendParams) -> None: ...

Expand Down
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ dependencies = [
"pydantic>=2.10",
"opentelemetry-api>=1.28.0",
"eval_type_backport>=0.2.2; python_version <= '3.9'",
"sse-starlette>=2.0.0",
]

[project.optional-dependencies]
Expand All @@ -58,8 +59,11 @@ dev = [
"asgi-lifespan",
"coverage",
"httpx",
"httpx-sse",
"inline-snapshot",
"pytest",
"pytest-asyncio",
"pytest-mock",
"ruff",
"pyright",
]
Expand Down
Loading
Loading