diff --git a/ngen_forcing/defs.py b/ngen_forcing/defs.py
index 58b2c1d..18175c8 100644
--- a/ngen_forcing/defs.py
+++ b/ngen_forcing/defs.py
@@ -16,3 +16,11 @@ def xr_read_window(ds, window, mask=None):
         return data
     else:
         return data.where(mask)
+
+
+def xr_read_window_time(ds, window, mask=None, idx=None, time=None):
+    data = ds.isel(window)
+    if mask is None:
+        return idx, time, data
+    else:
+        return idx, time, data.where(mask)
diff --git a/ngen_forcing/process_nwm_forcing_to_ngen.py b/ngen_forcing/process_nwm_forcing_to_ngen.py
index 2a34f9a..ec299b5 100644
--- a/ngen_forcing/process_nwm_forcing_to_ngen.py
+++ b/ngen_forcing/process_nwm_forcing_to_ngen.py
@@ -1,7 +1,7 @@
-from defs import xr_read_window, polymask
+from defs import xr_read_window, polymask, xr_read_window_time
 from rasterio import _io, windows
-import concurrent.futures
 import xarray as xr
+import pandas as pd
 
 
 class MemoryDataset(_io.MemoryDataset, windows.WindowMethodsMixin):
@@ -15,47 +15,63 @@ def junk():
 
 
 def get_forcing_dict_newway(
+    feature_index,
     feature_list,
     folder_prefix,
     file_list,
+    var_list,
 ):
     reng = "rasterio"
-    _xds = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng)
-    _template_arr = _xds.U2D.values
+
+    _xds_dummy = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng)
+    _template_arr = _xds_dummy.U2D.values
     _u2d = MemoryDataset(
         _template_arr,
-        transform=_xds.U2D.rio.transform(),
+        transform=_xds_dummy.U2D.rio.transform(),
         gcps=None,
         rpcs=None,
         crs=None,
         copy=False,
     )
-    filehandles = [xr.open_dataset("data/" + f) for f in file_list]
-    stats = []
-    for i, m in enumerate(map(polymask(_u2d), feature_list)):
+    ds_list = []
+    for _nc_file in file_list:
+        _full_nc_file = folder_prefix.joinpath(_nc_file)
+        ds_list.append(xr.open_dataset(_full_nc_file, engine=reng))
+
+    df_dict = {}
+    for _v in var_list:
+        df_dict[_v] = pd.DataFrame(index=feature_index)
+
+    for i, feature in enumerate(feature_list):
         print(f"{i}, {round(i/len(feature_list), 5)*100}".ljust(40), end="\r")
-        mask, _, window = m
+        mask, _, window = polymask(_u2d)(feature)
         mask = xr.DataArray(mask, dims=["y", "x"])
         winslices = dict(zip(["y", "x"], window.toslices()))
-        for f in filehandles:
-            cropped = xr_read_window(f, winslices, mask=mask)
-            stats.append(cropped.mean())
-    [f.close() for f in filehandles]  # Returns None for each file
+        for j, _xds in enumerate(ds_list):
+            time_value = _xds.time.values[0]
+            cropped = xr_read_window(_xds, winslices, mask=mask)
+            stats = cropped.mean()
+            for var in var_list:
+                df_dict[var].loc[i, time_value] = stats[var]
 
-    return stats
+    [ds.close() for ds in ds_list]
+    return df_dict
 
 
 def get_forcing_dict_newway_parallel(
+    feature_index,
     feature_list,
     folder_prefix,
     file_list,
+    var_list,
     para="thread",
     para_n=2,
 ):
+    import concurrent.futures
+
     reng = "rasterio"
     _xds = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng)
     _template_arr = _xds.U2D.values
-
     _u2d = MemoryDataset(
         _template_arr,
         transform=_xds.U2D.rio.transform(),
@@ -67,7 +83,10 @@ def get_forcing_dict_newway_parallel(
         crs=None,
         copy=False,
     )
-    filehandles = [xr.open_dataset("data/" + f) for f in file_list]
+    ds_list = [xr.open_dataset(folder_prefix.joinpath(f)) for f in file_list]
+    # ds_list = [xr.open_dataset(folder_prefix.joinpath(f), engine=reng) for f in file_list]
+    # TODO: figure out why using the rasterio engine DOES NOT WORK with parallel
+    # TODO: figure out why NOT using the rasterio engine produces a different result
 
     if para == "process":
         pool = concurrent.futures.ProcessPoolExecutor
@@ -76,69 +95,99 @@
     else:
         pool = concurrent.futures.ThreadPoolExecutor
 
+    stats = []
+    future_list = []
+
     with pool(max_workers=para_n) as executor:
-        stats = []
-        future_list = []
-        for i, m in enumerate(map(polymask(_u2d), feature_list)):
-            print(f"{i}, {round(i/len(feature_list), 5)*100}".ljust(40), end="\r")
-            mask, _, window = m
+        for _i, _m in enumerate(map(polymask(_u2d), feature_list)):
+            print(f"{_i}, {round(_i/len(feature_list), 5)*100}".ljust(40), end="\r")
+            mask, _, window = _m
             mask = xr.DataArray(mask, dims=["y", "x"])
             winslices = dict(zip(["y", "x"], window.toslices()))
-            for f in filehandles:
-                future = executor.submit(xr_read_window, f, winslices, mask=mask)
+            for ds in ds_list:
+                _t = ds.time.values[0]
+                future = executor.submit(
+                    xr_read_window_time, ds, winslices, mask=mask, idx=_i, time=_t
+                )
                 # cropped = xr_read_window(f, winslices, mask=mask)
                 # stats.append(cropped.mean())
                 future_list.append(future)
         for _f in concurrent.futures.as_completed(future_list):
-            stats.append(_f.result().mean())
+            _j, _t, _s = _f.result()
+            stats.append((_j, _t, _s))
 
-    [f.close() for f in filehandles]
-    return stats
+    df_dict = {}
+    for _v in var_list:
+        df_dict[_v] = pd.DataFrame(index=feature_index)
+
+    for j, t, s in stats:
+        for var in var_list:
+            df_dict[var].loc[j, t] = s[var].mean()
+
+    [ds.close() for ds in ds_list]
+    return df_dict
 
 
 def get_forcing_dict_newway_inverted(
+    feature_index,
     feature_list,
     folder_prefix,
     file_list,
+    var_list,
 ):
     reng = "rasterio"
-    _xds = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng)
-    _template_arr = _xds.U2D.values
+
+    _xds_dummy = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng)
+    _template_arr = _xds_dummy.U2D.values
     _u2d = MemoryDataset(
         _template_arr,
-        transform=_xds.U2D.rio.transform(),
+        transform=_xds_dummy.U2D.rio.transform(),
         gcps=None,
         rpcs=None,
         crs=None,
         copy=False,
     )
+    ds_list = []
+    for _nc_file in file_list:
+        _full_nc_file = folder_prefix.joinpath(_nc_file)
+        ds_list.append(xr.open_dataset(_full_nc_file, engine=reng))
 
-    filehandles = [xr.open_dataset("data/" + f) for f in file_list]
     stats = []
-    future_list = []
     mask_win_list = []
 
-    for i, m in enumerate(map(polymask(_u2d), feature_list)):
+    for i, feature in enumerate(feature_list):
         print(f"{i}, {round(i/len(feature_list), 5)*100}".ljust(40), end="\r")
-        mask, _, window = m
+        mask, _, window = polymask(_u2d)(feature)
         mask = xr.DataArray(mask, dims=["y", "x"])
         winslices = dict(zip(["y", "x"], window.toslices()))
         mask_win_list.append((mask, winslices))
 
-    for f in filehandles:
+    for i, f in enumerate(ds_list):
         print(f"{i}, {round(i/len(file_list), 2)*100}".ljust(40), end="\r")
-        for _m, _w in mask_win_list:
+        time_value = f.time.values[0]
+        # TODO: when we read the window, could the time be added as a dimension?
+        for j, (_m, _w) in enumerate(mask_win_list):
             cropped = xr_read_window(f, _w, mask=_m)
-            stats.append(cropped.mean())
+            stats.append((j, time_value, cropped.mean()))
+
+    df_dict = {}
+    for _v in var_list:
+        df_dict[_v] = pd.DataFrame(index=feature_index)
+
+    for j, t, s in stats:
+        for var in var_list:
+            df_dict[var].loc[j, t] = s[var]
 
-    [f.close() for f in filehandles]
-    return stats
+    [ds.close() for ds in ds_list]
+    return df_dict
 
 
 def get_forcing_dict_newway_inverted_parallel(
+    feature_index,
     feature_list,
     folder_prefix,
     file_list,
+    var_list,
     para="thread",
     para_n=2,
 ):
@@ -167,7 +216,11 @@ def get_forcing_dict_newway_inverted_parallel(
     winslices = dict(zip(["y", "x"], window.toslices()))
     mask_win_list.append((mask, winslices))
 
-    filehandles = [xr.open_dataset("data/" + f) for f in file_list]
+    ds_list = [xr.open_dataset(folder_prefix.joinpath(f)) for f in file_list]
+    # ds_list = [xr.open_dataset(folder_prefix.joinpath(f), engine=reng) for f in file_list]
+    # TODO: figure out why using the rasterio engine DOES NOT WORK with parallel
+    # TODO: figure out why NOT using the rasterio engine produces a different result
+    import concurrent.futures
 
     stats = []
     future_list = []
@@ -180,15 +233,27 @@
 
     with pool(max_workers=para_n) as executor:
-        for f in filehandles:
-            print(f"{i}, {round(i/len(file_list), 2)*100}".ljust(40), end="\r")
-            for _m, _w in mask_win_list:
-                future = executor.submit(xr_read_window, f, _w, mask=_m)
-                # cropped = xr_read_window(f, _w, mask=_m)
+        for j, ds in enumerate(ds_list):
+            print(f"{j}, {round(j/len(file_list), 2)*100}".ljust(40), end="\r")
+            _t = ds.time.values[0]
+            for _i, (_m, _w) in enumerate(mask_win_list):
+                future = executor.submit(
+                    xr_read_window_time, ds, _w, mask=_m, idx=_i, time=_t
+                )
+                # cropped = xr_read_window(ds, _w, mask=_m)
                 # stats.append(cropped.mean())
                 future_list.append(future)
         for _f in concurrent.futures.as_completed(future_list):
-            stats.append(_f.result().mean())
+            _j, _t, _s = _f.result()
+            stats.append((_j, _t, _s))
+
+    df_dict = {}
+    for _v in var_list:
+        df_dict[_v] = pd.DataFrame(index=feature_index)
+
+    for j, t, s in stats:
+        for var in var_list:
+            df_dict[var].loc[j, t] = s[var].mean()
 
-    [f.close() for f in filehandles]
-    return stats
+    [ds.close() for ds in ds_list]
+    return df_dict
 
diff --git a/ngen_forcing/test_process_nwm_forcing_to_ngen.py b/ngen_forcing/test_process_nwm_forcing_to_ngen.py
index 5fe24a3..5ea89cc 100644
--- a/ngen_forcing/test_process_nwm_forcing_to_ngen.py
+++ b/ngen_forcing/test_process_nwm_forcing_to_ngen.py
@@ -49,7 +49,8 @@
 
 
 def get_forcing_dict(
-    gpkg_divides,
+    feature_index,
+    feature_list,
     folder_prefix,
     filelist,
     var_list,
@@ -59,7 +60,7 @@ def get_forcing_dict(
 
     df_dict = {}
     for _v in var_list:
-        df_dict[_v] = pd.DataFrame(index=gpkg_divides.index)
+        df_dict[_v] = pd.DataFrame(index=feature_index)
 
     ds_list = []
     for _nc_file in filelist:
@@ -77,7 +78,7 @@ def get_forcing_dict(
         _arr2 = _src.values[0]
 
         _df_zonal_stats = pd.DataFrame(
-            zonal_stats(gpkg_divides, _arr2, affine=_aff2)
+            zonal_stats(feature_list, _arr2, affine=_aff2)
         )
         # if adding statistics back to original GeoDataFrame
         # gdf3 = pd.concat([gpkg_divides, _df_zonal_stats], axis=1)
@@ -121,10 +122,12 @@ def main():
 
     # This way is extremely slow for anything more than a
    # few files, so we comment it out of the test
+    start_time = time.time()
 
     print(f"Working on the old (slow) way")
     fd1 = get_forcing_dict(
-        gpkg_subset,
+        gpkg_subset.index,
+        feature_list,
         folder_prefix,
         file_list,
         var_list,
@@ -134,18 +137,22 @@ def main():
     start_time = time.time()
     print(f"Working on the new way")
     fd2 = get_forcing_dict_newway(
+        gpkg_subset.index,
         feature_list,
         folder_prefix,
         file_list,
+        var_list,
     )
     print(time.time() - start_time)
 
     start_time = time.time()
     print(f"Working on the new way with threading parallel.")
-    fd3 = get_forcing_dict_newway_parallel(
+    fd3t = get_forcing_dict_newway_parallel(
+        gpkg_subset.index,
         feature_list,
         folder_prefix,
         file_list,
+        var_list,
         para="thread",
         para_n=16,
     )
@@ -153,10 +160,12 @@ def main():
 
     start_time = time.time()
     print(f"Working on the new way with process parallel.")
-    fd3 = get_forcing_dict_newway_parallel(
+    fd3p = get_forcing_dict_newway_parallel(
+        gpkg_subset.index,
         feature_list,
         folder_prefix,
         file_list,
+        var_list,
         para="process",
         para_n=16,
    )
@@ -165,18 +174,22 @@ def main():
     start_time = time.time()
     print(f"Working on the new way with loops reversed.")
     fd4 = get_forcing_dict_newway_inverted(
+        gpkg_subset.index,
         feature_list,
         folder_prefix,
         file_list,
+        var_list,
    )
     print(time.time() - start_time)
 
     start_time = time.time()
     print(f"Working on the new way with loops reversed with threading parallel.")
-    fd4 = get_forcing_dict_newway_inverted_parallel(
+    fd5t = get_forcing_dict_newway_inverted_parallel(
+        gpkg_subset.index,
         feature_list,
         folder_prefix,
         file_list,
+        var_list,
         para="thread",
         para_n=16,
     )
@@ -184,10 +197,12 @@ def main():
 
     start_time = time.time()
     print(f"Working on the new way with loops reversed with process parallel.")
-    fd4 = get_forcing_dict_newway_inverted_parallel(
+    fd5p = get_forcing_dict_newway_inverted_parallel(
+        gpkg_subset.index,
         feature_list,
         folder_prefix,
         file_list,
+        var_list,
         para="process",
         para_n=16,
     )