Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions Scripts/qkrig_ts_daily.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#!/usr/bin/env python3
"""
Extract TS for a single date and save into water-year-wise catchment CSVs.
Usage: python qkrig_ts_daily.py YYYY-MM-DD /output_dir
"""

import sys, os, datetime as dt
import numpy as np
import pandas as pd
import geopandas as gpd

# --- USER CONFIG ---
# Hydrofabric geopackage with the NextGen catchment (divide) polygons.
GPKG_PATH = "~/.ngiab/hydrofabric/v2.2/conus_nextgen.gpkg"
# Directory of daily kriged-interpolation NPZ exports.
# NOTE(review): "all_guages" [sic] is the on-disk spelling — do not "fix" it
# here without renaming the directory.
EXPORT_DIR = "/mnt/disk1/usgskrig/exports/gridsize/200/conus/range/100km/all_guages/"
LAYER = "divides"  # layer name inside the geopackage
ID_FIELD = "divide_id"  # unique catchment identifier column
# Array keys expected inside each daily NPZ archive.
# Assumes grid_lon/grid_lat are 1-D axes with values shaped (lat, lon),
# matching the vals[iy, ix] indexing in nearest_grid_value — TODO confirm.
GRID_LON_KEY = "grid_lon"
GRID_LAT_KEY = "grid_lat"
GRID_VAL_KEY = "z_interp"
GRID_VAR_KEY = "kriging_variance" # variance key confirmed

# --- Load GDF ---
# Read the catchment polygons once at import time and attach a WGS84
# centroid column. Centroids are computed in an equal-area projection
# (EPSG:5070) because centroids taken in a geographic CRS are biased.
# expanduser(): gpd.read_file does not resolve a leading "~" itself.
gdf = gpd.read_file(os.path.expanduser(GPKG_PATH), layer=LAYER)
if gdf.crs is None:
    # Without a declared CRS we cannot reproject safely; fail loudly
    # instead of raising an opaque AttributeError below.
    raise ValueError(f"Layer {LAYER!r} in {GPKG_PATH} has no CRS")
if gdf.crs.is_geographic:
    gdf_proj = gdf.to_crs("EPSG:5070")
else:
    gdf_proj = gdf
cent_proj = gdf_proj.geometry.centroid
# Back to lon/lat so centroids can index the lon/lat NPZ grids.
gdf["centroid"] = gpd.GeoSeries(cent_proj, crs=gdf_proj.crs).to_crs(4326)

# --- Helper functions ---
def npz_path_for_date(d: dt.date) -> str:
    """Return the expected NPZ export path for day *d* (interp_YYYY-MM-DD.npz)."""
    filename = "interp_{}.npz".format(d.isoformat())
    return os.path.join(EXPORT_DIR, filename)

def nearest_grid_value(lons, lats, vals, pt_lon, pt_lat):
    """Sample ``vals`` at the grid node nearest to (pt_lon, pt_lat).

    ``lons``/``lats`` are the 1-D grid axes; ``vals`` is indexed as
    ``vals[lat_index, lon_index]``. Nearest is per-axis absolute distance.
    """
    col = int(np.abs(lons - pt_lon).argmin())
    row = int(np.abs(lats - pt_lat).argmin())
    return float(vals[row, col])

def grid_sample_both(npz_path, centroids):
    """Sample the kriged mean (and variance, if present) at each centroid.

    Returns a Series of ``(mean, variance)`` tuples aligned with
    ``centroids``; variance is NaN when the NPZ lacks GRID_VAR_KEY.
    """
    with np.load(npz_path, allow_pickle=True) as archive:
        lon_axis = archive[GRID_LON_KEY]
        lat_axis = archive[GRID_LAT_KEY]
        mean_grid = archive[GRID_VAL_KEY]
        var_grid = archive.get(GRID_VAR_KEY, None)

        def sample(pt):
            mean_val = nearest_grid_value(lon_axis, lat_axis, mean_grid, pt.x, pt.y)
            if var_grid is None:
                return (mean_val, np.nan)
            var_val = nearest_grid_value(lon_axis, lat_axis, var_grid, pt.x, pt.y)
            return (mean_val, var_val)

        return centroids.apply(sample)

