Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
304 changes: 304 additions & 0 deletions docs/tutorials/agents.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,304 @@
# %% [markdown]
# # Configuring an agent in ravnar
#
# This tutorial explains how to configure an agent in ravnar. You will see two approaches:
#
# 1. **Full control** — subclassing the [`Agent`][ravnar.agents.Agent] ABC directly.
# 2. **Using a wrapper** — adapting an existing [pydantic-ai](https://ai.pydantic.dev/) agent via
# [`PydanticAiAgentWrapper`][ravnar.agents.PydanticAiAgentWrapper], and giving it the tools of an
# [MCP](https://modelcontextprotocol.io/) server.
#
# pydantic-ai is used throughout as the example, but ravnar is not tied to it: under the hood ravnar is an AG-UI
# server, so any agent that emits AG-UI events is a first-class citizen. Other built-in options are summarised at the
# end.
#
# A special `Client` is used for the documentation. For real-world scenarios, it can be substituted with a regular HTTP
# client with the base URL set to the URL of your ravnar deployment.

# %%
import json
import uuid
from collections.abc import AsyncIterator

from _ravnar.docs import Client


def print_json(obj):
print(json.dumps(obj, indent=2, sort_keys=False))


def run_agent(client, agent_id: str, message: str) -> None:
"""Send a message to an agent and display the response in a human-readable format."""
import httpx_sse

body = {
"thread_id": str(uuid.uuid4()),
"run_id": str(uuid.uuid4()),
"state": None,
"tools": [],
"context": [],
"forwardedProps": None,
"messages": [
{
"role": "user",
"content": [{"type": "text", "text": message}],
"id": str(uuid.uuid4()),
}
],
}

with httpx_sse.connect_sse(
client, "POST", f"/api/agents/{agent_id}/run", json=body
) as event_source:
text = ""
for sse in event_source.iter_sse():
event = json.loads(sse.data)
match event["type"]:
case "TEXT_MESSAGE_CONTENT":
text += event["delta"]
case "TOOL_CALL_START":
print(f" 🛠 Calling tool: {event['toolCallName']}")
case "TOOL_CALL_RESULT":
print(f" ✅ {event['content']}")
case "RUN_ERROR":
print(f" ❌ Error: {event.get('error', 'Unknown error')}")

if text:
print(f"\n{text}")


# %% [markdown]
# ## Full control with the Agent ABC

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This section needs to explain more that ravnar agents are based on AG-UI

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Expanded it — the section now leads with ravnar being an AG-UI server: the Agent ABC's run() produces a stream of AG-UI events (RUN_STARTED, TEXT_MESSAGE_CONTENT, TOOL_CALL_START, RUN_FINISHED, …) that ravnar streams to the client over SSE, and the example's event sequence is called out as the canonical AG-UI text response. Also added a summary bullet making the same point ("an agent is anything that produces a stream of AG-UI events").

#
# ravnar is, at its core, an [AG-UI](https://ag-ui.com) server: every agent talks to clients by emitting a stream of
# AG-UI events — `RUN_STARTED`, `TEXT_MESSAGE_CONTENT`, `TOOL_CALL_START`, `RUN_FINISHED`, and so on. ravnar itself
# doesn't decide what those events are; it streams whatever the agent produces to the client over Server-Sent Events.
# Every integration in this tutorial is ultimately just a different way of producing that event stream.
#
# The [`Agent`][ravnar.agents.Agent] abstract base class is the most direct way to produce it, giving you complete
# control over the agent's behaviour. You implement a single method, `run()`, which receives the incoming
# `RunAgentInput` (the AG-UI request — messages, thread and run IDs, and any client-supplied tools and state) and the
# authenticated `User`, and yields the `ag_ui.core.Event`s that make up the response.
#
# Let's build a simple agent that greets the current user. The sequence below — start the run, open a text message,
# stream its content word by word, end the message, finish the run — is the canonical shape of an AG-UI text response.

# %%
import ag_ui.core

from ravnar.agents import Agent
from ravnar.authenticators import User


