diff --git a/python/activator/activator.py b/python/activator/activator.py index 6d950899..e232299a 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -614,7 +614,7 @@ def _process_visit_or_cancel(expected_visit: FannedOutVisit): if not mwi.get_main_pipeline_files(): raise IgnorableVisit(f"No pipeline configured for {expected_visit}.") # TODO: pipeline execution requires a clean run until DM-38041. - cleanups.callback(mwi.clean_local_repo, expid_set) + cleanups.callback(mwi.clean_local_repo) # Copy calibrations for this detector/visit mwi.prep_butler() diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 4b22856d..ba07f91e 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -1834,30 +1834,36 @@ def _query_datasets_by_storage_class(self, butler, exposure_ids, collections, st ) for t in matching_types ) - def clean_local_repo(self, exposure_ids: set[int]) -> None: + def clean_local_repo(self) -> None: """Remove local repo content that is only needed for a single visit. This includes raws and pipeline outputs. - - Parameter - --------- - exposure_ids : `set` [`int`] - Identifiers of the exposures to be removed. """ with lsst.utils.timer.time_this(_log, msg="clean_local_repo", level=logging.DEBUG): self.butler.registry.refresh() - if exposure_ids: - raws = self.butler.query_datasets( - 'raw', - collections=self.instrument.makeDefaultRawIngestRunName(), - where=f"exposure in ({', '.join(str(x) for x in exposure_ids)})", - find_first=False, - explain=False, # Raws might not have been ingested. - instrument=self.visit.instrument, - detector=self.visit.detector, - ) - _log_trace.debug("Removing %d raws for exposures %s.", len(raws), exposure_ids) - self.butler.pruneDatasets(raws, disassociate=True, unstore=True, purge=True) + + # Clean out raws + raws = self.butler.query_datasets( + "raw", + collections=self.instrument.makeDefaultRawIngestRunName(), + find_first=False, + explain=False, # Raws might not have been ingested. + instrument=self.visit.instrument, + detector=self.visit.detector, + ) + n_raws = len(raws) + if n_raws == 0: + _log_trace.debug("No raws to remove for detector %s.", self.visit.detector) + else: + _log_trace.debug("Removing %d raw(s) for detector %s.", n_raws, self.visit.detector) + try: + self.butler.pruneDatasets(raws, disassociate=True, unstore=True, purge=True) + except Exception: + _log_trace.exception("Raw removal failed for detector %s.", self.visit.detector) + raise + _log_trace.debug( + "Successfully removed %d raw(s) for detector %s.", n_raws, self.visit.detector) + # Outputs are all in their own runs, so just drop them. preload_run = runs.get_preload_run(self.instrument, self._deployment, self._day_obs) _remove_run_completely(self.butler, preload_run) diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index 37ccd01c..41c48d95 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -1098,7 +1098,7 @@ def test_clean_local_repo(self): self._assert_in_collection(butler, "*", "bias", calib_data_id_2) self._assert_in_collection(butler, "*", "bias", calib_data_id_3) - self.interface.clean_local_repo({raw_data_id["exposure"]}) + self.interface.clean_local_repo() self._assert_not_in_collection(butler, "*", "raw", raw_data_id) self._assert_not_in_collection(butler, "*", "src", processed_data_id) self._assert_not_in_collection(butler, "*", "calexp", processed_data_id)