
ORCA — Tools for distributed OVRO-LWA data processing


Installation and Environment Setup

This repository serves as the central place for running and developing the OVRO-LWA data reduction and calibration pipelines.
A pre-configured environment is already available on Calim — see the Developer & Testing Guide for instructions and usage examples.
The following are instructions for setting up a new environment from scratch.

First, clone the repository:

git clone https://github.com/ovro-lwa/distributed-pipeline.git
cd distributed-pipeline

Then create the environment using the provided environment.yaml:

conda env create -f environment.yaml

Activate the environment:

conda activate orca-env

All required dependencies — including both conda and pip packages — are installed automatically by environment.yaml, so no additional installation commands are needed.

Additional Software Requirements

In addition to the Python packages listed in requirements.txt, several external tools are required to run certain ORCA functions:

Configuration Setup

Copy the default configuration file to your home directory:

cp orca/default-orca-conf.yml ~/orca-conf.yml

If you plan to use Celery, edit the queue: section in ~/orca-conf.yml and update:

  • broker_uri with your RabbitMQ URI
  • result_backend_uri with your Redis backend address

If you are not using Celery, you can leave the queue: section unchanged.
It will not affect functionality unless Celery-based task execution is used.
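For reference, an edited `queue:` section might look like the sketch below. The key names `broker_uri` and `result_backend_uri` come from the steps above; the host names, port numbers, credentials, and vhost are placeholders for illustration only.

```yaml
queue:
  # RabbitMQ broker (AMQP URI); placeholder host, credentials, and vhost
  broker_uri: "amqp://user:password@rabbitmq.example.org:5672/orca-vhost"
  # Redis result backend; placeholder host and database number
  result_backend_uri: "redis://redis.example.org:6379/0"
```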

The configuration file is still required for settings related to telescope layout and executable paths.

Run with Celery

Adding a function to ORCA also requires integrating it with Celery. This example commit shows how to add and integrate a new function. A good way to develop code for Celery is to write the function first, along with a unit test. A function can then be turned into a task with the Celery application decorator @app.task (app is imported from the celery.py module in this repo). The decorated function can still be called like a regular function, tested locally, etc.

This is for integration testing only. Make sure you read https://docs.celeryq.dev/en/stable/getting-started/introduction.html before you start.

You can start a celery worker with

celery -A orca.celery worker -Q default -n <give-it-a-name> --loglevel=INFO

The queue and backend are configured in celery.py under orca/. Make sure you use a different RabbitMQ vhost for testing.

Now you can submit tasks to the application from another session (e.g. IPython or a notebook). A common way to submit a task is the delay method: for a decorated function do_something(a, b), run result = do_something.delay(a, b). The returned result object refers to the task running asynchronously; its properties expose the task's status and return value.

Celery admin notes are in docs/celery_deployment.md. The submission session will show some logging, but the Celery worker process will show more.

Code Structure

| Directory | Description |
| --- | --- |
| orca/ | Core library — wrappers and functions that do single units of work |
| pipeline/ | Pipeline submission scripts (Celery chord/chain orchestration) |
| deploy/ | Worker management scripts (see docs/worker-management.md) |
| tools/ | Standalone utilities (e.g. QA imaging) — not part of the core pipeline |
| tests/ | Unit and integration tests |
| docs/ | Sphinx documentation and guides |
| notebooks/ | Jupyter notebooks for examples and monitoring |

Developer & Testing Guide

For usage examples and how to test the pipeline without Celery, please refer to the Usage Guide.

Subband Pipeline

The Celery-based subband pipeline processes OVRO-LWA data on the calim cluster using node-local NVMe storage. Each subband is pinned to a specific calim node for data locality.

Documentation:

| Document | Description |
| --- | --- |
| docs/subband-pipeline.md | Architecture, file reference, and processing flow |
| docs/worker-management.md | Starting, stopping, deploying workers, and troubleshooting |
| docs/celery_deployment.md | Cluster architecture and Celery infrastructure |

Quick start (after initial setup):

# Start workers on all available calim nodes (from any calim node):
./deploy/manage-workers.sh start

# Submit a pipeline run (static mode — subband pinned to its default node):
python pipeline/subband_celery.py \
  --range 04-05 --date 2026-01-31 \
  --bp_table /path/to/bandpass.B.flagged \
  --xy_table /path/to/xyphase.Xf \
  --subbands 73MHz --peel_sky --peel_rfi

# Submit with dynamic scheduling (any free node picks up work):
python pipeline/subband_celery.py \
  --range 00-24 --date 2026-01-31 \
  --bp_table /path/to/bandpass.B.flagged \
  --xy_table /path/to/xyphase.Xf \
  --peel_sky --peel_rfi --hot_baselines \
  --cleanup_nvme --compress_snapshots --dynamic

# After code changes — deploy to all nodes in one command:
./deploy/manage-workers.sh deploy

About

A distributed data processing experiment for the OVRO-LWA
