diff --git a/src/cryoemservices/services/cluster_submission.py b/src/cryoemservices/services/cluster_submission.py index 64c65725..cb2a7d52 100644 --- a/src/cryoemservices/services/cluster_submission.py +++ b/src/cryoemservices/services/cluster_submission.py @@ -7,6 +7,7 @@ from cryoemservices.services.common_service import CommonService from cryoemservices.util.config import config_from_file +from cryoemservices.util.models import MockRW from cryoemservices.util.slurm_submission import ( JobSubmissionParameters, submit_to_slurm, @@ -30,8 +31,19 @@ def initializing(self): acknowledgement=True, ) - def run_submit_job(self, rw, header, message): + def run_submit_job(self, rw, header: dict, message: dict): """Submit cluster job according to message.""" + if not rw: + self.log.info("Received a simple message") + if not isinstance(message, dict): + self.log.error("Rejected invalid simple message") + self._transport.nack(header) + return + + # Create a wrapper-like object that can be passed to functions + # as if a recipe wrapper was present. + rw = MockRW(self._transport) + rw.recipe_step = {"parameters": message} parameters = rw.recipe_step["parameters"] if type(parameters.get("cluster", {}).get("commands")) is list: @@ -72,7 +84,7 @@ def run_submit_job(self, rw, header, message): self.log.error( "No absolute working directory specified. 
Will not run cluster job" ) - self._transport.nack(header) + rw.transport.nack(header) return working_directory = Path(parameters["workingdir"]) try: @@ -81,7 +93,7 @@ def run_submit_job(self, rw, header, message): self.log.error( "Could not create working directory: %s", str(e), exc_info=True ) - self._transport.nack(header) + rw.transport.nack(header) return if parameters.get("standard_output"): @@ -105,17 +117,9 @@ def run_submit_job(self, rw, header, message): ) if not jobnumber: self.log.error("Job was not submitted") - self._transport.nack(header) + rw.transport.nack(header) return - # Conditionally acknowledge receipt of the message - txn = self._transport.transaction_begin(subscription_id=header["subscription"]) - self._transport.ack(header, transaction=txn) - - # Send results onwards - rw.set_default_channel("job_submitted") - rw.send({"jobid": jobnumber}, transaction=txn) - - # Commit transaction - self._transport.transaction_commit(txn) + # Acknowledge success + rw.transport.ack(header) self.log.info(f"Submitted job {jobnumber} to slurm") diff --git a/src/cryoemservices/services/extract.py b/src/cryoemservices/services/extract.py index 00ab828b..6824644a 100644 --- a/src/cryoemservices/services/extract.py +++ b/src/cryoemservices/services/extract.py @@ -7,10 +7,8 @@ import numpy as np from gemmi import cif from pydantic import BaseModel, Field, ValidationError -from workflows.recipe import wrap_subscribe from cryoemservices.services.common_service import CommonService -from cryoemservices.util.models import MockRW from cryoemservices.util.relion_service_options import ( RelionServiceOptions, update_relion_options, @@ -55,44 +53,27 @@ class Extract(CommonService): def initializing(self): """Subscribe to a queue. 
Received messages must be acknowledged.""" self.log.info("Extract service starting") - wrap_subscribe( - self._transport, + self._transport.subscribe( self._environment["queue"] or "extract", self.extract, acknowledgement=True, - allow_non_recipe_messages=True, ) - def extract(self, rw, header: dict, message: dict): + def extract(self, header: dict, message: dict): """Main function which interprets and processes received messages""" - if not rw: - self.log.info("Received a simple message") - if not isinstance(message, dict): - self.log.error("Rejected invalid simple message") - self._transport.nack(header) - return - - # Create a wrapper-like object that can be passed to functions - # as if a recipe wrapper was present. - rw = MockRW(self._transport) - rw.recipe_step = {"parameters": message} + if not isinstance(message, dict): + self.log.error("Rejected invalid non-dictionary message") + self._transport.nack(header) + return try: - if isinstance(message, dict): - extract_params = ExtractParameters( - **{**rw.recipe_step.get("parameters", {}), **message} - ) - else: - extract_params = ExtractParameters( - **{**rw.recipe_step.get("parameters", {})} - ) + extract_params = ExtractParameters(**message) except (ValidationError, TypeError) as e: self.log.warning( f"Extraction parameter validation failed for message: {message} " - f"and recipe parameters: {rw.recipe_step.get('parameters', {})} " f"with exception: {e}" ) - rw.transport.nack(header) + self._transport.nack(header) return self.log.info( @@ -113,7 +94,7 @@ def extract(self, rw, header: dict, message: dict): job_dir = Path(job_dir_search[0]) else: self.log.warning(f"Invalid job directory in {extract_params.output_file}") - rw.transport.nack(header) + self._transport.nack(header) return project_dir = job_dir.parent.parent if not Path(extract_params.output_file).parent.exists(): @@ -399,7 +380,7 @@ def extract(self, rw, header: dict, message: dict): "stderr": "", "results": {"box_size": box_len}, } - 
rw.send_to("node_creator", node_creator_parameters) + self._transport.send("node_creator", node_creator_parameters) # Register the files needed for selection and batching self.log.info("Sending to particle selection") @@ -409,7 +390,7 @@ def extract(self, rw, header: dict, message: dict): "image_size": box_len, "relion_options": dict(extract_params.relion_options), } - rw.send_to("select_particles", select_params) + self._transport.send("select_particles", select_params) self.log.info(f"Done {self.job_type} for {extract_params.coord_list_file}.") - rw.transport.ack(header) + self._transport.ack(header) diff --git a/src/cryoemservices/services/ispyb_connector.py b/src/cryoemservices/services/ispyb_connector.py index a1354813..3c734afb 100644 --- a/src/cryoemservices/services/ispyb_connector.py +++ b/src/cryoemservices/services/ispyb_connector.py @@ -142,8 +142,7 @@ def parameters(parameter): ) if result and result.get("success"): - rw.set_default_channel("output") - rw.send({"result": result.get("return_value")}) + rw.send_to("output", {"result": result.get("return_value")}) rw.transport.ack(header) elif result and result.get("checkpoint"): rw.checkpoint(result.get("checkpoint_dict")) diff --git a/src/cryoemservices/services/process_recipe.py b/src/cryoemservices/services/process_recipe.py index e4737e1f..c2ced547 100644 --- a/src/cryoemservices/services/process_recipe.py +++ b/src/cryoemservices/services/process_recipe.py @@ -4,11 +4,9 @@ from importlib.metadata import entry_points from pathlib import Path -import workflows.recipe -from workflows import Error as WorkflowsError - from cryoemservices.services.common_service import CommonService from cryoemservices.util.config import ServiceConfig, config_from_file +from cryoemservices.util.recipe import Recipe, RecipeWrapper, wrap_subscribe def filter_load_recipes_from_files( @@ -20,12 +18,8 @@ def filter_load_recipes_from_files( if not recipe_location.is_file(): raise ValueError(f"Cannot find recipe in location 
{recipe_location}") with open(recipe_location, "r") as rcp: - named_recipe = workflows.recipe.Recipe(recipe=rcp.read()) - try: - named_recipe.validate() - except WorkflowsError as e: - raise ValueError(f"Named recipe {recipefile} failed validation. {e}") - message["recipe"] = message["recipe"].merge(named_recipe) + named_recipe = Recipe(recipe=rcp.read()) + message["recipe"] = named_recipe return message, parameters @@ -62,12 +56,11 @@ def initializing(self): "apply_parameters": filter_apply_parameters, } - workflows.recipe.wrap_subscribe( + wrap_subscribe( self._transport, self._environment["queue"] or "processing_recipe", self.process, acknowledgement=True, - allow_non_recipe_messages=True, ) def process(self, rw, header, message): @@ -88,7 +81,7 @@ def process(self, rw, header, message): parameters["guid"] = recipe_id # Add an empty recipe to the message - message["recipe"] = workflows.recipe.Recipe() + message["recipe"] = Recipe() # Apply all specified filters in order to message and parameters for name, f in self.message_filters.items(): @@ -106,16 +99,11 @@ def process(self, rw, header, message): self.log.info(f"Filtered processing request: {str(message)}") self.log.info(f"Filtered parameters: {str(parameters)}") - # Conditionally acknowledge receipt of the message - txn = self._transport.transaction_begin(subscription_id=header["subscription"]) - self._transport.ack(header, transaction=txn) - - rw = workflows.recipe.RecipeWrapper( - recipe=message["recipe"], transport=self._transport - ) + # Start the recipe wrapper + rw = RecipeWrapper(recipe=message["recipe"], transport=self._transport) rw.environment = {"ID": recipe_id} - rw.start(transaction=txn) + rw.start() - # Commit transaction - self._transport.transaction_commit(txn) + # Acknowledge success + self._transport.ack(header) self.log.info("Processed incoming message") diff --git a/src/cryoemservices/util/recipe.py b/src/cryoemservices/util/recipe.py new file mode 100644 index 00000000..0a37f201 --- 
# New module introduced by this patch: src/cryoemservices/util/recipe.py
#
# Lightweight recipe handling. A Recipe describes a graph of processing nodes,
# a RecipeWrapper sends messages between those nodes over a transport, and
# wrap_subscribe unpacks incoming recipe messages for a service callback.
from __future__ import annotations

import functools
import json
import logging
import string
from typing import Any

logger = logging.getLogger("cryoemservices.util.recipe")

# Header values accepted as "this message is recipe-wrapped". The string forms
# are accepted in case a transport delivers header values as text.
_RECIPE_HEADER_FLAGS = (True, "True", "true")


def wrap_subscribe(
    transport_layer,
    channel,
    callback,
    acknowledgement=True,
):
    """Subscribe ``callback`` to ``channel``, unpacking recipe messages first.

    :param transport_layer: Transport object providing ``subscribe`` (and
                            ``send``, used by the RecipeWrapper handed on).
    :param channel: Name of the queue/channel to subscribe to.
    :param callback: Function called as ``callback(rw, header, message)``.
                     ``rw`` is a RecipeWrapper for recipe messages and None
                     for any other message.
    :param acknowledgement: Passed through to ``transport_layer.subscribe``.
    :return: Whatever ``transport_layer.subscribe`` returns.
    """

    @functools.wraps(callback)
    def unwrap_recipe(header, message):
        """Unpack an incoming message if it is in recipe format.

        :param header: A dictionary of message headers. If the entry
                       'workflows-recipe' is set then the message is parsed
                       and the embedded recipe information is passed on in a
                       RecipeWrapper object to the target function.
        :param message: Incoming deserialized message object.
        """
        if header.get("workflows-recipe") in _RECIPE_HEADER_FLAGS:
            rw = RecipeWrapper(message=message, transport=transport_layer)
            return callback(rw, header, message.get("payload"))
        # Non-recipe messages are passed through unmodified
        return callback(None, header, message)

    return transport_layer.subscribe(
        channel, unwrap_recipe, acknowledgement=acknowledgement
    )


class _UnresolvedField:
    """Stand-in for a parameter that is missing during substitution.

    Formatting it reproduces the original ``{name}`` text, so undefined
    references survive ``Recipe.apply_parameters`` unchanged.
    """

    def __init__(self, name):
        self._name = name

    def __getitem__(self, item):
        return _UnresolvedField(f"{self._name}[{item}]")

    def __getattr__(self, attr):
        # Only invoked for attributes that do not exist. Private and dunder
        # lookups must fail normally, otherwise copy/pickle would recurse.
        if attr.startswith("_"):
            raise AttributeError(attr)
        return _UnresolvedField(f"{self._name}.{attr}")

    def __format__(self, spec):
        return "{" + self._name + (":" + spec if spec else "") + "}"

    def __str__(self):
        return "{" + self._name + "}"

    __repr__ = __str__


class _LenientFormatter(string.Formatter):
    """Formatter that leaves references to unknown named fields in place."""

    def get_value(self, key, args, kwargs):
        try:
            return super().get_value(key, args, kwargs)
        except KeyError:
            return _UnresolvedField(str(key))


class Recipe:
    """Object containing a processing recipe that can be passed to services.

    A recipe describes how all involved services are connected together, how
    data should be passed and how errors should be handled.
    """

    # The processing recipe is encoded in this dictionary. The class-level
    # value is only a fallback; __init__ gives every instance its own dict.
    recipe: dict[Any, Any] = {}

    def __init__(self, recipe=None):
        """Create a recipe from a dictionary, a JSON string or another Recipe.

        :param recipe: Recipe dictionary, JSON-encoded recipe, Recipe instance
                       or None for an empty recipe.
        :raises ValueError: If the recipe is not a dictionary or is invalid.
        :raises KeyError: If the recipe contains unreferenced or undefined nodes.
        """
        if isinstance(recipe, Recipe):
            recipe = recipe.recipe
        elif isinstance(recipe, str):
            recipe = json.loads(recipe)
        if recipe and not isinstance(recipe, dict):
            raise ValueError("Invalid recipe: recipe must be a dictionary")
        # Copy so that validation never modifies the caller's dictionary and
        # empty recipes never share one class-level dictionary.
        self.recipe = dict(recipe) if recipe else {}
        if self.recipe:
            self.validate()

    def __getitem__(self, item):
        """Allow direct dictionary access to recipe elements.

        Integer node ids are looked up by their string form, because
        validate() stores every node under a string key.
        """
        if isinstance(item, int):
            item = str(item)
        return self.recipe[item]

    def validate(self):
        """Check whether the encoded recipe is valid.

        :raises ValueError: If the recipe is empty, has no usable "start"
                            entry, or a node or its output is malformed.
        :raises KeyError: If a referenced node does not exist or a defined
                          node can not be reached from "start".
        """
        if not self.recipe:
            raise ValueError("Invalid recipe: No recipe defined")

        # Make all keys strings. A new dictionary is built because a
        # dictionary must not gain or lose keys while it is being iterated.
        self.recipe = {
            (str(key) if isinstance(key, int) else key): value
            for key, value in self.recipe.items()
        }

        # Without a 'start' node nothing would happen
        start = self.recipe.get("start")
        if not start:
            raise ValueError('Invalid recipe: "start" node empty or missing')
        if isinstance(start, bool) or not isinstance(start, (int, str)):
            raise ValueError('Invalid recipe: "start" must be an integer or string')

        # Walk the graph from the start node and record every node reached
        touched_nodes = {"start"}
        pending = [str(start)]
        while pending:
            node = pending.pop()
            touched_nodes.add(node)
            if node not in self.recipe:
                raise KeyError(f"Recipe node {node} is referenced but not defined")
            step = self.recipe[node]
            if not isinstance(step, dict):
                raise ValueError(f"Invalid definition for recipe node {node}")
            # Terminal nodes legitimately have no "output" entry
            outputs = step.get("output")
            if not outputs:
                continue
            if not isinstance(outputs, dict):
                raise ValueError(f"Invalid output for recipe node {node}")
            for target in outputs.values():
                if str(target) not in touched_nodes:
                    pending.append(str(target))

        # Test recipe for unreferenced nodes
        for key in self.recipe:
            if key not in touched_nodes:
                raise KeyError(f"Recipe node {key} is not accessed")

    def apply_parameters(self, parameters):
        """Recursively apply dictionary entries in 'parameters' to {item}s in
        the recipe structure, leaving undefined {item}s as they are.

        Strings (including dictionary keys) are formatted. Numbers, booleans
        and None are kept unchanged.

        :param parameters: Mapping of item names to replacement values.
        :raises TypeError: If the recipe contains an unsupported value type.
        """
        formatter = _LenientFormatter()

        def _recursive_apply(item):
            """Helper function to recursively apply replacements."""
            if isinstance(item, str):
                return formatter.vformat(item, (), parameters)
            if isinstance(item, dict):
                return {
                    _recursive_apply(key): _recursive_apply(value)
                    for key, value in item.items()
                }
            if isinstance(item, list):
                return [_recursive_apply(x) for x in item]
            if item is None or isinstance(item, (bool, int, float)):
                # Scalars have nothing to substitute, e.g. an integer "start"
                return item
            raise TypeError(f"Cannot format recipe item {item} of type {type(item)}")

        self.recipe = _recursive_apply(self.recipe)


class RecipeWrapper:
    """A wrapper object which contains a recipe and a number of functions to
    make life easier for recipe users.
    """

    def __init__(self, message=None, transport=None, recipe=None, environment=None):
        """Create a RecipeWrapper object from a wrapped message or a recipe.

        :param message: Wrapped recipe message with the keys "recipe" and
                        "recipe-pointer" and optionally "recipe-path",
                        "environment" and "payload".
        :param transport: Transport object, required to send directly to
                          connected downstream processes.
        :param recipe: Recipe object or recipe definition, used when no
                       message is given. The wrapper then has no current step.
        :param environment: Environment dictionary. Overrides the one in the
                            message when given.
        :raises ValueError: If the transport is missing, or neither a message
                            nor a recipe is given.
        """
        if not transport:
            raise ValueError("Transport object is required")
        if message:
            self.recipe = Recipe(message["recipe"])
            self.recipe_pointer = int(message["recipe-pointer"])
            self.recipe_step = self.recipe[self.recipe_pointer]
            self.recipe_path = message.get("recipe-path", [])
            if environment is None:
                self.environment = message.get("environment", {})
            else:
                self.environment = environment
            self.payload = message.get("payload")
        elif recipe:
            self.recipe = recipe if isinstance(recipe, Recipe) else Recipe(recipe)
            self.recipe_pointer = None
            self.recipe_step = None
            self.recipe_path = []
            self.environment = environment or {}
            self.payload = None
        else:
            raise ValueError("A message or recipe is required for a RecipeWrapper")
        self.transport = transport

    def send_to(self, channel, message, header=None, **kwargs):
        """Send a message to the service connected to the named output channel
        of the currently running recipe step.

        :param channel: Name of an output channel of the current recipe step.
        :param message: Payload to send downstream.
        :param header: Optional dictionary of extra message headers.
        :param kwargs: Passed on to the transport send method.
        :raises ValueError: If there is no current recipe step, or the step
                            has no output channel with this name.
        """
        if not self.recipe_step:
            raise ValueError("This RecipeWrapper object does not contain a recipe step")
        if channel not in self.recipe_step.get("output", []):
            raise ValueError(
                "The current recipe step does not have an output channel with this name"
            )

        self._send_to_destination(
            self.recipe_step["output"][channel], header, message, kwargs
        )

    def start(self, header=None, **kwargs):
        """Trigger the start of a recipe by sending a message to the node
        named in the recipe's "start" entry.

        The "start" entry only holds a node id, so an empty dictionary is
        sent as the payload. Any keyword parameters to this function are
        passed to the transport send method.

        :raises ValueError: If the wrapped recipe has already been started.
        """
        if self.recipe_step:
            raise ValueError("This recipe has already been started.")
        self._send_to_destination(self.recipe["start"], header, {}, kwargs)

    def checkpoint(self, message, header=None, **kwargs):
        """Send a message to the current recipe destination. This can be used
        to keep a state for longer processing tasks.

        :raises ValueError: If there is no current recipe step.
        """
        if not self.recipe_step:
            raise ValueError("This RecipeWrapper object does not contain a recipe step")
        self._send_to_destination(
            self.recipe_pointer, header, message, kwargs, add_path_step=False
        )

    def _generate_full_recipe_message(self, destination, message, add_path_step):
        """Factory function to generate independent message objects for
        downstream recipients with different destinations."""
        if add_path_step and self.recipe_pointer:
            recipe_path = self.recipe_path + [self.recipe_pointer]
        else:
            recipe_path = self.recipe_path

        return {
            "environment": self.environment,
            "payload": message,
            "recipe": self.recipe.recipe,
            "recipe-path": recipe_path,
            "recipe-pointer": destination,
        }

    def _send_to_destination(
        self,
        destination,
        header,
        payload,
        transport_kwargs,
        add_path_step=True,
    ):
        """Helper function to send a message to a specific recipe destination.

        :param destination: Id of the recipe node to send to.
        :param header: Optional header dictionary. It is copied, not modified.
        :param payload: Payload placed in the wrapped message.
        :param transport_kwargs: Keyword arguments for the transport send call.
        :param add_path_step: Whether to append the current step to the path.
        """
        if header:
            header = header.copy()
            header["workflows-recipe"] = True
        else:
            header = {"workflows-recipe": True}

        step = self.recipe[destination]
        dest_kwargs = transport_kwargs.copy()
        if "exchange" in step:
            dest_kwargs.setdefault("exchange", step["exchange"])

        if not step.get("queue"):
            # Nothing can be sent without a queue; make the drop visible
            logger.warning(
                "Recipe node %s has no queue, message not sent", destination
            )
            return

        message = self._generate_full_recipe_message(
            destination, payload, add_path_step
        )
        self.transport.send(
            step["queue"],
            message,
            headers=header,
            **dest_kwargs,
        )