Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions livekit-agents/livekit/agents/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def _dispatch(server: AgentServer, args: argparse.Namespace) -> None:
reload_addr=args.reload_addr,
log_format=args.log_format,
dev=args.dev,
simulation=args.simulation,
),
)

Expand Down Expand Up @@ -111,6 +112,9 @@ def main(argv: list[str] | None = None) -> int:
start_p.add_argument("--api-secret")
start_p.add_argument("--dev", action="store_true", default=False)
start_p.add_argument("--reload-addr")
# set by `lk simulate`: disables the worker load limit so simulation runs
# can saturate the agent
start_p.add_argument("--simulation", action="store_true", default=False)

console_p = sub.add_parser("console")
console_p.add_argument("entrypoint", nargs="?")
Expand Down
9 changes: 9 additions & 0 deletions livekit-agents/livekit/agents/cli/_legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -1709,6 +1709,14 @@ def start(
help="Time in seconds to wait for jobs to finish before shutting down.",
),
] = None,
simulation: Annotated[
bool,
typer.Option(
hidden=True,
help="Run under an agent simulation: the worker load limit is disabled "
"so runs can saturate the agent. Set by `lk simulate`.",
),
] = False,
) -> None:
if drain_timeout is not None:
server.update_options(drain_timeout=drain_timeout)
Expand All @@ -1720,6 +1728,7 @@ def start(
url=url,
api_key=api_key,
api_secret=api_secret,
simulation=simulation,
),
)

Expand Down
3 changes: 3 additions & 0 deletions livekit-agents/livekit/agents/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,9 @@ def _run_worker(server: AgentServer, args: proto.CliArgs) -> None:
if kwargs:
server.update_options(**kwargs)

if args.simulation:
server._simulation = True

if args.reload_addr and not args.dev:
raise ValueError("--reload-addr requires --dev")

Expand Down
3 changes: 3 additions & 0 deletions livekit-agents/livekit/agents/cli/proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ class CliArgs:
reload_addr: str | None = None
log_format: str = "json"
dev: bool = False
# set by `lk simulate` when launching the agent under test; disables the
# worker load limit so simulation runs can saturate the agent
simulation: bool = False


def running_job_to_proto(info: RunningJobInfo) -> agent_dev.RunningAgentJobInfo:
Expand Down
5 changes: 4 additions & 1 deletion livekit-agents/livekit/agents/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,10 @@ def simulation_context(self) -> SimulationContext | None:
from .simulation import SimulationContext

try:
dispatch = json_format.Parse(metadata, sim_pb.SimulationDispatch())
# ignore unknown fields so dispatches from newer servers still parse
dispatch = json_format.Parse(
metadata, sim_pb.SimulationDispatch(), ignore_unknown_fields=True
)
except json_format.ParseError:
return None

Expand Down
15 changes: 13 additions & 2 deletions livekit-agents/livekit/agents/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
ScenarioGroup = proto.ScenarioGroup
SimulationRun = proto.SimulationRun
SimulationDispatch = proto.SimulationDispatch
SimulationMode = proto.SimulationMode

# Decoded form of a Scenario's `userdata` (arbitrary JSON). On the wire it is a
# JSON-encoded string; in a scenarios.yaml it is written as a nested mapping.
Expand All @@ -24,6 +25,7 @@
"ScenarioGroup",
"SimulationRun",
"SimulationDispatch",
"SimulationMode",

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 SimulationMode not re-exported from init.py, breaking the established pattern

SimulationMode is added to simulation.py:__all__ (line 28) but is NOT added to livekit/agents/__init__.py's import list at livekit-agents/livekit/agents/__init__.py:61-69. Every other symbol in simulation.__all__ (Scenario, ScenarioGroup, SimulationRun, SimulationDispatch, ScenarioUserdata, SimulationVerdict, SimulationContext) is re-exported from the top-level __init__.py. Users who follow the established import pattern (from livekit.agents import SimulationMode) will get an ImportError.

Prompt for agents
SimulationMode is added to simulation.__all__ but not re-exported from livekit/agents/__init__.py. All other symbols from simulation.__all__ are listed in the import block at __init__.py lines 61-69. Add SimulationMode to that import list to maintain the existing pattern. The fix is in livekit-agents/livekit/agents/__init__.py, in the `from .simulation import (...)` block.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

"ScenarioUserdata",
"SimulationVerdict",
"SimulationContext",
Expand Down Expand Up @@ -66,11 +68,20 @@ def scenario(self) -> proto.Scenario:
return self._scenario

@property
def run(self) -> proto.SimulationRun | None:
def simulation_mode(self) -> int:
"""How the simulated user interacts with the agent (text chat or audio).
Unspecified is treated as text, since simulations predating the field
were all text-only."""
if self._dispatch.mode == proto.SimulationMode.SIMULATION_MODE_UNSPECIFIED:
return proto.SimulationMode.SIMULATION_MODE_TEXT
return self._dispatch.mode

@property
def simulation_run(self) -> proto.SimulationRun | None:
return self._run

@property
def job(self) -> proto.SimulationRun.Job | None:
def simulation_job(self) -> proto.SimulationRun.Job | None:
return self._job

