Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 51 additions & 48 deletions transfers/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,49 +352,49 @@ def transfer_all(metrics: Metrics) -> list[ProfileArtifact]:
results = _execute_transfer(WellTransferer, flags=flags)
metrics.well_metrics(*results)

# Get transfer flags
transfer_options = load_transfer_options()

# =========================================================================
# PHASE 1.5: Non-well location types (parallel, after wells, before other transfers)
# These create Things and Locations that chemistry/other transfers depend on.
# =========================================================================
non_well_tasks = []
if transfer_options.transfer_springs:
non_well_tasks.append(("Springs", transfer_springs))
if transfer_options.transfer_perennial_streams:
non_well_tasks.append(("PerennialStreams", transfer_perennial_stream))
if transfer_options.transfer_ephemeral_streams:
non_well_tasks.append(("EphemeralStreams", transfer_ephemeral_stream))
if transfer_options.transfer_met_stations:
non_well_tasks.append(("MetStations", transfer_met))

if non_well_tasks:
message("PHASE 1.5: NON-WELL LOCATION TYPES (PARALLEL)")
with ThreadPoolExecutor(max_workers=len(non_well_tasks)) as executor:
futures = {
executor.submit(
_execute_session_transfer_with_timing, name, func, limit
): name
for name, func in non_well_tasks
}

for future in as_completed(futures):
name = futures[future]
try:
result_name, result, elapsed = future.result()
logger.info(
f"Non-well transfer {result_name} completed in {elapsed:.2f}s"
)
except Exception as e:
logger.critical(f"Non-well transfer {name} failed: {e}")

_transfer_parallel(
metrics,
flags,
limit,
transfer_options,
)
# Get transfer flags
transfer_options = load_transfer_options()

# =========================================================================
# PHASE 1.5: Non-well location types (parallel, after wells, before other transfers)
# These create Things and Locations that chemistry/other transfers depend on.
# =========================================================================
non_well_tasks = []
if transfer_options.transfer_springs:
non_well_tasks.append(("Springs", transfer_springs))
if transfer_options.transfer_perennial_streams:
non_well_tasks.append(("PerennialStreams", transfer_perennial_stream))
if transfer_options.transfer_ephemeral_streams:
non_well_tasks.append(("EphemeralStreams", transfer_ephemeral_stream))
if transfer_options.transfer_met_stations:
non_well_tasks.append(("MetStations", transfer_met))

if non_well_tasks:
message("PHASE 1.5: NON-WELL LOCATION TYPES (PARALLEL)")
with ThreadPoolExecutor(max_workers=len(non_well_tasks)) as executor:
futures = {
executor.submit(
_execute_session_transfer_with_timing, name, func, limit
): name
for name, func in non_well_tasks
}

for future in as_completed(futures):
name = futures[future]
try:
result_name, result, elapsed = future.result()
logger.info(
f"Non-well transfer {result_name} completed in {elapsed:.2f}s"
)
except Exception as e:
logger.critical(f"Non-well transfer {name} failed: {e}")

_transfer_parallel(
metrics,
flags,
limit,
transfer_options,
)

return profile_artifacts

Expand Down Expand Up @@ -610,6 +610,14 @@ def _transfer_parallel(
# the transfer process is bisected because the continuous water levels process is
# very time consuming and we want to run it alone in its own phase.

# =========================================================================
# PHASE 5: Cleanup locations. populate state, county, quadname
Copy link

Copilot AI Jan 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment uses lowercase 'populate', which is inconsistent with the capitalization used in the other phase comments. Consider capitalizing it to 'Populate' for consistency.

Suggested change
# PHASE 5: Cleanup locations. populate state, county, quadname
# PHASE 5: Cleanup locations. Populate state, county, quadname

Copilot uses AI. Check for mistakes.
# =========================================================================
if get_bool_env("CLEANUP_LOCATIONS", True):
message("CLEANING UP LOCATIONS")
with session_ctx() as session:
cleanup_locations(session)


def main():
message("START--------------------------------------")
Expand All @@ -633,11 +641,6 @@ def main():

profile_artifacts = transfer_all(metrics)

if get_bool_env("CLEANUP_LOCATIONS", True):
message("CLEANING UP LOCATIONS")
with session_ctx() as session:
cleanup_locations(session)

metrics.close()
metrics.save_to_storage_bucket()
save_log_to_bucket()
Expand Down
9 changes: 8 additions & 1 deletion transfers/waterlevels_transducer_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,15 @@ def _transfer_hook(self, session: Session) -> None:
logger.info(f"no {release_status} records for pointid {pointid}")
continue

def _install_ts(value):
Copy link

Copilot AI Jan 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function name _install_ts is ambiguous. Consider renaming to _convert_to_timestamp or _normalize_installation_timestamp to better convey its purpose of converting various date formats to Timestamp objects.

Copilot uses AI. Check for mistakes.
if isinstance(value, Timestamp):
return value
if hasattr(value, "date"):
return Timestamp(value)
return Timestamp(pd.to_datetime(value, errors="coerce"))
Copy link

Copilot AI Jan 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

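When pd.to_datetime(value, errors='coerce') cannot parse the value it returns NaT (Not a Time), and Timestamp(NaT) is still NaT. Every ordering comparison against NaT evaluates to False, so sorted() cannot place such keys consistently and the resulting deployment order depends on the input order. Consider adding error handling or validation to ensure valid timestamps are returned, or document that NaT values are acceptable for sorting.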

Suggested change
return Timestamp(pd.to_datetime(value, errors="coerce"))
dt = pd.to_datetime(value, errors="coerce")
if pd.isna(dt):
# Use a maximal timestamp so invalid/missing dates sort last
return Timestamp.max
return Timestamp(dt)

Copilot uses AI. Check for mistakes.

deps_sorted = sorted(
deployments, key=lambda d: Timestamp(d.installation_date)
deployments, key=lambda d: _install_ts(d.installation_date)
)

observations = [
Expand Down
Loading