Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions ngen_forcing/defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,11 @@ def xr_read_window(ds, window, mask=None):
return data
else:
return data.where(mask)


def xr_read_window_time(ds, window, mask=None, idx=None, time=None):
data = ds.isel(window)
if mask is None:
return idx, time, data
else:
return idx, time, data.where(mask)
159 changes: 112 additions & 47 deletions ngen_forcing/process_nwm_forcing_to_ngen.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from defs import xr_read_window, polymask
from defs import xr_read_window, polymask, xr_read_window_time
from rasterio import _io, windows
import concurrent.futures
import xarray as xr
import pandas as pd


class MemoryDataset(_io.MemoryDataset, windows.WindowMethodsMixin):
Expand All @@ -15,50 +15,66 @@ def junk():


def get_forcing_dict_newway(
feature_index,
feature_list,
folder_prefix,
file_list,
var_list,
):

reng = "rasterio"
_xds = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng)
_template_arr = _xds.U2D.values

_xds_dummy = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng)
_template_arr = _xds_dummy.U2D.values
_u2d = MemoryDataset(
_template_arr,
transform=_xds.U2D.rio.transform(),
transform=_xds_dummy.U2D.rio.transform(),
gcps=None,
rpcs=None,
crs=None,
copy=False,
)

filehandles = [xr.open_dataset("data/" + f) for f in file_list]
stats = []
for i, m in enumerate(map(polymask(_u2d), feature_list)):
ds_list = []
for _nc_file in file_list:
_full_nc_file = folder_prefix.joinpath(_nc_file)
ds_list.append(xr.open_dataset(_full_nc_file, engine=reng))

df_dict = {}
for _v in var_list:
df_dict[_v] = pd.DataFrame(index=feature_index)

for i, feature in enumerate(feature_list):
print(f"{i}, {round(i/len(feature_list), 5)*100}".ljust(40), end="\r")
mask, _, window = m
mask, _, window = polymask(_u2d)(feature)
mask = xr.DataArray(mask, dims=["y", "x"])
winslices = dict(zip(["y", "x"], window.toslices()))
for f in filehandles:
cropped = xr_read_window(f, winslices, mask=mask)
stats.append(cropped.mean())
[f.close() for f in filehandles] # Returns None for each file
for j, _xds in enumerate(ds_list):
time_value = _xds.time.values[0]
cropped = xr_read_window(_xds, winslices, mask=mask)
stats = cropped.mean()
for var in var_list:
df_dict[var].loc[i, time_value] = stats[var]

return stats
[ds.close() for ds in ds_list]
return df_dict


def get_forcing_dict_newway_parallel(
feature_index,
feature_list,
folder_prefix,
file_list,
var_list,
para="thread",
para_n=2,
):

import concurrent.futures

reng = "rasterio"
_xds = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng)
_template_arr = _xds.U2D.values

_u2d = MemoryDataset(
_template_arr,
transform=_xds.U2D.rio.transform(),
Expand All @@ -67,7 +83,10 @@ def get_forcing_dict_newway_parallel(
crs=None,
copy=False,
)
filehandles = [xr.open_dataset("data/" + f) for f in file_list]
ds_list = [xr.open_dataset(folder_prefix.joinpath(f)) for f in file_list]
# ds_list = [xr.open_dataset(folder_prefix.joinpath(f), engine=reng) for f in file_list]
# TODO: figure out why using the rasterio engine DOES NOT WORK with parallel
# TODO: figure out why NOT using the rasterio engine produces a different result

if para == "process":
pool = concurrent.futures.ProcessPoolExecutor
Expand All @@ -76,71 +95,101 @@ def get_forcing_dict_newway_parallel(
else:
pool = concurrent.futures.ThreadPoolExecutor

stats = []
future_list = []

with pool(max_workers=para_n) as executor:
stats = []
future_list = []

for i, m in enumerate(map(polymask(_u2d), feature_list)):
print(f"{i}, {round(i/len(feature_list), 5)*100}".ljust(40), end="\r")
mask, _, window = m
for _i, _m in enumerate(map(polymask(_u2d), feature_list)):
print(f"{_i}, {round(_i/len(feature_list), 5)*100}".ljust(40), end="\r")
mask, _, window = _m
mask = xr.DataArray(mask, dims=["y", "x"])
winslices = dict(zip(["y", "x"], window.toslices()))
for f in filehandles:
future = executor.submit(xr_read_window, f, winslices, mask=mask)
for ds in ds_list:
_t = ds.time.values[0]
future = executor.submit(
xr_read_window_time, ds, winslices, mask=mask, idx=_i, time=_t
)
# cropped = xr_read_window(f, winslices, mask=mask)
# stats.append(cropped.mean())
future_list.append(future)
for _f in concurrent.futures.as_completed(future_list):
stats.append(_f.result().mean())
_j, _t, _s = _f.result()
stats.append((_j, _t, _s))

[f.close() for f in filehandles]
return stats
df_dict = {}
for _v in var_list:
df_dict[_v] = pd.DataFrame(index=feature_index)

for j, t, s in stats:
for var in var_list:
df_dict[var].loc[j, t] = s[var].mean()

[ds.close() for ds in ds_list]
return df_dict


def get_forcing_dict_newway_inverted(
feature_index,
feature_list,
folder_prefix,
file_list,
var_list,
):

reng = "rasterio"
_xds = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng)
_template_arr = _xds.U2D.values

_xds_dummy = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng)
_template_arr = _xds_dummy.U2D.values
_u2d = MemoryDataset(
_template_arr,
transform=_xds.U2D.rio.transform(),
transform=_xds_dummy.U2D.rio.transform(),
gcps=None,
rpcs=None,
crs=None,
copy=False,
)
ds_list = []
for _nc_file in file_list:
_full_nc_file = folder_prefix.joinpath(_nc_file)
ds_list.append(xr.open_dataset(_full_nc_file, engine=reng))