@property
Expand Down
11 changes: 11 additions & 0 deletions livekit-agents/livekit/agents/voice/agent_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -3893,8 +3893,15 @@ def _init_metrics_from_end_of_turn(self, info: _EndOfTurnInfo) -> llm.MetricsRep
return metrics_report

# move them to the end to avoid shadowing the same named modules for mypy
@property
def _text_only(self) -> bool:
# text simulations run without audio: no STT/TTS/VAD
return self._session._text_only

@property
def vad(self) -> vad.VAD | None:
if self._text_only:
return None
return self._agent.vad if is_given(self._agent.vad) else self._session.vad

def _resolve_interruption_detection(self) -> inference.AdaptiveInterruptionDetector | None:
Expand Down Expand Up @@ -3951,6 +3958,8 @@ def _resolve_interruption_detection(self) -> inference.AdaptiveInterruptionDetec

@property
def stt(self) -> stt.STT | None:
if self._text_only:
return None
return self._agent.stt if is_given(self._agent.stt) else self._session.stt

@property
Expand All @@ -3959,4 +3968,6 @@ def llm(self) -> llm.LLM | llm.RealtimeModel | None:

@property
def tts(self) -> tts.TTS | None:
if self._text_only:
return None
return self._agent.tts if is_given(self._agent.tts) else self._session.tts
24 changes: 24 additions & 0 deletions livekit-agents/livekit/agents/voice/agent_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,12 @@ async def start(

job_ctx.init_recording(self._recording_options)

# Under a text simulation the simulated user interacts over text
# streams only: disable audio I/O here, and STT/TTS/VAD via
# AgentActivity (both consult _text_only).
if self._text_only:
logger.info("text simulation: disabling STT/TTS/VAD and audio I/O")

self._session_span = current_span = tracer.start_span("agent_session")
# we detach here to avoid context issues since tokens need to be detached
# in the same context as it was created
Expand Down Expand Up @@ -738,6 +744,10 @@ async def start(
)
room_options = copy.copy(room_options) # shadow copy is enough

if self._text_only:
room_options.audio_input = False
room_options.audio_output = False

if self.input.audio is not None:
if room_options.audio_input:
logger.warning(
Expand Down Expand Up @@ -1722,6 +1732,20 @@ def _config_update_added(self, item: llm.AgentConfigUpdate) -> None:
self._chat_ctx.insert(item)

# move them to the end to avoid shadowing the same named modules for mypy
@property
def _text_only(self) -> bool:
"""True when running under a text simulation: the session uses no audio
I/O and no audio models (STT/TTS/VAD)."""
from ..job import get_job_context

job_ctx = get_job_context(required=False)
if job_ctx is None or (sim_ctx := job_ctx.simulation_context()) is None:
return False

from ..simulation import SimulationMode

return sim_ctx.simulation_mode == SimulationMode.SIMULATION_MODE_TEXT

@property
def stt(self) -> stt.STT | None:
return self._stt
Expand Down
13 changes: 11 additions & 2 deletions livekit-agents/livekit/agents/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,9 @@ def __init__(

self._http_proxy = http_proxy
self._log_level = _validate_and_normalize_log_level(log_level)
# Set by the CLI (--simulation) when the worker runs under an agent
# simulation: load shedding is disabled so runs can saturate the agent.
self._simulation = False
self._agent_name = ""
self._server_type = ServerType.ROOM
self._id = "unregistered"
Expand Down Expand Up @@ -584,6 +587,9 @@ async def run(self, *, devmode: bool = False, unregistered: bool = False) -> Non
)
self._load_threshold = _default_load_threshold

if self._simulation:
logger.info("simulation mode enabled: worker load limit disabled")

self._loop = asyncio.get_event_loop()
self._devmode = devmode
self._job_lifecycle_tasks = set[asyncio.Task[Any]]()
Expand Down Expand Up @@ -1017,7 +1023,7 @@ async def aclose(self) -> None:
await self._prometheus_server.aclose()

if self._api is not None:
await self._api.aclose() # type: ignore[no-untyped-call]
await self._api.aclose() # type: ignore[no-untyped-call, unused-ignore]

# await asyncio.sleep(0.25) # see https://github.com/aio-libs/aiohttp/issues/1925
self._msg_chan.close()
Expand Down Expand Up @@ -1296,6 +1302,9 @@ def _is_available(self) -> bool:
if self._draining:
return False

if self._simulation:
return True

load_threshold = ServerEnvOption.getvalue(self._load_threshold, self._devmode)
if math.isinf(load_threshold):
return True
Expand Down Expand Up @@ -1455,7 +1464,7 @@ async def _update_worker_status(self) -> None:

load_threshold = ServerEnvOption.getvalue(self._load_threshold, self._devmode)
effective_load = self._get_effective_load()
is_full = effective_load >= load_threshold
is_full = not self._simulation and effective_load >= load_threshold
currently_available = not is_full and not self._draining

status = (
Expand Down
2 changes: 1 addition & 1 deletion livekit-agents/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ dependencies = [
"certifi>=2025.6.15",
"livekit==1.1.8",
"livekit-api>=1.0.7,<2",
"livekit-protocol>=1.1.14,<2",
"livekit-protocol>=1.1.15,<2",
"livekit-blingfire~=1.1,<2",
"protobuf>=3",
"pyjwt>=2.0",
Expand Down
8 changes: 4 additions & 4 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading