Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added .coverage
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
90 changes: 90 additions & 0 deletions STREAMING_SUMMARY.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# SSE Streaming Implementation Summary

## Overview
Added real-time streaming support to FastA2A via Server-Sent Events (SSE), enabling agents to stream responses as they are generated, making the framework fully compliant with the A2A specification v0.2.5.

## Key Changes

### 1. Broker Layer (fasta2a/broker.py)
- Added abstract methods `send_stream_event()` and `subscribe_to_stream()` to the Broker interface
- Implemented pub/sub infrastructure in InMemoryBroker:
- Thread-safe subscriber management with locks
- Automatic cleanup of disconnected subscribers
- Support for streaming Task, Message, TaskStatusUpdateEvent, and TaskArtifactUpdateEvent

### 2. Application Layer (fasta2a/applications.py)
- Added configurable `streaming` parameter to FastA2A constructor
- Implemented `message/stream` endpoint that returns EventSourceResponse
- Updated agent capabilities to correctly report streaming support

### 3. Task Manager (fasta2a/task_manager.py)
- Added `stream_message()` async generator method
- Yields initial task, then streams all subsequent events from broker
- Handles task execution in background while streaming events

### 4. Worker Integration
- Workers can emit streaming events using `broker.send_stream_event()`
- No changes to Worker base class needed - direct broker usage is cleaner

### 5. Dependencies
- Added `sse-starlette>=2.0.0` for SSE response handling
- Added `httpx-sse` to dev dependencies for testing

## Testing
- Comprehensive unit tests for broker pub/sub (7 tests)
- Integration tests for streaming endpoint (8 tests)
- Unit tests for task manager streaming (12 tests)
- Unit tests for agent card functionality (5 tests)
- Total: 32 tests, 90.18% coverage
- Tests use proper async synchronization instead of sleep-based timing

## Usage Example

```python
# Enable streaming in FastA2A
app = FastA2A(
storage=storage,
broker=broker,
streaming=True # Enable SSE streaming
)

# Workers emit events during execution
async def run_task(self, params: TaskSendParams):
task_id = params['id']

# Emit status updates
await self.broker.send_stream_event(task_id, {
'kind': 'status-update',
'task_id': task_id,
'status': {'state': 'working'},
'final': False
})

# Emit artifact chunks
await self.broker.send_stream_event(task_id, {
'kind': 'artifact-update',
'task_id': task_id,
'artifact': {'parts': [{'kind': 'text', 'text': 'Chunk 1'}]},
'append': False
})
```

## Client Usage

```python
# Using httpx-sse client
async with aconnect_sse(client, 'POST', '/', json={
'jsonrpc': '2.0',
'method': 'message/stream',
'params': {'message': {...}}
}) as event_source:
async for sse in event_source.aiter_sse():
event = json.loads(sse.data)
# Process streaming event
```

## Benefits
- Real-time response streaming reduces perceived latency
- Supports incremental artifact updates
- Fully compliant with A2A specification
- Backwards compatible - streaming is optional
231 changes: 231 additions & 0 deletions a2a.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
────────────────────────────────────────────────────────
Agent‑to‑Agent Protocol (A2A) — Comprehensive Reference
Spec version : 0.2.3 • Publish date : 2025‑06‑14
Homepage : https://a2aproject.github.io/A2A/latest/
────────────────────────────────────────────────────────

1. PURPOSE & SCOPE
────────────────────────────────────────────────────────
A2A defines **how independent AI agents communicate as peers**, with
built‑in discovery, capability negotiation, long‑running task management,
streaming, push notifications and enterprise‑grade security. Its goals are:

• Interoperability between heterogeneous agent stacks
• Dynamic discovery of skills & auth requirements
• Secure, opaque collaboration (no internal state leaks)
• First‑class support for async / streaming & human‑in‑loop workflows
• Basing everything on well‑known Web standards (HTTP, JSON‑RPC 2.0, SSE) :contentReference[oaicite:0]{index=0}

