Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 18 additions & 14 deletions src/cryoemservices/services/cluster_submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from cryoemservices.services.common_service import CommonService
from cryoemservices.util.config import config_from_file
from cryoemservices.util.models import MockRW
from cryoemservices.util.slurm_submission import (
JobSubmissionParameters,
submit_to_slurm,
Expand All @@ -30,8 +31,19 @@ def initializing(self):
acknowledgement=True,
)

def run_submit_job(self, rw, header, message):
def run_submit_job(self, rw, header: dict, message: dict):
"""Submit cluster job according to message."""
if not rw:
self.log.info("Received a simple message")
if not isinstance(message, dict):
self.log.error("Rejected invalid simple message")
self._transport.nack(header)
return

# Create a wrapper-like object that can be passed to functions
# as if a recipe wrapper was present.
rw = MockRW(self._transport)
rw.recipe_step = {"parameters": message}

parameters = rw.recipe_step["parameters"]
if type(parameters.get("cluster", {}).get("commands")) is list:
Expand Down Expand Up @@ -72,7 +84,7 @@ def run_submit_job(self, rw, header, message):
self.log.error(
"No absolute working directory specified. Will not run cluster job"
)
self._transport.nack(header)
rw.transport.nack(header)
return
working_directory = Path(parameters["workingdir"])
try:
Expand All @@ -81,7 +93,7 @@ def run_submit_job(self, rw, header, message):
self.log.error(
"Could not create working directory: %s", str(e), exc_info=True
)
self._transport.nack(header)
rw.transport.nack(header)
return

if parameters.get("standard_output"):
Expand All @@ -105,17 +117,9 @@ def run_submit_job(self, rw, header, message):
)
if not jobnumber:
self.log.error("Job was not submitted")
self._transport.nack(header)
rw.transport.nack(header)
return

# Conditionally acknowledge receipt of the message
txn = self._transport.transaction_begin(subscription_id=header["subscription"])
self._transport.ack(header, transaction=txn)

# Send results onwards
rw.set_default_channel("job_submitted")
rw.send({"jobid": jobnumber}, transaction=txn)

# Commit transaction
self._transport.transaction_commit(txn)
# Acknowledge success
rw.transport.ack(header)
self.log.info(f"Submitted job {jobnumber} to slurm")
43 changes: 12 additions & 31 deletions src/cryoemservices/services/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@
import numpy as np
from gemmi import cif
from pydantic import BaseModel, Field, ValidationError
from workflows.recipe import wrap_subscribe

from cryoemservices.services.common_service import CommonService
from cryoemservices.util.models import MockRW
from cryoemservices.util.relion_service_options import (
RelionServiceOptions,
update_relion_options,
Expand Down Expand Up @@ -55,44 +53,27 @@ class Extract(CommonService):
def initializing(self):
"""Subscribe to a queue. Received messages must be acknowledged."""
self.log.info("Extract service starting")
wrap_subscribe(
self._transport,
self._transport.subscribe(
self._environment["queue"] or "extract",
self.extract,
acknowledgement=True,
allow_non_recipe_messages=True,
)

def extract(self, rw, header: dict, message: dict):
def extract(self, header: dict, message: dict):
"""Main function which interprets and processes received messages"""
if not rw:
self.log.info("Received a simple message")
if not isinstance(message, dict):
self.log.error("Rejected invalid simple message")
self._transport.nack(header)
return

# Create a wrapper-like object that can be passed to functions
# as if a recipe wrapper was present.
rw = MockRW(self._transport)
rw.recipe_step = {"parameters": message}
if not isinstance(message, dict):
self.log.error("Rejected invalid non-dictionary message")
self._transport.nack(header)
return

try:
if isinstance(message, dict):
extract_params = ExtractParameters(
**{**rw.recipe_step.get("parameters", {}), **message}
)
else:
extract_params = ExtractParameters(
**{**rw.recipe_step.get("parameters", {})}
)
extract_params = ExtractParameters(**message)
except (ValidationError, TypeError) as e:
self.log.warning(
f"Extraction parameter validation failed for message: {message} "
f"and recipe parameters: {rw.recipe_step.get('parameters', {})} "
f"with exception: {e}"
)
rw.transport.nack(header)
self._transport.nack(header)
return

self.log.info(
Expand All @@ -113,7 +94,7 @@ def extract(self, rw, header: dict, message: dict):
job_dir = Path(job_dir_search[0])
else:
self.log.warning(f"Invalid job directory in {extract_params.output_file}")
rw.transport.nack(header)
self._transport.nack(header)
return
project_dir = job_dir.parent.parent
if not Path(extract_params.output_file).parent.exists():
Expand Down Expand Up @@ -399,7 +380,7 @@ def extract(self, rw, header: dict, message: dict):
"stderr": "",
"results": {"box_size": box_len},
}
rw.send_to("node_creator", node_creator_parameters)
self._transport.send("node_creator", node_creator_parameters)

# Register the files needed for selection and batching
self.log.info("Sending to particle selection")
Expand All @@ -409,7 +390,7 @@ def extract(self, rw, header: dict, message: dict):
"image_size": box_len,
"relion_options": dict(extract_params.relion_options),
}
rw.send_to("select_particles", select_params)
self._transport.send("select_particles", select_params)

self.log.info(f"Done {self.job_type} for {extract_params.coord_list_file}.")
rw.transport.ack(header)
self._transport.ack(header)
3 changes: 1 addition & 2 deletions src/cryoemservices/services/ispyb_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,7 @@ def parameters(parameter):
)