def water_year(d: dt.date) -> int:
    """Return the USGS water year for *d* (Oct 1 of year N-1 through Sep 30 of year N)."""
    if d.month >= 10:
        # October-December belong to the next calendar year's water year.
        return d.year + 1
    return d.year

# --- MAIN ---
if __name__ == "__main__":
    if len(sys.argv) < 3:
        print("Usage: python qkrig_ts_daily.py YYYY-MM-DD /output_dir")
        sys.exit(1)

    date_str, out_dir = sys.argv[1], sys.argv[2]
    d = dt.date.fromisoformat(date_str)
    npz_file = npz_path_for_date(d)

    # Missing exports are expected (e.g. gaps in the record) — not an error.
    if not os.path.exists(npz_file):
        print(f"No NPZ file for {d}, skipping")
        sys.exit(0)

    # Sample kriged mean + variance at every catchment centroid.
    samples = grid_sample_both(npz_file, gdf["centroid"])

    # Route this day's rows into the matching water-year directory.
    wy = water_year(d)
    wy_dir = os.path.join(out_dir, f"WY{wy}")
    os.makedirs(wy_dir, exist_ok=True)

    # One CSV per catchment; append mode creates the file if absent,
    # and the header is written only on first creation.
    for cat_id, (val, var_val) in zip(gdf[ID_FIELD].values, samples.values):
        cat_file = os.path.join(wy_dir, f"{cat_id}.csv")
        write_header = not os.path.exists(cat_file)
        row = pd.DataFrame({"date": [d], "qkrig": [val], "variance": [var_val]})
        row.to_csv(cat_file, mode='a', header=write_header, index=False)

    print(f"Appended daily values + variance for {date_str} to WY{wy} catchment files")
20 changes: 20 additions & 0 deletions Scripts/qkrig_ts_range.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/bin/bash
# Run qkrig_ts_daily.py once per day over [START_DATE, END_DATE].
# Requires GNU date (-d and -I flags) and assumes qkrig_ts_daily.py is in
# the current working directory.
set -e

START_DATE="2011-06-01"
END_DATE="2019-09-30"
OUT_DIR="/mnt/disk1/qkrig/conus/"
JOBS=4 # number of parallel jobs

mkdir -p "$OUT_DIR"

# Generate list of dates
# seq emits day offsets 0..N (N = whole days between the two dates);
# each offset is turned into an ISO date via GNU date arithmetic.
DATES=$(seq 0 $(( ($(date -d "$END_DATE" +%s) - $(date -d "$START_DATE" +%s)) / 86400 )) | \
xargs -I{} date -I -d "$START_DATE + {} days")

# Run extraction in parallel using xargs
# NOTE(review): each worker handles a different date but APPENDS to the same
# per-catchment CSVs, so concurrent writes can interleave and dates can land
# out of order — confirm this is acceptable, or set JOBS=1 / serialize per
# water year.
echo "$DATES" | xargs -n 1 -P $JOBS -I{} python3 qkrig_ts_daily.py {} "$OUT_DIR"

echo "✅ All water-year catchment CSVs updated in $OUT_DIR"


96 changes: 92 additions & 4 deletions src/core/base_krig.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,31 +58,119 @@ def __init__(self, data, config_path, year, month, day):
# Semivariogram cache
self._semivar_cache: Optional[Tuple[np.ndarray, np.ndarray]] = None # (bin_centers_km, semi_variance)
self._semivar_bins_used: Optional[int] = None

