A transparent sync/async bridge for Python.
Define a pipeline once — quent handles the rest.
- One definition, two worlds — a single pipeline works for both sync and async callers. Zero code duplication.
- Zero ceremony — no decorators, no base classes, no type wrappers. Just compose your functions.
- Drop-in migration — unify existing sync and async implementations into one pipeline. Stop maintaining two versions.
- Pure Python — zero runtime dependencies. Fully typed (PEP 561).
- Works with asyncio, trio, and curio — async pipelines run transparently under any of these event loops. Event loop detection uses `sys.modules` lookups (~50 ns), adding zero overhead when those libraries are not loaded. Dual-protocol objects (context managers and iterables supporting both sync and async protocols) automatically prefer the async protocol under any running event loop.
- Focused — every feature exists because removing it would force separate sync and async code paths.
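The `sys.modules`-gated detection mentioned above can be sketched in plain Python. This is a simplified illustration of the idea, not quent's actual code:

```python
import asyncio
import sys

def detect_running_framework():
    # Gate each probe on a sys.modules lookup -- a plain dict check,
    # so nothing is paid for frameworks that were never imported.
    if 'trio' in sys.modules:
        import trio
        try:
            trio.lowlevel.current_task()
            return 'trio'
        except RuntimeError:
            pass
    try:
        asyncio.get_running_loop()
        return 'asyncio'
    except RuntimeError:
        return None
```

With no event loop running this returns `None`; under `asyncio.run(...)` it returns `'asyncio'`.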
Any codebase that supports both sync and async callers ends up maintaining two versions of the same logic:
```python
# Without quent -- the same pipeline, written twice
def process_sync(data):
    validated = validate_sync(data)
    transformed = transform_sync(validated)
    return save_sync(transformed)

async def process_async(data):
    validated = await validate_async(data)
    transformed = await transform_async(validated)
    return await save_async(transformed)
```

Every function, every pipeline, every utility — duplicated. When a bug is fixed in one version, the other falls out of sync. When a new step is added, it must be added in both places.
```python
# With quent -- write it once
pipeline = Q().then(validate).then(transform).then(save)

result = pipeline.run(data)        # sync if all steps are sync
result = await pipeline.run(data)  # async if any step is async
```

One definition. The pipeline starts executing synchronously. The moment any step returns an awaitable, execution seamlessly transitions to async and stays there. The caller decides whether to await.
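The handoff can be modeled in a few lines of plain Python. This is a simplified sketch of the idea, not quent's implementation:

```python
import asyncio
import inspect

def run_pipeline(value, steps):
    """Run steps synchronously until one returns an awaitable,
    then hand off the rest of the pipeline to an async driver."""
    for i, step in enumerate(steps):
        result = step(value)
        if inspect.isawaitable(result):
            # Transition point: everything after this runs async.
            return _finish_async(result, steps[i + 1:])
        value = result
    return value

async def _finish_async(awaitable, remaining):
    value = await awaitable
    for step in remaining:
        result = step(value)
        value = await result if inspect.isawaitable(result) else result
    return value

# All-sync: returns a plain value
assert run_pipeline(5, [lambda x: x * 2, lambda x: x + 1]) == 11

# Mixed: returns a coroutine the caller awaits
async def double(x):
    return x * 2

assert asyncio.run(run_pipeline(5, [lambda x: x + 1, double])) == 12
```

The caller-facing consequence: the same `run_pipeline` call site works in both worlds, because the return type (value vs. awaitable) is decided at runtime.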
```
pip install quent
```

Requires Python 3.10+. Supports 3.10 through 3.14, including free-threaded builds. Zero runtime dependencies on Python 3.11+ (`typing_extensions` on 3.10).
```python
from quent import Q

# Basic pipeline
result = Q(5).then(lambda x: x * 2).then(lambda x: x + 1).run()
print(result)  # 11

# Side effects -- do() runs the function but passes the value through
result = Q(42).then(lambda x: x * 2).do(print).then(str).run()  # prints: 84
print(result)  # '84'

# Works with any callable
result = Q(' hello ').then(str.strip).then(str.upper).run()
print(result)  # HELLO
```

The same pipeline works whether your functions are sync, async, or a mix:
```python
pipeline = Q().then(fetch_data).then(validate).then(normalize)

# Sync context
result = pipeline.run(id)

# Async context -- same pipeline, no changes
result = await pipeline.run(id)
```

Build pipelines fluently. Every builder method returns `self` for chaining.
```python
from quent import Q

result = (
    Q(fetch_user, user_id)           # fetch user by id
    .then(validate)                  # transform
    .do(log)                        # side-effect
    .foreach(normalize_field)        # per-element
    .gather(enrich, score)           # concurrent
    .then(merge)                     # combine
    .if_(has_premium).then(upgrade)  # conditional
    .except_(handle_error)           # error handling
    .finally_(cleanup)               # cleanup
    .run()                           # execute
)
```

### Collection Operations — `foreach`, `foreach_do`
```python
# foreach -- transform each element, collect results
Q([1, 2, 3]).foreach(lambda x: x ** 2).run()  # [1, 4, 9]

# foreach_do -- side-effect per element, keep originals
Q([1, 2, 3]).foreach_do(print).run()  # prints 1, 2, 3; returns [1, 2, 3]

# filter via list comprehension
Q([1, 2, 3, 4, 5]).then(lambda xs: [x for x in xs if x % 2 == 0]).run()  # [2, 4]
```

### Concurrent Execution — `gather`, `concurrency` parameter
Run multiple functions on the same value concurrently:
```python
Q('hello').gather(str.upper, len).run()  # ('HELLO', 5)
```

Limit concurrency on collection operations with the `concurrency` parameter. Uses `ThreadPoolExecutor` for sync callables and `asyncio.Semaphore` + `TaskGroup` for async:
```python
# Process up to 10 items at a time
Q(urls).foreach(fetch, concurrency=10).run()

# Limit concurrent gather branches
Q(data).gather(analyze, compress, upload, concurrency=5).run()
```

Pass a custom executor for sync concurrent operations:
```python
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=4) as pool:
    Q(urls).foreach(fetch, concurrency=4, executor=pool).run()
```

### Conditionals — `if_` / `else_`
```python
Q(5).if_(lambda x: x > 0).then(lambda x: x * 2).run()  # 10
Q(-5).if_(lambda x: x > 0).then(str).else_(abs).run()  # 5

# When predicate is omitted, uses truthiness of the current value
Q('hello').if_().then(str.upper).run()                      # 'HELLO'
Q('').if_().then(str.upper).else_(lambda _: 'empty').run()  # 'empty'

# Literal predicate -- truthiness used directly
Q(value).if_(is_admin).then(grant_access).run()

# Side-effect conditional branch
Q(user).if_(is_premium).do(log_premium_access).then(next_step).run()
```

### Loops — `while_`
```python
# Decrement until zero (default predicate tests truthiness)
Q(10).while_().then(lambda x: x - 1).run()  # 0

# Predicate callable -- halve while value exceeds 1
Q(100).while_(lambda x: x > 1).then(lambda x: x // 2).run()  # 1

# break_() to exit early
Q(1).while_(True).then(lambda x: Q.break_(x) if x >= 100 else x * 2).run()  # 128

# Nest conditionals inside a loop body via nested pipeline
Q(data).while_(has_more).then(
    Q().if_(is_valid).then(process).else_(skip)
).run()
```

### Context Managers — `with_` / `with_do`
Transparently handles both sync and async context managers:
```python
Q(open('data.txt')).with_(lambda f: f.read()).run()

# Side-effect variant (result discarded, original value passes through)
Q(open('log.txt', 'w')).with_do(lambda f: f.write('done')).run()
```

### Error Handling — `except_` / `finally_`
One exception handler and one finally handler per pipeline:
```python
from quent import Q, QuentExcInfo

Q(0).then(lambda x: 1 / x).except_(lambda ei: -1).run()  # -1

(
    Q(url)
    .then(fetch)
    .then(parse)
    .except_(handle_error, exceptions=ConnectionError)
    .finally_(cleanup)
    .run()
)
```

`except_` catches `Exception` by default. The handler receives a `QuentExcInfo(exc, root_value)` as its current value. Use `reraise=True` to re-raise after handling (the handler runs for side effects only). `finally_` always runs and receives the pipeline's root value.
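For illustration, a handler consuming the documented `(exc, root_value)` shape might look like this. The NamedTuple is re-declared here only to keep the snippet self-contained; in real code, import it from quent:

```python
from typing import NamedTuple

class QuentExcInfo(NamedTuple):
    # Mirrors the documented shape passed to except_ handlers.
    exc: BaseException
    root_value: object

def handle_error(ei: QuentExcInfo):
    # Map any failure to a structured error value; the handler's return
    # value becomes the pipeline's result (unless reraise=True is set).
    return {'error': type(ei.exc).__name__, 'input': ei.root_value}

assert handle_error(QuentExcInfo(ZeroDivisionError('division by zero'), 0)) == {
    'error': 'ZeroDivisionError',
    'input': 0,
}
```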
### Control Flow — `return_` / `break_`
```python
# Early return -- skips all remaining steps
Q(5) \
    .then(lambda x: Q.return_(x * 10) if x > 0 else x) \
    .then(str) \
    .run()  # 50 (str step is skipped)

# Break from iteration -- break value is appended to partial results
Q([1, 2, 3, 4, 5]).foreach(lambda x: Q.break_(x) if x == 3 else x * 2).run()
# [2, 4, 3]
```

### Composition — `clone`, `as_decorator`
`clone` — fork-and-extend without modifying the original:
```python
base = Q().then(validate).then(normalize)

for_api = base.clone().then(to_json)   # base is untouched
for_db = base.clone().then(to_record)  # independent copy
```

`as_decorator` — wrap a pipeline as a function decorator:
```python
@Q().then(lambda x: x.strip()).then(str.upper).as_decorator()
def get_name():
    return ' alice '

get_name()  # 'ALICE'
```

### Iteration — `iterate` / `iterate_do`
Dual sync/async generators over pipeline output:
```python
for item in Q(range(5)).iterate(lambda x: x ** 2):
    print(item)  # 0, 1, 4, 9, 16

async for item in Q(async_source).iterate(transform):
    print(item)  # works with async sources too
```

### Generator Driving — `drive_gen`
Drive sync or async generators via the send protocol:
```python
def gen():
    x = yield 1
    x = yield x + 1
    x = yield x + 1

Q(gen()).drive_gen(lambda x: x * 2).run()
# Flow: yield 1 → fn(1)=2 → send 2 → yield 3 → fn(3)=6 → send 6 → yield 7 → fn(7)=14
# returns: 14

# Works with async generators too
result = await Q(async_gen()).drive_gen(process).run()
```

### Buffered Iteration — `buffer`
Decouple producer and consumer with a bounded, backpressure-aware buffer:
```python
# Producer runs ahead up to 10 items while consumer processes
for item in Q(produce).buffer(10).iterate():
    process(item)

# Works with all iteration terminals and async
async for item in Q(async_produce).buffer(10).iterate(transform):
    await consume(item)
```

Sync uses a background thread + `queue.Queue`; async uses a background task + `asyncio.Queue`.
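The sync path's mechanics can be sketched with a bounded queue. This is a simplified model of the pattern, not quent's implementation:

```python
import queue
import threading

def buffered(iterable, n):
    """Yield from `iterable` while a background thread produces up to
    `n` items ahead; `put` blocks at capacity, giving backpressure."""
    q = queue.Queue(maxsize=n)
    done = object()  # sentinel marking end of production

    def produce():
        for item in iterable:
            q.put(item)  # blocks once n items are waiting
        q.put(done)

    threading.Thread(target=produce, daemon=True).start()
    while (item := q.get()) is not done:
        yield item

assert list(buffered(range(5), 2)) == [0, 1, 2, 3, 4]
```

The bounded `maxsize` is what decouples producer and consumer without letting an unbounded backlog accumulate.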
### Debug Execution — `debug`
Execute a pipeline with step-level tracing. Returns a DebugResult capturing the full execution trace — each step's name, input, result, timing, and exception status. The original pipeline is not modified (an internal clone is used).
```python
result = Q(5).then(lambda x: x * 2).then(str).debug()

print(result.value)       # '10'
print(result.succeeded)   # True
print(result.elapsed_ns)  # total nanoseconds
result.print_trace()      # formatted table to stderr

# Works with async pipelines
result = await Q(fetch).then(parse).debug(url)
```

`DebugResult` provides `value`, `steps` (list of `StepRecord`), `elapsed_ns`, `succeeded`, `failed`, and `print_trace()`. Each `StepRecord` has `step_name`, `input_value`, `result`, `elapsed_ns`, `exception`, and `ok`.
How arguments flow through the pipeline is determined by two rules, checked in priority order:
| Condition | Behavior |
|---|---|
| Explicit args/kwargs provided | Call `fn(*args, **kwargs)` -- current value NOT passed |
| No args (default) | Call `fn(current_value)`, `fn()` if no value, or return value as-is if non-callable |
```python
Q(5).then(str).run()             # str(5) -- current value passed
Q(5).then(print, 'hello').run()  # print('hello') -- explicit args used
```

When an exception occurs inside a pipeline, quent injects a visualization directly into the traceback showing exactly which step failed:
```
Traceback (most recent call last):
  ...
  File "<quent>", line 1, in <module>
    Q(fetch_data)
    .then(validate)
    .then(transform)  <----
    .do(log)
    ...
ZeroDivisionError: division by zero
```
The `<----` marker points to the step that raised. Internal quent frames are cleaned from the traceback. On Python 3.11+, a concise exception note is also attached.

Opt out by setting `QUENT_NO_TRACEBACK=1` before importing quent.
```python
Q(v=<no value>, /, *args, **kwargs)
```

All methods return `self` for fluent chaining.
| Method | Description |
|---|---|
| `.then(v, /, *args, **kwargs)` | Append step; result replaces current value |
| `.do(fn, /, *args, **kwargs)` | Side-effect step; `fn` must be callable, result discarded |
| `.foreach(fn=None, /, *, concurrency=None, executor=None)` | Transform each element, collect results. `fn=None` collects elements as-is (identity mode) |
| `.foreach_do(fn, /, *, concurrency=None, executor=None)` | Side-effect per element, keep originals. `fn` is required |
| `.gather(*fns, concurrency=-1, executor=None)` | Run multiple `fns` on current value, collect results as tuple |
| `.with_(fn, /, *args, **kwargs)` | Enter current value as context manager, call `fn` |
| `.with_do(fn, /, *args, **kwargs)` | Same as `with_`, but `fn` result discarded |
| `.if_(predicate=None, /, *args, **kwargs)` | Begin conditional; must be followed by `.then()` or `.do()` |
| `.if_(...).then(fn, /, *args, **kwargs)` | Conditional transform -- runs `fn` if predicate is truthy, result replaces current value |
| `.if_(...).do(fn, /, *args, **kwargs)` | Conditional side-effect -- runs `fn` if predicate is truthy, result discarded |
| `.else_(v, /, *args, **kwargs)` | Else branch (must follow `.then()` or `.do()`) |
| `.else_do(fn, /, *args, **kwargs)` | Side-effect else branch (result discarded) |
| `.except_(fn, /, *args, exceptions=None, reraise=False, **kwargs)` | Exception handler (one per pipeline) |
| `.finally_(fn, /, *args, **kwargs)` | Cleanup handler (one per pipeline) |
| `.name(label)` | Assign a label for traceback identification |
| `.while_(predicate=None, /, *args, **kwargs)` | Begin loop; next `.then()` or `.do()` becomes body |
| `.drive_gen(fn)` | Drive a sync/async generator via send protocol |
| `.set(key)` / `.set(key, value)` | Store a value in the execution context (current value unchanged) |
| `.get(key)` / `.get(key, default)` | Retrieve a value from context; replaces current value |
| Method | Description |
|---|---|
| `.run(v=<no value>, /, *args, **kwargs)` | Execute the pipeline; returns value or coroutine |
| `q(...)` | Alias for `.run()` |
| `.debug(v=<no value>, /, *args, **kwargs)` | Execute with step-level tracing; returns `DebugResult` |
| Method | Description |
|---|---|
| `.as_decorator()` | Wrap pipeline as a function decorator |
| `.iterate(fn=None)` | Dual sync/async generator over output |
| `.iterate_do(fn=None)` | Like `iterate`, `fn` results discarded |
| `.flat_iterate(fn=None, *, flush=None)` | Flatmap iterator; flattens one level or maps `fn` to sub-iterables |
| `.flat_iterate_do(fn=None, *, flush=None)` | Like `flat_iterate`, `fn` results discarded; original elements yielded |
| `.buffer(n)` | Attach backpressure buffer for iteration terminals |
| `.clone()` | Deep copy for fork-and-extend |
| `Q.from_steps(*steps)` | Construct a pipeline from a sequence of `.then()` steps |
Class methods

| Method | Description |
|---|---|
| `Q.return_(v=<no value>, /, *args, **kwargs)` | Signal early return from pipeline |
| `Q.break_(v=<no value>, /, *args, **kwargs)` | Signal break from iteration or `while_` loop |
| Method | Description |
|---|---|
| `Q.set(key, value)` | Store a value in the execution context immediately (not a pipeline step) |
| `Q.get(key)` / `Q.get(key, default)` | Retrieve a value from the execution context immediately |
| Name | Description |
|---|---|
| `Q` | Main pipeline class |
| `QuentExcInfo` | NamedTuple `(exc, root_value)` passed to except handlers |
| `QuentIterator` | Type alias for `.iterate()` / `.iterate_do()` return values |
| `QuentException` | Exception type for quent-specific errors |
| `__version__` | Package version string |
| `Q.on_step` | Optional callback `(q, step_name, input_value, result, elapsed_ns, exception)` for instrumentation. `exception` is `None` on success, the exception object on failure (before `except_` runs). Not called for control flow signals. Zero overhead when `None` |
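As an illustration, an instrumentation hook matching the documented `Q.on_step` signature might look like this. The body is hypothetical; only the signature comes from the table above:

```python
import sys

def log_step(q, step_name, input_value, result, elapsed_ns, exception):
    # exception is None on success; the raised object on failure.
    status = 'FAIL' if exception is not None else 'ok'
    print(f'{step_name}: {input_value!r} -> {result!r} '
          f'({elapsed_ns / 1000:.1f} us, {status})', file=sys.stderr)

# Installed globally, per the docs: Q.on_step = log_step
```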
Note: `copy.copy()` and `copy.deepcopy()` are blocked on Q objects (`TypeError`). Use `.clone()` to produce a correct, independent copy. Pickling is not blocked — most pipeline contents (lambdas, closures) will naturally fail to pickle, but quent does not enforce this.
See the examples/ directory for complete, runnable recipes covering ETL pipelines, API gateways, fan-out/fan-in patterns, retry with backoff, and testing pipelines.
The sync path is a tight while loop — zero async machinery. Each sync step adds ~210 ns of overhead. The async path adds ~4 µs per step — overhead that disappears against real I/O.
| Pipeline | 1 Step | 5 Steps | 10 Steps | Per Step |
|---|---|---|---|---|
| Sync | 0.8 µs | 1.7 µs | 2.7 µs | ~210 ns |
| Async | 17 µs | 33 µs | 53 µs | ~4 µs |
| Mixed (sync+async) | — | 22 µs | 34 µs | — |
Baselines — raw function call: ~26 ns · bare await coroutine(): ~2.8 µs
I/O-bound workloads — quent's primary use case. Pipeline overhead is fixed; I/O latency dwarfs it:
| I/O Latency | 5-Step Overhead | % of Total |
|---|---|---|
| 1 ms (fast query) | ~22 µs | 2.2% |
| 5 ms (database) | ~22 µs | 0.4% |
| 50 ms (HTTP API) | ~22 µs | 0.04% |
Python 3.14, Apple M-series, macOS. See benchmark scripts for reproducible results. — Full performance guide →
quent's correctness rests on a single guarantee: any pipeline step can be swapped between sync and async without changing the result. The test suite is purpose-built to prove this exhaustively.
- 1,342+ test methods across 24 test modules and 286+ test classes
- 21 CI matrix combinations — 3 OSes (Ubuntu, macOS, Windows) × 5 Python versions (3.10–3.14), plus free-threaded builds (3.13t, 3.14t)
- Security scanning — `pip-audit` for dependency vulnerabilities, `bandit` SAST for source code
The core testing infrastructure proves the sync/async bridge contract across a 7-axis combinatorial space:
- Operation type — 96 "bricks" covering every pipeline operation × every calling convention
- Pipeline length — pipelines of 1 to N steps
- Operation order — every permutation of operations (with repetition)
- Sync/async per position — each step independently sync or async (2^N combinations per pipeline)
- Error injection — exceptions, base exceptions, and control flow signals at each position
- Concurrency — sequential and concurrent variants
- Handler configuration — 18 error handler combinations (except/finally/both, sync/async, consuming/reraising/failing)
For each configuration, all 2^N sync/async permutations run and must produce identical results. No expected values are precomputed — the invariant is that all permutations agree with each other. Correctness is independently verified by composing pure-Python oracle functions.
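The agreement invariant can be illustrated with a toy harness. This is a sketch of the idea, not quent's test infrastructure:

```python
import asyncio
import inspect
import itertools

def as_async(f):
    async def step(x):
        return f(x)
    return step

async def run(value, steps):
    # Minimal driver: await whatever a step returns, if awaitable.
    for step in steps:
        r = step(value)
        value = await r if inspect.isawaitable(r) else r
    return value

fns = [lambda x: x + 1, lambda x: x * 2, lambda x: x - 3]

# All 2^N sync/async permutations of the same steps must agree.
results = {
    asyncio.run(run(1, [as_async(f) if a else f for a, f in zip(flags, fns)]))
    for flags in itertools.product([False, True], repeat=len(fns))
}
assert results == {1}  # ((1 + 1) * 2) - 3 == 1
```

No expected value needs to be hard-coded per permutation: collapsing all results into a set and asserting its size is 1 is the agreement check itself.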
- Transition matrix — all 17,576 triplets of 26 atomic operations verify that every method adjacency produces correct results in all sync/async variants
- Property-based testing — Hypothesis generates random inputs for 179 property and fuzz tests, including CWE-117 repr sanitization with adversarial ANSI escape sequences
- Thread safety — 30–50 concurrent threads with barrier synchronization verify safe concurrent execution of fully constructed pipelines
- Oracle validation — each of the 96 bricks has an independent oracle function; oracles are verified against quent before being used in bridge assertions
- Warning validation — all warnings emitted during exhaustive runs are captured and validated against expected patterns
```shell
# Full suite (format + lint + type check + tests)
./run_tests.sh

# Tests only (parallel -- wall-clock time = slowest module)
python scripts/run_tests_parallel.py

# Single module
python -m unittest tests.bridge_tests
```

Full documentation — including guides, advanced usage, recipes, and framework integration examples — is available at quent.readthedocs.io.
See the contributing guide for setup instructions, code style, and PR guidelines.
```shell
git clone https://github.com/drukmano/quent.git
cd quent
uv sync --group dev   # or: pip install -e . && pip install ruff mypy
bash scripts/run_tests.sh
```

Docs • GitHub • PyPI • Getting Started • Changelog
MIT — Copyright (c) 2023–2026 Ohad Drukman
