Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions agrifoodpy/pipeline/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import argparse
import json
import sys
from .pipeline import Pipeline


def main(args=None):
parser = argparse.ArgumentParser(
description="Run an AgriFoodPy pipeline from a configuration file."
)

parser.add_argument(
"config",
help="Pipeline configuration YAML file"
)

parser.add_argument(
"-o",
"--output",
help="Optional output file to store the datablock as JSON",
default=None
)

parser.add_argument(
"--nodes",
action="store_true",
help="Print the nodes and parameters to stdout"
)

parser.add_argument(
"--no-run",
action="store_false",
help="Do not run the pipeline"
)

parser.add_argument(
"--from-node",
type=int,
help="Index of the first node to be executed"
)

parser.add_argument(
"--to-node",
type=int,
help="Index of the last node to be executed"
)

parser.add_argument(
"--skip-nodes",
nargs="+",
type=int,
help="List of nodes to be skipped in the pipeline execution"
)

# get system args if none passed
if args is None:
args = sys.argv[1:]

args = parser.parse_args(args or ['--help'])

# read pipeline configuration and set pipeline object
pipeline = Pipeline.read(args.config)

from_node = args.from_node if args.from_node is not None else 0
to_node = args.to_node if args.to_node is not None else len(pipeline.nodes)
skip_nodes = args.skip_nodes if args.skip_nodes is not None else None

# print the nodes and parameters if requested
if args.nodes:
pipeline.print_nodes()

# run the pipeline if not skipped
if args.no_run:
pipeline.run(from_node=from_node, to_node=to_node, skip=skip_nodes)

datablock = pipeline.datablock

# write outputs
if args.output is not None:
with open(args.output, "w") as f:
json.dump(datablock, f, indent=2, default=str)

return 0
39 changes: 32 additions & 7 deletions agrifoodpy/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@
from functools import wraps
from inspect import signature
import time
import yaml
import importlib


class Pipeline():
'''Class for constructing and running pipelines of functions with
individual sets of parameters.'''

def __init__(self, datablock=None):
self.nodes = []
self.params = []
Expand All @@ -22,9 +25,16 @@ def __init__(self, datablock=None):
else:
self.datablock = {}

@staticmethod
def _load_function(path):
"""Load a function from a dotted path."""
module_path, func_name = path.rsplit(".", 1)
module = importlib.import_module(module_path)
return getattr(module, func_name)

@classmethod
def read(cls, filename):
"""Read a pipeline from a configuration file
"""Read a pipeline configuration from a YAML file

Parameters
----------
Expand All @@ -36,7 +46,23 @@ def read(cls, filename):
pipeline : Pipeline
The pipeline object.
"""
raise NotImplementedError("This method is not yet implemented.")

with open(filename, "r") as f:
config = yaml.load(f, Loader=yaml.FullLoader)

pipeline = cls()

if config is not None:
for step in config["nodes"]:
func = cls._load_function(step["function"])
params = step.get("params", {})
name = step.get("name", func.__name__)

pipeline.nodes.append(func)
pipeline.params.append(params)
pipeline.names.append(name)

return pipeline

def datablock_write(self, path, value):
"""Writes a single value to the datablock at the specified path.
Expand Down Expand Up @@ -118,7 +144,6 @@ def remove_node(self, node):
del self.params[index]
del self.names[index]


def run(self, from_node=0, to_node=None, skip=None, timing=False):
"""Runs the pipeline

Expand All @@ -130,7 +155,7 @@ def run(self, from_node=0, to_node=None, skip=None, timing=False):
to_node : int, optional
The index of the last node to be executed. If not provided, all
nodes will be executed

skip : list of int, optional
List of node indices to skip during execution. Defaults to None.

Expand Down Expand Up @@ -175,18 +200,17 @@ def run(self, from_node=0, to_node=None, skip=None, timing=False):

def print_nodes(self, show_params=True):
"""Prints the list of nodes associated with a Pipeline instance.

Parameters
----------
show_params : bool, optional
If True, displays the parameters associated with each node.
"""


if not self.nodes:
print("Pipeline is empty.")
return

print("Pipeline nodes:")
for i, (name, node, params) in enumerate(zip(self.names, self.nodes, self.params)):
node_name = getattr(node, "__name__", repr(node))
Expand All @@ -195,6 +219,7 @@ def print_nodes(self, show_params=True):
for k, v in params.items():
print(f" {k} = {v}")


def standalone(input_keys, return_keys):
""" Decorator to make a pipeline node available as a standalone function

Expand Down
Empty file.
6 changes: 6 additions & 0 deletions agrifoodpy/pipeline/tests/data/test_config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
nodes:
- function: agrifoodpy.utils.nodes.write_to_datablock
params:
key: test_write
value: 200
name: writing to datablock
27 changes: 27 additions & 0 deletions agrifoodpy/pipeline/tests/test_cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from agrifoodpy.pipeline.cli import main
import pytest
import os

def test_cli(tmp_path):

# Test with no arguments
with pytest.raises(SystemExit) as e:
main()
assert e.value.code == 0

# Argparse help
with pytest.raises(SystemExit) as e:
main(['--help'])
assert e.value.code == 0

# Process test config file
script_dir = os.path.dirname(__file__)
config_filename = os.path.join(script_dir, "data/empty_config.yml")
output_filename = str(tmp_path / 'empty.json')
assert main([config_filename, '-o', output_filename]) == 0

# Process test config file
script_dir = os.path.dirname(__file__)
config_filename = os.path.join(script_dir, "data/test_config.yml")
output_filename = str(tmp_path / 'test.json')
assert main([config_filename, '-o', output_filename]) == 0
Loading
Loading