class WhoAmIAgent(Agent):
"""A simple agent that greets the current user."""

async def run(
self, input: ag_ui.core.RunAgentInput, user: User
) -> AsyncIterator[ag_ui.core.Event]:
message_id = str(uuid.uuid4())

yield ag_ui.core.RunStartedEvent(
thread_id=input.thread_id,
run_id=input.run_id,
parent_run_id=input.parent_run_id,
)
yield ag_ui.core.TextMessageStartEvent(message_id=message_id)

text = f"Hello, {user.id}!"
for word in text.split():
yield ag_ui.core.TextMessageContentEvent(
message_id=message_id, delta=word + " "
)

yield ag_ui.core.TextMessageEndEvent(message_id=message_id)
yield ag_ui.core.RunFinishedEvent(
thread_id=input.thread_id, run_id=input.run_id
)


# %% [markdown]
# The `User` object carries the authenticated user's identity along with any additional data and permissions.
# When no authenticator is configured, the user defaults to the current system user, and all permissions are granted.
#
# Now we register it as a static agent through the ravnar configuration. Static agents are declared upfront and are
# available for the entire lifetime of the server.

# %%
config = {
"agents": {
"static": {
"whoami": WhoAmIAgent,
}
}
}
client = Client(config)

# %% [markdown]
# Let's verify that our agent is registered and inspect its capabilities.

# %%
agents = client.get("/api/agents").raise_for_status().json()
print_json(agents)

# %% [markdown]
# The capabilities are derived from the default [`get_capabilities()`][ravnar.agents.Agent.get_capabilities] method of
# the base class which, among others, reports that the agent supports streaming. No tools are declared — this is a
# purely conversational agent.
#
# Time to send it a message.

# %%
run_agent(client, "whoami", "Hello!")

# %% [markdown]
# The `run_agent()` helper parsed the SSE event stream and printed only the text content. The agent reads the user
# ID from the `User` object and includes it in the greeting.
#
# Subclassing [`Agent`][ravnar.agents.Agent] is the most flexible approach — you have full control over the event
# stream and can integrate virtually any protocol or library. However, it also means you are responsible for producing
# the right events at the right time.

# %% [markdown]
# ## Using the Pydantic AI wrapper
#
# If you already use [pydantic-ai](https://ai.pydantic.dev/), you do not need to implement the
# [`Agent`][ravnar.agents.Agent] interface yourself. ravnar ships with
# [`PydanticAiAgentWrapper`][ravnar.agents.PydanticAiAgentWrapper], which adapts any `pydantic_ai.Agent` into a ravnar
# agent — handling AG-UI event generation, tool call streaming, and capability detection automatically.
#
# We'll give a single agent two kinds of tool: an in-process Python function, and the tools of an
# [MCP](https://modelcontextprotocol.io/) server.
#
# First, the in-process tool. `whoami` is a regular async function that takes `RunContext[User]` as its first
# parameter — ravnar injects the authenticated `User` as the dependency when the agent runs.

# %%
from pydantic_ai import RunContext


async def whoami(ctx: RunContext[User]) -> str:
"""Get the current user's identity."""
return ctx.deps.id


# %% [markdown]
# Second, an MCP server. pydantic-ai connects to one with [`MCPToolset`](https://ai.pydantic.dev/mcp/client/), and
# because the wrapper introspects the agent's `toolsets`, the server's tools are discovered and called exactly like
# `whoami` — no extra wiring on the ravnar side.
#
# One difference matters: an MCP server runs as a *separate process* (or a remote service), so its tools don't
# automatically receive ravnar's injected `User` the way an in-process tool does — though you can forward it
# explicitly if needed. Reach for an in-process tool when a tool needs the caller's identity, and an MCP server when
# the capability is self-contained.
#
# Let's write a minimal stdio MCP server that exposes a single `greet` tool. In a real project this would be a
# separate service; here we write it to a temporary file so the tutorial stays self-contained.

# %%
import pathlib
import tempfile

