- Installation and Environment Setup
- Additional Software Requirements
- Configuration Setup
- Run with celery
- Code Structure
- Developer & Testing Guide
- Subband Pipeline
This repository serves as the central place for running and developing the OVRO-LWA data reduction and calibration pipelines.
A pre-configured environment is already available on Calim — see the Developer & Testing Guide for instructions and usage examples.
The following are instructions for setting up a new environment from scratch.
First, clone the repository:
git clone https://github.com/ovro-lwa/distributed-pipeline.git
cd distributed-pipeline
Then create the environment using the provided environment.yaml:
conda env create -f environment.yaml
Activate the environment:
conda activate orca-env
All required dependencies — including both conda and pip packages — are installed automatically by environment.yaml, so no additional installation commands are needed.
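As an optional sanity check, you can confirm that the dependencies resolved by importing Celery, which the Celery-based sections below rely on; this only assumes Celery ended up in the environment created from environment.yaml:

```bash
# Optional check: confirm Celery is importable in the activated environment.
python -c "import celery; print(celery.__version__)"
```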
In addition to the Python packages listed in requirements.txt, several external tools are required to run certain ORCA functions:
- WSClean — used for imaging. Also includes tools like `chgcentre` for changing the phase center of a measurement set.
- AOFlagger — used for RFI flagging.
- TTCal.jl — used for peeling bright sources: http://mweastwood.info/TTCal.jl/
- mnc_python — OVRO-LWA Monitor and Control Python tools, used in functions like automatic bad antenna detection: https://github.com/ovro-lwa/mnc_python
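Because the first three are standalone executables rather than Python packages, a quick way to verify them is to check that their binaries are on your PATH; the names below are the ones these projects conventionally install, so adjust if your build differs:

```bash
# Check that the external imaging and flagging binaries are reachable on PATH.
which wsclean chgcentre aoflagger
```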
Copy the default configuration file to your home directory:
cp orca/default-orca-conf.yml ~/orca-conf.yml

If you plan to use Celery, edit the `queue:` section in `~/orca-conf.yml` and update:
- `broker_uri` with your RabbitMQ URI
- `result_backend_uri` with your Redis backend address
If you are not using Celery, you can leave the queue: section unchanged.
It will not affect functionality unless Celery-based task execution is used.
The configuration file is still required for settings related to telescope layout and executable paths.
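As a rough sketch (not copied from the repository), an edited queue: section might look like the following; the hosts, credentials, vhost, and Redis database index are placeholders, so check the exact key layout against orca/default-orca-conf.yml:

```yaml
# Placeholder endpoints; substitute your own RabbitMQ and Redis services.
queue:
  broker_uri: amqp://user:password@rabbitmq-host:5672/orca-vhost
  result_backend_uri: redis://redis-host:6379/0
```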
Adding a function to orca also requires integrating it with Celery. This example commit shows how to add and integrate a new function. A good way to develop code for Celery is to create the function together with a unit test. A function can be made into a task with the Celery application decorator `@app.task` (`app` is imported from the `celery.py` module in this repo). You can call the decorated function like a regular function, test it locally, etc.
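As an illustration of that workflow, here is a minimal sketch; do_something is a hypothetical placeholder rather than an existing orca function, and it assumes app is importable from orca.celery as described above:

```python
# Hypothetical example: do_something is a placeholder, not a real orca task.
from orca.celery import app

@app.task
def do_something(a, b):
    """A single unit of work that can run locally or as a Celery task."""
    return a + b

# Called directly, no broker needed, e.g. from a unit test:
assert do_something(1, 2) == 3
```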
This is for integration testing only. Make sure you read https://docs.celeryq.dev/en/stable/getting-started/introduction.html before you start.
You can start a celery worker with
celery -A orca.celery worker -Q default -n <give-it-a-name> --loglevel=INFO
The queue and backend are configured in celery.py under orca. Make sure you use a different RabbitMQ vhost for testing.
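Once the worker is running, one way to confirm it has connected to the broker is Celery's built-in status command, run from the same environment:

```bash
# List workers currently registered with the broker.
celery -A orca.celery status
```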
Now you can submit tasks to the application from another session (e.g., IPython, notebook, etc). A common way to submit a task is to use the delay member function, so for your decorated function do_something(a, b), you can run it as result = do_something.delay(a, b). The object result will refer to the task running asynchronously. You can use properties on result to see the status and return value of the function.
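Continuing the hypothetical do_something sketch from above, a submission session might look like this:

```python
# Submit the hypothetical task and inspect the AsyncResult it returns.
result = do_something.delay(1, 2)

result.status           # e.g. 'PENDING' while queued, 'SUCCESS' when done
result.ready()          # True once the task has finished
result.get(timeout=60)  # blocks until the return value (here, 3) is available
```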
Celery admin notes are in docs/celery_deployment.md. The submission session will show some logging, but the Celery worker process will show much more.
| Directory | Description |
|---|---|
| orca/ | Core library — wrappers and functions that do single units of work |
| pipeline/ | Pipeline submission scripts (Celery chord/chain orchestration) |
| deploy/ | Worker management scripts (see docs/worker-management.md) |
| tools/ | Standalone utilities (e.g. QA imaging) — not part of the core pipeline |
| tests/ | Unit and integration tests |
| docs/ | Sphinx documentation and guides |
| notebooks/ | Jupyter notebooks for examples and monitoring |
For usage examples and instructions on testing the pipeline without Celery, refer to the Usage Guide.
The Celery-based subband pipeline processes OVRO-LWA data on the calim cluster using node-local NVMe storage. Each subband is pinned to a specific calim node for data locality.
Documentation:
| Document | Description |
|---|---|
| docs/subband-pipeline.md | Architecture, file reference, and processing flow |
| docs/worker-management.md | Starting, stopping, deploying workers, and troubleshooting |
| docs/celery_deployment.md | Cluster architecture and Celery infrastructure |
Quick start (after initial setup):
# Start workers on all available calim nodes (from any calim node):
./deploy/manage-workers.sh start
# Submit a pipeline run (static mode — subband pinned to its default node):
python pipeline/subband_celery.py \
--range 04-05 --date 2026-01-31 \
--bp_table /path/to/bandpass.B.flagged \
--xy_table /path/to/xyphase.Xf \
--subbands 73MHz --peel_sky --peel_rfi
# Submit with dynamic scheduling (any free node picks up work):
python pipeline/subband_celery.py \
--range 00-24 --date 2026-01-31 \
--bp_table /path/to/bandpass.B.flagged \
--xy_table /path/to/xyphase.Xf \
--peel_sky --peel_rfi --hot_baselines \
--cleanup_nvme --compress_snapshots --dynamic
# After code changes — deploy to all nodes in one command:
./deploy/manage-workers.sh deploy