def fit_daily_variogram(self):
    """
    Fit an empirical variogram from this day's station values.

    Pairwise geodesic separations (km) are binned up to the 90th-percentile
    distance; nugget, sill, and range are read off the binned semivariances.

    Returns
    -------
    dict
        ``{"nugget", "sill", "range"}`` with range in DEGREES
        (km / 111, approximate) so it is PyKrige-ready.

    Raises
    ------
    ValueError
        If fewer than 5 points are available or no distance bin is populated.
    """
    num_points = len(self.lons)
    if num_points < 5:
        raise ValueError("Too few points for variogram fitting")

    # All unique point pairs. Geod.inv accepts array input, so a single
    # vectorized call replaces the O(n^2) Python loop of scalar calls.
    i_idx, j_idx = np.triu_indices(num_points, k=1)
    lons = np.asarray(self.lons)
    lats = np.asarray(self.lats)
    vals = np.asarray(self.values)
    _, _, d_m = self.geod.inv(
        lons[i_idx], lats[i_idx], lons[j_idx], lats[j_idx]
    )
    distances = np.asarray(d_m) / 1000.0  # meters -> km
    semivariances = 0.5 * (vals[i_idx] - vals[j_idx]) ** 2

    # Bin semivariances up to the 90th-percentile separation; pairs beyond
    # that fall outside the k in [1, n_bins] window and are ignored.
    n_bins = self.variogram_bins
    max_dist = np.percentile(distances, 90)
    bins = np.linspace(0, max_dist, n_bins + 1)
    bin_ids = np.digitize(distances, bins)

    gamma = []
    bin_centers = []
    for k in range(1, len(bins)):
        mask = bin_ids == k
        if np.any(mask):
            gamma.append(np.mean(semivariances[mask]))
            bin_centers.append(0.5 * (bins[k] + bins[k - 1]))

    gamma = np.asarray(gamma)
    bin_centers = np.asarray(bin_centers)
    if gamma.size == 0:
        # Degenerate geometry (e.g. all points co-located) leaves no bins.
        raise ValueError("No populated variogram bins")

    nugget = float(np.min(gamma))
    sill = float(np.percentile(gamma, 90))

    # First bin whose semivariance reaches 95% of the sill. Because the
    # sill is the 90th percentile of gamma, at least one bin qualifies.
    idx = np.argmax(gamma >= 0.95 * sill)
    range_km = bin_centers[idx]
    range_deg = range_km / 111.0  # rough km -> degrees conversion

    return {
        "nugget": nugget,
        "sill": sill,
        "range": range_deg
    }


# ---------------------------------------------------------------------
# Core computations
# ---------------------------------------------------------------------
def compute_kriging(self):
    """
    Interpolate this day's station values onto the model grid with
    ordinary kriging; stores results on ``self.z_interp`` and
    ``self.kriging_variance``.

    Variogram parameters are chosen in priority order:
      1. a per-day empirical fit (config ``kriging.fit_daily_variogram``),
         with fallback on any failure;
      2. explicit config values (``kriging.range`` in km, plus optional
         sill/nugget);
      3. ``None``, letting PyKrige fit the model itself.
    """
    kcfg = self.config.get("kriging", {}) or {}
    use_daily = kcfg.get("fit_daily_variogram", False)

    variogram_params = None

    if use_daily:
        try:
            vg = self.fit_daily_variogram()

            # Reject degenerate fits so we fall through to config values.
            if vg["range"] <= 0 or not np.isfinite(vg["sill"]):
                raise ValueError("Invalid daily variogram")

            variogram_params = {
                "sill": vg["sill"],
                "range": vg["range"],
                "nugget": vg["nugget"],
            }

            print(
                f"[{self.year}-{self.month:02d}-{self.day:02d}] "
                f"Daily variogram | "
                f"sill={vg['sill']:.3f}, "
                f"range={vg['range']*111:.1f} km, "
                f"nugget={vg['nugget']:.3f}"
            )

        except Exception as e:
            print(f"⚠️ Daily variogram failed, using config values: {e}")

    # Fallback: explicit config parameters (range given in km).
    if variogram_params is None and kcfg.get("range"):
        variogram_params = {
            "sill": kcfg.get("sill", None),
            "range": float(kcfg["range"]) / 111.0,  # km -> degrees (approx)
            "nugget": kcfg.get("nugget", 0.0),
        }

    ok = OrdinaryKriging(
        self.lons,
        self.lats,
        self.values,
        variogram_model=self.variogram_model,
        exact_values=kcfg.get("exact_values", True),
        nlags=kcfg.get("nlags", 12),
        weight=kcfg.get("weight", True),
        variogram_parameters=variogram_params,
    )

    self.z_interp, self.kriging_variance = ok.execute(
        "grid", self.grid_lon, self.grid_lat
    )



def compute_semivariogram(self, bins: Optional[int] = None) -> Tuple[np.ndarray, np.ndarray]:
"""
Expand Down