101 changes: 59 additions & 42 deletions src/dlstbx/services/trigger.py
@@ -222,6 +222,8 @@ class MultiplexParameters(pydantic.BaseModel):
diffraction_plan_info: Optional[DiffractionPlanInfo] = None
recipe: Optional[str] = None
use_clustering: Optional[List[str]] = None
beamline: str
trigger_every_collection: bool


class Xia2SsxReduceParameters(pydantic.BaseModel):
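For context between hunks: a self-contained sketch of the extended parameter model, restricted to the fields visible in the hunk above (pydantic v1-style defaults, matching the file); the instantiation values are purely illustrative:

from typing import List, Optional

import pydantic


class MultiplexParameters(pydantic.BaseModel):
    recipe: Optional[str] = None
    use_clustering: Optional[List[str]] = None
    beamline: str  # new field: the beamline as a string
    trigger_every_collection: bool  # new field: selects the triggering behaviour


params = MultiplexParameters(
    beamline="i04", use_clustering=["i04"], trigger_every_collection=True
)
print(params.trigger_every_collection)  # True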
@@ -1736,17 +1738,22 @@ def trigger_multiplex(
will be created, and the resulting list of processingJobIds will be sent to
the `processing_recipe` queue.
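As a rough illustration of that dispatch step: this assumes a workflows/zocalo-style transport with a send(destination, message) method, and the payload key ispyb_process is an assumption for illustration, not taken from this diff.

def send_to_processing_recipe(transport, processing_job_ids):
    # transport: a workflows/zocalo-style transport exposing send()
    for job_id in processing_job_ids:
        # message schema assumed for illustration only
        transport.send(
            "processing_recipe",
            {"parameters": {"ispyb_process": job_id}},
        )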

If the clustering algorithm is enabled, skip triggering multiplex while new
related dcid values are being added to all defined sample groups, so that
multiplex runs only once, when all samples in one of the groups have been
collected. When running multiplex, only include results from datasets
collected prior to the current one in all sample groups.
If the clustering algorithm is enabled, further command-line options are added.

There are two ways to trigger multiplex, controlled by trigger_every_collection:
multiplex is either triggered after every dials data collection (useful for
beamlines that use multiplex for mid-experiment feedback), or only when there
are no subsequent datasets still processing (vmxi). The latter equates to
"once per sample group" if the experiment is collecting rapidly with no
significant delays (sketched below).
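A minimal sketch of that branching; still_processing stands in for the ISPyB query performed in the service body, and the helper name and signature are illustrative rather than part of dlstbx:

def should_trigger_multiplex(
    trigger_every_collection: bool, still_processing: bool
) -> bool:
    # Mode 1: mid-experiment feedback, trigger after every dials collection
    if trigger_every_collection:
        return True
    # Mode 2 (vmxi): trigger only when no newer dataset is still processing,
    # which approximates "once per sample group" for back-to-back collection
    return not still_processing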

Recipe parameters:
- target: set this to "multiplex"
- beamline: the beamline as a string
- dcid: the dataCollectionId for the given data collection
- comment: a comment to be stored in the ProcessingJob.comment field
- automatic: boolean value passed to ProcessingJob.automatic field
- trigger_every_collection: selects the triggering behaviour of multiplex described above
- ispyb_parameters: a dictionary of ispyb_reprocessing_parameters set in the
parent xia2-dials processing job
- related_dcids: a list of groups of related data collection ids. Each item in
@@ -1803,53 +1810,63 @@ def trigger_multiplex(
return {"success": True}

self.log.debug(f"related_dcids for dcid={dcid}: {related_dcids}")

# Turn on multiplex clustering

if (
parameters.use_clustering
and parameters.beamline in parameters.use_clustering
):
parameters.recipe = "postprocessing-xia2-multiplex-clustering"

# Beamlines where multiplex is triggered alongside xia2-dials need extra checks.
# Check whether any new data collections have been added to any sample group
# to decide whether to proceed with triggering multiplex.
# Run multiplex only once, when processing for all samples in the group has been collected.
if parameters.use_clustering and parameters.program_id:

if parameters.trigger_every_collection:
self.log.info("Triggering xia2.multiplex after every data collection.")

else:
self.log.info("Checking for subsequent dcids that are still processing.")

# Get the current list of data collections for all samples in the sample groups
_, ispyb_info = dlstbx.ispybtbx.ispyb_filter(
{}, {"ispyb_dcid": dcid}, session
)
ispyb_related_dcids = ispyb_info.get("ispyb_related_dcids", [])
beamline = ispyb_info.get("ispyb_beamline", "")
visit = ispyb_info.get("ispyb_visit", "")
if beamline in parameters.use_clustering or any(
el in visit for el in parameters.use_clustering
):
parameters.recipe = "postprocessing-xia2-multiplex-clustering"
# Only check for newer processing if every sample group already has a data
# collection newer than the current one; if any sample group has no new data
# collections, proceed with triggering multiplex for all sample groups
if all(max(el.get("dcids", [])) > dcid for el in ispyb_related_dcids):
added_dcids = []
for el in ispyb_related_dcids:
added_dcids.extend([d for d in el.get("dcids", []) if d > dcid])
# Check whether any xia2 dials jobs were triggered on new data
# collections after the current multiplex job was triggered
min_start_time = datetime.now() - timedelta(hours=12)
query = (
    session.query(
        AutoProcProgram, ProcessingJob.dataCollectionId
    ).join(
        ProcessingJob,
        ProcessingJob.processingJobId
        == AutoProcProgram.processingJobId,
    )
    .filter(ProcessingJob.dataCollectionId.in_(added_dcids))
    .filter(ProcessingJob.automatic == True)  # noqa: E712
    .filter(AutoProcProgram.processingPrograms == "xia2 dials")
    .filter(AutoProcProgram.autoProcProgramId > program_id)
    .filter(AutoProcProgram.recordTimeStamp > min_start_time)
)
# Abort triggering multiplex if xia2 dials is already running on a
# subsequent data collection in any of the sample groups
if triggered_processing_job := query.first():
    self.log.info(
        f"Aborting multiplex trigger for dcid {dcid} as a processing job has been started for dcid {triggered_processing_job.dataCollectionId}"
    )
return {"success": True}

# Calculate message delay for exponential backoff in case a processing
# program for a related data collection is still running, in which case
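The hunk is folded here, but the comment above names an exponential backoff on the message delay; as a generic illustration (the constants are invented, not dlstbx's actual values):

def message_delay(attempt: int, base: float = 10.0, cap: float = 600.0) -> float:
    # Delay doubles on each retry while a processing program for a related
    # data collection is still running: 10, 20, 40, ... seconds, capped.
    return min(base * 2**attempt, cap)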