Merged
20 commits
67c44d9  batcher: emit queue/poll lifecycle events for live UI (vienneraphael, Mar 3, 2026)
214e4f6  ui: add rich live batching display renderer (vienneraphael, Mar 3, 2026)
85c5576  batchify/context: add live_display tri-state API and context wiring (vienneraphael, Mar 3, 2026)
314c7c2  cli/tests/docs: expose live_display flag and document behavior (vienneraphael, Mar 3, 2026)
6ebd209  ui: render live display as sent-batches table (vienneraphael, Mar 3, 2026)
bb1d826  ui: show context progress from completed batch sizes (vienneraphael, Mar 3, 2026)
d276533  ui: render context metrics as rich progress bar (vienneraphael, Mar 3, 2026)
f8f14c0  ui: left-align progress bar and inline metrics (vienneraphael, Mar 3, 2026)
b499448  ui: add elapsed timer next to progress metrics (vienneraphael, Mar 3, 2026)
6d7ca06  ui: lower live display refresh to 1hz (vienneraphael, Mar 3, 2026)
9086508  ui: heartbeat refresh every second without poll countdown (vienneraphael, Mar 3, 2026)
4e05899  ui: colorize live progress metrics (vienneraphael, Mar 3, 2026)
ea1b16b  ui: add styled request counters under progress bar (vienneraphael, Mar 3, 2026)
6cd27cc  ui: add pending batches table with dataframe-style truncation (vienneraphael, Mar 3, 2026)
c20d059  ui: change color to green for completed samples (vienneraphael, Mar 3, 2026)
a971bd0  live display: switch to bool auto mode with polling-log fallback (vienneraphael, Mar 3, 2026)
5a1bbb2  examples: add provider race streaming completion example (vienneraphael, Mar 3, 2026)
0c86971  ui: stabilize queue progress formatting and remove stale rich display… (vienneraphael, Mar 3, 2026)
9c275c4  examples: racing (vienneraphael, Mar 3, 2026)
f18a1ec  fix: refactor shared code (vienneraphael, Mar 3, 2026)
10 changes: 8 additions & 2 deletions docs/architecture/api.md
@@ -9,7 +9,7 @@ yields `None`. Import it from `batchling`.
- Install HTTP hooks once (idempotent).
- Construct a `Batcher` with configuration such as `batch_size`,
`batch_window_seconds`, `batch_poll_interval_seconds`, `dry_run`,
`cache`, and `live_display`.
- Configure `batchling` logging defaults with Python's stdlib `logging`
(`WARNING` by default).
- Return a `BatchingContext` to scope batching to a context manager.
@@ -26,6 +26,12 @@ yields `None`. Import it from `batchling`.
- **`cache` behavior**: when `cache=True` (default), intercepted requests are fingerprinted
and looked up in a persistent request cache. Cache hits bypass queueing and resume polling
from an existing provider batch when not in dry-run mode.
- **`live_display` behavior**: `live_display` is a boolean.
When `True` (default), Rich panel rendering runs in auto mode and is enabled
only when `stderr` is a TTY, the terminal is not `dumb`, and the `CI`
environment variable is not set. If auto mode disables Rich, context-level
progress is logged at `INFO` on polling events.
When `False`, both the live display and the fallback progress logs are disabled.
- **Outputs**: `BatchingContext[None]` instance that yields `None`.
- **Logging**: lifecycle milestones are emitted at `INFO`, problems at
`WARNING`/`ERROR`, and high-volume diagnostics at `DEBUG`. Request payloads
@@ -43,7 +49,7 @@ Behavior:

- CLI options map directly to `batchify` arguments:
`batch_size`, `batch_window_seconds`, `batch_poll_interval_seconds`, `dry_run`,
`cache`, and `live_display`.
- Script target must use `module_path:function_name` syntax.
- Forwarded callable arguments are mapped as:
positional tokens are passed as positional arguments;
8 changes: 7 additions & 1 deletion docs/architecture/context.md
@@ -9,14 +9,20 @@ a context variable.
- Activate the `active_batcher` context for the duration of a context block.
- Yield `None` for scope-only lifecycle control.
- Support sync and async context manager patterns for cleanup and context scoping.
- Start and stop the optional Rich live activity display while the context is active.

## Flow summary

1. `BatchingContext` stores the `Batcher` on initialization.
2. `__enter__`/`__aenter__` set the active batcher for the entire context block.
3. `__exit__` resets the context and schedules `batcher.close()` if an event loop is
running (otherwise it warns).
4. If `live_display=True`, the context attempts to start Rich panel rendering at
enter time when terminal auto-detection passes (`TTY`, non-`dumb`, non-`CI`).
Otherwise it registers an `INFO` logging fallback that emits progress at poll time.
5. `__aexit__` resets the context and awaits `batcher.close()` to flush pending work.
6. The live display listener is removed and the panel is stopped when context cleanup
finishes.
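The terminal auto-detection in step 4 might be sketched as follows (a minimal illustration under the `TTY`/non-`dumb`/non-`CI` rules stated above; the actual checks inside batchling may differ):

```python
import os
import sys


def rich_auto_enabled() -> bool:
    # Enable the Rich panel only in interactive terminals:
    # stderr is a TTY, TERM is not "dumb", and CI is not set.
    return (
        sys.stderr.isatty()
        and os.environ.get("TERM") != "dumb"
        and "CI" not in os.environ
    )
```

When this returns `False`, the context falls back to the `INFO` progress logging described above.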

## Code reference

23 changes: 23 additions & 0 deletions docs/cli.md
@@ -64,6 +64,29 @@ batchling generate_product_images.py:main

That's it! Just run that command and you save 50% on your workflow's API costs.

## Live visibility panel

The CLI also exposes the live Rich panel control:

```bash
batchling generate_product_images.py:main --live-display
```

`--live-display`/`--no-live-display` form a boolean flag pair:

- `--live-display` (default): auto mode; the Rich panel appears only in interactive
terminals (`TTY`, non-`dumb`, non-`CI`). If Rich auto-disables, progress is emitted as
`INFO` logs on polling events.
- `--no-live-display`: disables both the Rich panel and the fallback progress logs.

When enabled, the panel shows overall context progress:
`completed_samples / total_samples`, completion percentage, and `Time Elapsed`
since the first batch seen in the context.
It also shows request counters and a queue summary table with one row per
`(provider, endpoint, model)`, including a `progress` column formatted as
`completed/total (percentage)`, where `completed` counts terminal batches and
`total` is `running + completed`.
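As a rough illustration of how that `progress` cell could be computed (a hypothetical helper, not part of the batchling codebase):

```python
def format_progress(completed: int, running: int) -> str:
    """Format 'completed/total (percentage)' where total = running + completed."""
    total = running + completed
    # Avoid division by zero before any batch has been seen.
    pct = (completed / total * 100) if total else 0.0
    return f"{completed}/{total} ({pct:.0f}%)"


print(format_progress(completed=3, running=1))  # prints "3/4 (75%)"
```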

## Next Steps

If you haven't yet, look at how you can:
24 changes: 24 additions & 0 deletions docs/python-sdk.md
@@ -79,6 +79,30 @@ async def main():

That's it! Update three lines of code and you save 50% on your workflow's API costs.

## Live visibility panel

You can toggle live visibility behavior while the context is active:

```py
async with batchify(live_display=True):
generated_images = await asyncio.gather(*tasks)
```

`live_display` accepts a boolean:

- `True` (default): auto mode, Rich panel only in interactive terminals
(`TTY`, non-`dumb`, non-`CI`). If Rich auto-disables, progress is emitted as
`INFO` logs on polling events.
- `False`: disables both the Rich panel and the fallback progress logs.

When enabled, the panel shows context-level progress only:
`completed_samples / total_samples`, completion percentage, and `Time Elapsed`
since the first batch seen in the context.
It also shows request counters and a queue summary table with one row per
`(provider, endpoint, model)`, including a `progress` column formatted as
`completed/total (percentage)`, where `completed` counts terminal batches and
`total` is `running + completed`.

You can now run this script normally using python and start saving money:

```bash
Expand Down
278 changes: 278 additions & 0 deletions examples/racing.py
@@ -0,0 +1,278 @@
import asyncio
import os
import time
import typing as t
from dataclasses import dataclass

from dotenv import load_dotenv
from groq import AsyncGroq
from mistralai import Mistral
from openai import AsyncOpenAI
from together import AsyncTogether

from batchling import batchify

load_dotenv()


@dataclass
class ProviderRaceResult:
"""One provider completion entry in completion order."""

model: str
elapsed_seconds: float
answer: str


ProviderRequestBuilder = t.Callable[[], t.Coroutine[t.Any, t.Any, tuple[str, str]]]


async def run_openai_request(*, prompt: str) -> tuple[str, str]:
"""
Send one OpenAI request.

Parameters
----------
prompt : str
User prompt sent to the provider.

Returns
-------
tuple[str, str]
``(model_name, answer_text)``.
"""
client = AsyncOpenAI(api_key=os.getenv(key="OPENAI_API_KEY"))
response = await client.responses.create(
input=prompt,
model="gpt-4o-mini",
)
content = response.output[-1].content
return response.model, content[0].text


async def run_groq_request(*, prompt: str) -> tuple[str, str]:
"""
Send one Groq request.

Parameters
----------
prompt : str
User prompt sent to the provider.

Returns
-------
tuple[str, str]
``(model_name, answer_text)``.
"""
client = AsyncGroq(api_key=os.getenv(key="GROQ_API_KEY"))
response = await client.chat.completions.create(
model="llama-3.1-8b-instant",
messages=[
{
"role": "user",
"content": prompt,
}
],
)
return response.model, response.choices[0].message.content


async def run_mistral_request(*, prompt: str) -> tuple[str, str]:
"""
Send one Mistral request.

Parameters
----------
prompt : str
User prompt sent to the provider.

Returns
-------
tuple[str, str]
``(model_name, answer_text)``.
"""
client = Mistral(api_key=os.getenv(key="MISTRAL_API_KEY"))
response = await client.chat.complete_async(
model="mistral-medium-2505",
messages=[
{
"role": "user",
"content": prompt,
}
],
stream=False,
response_format={"type": "text"},
)
return response.model, str(object=response.choices[0].message.content)


async def run_together_request(*, prompt: str) -> tuple[str, str]:
"""
Send one Together request.

Parameters
----------
prompt : str
User prompt sent to the provider.

Returns
-------
tuple[str, str]
``(model_name, answer_text)``.
"""
client = AsyncTogether(api_key=os.getenv(key="TOGETHER_API_KEY"))
response = await client.chat.completions.create(
model="google/gemma-3n-E4B-it",
messages=[
{
"role": "user",
"content": prompt,
}
],
)
return response.model, response.choices[0].message.content


async def run_doubleword_request(*, prompt: str) -> tuple[str, str]:
"""
Send one Doubleword request.

Parameters
----------
prompt : str
User prompt sent to the provider.

Returns
-------
tuple[str, str]
``(model_name, answer_text)``.
"""
client = AsyncOpenAI(
api_key=os.getenv(key="DOUBLEWORD_API_KEY"),
base_url="https://api.doubleword.ai/v1",
)
response = await client.responses.create(
input=prompt,
model="openai/gpt-oss-20b",
)
content = response.output[-1].content
return response.model, content[0].text


async def run_provider_request(
*,
request_builder: ProviderRequestBuilder,
started_at: float,
) -> ProviderRaceResult:
"""
Execute one provider request and annotate elapsed time.

Parameters
----------
request_builder : ProviderRequestBuilder
Provider request coroutine factory.
started_at : float
Shared wall-clock start time in ``perf_counter`` seconds.

Returns
-------
ProviderRaceResult
Result payload with answer and elapsed time.
"""
model, answer = await request_builder()
elapsed_seconds = time.perf_counter() - started_at
return ProviderRaceResult(
model=model,
elapsed_seconds=elapsed_seconds,
answer=answer,
)


def build_enabled_request_builders(*, prompt: str) -> list[ProviderRequestBuilder]:
"""
Build one request factory per configured provider.

Parameters
----------
prompt : str
Shared text prompt sent to all providers.

Returns
-------
list[ProviderRequestBuilder]
Enabled provider request factories.
"""
providers: list[tuple[str, ProviderRequestBuilder]] = [
(
"OPENAI_API_KEY",
lambda: run_openai_request(prompt=prompt),
),
(
"GROQ_API_KEY",
lambda: run_groq_request(prompt=prompt),
),
(
"MISTRAL_API_KEY",
lambda: run_mistral_request(prompt=prompt),
),
(
"TOGETHER_API_KEY",
lambda: run_together_request(prompt=prompt),
),
(
"DOUBLEWORD_API_KEY",
lambda: run_doubleword_request(prompt=prompt),
),
]
enabled_builders: list[ProviderRequestBuilder] = []
for env_var_name, request_builder in providers:
api_key = os.getenv(key=env_var_name)
if not api_key:
continue
enabled_builders.append(request_builder)
return enabled_builders


async def main() -> None:
"""
Run one request per provider and collect completion-order results.

The race excludes Anthropic, Gemini, and XAI on purpose because their model field
extraction differs from the other provider examples.
"""
prompt = "Give one short sentence explaining what asynchronous batching is."
request_builders = build_enabled_request_builders(prompt=prompt)
if not request_builders:
print("No providers configured. Set at least one provider API key in your environment.")
return

started_at = time.perf_counter()
tasks = [
asyncio.create_task(
run_provider_request(
request_builder=request_builder,
started_at=started_at,
)
)
for request_builder in request_builders
]

completion_order_register: list[ProviderRaceResult] = []
for task in asyncio.as_completed(tasks):
result = await task
completion_order_register.append(result)

for index, result in enumerate(completion_order_register, start=1):
print(f"{index}. model={result.model}")
print(f" elapsed={result.elapsed_seconds:.2f}s")
print(f" answer={result.answer}\n")


async def run_with_batchify() -> None:
"""Run the provider race inside ``batchify`` for direct script execution."""
async with batchify():
await main()


if __name__ == "__main__":
asyncio.run(run_with_batchify())