A public reference example showing how to wire AG2.beta (the autogen.beta API) with the Dishka DI container via the dishka-ag2 integration package, streaming agent output over the AG-UI protocol from a production-shaped FastAPI backend.
autogen.beta has its own DI system (fast-depends), and Dishka has its own scopes (APP / REQUEST). Naively bridging them leads to hacks like "pass a dependencies dict by string keys". The community package dishka-ag2 solves it properly with a custom scope (AG2Scope) and a middleware (DishkaAsyncMiddleware) that opens the right scope on every agent turn / tool call.
The result: agent tools look like regular Dishka handlers — @tool @inject async def list_notes(uc: FromDishka[ListNotesUseCase], ...). No string keys, no manual wiring in the HTTP endpoint, no dependencies= dict passed to AGUIStream.dispatch.
FastAPIwith clean layered architecture (domain → models → gateways → usecases → api → main)Dishkacontainer withAG2Scope— a single container serves both HTTP handlers and agent toolsautogen.beta.Agentconfigured as an app-level singleton, withDishkaAsyncMiddlewareattached- AG-UI SSE chat endpoint via
autogen.beta.ag_ui.AGUIStream - PostgreSQL + SQLAlchemy 2.0 via async
psycopg3 - Alembic migrations
import-lintercontractspytestunit + integration testsuvdeps, multi-stage Dockerfile, docker-compose with Postgres, GitHub Actions CI
cp .env.example .env # set AG2EX__OPENAI_API_KEY
docker compose up --buildSmoke-test:
curl http://127.0.0.1:8000/api/health # {"status":"healthy"}
curl -X POST http://127.0.0.1:8000/api/notes \
-H 'Content-Type: application/json' \
-d '{"title":"hello","body":"world"}'
curl -N -X POST http://127.0.0.1:8000/api/chat \
-H 'Content-Type: application/json' \
-H 'Accept: text/event-stream' \
-d '{
"threadId":"t1","runId":"r1",
"messages":[{"id":"m1","role":"user","content":"Create a note titled hello with body world, then list all notes"}],
"state":{},"context":[],"tools":[],"forwardedProps":{}
}'Expect an SSE stream: RUN_STARTED → TOOL_CALL_START(create_note) → TOOL_CALL_RESULT → TOOL_CALL_START(list_notes) → TOOL_CALL_RESULT → TEXT_MESSAGE_CHUNK* → RUN_FINISHED.
- One container, one scope family. The container uses
AG2Scope(fromdishka-ag2).AG2Scope.APPholds app-level singletons;AG2Scope.REQUESTis opened both on every HTTP request and on every agent tool call. - Agent is a singleton built once in
main/entrypoint.py.DishkaAsyncMiddlewareis attached to it and carries the container reference:agent = Agent( ..., tools=[calculator, weather, notes_toolkit()], middleware=[Middleware(DishkaAsyncMiddleware, container=container)], )
- A tiny ASGI middleware (
main/middleware.py) opensAG2Scope.REQUESTon every HTTP request and attaches the request-scoped container torequest.state.dishka_container— so the standarddishka.integrations.fastapi@injectkeeps working for REST endpoints. - Tools are just
@tool @injectfunctions — seeapi/agent/tools/notes.py:@tool @inject async def list_notes( uc: FromDishka[ListNotesUseCase], limit: int = 20, ) -> list[dict[str, str]]: response = await uc.execute(ListNotesRequest(limit=limit)) return [...]
DishkaAsyncMiddlewareopens anAG2Scope.REQUESTchild container before the tool runs, anddishka-ag2's@injectresolvesFromDishka[T]out of it. - The chat endpoint (
api/routes/chat.py) has zero DI plumbing — it just hands the agent toAGUIStreamand streams:@router.post("") async def run_agent(run_input, request, accept=Header(None)) -> StreamingResponse: agent = request.app.state.agent return StreamingResponse( AGUIStream(agent).dispatch(run_input, accept=accept), media_type=accept or "text/event-stream", )
- Add a use case in
src/ag2_example/usecases/…. - Register it in
main/providers/usecases.py(scope=AG2Scope.REQUEST). - Add a tool function:
@tool @inject async def my_tool(uc: FromDishka[MyUseCase], arg: str) -> str: return await uc.execute(...)
- Include it in
api/agent/agent_factory.py(tools=[...]).
That's it — no endpoint changes.
src/ag2_example/
├── config.py / logging_config.py
├── alembic/ # Alembic migrations + env
├── domain/entities/ # frozen dataclasses (pure)
├── models/ # SQLAlchemy imperative mappers + registry
├── gateways/db/note/ # NoteRepository (Protocol) + AlchemyNoteRepository
├── usecases/ # Request/Response pattern + UnitOfWork
├── api/
│ ├── middlewares/request_id.py
│ ├── schemas/note.py
│ ├── routes/{health,notes,chat}.py
│ └── agent/
│ ├── agent_factory.py # build_agent(config, container) → Agent
│ ├── prompts.py
│ └── tools/
│ ├── utility.py # calculator + weather (no DI)
│ └── notes.py # CRUD tools via FromDishka[UseCase]
└── main/
├── entrypoint.py # create_app(): builds container + agent, wires middleware
├── di.py # create_container, default_providers()
├── middleware.py # AG2ContainerMiddleware — opens AG2Scope.REQUEST per HTTP request
└── providers/{settings,database,repositories,usecases,agent}.py
docker compose up -d db
uv run pytest # unit + integration
uv run pytest tests/unit # unit-only
uv run pytest tests/integration # REST CRUD + SSE smoketests/integration/test_chat_sse.py::test_chat_sse_streams_run_started_and_finished is skipped unless AG2EX__OPENAI_API_KEY or OPENAI_API_KEY is set.
uv sync
docker compose up -d db
uv run alembic upgrade head
uv run uvicorn ag2_example.main.entrypoint:app --reloaduv run lint-imports enforces:
- Layer direction —
main → api → usecases → gateways → models → domain - Agent framework isolation —
autogenimports never leak belowapi.agent/main.providers.agent - FastAPI isolation —
fastapi/starlettestay out ofdomain/gateways/usecases
Auth, OAuth, Redis, Kafka, S3, email, websockets, observability dashboards, rate limiting. The goal is the shortest possible path to AG2.beta + Dishka + AG-UI done right.
Apache 2.0