2. CORE MODEL
────────────────────────────────────────────────────────
Concept | Description
-------------------- | -----------------------------------------------------
A2A Client | Initiator acting for a user / upstream agent
A2A Server | Remote agent exposing an A2A endpoint
Task | Long‑lived stateful unit of work
Message | One conversational turn; contains **Part[]**
Artifact | Durable output produced by a task
Agent Card | JSON metadata document describing identity, skills,
| endpoint URL & security requirements :contentReference[oaicite:1]{index=1}

3. TRANSPORT & ENCODING
────────────────────────────────────────────────────────
Transport : **HTTPS** (TLS 1.2+) POST to Agent Card .url
Payload format : **JSON‑RPC 2.0** (`Content‑Type: application/json`)
Streaming channel : **Server‑Sent Events** (`text/event-stream`) used by
`message/stream` and `tasks/resubscribe`.
SSE payload : each event’s *data* field is a full JSON‑RPC Response. :contentReference[oaicite:2]{index=2}

4. SECURITY MODEL
────────────────────────────────────────────────────────
Layered security = transport TLS + declared auth schemes.

• Auth schemes are declared in `AgentCard.securitySchemes`
– mirrors OpenAPI: `apiKey`, `http` (Basic / Bearer), `oauth2`,
`openIdConnect`.
• Required scheme combinations are listed in `AgentCard.security`
• Missing / invalid credentials → HTTP 401/403 with `WWW‑Authenticate`.
• Optional **secondary (in‑task) auth** — task transitions to
`auth-required` until satisfied. :contentReference[oaicite:3]{index=3}

5. DISCOVERY – THE AGENT CARD
────────────────────────────────────────────────────────
Recommended URL : `https://{host}/.well‑known/agent.json`
Minimal top‑level schema (TS‑style):

```ts
interface AgentCard {
name: string;
description: string;
url: string; // base A2A endpoint
provider?: AgentProvider; // org info
version: string; // agent impl version
iconUrl?: string; // 64×64+ PNG/SVG
documentationUrl?: string;
capabilities: AgentCapabilities;
securitySchemes?: { [name: string]: SecurityScheme };
security?: { [name: string]: string[] }[];
defaultInputModes: string[]; // e.g. ["text/plain","application/json"]
defaultOutputModes: string[]; // e.g. ["application/json"]
skills: AgentSkill[];
supportsAuthenticatedExtendedCard?: boolean; // if true: expose
}
``` :contentReference[oaicite:4]{index=4}

Key sub‑objects
• **AgentCapabilities** → `{ streaming?: bool, pushNotifications?: bool,
stateTransitionHistory?: bool, extensions?: AgentExtension[] }`
• **AgentSkill** → `{ id, name, description, tags[], examples?[],
inputModes?, outputModes? }`
• **SecurityScheme** → union of OpenAPI schemes (API Key, HTTP, OAuth2,
OpenID Connect). :contentReference[oaicite:5]{index=5}

6. DATA OBJECTS
────────────────────────────────────────────────────────
6.1 Task
```ts
interface Task {
id: string;
contextId: string;
status: TaskStatus;
artifacts?: Artifact[];
history?: Message[];
metadata?: Record<string,any>;
kind: "task";
}
``` :contentReference[oaicite:6]{index=6}

6.2 TaskStatus & TaskState enum
States: `submitted | working | input‑required | completed | canceled |
failed | rejected | auth‑required | unknown`.
`completed, canceled, failed, rejected` are **terminal**. :contentReference[oaicite:7]{index=7}

6.3 Message & Part
```ts
type Part = TextPart | FilePart | DataPart;

interface Message {
role: "user" | "agent";
parts: Part[];
messageId: string;
contextId?: string;
taskId?: string;
referenceTaskIds?: string[];
metadata?: Record<string,any>;
extensions?: string[];
kind: "message";
}
``` :contentReference[oaicite:8]{index=8}

• **TextPart**→ `{ kind:"text", text }`
• **FilePart**→ `{ kind:"file", file: FileWithBytes | FileWithUri }`
• **DataPart**→ `{ kind:"data", data: {...} }`
• **Artifact**→ `{ artifactId, parts:Part[], name?, description?, metadata?, extensions? }` :contentReference[oaicite:9]{index=9}

