From 889df5d482c28a8e25994fdb9bb8d96bd59a8d67 Mon Sep 17 00:00:00 2001 From: James Halgren Date: Fri, 24 Mar 2023 10:24:46 -0500 Subject: [PATCH 1/3] fix duplicate variable names in test --- .../test_process_nwm_forcing_to_ngen.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/ngen_forcing/test_process_nwm_forcing_to_ngen.py b/ngen_forcing/test_process_nwm_forcing_to_ngen.py index 5fe24a3..b068ac1 100644 --- a/ngen_forcing/test_process_nwm_forcing_to_ngen.py +++ b/ngen_forcing/test_process_nwm_forcing_to_ngen.py @@ -49,7 +49,8 @@ def get_forcing_dict( - gpkg_divides, + feature_index, + feature_list, folder_prefix, filelist, var_list, @@ -59,7 +60,7 @@ def get_forcing_dict( df_dict = {} for _v in var_list: - df_dict[_v] = pd.DataFrame(index=gpkg_divides.index) + df_dict[_v] = pd.DataFrame(index=feature_index) ds_list = [] for _nc_file in filelist: @@ -77,7 +78,7 @@ def get_forcing_dict( _arr2 = _src.values[0] _df_zonal_stats = pd.DataFrame( - zonal_stats(gpkg_divides, _arr2, affine=_aff2) + zonal_stats(feature_list, _arr2, affine=_aff2) ) # if adding statistics back to original GeoDataFrame # gdf3 = pd.concat([gpkg_divides, _df_zonal_stats], axis=1) @@ -121,10 +122,12 @@ def main(): # This way is extremely slow for anything more than a # few files, so we comment it out of the test + start_time = time.time() print(f"Working on the old (slow) way") fd1 = get_forcing_dict( - gpkg_subset, + gpkg_subset.index, + feature_list, folder_prefix, file_list, var_list, @@ -142,7 +145,7 @@ def main(): start_time = time.time() print(f"Working on the new way with threading parallel.") - fd3 = get_forcing_dict_newway_parallel( + fd3t = get_forcing_dict_newway_parallel( feature_list, folder_prefix, file_list, @@ -153,7 +156,7 @@ def main(): start_time = time.time() print(f"Working on the new way with process parallel.") - fd3 = get_forcing_dict_newway_parallel( + fd3p = get_forcing_dict_newway_parallel( feature_list, folder_prefix, file_list, @@ -173,7 +176,7 @@ def main(): start_time = time.time() print(f"Working on the new way with loops reversed with threading parallel.") - fd4 = get_forcing_dict_newway_inverted_parallel( + fd5t = get_forcing_dict_newway_inverted_parallel( feature_list, folder_prefix, file_list, @@ -184,7 +187,7 @@ def main(): start_time = time.time() print(f"Working on the new way with loops reversed with process parallel.") - fd4 = get_forcing_dict_newway_inverted_parallel( + fd5p = get_forcing_dict_newway_inverted_parallel( feature_list, folder_prefix, file_list, From b6a49f73dbf576dcdf3fdc6cefa9c983e03a4a86 Mon Sep 17 00:00:00 2001 From: James Halgren Date: Mon, 27 Mar 2023 16:28:17 -0500 Subject: [PATCH 2/3] update single-thread implementations --- ngen_forcing/defs.py | 8 ++ ngen_forcing/process_nwm_forcing_to_ngen.py | 77 +++++++++++++------ .../test_process_nwm_forcing_to_ngen.py | 4 + 3 files changed, 65 insertions(+), 24 deletions(-) diff --git a/ngen_forcing/defs.py b/ngen_forcing/defs.py index 58b2c1d..18175c8 100644 --- a/ngen_forcing/defs.py +++ b/ngen_forcing/defs.py @@ -16,3 +16,11 @@ def xr_read_window(ds, window, mask=None): return data else: return data.where(mask) + + +def xr_read_window_time(ds, window, mask=None, idx=None, time=None): + data = ds.isel(window) + if mask is None: + return idx, time, data + else: + return idx, time, data.where(mask) diff --git a/ngen_forcing/process_nwm_forcing_to_ngen.py b/ngen_forcing/process_nwm_forcing_to_ngen.py index 2a34f9a..f607e47 100644 --- a/ngen_forcing/process_nwm_forcing_to_ngen.py +++ b/ngen_forcing/process_nwm_forcing_to_ngen.py @@ -2,6 +2,7 @@ from rasterio import _io, windows import concurrent.futures import xarray as xr +import pandas as pd class MemoryDataset(_io.MemoryDataset, windows.WindowMethodsMixin): @@ -15,36 +16,49 @@ def junk(): def get_forcing_dict_newway( + feature_index, feature_list, folder_prefix, file_list, + var_list, ): reng = "rasterio" - _xds = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng) - _template_arr = _xds.U2D.values + + _xds_dummy = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng) + _template_arr = _xds_dummy.U2D.values _u2d = MemoryDataset( _template_arr, - transform=_xds.U2D.rio.transform(), + transform=_xds_dummy.U2D.rio.transform(), gcps=None, rpcs=None, crs=None, copy=False, ) - filehandles = [xr.open_dataset("data/" + f) for f in file_list] - stats = [] - for i, m in enumerate(map(polymask(_u2d), feature_list)): + ds_list = [] + for _nc_file in file_list: + _full_nc_file = folder_prefix.joinpath(_nc_file) + ds_list.append(xr.open_dataset(_full_nc_file, engine=reng)) + + df_dict = {} + for _v in var_list: + df_dict[_v] = pd.DataFrame(index=feature_index) + + for i, feature in enumerate(feature_list): print(f"{i}, {round(i/len(feature_list), 5)*100}".ljust(40), end="\r") - mask, _, window = m + mask, _, window = polymask(_u2d)(feature) mask = xr.DataArray(mask, dims=["y", "x"]) winslices = dict(zip(["y", "x"], window.toslices())) - for f in filehandles: - cropped = xr_read_window(f, winslices, mask=mask) - stats.append(cropped.mean()) - [f.close() for f in filehandles] # Returns None for each file + for j, _xds in enumerate(ds_list): + time_value = _xds.time.values[0] + cropped = xr_read_window(_xds, winslices, mask=mask) + stats = cropped.mean() + for var in var_list: + df_dict[var].loc[i, time_value] = stats[var] - return stats + [ds.close() for ds in ds_list] + return df_dict def get_forcing_dict_newway_parallel( @@ -98,43 +112,58 @@ def get_forcing_dict_newway_parallel( def get_forcing_dict_newway_inverted( + feature_index, feature_list, folder_prefix, file_list, + var_list, ): reng = "rasterio" - _xds = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng) - _template_arr = _xds.U2D.values + + _xds_dummy = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng) + _template_arr = _xds_dummy.U2D.values _u2d = MemoryDataset( _template_arr, - transform=_xds.U2D.rio.transform(), + transform=_xds_dummy.U2D.rio.transform(), gcps=None, rpcs=None, crs=None, copy=False, ) + ds_list = [] + for _nc_file in file_list: + _full_nc_file = folder_prefix.joinpath(_nc_file) + ds_list.append(xr.open_dataset(_full_nc_file, engine=reng)) - filehandles = [xr.open_dataset("data/" + f) for f in file_list] stats = [] - future_list = [] mask_win_list = [] - for i, m in enumerate(map(polymask(_u2d), feature_list)): + for i, feature in enumerate(feature_list): print(f"{i}, {round(i/len(feature_list), 5)*100}".ljust(40), end="\r") - mask, _, window = m + mask, _, window = polymask(_u2d)(feature) mask = xr.DataArray(mask, dims=["y", "x"]) winslices = dict(zip(["y", "x"], window.toslices())) mask_win_list.append((mask, winslices)) - for f in filehandles: + for i, f in enumerate(ds_list): print(f"{i}, {round(i/len(file_list), 2)*100}".ljust(40), end="\r") - for _m, _w in mask_win_list: + time_value = f.time.values[0] + # TODO: when we read the window, could the time be added as a dimension? + for j, (_m, _w) in enumerate(mask_win_list): cropped = xr_read_window(f, _w, mask=_m) - stats.append(cropped.mean()) + stats.append((j, time_value, cropped.mean())) - [f.close() for f in filehandles] - return stats + df_dict = {} + for _v in var_list: + df_dict[_v] = pd.DataFrame(index=feature_index) + + for j, t, s in stats: + for var in var_list: + df_dict[var].loc[j, t] = s[var] + + [ds.close() for ds in ds_list] + return df_dict def get_forcing_dict_newway_inverted_parallel( diff --git a/ngen_forcing/test_process_nwm_forcing_to_ngen.py b/ngen_forcing/test_process_nwm_forcing_to_ngen.py index b068ac1..b7717d0 100644 --- a/ngen_forcing/test_process_nwm_forcing_to_ngen.py +++ b/ngen_forcing/test_process_nwm_forcing_to_ngen.py @@ -137,9 +137,11 @@ def main(): start_time = time.time() print(f"Working on the new way") fd2 = get_forcing_dict_newway( + gpkg_subset.index, feature_list, folder_prefix, file_list, + var_list, ) print(time.time() - start_time) @@ -168,9 +170,11 @@ def main(): start_time = time.time() print(f"Working on the new way with loops reversed.") fd4 = get_forcing_dict_newway_inverted( + gpkg_subset.index, feature_list, folder_prefix, file_list, + var_list, ) print(time.time() - start_time) From 0fd7d8b3a87fa7e1aeadb6b480add42df914c945 Mon Sep 17 00:00:00 2001 From: James Halgren Date: Mon, 27 Mar 2023 16:28:43 -0500 Subject: [PATCH 3/3] update parallel implementations --- ngen_forcing/process_nwm_forcing_to_ngen.py | 82 +++++++++++++------ .../test_process_nwm_forcing_to_ngen.py | 8 ++ 2 files changed, 67 insertions(+), 23 deletions(-) diff --git a/ngen_forcing/process_nwm_forcing_to_ngen.py b/ngen_forcing/process_nwm_forcing_to_ngen.py index f607e47..ec299b5 100644 --- a/ngen_forcing/process_nwm_forcing_to_ngen.py +++ b/ngen_forcing/process_nwm_forcing_to_ngen.py @@ -1,6 +1,5 @@ -from defs import xr_read_window, polymask +from defs import xr_read_window, polymask, xr_read_window_time from rasterio import _io, windows -import concurrent.futures import xarray as xr import pandas as pd @@ -62,17 +61,20 @@ def get_forcing_dict_newway( def get_forcing_dict_newway_parallel( + feature_index, feature_list, folder_prefix, file_list, + var_list, para="thread", para_n=2, ): + import concurrent.futures + reng = "rasterio" _xds = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng) _template_arr = _xds.U2D.values - _u2d = MemoryDataset( _template_arr, transform=_xds.U2D.rio.transform(), @@ -81,7 +83,10 @@ def get_forcing_dict_newway_parallel( crs=None, copy=False, ) - filehandles = [xr.open_dataset("data/" + f) for f in file_list] + ds_list = [xr.open_dataset(folder_prefix.joinpath(f)) for f in file_list] + # ds_list = [xr.open_dataset(folder_prefix.joinpath(f), engine=reng) for f in file_list] + # TODO: figure out why using the rasterio engine DOES NOT WORK with parallel + # TODO: figure out why NOT using the rasterio engine produces a different result if para == "process": pool = concurrent.futures.ProcessPoolExecutor @@ -90,25 +95,38 @@ def get_forcing_dict_newway_parallel( else: pool = concurrent.futures.ThreadPoolExecutor + stats = [] + future_list = [] + with pool(max_workers=para_n) as executor: - stats = [] - future_list = [] - for i, m in enumerate(map(polymask(_u2d), feature_list)): - print(f"{i}, {round(i/len(feature_list), 5)*100}".ljust(40), end="\r") - mask, _, window = m + for _i, _m in enumerate(map(polymask(_u2d), feature_list)): + print(f"{_i}, {round(_i/len(feature_list), 5)*100}".ljust(40), end="\r") + mask, _, window = _m mask = xr.DataArray(mask, dims=["y", "x"]) winslices = dict(zip(["y", "x"], window.toslices())) - for f in filehandles: - future = executor.submit(xr_read_window, f, winslices, mask=mask) + for ds in ds_list: + _t = ds.time.values[0] + future = executor.submit( + xr_read_window_time, ds, winslices, mask=mask, idx=_i, time=_t + ) # cropped = xr_read_window(f, winslices, mask=mask) # stats.append(cropped.mean()) future_list.append(future) for _f in concurrent.futures.as_completed(future_list): - stats.append(_f.result().mean()) + _j, _t, _s = _f.result() + stats.append((_j, _t, _s)) - [f.close() for f in filehandles] - return stats + df_dict = {} + for _v in var_list: + df_dict[_v] = pd.DataFrame(index=feature_index) + + for j, t, s in stats: + for var in var_list: + df_dict[var].loc[j, t] = s[var].mean() + + [ds.close() for ds in ds_list] + return df_dict def get_forcing_dict_newway_inverted( @@ -167,9 +185,11 @@ def get_forcing_dict_newway_inverted( def get_forcing_dict_newway_inverted_parallel( + feature_index, feature_list, folder_prefix, file_list, + var_list, para="thread", para_n=2, ): @@ -196,7 +216,11 @@ def get_forcing_dict_newway_inverted_parallel( winslices = dict(zip(["y", "x"], window.toslices())) mask_win_list.append((mask, winslices)) - filehandles = [xr.open_dataset("data/" + f) for f in file_list] + ds_list = [xr.open_dataset(folder_prefix.joinpath(f)) for f in file_list] + # ds_list = [xr.open_dataset(folder_prefix.joinpath(f), engine=reng) for f in file_list] + # TODO: figure out why using the rasterio engine DOES NOT WORK with parallel + # TODO: figure out why NOT using the rasterio engine produces a different result + stats = [] future_list = [] @@ -209,15 +233,27 @@ def get_forcing_dict_newway_inverted_parallel( with pool(max_workers=para_n) as executor: - for f in filehandles: - print(f"{i}, {round(i/len(file_list), 2)*100}".ljust(40), end="\r") - for _m, _w in mask_win_list: - future = executor.submit(xr_read_window, f, _w, mask=_m) - # cropped = xr_read_window(f, _w, mask=_m) + for j, ds in enumerate(ds_list): + print(f"{j}, {round(i/len(file_list), 2)*100}".ljust(40), end="\r") + _t = ds.time.values[0] + for _i, (_m, _w) in enumerate(mask_win_list): + future = executor.submit( + xr_read_window_time, ds, _w, mask=_m, idx=_i, time=_t + ) + # cropped = xr_read_window(ds, _w, mask=_m) # stats.append(cropped.mean()) future_list.append(future) for _f in concurrent.futures.as_completed(future_list): - stats.append(_f.result().mean()) + _j, _t, _s = _f.result() + stats.append((_j, _t, _s)) + + df_dict = {} + for _v in var_list: + df_dict[_v] = pd.DataFrame(index=feature_index) - [f.close() for f in filehandles] - return stats + for j, t, s in stats: + for var in var_list: + df_dict[var].loc[j, t] = s[var].mean() + + [ds.close() for ds in ds_list] + return df_dict diff --git a/ngen_forcing/test_process_nwm_forcing_to_ngen.py b/ngen_forcing/test_process_nwm_forcing_to_ngen.py index b7717d0..5ea89cc 100644 --- a/ngen_forcing/test_process_nwm_forcing_to_ngen.py +++ b/ngen_forcing/test_process_nwm_forcing_to_ngen.py @@ -148,9 +148,11 @@ def main(): start_time = time.time() print(f"Working on the new way with threading parallel.") fd3t = get_forcing_dict_newway_parallel( + gpkg_subset.index, feature_list, folder_prefix, file_list, + var_list, para="thread", para_n=16, ) @@ -159,9 +161,11 @@ def main(): start_time = time.time() print(f"Working on the new way with process parallel.") fd3p = get_forcing_dict_newway_parallel( + gpkg_subset.index, feature_list, folder_prefix, file_list, + var_list, para="process", para_n=16, ) @@ -181,9 +185,11 @@ def main(): start_time = time.time() print(f"Working on the new way with loops reversed with threading parallel.") fd5t = get_forcing_dict_newway_inverted_parallel( + gpkg_subset.index, feature_list, folder_prefix, file_list, + var_list, para="thread", para_n=16, ) @@ -192,9 +198,11 @@ def main(): start_time = time.time() print(f"Working on the new way with loops reversed with process parallel.") fd5p = get_forcing_dict_newway_inverted_parallel( + gpkg_subset.index, feature_list, folder_prefix, file_list, + var_list, para="process", para_n=16, )