Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion recipes/ispyb/em-spa-refine-wrapper.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
"original_pixel_size": "{pixel_size}",
"refine_class_nr": "{class_number}",
"refine_job_dir": "{refine_job_dir}",
"relion_options": {}
"relion_options": {},
"submit_to_slurm": "True"
},
"queue": "extract_class",
"service": "ExtractClass"
Expand Down
12 changes: 9 additions & 3 deletions src/cryoemservices/services/class2d.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class Class2D(CommonService):
def initializing(self):
"""Subscribe to a queue. Received messages must be acknowledged."""
self.log.info("Class2D service starting")
wrap_subscribe(
self.subscription_id = wrap_subscribe(
self._transport,
self._environment["queue"] or "class2d",
self.class2d,
Expand All @@ -34,7 +34,7 @@ def class2d(self, rw, header: dict, message: dict):
if not isinstance(message, dict):
self.log.error("Rejected invalid simple message")
self._transport.nack(header)
return
return False

# Create a wrapper-like object that can be passed to functions
# as if a recipe wrapper was present.
Expand All @@ -58,7 +58,7 @@ def class2d(self, rw, header: dict, message: dict):
f"with exception: {e}"
)
rw.transport.nack(header)
return
return False

# In this setup we cannot nack messages on failure, so instead check here
if message.get("requeue", 0) >= 5:
Expand All @@ -71,6 +71,8 @@ def class2d(self, rw, header: dict, message: dict):
f"Running disconnected Class2D job for {class2d_params.particles_file}"
)
rw.transport.ack(header)
rw.transport.unsubscribe(self.subscription_id)
rw.transport.drop_callback_reference(self.subscription_id)

# Run the class2d job
try:
Expand All @@ -88,3 +90,7 @@ def class2d(self, rw, header: dict, message: dict):
# Send back to the queue but mark a failure in the message
message["requeue"] = message.get("requeue", 0) + 1
rw.send_to("class2d", message)

# Reconnect to rabbitmq
self.initializing()
return True
12 changes: 9 additions & 3 deletions src/cryoemservices/services/class3d.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class Class3D(CommonService):
def initializing(self):
"""Subscribe to a queue. Received messages must be acknowledged."""
self.log.info("Class3D service starting")
wrap_subscribe(
self.subscription_id = wrap_subscribe(
self._transport,
self._environment["queue"] or "class3d",
self.class3d,
Expand All @@ -34,7 +34,7 @@ def class3d(self, rw, header: dict, message: dict):
if not isinstance(message, dict):
self.log.error("Rejected invalid simple message")
self._transport.nack(header)
return
return False

# Create a wrapper-like object that can be passed to functions
# as if a recipe wrapper was present.
Expand All @@ -58,7 +58,7 @@ def class3d(self, rw, header: dict, message: dict):
f"with exception: {e}"
)
rw.transport.nack(header)
return
return False

# In this setup we cannot nack messages on failure, so instead check here
if message.get("requeue", 0) >= 5:
Expand All @@ -71,6 +71,8 @@ def class3d(self, rw, header: dict, message: dict):
f"Running disconnected Class3D job for {class3d_params.particles_file}"
)
rw.transport.ack(header)
rw.transport.unsubscribe(self.subscription_id)
rw.transport.drop_callback_reference(self.subscription_id)

# Run the class3d job
try:
Expand All @@ -88,3 +90,7 @@ def class3d(self, rw, header: dict, message: dict):
# Send back to the queue but mark a failure in the message
message["requeue"] = message.get("requeue", 0) + 1
rw.send_to("class3d", message)

