-
Notifications
You must be signed in to change notification settings - Fork 26
🎥 Add streaming support
#43
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
3e60fa1
8a5dbfe
5903cff
d16abca
23907cf
dec5581
33ee9c5
1709673
2db81b8
ab97e62
e5e2be8
447153f
6c0f423
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,2 +1,3 @@ | ||
| .cache | ||
| __pycache__ | ||
| .coverage* |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,7 @@ | ||
| from .applications import FastA2A | ||
| from .broker import Broker | ||
| from .schema import Skill | ||
| from .schema import Skill, StreamEvent | ||
| from .storage import Storage | ||
| from .worker import Worker | ||
|
|
||
| __all__ = ['FastA2A', 'Skill', 'Storage', 'Broker', 'Worker'] | ||
| __all__ = ['FastA2A', 'Skill', 'Storage', 'Broker', 'Worker', 'StreamEvent'] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -992,3 +992,9 @@ class StreamResponse(TypedDict): | |
| send_message_response_ta: TypeAdapter[SendMessageResponse] = TypeAdapter(SendMessageResponse) | ||
| stream_message_request_ta: TypeAdapter[StreamMessageRequest] = TypeAdapter(StreamMessageRequest) | ||
| stream_message_response_ta: TypeAdapter[StreamMessageResponse] = TypeAdapter(StreamMessageResponse) | ||
|
|
||
| # Type for streaming events (used by broker and task manager) | ||
| StreamEvent = Union[Task, Message, TaskStatusUpdateEvent, TaskArtifactUpdateEvent] | ||
| """A streaming event that can be sent during message/stream requests.""" | ||
|
|
||
| stream_event_ta: TypeAdapter[StreamEvent] = TypeAdapter(StreamEvent) | ||
|
Comment on lines
+997
to
+1000
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🚩 StreamEvent type alias may not be useful without discriminator The new Was this helpful? React with 👍 or 👎 to provide feedback. |
||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🚩 InMemoryStorage.load_task mutates the stored task's history in place Pre-existing issue: Was this helpful? React with 👍 or 👎 to provide feedback. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,11 +10,12 @@ | |
| from opentelemetry.trace import get_tracer, use_span | ||
| from typing_extensions import assert_never | ||
|
|
||
| from .schema import StreamResponse, TaskArtifactUpdateEvent, TaskStatusUpdateEvent | ||
| from .storage import ContextT, Storage | ||
|
|
||
| if TYPE_CHECKING: | ||
| from .broker import Broker, TaskOperation | ||
| from .schema import Artifact, Message, TaskIdParams, TaskSendParams | ||
| from .schema import Artifact, Message, TaskIdParams, TaskSendParams, TaskState | ||
|
|
||
| tracer = get_tracer(__name__) | ||
|
|
||
|
|
@@ -56,7 +57,7 @@ async def _handle_task_operation(self, task_operation: TaskOperation) -> None: | |
| except Exception: | ||
| task_id = task_operation['params']['id'] | ||
| task = await self.storage.update_task(task_id, state='failed') | ||
| from .schema import StreamResponse, TaskStatus, TaskStatusUpdateEvent | ||
| from .schema import TaskStatus | ||
|
|
||
| await self.broker.event_bus.emit( | ||
| task_id, | ||
|
|
@@ -70,6 +71,68 @@ async def _handle_task_operation(self, task_operation: TaskOperation) -> None: | |
| ) | ||
| await self.broker.event_bus.close(task_id) | ||
|
|
||
| async def update_task( | ||
| self, | ||
| task_id: str, | ||
| state: TaskState, | ||
| new_artifacts: list[Artifact] | None = None, | ||
| new_messages: list[Message] | None = None, | ||
| ) -> None: | ||
| """Update a task's state in storage and publish streaming events to the broker. | ||
|
|
||
| This is the primary method workers should use to update task state. It handles | ||
| both persisting the update and notifying any stream subscribers. | ||
| """ | ||
| task = await self.storage.update_task(task_id, state, new_artifacts, new_messages) | ||
|
|
||
| final = state in ('completed', 'failed', 'canceled') | ||
|
|
||
| # For non-final updates, publish status first | ||
| if not final: | ||
| await self.broker.event_bus.emit( | ||
| task_id, | ||
| StreamResponse( | ||
| status_update=TaskStatusUpdateEvent( | ||
| task_id=task_id, | ||
| context_id=task['context_id'], | ||
| status=task['status'], | ||
| ), | ||
| ), | ||
| ) | ||
|
|
||
| # Publish message events before final status so subscribers receive them | ||
| if new_messages: | ||
| for message in new_messages: | ||
| await self.broker.event_bus.emit(task_id, StreamResponse(message=message)) | ||
|
|
||
| # Publish artifact events | ||
| if new_artifacts: | ||
| for artifact in new_artifacts: | ||
| await self.broker.event_bus.emit( | ||
| task_id, | ||
| StreamResponse( | ||
| artifact_update=TaskArtifactUpdateEvent( | ||
| task_id=task_id, | ||
| context_id=task['context_id'], | ||
| artifact=artifact, | ||
| ), | ||
| ), | ||
| ) | ||
|
|
||
| # For final updates, publish status last (after messages and artifacts) | ||
| if final: | ||
| await self.broker.event_bus.emit( | ||
| task_id, | ||
| StreamResponse( | ||
| status_update=TaskStatusUpdateEvent( | ||
| task_id=task_id, | ||
| context_id=task['context_id'], | ||
| status=task['status'], | ||
| ), | ||
| ), | ||
| ) | ||
| await self.broker.event_bus.close(task_id) | ||
|
Comment on lines
+74
to
+134
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🚩 Test still uses direct storage/event_bus calls instead of new Worker.update_task helper The test in Was this helpful? React with 👍 or 👎 to provide feedback. |
||
|
|
||
| @abstractmethod | ||
| async def run_task(self, params: TaskSendParams) -> None: ... | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.