From 3f357d833cee756ecfb9942c358716c83b32cb1d Mon Sep 17 00:00:00 2001 From: amyjaynethompson <52806925+amyjaynethompson@users.noreply.github.com> Date: Mon, 19 Jan 2026 14:09:37 +0000 Subject: [PATCH 1/3] separate clustering behaviour and extra checks when xia2-dials is triggered with xia2.multiplex --- src/dlstbx/services/trigger.py | 77 ++++++++++++++++++---------------- 1 file changed, 40 insertions(+), 37 deletions(-) diff --git a/src/dlstbx/services/trigger.py b/src/dlstbx/services/trigger.py index d7257aad0..168a5b674 100644 --- a/src/dlstbx/services/trigger.py +++ b/src/dlstbx/services/trigger.py @@ -1803,53 +1803,56 @@ def trigger_multiplex( return {"success": True} self.log.debug(f"related_dcids for dcid={dcid}: {related_dcids}") + + # Turn on multiplex clustering + + if parameters.beamline in parameters.use_clustering: + parameters.recipe = "postprocessing-xia2-multiplex-clustering" + + # For beamlines where multiplex is triggered alongside xia2-dials need extra checks # Check if we have any new data collections added to any sample group # to decide if we need to processed triggering multiplex. # Run multiplex only once when processing for all samples in the group have been collected. 
- if parameters.use_clustering and parameters.program_id: + + if parameters.program_id: + # Get currnent list of data collections for all samples in the sample groups _, ispyb_info = dlstbx.ispybtbx.ispyb_filter( {}, {"ispyb_dcid": dcid}, session ) ispyb_related_dcids = ispyb_info.get("ispyb_related_dcids", []) - beamline = ispyb_info.get("ispyb_beamline", "") - visit = ispyb_info.get("ispyb_visit", "") - if beamline in parameters.use_clustering or any( - el in visit for el in parameters.use_clustering - ): - parameters.recipe = "postprocessing-xia2-multiplex-clustering" - # If we have a sample group that doesn't have any new data collections, - # proceed with triggering multiplex for all sample groups - if all(max(el.get("dcids", [])) > dcid for el in ispyb_related_dcids): - added_dcids = [] - for el in ispyb_related_dcids: - added_dcids.extend([d for d in el.get("dcids", []) if d > dcid]) - # Check if there are xia2 dials jobs that were triggered on any new - # data collections after current multiplex job was triggered - min_start_time = datetime.now() - timedelta(hours=12) - query = ( - ( - session.query( - AutoProcProgram, ProcessingJob.dataCollectionId - ).join( - ProcessingJob, - ProcessingJob.processingJobId - == AutoProcProgram.processingJobId, - ) + # If we have a sample group that doesn't have any new data collections, + # proceed with triggering multiplex for all sample groups + if all(max(el.get("dcids", [])) > dcid for el in ispyb_related_dcids): + added_dcids = [] + for el in ispyb_related_dcids: + added_dcids.extend([d for d in el.get("dcids", []) if d > dcid]) + # Check if there are xia2 dials jobs that were triggered on any new + # data collections after current multiplex job was triggered + min_start_time = datetime.now() - timedelta(hours=12) + query = ( + ( + session.query( + AutoProcProgram, ProcessingJob.dataCollectionId + ).join( + ProcessingJob, + ProcessingJob.processingJobId + == AutoProcProgram.processingJobId, ) - 
.filter(ProcessingJob.dataCollectionId.in_(added_dcids)) - .filter(ProcessingJob.automatic == True) # noqa E712 - .filter(AutoProcProgram.processingPrograms == "xia2 dials") - .filter(AutoProcProgram.autoProcProgramId > program_id) # noqa E711 - .filter(AutoProcProgram.recordTimeStamp > min_start_time) # noqa E711 ) - # Abort triggering multiplex if we have xia2 dials running on any subsequent - # data collection in all sample groups - if triggered_processing_job := query.first(): - self.log.info( - f"Aborting multiplex trigger for dcid {dcid} as processing job has been started for dcid {triggered_processing_job.dataCollectionId}" - ) - return {"success": True} + .filter(ProcessingJob.dataCollectionId.in_(added_dcids)) + .filter(ProcessingJob.automatic == True) # noqa E712 + .filter(AutoProcProgram.processingPrograms == "xia2 dials") + .filter(AutoProcProgram.autoProcProgramId > program_id) # noqa E711 + .filter(AutoProcProgram.recordTimeStamp > min_start_time) # noqa E711 + ) + # Abort triggering multiplex if we have xia2 dials running on any subsequent + # data collection in all sample groups + if triggered_processing_job := query.first(): + self.log.info( + f"Aborting multiplex trigger for dcid {dcid} as processing job has been started for dcid {triggered_processing_job.dataCollectionId}" + ) + return {"success": True} # Calculate message delay for exponential backoff in case a processing # program for a related data collection is still running, in which case From f43d1b3565962275deaebb739a3ff60463a50551 Mon Sep 17 00:00:00 2001 From: amyjaynethompson <52806925+amyjaynethompson@users.noreply.github.com> Date: Wed, 21 Jan 2026 14:26:52 +0000 Subject: [PATCH 2/3] add trigger_every_collection to make different logic more explicit --- src/dlstbx/services/trigger.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/dlstbx/services/trigger.py b/src/dlstbx/services/trigger.py index 168a5b674..b6d4f2fb2 100644 --- 
a/src/dlstbx/services/trigger.py +++ b/src/dlstbx/services/trigger.py @@ -222,6 +222,8 @@ class MultiplexParameters(pydantic.BaseModel): diffraction_plan_info: Optional[DiffractionPlanInfo] = None recipe: Optional[str] = None use_clustering: Optional[List[str]] = None + beamline: str + trigger_every_collection: bool class Xia2SsxReduceParameters(pydantic.BaseModel): @@ -1814,7 +1816,13 @@ def trigger_multiplex( # to decide if we need to processed triggering multiplex. # Run multiplex only once when processing for all samples in the group have been collected. - if parameters.program_id: + if parameters.trigger_every_collection: + + self.log.info("Triggering xia2.multiplex after every data collection.") + + else: + + self.log.info("Checking for subsequent dcids that are still processing.") # Get currnent list of data collections for all samples in the sample groups _, ispyb_info = dlstbx.ispybtbx.ispyb_filter( From 5d660c85eba6a73a16dfab04910162d586a2db78 Mon Sep 17 00:00:00 2001 From: amyjaynethompson <52806925+amyjaynethompson@users.noreply.github.com> Date: Wed, 21 Jan 2026 14:47:18 +0000 Subject: [PATCH 3/3] documentation --- src/dlstbx/services/trigger.py | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/src/dlstbx/services/trigger.py b/src/dlstbx/services/trigger.py index b6d4f2fb2..e61e169df 100644 --- a/src/dlstbx/services/trigger.py +++ b/src/dlstbx/services/trigger.py @@ -1738,17 +1738,22 @@ def trigger_multiplex( will be created, and the resulting list of processingJobIds will be sent to the `processing_recipe` queue. - If clustering algorithm is enabled, skip triggering multiplex if new related dcid - values are added into all defined sample groups to run multiplex only once when - all samples in one of the groups have been collected. When running multiplex - only include results from datasets collected prior to the current one in all - sample groups. 
+ If the clustering algorithm is enabled, further command-line options are added. + + There are two ways to trigger multiplex, controlled by trigger_every_collection. + Multiplex will either be triggered after every data collection (useful for + beamlines that use multiplex for mid-experiment feedback), or if there are + no subsequent datasets still processing (vmxi). The latter equates to + "once per sample group" if the experiment is rapidly collecting with no + significant delays. Recipe parameters: - target: set this to "multiplex" + - beamline: the beamline as a string - dcid: the dataCollectionId for the given data collection - comment: a comment to be stored in the ProcessingJob.comment field - automatic: boolean value passed to ProcessingJob.automatic field + - trigger_every_collection: decides the triggering behaviour of multiplex - ispyb_parameters: a dictionary of ispyb_reprocessing_parameters set in the parent xia2-dials processing job - related_dcids: a list of groups of related data collection ids. Each item in @@ -1807,21 +1812,22 @@ def trigger_multiplex( self.log.debug(f"related_dcids for dcid={dcid}: {related_dcids}") # Turn on multiplex clustering - - if parameters.beamline in parameters.use_clustering: + + if ( + parameters.use_clustering + and parameters.beamline in parameters.use_clustering + ): parameters.recipe = "postprocessing-xia2-multiplex-clustering" # For beamlines where multiplex is triggered alongside xia2-dials need extra checks # Check if we have any new data collections added to any sample group # to decide if we need to processed triggering multiplex. # Run multiplex only once when processing for all samples in the group have been collected. 
- - if parameters.trigger_every_collection: + if parameters.trigger_every_collection: self.log.info("Triggering xia2.multiplex after every data collection.") else: - self.log.info("Checking for subsequent dcids that are still processing.") # Get currnent list of data collections for all samples in the sample groups