# Reconnect to rabbitmq
self.initializing()
return True
1 change: 1 addition & 0 deletions src/cryoemservices/services/common_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def __init__(
self.log = logging.getLogger(self._logger_name)
self.log.setLevel(logging.INFO)
self.single_message_mode: bool = single_message_mode
self.subscription_id: int = 0

def _transport_interceptor(self, callback):
"""Takes a callback function and adds headers and messages"""
Expand Down
12 changes: 12 additions & 0 deletions src/cryoemservices/services/cryolo.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,18 @@ def cryolo(self, rw, header: dict, message: dict):
rw.transport.ack(header)
return

# Rename any flattened files
if scaled_input_path != cryolo_params.input_path:
(job_dir / f"CBOX/{Path(cryolo_params.output_path).stem}_flat.cbox").rename(
job_dir / f"CBOX/{Path(cryolo_params.output_path).stem}.cbox"
)
(job_dir / f"STAR/{Path(cryolo_params.output_path).stem}_flat.star").rename(
job_dir / f"STAR/{Path(cryolo_params.output_path).stem}.star"
)
(job_dir / f"EMAN/{Path(cryolo_params.output_path).stem}_flat.box").rename(
job_dir / f"EMAN/{Path(cryolo_params.output_path).stem}.box"
)

# Read in the cbox file for particle selection and finding sizes
try:
cbox_file = cif.read_file(
Expand Down
31 changes: 18 additions & 13 deletions src/cryoemservices/services/extract_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import math
import os
import re
import subprocess
from pathlib import Path

from pydantic import BaseModel, Field, ValidationError
Expand Down Expand Up @@ -30,6 +31,7 @@ class ExtractClassParameters(BaseModel):
downscale: bool = True
normalise: bool = True
invert_contrast: bool = True
submit_to_slurm: bool = False
relion_options: RelionServiceOptions


Expand Down Expand Up @@ -254,19 +256,22 @@ def extract_class(self, rw, header: dict, message: dict):
if extract_params.downscale:
command.append("--downscale")

result = slurm_submission_for_services(
log=self.log,
service_config_file=self._environment["config"],
slurm_cluster=self._environment["slurm_cluster"],
job_name="ReExtract",
command=command,
project_dir=extract_job_dir,
output_file=extract_job_dir / "slurm_run",
cpus=40,
use_gpu=False,
use_singularity=False,
script_extras="module load EM/cryoem-services",
)
if extract_params.submit_to_slurm:
result = slurm_submission_for_services(
log=self.log,
service_config_file=self._environment["config"],
slurm_cluster=self._environment["slurm_cluster"],
job_name="ReExtract",
command=command,
project_dir=extract_job_dir,
output_file=extract_job_dir / "slurm_run",
cpus=40,
use_gpu=False,
use_singularity=False,
script_extras="module load EM/cryoem-services",
)
else:
result = subprocess.run(command, capture_output=True)

# Register the Re-extraction job with the node creator
self.log.info(f"Sending {self.extract_job_type} to node creator")
Expand Down
6 changes: 3 additions & 3 deletions src/cryoemservices/services/motioncorr.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ def motion_correction(self, rw, header: dict, message: dict):
)
if mc_params.do_icebreaker_jobs and not icebreaker_output.is_file():
# Three IceBreaker jobs: CtfFind job is MC+4
ctf_job_number = 6
ctf_job_number = job_number + 4

# Both IceBreaker micrographs and flattening inherit from motioncorr
self.log.info(
Expand Down Expand Up @@ -624,10 +624,10 @@ def motion_correction(self, rw, header: dict, message: dict):
ctf_job_number = job_number + 4
else:
# No IceBreaker jobs: CtfFind job is MC+1
ctf_job_number = 3
ctf_job_number = job_number + 1
else:
# Tomography: CtfFind job is MC+1
ctf_job_number = 3
ctf_job_number = job_number + 1

# Forward results to ctffind (in both SPA and tomography)
self.log.info(f"Sending to ctf: {mc_params.mrc_out}")
Expand Down
12 changes: 9 additions & 3 deletions src/cryoemservices/services/refine3d.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class Refine3D(CommonService):
def initializing(self):
"""Subscribe to a queue. Received messages must be acknowledged."""
self.log.info("Refine3D service starting")
wrap_subscribe(
self.subscription_id = wrap_subscribe(
self._transport,
self._environment["queue"] or "refine3d",
self.refine3d,
Expand All @@ -34,7 +34,7 @@ def refine3d(self, rw, header: dict, message: dict):
if not isinstance(message, dict):
self.log.error("Rejected invalid simple message")
self._transport.nack(header)
return
return False

# Create a wrapper-like object that can be passed to functions
# as if a recipe wrapper was present.
Expand All @@ -58,7 +58,7 @@ def refine3d(self, rw, header: dict, message: dict):
f"with exception: {e}"
)
rw.transport.nack(header)
return
return False

# In this setup we cannot nack messages on failure, so instead check here
if message.get("requeue", 0) >= 5:
Expand All @@ -71,6 +71,8 @@ def refine3d(self, rw, header: dict, message: dict):
f"Running disconnected Refine3D job for {refine_params.particles_file}"
)
rw.transport.ack(header)
rw.transport.unsubscribe(self.subscription_id)
rw.transport.drop_callback_reference(self.subscription_id)

# Run the refinement job
try:
Expand All @@ -90,3 +92,7 @@ def refine3d(self, rw, header: dict, message: dict):
# Send back to the queue but mark a failure in the message
message["requeue"] = message.get("requeue", 0) + 1
rw.send_to("refine3d", message)

# Reconnect to rabbitmq
self.initializing()
return True
Loading
Loading