if result and result.get("success"):
rw.set_default_channel("output")
rw.send({"result": result.get("return_value")})
rw.send_to("output", {"result": result.get("return_value")})
rw.transport.ack(header)
elif result and result.get("checkpoint"):
rw.checkpoint(result.get("checkpoint_dict"))
Expand Down
32 changes: 10 additions & 22 deletions src/cryoemservices/services/process_recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@
from importlib.metadata import entry_points
from pathlib import Path

import workflows.recipe
from workflows import Error as WorkflowsError

from cryoemservices.services.common_service import CommonService
from cryoemservices.util.config import ServiceConfig, config_from_file
from cryoemservices.util.recipe import Recipe, RecipeWrapper, wrap_subscribe


def filter_load_recipes_from_files(
Expand All @@ -20,12 +18,8 @@ def filter_load_recipes_from_files(
if not recipe_location.is_file():
raise ValueError(f"Cannot find recipe in location {recipe_location}")
with open(recipe_location, "r") as rcp:
named_recipe = workflows.recipe.Recipe(recipe=rcp.read())
try:
named_recipe.validate()
except WorkflowsError as e:
raise ValueError(f"Named recipe {recipefile} failed validation. {e}")
message["recipe"] = message["recipe"].merge(named_recipe)
named_recipe = Recipe(recipe=rcp.read())
message["recipe"] = named_recipe
return message, parameters


Expand Down Expand Up @@ -62,12 +56,11 @@ def initializing(self):
"apply_parameters": filter_apply_parameters,
}

workflows.recipe.wrap_subscribe(
wrap_subscribe(
self._transport,
self._environment["queue"] or "processing_recipe",
self.process,
acknowledgement=True,
allow_non_recipe_messages=True,
)

def process(self, rw, header, message):
Expand All @@ -88,7 +81,7 @@ def process(self, rw, header, message):
parameters["guid"] = recipe_id

# Add an empty recipe to the message
message["recipe"] = workflows.recipe.Recipe()
message["recipe"] = Recipe()

# Apply all specified filters in order to message and parameters
for name, f in self.message_filters.items():
Expand All @@ -106,16 +99,11 @@ def process(self, rw, header, message):
self.log.info(f"Filtered processing request: {str(message)}")
self.log.info(f"Filtered parameters: {str(parameters)}")

# Conditionally acknowledge receipt of the message
txn = self._transport.transaction_begin(subscription_id=header["subscription"])
self._transport.ack(header, transaction=txn)

rw = workflows.recipe.RecipeWrapper(
recipe=message["recipe"], transport=self._transport
)
# Start the recipe wrapper
rw = RecipeWrapper(recipe=message["recipe"], transport=self._transport)
rw.environment = {"ID": recipe_id}
rw.start(transaction=txn)
rw.start()

# Commit transaction
self._transport.transaction_commit(txn)
# Acknowledge success
self._transport.ack(header)
self.log.info("Processed incoming message")
Loading
Loading