diff --git a/transfers/transfer.py b/transfers/transfer.py index 67e6ba78..6e41aa9b 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -352,49 +352,49 @@ def transfer_all(metrics: Metrics) -> list[ProfileArtifact]: results = _execute_transfer(WellTransferer, flags=flags) metrics.well_metrics(*results) - # Get transfer flags - transfer_options = load_transfer_options() - - # ========================================================================= - # PHASE 1.5: Non-well location types (parallel, after wells, before other transfers) - # These create Things and Locations that chemistry/other transfers depend on. - # ========================================================================= - non_well_tasks = [] - if transfer_options.transfer_springs: - non_well_tasks.append(("Springs", transfer_springs)) - if transfer_options.transfer_perennial_streams: - non_well_tasks.append(("PerennialStreams", transfer_perennial_stream)) - if transfer_options.transfer_ephemeral_streams: - non_well_tasks.append(("EphemeralStreams", transfer_ephemeral_stream)) - if transfer_options.transfer_met_stations: - non_well_tasks.append(("MetStations", transfer_met)) - - if non_well_tasks: - message("PHASE 1.5: NON-WELL LOCATION TYPES (PARALLEL)") - with ThreadPoolExecutor(max_workers=len(non_well_tasks)) as executor: - futures = { - executor.submit( - _execute_session_transfer_with_timing, name, func, limit - ): name - for name, func in non_well_tasks - } - - for future in as_completed(futures): - name = futures[future] - try: - result_name, result, elapsed = future.result() - logger.info( - f"Non-well transfer {result_name} completed in {elapsed:.2f}s" - ) - except Exception as e: - logger.critical(f"Non-well transfer {name} failed: {e}") - - _transfer_parallel( - metrics, - flags, - limit, - transfer_options, - ) + # Get transfer flags + transfer_options = load_transfer_options() + + # ========================================================================= + # PHASE 1.5: Non-well location types (parallel, after wells, before other transfers) + # These create Things and Locations that chemistry/other transfers depend on. + # ========================================================================= + non_well_tasks = [] + if transfer_options.transfer_springs: + non_well_tasks.append(("Springs", transfer_springs)) + if transfer_options.transfer_perennial_streams: + non_well_tasks.append(("PerennialStreams", transfer_perennial_stream)) + if transfer_options.transfer_ephemeral_streams: + non_well_tasks.append(("EphemeralStreams", transfer_ephemeral_stream)) + if transfer_options.transfer_met_stations: + non_well_tasks.append(("MetStations", transfer_met)) + + if non_well_tasks: + message("PHASE 1.5: NON-WELL LOCATION TYPES (PARALLEL)") + with ThreadPoolExecutor(max_workers=len(non_well_tasks)) as executor: + futures = { + executor.submit( + _execute_session_transfer_with_timing, name, func, limit + ): name + for name, func in non_well_tasks + } + + for future in as_completed(futures): + name = futures[future] + try: + result_name, result, elapsed = future.result() + logger.info( + f"Non-well transfer {result_name} completed in {elapsed:.2f}s" + ) + except Exception as e: + logger.critical(f"Non-well transfer {name} failed: {e}") + + _transfer_parallel( + metrics, + flags, + limit, + transfer_options, + ) return profile_artifacts @@ -610,6 +610,14 @@ def _transfer_parallel( # the transfer process is bisected because the continuous water levels process is # very time consuming and we want to run it alone in its own phase. + # ========================================================================= + # PHASE 5: Cleanup locations. populate state, county, quadname + # ========================================================================= + if get_bool_env("CLEANUP_LOCATIONS", True): + message("CLEANING UP LOCATIONS") + with session_ctx() as session: + cleanup_locations(session) + def main(): message("START--------------------------------------") @@ -633,11 +641,6 @@ def main(): profile_artifacts = transfer_all(metrics) - if get_bool_env("CLEANUP_LOCATIONS", True): - message("CLEANING UP LOCATIONS") - with session_ctx() as session: - cleanup_locations(session) - metrics.close() metrics.save_to_storage_bucket() save_log_to_bucket() diff --git a/transfers/waterlevels_transducer_transfer.py b/transfers/waterlevels_transducer_transfer.py index d96b11d8..c17de915 100644 --- a/transfers/waterlevels_transducer_transfer.py +++ b/transfers/waterlevels_transducer_transfer.py @@ -126,8 +126,15 @@ def _transfer_hook(self, session: Session) -> None: logger.info(f"no {release_status} records for pointid {pointid}") continue + def _install_ts(value): + if isinstance(value, Timestamp): + return value + if hasattr(value, "date"): + return Timestamp(value) + return Timestamp(pd.to_datetime(value, errors="coerce")) + deps_sorted = sorted( - deployments, key=lambda d: Timestamp(d.installation_date) + deployments, key=lambda d: _install_ts(d.installation_date) ) observations = [