diff --git a/setup.py b/setup.py index 91ea4695f..9bd1fb46f 100644 --- a/setup.py +++ b/setup.py @@ -93,6 +93,7 @@ "xchem_collate = dlstbx.wrapper.xchem_collate:XChemCollateWrapper", "xia2 = dlstbx.wrapper.xia2:Xia2Wrapper", "xia2.multiplex = dlstbx.wrapper.xia2_multiplex:Xia2MultiplexWrapper", + "xia2.multiplex_filtering = dlstbx.wrapper.xia2_multiplex_filtering:Xia2MultiplexFilteringWrapper", "xia2.overload = dlstbx.wrapper.xia2_overload:Xia2OverloadWrapper", "xia2.strategy = dlstbx.wrapper.xia2_strategy:Xia2StrategyWrapper", "xia2.to_shelxcde = dlstbx.wrapper.xia2_to_shelxcde:Xia2toShelxcdeWrapper", diff --git a/src/dlstbx/services/trigger.py b/src/dlstbx/services/trigger.py index d74ff5d1e..fe3dd11a0 100644 --- a/src/dlstbx/services/trigger.py +++ b/src/dlstbx/services/trigger.py @@ -229,6 +229,10 @@ class MultiplexParameters(pydantic.BaseModel): diffraction_plan_info: Optional[DiffractionPlanInfo] = None recipe: Optional[str] = None use_clustering: Optional[List[str]] = None + use_filtering: Optional[List[str]] = None + filtering_group_size: Dict[str, int] = pydantic.Field( + default={"default": 50}, alias="filtering-group-size" + ) beamline: str trigger_every_collection: bool @@ -2168,6 +2172,7 @@ def trigger_multiplex( job_parameters.append(("sample_id", str(group.sample_id))) else: job_parameters.append(("sample_group_id", str(group.sample_group_id))) + if parameters.spacegroup: job_parameters.append(("spacegroup", parameters.spacegroup)) if ( @@ -2187,6 +2192,27 @@ def trigger_multiplex( ("clustering.output_clusters", "true"), ] ) + + # See if beamline is in list of allowed ones for filtering + # If so, add filtering parameters to job_parameters + # This will set the xia2.multiplex wrapper to send the job for filtering after completed + + if ( + parameters.use_filtering + and parameters.beamline in parameters.use_filtering + ): + group_size = parameters.filtering_group_size.get( + parameters.beamline, parameters.filtering_group_size["default"] + ) + + job_parameters.extend( + [ + ("filtering.method", "deltacchalf"), + ("deltacchalf.stdcutoff", "3"), + ("deltacchalf.mode", "image_group"), + ("deltacchalf.group_size", str(group_size)), + ] + ) for k, v in job_parameters: jpp = self.ispyb.mx_processing.get_job_parameter_params() jpp["job_id"] = jobid @@ -2202,7 +2228,9 @@ def trigger_multiplex( message = {"recipes": [], "parameters": {"ispyb_process": jobid}} rw.transport.send("processing_recipe", message) - self.log.info(f"xia2.multiplex trigger: Processing job {jobid} triggered") + self.log.info( + f"xia2.multiplex_filtering trigger: Processing job {jobid} triggered" + ) return {"success": True, "return_value": jobids} diff --git a/src/dlstbx/wrapper/xia2_multiplex.py b/src/dlstbx/wrapper/xia2_multiplex.py index 17be9e0b1..b786923e1 100644 --- a/src/dlstbx/wrapper/xia2_multiplex.py +++ b/src/dlstbx/wrapper/xia2_multiplex.py @@ -27,7 +27,12 @@ class Xia2MultiplexWrapper(Wrapper): name = "xia2.multiplex" def send_results_to_ispyb( - self, z, xtriage_results=None, cluster_num=None, attachments=[] + self, + z, + xtriage_results=None, + cluster_num=None, + attachments=[], + multiplex_filtering=False, ): ispyb_command_list = [] @@ -138,6 +143,14 @@ def send_results_to_ispyb( self.log.debug("Sending %s", str(ispyb_command_list)) self.recwrap.send_to("ispyb", {"ispyb_command_list": ispyb_command_list}) + # After xia2.multiplex, xia2.multiplex_filtering can be optionally run to improve data reduction + # Currently not supported for clusters + # Only triggered if filtering parameters exist -> logic handled in trigger_multiplex + + if not cluster_num and multiplex_filtering: + self.log.info("Triggering xia2.multiplex filtering.") + self.recwrap.send_to("filtering", True) + def construct_commandline(self, params): """Construct xia2.multiplex command line. Takes job parameter dictionary, returns array.""" @@ -150,7 +163,15 @@ def construct_commandline(self, params): command.append(f) if params.get("ispyb_parameters"): - ignore = {"sample_id", "sample_group_id"} + # ignore filtering parameters for xia2.multiplex_filtering + ignore = { + "sample_id", + "sample_group_id", + "filtering.method", + "deltacchalf.stdcutoff", + "deltacchalf.mode", + "deltacchalf.group_size", + } translation = { "d_min": "resolution.d_min", "spacegroup": "symmetry.space_group", @@ -425,11 +446,27 @@ def is_final_result(final_file: pathlib.Path) -> bool: self.log.info( f"Triggering downstream recipe steps for dataset: '{dataset_name}'" ) + + # Check if filtering parameters present -> if so, trigger filtering when sending results to ispyb + # Whether or not these are present is currently handled by the trigger service + + filtering = False + + if params.get("ispyb_parameters"): + if ("filtering.method", ["deltacchalf"]) in params[ + "ispyb_parameters" + ].items(): + self.log.info( + "Additional filtering with xia2.multiplex_filtering selected." + ) + filtering = True + self.send_results_to_ispyb( ispyb_d, xtriage_results=xtriage_results, cluster_num=cluster_num, attachments=attachments, + multiplex_filtering=filtering, ) self._success_counter.inc() diff --git a/src/dlstbx/wrapper/xia2_multiplex_filtering.py b/src/dlstbx/wrapper/xia2_multiplex_filtering.py new file mode 100644 index 000000000..74732381c --- /dev/null +++ b/src/dlstbx/wrapper/xia2_multiplex_filtering.py @@ -0,0 +1,500 @@ +from __future__ import annotations + +import json +import pathlib +import shutil +import subprocess +import time +from fnmatch import fnmatch + +import iotbx.merging_statistics +from cctbx import uctbx + +import dlstbx.util.symlink +from dlstbx.wrapper import Wrapper + + +def lookup(merging_stats, item, shell): + i_bin = {"innerShell": 0, "outerShell": -1}.get(shell) + if i_bin is not None: + return merging_stats[item][i_bin] + return merging_stats["overall"][item] + + +class Xia2MultiplexFilteringWrapper(Wrapper): + """ + Wrapper for xia2.multiplex_filtering. Largely based off wrapper for xia2.multiplex. + + xia2.multiplex_filtering takes existing multiplex results and performs some + additional filtering to improve data reduction quality. + + """ + + _logger_name = "dlstbx.wrap.xia2.multiplex_filtering" + name = "xia2.multiplex_filtering" + + def send_results_to_ispyb( + self, z, xtriage_results=None, cluster_num=None, attachments=[] + ): + ispyb_command_list = [] + + # Place holder code for future iterations where may run filtering jobs on clusters + + if cluster_num is not None: + register_autoproc_prog = { + "ispyb_command": "register_processing", + "program": "xia2.multiplex_filtering", + "cmdline": "xia2.multiplex_filtering (ap-zoc)", + "environment": {"cluster": cluster_num}, + "store_result": "ispyb_autoprocprogram_id", + } + ispyb_command_list.append(register_autoproc_prog) + + # Step 1: Add new record to AutoProc, keep the AutoProcID + + register_autoproc = { + "ispyb_command": "write_autoproc", + "autoproc_id": None, + "store_result": "ispyb_autoproc_id", + "spacegroup": z["spacegroup"], + "refinedcell_a": z["unit_cell"][0], + "refinedcell_b": z["unit_cell"][1], + "refinedcell_c": z["unit_cell"][2], + "refinedcell_alpha": z["unit_cell"][3], + "refinedcell_beta": z["unit_cell"][4], + "refinedcell_gamma": z["unit_cell"][5], + } + ispyb_command_list.append(register_autoproc) + + # Step 2: Store scaling results, linked to the AutoProcID + # Keep the AutoProcScalingID + + insert_scaling = z["scaling_statistics"] + insert_scaling.update( + { + "ispyb_command": "insert_scaling", + "autoproc_id": "$ispyb_autoproc_id", + "store_result": "ispyb_autoprocscaling_id", + } + ) + ispyb_command_list.append(insert_scaling) + + # Step 3: Store integration result, linked to the ScalingID + # Use pre-registered integration id for 'Filtered' dataset + + if cluster_num is not None: + integration_id = None + else: + integration_id = "$ispyb_integration_id" + + integration = { + "ispyb_command": "upsert_integration", + "scaling_id": "$ispyb_autoprocscaling_id", + "integration_id": integration_id, + "cell_a": z["unit_cell"][0], + "cell_b": z["unit_cell"][1], + "cell_c": z["unit_cell"][2], + "cell_alpha": z["unit_cell"][3], + "cell_beta": z["unit_cell"][4], + "cell_gamma": z["unit_cell"][5], + } + ispyb_command_list.append(integration) + + # Step 4: Upload attachments + + if attachments: + for attachment in attachments: + upload_attachment = { + "ispyb_command": "add_program_attachment", + "program_id": "$ispyb_autoprocprogram_id", + "file_name": attachment["file_name"], + "file_path": attachment["file_path"], + "file_type": attachment["file_type"], + "importance_rank": attachment["importance_rank"], + } + ispyb_command_list.append(upload_attachment) + + # Step 5: Register successful processing for cluster jobs (placeholder code) + + if cluster_num is not None: + update_autoproc_prog = { + "ispyb_command": "update_processing_status", + "program_id": "$ispyb_autoprocprogram_id", + "message": "processing successful", + "status": "success", + } + ispyb_command_list.append(update_autoproc_prog) + + if xtriage_results is not None: + for level, messages in xtriage_results.items(): + for message in messages: + if ( + message["text"] + == "The merging statistics indicate that the data may be assigned to the wrong space group." + ): + # this is not a useful warning for multi-crystal + continue + ispyb_command_list.append( + { + "ispyb_command": "add_program_message", + "program_id": "$ispyb_autoprocprogram_id", + "message": message["text"], + "description": message["summary"], + "severity": {0: "INFO", 1: "WARNING", 2: "ERROR"}.get( + message["level"] + ), + } + ) + + self.log.debug(f"Sending {ispyb_command_list}") + self.recwrap.send_to("ispyb", {"ispyb_command_list": ispyb_command_list}) + + def construct_commandline(self, params, multiplex_directory): + command = ["xia2.multiplex_filtering", f"input={multiplex_directory}"] + + # xia2.multiplex_filtering uses previous multiplex job files + # so no specific data files to be input, just add ispyb_parameters to cmdline + + if params.get("ispyb_parameters"): + ignore = { + "sample_id", + "sample_group_id", + "data", + "clustering.method", + "clustering.output_clusters", + } + translation = { + "d_min": "resolution.d_min", + } + for param, value in params["ispyb_parameters"].items(): + if param not in ignore: + command.append(translation.get(param, param) + "=" + value[0]) + + return command + + def run(self): + assert hasattr(self, "recwrap"), "No recipewrapper object found" + + params = self.recwrap.recipe_step["job_parameters"] + + working_directory = pathlib.Path(params["working_directory"]) + results_directory = pathlib.Path(params["results_directory"]) + + multiplex_dir = pathlib.Path(params["multiplex_directory"]) + + command = self.construct_commandline(params, multiplex_dir) + + # Create working directory with symbolic link + + working_directory.mkdir(parents=True, exist_ok=True) + if params.get("create_symlink"): + dlstbx.util.symlink.create_parent_symlink( + working_directory, params["create_symlink"] + ) + + # Need to check that the required multiplex files have copied into the right directory + # (tmp/zocalo/) + # If not, implement backoff delay analogous to that used in the trigger service + + needed_files = [ + multiplex_dir / "models.expt", + multiplex_dir / "observations.refl", + multiplex_dir / "scaled.mtz", + multiplex_dir / "xia2-multiplex-working.phil", + multiplex_dir / "xia2.multiplex.json", + ] + + ntry = 0 + waiting = True + timedout = False + backoff_max_try = 10 + backoff_multiplier = 2 + backoff_delay = 8 + while waiting: + waiting_processing_files = [] + for mplx_file in needed_files: + if not mplx_file.is_file(): + waiting_processing_files.append(mplx_file) + self.log.info( + f"Files still copying - {mplx_file} not yet present in {multiplex_dir}." + ) + if len(waiting_processing_files) > 0 and ntry < backoff_max_try: + delay = int(backoff_delay * backoff_multiplier**ntry) + time.sleep(delay) + ntry += 1 + elif len(waiting_processing_files) > 0 and ntry >= backoff_max_try: + self.log.warning("Timed out waiting for xia2.multiplex files to copy.") + timedout = True + waiting = False + else: + self.log.info("All files present for xia2.multiplex_filtering") + waiting = False + + if not timedout: + # run xia2.multiplex_filtering in working directory + + self.log.info(f"command: {' '.join(command)}") + try: + start_time = time.perf_counter() + result = subprocess.run( + command, + capture_output=True, + timeout=params.get("timeout"), + cwd=working_directory, + ) + runtime = time.perf_counter() - start_time + self.log.info(f"xia2.multiplex_filtering took {runtime} seconds") + self._runtime_hist.observe(runtime) + except subprocess.TimeoutExpired as te: + success = False + self.log.warning( + f"xia2.multiplex_filtering timed out: {te.timeout}\n {te.cmd}" + ) + self.log.debug(te.stdout) + self.log.debug(te.stderr) + self._timeout_counter.inc() + else: + if success := not result.returncode: + self.log.info("xia2.multiplex_filtering successful") + else: + self.log.info( + f"xia2.multiplex_filtering failed with exitcode {result.returncode}" + ) + self.log.debug(result.stdout) + self.log.debug(result.stderr) + self.log.info(f"working_directory: {working_directory}") + + filtered_unmerged_mtz = working_directory / "filtered_unmerged.mtz" + multiplex_filtering_json = ( + working_directory / "xia2.multiplex_filtering.json" + ) + + if not ( + filtered_unmerged_mtz.is_file() and multiplex_filtering_json.is_file() + ): + success = False + + # Create results directory + results_directory.mkdir(parents=True, exist_ok=True) + if params.get("create_symlink"): + dlstbx.util.symlink.create_parent_symlink( + results_directory, params["create_symlink"] + ) + if pipeline_final_params := params.get("pipeline-final", []): + final_directory = pathlib.Path(pipeline_final_params["path"]) + final_directory.mkdir(parents=True, exist_ok=True) + if params.get("create_symlink"): + dlstbx.util.symlink.create_parent_symlink( + final_directory, params["create_symlink"] + ) + + # Maybe move this function elsewhere + def is_final_result(final_file: pathlib.Path) -> bool: + return any( + fnmatch(str(final_file.name), patt) + for patt in pipeline_final_params["patterns"] + ) + + keep_ext = { + ".png": None, + ".log": "log", + ".json": None, + ".expt": None, + ".refl": None, + ".mtz": None, + ".html": "log", + ".sca": None, + } + keep = { + "filtered.mtz": "result", + "filtered_unmerged.mtz": "result", + "filtered.expt": "result", + "filtered.refl": "result", + "filtered.sca": "result", + "merging-stats.json": "graph", + "xia2.multiplex_filtering.json": "result", + } + + # Record these log files first so they appear at the top of the list + # of attachments in SynchWeb + + primary_log_files = [ + working_directory / "xia2.multiplex_filtering.html", + working_directory / "xia2.multiplex_filtering.log", + ] + + allfiles = [] + + if success: + with multiplex_filtering_json.open("r") as fh: + d = json.load(fh) + + # Retain place holder logic from multiplex wrapper for future iterations with clusters + # Note that 'all data' corresponds to the parent multiplex job + # It is also added to the multiplex_filtering_json so the html has user-friendly comparisons + # However, as it is just a copy of the results from multiplex, it is ignored here + + for dataset_name, dataset in d["datasets"].items(): + if dataset_name == "Filtered": + base_dir = working_directory + dimple_symlink = "dimple-xia2.multiplex_filtering" + cluster_prefix = "" + cluster_num = None + elif "coordinate cluster" in dataset_name: + self.log.warning("Not currently applying filtering to clusters") + continue + elif dataset_name == "All data": + self.log.warning( + 'Results for "All Data" already in parent xia2.multiplex run.' + ) + continue + else: + self.log.warning( + f"Ignoring unrecognised dataset pattern {dataset_name}" + ) + continue + + filtered_unmerged_mtz = ( + base_dir / f"{cluster_prefix}filtered_unmerged.mtz" + ) + i_obs = iotbx.merging_statistics.select_data( + filtered_unmerged_mtz.as_posix(), data_labels=None + ) + merging_stats = dataset["merging_stats"] + merging_stats_anom = dataset["merging_stats_anom"] + with (base_dir / f"{cluster_prefix}merging-stats.json").open( + "w" + ) as fh: + json.dump(merging_stats, fh) + + ispyb_d = { + "commandline": " ".join(command), + "spacegroup": i_obs.space_group().type().lookup_symbol(), + "unit_cell": list(i_obs.unit_cell().parameters()), + "scaling_statistics": {}, + } + for shell in ("overall", "innerShell", "outerShell"): + ispyb_d["scaling_statistics"][shell] = { + "cc_half": lookup(merging_stats, "cc_one_half", shell), + "completeness": lookup( + merging_stats, "completeness", shell + ), + "mean_i_sig_i": lookup( + merging_stats, "i_over_sigma_mean", shell + ), + "multiplicity": lookup( + merging_stats, "multiplicity", shell + ), + "n_tot_obs": lookup(merging_stats, "n_obs", shell), + "n_tot_unique_obs": lookup(merging_stats, "n_uniq", shell), + "r_merge": lookup(merging_stats, "r_merge", shell), + "res_lim_high": uctbx.d_star_sq_as_d( + lookup(merging_stats, "d_star_sq_min", shell) + ), + "res_lim_low": uctbx.d_star_sq_as_d( + lookup(merging_stats, "d_star_sq_max", shell) + ), + "anom_completeness": lookup( + merging_stats_anom, "anom_completeness", shell + ), + "anom_multiplicity": lookup( + merging_stats_anom, "multiplicity", shell + ), + "cc_anom": lookup(merging_stats_anom, "cc_anom", shell), + "r_meas_all_iplusi_minus": lookup( + merging_stats_anom, "r_meas", shell + ), + } + xtriage_results = dataset.get("xtriage") + attachments = [] + + for filename in set(primary_log_files + list(base_dir.iterdir())): + filetype = None + if not filename.is_file(): + continue + filetype = keep_ext.get(filename.suffix) + for file_pattern in keep: + if filename.name.endswith(file_pattern): + filetype = keep[file_pattern] + if filetype is None: + continue + + destination = results_directory / filename.name + if ( + destination.as_posix() in allfiles + and filename not in primary_log_files + ): + destination = ( + results_directory / f"{cluster_prefix}{filename.name}" + ) + + if destination.as_posix() not in allfiles: + self.log.debug(f"Copying {filename} to {destination}") + shutil.copy(filename, destination) + allfiles.append(destination.as_posix()) + if pipeline_final_params and is_final_result(destination): + destination = final_directory / destination.name + if destination not in allfiles: + self.log.debug(f"Copying {filename} to {destination}") + shutil.copy(filename, destination) + allfiles.append(destination.as_posix()) + + if filetype: + attachments.append( + { + "file_path": destination.parent.as_posix(), + "file_name": destination.name, + "file_type": filetype, + "importance_rank": ( + 1 + if destination.name.endswith( + ( + "filtered.mtz", + "xia2.multiplex_filtering.html", + "xia2.multiplex_filtering.log", + ) + ) + else 2 + ), + } + ) + + # Add parameters to the environment to be picked up downstream by trigger function + + # As using the same downstream triggers, update "scaled_mtz" rather than calling it by "filtered.mtz" + + self.recwrap.environment.update( + { + "scaled_mtz": ( + results_directory / f"{cluster_prefix}filtered.mtz" + ).as_posix() + } + ) + self.recwrap.environment.update( + { + "scaled_unmerged_mtz": ( + results_directory + / f"{cluster_prefix}filtered_unmerged.mtz" + ).as_posix() + } + ) + self.recwrap.environment.update({"dimple_symlink": dimple_symlink}) + + # Send results to ispyb and trigger downstream recipe steps for this dataset + + self.log.info( + f"Triggering downstream recipe steps for dataset: '{dataset_name}" + ) + self.send_results_to_ispyb( + ispyb_d, + xtriage_results=xtriage_results, + cluster_num=cluster_num, + attachments=attachments, + ) + self._success_counter.inc() + else: + self._failure_counter.inc() + else: + # if timed out and gave up because file copying unsuccessful then return False for success state + success = False + return success