mcp_server = pathlib.Path(tempfile.mkdtemp()) / "greeter.py"
mcp_server.write_text(
'''
from mcp.server.fastmcp import FastMCP

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I'm missing something here, but why not use the fastmcp package directly?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. They're the same lineage: FastMCP 1.0 was folded into the official mcp SDK (that's mcp.server.fastmcp), then Jeremiah Lowin took 2.0+ back out standalone — now fastmcp 3.x, maintained by Prefect (3.0 GA'd Feb 2026). The standalone is far more featureful and actively developed; the bundled copy is effectively frozen at the 1.0 server-building API.

I kept mcp.server.fastmcp here for two reasons: the snippet is identical either way (FastMCP(...), @mcp.tool(), mcp.run()), and the mcp SDK is already a docs dependency via pydantic-ai's [mcp] extra (needed for MCPToolset), so the server example costs zero extra dependencies — whereas standalone fastmcp would add one just for the tutorial. The snippet's job is to give ravnar a server to consume, not to teach server authoring.

Happy to switch to standalone fastmcp if you'd rather point readers at the maintained package — it'd just mean adding fastmcp (or fastmcp-slim[server]) to the docs group.


mcp = FastMCP("greeter")


@mcp.tool()
def greet() -> str:
"""Return a friendly greeting from the MCP server."""
return "Hello from the MCP server!"


if __name__ == "__main__":
mcp.run()
'''
)

# %% [markdown]
# Now we register the agent purely through configuration — no Python instantiation needed. ravnar's
# [configuration import mechanism](../../references/config/#plugins) resolves nested `cls_or_fn`/`params` definitions
# recursively, so we declare the whole agent tree — model, wrapper, the in-process `tools`, and the MCP `toolsets` —
# as a single config block. `MCPToolset` takes the path to a script and launches it as a stdio server (it also accepts
# a URL for a remote HTTP/SSE server).

# %%
config = {
"agents": {
"static": {
"assistant": {
"cls_or_fn": "ravnar.agents.PydanticAiAgentWrapper",
"params": {
"agent": {
"cls_or_fn": "pydantic_ai.Agent",
"params": {
"model": {
"cls_or_fn": "pydantic_ai.models.test.TestModel",
"params": {"call_tools": "all"},
},
"deps_type": User,
"tools": [whoami],
"toolsets": [
{
"cls_or_fn": "pydantic_ai.mcp.MCPToolset",
"params": {"client": str(mcp_server)},
}
],
},
},
},
},
},
},
}
client = Client(config)

# %% [markdown]
# The wrapper builds the capability object dynamically by introspecting the underlying pydantic-ai agent (via
# `extract_capabilities()`), so both tools show up: the in-process `whoami`, and `greet`, whose definition — name,
# description, and (empty) parameter schema — was discovered from the MCP server.

# %%
agents = client.get("/api/agents").raise_for_status().json()
print_json(agents)

# %% [markdown]
# Let's run it. `TestModel` is pydantic-ai's stand-in for a real model: it exercises the full agent plumbing without
# calling an LLM, and with `call_tools="all"` it invokes every available tool once — both the in-process `whoami` and
# the MCP `greet`.

# %%
run_agent(client, "assistant", "Hello!")

# %% [markdown]
# Both tools take no arguments, so `TestModel` calls them exactly as a real model would and the results are real:
# `whoami` returns your system username (no authenticator is configured) and `greet` returns the MCP server's
# greeting. The point is the plumbing: ravnar exposed an in-process tool and an MCP server's tool through the *same*
# wrapper, advertised both, and routed the calls — all from configuration. In production you would swap `TestModel`
# for a real model (e.g. `openai`, `anthropic`, `openrouter`), which would decide when to call each tool based on the
# conversation; everything else stays the same.
#
# To use a server you do not run yourself (the common case), pass its URL instead of a script path, e.g.
# `{"cls_or_fn": "pydantic_ai.mcp.MCPToolset", "params": {"client": "https://example.com/mcp"}}`.

