Decorator-based HPC workflow engine with an auto-generated CLI.
Define tasks with Python decorators, wire data flow with Result
annotations, and submit to Slurm with a CLI built from your function
signatures -- or interactively from a notebook.
Zero heavy dependencies: the CLI is pure argparse, the manifest store
is stdlib sqlite3. The only optional dependency is tomli for
config file support on Python 3.10.
- Decorator API -- @wf.job() and @wf.array_job() with typed parameter annotations.
- Automatic dependency inference -- Result(step="prepare") adds the dependency and wires the data.
- Static type validation -- return types vs. parameter types are checked at registration time.
- Fan-out / gather / chain -- all data flow patterns between singletons and arrays.
- Reusable flows -- define tasks on a Flow, include in multiple workflows with optional prefixes.
- Interactive Run handle -- wf.submit() returns a Run for status, cancel, retry from notebooks.
- Literal support -- Literal["era5", "icon"] maps to argparse choices automatically.
- Config file -- ~/.config/reflow/config.toml for default partition, account, python, server URL.
- Pluggable executors -- SlurmExecutor, LocalExecutor, or implement your own.
- Pluggable stores -- SqliteStore (stdlib), future REST server.
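To make the "CLI built from your function signatures" idea concrete, here is a minimal stdlib-only sketch of how an annotated signature could be turned into argparse options, including the Literal-to-choices mapping. It is not reflow's implementation: the Param class below is a hypothetical stand-in that only carries help text, and build_parser is an illustrative name.

```python
import argparse
import inspect
from typing import Annotated, Literal, get_args, get_origin, get_type_hints


class Param:
    """Hypothetical stand-in for reflow's Param marker (help text only)."""

    def __init__(self, help: str = "") -> None:
        self.help = help


def build_parser(func) -> argparse.ArgumentParser:
    """Build an argparse parser from a function's Annotated parameters."""
    parser = argparse.ArgumentParser(prog=func.__name__)
    hints = get_type_hints(func, include_extras=True)
    for name, param in inspect.signature(func).parameters.items():
        hint = hints[name]
        if get_origin(hint) is not Annotated:
            continue  # only Annotated[..., Param(...)] become options
        base, *extras = get_args(hint)
        marker = next((m for m in extras if isinstance(m, Param)), None)
        if marker is None:
            continue
        kwargs = {"help": marker.help}
        if get_origin(base) is Literal:
            # Literal["era5", "icon"] -> choices=["era5", "icon"]
            kwargs["choices"] = list(get_args(base))
        else:
            kwargs["type"] = base
        if param.default is inspect.Parameter.empty:
            kwargs["required"] = True
        else:
            kwargs["default"] = param.default
        parser.add_argument(f"--{name.replace('_', '-')}", **kwargs)
    return parser


def prepare(
    start: Annotated[str, Param(help="Start date")],
    model: Annotated[Literal["era5", "icon"], Param(help="Model")] = "era5",
) -> list[str]:
    return [f"{model}-{start}.nc"]


parser = build_parser(prepare)
args = parser.parse_args(["--start", "2025-01-01", "--model", "icon"])
```

Because the choices come straight from the Literal, an unknown model name is rejected by argparse before any task code runs.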
# demo_flow.py
from typing import Annotated, Literal
from reflow import Workflow, Param, Result, RunDir
wf = Workflow("demo")
@wf.job()
def prepare(
    start: Annotated[str, Param(help="Start date, ISO-8601")],
    end: Annotated[str, Param(help="End date, ISO-8601")],
    model: Annotated[Literal["era5", "icon"], Param(help="Model")] = "era5",
    run_dir: RunDir = RunDir(),
) -> list[str]:
    return ["a.nc", "b.nc", "c.nc"]

@wf.array_job(cpus=4, time="00:10:00")
def convert(
    nc_file: Annotated[str, Result(step="prepare")],
    bucket: Annotated[str, Param(help="S3 bucket")],
    run_dir: RunDir = RunDir(),
) -> str:
    return f"/data/{nc_file}.zarr"

@wf.job()
def finalize(
    paths: Annotated[list[str], Result(step="convert")],
    run_dir: RunDir = RunDir(),
) -> str:
    return "done"

if __name__ == "__main__":
    raise SystemExit(wf.cli())

python demo_flow.py dag
python demo_flow.py submit --help
export REFLOW_MODE=dry-run
python demo_flow.py submit \
--run-dir /scratch/$USER/demo \
--start 2025-01-01 --end 2025-01-31 \
--bucket my-bucket --model icon

from typing import Annotated
from reflow import Workflow, Param, Result, RunDir
wf = Workflow("experiment")

@wf.job()
def prepare(start: Annotated[str, Param(help="Start")], run_dir: RunDir = RunDir()) -> list[str]:
    return ["a.nc", "b.nc"]

@wf.array_job()
def convert(
    nc_file: Annotated[str, Result(step="prepare")],
    bucket: Annotated[str, Param(help="Bucket")],
    run_dir: RunDir = RunDir(),
) -> str:
    return f"{nc_file}.zarr"

run = wf.submit(run_dir="/scratch/run1", start="2025-01-01", bucket="b")
run.status() # pretty-prints task states
run.cancel() # cancel all active tasks
run.cancel("convert") # cancel one task
run.retry() # retry all failures
run.status(as_dict=True) # returns dict for scripting

pip install -e .

@wf.array_job()
def convert(
    nc_file: Annotated[str, Result(step="prepare")],  # dependency + data
    run_dir: RunDir = RunDir(),
) -> str: ...

Multi-source concatenation:

@wf.array_job()
def merge(
    item: Annotated[str, Result(steps=["prepare_a", "prepare_b"])],
) -> str: ...

RunDir: injected as pathlib.Path. Never appears on the CLI.

start: Annotated[str, Param(help="Start date")]  # global: --start
chunk: Annotated[int, Param(help="Chunk", namespace="local")] = 256  # --ingest-chunk
model: Annotated[Literal["era5", "icon"], Param(help="Model")] = "era5"  # --model {era5,icon}

@wf.job(after=["cleanup_old_runs"])
def prepare(run_dir: RunDir = RunDir()) -> list[str]: ...

from reflow import Flow, Workflow
conversion = Flow("conversion")
@conversion.job()
def prepare(...) -> list[str]: ...
@conversion.array_job()
def convert(...) -> str: ...
# Single use
wf = Workflow("experiment")
wf.include(conversion)
# Or with prefixes
combined = Workflow("combined")
combined.include(conversion, prefix="era5")
combined.include(conversion, prefix="icon")
@combined.job()
def merge(
    era5: Annotated[list[str], Result(step="era5_convert")],
    icon: Annotated[list[str], Result(step="icon_convert")],
    run_dir: RunDir = RunDir(),
) -> str:
    return "merged"

| Upstream returns | Upstream kind | Downstream wants | Downstream kind | Result |
|---|---|---|---|---|
| list[str] | job | str | array_job | fan-out |
| str | array_job | list[str] | job | gather |
| str | array_job | str | array_job | 1:1 chain |
| list[str] | array_job | str | array_job | flatten + fan-out |
| list[str] | array_job | list[str] | job | gather + flatten |
| str | job | str | job | direct |

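The table above can be read as a lookup keyed on the upstream and downstream types and kinds. The sketch below encodes exactly those six rows and raises on anything else, which is roughly what catching a wiring error at registration time could look like. The names PATTERNS, split_hint and classify are illustrative, not part of reflow, and rejecting unlisted combinations may be stricter than the real validator.

```python
from typing import get_args, get_origin

# Key: (upstream is list, upstream kind, downstream is list, downstream kind)
PATTERNS = {
    (True, "job", False, "array_job"): "fan-out",
    (False, "array_job", True, "job"): "gather",
    (False, "array_job", False, "array_job"): "1:1 chain",
    (True, "array_job", False, "array_job"): "flatten + fan-out",
    (True, "array_job", True, "job"): "gather + flatten",
    (False, "job", False, "job"): "direct",
}


def split_hint(tp):
    """Return (is_list, element_type) for hints such as str or list[str]."""
    if get_origin(tp) is list:
        return True, get_args(tp)[0]
    return False, tp


def classify(up_type, up_kind: str, down_type, down_kind: str) -> str:
    """Name the data flow pattern, or raise TypeError for bad wiring."""
    up_list, up_elem = split_hint(up_type)
    down_list, down_elem = split_hint(down_type)
    if up_elem is not down_elem:
        raise TypeError(f"element type mismatch: {up_elem!r} vs {down_elem!r}")
    key = (up_list, up_kind, down_list, down_kind)
    if key not in PATTERNS:
        raise TypeError(f"unsupported wiring: {key!r}")
    return PATTERNS[key]


pattern = classify(list[str], "job", str, "array_job")
```

Here pattern is "fan-out": a singleton job returning list[str] feeds an array job that takes one str per instance.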
Reflow uses a Merkle DAG to avoid redundant work. Each task instance gets an identity hash computed from:
- task name + version string
- direct input parameters (JSON-serialised)
- output hashes of all upstream dependencies
If a previous successful instance with the same identity exists in the store, its output is reused without submitting a Slurm job. This works across runs -- resubmitting the same workflow with the same parameters skips everything that already succeeded.
prepare(start="2025-01-01", v=1)
    identity    = hash(name + version + inputs)
    output_hash = hash(output)
        |
        v
convert[0](nc_file="a.nc", v=1)
    identity    = hash(name + version + inputs + prepare.output_hash)
Change anything upstream and the hashes cascade forward -- every downstream task gets a new identity and misses the cache.
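The identity scheme described above can be sketched with hashlib and json from the standard library. This is an illustration of the idea only: the choice of SHA-256, the canonical JSON encoding, and the function names digest and identity are assumptions, not reflow's actual hashing code.

```python
import hashlib
import json


def digest(payload) -> str:
    """Hash any JSON-serialisable value deterministically."""
    blob = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(blob.encode("utf-8")).hexdigest()


def identity(name, version, params, upstream_hashes=()) -> str:
    """Combine task name, version, direct inputs and upstream output hashes."""
    return digest(
        {
            "name": name,
            "version": version,
            "params": params,
            "upstream": list(upstream_hashes),
        }
    )


# Two possible outputs of the upstream "prepare" task.
old_output = digest(["a.nc", "b.nc"])
new_output = digest(["a.nc", "b.nc", "c.nc"])

convert_old = identity("convert", "1", {"nc_file": "a.nc"}, [old_output])
convert_same = identity("convert", "1", {"nc_file": "a.nc"}, [old_output])
convert_new = identity("convert", "1", {"nc_file": "a.nc"}, [new_output])
convert_v2 = identity("convert", "2", {"nc_file": "a.nc"}, [old_output])
```

convert_old and convert_same are equal, so the second would hit the cache. convert_new and convert_v2 both differ from convert_old: a changed upstream output or a bumped version yields a new identity and therefore a cache miss.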
# Bump version when task logic changes (invalidates cache)
@wf.job(version="2")
def prepare(...) -> list[str]: ...
# Disable caching entirely for a task
@wf.job(cache=False)
def always_fresh(...) -> str: ...
# Force-run everything on submit
run = wf.submit(run_dir="...", force=True, start="2025-01-01")
# Force-run specific tasks only
run = wf.submit(run_dir="...", force_tasks=["prepare"], start="2025-01-01")

By default, the Merkle identity is trusted on submit -- no filesystem checks. This is intentional: intermediate files may have been cleaned up (and should be), and checking every cached output would penalise good housekeeping.

Verification runs automatically on retry. When something fails,
run.retry() walks the upstream chain, checks that cached outputs
are still valid (file existence for Path types, custom callables),
and invalidates anything stale:
# Normal submit: trusts the cache, fast
run = wf.submit(run_dir="...", start="2025-01-01")
# Task fails -> retry verifies upstream before resubmitting
run.retry() # verify=True by default
run.retry("convert") # verify one task's upstreams
run.retry("convert", verify=False) # skip verification if you're sure

For critical pipelines, opt in to proactive verification on submit:

run = wf.submit(run_dir="...", verify=True, start="2025-01-01")

Tasks returning Path or list[Path] are verified by checking file existence. For custom logic, pass a callable:

@wf.job(verify=lambda output: output > 0)
def compute_stats(...) -> float: ...
@wf.job()
def download(...) -> list[Path]: # auto-verified via Path type
    return [Path("/scratch/era5.nc")]

Each array instance is cached individually. If 98 out of 100 instances are cached but 2 are stale, only those 2 are submitted to Slurm.

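The verification rules described above (file existence for Path outputs, a custom callable otherwise) combined with per-instance caching can be sketched as follows. The helper names output_is_valid and stale_indices are hypothetical, and letting a custom callable take precedence over the Path check is an assumption of this sketch.

```python
import tempfile
from pathlib import Path
from typing import Any, Callable, Optional


def output_is_valid(
    output: Any, verify: Optional[Callable[[Any], bool]] = None
) -> bool:
    """Check one cached output: custom callable first, then Path existence."""
    if verify is not None:
        return bool(verify(output))
    if isinstance(output, Path):
        return output.exists()
    if isinstance(output, list) and output and all(
        isinstance(item, Path) for item in output
    ):
        return all(item.exists() for item in output)
    return True  # nothing checkable: trust the cached value


def stale_indices(cached_outputs, verify=None) -> list[int]:
    """Return the array indices whose cached output no longer verifies."""
    return [
        index
        for index, output in enumerate(cached_outputs)
        if not output_is_valid(output, verify)
    ]


with tempfile.TemporaryDirectory() as tmp:
    outputs = [Path(tmp) / f"part{i}.zarr" for i in range(3)]
    for path in outputs:
        path.touch()
    outputs[1].unlink()  # simulate housekeeping removing one cached file
    to_resubmit = stale_indices(outputs)

negative_values = stale_indices([3.5, -1.0, 0.2], verify=lambda value: value > 0)
```

In both cases only index 1 is reported as stale, so only that array instance would need to be resubmitted.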
~/.config/reflow/config.toml:
[executor]
partition = "compute"
account = "bm1159"
python = "/sw/spack-levante/mambaforge-23.1/bin/python"
mode = "sbatch"
[defaults]
run_dir = "/scratch/k204221/reflow"
[server]
url = "https://flow.dkrz.de"

Falls back to REFLOW_* environment variables.

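One way to picture the fallback is a lookup that consults the parsed config file first and an environment variable second. The sketch below works on plain dicts so it runs without a TOML parser. It reads "falls back" literally (file first, then environment), which may not match reflow's real precedence, and apart from REFLOW_MODE the variable names, such as REFLOW_ACCOUNT, are assumed from the REFLOW_* pattern.

```python
import os
from typing import Any, Mapping, Optional


def resolve(
    section: str,
    key: str,
    config: Mapping[str, Mapping[str, Any]],
    environ: Optional[Mapping[str, str]] = None,
    default: Any = None,
) -> Any:
    """Look up a setting in the config mapping, then in REFLOW_<KEY>."""
    environ = os.environ if environ is None else environ
    value = config.get(section, {}).get(key)
    if value is not None:
        return value
    # Assumed naming scheme: key "account" -> variable "REFLOW_ACCOUNT".
    return environ.get(f"REFLOW_{key.upper()}", default)


# Stand-ins for a parsed config.toml and the process environment.
config = {"executor": {"partition": "compute"}}
environ = {"REFLOW_ACCOUNT": "bm1159"}

partition = resolve("executor", "partition", config, environ)  # from the file
account = resolve("executor", "account", config, environ)      # from the environment
python = resolve("executor", "python", config, environ, default="python3")
```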
python flow.py submit --run-dir ... --start ... --bucket ...
python flow.py status <run-id> --run-dir ...
python flow.py cancel <run-id> --run-dir ...
python flow.py retry <run-id> --run-dir ...
python flow.py runs --run-dir ...
python flow.py dag
python flow.py describe # JSON manifest for future server registration

reflow register demo_flow.py --server https://flow.dkrz.de
reflow list --server https://flow.dkrz.de
reflow inspect demo --server https://flow.dkrz.de
reflow delete demo --server https://flow.dkrz.de

The describe command already produces the JSON manifest that a server
would need to reconstruct the CLI and dispatch logic without importing
user code.
Why zero heavy dependencies? HPC environments have constrained package availability. stdlib sqlite3 + argparse means this works everywhere Python 3.10+ is installed.
Why Result annotations? Dependencies and data flow declared in
one place -- the function signature. The framework infers fan-out,
gather, and chaining from the type relationship.
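As an illustration of declaring dependencies in the signature, the sketch below reads Annotated metadata with typing.get_type_hints and collects the upstream step for each parameter. The Result dataclass is a hypothetical stand-in carrying only a step name, and infer_dependencies is an illustrative helper, not reflow's API.

```python
import inspect
from dataclasses import dataclass
from typing import Annotated, get_args, get_origin, get_type_hints


@dataclass(frozen=True)
class Result:
    """Hypothetical stand-in for reflow's Result marker."""

    step: str


def infer_dependencies(func) -> dict[str, str]:
    """Map each parameter name to the upstream step it draws data from."""
    hints = get_type_hints(func, include_extras=True)
    deps: dict[str, str] = {}
    for name in inspect.signature(func).parameters:
        hint = hints.get(name)
        if get_origin(hint) is not Annotated:
            continue  # plain parameters carry no dependency
        for marker in get_args(hint)[1:]:
            if isinstance(marker, Result):
                deps[name] = marker.step
    return deps


def convert(
    nc_file: Annotated[str, Result(step="prepare")],
    bucket: str,
) -> str:
    return f"{nc_file}.zarr"


deps = infer_dependencies(convert)
```

For convert, deps maps nc_file to "prepare", while bucket contributes no dependency. The same signature therefore tells the framework both what to wait for and where the value comes from.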
Why Flow and Workflow? Separation of reusable task definitions
from concrete execution machinery. Define once, include in multiple
workflows with optional prefixes.
Why Store ABC? SQLite for local single-user runs, future REST
server for multi-user production. The orchestration core is
storage-agnostic.
Why Run handles? Notebooks need an object that remembers
connection details. run.status() beats passing five arguments.
Why static type validation? Catch wiring errors before submitting to Slurm, not 30 minutes into a batch job.
Why Merkle-DAG caching? Change anything upstream and the hashes
cascade forward automatically -- no manual invalidation. Cross-run
caching means resubmitting the same workflow skips completed work.
Tasks returning Path are auto-verified by checking file existence.