6.4 PushNotificationConfig
```ts
interface PushNotificationConfig {
id?: string; // server‑assigned
url: string; // HTTPS webhook
token?: string; // opaque HMAC/shared secret
authentication?: PushNotificationAuthenticationInfo;
}
``` :contentReference[oaicite:10]{index=10}

6.5 JSON‑RPC Structures
`JSONRPCRequest`, `JSONRPCResponse`, `JSONRPCError` – standard 2.0
shapes, plus A2A‑specific `result` payloads. :contentReference[oaicite:11]{index=11}

7. RPC METHOD CATALOG
────────────────────────────────────────────────────────
All calls are **POST** `AgentCard.url` with a JSON‑RPC 2.0 request.

Method | Params → Result (success) | Notes
---------------------------- | --------------------------------------------- | ------------------------------------------------------
`message/send` | **MessageSendParams** → `Task | Message` | Sync request; client polls `tasks/get` or SSE
`message/stream` | MessageSendParams → *SSE* stream of `Message` \| `Task` \| `TaskStatusUpdateEvent` \| `TaskArtifactUpdateEvent` | Requires `capabilities.streaming=true`
`tasks/get` | **/* TaskQueryParams** → `Task` | Poll task status / history
`tasks/cancel` | **TaskIdParams** → `Task` | Attempt cancellation
`tasks/pushNotificationConfig/set` | **TaskPushNotificationConfig** → same | Requires `capabilities.pushNotifications=true`
`tasks/pushNotificationConfig/get` | GetTaskPushNotificationConfigParams → TaskPushNotificationConfig |
`tasks/pushNotificationConfig/list`| ListTaskPushNotificationConfigParams → TaskPushNotificationConfig[] |
`tasks/pushNotificationConfig/delete`| TaskIdParams + configId → (void) |
`tasks/resubscribe` | TaskIdParams → *SSE* | Re‑attach after network drop
`agent/authenticatedExtendedCard` | AuthenticatedExtendedCardParams → AgentCard | Returns richer card after auth :contentReference[oaicite:12]{index=12}

Essential parameter types
```ts
interface MessageSendParams {
message: Message;
configuration?: {
acceptedOutputModes: string[];
historyLength?: number;
pushNotificationConfig?: PushNotificationConfig;
blocking?: boolean; // if true: HTTP held open until terminal
};
metadata?: Record<string,any>;
}

interface TaskQueryParams { id: string; historyLength?: number }
interface TaskIdParams { id: string }
interface TaskPushNotificationConfig { taskId: string; pushNotificationConfig: PushNotificationConfig }
``` :contentReference[oaicite:13]{index=13}

7.1 Streaming events
• **TaskStatusUpdateEvent** → `{ taskId, contextId, kind:"status‑update", status:TaskStatus, final:boolean }`
• **TaskArtifactUpdateEvent** → `{ taskId, contextId, kind:"artifact-update", artifact:Artifact, append?, lastChunk? }` :contentReference[oaicite:14]{index=14}

8. ERROR HANDLING
────────────────────────────────────────────────────────
A2A reuses JSON‑RPC codes and reserves **‑32000…‑32099** for protocol‑specific
errors.

Code | Name | Typical meaning
------ | --------------------------------- | ----------------------------------------
‑32001 | TaskNotFoundError | Unknown/expired task ID
‑32002 | TaskNotCancelableError | Task is not in cancelable state
‑32003 | PushNotificationNotSupportedError | Server lacks push‑notification capability
‑32004 | UnsupportedOperationError | Feature or parameter not supported
‑32005 | ContentTypeNotSupportedError | Unaccepted MIME type in parts/artifacts
‑32006 | InvalidAgentResponseError | Server produced invalid response shape :contentReference[oaicite:15]{index=15}
Standard codes ‑32700…‑32603 follow JSON‑RPC 2.0. Include explanatory
`message` and optional structured `data`.

9. TYPICAL WORKFLOWS (NON‑NORMATIVE)
────────────────────────────────────────────────────────
**Synchronous / Polling**

1. Fetch Agent Card → pick skill → form first `Message`
2. `message/send` → server returns `Task { state:working }`
3. Client polls `tasks/get` until terminal state
4. On `completed` → download artifacts/messages as needed

**Streaming**

1. `message/stream` → HTTP 200 + SSE
2. Parse stream events; optionally `tasks/resubscribe` if dropped
3. Close stream when final status update or artifact chunk with `lastChunk:true`

**Input‑Required turn**

*Task* enters `input‑required` → client collects user input → call
`message/send` with same `taskId` (inside Message.taskId). Task continues.

**Push‑notifications**

• Register via `tasks/pushNotificationConfig/set` → Server POSTs JSON‑RPC
envelopes to `url`, signed per `token` and `authentication`. :contentReference[oaicite:16]{index=16}

10. EXTENSIBILITY
────────────────────────────────────────────────────────
• **Extensions** declared in `AgentCapabilities.extensions` and echoed in
messages/artifacts via the `extensions` string array.
• New RPC methods MAY be added using `namespace/verb` naming, provided
clients fail gracefully on `‑32601 Method not found`.
• New TaskState values SHOULD reserve lower‑case, dash‑separated strings and
be treated as non‑terminal unless explicitly documented.

────────────────────────────────────────────────────────
END OF SPEC (A2A v0.2.3)
──────────────────────────────────────────────────────── */