filehandles = [xr.open_dataset("data/" + f) for f in file_list]
stats = []
future_list = []
mask_win_list = []

for i, m in enumerate(map(polymask(_u2d), feature_list)):
for i, feature in enumerate(feature_list):
print(f"{i}, {round(i/len(feature_list), 5)*100}".ljust(40), end="\r")
mask, _, window = m
mask, _, window = polymask(_u2d)(feature)
mask = xr.DataArray(mask, dims=["y", "x"])
winslices = dict(zip(["y", "x"], window.toslices()))
mask_win_list.append((mask, winslices))

for f in filehandles:
for i, f in enumerate(ds_list):
print(f"{i}, {round(i/len(file_list), 2)*100}".ljust(40), end="\r")
for _m, _w in mask_win_list:
time_value = f.time.values[0]
# TODO: when we read the window, could the time be added as a dimension?
for j, (_m, _w) in enumerate(mask_win_list):
cropped = xr_read_window(f, _w, mask=_m)
stats.append(cropped.mean())
stats.append((j, time_value, cropped.mean()))

df_dict = {}
for _v in var_list:
df_dict[_v] = pd.DataFrame(index=feature_index)

for j, t, s in stats:
for var in var_list:
df_dict[var].loc[j, t] = s[var]

[f.close() for f in filehandles]
return stats
[ds.close() for ds in ds_list]
return df_dict


def get_forcing_dict_newway_inverted_parallel(
feature_index,
feature_list,
folder_prefix,
file_list,
var_list,
para="thread",
para_n=2,
):
Expand All @@ -167,7 +216,11 @@ def get_forcing_dict_newway_inverted_parallel(
winslices = dict(zip(["y", "x"], window.toslices()))
mask_win_list.append((mask, winslices))

filehandles = [xr.open_dataset("data/" + f) for f in file_list]
ds_list = [xr.open_dataset(folder_prefix.joinpath(f)) for f in file_list]
# ds_list = [xr.open_dataset(folder_prefix.joinpath(f), engine=reng) for f in file_list]
# TODO: figure out why using the rasterio engine DOES NOT WORK with parallel
# TODO: figure out why NOT using the rasterio engine produces a different result

stats = []
future_list = []

Expand All @@ -180,15 +233,27 @@ def get_forcing_dict_newway_inverted_parallel(

with pool(max_workers=para_n) as executor:

for f in filehandles:
print(f"{i}, {round(i/len(file_list), 2)*100}".ljust(40), end="\r")
for _m, _w in mask_win_list:
future = executor.submit(xr_read_window, f, _w, mask=_m)
# cropped = xr_read_window(f, _w, mask=_m)
for j, ds in enumerate(ds_list):
print(f"{j}, {round(i/len(file_list), 2)*100}".ljust(40), end="\r")
_t = ds.time.values[0]
for _i, (_m, _w) in enumerate(mask_win_list):
future = executor.submit(
xr_read_window_time, ds, _w, mask=_m, idx=_i, time=_t
)
# cropped = xr_read_window(ds, _w, mask=_m)
# stats.append(cropped.mean())
future_list.append(future)
for _f in concurrent.futures.as_completed(future_list):
stats.append(_f.result().mean())
_j, _t, _s = _f.result()
stats.append((_j, _t, _s))

df_dict = {}
for _v in var_list:
df_dict[_v] = pd.DataFrame(index=feature_index)

for j, t, s in stats:
for var in var_list:
df_dict[var].loc[j, t] = s[var].mean()

[f.close() for f in filehandles]
return stats
[ds.close() for ds in ds_list]
return df_dict
Loading