diff --git a/transfers/transfer.py b/transfers/transfer.py index 6e41aa9b..73c82a21 100644 --- a/transfers/transfer.py +++ b/transfers/transfer.py @@ -425,7 +425,11 @@ def _run_continuous_water_level_transfers(metrics, flags): results_map[result_name] = result logger.info(f"Parallel task {result_name} completed in {elapsed:.2f}s") except Exception as e: - logger.critical(f"Parallel task {name} failed: {e}") + import traceback + + logger.critical( + f"Parallel task {name} failed: {traceback.format_exc()}" + ) if "Pressure" in results_map and results_map["Pressure"]: metrics.pressure_metrics(*results_map["Pressure"]) diff --git a/transfers/waterlevels_transducer_transfer.py b/transfers/waterlevels_transducer_transfer.py index c17de915..8552ca7e 100644 --- a/transfers/waterlevels_transducer_transfer.py +++ b/transfers/waterlevels_transducer_transfer.py @@ -154,7 +154,7 @@ def _install_ts(value): insert(TransducerObservation), filtered_observations, ) - session.add(block) + block = self._get_or_create_block(session, block) logger.info( f"Added {len(observations)} water levels {release_status} block" ) @@ -250,6 +250,33 @@ def _build_itertuples_field_map(df: pd.DataFrame) -> dict[str, str]: mapping[col] = field return mapping + def _get_or_create_block( + self, session: Session, block: TransducerObservationBlock + ) -> TransducerObservationBlock: + existing = ( + session.query(TransducerObservationBlock) + .filter( + TransducerObservationBlock.thing_id == block.thing_id, + TransducerObservationBlock.parameter_id == block.parameter_id, + TransducerObservationBlock.review_status == block.review_status, + TransducerObservationBlock.start_datetime == block.start_datetime, + TransducerObservationBlock.end_datetime == block.end_datetime, + ) + .one_or_none() + ) + if existing: + existing.comment = block.comment or existing.comment + existing.release_status = block.release_status or existing.release_status + existing.reviewer_id = block.reviewer_id or existing.reviewer_id + existing.created_by_name = block.created_by_name or existing.created_by_name + existing.created_by_id = block.created_by_id or existing.created_by_id + existing.updated_by_name = block.updated_by_name or existing.updated_by_name + existing.updated_by_id = block.updated_by_id or existing.updated_by_id + return existing + + session.add(block) + return block + class WaterLevelsContinuousPressureTransferer(WaterLevelsContinuousTransferer): source_table = "WaterLevelsContinuous_Pressure" @@ -328,7 +355,12 @@ def _legacy_payload(self, row: pd.Series) -> dict: def _find_deployment(ts, deployments): - date = ts.date() + if isinstance(ts, Timestamp): + date = ts.date() + elif hasattr(ts, "date"): + date = ts.date() + else: + date = pd.Timestamp(ts).date() for d in deployments: if d.installation_date > date: break # because sorted by start