Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ dashboard = [

[project.scripts]
ess-livedata-make-geometry-nexus = "ess.livedata.scripts.make_geometry_nexus:main"
esslivedata-workflows = "ess.livedata.scripts.visualize_workflows:main"

[project.urls]
"Bug Tracker" = "https://github.com/scipp/esslivedata/issues"
Expand Down Expand Up @@ -151,6 +152,9 @@ pydocstyle.convention = "numpy"
"scripts/*" = [
"TID251", # scripts can use stdlib logging
]
"src/ess/livedata/scripts/*" = [
"T201", # print is the right output mechanism for CLI entry points
]
"*.ipynb" = [
"E501", # longer lines are sometimes more readable
"F403", # *-imports used with domain types
Expand Down
8 changes: 8 additions & 0 deletions src/ess/livedata/handlers/stream_processor_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from collections.abc import Iterable
from typing import Any

import graphviz
import sciline
import sciline.typing
from ess.reduce import streaming
Expand Down Expand Up @@ -134,3 +135,10 @@ def clear(self) -> None:
self._stream_processor.clear()
self._current_start_time = None
self._current_end_time = None

def visualize(self, **kwargs: Any) -> graphviz.Digraph:
"""Visualize the streaming workflow graph.

See :py:meth:`ess.reduce.streaming.StreamProcessor.visualize` for parameters.
"""
return self._stream_processor.visualize(**kwargs)
4 changes: 2 additions & 2 deletions src/ess/livedata/scripts/make_geometry_nexus.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,11 @@ def main() -> int:
args = parser.parse_args()

if not args.input.exists():
print(f'Input file {args.input} does not exist', file=sys.stderr) # noqa: T201
print(f'Input file {args.input} does not exist', file=sys.stderr)
return 1

if args.output.exists() and not args.force:
print(f'Output file {args.output} already exists', file=sys.stderr) # noqa: T201
print(f'Output file {args.output} already exists', file=sys.stderr)
return 1

write_minimal_geometry(args.input, args.output, use_pixel_shape=args.use_shape)
Expand Down
103 changes: 103 additions & 0 deletions src/ess/livedata/scripts/visualize_workflows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
#!/usr/bin/env python
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2025 Scipp contributors (https://github.com/scipp)
"""Render workflow graphs for a given instrument."""

import argparse
import sys
from pathlib import Path

import structlog

logger = structlog.get_logger(__name__)


def _load_instrument(name: str):
from importlib import import_module

from ess.livedata.config.instrument import instrument_registry

import_module(f'ess.livedata.config.instruments.{name}.specs')
instrument = instrument_registry[name]
instrument.load_factories()
return instrument


def main() -> None:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('--instrument', required=True, help='Instrument name')
parser.add_argument(
'--workflow',
metavar='ID',
help='Workflow ID to render (see --list). Renders all if omitted.',
)
parser.add_argument('--format', default='svg', help='Output format (default: svg)')
parser.add_argument(
'--output-dir',
type=Path,
default=Path('.'),
help='Output directory (default: current directory)',
)
parser.add_argument(
'--list', action='store_true', help='List workflow IDs and exit'
)
args = parser.parse_args()

instrument = _load_instrument(args.instrument)
factory = instrument.workflow_factory
all_ids = sorted(str(wid) for wid in factory)

if args.list:
for wid in all_ids:
print(wid)
return

from ess.livedata.config.workflow_spec import WorkflowId

if args.workflow is not None:
target = WorkflowId.from_string(args.workflow)
if target not in factory:
print(f"Unknown workflow: {args.workflow}", file=sys.stderr)
print("Available workflows:", file=sys.stderr)
for wid in all_ids:
print(f" {wid}", file=sys.stderr)
sys.exit(1)
items = [(target, factory[target])]
else:
items = list(factory.items())

args.output_dir.mkdir(parents=True, exist_ok=True)
rendered = 0
for wid, spec in items:
source_name = spec.source_names[0] if spec.source_names else ""
from ess.livedata.config.workflow_spec import WorkflowConfig

config = WorkflowConfig(identifier=wid, params={})
try:
workflow = factory.create(source_name=source_name, config=config)
except Exception as e:
logger.warning(
"Failed to create workflow", workflow_id=str(wid), error=str(e)
)
continue
if not hasattr(workflow, 'visualize'):
continue
try:
graph = workflow.visualize()
except Exception as e:
logger.warning(
"Failed to visualize workflow", workflow_id=str(wid), error=str(e)
)
continue
filename = str(wid).replace('/', '_')
graph.render(args.output_dir / filename, format=args.format, cleanup=True)
print(f"Wrote {args.output_dir / filename}.{args.format}")
rendered += 1

if not rendered:
print("No visualizable workflows found.", file=sys.stderr)
sys.exit(1)


if __name__ == '__main__':
main()
37 changes: 37 additions & 0 deletions tests/handlers/visualize_workflows_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2025 Scipp contributors (https://github.com/scipp)
from typing import NewType

import graphviz
import sciline

from ess.livedata.handlers.stream_processor_workflow import StreamProcessorWorkflow

Input = NewType('Input', int)
Output = NewType('Output', int)


def _identity(x: Input) -> Output:
return Output(x)


class TestStreamProcessorWorkflowVisualize:
def test_returns_graphviz_digraph(self):
workflow = StreamProcessorWorkflow(
sciline.Pipeline((_identity,)),
dynamic_keys={'input': Input},
target_keys={'output': Output},
accumulators=(Output,),
)
graph = workflow.visualize()
assert isinstance(graph, graphviz.Digraph)

def test_passes_kwargs_through(self):
workflow = StreamProcessorWorkflow(
sciline.Pipeline((_identity,)),
dynamic_keys={'input': Input},
target_keys={'output': Output},
accumulators=(Output,),
)
graph = workflow.visualize(compact=True, show_legend=False)
assert isinstance(graph, graphviz.Digraph)
Loading