From 14c9e093f846248464122f10c5504b4b4487ace3 Mon Sep 17 00:00:00 2001 From: amyjaynethompson <52806925+amyjaynethompson@users.noreply.github.com> Date: Wed, 22 Apr 2026 11:52:47 +0100 Subject: [PATCH 1/4] initial addition for new multiplex_filtering pipeline --- setup.py | 1 + src/dlstbx/services/trigger.py | 378 +++++++++++++++ src/dlstbx/wrapper/xia2_multiplex.py | 5 + .../wrapper/xia2_multiplex_filtering.py | 437 ++++++++++++++++++ 4 files changed, 821 insertions(+) create mode 100644 src/dlstbx/wrapper/xia2_multiplex_filtering.py diff --git a/setup.py b/setup.py index a4d26588e..e5a168660 100644 --- a/setup.py +++ b/setup.py @@ -93,6 +93,7 @@ "topaz3 = dlstbx.wrapper.topaz3_wrapper:Topaz3Wrapper", "xia2 = dlstbx.wrapper.xia2:Xia2Wrapper", "xia2.multiplex = dlstbx.wrapper.xia2_multiplex:Xia2MultiplexWrapper", + "xia2.multiplex_filtering = dlstbx.wrapper.xia2_multiplex_filtering:Xia2MultiplexFilteringWrapper", "xia2.strategy = dlstbx.wrapper.xia2_strategy:Xia2StrategyWrapper", "xia2.to_shelxcde = dlstbx.wrapper.xia2_to_shelxcde:Xia2toShelxcdeWrapper", "xia2.ssx = dlstbx.wrapper.xia2_ssx:Xia2SsxWrapper", diff --git a/src/dlstbx/services/trigger.py b/src/dlstbx/services/trigger.py index d74ff5d1e..0a26a8b45 100644 --- a/src/dlstbx/services/trigger.py +++ b/src/dlstbx/services/trigger.py @@ -74,6 +74,7 @@ class RelatedDCIDs(pydantic.BaseModel): dcids: List[int] sample_id: Optional[int] = pydantic.Field(gt=0, default=None) sample_group_id: Optional[int] = pydantic.Field(gt=0, default=None) + name: Optional[str] = None class MetalIdParameters(pydantic.BaseModel): @@ -233,6 +234,32 @@ class MultiplexParameters(pydantic.BaseModel): trigger_every_collection: bool +class MultiplexFilteringParameters(pydantic.BaseModel): + dcid: int = pydantic.Field(gt=0) + multiplex_job: pathlib.Path + automatic: Optional[bool] = False + comment: Optional[str] = None + diffraction_plan_info: Optional[DiffractionPlanInfo] = None + recipe: Optional[str] = None + beamline: str + multiplex_id: Optional[int] = pydantic.Field(default=0, gt=0) + cluster_num: Optional[str] = None + backoff_delay: float = pydantic.Field(default=8, alias="backoff-delay") + backoff_max_try: int = pydantic.Field(default=10, alias="backoff-max-try") + backoff_multiplier: float = pydantic.Field(default=2, alias="backoff-multiplier") + multiplex_parameters: dict + filtering_group_size: Dict[str, int] = pydantic.Field( + default={"default": 50}, alias="filtering-group-size" + ) + + @pydantic.field_validator("multiplex_parameters", mode="before") + @classmethod + def convert_multiplex_parameters(cls, v): + if isinstance(v, ChainMapWithReplacement): + return dict(v) + return v + + class Xia2SsxReduceParameters(pydantic.BaseModel): dcid: int = pydantic.Field(gt=0) related_dcids: List[RelatedDCIDs] @@ -2168,6 +2195,12 @@ def trigger_multiplex( job_parameters.append(("sample_id", str(group.sample_id))) else: job_parameters.append(("sample_group_id", str(group.sample_group_id))) + + # Pass these to xia2.multiplex_filtering to avoid making all the same queries as above all over again + + for idx, dcid in enumerate(dcids): + job_parameters.append((f"group_dcid_{idx}", str(dcid))) + if parameters.spacegroup: job_parameters.append(("spacegroup", parameters.spacegroup)) if ( @@ -2206,6 +2239,351 @@ def trigger_multiplex( return {"success": True, "return_value": jobids} + @pydantic.validate_call(config={"arbitrary_types_allowed": True}) + def trigger_multiplex_filtering( + self, + rw: RecipeWrapper, + *, + parameters: MultiplexFilteringParameters, + session: sqlalchemy.orm.session.Session, + transaction: int, + message: Dict, + **kwargs, + ): + """Trigger a xia2.multiplex_filtering job for a given data collection. + This is triggered from a completed xia2.multiplex job and makes use of this + existing file structure. The filtering algorithms identify if there are any groups + of images which do not correlate with the rest of the data. The size of these + image groups can be varied. + + Currently, running filtering on clusters is not supported, so it is first verified + whether or not the triggered job corresponds to a cluster. + + A check is then run that the given multiplex directory exists, the job has finished + running, and that all the necessary files have finished copying. If they have not, a + delay is used like in xia2.multiplex. + + delay = backoff-delay * backoff-multiplier ** ntry + + If any xia2.multiplex jobs are still running (or files copying) after backoff-max-try + iterations, then xia2.multiplex_filtering will not run. + + The filtering parameters are then set (note future iterations will likey vary these) and + new ProcessingJob, ProcessingJobImageSweep and ProcessingJobParameter entries + will be created, and the resulting list of processingJobIds will be sent to + the `processing_recipe` queue. + + Note that initially this will only be triggered for VMXm. This filtering is done here + in the trigger service as future plans will roll out generally to all multiplex jobs. + + Recipe parameters: + - target: set this to "multiplex_filtering" + - beamline: the beamline as a string + - dcid: the dataCollectionId for the given data collection + - comment: a comment to be stored in the ProcessingJob.comment field + - automatic: boolean value passed to ProcessingJob.automatic field + - multiplex_job: location of the parent multiplex job ({ispyb_working_directory}/xia2.multiplex) + - multiplex_id:processingJobId of the parent multiplex job + - cluster_num: if the triggering multiplex job was a cluster, list the number + - multiplex_parameters: ispyb_processing_parameters for the parent multiplex job + - backoff-delay: base delay (in seconds) for exponential backoff in the event + that one or more xia2-dials processing job is still running or yet to start + - backoff-max-try: the number of times that a message re-sent before giving up + - backoff-multiplier: the multiplier for exponential backoff + + Example recipe parameters: + { + "target": "multiplex_filtering", + "dcid": "21152574", + "recipe": "postprocessing-multiplex-filtering", + "comment": "Extra filtering using xia2.multiplex triggered by automatic xia2.multiplex", + "automatic": true, + "beamline": "i02-2", + "multiplex_job": "/dls/mx/data/nt37104/nt37104-202/tmp/zocalo/VMXi-AB7814/well_62/images/image_291258/b8a346a9-c5cb-4eb5-8208-6115eb283910/xia2.multiplex", + "multiplex_id": "37941124", + "cluster_num": "$cluster_number", + "multiplex_parameters": { + "data": [ + "/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291252/c8ac49b8-741f-439d-94d7-3ab6f81b0c35/xia2-dials/DataFiles/nt37104v202_ximage291252_SAD_SWEEP1.refl;/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291252/c8ac49b8-741f-439d-94d7-3ab6f81b0c35/xia2-dials/DataFiles/nt37104v202_ximage291252_SAD_SWEEP1.expt", + "/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291253/cb1886c8-d3af-4412-ae9c-102859bab79c/xia2-dials/DataFiles/nt37104v202_ximage291253_SAD_SWEEP1.refl;/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291253/cb1886c8-d3af-4412-ae9c-102859bab79c/xia2-dials/DataFiles/nt37104v202_ximage291253_SAD_SWEEP1.expt", + "/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291254/4e44c987-5067-40d7-999e-4f89c70c7dc1/xia2-dials/DataFiles/nt37104v202_ximage291254_SAD_SWEEP1.expt;/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291254/4e44c987-5067-40d7-999e-4f89c70c7dc1/xia2-dials/DataFiles/nt37104v202_ximage291254_SAD_SWEEP1.refl", + "/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291255/1755523f-69c8-486b-8252-c2e961760488/xia2-dials/DataFiles/nt37104v202_ximage291255_SAD_SWEEP1.expt;/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291255/1755523f-69c8-486b-8252-c2e961760488/xia2-dials/DataFiles/nt37104v202_ximage291255_SAD_SWEEP1.refl", + "/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291256/0dba9917-4b69-454c-970c-6fde3cc956bf/xia2-dials/DataFiles/nt37104v202_ximage291256_SAD_SWEEP1.expt;/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291256/0dba9917-4b69-454c-970c-6fde3cc956bf/xia2-dials/DataFiles/nt37104v202_ximage291256_SAD_SWEEP1.refl", + "/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291257/2d732791-5c61-4b39-beb3-347506c4534b/xia2-dials/DataFiles/nt37104v202_ximage291257_SAD_SWEEP1.refl;/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291257/2d732791-5c61-4b39-beb3-347506c4534b/xia2-dials/DataFiles/nt37104v202_ximage291257_SAD_SWEEP1.expt", + "/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291258/4630060b-e3fd-4b6f-b28e-0a86f560a221/xia2-dials/DataFiles/nt37104v202_ximage291258_SAD_SWEEP1.expt;/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291258/4630060b-e3fd-4b6f-b28e-0a86f560a221/xia2-dials/DataFiles/nt37104v202_ximage291258_SAD_SWEEP1.refl" + ], + "sample_id": [ + "7087938" + ], + "group_dcid_0": [ + "21152550" + ], + "group_dcid_1": [ + "21152556" + ], + "group_dcid_2": [ + "21152559" + ], + "group_dcid_3": [ + "21152562" + ], + "group_dcid_4": [ + "21152565" + ], + "group_dcid_5": [ + "21152568" + ], + "group_dcid_6": [ + "21152574" + ], + "clustering.method": [ + "coordinate" + ], + "clustering.output_clusters": [ + "true" + ] + }, + "filtering_group_size": { + "default": 50, + "i02-1": 10 + }, + "backoff-delay": "8", + "backoff-max-try": "10", + "backoff-multiplier": "2" + } + """ + + # For now, just trigger for VMXm + if parameters.beamline != "i02-1": + self.log.info( + f"xia2.multiplex_filtering currently not triggered for beamline {parameters.beamline}." + ) + return {"success": True} + + # even though could be multiple different multiplex jobs for a given DCID, only have one here + # Think this is because the multiplex filtering is triggered off the individual multiplex jobs + + if parameters.cluster_num != "None": + is_cluster = True + else: + is_cluster = False + + if is_cluster: + self.log.info( + f"Incoming multiplex is cluster {parameters.cluster_num}. Filtering not currently supported for clusters." + ) + return {"success": True} + else: + multiplex_dir = parameters.multiplex_job + if not multiplex_dir.is_dir(): + self.log.error( + f"Given multiplex directory {multiplex_dir} does not exist. Aborting job." + ) + return {"success": True} + else: + self.log.info(f"Previous multiplex job at {multiplex_dir}") + + # Add in delay in case not finished + status = { + "ntry": 0, + } + if isinstance(message, dict): + status.update(message.get("trigger-status", {})) + message_delay = int( + parameters.backoff_delay + * parameters.backoff_multiplier ** status["ntry"] + ) + status["ntry"] += 1 + self.log.debug( + f"dcid={parameters.dcid}\nmessage_delay={message_delay}\n{status}" + ) + + min_start_time = datetime.now() - timedelta(hours=24) + query = ( + ( + session.query(AutoProcProgram, ProcessingJob.dataCollectionId).join( + ProcessingJob, + ProcessingJob.processingJobId + == AutoProcProgram.processingJobId, + ) + ) + .filter(ProcessingJob.dataCollectionId == parameters.dcid) + .filter(ProcessingJob.automatic == True) # noqa E712 + .filter(ProcessingJob.recordTimestamp > min_start_time) # noqa E711 + .filter( + AutoProcProgram.processingJobId == parameters.multiplex_id + ) # check only parent multiplex + .filter( + or_( + AutoProcProgram.processingStatus == None, # noqa E711 + AutoProcProgram.processingStartTime == None, # noqa E711 + ) + ) + ) + + # If there are any running (or yet to start) jobs, then checkpoint with delay + waiting_processing_jobs = query.all() + + # Check if finished copying files over + + needed_files = [ + multiplex_dir / "models.expt", + multiplex_dir / "observations.refl", + multiplex_dir / "scaled.mtz", + multiplex_dir / "xia2-multiplex-working.phil", + multiplex_dir / "xia2.multiplex.json", + ] + + for mplx_file in needed_files: + if not mplx_file.is_file(): + waiting_processing_jobs.append(parameters.multiplex_id) + self.log.info( + f"Files still copying - {mplx_file} not yet present in {multiplex_dir}." + ) + break + + if len(waiting_processing_jobs) > 0: + self.log.info( + f"Waiting on xia2.multiplex {parameters.multiplex_job} from processing job id {parameters.multiplex_id} to finish processing (dcid {parameters.dcid})" + ) + if status["ntry"] > parameters.backoff_max_try: + # Give up waiting for this program to finish and trigger + # multiplex with remaining related results are available + self.log.info( + f"max-try exceeded, giving up waiting for related processings for dcid {parameters.dcid}\n" + ) + else: + # Send results to myself for next round of processing + rw.checkpoint( + { + "trigger-status": status, + }, + delay=message_delay, + transaction=transaction, + ) + + return {"success": True} + + dc = ( + session.query(DataCollection) + .filter(DataCollection.dataCollectionId == parameters.dcid) + .one() + ) + + # define params -> use image_mode because dataset mode is essentially equivalent to clustering + + job_parameters: list[tuple[str, str]] = [] + + job_parameters.extend([("input", str(multiplex_dir))]) + + if ( + parameters.diffraction_plan_info + and parameters.diffraction_plan_info.anomalousScatterer + ): + job_parameters.extend( + [ + ("anomalous", "true"), + ("absorption_level", "high"), + ] + ) + + group_size = parameters.filtering_group_size.get( + parameters.beamline, parameters.filtering_group_size["default"] + ) + + job_parameters.extend( + [ + ("filtering.method", "deltacchalf"), + ("deltacchalf.stdcutoff", "3"), + ("deltacchalf.mode", "image_group"), + ("deltacchalf.group_size", str(group_size)), + ] + ) + + # compile all dcids actually used in previous multiplex job + # saves re-doing all the ispyb queries that multiplex does + # these are necessary as much filtering is done + # eg related dcids stored in the data base can be mix of grid scans and rotation which shoudl not be merged + + dcids = [] + + for i in parameters.multiplex_parameters: + if "group_dcid" in i: + dcids.append(parameters.multiplex_parameters[i][0]) + + # Need either sample_id or sample_group_id in the parameters so that synchweb will display the group name (ie xia2.multiplex_filtering (C7d2_)) + + if "sample_id" in parameters.multiplex_parameters: + sample_id = parameters.multiplex_parameters["sample_id"][0] + job_parameters.append(("sample_id", sample_id)) + self.log.debug(f"Has sample_id {sample_id}") + else: + sample_group_id = parameters.multiplex_parameters["sample_group_id"][0] + job_parameters.append(("sample_group_id", sample_group_id)) + self.log.debug(f"Has sample_group_id {sample_group_id}") + + # put parameters into ispyb data base + + jp = self.ispyb.mx_processing.get_job_params() + jp["automatic"] = parameters.automatic + jp["comments"] = parameters.comment + jp["datacollectionid"] = parameters.dcid + jp["display_name"] = "xia2.multiplex_filtering" + jp["recipe"] = "postprocessing-xia2-multiplex-filtering" + jobid = self.ispyb.mx_processing.upsert_job(list(jp.values())) + self.log.debug(f"xia2.multiplex_filtering trigger: generated JobID {jobid}") + + # Need to add all related dcids to jisp so that synchweb has a label for how many datasets are involved (ie 7x xia2.multiplex_filtering) + + query = ( + session.query(DataCollection) + .filter(DataCollection.dataCollectionId.in_(dcids)) + .options( + Load(DataCollection).load_only( + DataCollection.dataCollectionId, + DataCollection.wavelength, + DataCollection.startImageNumber, + DataCollection.numberOfImages, + raiseload=True, + ) + ) + ) + + for dc in query.all(): + jisp = self.ispyb.mx_processing.get_job_image_sweep_params() + jisp["datacollectionid"] = dc.dataCollectionId + jisp["start_image"] = dc.startImageNumber + jisp["end_image"] = dc.startImageNumber + dc.numberOfImages - 1 + jisp["job_id"] = jobid + jispid = self.ispyb.mx_processing.upsert_job_image_sweep( + list(jisp.values()) + ) + self.log.debug( + f"xia2.multiplex_filtering trigger: generated JobImageSweepID {jispid}" + ) + + for k, v in job_parameters: + jpp = self.ispyb.mx_processing.get_job_parameter_params() + jpp["job_id"] = jobid + jpp["parameter_key"] = k + jpp["parameter_value"] = v + jppid = self.ispyb.mx_processing.upsert_job_parameter( + list(jpp.values()) + ) + self.log.debug( + f"xia2.multiplex_filtering trigger generated JobParameterID {jppid} with {k}={v}" + ) + # send final message + + message = {"recipes": [], "parameters": {"ispyb_process": jobid}} + rw.transport.send("processing_recipe", message) + + self.log.info( + f"xia2.multiplex_filtering trigger: Processing job {jobid} triggered" + ) + + return {"success": True, "return_value": jobid} + @pydantic.validate_call(config={"arbitrary_types_allowed": True}) def trigger_xia2_ssx_reduce( self, diff --git a/src/dlstbx/wrapper/xia2_multiplex.py b/src/dlstbx/wrapper/xia2_multiplex.py index 17be9e0b1..164112a31 100644 --- a/src/dlstbx/wrapper/xia2_multiplex.py +++ b/src/dlstbx/wrapper/xia2_multiplex.py @@ -151,6 +151,9 @@ def construct_commandline(self, params): if params.get("ispyb_parameters"): ignore = {"sample_id", "sample_group_id"} + for param in params["ispyb_parameters"]: + if "group_dcid" in param: + ignore.add(param) translation = { "d_min": "resolution.d_min", "spacegroup": "symmetry.space_group", @@ -421,6 +424,8 @@ def is_final_result(final_file: pathlib.Path) -> bool: ) self.recwrap.environment.update({"dimple_symlink": dimple_symlink}) + self.recwrap.environment.update({"cluster_number": cluster_num}) + # Send results to ispyb and trigger downstream recipe steps for this dataset self.log.info( f"Triggering downstream recipe steps for dataset: '{dataset_name}'" diff --git a/src/dlstbx/wrapper/xia2_multiplex_filtering.py b/src/dlstbx/wrapper/xia2_multiplex_filtering.py new file mode 100644 index 000000000..1cf568fb9 --- /dev/null +++ b/src/dlstbx/wrapper/xia2_multiplex_filtering.py @@ -0,0 +1,437 @@ +from __future__ import annotations + +import json +import pathlib +import shutil +import subprocess +import time +from fnmatch import fnmatch + +import iotbx.merging_statistics +from cctbx import uctbx + +import dlstbx.util.symlink +from dlstbx.wrapper import Wrapper + + +def lookup(merging_stats, item, shell): + i_bin = {"innerShell": 0, "outerShell": -1}.get(shell) + if i_bin is not None: + return merging_stats[item][i_bin] + return merging_stats["overall"][item] + + +class Xia2MultiplexFilteringWrapper(Wrapper): + """ + Wrapper for xia2.multiplex_filtering. Largely based off wrapper for xia2.multiplex. + + xia2.multiplex_filtering takes existing multiplex results and performs some + additional filtering to improve data reduction quality. + + """ + + _logger_name = "dlstbx.wrap.xia2.multiplex_filtering" + name = "xia2.multiplex_filtering" + + def send_results_to_ispyb( + self, z, xtriage_results=None, cluster_num=None, attachments=[] + ): + ispyb_command_list = [] + + # Place holder code for future iterations where may run filtering jobs on clusters + + if cluster_num is not None: + register_autoproc_prog = { + "ispyb_command": "register_processing", + "program": "xia2.multiplex_filtering", + "cmdline": "xia2.multiplex_filtering (ap-zoc)", + "environment": {"cluster": cluster_num}, + "store_result": "ispyb_autoprocprogram_id", + } + ispyb_command_list.append(register_autoproc_prog) + + # Step 1: Add new record to AutoProc, keep the AutoProcID + + register_autoproc = { + "ispyb_command": "write_autoproc", + "autoproc_id": None, + "store_result": "ispyb_autoproc_id", + "spacegroup": z["spacegroup"], + "refinedcell_a": z["unit_cell"][0], + "refinedcell_b": z["unit_cell"][1], + "refinedcell_c": z["unit_cell"][2], + "refinedcell_alpha": z["unit_cell"][3], + "refinedcell_beta": z["unit_cell"][4], + "refinedcell_gamma": z["unit_cell"][5], + } + ispyb_command_list.append(register_autoproc) + + # Step 2: Store scaling results, linked to the AutoProcID + # Keep the AutoProcScalingID + + insert_scaling = z["scaling_statistics"] + insert_scaling.update( + { + "ispyb_command": "insert_scaling", + "autoproc_id": "$ispyb_autoproc_id", + "store_result": "ispyb_autoprocscaling_id", + } + ) + ispyb_command_list.append(insert_scaling) + + # Step 3: Store integration result, linked to the ScalingID + # Use pre-registered integration id for 'Filtered' dataset + + if cluster_num is not None: + integration_id = None + else: + integration_id = "$ispyb_integration_id" + + integration = { + "ispyb_command": "upsert_integration", + "scaling_id": "$ispyb_autoprocscaling_id", + "integration_id": integration_id, + "cell_a": z["unit_cell"][0], + "cell_b": z["unit_cell"][1], + "cell_c": z["unit_cell"][2], + "cell_alpha": z["unit_cell"][3], + "cell_beta": z["unit_cell"][4], + "cell_gamma": z["unit_cell"][5], + } + ispyb_command_list.append(integration) + + # Step 4: Upload attachments + + if attachments: + for attachment in attachments: + upload_attachment = { + "ispyb_command": "add_program_attachment", + "program_id": "$ispyb_autoprocprogram_id", + "file_name": attachment["file_name"], + "file_path": attachment["file_path"], + "file_type": attachment["file_type"], + "importance_rank": attachment["importance_rank"], + } + ispyb_command_list.append(upload_attachment) + + # Step 5: Register successful processing for cluster jobs (placeholder code) + + if cluster_num is not None: + update_autoproc_prog = { + "ispyb_command": "update_processing_status", + "program_id": "$ispyb_autoprocprogram_id", + "message": "processing successful", + "status": "success", + } + ispyb_command_list.append(update_autoproc_prog) + + if xtriage_results is not None: + for level, messages in xtriage_results.items(): + for message in messages: + if ( + message["text"] + == "The merging statistics indicate that the data may be assigned to the wrong space group." + ): + # this is not a useful warning for multi-crystal + continue + ispyb_command_list.append( + { + "ispyb_command": "add_program_message", + "program_id": "$ispyb_autoprocprogram_id", + "message": message["text"], + "description": message["summary"], + "severity": {0: "INFO", 1: "WARNING", 2: "ERROR"}.get( + message["level"] + ), + } + ) + + self.log.debug(f"Sending {ispyb_command_list}") + self.recwrap.send_to("ispyb", {"ispyb_command_list": ispyb_command_list}) + + def construct_commandline(self, params): + command = ["xia2.multiplex_filtering"] + + # xia2.multiplex_filtering uses previous multiplex job files + # so no specific data files to be input, just add ispyb_parameters to cmdline + + if params.get("ispyb_parameters"): + ignore = {"sample_id", "sample_group_id"} + translation = { + "d_min": "resolution.d_min", + } + for param, value in params["ispyb_parameters"].items(): + if param not in ignore: + command.append(translation.get(param, param) + "=" + value[0]) + + return command + + def run(self): + assert hasattr(self, "recwrap"), "No recipewrapper object found" + + params = self.recwrap.recipe_step["job_parameters"] + + command = self.construct_commandline(params) + + working_directory = pathlib.Path(params["working_directory"]) + results_directory = pathlib.Path(params["results_directory"]) + + # Create working directory with symbolic link + + working_directory.mkdir(parents=True, exist_ok=True) + if params.get("create_symlink"): + dlstbx.util.symlink.create_parent_symlink( + working_directory, params["create_symlink"] + ) + + # run xia2.multiplex_filtering in working directory + + self.log.info(f"command: {' '.join(command)}") + try: + start_time = time.perf_counter() + result = subprocess.run( + command, + capture_output=True, + timeout=params.get("timeout"), + cwd=working_directory, + ) + runtime = time.perf_counter() - start_time + self.log.info(f"xia2.multiplex_filtering took {runtime} seconds") + self._runtime_hist.observe(runtime) + except subprocess.TimeoutExpired as te: + success = False + self.log.warning( + f"xia2.multiplex_filtering timed out: {te.timeout}\n {te.cmd}" + ) + self.log.debug(te.stdout) + self.log.debug(te.stderr) + self._timeout_counter.inc() + else: + if success := not result.returncode: + self.log.info("xia2.multiplex_filtering successful") + else: + self.log.info( + f"xia2.multiplex_filtering failed with exitcode {result.returncode}" + ) + self.log.debug(result.stdout) + self.log.debug(result.stderr) + self.log.info(f"working_directory: {working_directory}") + + filtered_unmerged_mtz = working_directory / "filtered_unmerged.mtz" + multiplex_filtering_json = working_directory / "xia2.multiplex_filtering.json" + + if not (filtered_unmerged_mtz.is_file() and multiplex_filtering_json.is_file()): + success = False + + # Create results directory + results_directory.mkdir(parents=True, exist_ok=True) + if params.get("create_symlink"): + dlstbx.util.symlink.create_parent_symlink( + results_directory, params["create_symlink"] + ) + if pipeline_final_params := params.get("pipeline-final", []): + final_directory = pathlib.Path(pipeline_final_params["path"]) + final_directory.mkdir(parents=True, exist_ok=True) + if params.get("create_symlink"): + dlstbx.util.symlink.create_parent_symlink( + final_directory, params["create_symlink"] + ) + + # Maybe move this function elsewhere + def is_final_result(final_file: pathlib.Path) -> bool: + return any( + fnmatch(str(final_file.name), patt) + for patt in pipeline_final_params["patterns"] + ) + + keep_ext = { + ".png": None, + ".log": "log", + ".json": None, + ".expt": None, + ".refl": None, + ".mtz": None, + ".html": "log", + ".sca": None, + } + keep = { + "filtered.mtz": "result", + "filtered_unmerged.mtz": "result", + "filtered.expt": "result", + "filtered.refl": "result", + "filtered.sca": "result", + "merging-stats.json": "graph", + "xia2.multiplex_filtering.json": "result", + } + + # Record these log files first so they appear at the top of the list + # of attachments in SynchWeb + + primary_log_files = [ + working_directory / "xia2.multiplex_filtering.html", + working_directory / "xia2.multiplex_filtering.log", + ] + + allfiles = [] + + if success: + with multiplex_filtering_json.open("r") as fh: + d = json.load(fh) + + # Retain place holder logic from multiplex wrapper for future iterations with clusters + # Note that 'all data' corresponds to the parent multiplex job + # It is also added to the multiplex_filtering_json so the html has user-friendly comparisons + # However, as it is just a copy of the results from multiplex, it is ignored here + + for dataset_name, dataset in d["datasets"].items(): + if dataset_name == "Filtered": + base_dir = working_directory + dimple_symlink = "dimple-xia2.multiplex_filtering" + cluster_prefix = "" + cluster_num = None + elif "coordinate cluster" in dataset_name: + self.log.warning("Not currently applying filtering to clusters") + continue + elif dataset_name == "All data": + self.log.warning( + 'Results for "All Data" already in parent xia2.multiplex run.' + ) + continue + else: + self.log.warning( + f"Ignoring unrecognised dataset pattern {dataset_name}" + ) + continue + + filtered_unmerged_mtz = ( + base_dir / f"{cluster_prefix}filtered_unmerged.mtz" + ) + i_obs = iotbx.merging_statistics.select_data( + filtered_unmerged_mtz.as_posix(), data_labels=None + ) + merging_stats = dataset["merging_stats"] + merging_stats_anom = dataset["merging_stats_anom"] + with (base_dir / f"{cluster_prefix}merging-stats.json").open("w") as fh: + json.dump(merging_stats, fh) + + ispyb_d = { + "commandline": " ".join(command), + "spacegroup": i_obs.space_group().type().lookup_symbol(), + "unit_cell": list(i_obs.unit_cell().parameters()), + "scaling_statistics": {}, + } + for shell in ("overall", "innerShell", "outerShell"): + ispyb_d["scaling_statistics"][shell] = { + "cc_half": lookup(merging_stats, "cc_one_half", shell), + "completeness": lookup(merging_stats, "completeness", shell), + "mean_i_sig_i": lookup( + merging_stats, "i_over_sigma_mean", shell + ), + "multiplicity": lookup(merging_stats, "multiplicity", shell), + "n_tot_obs": lookup(merging_stats, "n_obs", shell), + "n_tot_unique_obs": lookup(merging_stats, "n_uniq", shell), + "r_merge": lookup(merging_stats, "r_merge", shell), + "res_lim_high": uctbx.d_star_sq_as_d( + lookup(merging_stats, "d_star_sq_min", shell) + ), + "res_lim_low": uctbx.d_star_sq_as_d( + lookup(merging_stats, "d_star_sq_max", shell) + ), + "anom_completeness": lookup( + merging_stats_anom, "anom_completeness", shell + ), + "anom_multiplicity": lookup( + merging_stats_anom, "multiplicity", shell + ), + "cc_anom": lookup(merging_stats_anom, "cc_anom", shell), + "r_meas_all_iplusi_minus": lookup( + merging_stats_anom, "r_meas", shell + ), + } + xtriage_results = dataset.get("xtriage") + attachments = [] + + for filename in set(primary_log_files + list(base_dir.iterdir())): + filetype = None + if not filename.is_file(): + continue + filetype = keep_ext.get(filename.suffix) + for file_pattern in keep: + if filename.name.endswith(file_pattern): + filetype = keep[file_pattern] + if filetype is None: + continue + + destination = results_directory / filename.name + if ( + destination.as_posix() in allfiles + and filename not in primary_log_files + ): + destination = ( + results_directory / f"{cluster_prefix}{filename.name}" + ) + + if destination.as_posix() not in allfiles: + self.log.debug(f"Copying {filename} to {destination}") + shutil.copy(filename, destination) + allfiles.append(destination.as_posix()) + if pipeline_final_params and is_final_result(destination): + destination = final_directory / destination.name + if destination not in allfiles: + self.log.debug(f"Copying {filename} to {destination}") + shutil.copy(filename, destination) + allfiles.append(destination.as_posix()) + + if filetype: + attachments.append( + { + "file_path": destination.parent.as_posix(), + "file_name": destination.name, + "file_type": filetype, + "importance_rank": ( + 1 + if destination.name.endswith( + ( + "filtered.mtz", + "xia2.multiplex_filtering.html", + "xia2.multiplex_filtering.log", + ) + ) + else 2 + ), + } + ) + + # Add parameters to the environment to be picked up downstream by trigger function + + self.recwrap.environment.update( + { + "filtered_mtz": ( + results_directory / f"{cluster_prefix}filtered.mtz" + ).as_posix() + } + ) + self.recwrap.environment.update( + { + "filtered_unmerged_mtz": ( + results_directory / f"{cluster_prefix}filtered_unmerged.mtz" + ).as_posix() + } + ) + self.recwrap.environment.update({"dimple_symlink": dimple_symlink}) + + # Send results to ispyb and trigger downstream recipe steps for this dataset + + self.log.info( + f"Triggering downstream recipe steps for dataset: '{dataset_name}" + ) + self.send_results_to_ispyb( + ispyb_d, + xtriage_results=xtriage_results, + cluster_num=cluster_num, + attachments=attachments, + ) + self._success_counter.inc() + else: + self._failure_counter.inc() + return success From 0a29ea6c7efbbfc2300ecb87387d18379e9cdd18 Mon Sep 17 00:00:00 2001 From: amyjaynethompson <52806925+amyjaynethompson@users.noreply.github.com> Date: Tue, 28 Apr 2026 09:04:48 +0100 Subject: [PATCH 2/4] documentation --- src/dlstbx/services/trigger.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dlstbx/services/trigger.py b/src/dlstbx/services/trigger.py index 0a26a8b45..3c005b53d 100644 --- a/src/dlstbx/services/trigger.py +++ b/src/dlstbx/services/trigger.py @@ -2427,8 +2427,8 @@ def trigger_multiplex_filtering( # Check if finished copying files over needed_files = [ - multiplex_dir / "models.expt", - multiplex_dir / "observations.refl", + multiplex_dir / "models_scaled.expt", + multiplex_dir / "observations_scaled.refl", multiplex_dir / "scaled.mtz", multiplex_dir / "xia2-multiplex-working.phil", multiplex_dir / "xia2.multiplex.json", From 9e60c91d28b0b19975df48cb9290cf27c1433b01 Mon Sep 17 00:00:00 2001 From: amyjaynethompson <52806925+amyjaynethompson@users.noreply.github.com> Date: Wed, 6 May 2026 13:44:12 +0100 Subject: [PATCH 3/4] refactor to remove need for separate trigger function --- src/dlstbx/services/trigger.py | 385 +------------ src/dlstbx/wrapper/xia2_multiplex.py | 44 +- .../wrapper/xia2_multiplex_filtering.py | 521 ++++++++++-------- 3 files changed, 349 insertions(+), 601 deletions(-) diff --git a/src/dlstbx/services/trigger.py b/src/dlstbx/services/trigger.py index 3c005b53d..4245cfc65 100644 --- a/src/dlstbx/services/trigger.py +++ b/src/dlstbx/services/trigger.py @@ -230,34 +230,12 @@ class MultiplexParameters(pydantic.BaseModel): diffraction_plan_info: Optional[DiffractionPlanInfo] = None recipe: Optional[str] = None use_clustering: Optional[List[str]] = None - beamline: str - trigger_every_collection: bool - - -class MultiplexFilteringParameters(pydantic.BaseModel): - dcid: int = pydantic.Field(gt=0) - multiplex_job: pathlib.Path - automatic: Optional[bool] = False - comment: Optional[str] = None - diffraction_plan_info: Optional[DiffractionPlanInfo] = None - recipe: Optional[str] = None - beamline: str - multiplex_id: Optional[int] = pydantic.Field(default=0, gt=0) - cluster_num: Optional[str] = None - backoff_delay: float = pydantic.Field(default=8, alias="backoff-delay") - backoff_max_try: int = pydantic.Field(default=10, alias="backoff-max-try") - backoff_multiplier: float = pydantic.Field(default=2, alias="backoff-multiplier") - multiplex_parameters: dict + use_filtering: Optional[List[str]] = None filtering_group_size: Dict[str, int] = pydantic.Field( default={"default": 50}, alias="filtering-group-size" ) - - @pydantic.field_validator("multiplex_parameters", mode="before") - @classmethod - def convert_multiplex_parameters(cls, v): - if isinstance(v, ChainMapWithReplacement): - return dict(v) - return v + beamline: str + trigger_every_collection: bool class Xia2SsxReduceParameters(pydantic.BaseModel): @@ -2196,11 +2174,6 @@ def trigger_multiplex( else: job_parameters.append(("sample_group_id", str(group.sample_group_id))) - # Pass these to xia2.multiplex_filtering to avoid making all the same queries as above all over again - - for idx, dcid in enumerate(dcids): - job_parameters.append((f"group_dcid_{idx}", str(dcid))) - if parameters.spacegroup: job_parameters.append(("spacegroup", parameters.spacegroup)) if ( @@ -2220,348 +2193,27 @@ def trigger_multiplex( ("clustering.output_clusters", "true"), ] ) - for k, v in job_parameters: - jpp = self.ispyb.mx_processing.get_job_parameter_params() - jpp["job_id"] = jobid - jpp["parameter_key"] = k - jpp["parameter_value"] = v - jppid = self.ispyb.mx_processing.upsert_job_parameter( - list(jpp.values()) - ) - self.log.debug( - f"xia2.multiplex trigger generated JobParameterID {jppid} with {k}={v}", - ) - - message = {"recipes": [], "parameters": {"ispyb_process": jobid}} - rw.transport.send("processing_recipe", message) - - self.log.info(f"xia2.multiplex trigger: Processing job {jobid} triggered") - - return {"success": True, "return_value": jobids} - - @pydantic.validate_call(config={"arbitrary_types_allowed": True}) - def trigger_multiplex_filtering( - self, - rw: RecipeWrapper, - *, - parameters: MultiplexFilteringParameters, - session: sqlalchemy.orm.session.Session, - transaction: int, - message: Dict, - **kwargs, - ): - """Trigger a xia2.multiplex_filtering job for a given data collection. - This is triggered from a completed xia2.multiplex job and makes use of this - existing file structure. The filtering algorithms identify if there are any groups - of images which do not correlate with the rest of the data. The size of these - image groups can be varied. - - Currently, running filtering on clusters is not supported, so it is first verified - whether or not the triggered job corresponds to a cluster. - - A check is then run that the given multiplex directory exists, the job has finished - running, and that all the necessary files have finished copying. If they have not, a - delay is used like in xia2.multiplex. - - delay = backoff-delay * backoff-multiplier ** ntry - - If any xia2.multiplex jobs are still running (or files copying) after backoff-max-try - iterations, then xia2.multiplex_filtering will not run. - - The filtering parameters are then set (note future iterations will likey vary these) and - new ProcessingJob, ProcessingJobImageSweep and ProcessingJobParameter entries - will be created, and the resulting list of processingJobIds will be sent to - the `processing_recipe` queue. - - Note that initially this will only be triggered for VMXm. This filtering is done here - in the trigger service as future plans will roll out generally to all multiplex jobs. - - Recipe parameters: - - target: set this to "multiplex_filtering" - - beamline: the beamline as a string - - dcid: the dataCollectionId for the given data collection - - comment: a comment to be stored in the ProcessingJob.comment field - - automatic: boolean value passed to ProcessingJob.automatic field - - multiplex_job: location of the parent multiplex job ({ispyb_working_directory}/xia2.multiplex) - - multiplex_id:processingJobId of the parent multiplex job - - cluster_num: if the triggering multiplex job was a cluster, list the number - - multiplex_parameters: ispyb_processing_parameters for the parent multiplex job - - backoff-delay: base delay (in seconds) for exponential backoff in the event - that one or more xia2-dials processing job is still running or yet to start - - backoff-max-try: the number of times that a message re-sent before giving up - - backoff-multiplier: the multiplier for exponential backoff - - Example recipe parameters: - { - "target": "multiplex_filtering", - "dcid": "21152574", - "recipe": "postprocessing-multiplex-filtering", - "comment": "Extra filtering using xia2.multiplex triggered by automatic xia2.multiplex", - "automatic": true, - "beamline": "i02-2", - "multiplex_job": "/dls/mx/data/nt37104/nt37104-202/tmp/zocalo/VMXi-AB7814/well_62/images/image_291258/b8a346a9-c5cb-4eb5-8208-6115eb283910/xia2.multiplex", - "multiplex_id": "37941124", - "cluster_num": "$cluster_number", - "multiplex_parameters": { - "data": [ - "/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291252/c8ac49b8-741f-439d-94d7-3ab6f81b0c35/xia2-dials/DataFiles/nt37104v202_ximage291252_SAD_SWEEP1.refl;/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291252/c8ac49b8-741f-439d-94d7-3ab6f81b0c35/xia2-dials/DataFiles/nt37104v202_ximage291252_SAD_SWEEP1.expt", - "/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291253/cb1886c8-d3af-4412-ae9c-102859bab79c/xia2-dials/DataFiles/nt37104v202_ximage291253_SAD_SWEEP1.refl;/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291253/cb1886c8-d3af-4412-ae9c-102859bab79c/xia2-dials/DataFiles/nt37104v202_ximage291253_SAD_SWEEP1.expt", - "/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291254/4e44c987-5067-40d7-999e-4f89c70c7dc1/xia2-dials/DataFiles/nt37104v202_ximage291254_SAD_SWEEP1.expt;/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291254/4e44c987-5067-40d7-999e-4f89c70c7dc1/xia2-dials/DataFiles/nt37104v202_ximage291254_SAD_SWEEP1.refl", - "/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291255/1755523f-69c8-486b-8252-c2e961760488/xia2-dials/DataFiles/nt37104v202_ximage291255_SAD_SWEEP1.expt;/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291255/1755523f-69c8-486b-8252-c2e961760488/xia2-dials/DataFiles/nt37104v202_ximage291255_SAD_SWEEP1.refl", - "/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291256/0dba9917-4b69-454c-970c-6fde3cc956bf/xia2-dials/DataFiles/nt37104v202_ximage291256_SAD_SWEEP1.expt;/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291256/0dba9917-4b69-454c-970c-6fde3cc956bf/xia2-dials/DataFiles/nt37104v202_ximage291256_SAD_SWEEP1.refl", - "/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291257/2d732791-5c61-4b39-beb3-347506c4534b/xia2-dials/DataFiles/nt37104v202_ximage291257_SAD_SWEEP1.refl;/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291257/2d732791-5c61-4b39-beb3-347506c4534b/xia2-dials/DataFiles/nt37104v202_ximage291257_SAD_SWEEP1.expt", - "/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291258/4630060b-e3fd-4b6f-b28e-0a86f560a221/xia2-dials/DataFiles/nt37104v202_ximage291258_SAD_SWEEP1.expt;/dls/mx/data/nt37104/nt37104-202/processed/VMXi-AB7814/well_62/images/image_291258/4630060b-e3fd-4b6f-b28e-0a86f560a221/xia2-dials/DataFiles/nt37104v202_ximage291258_SAD_SWEEP1.refl" - ], - "sample_id": [ - "7087938" - ], - "group_dcid_0": [ - "21152550" - ], - "group_dcid_1": [ - "21152556" - ], - "group_dcid_2": [ - "21152559" - ], - "group_dcid_3": [ - "21152562" - ], - "group_dcid_4": [ - "21152565" - ], - "group_dcid_5": [ - "21152568" - ], - "group_dcid_6": [ - "21152574" - ], - "clustering.method": [ - "coordinate" - ], - "clustering.output_clusters": [ - "true" - ] - }, - "filtering_group_size": { - "default": 50, - "i02-1": 10 - }, - "backoff-delay": "8", - "backoff-max-try": "10", - "backoff-multiplier": "2" - } - """ - - # For now, just trigger for VMXm - if parameters.beamline != "i02-1": - self.log.info( - f"xia2.multiplex_filtering currently not triggered for beamline {parameters.beamline}." - ) - return {"success": True} - - # even though could be multiple different multiplex jobs for a given DCID, only have one here - # Think this is because the multiplex filtering is triggered off the individual multiplex jobs - - if parameters.cluster_num != "None": - is_cluster = True - else: - is_cluster = False - - if is_cluster: - self.log.info( - f"Incoming multiplex is cluster {parameters.cluster_num}. Filtering not currently supported for clusters." - ) - return {"success": True} - else: - multiplex_dir = parameters.multiplex_job - if not multiplex_dir.is_dir(): - self.log.error( - f"Given multiplex directory {multiplex_dir} does not exist. Aborting job." - ) - return {"success": True} - else: - self.log.info(f"Previous multiplex job at {multiplex_dir}") - # Add in delay in case not finished - status = { - "ntry": 0, - } - if isinstance(message, dict): - status.update(message.get("trigger-status", {})) - message_delay = int( - parameters.backoff_delay - * parameters.backoff_multiplier ** status["ntry"] - ) - status["ntry"] += 1 - self.log.debug( - f"dcid={parameters.dcid}\nmessage_delay={message_delay}\n{status}" - ) - - min_start_time = datetime.now() - timedelta(hours=24) - query = ( - ( - session.query(AutoProcProgram, ProcessingJob.dataCollectionId).join( - ProcessingJob, - ProcessingJob.processingJobId - == AutoProcProgram.processingJobId, - ) - ) - .filter(ProcessingJob.dataCollectionId == parameters.dcid) - .filter(ProcessingJob.automatic == True) # noqa E712 - .filter(ProcessingJob.recordTimestamp > min_start_time) # noqa E711 - .filter( - AutoProcProgram.processingJobId == parameters.multiplex_id - ) # check only parent multiplex - .filter( - or_( - AutoProcProgram.processingStatus == None, # noqa E711 - AutoProcProgram.processingStartTime == None, # noqa E711 - ) - ) - ) - - # If there are any running (or yet to start) jobs, then checkpoint with delay - waiting_processing_jobs = query.all() - - # Check if finished copying files over - - needed_files = [ - multiplex_dir / "models_scaled.expt", - multiplex_dir / "observations_scaled.refl", - multiplex_dir / "scaled.mtz", - multiplex_dir / "xia2-multiplex-working.phil", - multiplex_dir / "xia2.multiplex.json", - ] - - for mplx_file in needed_files: - if not mplx_file.is_file(): - waiting_processing_jobs.append(parameters.multiplex_id) - self.log.info( - f"Files still copying - {mplx_file} not yet present in {multiplex_dir}." - ) - break - - if len(waiting_processing_jobs) > 0: - self.log.info( - f"Waiting on xia2.multiplex {parameters.multiplex_job} from processing job id {parameters.multiplex_id} to finish processing (dcid {parameters.dcid})" - ) - if status["ntry"] > parameters.backoff_max_try: - # Give up waiting for this program to finish and trigger - # multiplex with remaining related results are available - self.log.info( - f"max-try exceeded, giving up waiting for related processings for dcid {parameters.dcid}\n" - ) - else: - # Send results to myself for next round of processing - rw.checkpoint( - { - "trigger-status": status, - }, - delay=message_delay, - transaction=transaction, - ) - - return {"success": True} - - dc = ( - session.query(DataCollection) - .filter(DataCollection.dataCollectionId == parameters.dcid) - .one() - ) - - # define params -> use image_mode because dataset mode is essentially equivalent to clustering - - job_parameters: list[tuple[str, str]] = [] - - job_parameters.extend([("input", str(multiplex_dir))]) + # See if beamline is in list of allowed ones for filtering + # If so, add filtering parameters to job_parameters + # This will set the xia2.multiplex wrapper to send the job for filtering after completed if ( - parameters.diffraction_plan_info - and parameters.diffraction_plan_info.anomalousScatterer + parameters.use_filtering + and parameters.beamline in parameters.use_filtering ): + group_size = parameters.filtering_group_size.get( + parameters.beamline, parameters.filtering_group_size["default"] + ) + job_parameters.extend( [ - ("anomalous", "true"), - ("absorption_level", "high"), + ("filtering.method", "deltacchalf"), + ("deltacchalf.stdcutoff", "3"), + ("deltacchalf.mode", "image_group"), + ("deltacchalf.group_size", str(group_size)), ] ) - - group_size = parameters.filtering_group_size.get( - parameters.beamline, parameters.filtering_group_size["default"] - ) - - job_parameters.extend( - [ - ("filtering.method", "deltacchalf"), - ("deltacchalf.stdcutoff", "3"), - ("deltacchalf.mode", "image_group"), - ("deltacchalf.group_size", str(group_size)), - ] - ) - - # compile all dcids actually used in previous multiplex job - # saves re-doing all the ispyb queries that multiplex does - # these are necessary as much filtering is done - # eg related dcids stored in the data base can be mix of grid scans and rotation which shoudl not be merged - - dcids = [] - - for i in parameters.multiplex_parameters: - if "group_dcid" in i: - dcids.append(parameters.multiplex_parameters[i][0]) - - # Need either sample_id or sample_group_id in the parameters so that synchweb will display the group name (ie xia2.multiplex_filtering (C7d2_)) - - if "sample_id" in parameters.multiplex_parameters: - sample_id = parameters.multiplex_parameters["sample_id"][0] - job_parameters.append(("sample_id", sample_id)) - self.log.debug(f"Has sample_id {sample_id}") - else: - sample_group_id = parameters.multiplex_parameters["sample_group_id"][0] - job_parameters.append(("sample_group_id", sample_group_id)) - self.log.debug(f"Has sample_group_id {sample_group_id}") - - # put parameters into ispyb data base - - jp = self.ispyb.mx_processing.get_job_params() - jp["automatic"] = parameters.automatic - jp["comments"] = parameters.comment - jp["datacollectionid"] = parameters.dcid - jp["display_name"] = "xia2.multiplex_filtering" - jp["recipe"] = "postprocessing-xia2-multiplex-filtering" - jobid = self.ispyb.mx_processing.upsert_job(list(jp.values())) - self.log.debug(f"xia2.multiplex_filtering trigger: generated JobID {jobid}") - - # Need to add all related dcids to jisp so that synchweb has a label for how many datasets are involved (ie 7x xia2.multiplex_filtering) - - query = ( - session.query(DataCollection) - .filter(DataCollection.dataCollectionId.in_(dcids)) - .options( - Load(DataCollection).load_only( - DataCollection.dataCollectionId, - DataCollection.wavelength, - DataCollection.startImageNumber, - DataCollection.numberOfImages, - raiseload=True, - ) - ) - ) - - for dc in query.all(): - jisp = self.ispyb.mx_processing.get_job_image_sweep_params() - jisp["datacollectionid"] = dc.dataCollectionId - jisp["start_image"] = dc.startImageNumber - jisp["end_image"] = dc.startImageNumber + dc.numberOfImages - 1 - jisp["job_id"] = jobid - jispid = self.ispyb.mx_processing.upsert_job_image_sweep( - list(jisp.values()) - ) - self.log.debug( - f"xia2.multiplex_filtering trigger: generated JobImageSweepID {jispid}" - ) - for k, v in job_parameters: jpp = self.ispyb.mx_processing.get_job_parameter_params() jpp["job_id"] = jobid @@ -2571,9 +2223,8 @@ def trigger_multiplex_filtering( list(jpp.values()) ) self.log.debug( - f"xia2.multiplex_filtering trigger generated JobParameterID {jppid} with {k}={v}" + f"xia2.multiplex trigger generated JobParameterID {jppid} with {k}={v}", ) - # send final message message = {"recipes": [], "parameters": {"ispyb_process": jobid}} rw.transport.send("processing_recipe", message) @@ -2582,7 +2233,7 @@ def trigger_multiplex_filtering( f"xia2.multiplex_filtering trigger: Processing job {jobid} triggered" ) - return {"success": True, "return_value": jobid} + return {"success": True, "return_value": jobid} @pydantic.validate_call(config={"arbitrary_types_allowed": True}) def trigger_xia2_ssx_reduce( diff --git a/src/dlstbx/wrapper/xia2_multiplex.py b/src/dlstbx/wrapper/xia2_multiplex.py index 164112a31..e554633a0 100644 --- a/src/dlstbx/wrapper/xia2_multiplex.py +++ b/src/dlstbx/wrapper/xia2_multiplex.py @@ -27,7 +27,12 @@ class Xia2MultiplexWrapper(Wrapper): name = "xia2.multiplex" def send_results_to_ispyb( - self, z, xtriage_results=None, cluster_num=None, attachments=[] + self, + z, + xtriage_results=None, + cluster_num=None, + attachments=[], + multiplex_filtering=False, ): ispyb_command_list = [] @@ -138,6 +143,14 @@ def send_results_to_ispyb( self.log.debug("Sending %s", str(ispyb_command_list)) self.recwrap.send_to("ispyb", {"ispyb_command_list": ispyb_command_list}) + # After xia2.multiplex, xia2.multiplex_filtering can be optionally run to improve data reduction + # Currently not supported for clusters + # Only triggered if filtering parameters exist -> logic handled in trigger_multiplex + + if not cluster_num and multiplex_filtering: + self.log.info("Triggering xia2.multiplex filtering.") + self.recwrap.send_to("filtering", True) + def construct_commandline(self, params): """Construct xia2.multiplex command line. Takes job parameter dictionary, returns array.""" @@ -150,10 +163,15 @@ def construct_commandline(self, params): command.append(f) if params.get("ispyb_parameters"): - ignore = {"sample_id", "sample_group_id"} - for param in params["ispyb_parameters"]: - if "group_dcid" in param: - ignore.add(param) + # ignore filtering parameters for xia2.multiplex_filtering + ignore = { + "sample_id", + "sample_group_id", + "filtering.method", + "deltacchalf.stdcutoff", + "deltacchalf.mode", + "deltacchalf.group_size", + } translation = { "d_min": "resolution.d_min", "spacegroup": "symmetry.space_group", @@ -430,11 +448,27 @@ def is_final_result(final_file: pathlib.Path) -> bool: self.log.info( f"Triggering downstream recipe steps for dataset: '{dataset_name}'" ) + + # Check if filtering parameters present -> if so, trigger filtering when sending results to ispyb + # Whether or not these are present is currently handled by the trigger service + + filtering = False + + if params.get("ispyb_parameters"): + if ("filtering.method", ["deltacchalf"]) in params[ + "ispyb_parameters" + ].items(): + self.log.info( + "Additional filtering with xia2.multiplex_filtering selected." + ) + filtering = True + self.send_results_to_ispyb( ispyb_d, xtriage_results=xtriage_results, cluster_num=cluster_num, attachments=attachments, + multiplex_filtering=filtering, ) self._success_counter.inc() diff --git a/src/dlstbx/wrapper/xia2_multiplex_filtering.py b/src/dlstbx/wrapper/xia2_multiplex_filtering.py index 1cf568fb9..74732381c 100644 --- a/src/dlstbx/wrapper/xia2_multiplex_filtering.py +++ b/src/dlstbx/wrapper/xia2_multiplex_filtering.py @@ -149,14 +149,20 @@ def send_results_to_ispyb( self.log.debug(f"Sending {ispyb_command_list}") self.recwrap.send_to("ispyb", {"ispyb_command_list": ispyb_command_list}) - def construct_commandline(self, params): - command = ["xia2.multiplex_filtering"] + def construct_commandline(self, params, multiplex_directory): + command = ["xia2.multiplex_filtering", f"input={multiplex_directory}"] # xia2.multiplex_filtering uses previous multiplex job files # so no specific data files to be input, just add ispyb_parameters to cmdline if params.get("ispyb_parameters"): - ignore = {"sample_id", "sample_group_id"} + ignore = { + "sample_id", + "sample_group_id", + "data", + "clustering.method", + "clustering.output_clusters", + } translation = { "d_min": "resolution.d_min", } @@ -171,11 +177,13 @@ def run(self): params = self.recwrap.recipe_step["job_parameters"] - command = self.construct_commandline(params) - working_directory = pathlib.Path(params["working_directory"]) results_directory = pathlib.Path(params["results_directory"]) + multiplex_dir = pathlib.Path(params["multiplex_directory"]) + + command = self.construct_commandline(params, multiplex_dir) + # Create working directory with symbolic link working_directory.mkdir(parents=True, exist_ok=True) @@ -184,254 +192,309 @@ def run(self): working_directory, params["create_symlink"] ) - # run xia2.multiplex_filtering in working directory + # Need to check that the required multiplex files have copied into the right directory + # (tmp/zocalo/) + # If not, implement backoff delay analogous to that used in the trigger service - self.log.info(f"command: {' '.join(command)}") - try: - start_time = time.perf_counter() - result = subprocess.run( - command, - capture_output=True, - timeout=params.get("timeout"), - cwd=working_directory, - ) - runtime = time.perf_counter() - start_time - self.log.info(f"xia2.multiplex_filtering took {runtime} seconds") - self._runtime_hist.observe(runtime) - except subprocess.TimeoutExpired as te: - success = False - self.log.warning( - f"xia2.multiplex_filtering timed out: {te.timeout}\n {te.cmd}" - ) - self.log.debug(te.stdout) - self.log.debug(te.stderr) - self._timeout_counter.inc() - else: - if success := not result.returncode: - self.log.info("xia2.multiplex_filtering successful") + needed_files = [ + multiplex_dir / "models.expt", + multiplex_dir / "observations.refl", + multiplex_dir / "scaled.mtz", + multiplex_dir / "xia2-multiplex-working.phil", + multiplex_dir / "xia2.multiplex.json", + ] + + ntry = 0 + waiting = True + timedout = False + backoff_max_try = 10 + backoff_multiplier = 2 + backoff_delay = 8 + while waiting: + waiting_processing_files = [] + for mplx_file in needed_files: + if not mplx_file.is_file(): + waiting_processing_files.append(mplx_file) + self.log.info( + f"Files still copying - {mplx_file} not yet present in {multiplex_dir}." + ) + if len(waiting_processing_files) > 0 and ntry < backoff_max_try: + delay = int(backoff_delay * backoff_multiplier**ntry) + time.sleep(delay) + ntry += 1 + elif len(waiting_processing_files) > 0 and ntry >= backoff_max_try: + self.log.warning("Timed out waiting for xia2.multiplex files to copy.") + timedout = True + waiting = False else: - self.log.info( - f"xia2.multiplex_filtering failed with exitcode {result.returncode}" + self.log.info("All files present for xia2.multiplex_filtering") + waiting = False + + if not timedout: + # run xia2.multiplex_filtering in working directory + + self.log.info(f"command: {' '.join(command)}") + try: + start_time = time.perf_counter() + result = subprocess.run( + command, + capture_output=True, + timeout=params.get("timeout"), + cwd=working_directory, + ) + runtime = time.perf_counter() - start_time + self.log.info(f"xia2.multiplex_filtering took {runtime} seconds") + self._runtime_hist.observe(runtime) + except subprocess.TimeoutExpired as te: + success = False + self.log.warning( + f"xia2.multiplex_filtering timed out: {te.timeout}\n {te.cmd}" ) - self.log.debug(result.stdout) - self.log.debug(result.stderr) - self.log.info(f"working_directory: {working_directory}") + self.log.debug(te.stdout) + self.log.debug(te.stderr) + self._timeout_counter.inc() + else: + if success := not result.returncode: + self.log.info("xia2.multiplex_filtering successful") + else: + self.log.info( + f"xia2.multiplex_filtering failed with exitcode {result.returncode}" + ) + self.log.debug(result.stdout) + self.log.debug(result.stderr) + self.log.info(f"working_directory: {working_directory}") - filtered_unmerged_mtz = working_directory / "filtered_unmerged.mtz" - multiplex_filtering_json = working_directory / "xia2.multiplex_filtering.json" + filtered_unmerged_mtz = working_directory / "filtered_unmerged.mtz" + multiplex_filtering_json = ( + working_directory / "xia2.multiplex_filtering.json" + ) - if not (filtered_unmerged_mtz.is_file() and multiplex_filtering_json.is_file()): - success = False + if not ( + filtered_unmerged_mtz.is_file() and multiplex_filtering_json.is_file() + ): + success = False - # Create results directory - results_directory.mkdir(parents=True, exist_ok=True) - if params.get("create_symlink"): - dlstbx.util.symlink.create_parent_symlink( - results_directory, params["create_symlink"] - ) - if pipeline_final_params := params.get("pipeline-final", []): - final_directory = pathlib.Path(pipeline_final_params["path"]) - final_directory.mkdir(parents=True, exist_ok=True) + # Create results directory + results_directory.mkdir(parents=True, exist_ok=True) if params.get("create_symlink"): dlstbx.util.symlink.create_parent_symlink( - final_directory, params["create_symlink"] - ) - - # Maybe move this function elsewhere - def is_final_result(final_file: pathlib.Path) -> bool: - return any( - fnmatch(str(final_file.name), patt) - for patt in pipeline_final_params["patterns"] + results_directory, params["create_symlink"] ) - - keep_ext = { - ".png": None, - ".log": "log", - ".json": None, - ".expt": None, - ".refl": None, - ".mtz": None, - ".html": "log", - ".sca": None, - } - keep = { - "filtered.mtz": "result", - "filtered_unmerged.mtz": "result", - "filtered.expt": "result", - "filtered.refl": "result", - "filtered.sca": "result", - "merging-stats.json": "graph", - "xia2.multiplex_filtering.json": "result", - } - - # Record these log files first so they appear at the top of the list - # of attachments in SynchWeb - - primary_log_files = [ - working_directory / "xia2.multiplex_filtering.html", - working_directory / "xia2.multiplex_filtering.log", - ] - - allfiles = [] - - if success: - with multiplex_filtering_json.open("r") as fh: - d = json.load(fh) - - # Retain place holder logic from multiplex wrapper for future iterations with clusters - # Note that 'all data' corresponds to the parent multiplex job - # It is also added to the multiplex_filtering_json so the html has user-friendly comparisons - # However, as it is just a copy of the results from multiplex, it is ignored here - - for dataset_name, dataset in d["datasets"].items(): - if dataset_name == "Filtered": - base_dir = working_directory - dimple_symlink = "dimple-xia2.multiplex_filtering" - cluster_prefix = "" - cluster_num = None - elif "coordinate cluster" in dataset_name: - self.log.warning("Not currently applying filtering to clusters") - continue - elif dataset_name == "All data": - self.log.warning( - 'Results for "All Data" already in parent xia2.multiplex run.' + if pipeline_final_params := params.get("pipeline-final", []): + final_directory = pathlib.Path(pipeline_final_params["path"]) + final_directory.mkdir(parents=True, exist_ok=True) + if params.get("create_symlink"): + dlstbx.util.symlink.create_parent_symlink( + final_directory, params["create_symlink"] ) - continue - else: - self.log.warning( - f"Ignoring unrecognised dataset pattern {dataset_name}" + + # Maybe move this function elsewhere + def is_final_result(final_file: pathlib.Path) -> bool: + return any( + fnmatch(str(final_file.name), patt) + for patt in pipeline_final_params["patterns"] ) - continue - filtered_unmerged_mtz = ( - base_dir / f"{cluster_prefix}filtered_unmerged.mtz" - ) - i_obs = iotbx.merging_statistics.select_data( - filtered_unmerged_mtz.as_posix(), data_labels=None - ) - merging_stats = dataset["merging_stats"] - merging_stats_anom = dataset["merging_stats_anom"] - with (base_dir / f"{cluster_prefix}merging-stats.json").open("w") as fh: - json.dump(merging_stats, fh) - - ispyb_d = { - "commandline": " ".join(command), - "spacegroup": i_obs.space_group().type().lookup_symbol(), - "unit_cell": list(i_obs.unit_cell().parameters()), - "scaling_statistics": {}, - } - for shell in ("overall", "innerShell", "outerShell"): - ispyb_d["scaling_statistics"][shell] = { - "cc_half": lookup(merging_stats, "cc_one_half", shell), - "completeness": lookup(merging_stats, "completeness", shell), - "mean_i_sig_i": lookup( - merging_stats, "i_over_sigma_mean", shell - ), - "multiplicity": lookup(merging_stats, "multiplicity", shell), - "n_tot_obs": lookup(merging_stats, "n_obs", shell), - "n_tot_unique_obs": lookup(merging_stats, "n_uniq", shell), - "r_merge": lookup(merging_stats, "r_merge", shell), - "res_lim_high": uctbx.d_star_sq_as_d( - lookup(merging_stats, "d_star_sq_min", shell) - ), - "res_lim_low": uctbx.d_star_sq_as_d( - lookup(merging_stats, "d_star_sq_max", shell) - ), - "anom_completeness": lookup( - merging_stats_anom, "anom_completeness", shell - ), - "anom_multiplicity": lookup( - merging_stats_anom, "multiplicity", shell - ), - "cc_anom": lookup(merging_stats_anom, "cc_anom", shell), - "r_meas_all_iplusi_minus": lookup( - merging_stats_anom, "r_meas", shell - ), - } - xtriage_results = dataset.get("xtriage") - attachments = [] + keep_ext = { + ".png": None, + ".log": "log", + ".json": None, + ".expt": None, + ".refl": None, + ".mtz": None, + ".html": "log", + ".sca": None, + } + keep = { + "filtered.mtz": "result", + "filtered_unmerged.mtz": "result", + "filtered.expt": "result", + "filtered.refl": "result", + "filtered.sca": "result", + "merging-stats.json": "graph", + "xia2.multiplex_filtering.json": "result", + } - for filename in set(primary_log_files + list(base_dir.iterdir())): - filetype = None - if not filename.is_file(): + # Record these log files first so they appear at the top of the list + # of attachments in SynchWeb + + primary_log_files = [ + working_directory / "xia2.multiplex_filtering.html", + working_directory / "xia2.multiplex_filtering.log", + ] + + allfiles = [] + + if success: + with multiplex_filtering_json.open("r") as fh: + d = json.load(fh) + + # Retain place holder logic from multiplex wrapper for future iterations with clusters + # Note that 'all data' corresponds to the parent multiplex job + # It is also added to the multiplex_filtering_json so the html has user-friendly comparisons + # However, as it is just a copy of the results from multiplex, it is ignored here + + for dataset_name, dataset in d["datasets"].items(): + if dataset_name == "Filtered": + base_dir = working_directory + dimple_symlink = "dimple-xia2.multiplex_filtering" + cluster_prefix = "" + cluster_num = None + elif "coordinate cluster" in dataset_name: + self.log.warning("Not currently applying filtering to clusters") continue - filetype = keep_ext.get(filename.suffix) - for file_pattern in keep: - if filename.name.endswith(file_pattern): - filetype = keep[file_pattern] - if filetype is None: + elif dataset_name == "All data": + self.log.warning( + 'Results for "All Data" already in parent xia2.multiplex run.' + ) continue - - destination = results_directory / filename.name - if ( - destination.as_posix() in allfiles - and filename not in primary_log_files - ): - destination = ( - results_directory / f"{cluster_prefix}{filename.name}" + else: + self.log.warning( + f"Ignoring unrecognised dataset pattern {dataset_name}" ) + continue - if destination.as_posix() not in allfiles: - self.log.debug(f"Copying {filename} to {destination}") - shutil.copy(filename, destination) - allfiles.append(destination.as_posix()) - if pipeline_final_params and is_final_result(destination): - destination = final_directory / destination.name - if destination not in allfiles: + filtered_unmerged_mtz = ( + base_dir / f"{cluster_prefix}filtered_unmerged.mtz" + ) + i_obs = iotbx.merging_statistics.select_data( + filtered_unmerged_mtz.as_posix(), data_labels=None + ) + merging_stats = dataset["merging_stats"] + merging_stats_anom = dataset["merging_stats_anom"] + with (base_dir / f"{cluster_prefix}merging-stats.json").open( + "w" + ) as fh: + json.dump(merging_stats, fh) + + ispyb_d = { + "commandline": " ".join(command), + "spacegroup": i_obs.space_group().type().lookup_symbol(), + "unit_cell": list(i_obs.unit_cell().parameters()), + "scaling_statistics": {}, + } + for shell in ("overall", "innerShell", "outerShell"): + ispyb_d["scaling_statistics"][shell] = { + "cc_half": lookup(merging_stats, "cc_one_half", shell), + "completeness": lookup( + merging_stats, "completeness", shell + ), + "mean_i_sig_i": lookup( + merging_stats, "i_over_sigma_mean", shell + ), + "multiplicity": lookup( + merging_stats, "multiplicity", shell + ), + "n_tot_obs": lookup(merging_stats, "n_obs", shell), + "n_tot_unique_obs": lookup(merging_stats, "n_uniq", shell), + "r_merge": lookup(merging_stats, "r_merge", shell), + "res_lim_high": uctbx.d_star_sq_as_d( + lookup(merging_stats, "d_star_sq_min", shell) + ), + "res_lim_low": uctbx.d_star_sq_as_d( + lookup(merging_stats, "d_star_sq_max", shell) + ), + "anom_completeness": lookup( + merging_stats_anom, "anom_completeness", shell + ), + "anom_multiplicity": lookup( + merging_stats_anom, "multiplicity", shell + ), + "cc_anom": lookup(merging_stats_anom, "cc_anom", shell), + "r_meas_all_iplusi_minus": lookup( + merging_stats_anom, "r_meas", shell + ), + } + xtriage_results = dataset.get("xtriage") + attachments = [] + + for filename in set(primary_log_files + list(base_dir.iterdir())): + filetype = None + if not filename.is_file(): + continue + filetype = keep_ext.get(filename.suffix) + for file_pattern in keep: + if filename.name.endswith(file_pattern): + filetype = keep[file_pattern] + if filetype is None: + continue + + destination = results_directory / filename.name + if ( + destination.as_posix() in allfiles + and filename not in primary_log_files + ): + destination = ( + results_directory / f"{cluster_prefix}{filename.name}" + ) + + if destination.as_posix() not in allfiles: self.log.debug(f"Copying {filename} to {destination}") shutil.copy(filename, destination) allfiles.append(destination.as_posix()) - - if filetype: - attachments.append( - { - "file_path": destination.parent.as_posix(), - "file_name": destination.name, - "file_type": filetype, - "importance_rank": ( - 1 - if destination.name.endswith( - ( - "filtered.mtz", - "xia2.multiplex_filtering.html", - "xia2.multiplex_filtering.log", + if pipeline_final_params and is_final_result(destination): + destination = final_directory / destination.name + if destination not in allfiles: + self.log.debug(f"Copying {filename} to {destination}") + shutil.copy(filename, destination) + allfiles.append(destination.as_posix()) + + if filetype: + attachments.append( + { + "file_path": destination.parent.as_posix(), + "file_name": destination.name, + "file_type": filetype, + "importance_rank": ( + 1 + if destination.name.endswith( + ( + "filtered.mtz", + "xia2.multiplex_filtering.html", + "xia2.multiplex_filtering.log", + ) ) - ) - else 2 - ), - } - ) + else 2 + ), + } + ) - # Add parameters to the environment to be picked up downstream by trigger function + # Add parameters to the environment to be picked up downstream by trigger function - self.recwrap.environment.update( - { - "filtered_mtz": ( - results_directory / f"{cluster_prefix}filtered.mtz" - ).as_posix() - } - ) - self.recwrap.environment.update( - { - "filtered_unmerged_mtz": ( - results_directory / f"{cluster_prefix}filtered_unmerged.mtz" - ).as_posix() - } - ) - self.recwrap.environment.update({"dimple_symlink": dimple_symlink}) + # As using the same downstream triggers, update "scaled_mtz" rather than calling it by "filtered.mtz" + + self.recwrap.environment.update( + { + "scaled_mtz": ( + results_directory / f"{cluster_prefix}filtered.mtz" + ).as_posix() + } + ) + self.recwrap.environment.update( + { + "scaled_unmerged_mtz": ( + results_directory + / f"{cluster_prefix}filtered_unmerged.mtz" + ).as_posix() + } + ) + self.recwrap.environment.update({"dimple_symlink": dimple_symlink}) - # Send results to ispyb and trigger downstream recipe steps for this dataset + # Send results to ispyb and trigger downstream recipe steps for this dataset - self.log.info( - f"Triggering downstream recipe steps for dataset: '{dataset_name}" - ) - self.send_results_to_ispyb( - ispyb_d, - xtriage_results=xtriage_results, - cluster_num=cluster_num, - attachments=attachments, - ) - self._success_counter.inc() + self.log.info( + f"Triggering downstream recipe steps for dataset: '{dataset_name}" + ) + self.send_results_to_ispyb( + ispyb_d, + xtriage_results=xtriage_results, + cluster_num=cluster_num, + attachments=attachments, + ) + self._success_counter.inc() + else: + self._failure_counter.inc() else: - self._failure_counter.inc() + # if timed out and gave up because file copying unsuccessful then return False for success state + success = False return success From 4384938ca7b16196a34afbd103660d033696f4ac Mon Sep 17 00:00:00 2001 From: amyjaynethompson <52806925+amyjaynethompson@users.noreply.github.com> Date: Wed, 6 May 2026 14:01:19 +0100 Subject: [PATCH 4/4] removing testing code --- src/dlstbx/services/trigger.py | 3 +-- src/dlstbx/wrapper/xia2_multiplex.py | 2 -- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/src/dlstbx/services/trigger.py b/src/dlstbx/services/trigger.py index 4245cfc65..fe3dd11a0 100644 --- a/src/dlstbx/services/trigger.py +++ b/src/dlstbx/services/trigger.py @@ -74,7 +74,6 @@ class RelatedDCIDs(pydantic.BaseModel): dcids: List[int] sample_id: Optional[int] = pydantic.Field(gt=0, default=None) sample_group_id: Optional[int] = pydantic.Field(gt=0, default=None) - name: Optional[str] = None class MetalIdParameters(pydantic.BaseModel): @@ -2233,7 +2232,7 @@ def trigger_multiplex( f"xia2.multiplex_filtering trigger: Processing job {jobid} triggered" ) - return {"success": True, "return_value": jobid} + return {"success": True, "return_value": jobids} @pydantic.validate_call(config={"arbitrary_types_allowed": True}) def trigger_xia2_ssx_reduce( diff --git a/src/dlstbx/wrapper/xia2_multiplex.py b/src/dlstbx/wrapper/xia2_multiplex.py index e554633a0..b786923e1 100644 --- a/src/dlstbx/wrapper/xia2_multiplex.py +++ b/src/dlstbx/wrapper/xia2_multiplex.py @@ -442,8 +442,6 @@ def is_final_result(final_file: pathlib.Path) -> bool: ) self.recwrap.environment.update({"dimple_symlink": dimple_symlink}) - self.recwrap.environment.update({"cluster_number": cluster_num}) - # Send results to ispyb and trigger downstream recipe steps for this dataset self.log.info( f"Triggering downstream recipe steps for dataset: '{dataset_name}'"