# %% [markdown]
# ## Summary
#
# - ravnar is an AG-UI server: an agent is anything that produces a stream of AG-UI events, however you choose to
# build it.
# - Subclass [`Agent`][ravnar.agents.Agent] directly when you need full control over the event stream or want to
# integrate a custom protocol.
# - Use [`PydanticAiAgentWrapper`][ravnar.agents.PydanticAiAgentWrapper] when you already have a pydantic-ai agent —
# ravnar plugs it in automatically.
# - ravnar injects the `User` object into the agent's `run()` method. For pydantic-ai
# agents, it is available as `deps` in tools via `RunContext.deps`.
# - Add [`MCPToolset`](https://ai.pydantic.dev/mcp/client/) to a pydantic-ai agent's `toolsets` to expose the tools of
# any MCP server; the wrapper discovers and streams them automatically.
# - Not using pydantic-ai? ravnar also ships [`AgnoAgentWrapper`][ravnar.agents.AgnoAgentWrapper] for
# [Agno](https://docs.agno.com/) agents and [`SSEAgent`][ravnar.agents.SSEAgent] to connect any agent that already
# speaks AG-UI over HTTP — and you can always implement the [`Agent`][ravnar.agents.Agent] ABC directly. See the
# Python API reference for the full list of built-in agents.
# - All agents are registered through the same configuration mechanism, whether they are custom subclasses or wrappers.
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ nav:
- Deployment: deploy.md
- Tutorials:
- Authentication: tutorials/authentication.py
- Configuring an agent: tutorials/agents.py
- References:
- Configuration: references/config.md
- Python API: references/python_api.md
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ docs = [
"mkdocs-macros-plugin>=1.3.9",
"mkdocs-material>=9.6.20",
"mkdocstrings[python]>=0.30.0",
# the agents tutorial builds a small MCP server; the mcp extra is only needed to render the docs
"pydantic-ai-slim[mcp]>=1.87,<2",
"pygments",
]
dev = [
Expand Down
3 changes: 2 additions & 1 deletion src/_ravnar/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import abc
import dataclasses
import re
import textwrap
import uuid
from collections.abc import AsyncIterator
Expand Down Expand Up @@ -49,7 +50,7 @@ async def run(self, input: ag_ui.core.RunAgentInput, user: User) -> AsyncIterato
thread_id=input.thread_id, run_id=input.run_id, parent_run_id=input.parent_run_id
)
yield ag_ui.core.TextMessageStartEvent(message_id=message_id)
for delta in textwrap.dedent(message.strip()).split():
for delta in re.findall(r"\s*\S+", textwrap.dedent(message.strip())):
yield ag_ui.core.TextMessageContentEvent(message_id=message_id, delta=delta)
yield ag_ui.core.TextMessageEndEvent(message_id=message_id)
yield ag_ui.core.RunFinishedEvent(thread_id=input.thread_id, run_id=input.run_id)
Expand Down
15 changes: 9 additions & 6 deletions src/_ravnar/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,21 @@ def __init__(self, config: DatabaseConfig) -> None:
async def setup(self) -> None: # type: ignore[override]
session_factory_params = SessionFactoryParams(expire_on_commit=False)

# ``SQLAlchemyInstrumentor`` is a global singleton: once it is instrumented, further ``instrument`` calls are a
# no-op that only logs an "already instrumented" warning. That happens whenever several engines are built in one
# process -- e.g. the executed docs build a fresh app per example -- so guard the call to keep the logs clean.
instrumentor = SQLAlchemyInstrumentor()

if isinstance(self._engine, Engine):
SQLAlchemyInstrumentor().instrument(
engine=self._engine,
)
if not instrumentor.is_instrumented_by_opentelemetry:
instrumentor.instrument(engine=self._engine)

orm.Base.metadata.create_all(bind=self._engine)
self._session_factory = sessionmaker(bind=self._engine, **session_factory_params)

else:
SQLAlchemyInstrumentor().instrument(
engine=self._engine.sync_engine,
)
if not instrumentor.is_instrumented_by_opentelemetry:
instrumentor.instrument(engine=self._engine.sync_engine)

async with self._engine.begin() as conn:
await conn.run_sync(orm.Base.metadata.create_all)
Expand Down
Loading
Loading