From 0087b4db37a05c3e69cb15b2ba47091a2cb916ce Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Tue, 3 Jun 2025 09:55:41 +0100 Subject: [PATCH 1/6] Some simplifications --- .../services/cluster_submission.py | 32 +++++++++++-------- .../services/ispyb_connector.py | 3 +- src/cryoemservices/services/process_recipe.py | 11 +++---- 3 files changed, 23 insertions(+), 23 deletions(-) diff --git a/src/cryoemservices/services/cluster_submission.py b/src/cryoemservices/services/cluster_submission.py index 64c65725..cb2a7d52 100644 --- a/src/cryoemservices/services/cluster_submission.py +++ b/src/cryoemservices/services/cluster_submission.py @@ -7,6 +7,7 @@ from cryoemservices.services.common_service import CommonService from cryoemservices.util.config import config_from_file +from cryoemservices.util.models import MockRW from cryoemservices.util.slurm_submission import ( JobSubmissionParameters, submit_to_slurm, @@ -30,8 +31,19 @@ def initializing(self): acknowledgement=True, ) - def run_submit_job(self, rw, header, message): + def run_submit_job(self, rw, header: dict, message: dict): """Submit cluster job according to message.""" + if not rw: + self.log.info("Received a simple message") + if not isinstance(message, dict): + self.log.error("Rejected invalid simple message") + self._transport.nack(header) + return + + # Create a wrapper-like object that can be passed to functions + # as if a recipe wrapper was present. + rw = MockRW(self._transport) + rw.recipe_step = {"parameters": message} parameters = rw.recipe_step["parameters"] if type(parameters.get("cluster", {}).get("commands")) is list: @@ -72,7 +84,7 @@ def run_submit_job(self, rw, header, message): self.log.error( "No absolute working directory specified. Will not run cluster job" ) - self._transport.nack(header) + rw.transport.nack(header) return working_directory = Path(parameters["workingdir"]) try: @@ -81,7 +93,7 @@ def run_submit_job(self, rw, header, message): self.log.error( "Could not create working directory: %s", str(e), exc_info=True ) - self._transport.nack(header) + rw.transport.nack(header) return if parameters.get("standard_output"): @@ -105,17 +117,9 @@ def run_submit_job(self, rw, header, message): ) if not jobnumber: self.log.error("Job was not submitted") - self._transport.nack(header) + rw.transport.nack(header) return - # Conditionally acknowledge receipt of the message - txn = self._transport.transaction_begin(subscription_id=header["subscription"]) - self._transport.ack(header, transaction=txn) - - # Send results onwards - rw.set_default_channel("job_submitted") - rw.send({"jobid": jobnumber}, transaction=txn) - - # Commit transaction - self._transport.transaction_commit(txn) + # Acknowledge success + rw.transport.ack(header) self.log.info(f"Submitted job {jobnumber} to slurm") diff --git a/src/cryoemservices/services/ispyb_connector.py b/src/cryoemservices/services/ispyb_connector.py index a1354813..3c734afb 100644 --- a/src/cryoemservices/services/ispyb_connector.py +++ b/src/cryoemservices/services/ispyb_connector.py @@ -142,8 +142,7 @@ def parameters(parameter): ) if result and result.get("success"): - rw.set_default_channel("output") - rw.send({"result": result.get("return_value")}) + rw.send_to("output", {"result": result.get("return_value")}) rw.transport.ack(header) elif result and result.get("checkpoint"): rw.checkpoint(result.get("checkpoint_dict")) diff --git a/src/cryoemservices/services/process_recipe.py b/src/cryoemservices/services/process_recipe.py index e4737e1f..e45f8d93 100644 --- a/src/cryoemservices/services/process_recipe.py +++ b/src/cryoemservices/services/process_recipe.py @@ -106,16 +106,13 @@ def process(self, rw, header, message): self.log.info(f"Filtered processing request: {str(message)}") self.log.info(f"Filtered parameters: {str(parameters)}") - # Conditionally acknowledge receipt of the message - txn = self._transport.transaction_begin(subscription_id=header["subscription"]) - self._transport.ack(header, transaction=txn) - + # Start the recipe wrapper rw = workflows.recipe.RecipeWrapper( recipe=message["recipe"], transport=self._transport ) rw.environment = {"ID": recipe_id} - rw.start(transaction=txn) + rw.start() - # Commit transaction - self._transport.transaction_commit(txn) + # Acknowledge success + self._transport.ack(header) self.log.info("Processed incoming message") From 95df3516c27273a194ece4c49197061403bd5704 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Wed, 4 Jun 2025 09:33:25 +0100 Subject: [PATCH 2/6] Some attempt to bring in recipe components --- src/cryoemservices/services/process_recipe.py | 14 +- src/cryoemservices/util/recipe.py | 343 ++++++++++++++++++ 2 files changed, 349 insertions(+), 8 deletions(-) create mode 100644 src/cryoemservices/util/recipe.py diff --git a/src/cryoemservices/services/process_recipe.py b/src/cryoemservices/services/process_recipe.py index e45f8d93..b8220eb7 100644 --- a/src/cryoemservices/services/process_recipe.py +++ b/src/cryoemservices/services/process_recipe.py @@ -4,11 +4,11 @@ from importlib.metadata import entry_points from pathlib import Path -import workflows.recipe from workflows import Error as WorkflowsError from cryoemservices.services.common_service import CommonService from cryoemservices.util.config import ServiceConfig, config_from_file +from cryoemservices.util.recipe import Recipe, RecipeWrapper, wrap_subscribe def filter_load_recipes_from_files( @@ -20,12 +20,12 @@ def filter_load_recipes_from_files( if not recipe_location.is_file(): raise ValueError(f"Cannot find recipe in location {recipe_location}") with open(recipe_location, "r") as rcp: - named_recipe = workflows.recipe.Recipe(recipe=rcp.read()) + named_recipe = Recipe(recipe=rcp.read()) try: named_recipe.validate() except WorkflowsError as e: raise ValueError(f"Named recipe {recipefile} failed validation. {e}") - message["recipe"] = message["recipe"].merge(named_recipe) + message["recipe"] = named_recipe return message, parameters @@ -62,7 +62,7 @@ def initializing(self): "apply_parameters": filter_apply_parameters, } - workflows.recipe.wrap_subscribe( + wrap_subscribe( self._transport, self._environment["queue"] or "processing_recipe", self.process, @@ -88,7 +88,7 @@ def process(self, rw, header, message): parameters["guid"] = recipe_id # Add an empty recipe to the message - message["recipe"] = workflows.recipe.Recipe() + message["recipe"] = Recipe() # Apply all specified filters in order to message and parameters for name, f in self.message_filters.items(): @@ -107,9 +107,7 @@ def process(self, rw, header, message): self.log.info(f"Filtered parameters: {str(parameters)}") # Start the recipe wrapper - rw = workflows.recipe.RecipeWrapper( - recipe=message["recipe"], transport=self._transport - ) + rw = RecipeWrapper(recipe=message["recipe"], transport=self._transport) rw.environment = {"ID": recipe_id} rw.start() diff --git a/src/cryoemservices/util/recipe.py b/src/cryoemservices/util/recipe.py new file mode 100644 index 00000000..43c49eb0 --- /dev/null +++ b/src/cryoemservices/util/recipe.py @@ -0,0 +1,343 @@ +from __future__ import annotations + +import copy +import functools +import json +import logging +import string +from typing import Any + +basestring = (str, bytes) + +logger = logging.getLogger("cryoemservices.util.recipe") + + +def wrap_subscribe( + transport_layer, + channel, + callback, + acknowledgement=True, + allow_non_recipe_messages=True, +): + """Internal method to create an intercepting function for incoming messages + to interpret recipes. This function is then used to subscribe to a channel + on the transport layer. + :param transport_layer: Reference to underlying transport object. + :param subscription_call: Reference to the subscribing function of the + transport layer. + :param channel: Channel name to subscribe to. + :param callback: Real function to be called when messages are received. + The callback will pass three arguments, + a RecipeWrapper object (details below), the header as + a dictionary structure, and the message. + + :param allow_non_recipe_messages: Pass on incoming messages that do not + include recipe information. In this case the first + argument to the callback function will be 'None'. + :param log_extender: If the recipe contains useful contextual information + for log messages, such as a unique ID which can be used + to connect all messages originating from the same + recipe, then the information will be passed to this + function, which must be a context manager factory. + :return: Return value of call to subscription_call. + """ + + @functools.wraps(callback) + def unwrap_recipe(header, message): + """This is a helper function unpacking incoming messages when they are + in a recipe format. Other messages are passed through unmodified. + :param header: A dictionary of message headers. If the header contains + an entry 'workflows-recipe' then the message is parsed + and the embedded recipe information is passed on in a + RecipeWrapper object to the target function. + :param message: Incoming deserialized message object. + """ + if header.get("workflows-recipe") in {True, "True", "true", 1}: + rw = RecipeWrapper(message=message, transport=transport_layer) + return callback(rw, header, message.get("payload")) + if allow_non_recipe_messages: + return callback(None, header, message) + logger.error( + "The input to this service is not a wrapped recipe. " + "Unable to process incoming message." + ) + + return transport_layer.subscribe( + channel, unwrap_recipe, acknowledgement=acknowledgement + ) + + +class Recipe: + """Object containing a processing recipe that can be passed to services. + A recipe describes how all involved services are connected together, how + data should be passed and how errors should be handled.""" + + recipe: dict[Any, Any] = {} + """The processing recipe is encoded in this dictionary.""" + + def __init__(self, recipe=None): + """Constructor allows passing in a recipe dictionary.""" + if isinstance(recipe, basestring): + self.recipe = self._sanitize(json.loads(recipe)) + elif recipe: + self.recipe = self._sanitize(recipe) + + @staticmethod + def _sanitize(recipe): + """Clean up a recipe that may have been stored as serialized json string. + Convert any numerical pointers that are stored as strings to integers.""" + recipe = recipe.copy() + for k in list(recipe): + if k not in ("start", "error") and int(k) and k != int(k): + recipe[int(k)] = recipe[k] + del recipe[k] + for k in list(recipe): + if "output" in recipe[k] and not isinstance( + recipe[k]["output"], (list, dict) + ): + recipe[k]["output"] = [recipe[k]["output"]] + # dicts should be normalized, too + if "start" in recipe: + recipe["start"] = [tuple(x) for x in recipe["start"]] + return recipe + + def __getitem__(self, item): + """Allow direct dictionary access to recipe elements.""" + return self.recipe.__getitem__(item) + + def __contains__(self, item): + """Testing for presence of recipe elements.""" + return item in self.recipe + + def validate(self): + """Check whether the encoded recipe is valid. It must describe a directed + acyclical graph, all connections must be defined, etc.""" + if not self.recipe: + raise Exception("Invalid recipe: No recipe defined") + + # Without a 'start' node nothing would happen + if not self.recipe.get("start"): + raise Exception('Invalid recipe: "start" node empty or missing') + if not type(self.recipe["start"]) in [int, str]: + raise Exception('Invalid recipe: "start" must be an integer or string') + + touched_nodes = ["start"] + + def follow_recipe(node: str): + touched_nodes.append(node) + if self.recipe[node]["output"]: + if not isinstance(self.recipe[node]["output"], dict): + raise ValueError(f"Invalid output for recipe node {node}") + for k, v in self.recipe[node]["output"].items(): + if str(v) not in touched_nodes: + follow_recipe(str(v)) + + # Test recipe for unreferenced nodes + follow_recipe(str(self.recipe["start"])) + for node in self.recipe: + if node not in touched_nodes: + raise KeyError(f"Recipe node {node} is not accessed") + + def apply_parameters(self, parameters): + """Recursively apply dictionary entries in 'parameters' to {item}s in recipe + structure, leaving undefined {item}s as they are. A special case is a + {$REPLACE:item}, which replaces the string with a copy of the referenced + parameter item. + + Examples: + + parameters = { 'x':'5' } + apply_parameters( { '{x}': '{y}' }, parameters ) + => { '5': '{y}' } + + parameters = { 'y':'5' } + apply_parameters( { '{x}': '{y}' }, parameters ) + => { '{x}': '5' } + + parameters = { 'x':'3', 'y':'5' } + apply_parameters( { '{x}': '{y}' }, parameters ) + => { '3': '5' } + + parameters = { 'l': [ 1, 2 ] } + apply_parameters( { 'x': '{$REPLACE:l}' }, parameters ) + => { 'x': [ 1, 2 ] } + """ + + class SafeString: + def __init__(self, s): + self.string = s + + def __repr__(self): + return "{" + self.string + "}" + + def __str__(self): + return "{" + self.string + "}" + + def __getitem__(self, item): + return SafeString(self.string + "[" + item + "]") + + class SafeDict(dict): + """A dictionary that returns undefined keys as {keyname}. + This can be used to selectively replace variables in datastructures.""" + + def __missing__(self, key): + return SafeString(key) + + # By default the python formatter class is used to resolve {item} references + formatter = string.Formatter() + + # Special format strings "{$REPLACE:(...)}" use this data structure + # formatter to return the referenced data structure rather than a formatted + # string. + ds_formatter = string.Formatter() + + def ds_format_field(value, spec): + ds_format_field.last = value + return "" + + ds_formatter.format_field = ds_format_field + + params = SafeDict(parameters) + + def _recursive_apply(item): + """Helper function to recursively apply replacements.""" + if isinstance(item, basestring): + if item.startswith("{$REPLACE") and item.endswith("}"): + try: + ds_formatter.vformat("{" + item[10:-1] + "}", (), parameters) + except KeyError: + return None + return copy.deepcopy(ds_formatter.format_field.last) + else: + return formatter.vformat(item, (), params) + if isinstance(item, dict): + return { + _recursive_apply(key): _recursive_apply(value) + for key, value in item.items() + } + if isinstance(item, tuple): + return tuple(_recursive_apply(list(item))) + if isinstance(item, list): + return [_recursive_apply(x) for x in item] + return item + + self.recipe = _recursive_apply(self.recipe) + + +class RecipeWrapper: + """A wrapper object which contains a recipe and a number of functions to make + life easier for recipe users. + """ + + def __init__(self, message=None, transport=None, recipe=None, environment=None): + """Create a RecipeWrapper object from a wrapped message. + References to the transport layer are required to send directly to + connected downstream processes. + """ + if not transport: + raise ValueError("Transport object is required") + if message: + self.recipe = Recipe(message["recipe"]) + self.recipe_pointer = int(message["recipe-pointer"]) + self.recipe_step = self.recipe[self.recipe_pointer] + self.recipe_path = message.get("recipe-path", []) + if environment is None: + self.environment = message.get("environment", {}) + else: + self.environment = environment + self.payload = message.get("payload") + elif recipe: + if isinstance(recipe, Recipe): + self.recipe = recipe + else: + self.recipe = Recipe(recipe) + self.recipe_pointer = None + self.recipe_step = None + self.recipe_path = [] + self.environment = environment or {} + self.payload = None + else: + raise ValueError("A message or recipe is required for a RecipeWrapper") + self.transport = transport + + def send_to(self, channel, *args, **kwargs): + """Send messages to another service that is connected to the currently + running service via the recipe. Discard messages if the recipe does + not have anything connected to the specified output channel. + """ + if not self.recipe_step: + raise ValueError("This RecipeWrapper object does not contain a recipe step") + if channel not in self.recipe_step.get("output", []): + raise ValueError( + "The current recipe step does not have an output channel with this name" + ) + + self._send_to_destination(self.recipe_step["output"][channel], *args, **kwargs) + + def start(self, header=None, **kwargs): + """Trigger the start of a recipe, sending the defined payloads to the + recipients set in the recipe. Any parameters to this function are + passed to the transport send/broadcast methods. + If the wrapped recipe has already been started then a ValueError will + be raised. + """ + if self.recipe_step: + raise ValueError("This recipe has already been started.") + for destination, payload in self.recipe["start"]: + self._send_to_destination(destination, header, payload, kwargs) + + def checkpoint(self, message, header=None, **kwargs): + """Send a message to the current recipe destination. This can be used to + keep a state for longer processing tasks. + """ + if not self.recipe_step: + raise ValueError("This RecipeWrapper object does not contain a recipe step") + self._send_to_destination( + self.recipe_pointer, header, message, kwargs, add_path_step=False + ) + + def _generate_full_recipe_message(self, destination, message, add_path_step): + """Factory function to generate independent message objects for + downstream recipients with different destinations.""" + if add_path_step and self.recipe_pointer: + recipe_path = self.recipe_path + [self.recipe_pointer] + else: + recipe_path = self.recipe_path + + return { + "environment": self.environment, + "payload": message, + "recipe": self.recipe.recipe, + "recipe-path": recipe_path, + "recipe-pointer": destination, + } + + def _send_to_destination( + self, + destination, + header, + payload, + transport_kwargs, + add_path_step=True, + ): + """Helper function to send a message to a specific recipe destination.""" + if header: + header = header.copy() + header["workflows-recipe"] = True + else: + header = {"workflows-recipe": True} + + dest_kwargs = transport_kwargs.copy() + if "exchange" in self.recipe[destination]: + dest_kwargs.setdefault("exchange", self.recipe[destination]["exchange"]) + + if self.recipe[destination].get("queue"): + message = self._generate_full_recipe_message( + destination, payload, add_path_step + ) + self.transport.send( + self.recipe[destination]["queue"], + message, + headers=header, + **dest_kwargs, + ) From d6d4fb02c798230410acf3a158e86414a0e2c614 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Wed, 4 Jun 2025 10:05:54 +0100 Subject: [PATCH 3/6] Better recipe validation --- src/cryoemservices/services/process_recipe.py | 6 -- src/cryoemservices/util/recipe.py | 56 ++++--------------- 2 files changed, 11 insertions(+), 51 deletions(-) diff --git a/src/cryoemservices/services/process_recipe.py b/src/cryoemservices/services/process_recipe.py index b8220eb7..06af22b9 100644 --- a/src/cryoemservices/services/process_recipe.py +++ b/src/cryoemservices/services/process_recipe.py @@ -4,8 +4,6 @@ from importlib.metadata import entry_points from pathlib import Path -from workflows import Error as WorkflowsError - from cryoemservices.services.common_service import CommonService from cryoemservices.util.config import ServiceConfig, config_from_file from cryoemservices.util.recipe import Recipe, RecipeWrapper, wrap_subscribe @@ -21,10 +19,6 @@ def filter_load_recipes_from_files( raise ValueError(f"Cannot find recipe in location {recipe_location}") with open(recipe_location, "r") as rcp: named_recipe = Recipe(recipe=rcp.read()) - try: - named_recipe.validate() - except WorkflowsError as e: - raise ValueError(f"Named recipe {recipefile} failed validation. {e}") message["recipe"] = named_recipe return message, parameters diff --git a/src/cryoemservices/util/recipe.py b/src/cryoemservices/util/recipe.py index 43c49eb0..5e365792 100644 --- a/src/cryoemservices/util/recipe.py +++ b/src/cryoemservices/util/recipe.py @@ -22,24 +22,6 @@ def wrap_subscribe( """Internal method to create an intercepting function for incoming messages to interpret recipes. This function is then used to subscribe to a channel on the transport layer. - :param transport_layer: Reference to underlying transport object. - :param subscription_call: Reference to the subscribing function of the - transport layer. - :param channel: Channel name to subscribe to. - :param callback: Real function to be called when messages are received. - The callback will pass three arguments, - a RecipeWrapper object (details below), the header as - a dictionary structure, and the message. - - :param allow_non_recipe_messages: Pass on incoming messages that do not - include recipe information. In this case the first - argument to the callback function will be 'None'. - :param log_extender: If the recipe contains useful contextual information - for log messages, such as a unique ID which can be used - to connect all messages originating from the same - recipe, then the information will be passed to this - function, which must be a context manager factory. - :return: Return value of call to subscription_call. """ @functools.wraps(callback) @@ -78,43 +60,27 @@ class Recipe: def __init__(self, recipe=None): """Constructor allows passing in a recipe dictionary.""" if isinstance(recipe, basestring): - self.recipe = self._sanitize(json.loads(recipe)) + self.recipe = json.loads(recipe) elif recipe: - self.recipe = self._sanitize(recipe) - - @staticmethod - def _sanitize(recipe): - """Clean up a recipe that may have been stored as serialized json string. - Convert any numerical pointers that are stored as strings to integers.""" - recipe = recipe.copy() - for k in list(recipe): - if k not in ("start", "error") and int(k) and k != int(k): - recipe[int(k)] = recipe[k] - del recipe[k] - for k in list(recipe): - if "output" in recipe[k] and not isinstance( - recipe[k]["output"], (list, dict) - ): - recipe[k]["output"] = [recipe[k]["output"]] - # dicts should be normalized, too - if "start" in recipe: - recipe["start"] = [tuple(x) for x in recipe["start"]] - return recipe + self.recipe = recipe + if self.recipe: + self.validate() def __getitem__(self, item): """Allow direct dictionary access to recipe elements.""" return self.recipe.__getitem__(item) - def __contains__(self, item): - """Testing for presence of recipe elements.""" - return item in self.recipe - def validate(self): - """Check whether the encoded recipe is valid. It must describe a directed - acyclical graph, all connections must be defined, etc.""" + """Check whether the encoded recipe is valid""" if not self.recipe: raise Exception("Invalid recipe: No recipe defined") + # Make all keys strings + for k in self.recipe.keys(): + if isinstance(k, int): + self.recipe[str(k)] = self.recipe[k] + del self.recipe[k] + # Without a 'start' node nothing would happen if not self.recipe.get("start"): raise Exception('Invalid recipe: "start" node empty or missing') From 14ad2c100811b22c807252e972e622007e651476 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Wed, 4 Jun 2025 10:26:51 +0100 Subject: [PATCH 4/6] Sort out parameter application --- src/cryoemservices/util/recipe.py | 99 ++++++------------------------- 1 file changed, 18 insertions(+), 81 deletions(-) diff --git a/src/cryoemservices/util/recipe.py b/src/cryoemservices/util/recipe.py index 5e365792..c4a71e29 100644 --- a/src/cryoemservices/util/recipe.py +++ b/src/cryoemservices/util/recipe.py @@ -1,14 +1,11 @@ from __future__ import annotations -import copy import functools import json import logging import string from typing import Any -basestring = (str, bytes) - logger = logging.getLogger("cryoemservices.util.recipe") @@ -59,7 +56,7 @@ class Recipe: def __init__(self, recipe=None): """Constructor allows passing in a recipe dictionary.""" - if isinstance(recipe, basestring): + if isinstance(recipe, str): self.recipe = json.loads(recipe) elif recipe: self.recipe = recipe @@ -76,10 +73,10 @@ def validate(self): raise Exception("Invalid recipe: No recipe defined") # Make all keys strings - for k in self.recipe.keys(): - if isinstance(k, int): - self.recipe[str(k)] = self.recipe[k] - del self.recipe[k] + for key in self.recipe.keys(): + if isinstance(key, int): + self.recipe[str(key)] = self.recipe[key] + del self.recipe[key] # Without a 'start' node nothing would happen if not self.recipe.get("start"): @@ -100,92 +97,32 @@ def follow_recipe(node: str): # Test recipe for unreferenced nodes follow_recipe(str(self.recipe["start"])) - for node in self.recipe: - if node not in touched_nodes: - raise KeyError(f"Recipe node {node} is not accessed") + for key in self.recipe.keys(): + if key not in touched_nodes: + raise KeyError(f"Recipe node {key} is not accessed") def apply_parameters(self, parameters): """Recursively apply dictionary entries in 'parameters' to {item}s in recipe - structure, leaving undefined {item}s as they are. A special case is a - {$REPLACE:item}, which replaces the string with a copy of the referenced - parameter item. - - Examples: - - parameters = { 'x':'5' } - apply_parameters( { '{x}': '{y}' }, parameters ) - => { '5': '{y}' } - - parameters = { 'y':'5' } - apply_parameters( { '{x}': '{y}' }, parameters ) - => { '{x}': '5' } - - parameters = { 'x':'3', 'y':'5' } - apply_parameters( { '{x}': '{y}' }, parameters ) - => { '3': '5' } - - parameters = { 'l': [ 1, 2 ] } - apply_parameters( { 'x': '{$REPLACE:l}' }, parameters ) - => { 'x': [ 1, 2 ] } + structure, leaving undefined {item}s as they are. """ - - class SafeString: - def __init__(self, s): - self.string = s - - def __repr__(self): - return "{" + self.string + "}" - - def __str__(self): - return "{" + self.string + "}" - - def __getitem__(self, item): - return SafeString(self.string + "[" + item + "]") - - class SafeDict(dict): - """A dictionary that returns undefined keys as {keyname}. - This can be used to selectively replace variables in datastructures.""" - - def __missing__(self, key): - return SafeString(key) - - # By default the python formatter class is used to resolve {item} references + # The python formatter class is used to resolve {item} references formatter = string.Formatter() - # Special format strings "{$REPLACE:(...)}" use this data structure - # formatter to return the referenced data structure rather than a formatted - # string. - ds_formatter = string.Formatter() - - def ds_format_field(value, spec): - ds_format_field.last = value - return "" - - ds_formatter.format_field = ds_format_field - - params = SafeDict(parameters) - def _recursive_apply(item): """Helper function to recursively apply replacements.""" - if isinstance(item, basestring): - if item.startswith("{$REPLACE") and item.endswith("}"): - try: - ds_formatter.vformat("{" + item[10:-1] + "}", (), parameters) - except KeyError: - return None - return copy.deepcopy(ds_formatter.format_field.last) - else: - return formatter.vformat(item, (), params) - if isinstance(item, dict): + if isinstance(item, str): + return formatter.vformat(item, (), parameters) + elif isinstance(item, dict): return { _recursive_apply(key): _recursive_apply(value) for key, value in item.items() } - if isinstance(item, tuple): - return tuple(_recursive_apply(list(item))) - if isinstance(item, list): + elif isinstance(item, list): return [_recursive_apply(x) for x in item] - return item + else: + raise TypeError( + f"Cannot format recipe item {item} of type {type(item)}" + ) self.recipe = _recursive_apply(self.recipe) From 52766c1af353cc0e587a7f65ce56759fbaa94678 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Wed, 4 Jun 2025 10:46:43 +0100 Subject: [PATCH 5/6] Always allow non-recipe messages --- src/cryoemservices/services/process_recipe.py | 1 - src/cryoemservices/util/recipe.py | 10 ++-------- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/src/cryoemservices/services/process_recipe.py b/src/cryoemservices/services/process_recipe.py index 06af22b9..c2ced547 100644 --- a/src/cryoemservices/services/process_recipe.py +++ b/src/cryoemservices/services/process_recipe.py @@ -61,7 +61,6 @@ def initializing(self): self._environment["queue"] or "processing_recipe", self.process, acknowledgement=True, - allow_non_recipe_messages=True, ) def process(self, rw, header, message): diff --git a/src/cryoemservices/util/recipe.py b/src/cryoemservices/util/recipe.py index c4a71e29..0a37f201 100644 --- a/src/cryoemservices/util/recipe.py +++ b/src/cryoemservices/util/recipe.py @@ -14,7 +14,6 @@ def wrap_subscribe( channel, callback, acknowledgement=True, - allow_non_recipe_messages=True, ): """Internal method to create an intercepting function for incoming messages to interpret recipes. This function is then used to subscribe to a channel @@ -31,15 +30,10 @@ def unwrap_recipe(header, message): RecipeWrapper object to the target function. :param message: Incoming deserialized message object. """ - if header.get("workflows-recipe") in {True, "True", "true", 1}: + if header.get("workflows-recipe") is True: rw = RecipeWrapper(message=message, transport=transport_layer) return callback(rw, header, message.get("payload")) - if allow_non_recipe_messages: - return callback(None, header, message) - logger.error( - "The input to this service is not a wrapped recipe. " - "Unable to process incoming message." - ) + return callback(None, header, message) return transport_layer.subscribe( channel, unwrap_recipe, acknowledgement=acknowledgement From dba17c10990a73f1fb7821dec7d7b2cf90dfc93b Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Thu, 9 Oct 2025 12:06:53 +0100 Subject: [PATCH 6/6] Thoughts on removing the recipes --- src/cryoemservices/services/extract.py | 43 +++++++------------------- 1 file changed, 12 insertions(+), 31 deletions(-) diff --git a/src/cryoemservices/services/extract.py b/src/cryoemservices/services/extract.py index 00ab828b..6824644a 100644 --- a/src/cryoemservices/services/extract.py +++ b/src/cryoemservices/services/extract.py @@ -7,10 +7,8 @@ import numpy as np from gemmi import cif from pydantic import BaseModel, Field, ValidationError -from workflows.recipe import wrap_subscribe from cryoemservices.services.common_service import CommonService -from cryoemservices.util.models import MockRW from cryoemservices.util.relion_service_options import ( RelionServiceOptions, update_relion_options, @@ -55,44 +53,27 @@ class Extract(CommonService): def initializing(self): """Subscribe to a queue. Received messages must be acknowledged.""" self.log.info("Extract service starting") - wrap_subscribe( - self._transport, + self._transport.subscribe( self._environment["queue"] or "extract", self.extract, acknowledgement=True, - allow_non_recipe_messages=True, ) - def extract(self, rw, header: dict, message: dict): + def extract(self, header: dict, message: dict): """Main function which interprets and processes received messages""" - if not rw: - self.log.info("Received a simple message") - if not isinstance(message, dict): - self.log.error("Rejected invalid simple message") - self._transport.nack(header) - return - - # Create a wrapper-like object that can be passed to functions - # as if a recipe wrapper was present. - rw = MockRW(self._transport) - rw.recipe_step = {"parameters": message} + if not isinstance(message, dict): + self.log.error("Rejected invalid non-dictionary message") + self._transport.nack(header) + return try: - if isinstance(message, dict): - extract_params = ExtractParameters( - **{**rw.recipe_step.get("parameters", {}), **message} - ) - else: - extract_params = ExtractParameters( - **{**rw.recipe_step.get("parameters", {})} - ) + extract_params = ExtractParameters(**message) except (ValidationError, TypeError) as e: self.log.warning( f"Extraction parameter validation failed for message: {message} " - f"and recipe parameters: {rw.recipe_step.get('parameters', {})} " f"with exception: {e}" ) - rw.transport.nack(header) + self._transport.nack(header) return self.log.info( @@ -113,7 +94,7 @@ def extract(self, rw, header: dict, message: dict): job_dir = Path(job_dir_search[0]) else: self.log.warning(f"Invalid job directory in {extract_params.output_file}") - rw.transport.nack(header) + self._transport.nack(header) return project_dir = job_dir.parent.parent if not Path(extract_params.output_file).parent.exists(): @@ -399,7 +380,7 @@ def extract(self, rw, header: dict, message: dict): "stderr": "", "results": {"box_size": box_len}, } - rw.send_to("node_creator", node_creator_parameters) + self._transport.send("node_creator", node_creator_parameters) # Register the files needed for selection and batching self.log.info("Sending to particle selection") @@ -409,7 +390,7 @@ def extract(self, rw, header: dict, message: dict): "image_size": box_len, "relion_options": dict(extract_params.relion_options), } - rw.send_to("select_particles", select_params) + self._transport.send("select_particles", select_params) self.log.info(f"Done {self.job_type} for {extract_params.coord_list_file}.") - rw.transport.ack(header) + self._transport.ack(header)