Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion transfers/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,11 @@ def _run_continuous_water_level_transfers(metrics, flags):
results_map[result_name] = result
logger.info(f"Parallel task {result_name} completed in {elapsed:.2f}s")
except Exception as e:
logger.critical(f"Parallel task {name} failed: {e}")
import traceback

logger.critical(
f"Parallel task {name} failed: {traceback.format_exc()}"
)

if "Pressure" in results_map and results_map["Pressure"]:
metrics.pressure_metrics(*results_map["Pressure"])
Expand Down
36 changes: 34 additions & 2 deletions transfers/waterlevels_transducer_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def _install_ts(value):
insert(TransducerObservation),
filtered_observations,
)
session.add(block)
block = self._get_or_create_block(session, block)
logger.info(
f"Added {len(observations)} water levels {release_status} block"
)
Expand Down Expand Up @@ -250,6 +250,33 @@ def _build_itertuples_field_map(df: pd.DataFrame) -> dict[str, str]:
mapping[col] = field
return mapping

def _get_or_create_block(
self, session: Session, block: TransducerObservationBlock
) -> TransducerObservationBlock:
existing = (
session.query(TransducerObservationBlock)
.filter(
TransducerObservationBlock.thing_id == block.thing_id,
TransducerObservationBlock.parameter_id == block.parameter_id,
TransducerObservationBlock.review_status == block.review_status,
TransducerObservationBlock.start_datetime == block.start_datetime,
TransducerObservationBlock.end_datetime == block.end_datetime,
)
.one_or_none()
)
if existing:
existing.comment = block.comment or existing.comment
existing.release_status = block.release_status or existing.release_status

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve existing block release_status when reusing

When a matching block already exists, this assignment overwrites existing.release_status with block.release_status. New blocks inherit ReleaseMixin’s default release status of "draft" (db/base.py), so rerunning the transfer for the same thing/parameter/time window will downgrade any previously public/private block back to draft. That’s a regression for reimports; only update release_status when it was explicitly set (or leave it unchanged for existing blocks).

Useful? React with 👍 / 👎.

existing.reviewer_id = block.reviewer_id or existing.reviewer_id
existing.created_by_name = block.created_by_name or existing.created_by_name
existing.created_by_id = block.created_by_id or existing.created_by_id
existing.updated_by_name = block.updated_by_name or existing.updated_by_name
existing.updated_by_id = block.updated_by_id or existing.updated_by_id
return existing

session.add(block)
return block


class WaterLevelsContinuousPressureTransferer(WaterLevelsContinuousTransferer):
source_table = "WaterLevelsContinuous_Pressure"
Expand Down Expand Up @@ -328,7 +355,12 @@ def _legacy_payload(self, row: pd.Series) -> dict:


def _find_deployment(ts, deployments):
date = ts.date()
if isinstance(ts, Timestamp):
date = ts.date()
elif hasattr(ts, "date"):
date = ts.date()
else:
date = pd.Timestamp(ts).date()
for d in deployments:
if d.installation_date > date:
break # because sorted by start
Expand Down
Loading