27 changes: 26 additions & 1 deletion fasta2a/applications.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from __future__ import annotations as _annotations

import json
from collections.abc import AsyncIterator, Sequence
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any

from sse_starlette import EventSourceResponse
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.requests import Request
Expand All @@ -21,6 +23,8 @@
a2a_request_ta,
a2a_response_ta,
agent_card_ta,
stream_event_ta,
stream_message_request_ta,
)
from .storage import Storage
from .task_manager import TaskManager
Expand All @@ -41,6 +45,7 @@ def __init__(
description: str | None = None,
provider: AgentProvider | None = None,
skills: list[Skill] | None = None,
streaming: bool = False,
# Starlette
debug: bool = False,
routes: Sequence[Route] | None = None,
Expand All @@ -65,6 +70,7 @@ def __init__(
self.description = description
self.provider = provider
self.skills = skills or []
self.streaming = streaming
# NOTE: For now, I don't think there's any reason to support any other input/output modes.
self.default_input_modes = ['application/json']
self.default_output_modes = ['application/json']
Expand Down Expand Up @@ -94,7 +100,7 @@ async def _agent_card_endpoint(self, request: Request) -> Response:
default_input_modes=self.default_input_modes,
default_output_modes=self.default_output_modes,
capabilities=AgentCapabilities(
streaming=False, push_notifications=False, state_transition_history=False
streaming=self.streaming, push_notifications=False, state_transition_history=False
),
)
if self.provider is not None:
Expand Down Expand Up @@ -125,6 +131,25 @@ async def _agent_run_endpoint(self, request: Request) -> Response:

if a2a_request['method'] == 'message/send':
jsonrpc_response = await self.task_manager.send_message(a2a_request)
elif a2a_request['method'] == 'message/stream':
# Parse the streaming request
stream_request = stream_message_request_ta.validate_json(data)

# Create an async generator wrapper that formats events as JSON-RPC responses
async def sse_generator():
request_id = stream_request.get('id')
async for event in self.task_manager.stream_message(stream_request):
# Serialize event to ensure proper camelCase conversion
event_dict = stream_event_ta.dump_python(event, mode='json', by_alias=True)

# Wrap in JSON-RPC response
jsonrpc_response = {'jsonrpc': '2.0', 'id': request_id, 'result': event_dict}

# Convert to JSON string
yield json.dumps(jsonrpc_response)

# Return SSE response
return EventSourceResponse(sse_generator())
elif a2a_request['method'] == 'tasks/get':
jsonrpc_response = await self.task_manager.get_task(a2a_request)
elif a2a_request['method'] == 'tasks/cancel':
Expand Down
Loading
Loading