diff --git a/ngen_forcing/VERYROUGH_RTI_Forcing_example.ipynb b/ngen_forcing/VERYROUGH_RTI_Forcing_example.ipynb
new file mode 100644
index 0000000..acba744
--- /dev/null
+++ b/ngen_forcing/VERYROUGH_RTI_Forcing_example.ipynb
@@ -0,0 +1,618 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "c4b19730-d455-418f-a3a3-15efec3a8387",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# !pip install config\n",
+    "!pip install geoparquet\n",
+    "import geoparquet as gpq\n",
+    "import pickle\n",
+    "# import config\n",
+    "import os\n",
+    "import gc\n",
+    "\n",
+    "import geopandas as gpd\n",
+    "import pandas as pd\n",
+    "import numpy as np\n",
+    "import xarray as xr\n",
+    "import rasterio as rio\n",
+    "\n",
+    "from rasterio.io import MemoryFile\n",
+    "\n",
+    "from rasterio.features import rasterize\n",
+    "# from utils import parquet_to_gdf\n",
+    "# from Grid_to_parquet import get_dataset\n",
+    "\n",
+    "TEMPLATE_BLOB_NAME = \"nwm.20221001/forcing_medium_range/nwm.t00z.medium_range.forcing.f001.conus.nc\"\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "e9159453-76c6-4bbe-b08f-a746a54f48f5",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "NWM_BUCKET = \"national-water-model\"\n",
+    "\n",
+    "# WKT strings extracted from NWM grids\n",
+    "CONUS_NWM_WKT = 'PROJCS[\"Lambert_Conformal_Conic\",GEOGCS[\"GCS_Sphere\",DATUM[\"D_Sphere\",SPHEROID[\"Sphere\",6370000.0,0.0]], \\\n",
+    "PRIMEM[\"Greenwich\",0.0],UNIT[\"Degree\",0.0174532925199433]],PROJECTION[\"Lambert_Conformal_Conic_2SP\"],PARAMETER[\"false_easting\",0.0],\\\n",
+    "PARAMETER[\"false_northing\",0.0],PARAMETER[\"central_meridian\",-97.0],PARAMETER[\"standard_parallel_1\",30.0],\\\n",
+    "PARAMETER[\"standard_parallel_2\",60.0],PARAMETER[\"latitude_of_origin\",40.0],UNIT[\"Meter\",1.0]]'\n",
+    "\n",
+    "HI_NWM_WKT = 'PROJCS[\"Lambert_Conformal_Conic\",GEOGCS[\"GCS_Sphere\",DATUM[\"D_Sphere\",SPHEROID[\"Sphere\",6370000.0,0.0]],\\\n",
+    "PRIMEM[\"Greenwich\",0.0],UNIT[\"Degree\",0.0174532925199433]],PROJECTION[\"Lambert_Conformal_Conic_2SP\"],PARAMETER[\"false_easting\",0.0],\\\n",
+    "PARAMETER[\"false_northing\",0.0],PARAMETER[\"central_meridian\",-157.42],PARAMETER[\"standard_parallel_1\",10.0],\\\n",
+    "PARAMETER[\"standard_parallel_2\",30.0],PARAMETER[\"latitude_of_origin\",20.6],UNIT[\"Meter\",1.0]]'\n",
+    "\n",
+    "PR_NWM_WKT = 'PROJCS[\"Sphere_Lambert_Conformal_Conic\",GEOGCS[\"GCS_Sphere\",DATUM[\"D_Sphere\",SPHEROID[\"Sphere\",6370000.0,0.0]],\\\n",
+    "PRIMEM[\"Greenwich\",0.0],UNIT[\"Degree\",0.0174532925199433]],PROJECTION[\"Lambert_Conformal_Conic_2SP\"],PARAMETER[\"false_easting\",0.0],\\\n",
+    "PARAMETER[\"false_northing\",0.0],PARAMETER[\"central_meridian\",-65.91],PARAMETER[\"standard_parallel_1\",18.1],\\\n",
+    "PARAMETER[\"standard_parallel_2\",18.1],PARAMETER[\"latitude_of_origin\",18.1],UNIT[\"Meter\",1.0]]'"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "eaba8b05-2b98-4aea-974f-d26de67f729b",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import os\n",
+    "from pathlib import Path\n",
+    "\n",
+    "\n",
+    "CACHE_DIR = Path(\".\", \"data_3\", \"cache\")\n",
+    "NWM_CACHE_DIR = os.path.join(CACHE_DIR, \"nwm\")\n",
+    "USGS_CACHE_DIR = os.path.join(CACHE_DIR, \"usgs\")\n",
+    "GEO_CACHE_DIR = os.path.join(CACHE_DIR, \"geo\")\n",
+    "\n",
+    "NWM_CACHE_H5 = os.path.join(NWM_CACHE_DIR, \"gcp_client.h5\")\n",
+    "\n",
+    "PARQUET_CACHE_DIR = os.path.join(CACHE_DIR, \"parquet\")\n",
+    "MEDIUM_RANGE_FORCING_PARQUET = os.path.join(PARQUET_CACHE_DIR, \"forcing_medium_range\")\n",
+    "FORCING_ANALYSIS_ASSIM_PARQUET = os.path.join(PARQUET_CACHE_DIR, \"forcing_analysis_assim\")\n",
+    "MEDIUM_RANGE_PARQUET = os.path.join(PARQUET_CACHE_DIR, \"medium_range\")\n",
+    "USGS_PARQUET = os.path.join(PARQUET_CACHE_DIR, \"usgs\")\n",
+    "\n",
+    "HUC10_SHP_FILEPATH = os.path.join(GEO_CACHE_DIR, \"wbdhu10_conus.shp\")\n",
+    "HUC10_PARQUET_FILEPATH = os.path.join(GEO_CACHE_DIR, \"wbdhu10_conus.parquet\")\n",
+    "HUC10_MEDIUM_RANGE_WEIGHTS_FILEPATH = os.path.join(GEO_CACHE_DIR, \"wbdhu10_medium_range_weights.pkl\")\n",
+    "\n",
+    "ROUTE_LINK_FILE = os.path.join(NWM_CACHE_DIR, \"RouteLink_CONUS.nc\")\n",
+    "ROUTE_LINK_PARQUET = os.path.join(NWM_CACHE_DIR, \"route_link_conus.parquet\")"
+   ]
+  },
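+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "editor-sketch-crs-check",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Editor's sketch (not part of the original workflow): sanity-check that\n",
+    "# the CONUS WKT above parses as a CRS. Assumes pyproj is available (it\n",
+    "# ships with geopandas); from_user_input is lenient about WKT flavors.\n",
+    "from pyproj import CRS\n",
+    "print(CRS.from_user_input(CONUS_NWM_WKT))\n"
+   ]
+  },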
\"forcing_medium_range\")\n", + "FORCING_ANALYSIS_ASSIM_PARQUET = os.path.join(PARQUET_CACHE_DIR, \"forcing_analysis_assim\")\n", + "MEDIUM_RANGE_PARQUET = os.path.join(PARQUET_CACHE_DIR, \"medium_range\")\n", + "USGS_PARQUET = os.path.join(PARQUET_CACHE_DIR, \"usgs\")\n", + "\n", + "HUC10_SHP_FILEPATH = os.path.join(GEO_CACHE_DIR, \"wbdhu10_conus.shp\")\n", + "HUC10_PARQUET_FILEPATH = os.path.join(GEO_CACHE_DIR, \"wbdhu10_conus.parquet\")\n", + "HUC10_MEDIUM_RANGE_WEIGHTS_FILEPATH = os.path.join(GEO_CACHE_DIR, \"wbdhu10_medium_range_weights.pkl\")\n", + "\n", + "ROUTE_LINK_FILE = os.path.join(NWM_CACHE_DIR, \"RouteLink_CONUS.nc\")\n", + "ROUTE_LINK_PARQUET = os.path.join(NWM_CACHE_DIR, \"route_link_conus.parquet\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "82731a4e-70c8-4cc8-a6e1-2de5ff14a947", + "metadata": {}, + "outputs": [], + "source": [ + "def parquet_to_gdf(parquet_filepath: str) -> gpd.GeoDataFrame:\n", + " gdf = gpd.read_parquet(parquet_filepath)\n", + " return gdf\n", + "\n", + "def get_cache_dir(create: bool = True):\n", + " if not os.path.exists(NWM_CACHE_DIR) and create:\n", + " os.mkdir(NWM_CACHE_DIR)\n", + " if not os.path.exists(NWM_CACHE_DIR):\n", + " raise NotADirectoryError\n", + " return NWM_CACHE_DIR\n", + "\n", + "\n", + "def make_parent_dir(filepath):\n", + " Path(filepath).parent.mkdir(parents=True, exist_ok=True)\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "74895a20-a42e-48ff-bff8-313a0ae26d14", + "metadata": {}, + "outputs": [], + "source": [ + "def get_dataset(\n", + " blob_name: str,\n", + " use_cache: bool = True\n", + ") -> xr.Dataset:\n", + " \"\"\"Retrieve a blob from the data service as xarray.Dataset.\n", + " Based largely on OWP HydroTools.\n", + " Parameters\n", + " ----------\n", + " blob_name: str, required\n", + " Name of blob to retrieve.\n", + " use_cacahe: bool, default True\n", + " If cache should be used. 
\n", + " If True, checks to see if file is in cache, and \n", + " if fetched from remote will save to cache.\n", + " Returns\n", + " -------\n", + " ds : xarray.Dataset\n", + " The data stored in the blob.\n", + " \"\"\"\n", + " nc_filepath = os.path.join(get_cache_dir(), blob_name)\n", + " make_parent_dir(nc_filepath)\n", + "\n", + " # If the file exists and use_cache = True\n", + " if os.path.exists(nc_filepath) and use_cache:\n", + " # Get dataset from cache\n", + " ds = xr.load_dataset(\n", + " nc_filepath,\n", + " engine='h5netcdf',\n", + " )\n", + " return ds\n", + " else:\n", + " # Get raw bytes\n", + " raw_bytes = get_blob(blob_name)\n", + " # Create Dataset\n", + " ds = xr.load_dataset(\n", + " MemoryFile(raw_bytes),\n", + " engine='h5netcdf',\n", + " )\n", + " if use_cache:\n", + " # Subset and cache\n", + " ds[\"RAINRATE\"].to_netcdf(\n", + " nc_filepath,\n", + " engine='h5netcdf',\n", + " )\n", + " return ds\n", + "\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b199b72a-0220-4583-ac3a-9f27c1586fe1", + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "def generate_weights_file(\n", + " gdf: gpd.GeoDataFrame,\n", + " src: xr.DataArray,\n", + " weights_filepath: str,\n", + " crosswalk_dict_key: str, \n", + "):\n", + " \"\"\"Generate a weights file.\"\"\"\n", + " \n", + " gdf_proj = gdf.to_crs(CONUS_NWM_WKT)\n", + " \n", + " crosswalk_dict = {}\n", + " \n", + " # This is a probably a really poor performing way to do this \n", + " for index, row in gdf_proj.iterrows():\n", + " geom_rasterize = rasterize([(row[\"geometry\"], 1)],\n", + " out_shape=src.rio.shape,\n", + " transform=src.rio.transform(),\n", + " all_touched=True,\n", + " fill=0,\n", + " dtype='uint8')\n", + " if crosswalk_dict_key:\n", + " crosswalk_dict[row[crosswalk_dict_key]\n", + " ] = np.where(geom_rasterize == 1)\n", + " else:\n", + " crosswalk_dict[index] = np.where(geom_rasterize == 1)\n", + "\n", + " with open(weights_filepath, 'wb') as f:\n", + " pickle.dump(crosswalk_dict, f)\n", + "\n", + " " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "dec7dd6a-483b-4401-9690-4fb213d2fc85", + "metadata": {}, + "outputs": [], + "source": [ + "def add_zonalstats_to_gdf_weights(\n", + " gdf: gpd.GeoDataFrame,\n", + " src: xr.DataArray,\n", + " weights_filepath: str,\n", + ") -> gpd.GeoDataFrame:\n", + " \"\"\"Calculates zonal stats and adds to GeoDataFrame\"\"\"\n", + "\n", + " df = calc_zonal_stats_weights(src, weights_filepath)\n", + " gdf_map = gdf.merge(df, left_on=\"huc10\", right_on=\"catchment_id\")\n", + "\n", + " return gdf_map" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cad942dd-29ec-4ed8-be6d-e5cc13dde0ef", + "metadata": {}, + "outputs": [], + "source": [ + "from google.cloud import storage\n", + "def get_blob(\n", + " blob_name: str,\n", + " bucket: str = NWM_BUCKET\n", + ") -> bytes:\n", + " \"\"\"Retrieve a blob from the data service as bytes.\n", + " Based largely on OWP HydroTools.\n", + " Parameters\n", + " ----------\n", + " blob_name : str, required\n", + " Name of blob to retrieve.\n", + " Returns\n", + " -------\n", + " data : bytes\n", + " The data stored in the blob.\n", + " \"\"\"\n", + " # Setup anonymous client and retrieve blob data\n", + " client = storage.Client.create_anonymous_client()\n", + " bucket = client.bucket(bucket)\n", + " return bucket.blob(blob_name).download_as_bytes(timeout=120)\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": 
"bb25a27c-fd98-478c-97ef-c53dbd73a6f1", + "metadata": {}, + "outputs": [], + "source": [ + "%%time\n", + "### MAIN\n", + "ds = get_dataset(TEMPLATE_BLOB_NAME, use_cache=True)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e8e7f87d-ae19-454e-b958-2668e15d5c10", + "metadata": {}, + "outputs": [], + "source": [ + "%%time\n", + "polygonfile = gpd.read_file(\"03w/nextgen_03W.gpkg\",layer=\"divides\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "95f6ad0a-ec08-42aa-a9a9-4694dd7ad1e0", + "metadata": {}, + "outputs": [], + "source": [ + "%%time\n", + "polygonfile.to_parquet(\"03w/ng_03W.parquet\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d540ca78-43d5-4387-bd77-c6d8848ca576", + "metadata": {}, + "outputs": [], + "source": [ + "%%time\n", + "gdf = parquet_to_gdf(\"03w/ng_03W.parquet\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fb1dc54e-89ff-4d91-b73e-350da31e3ebb", + "metadata": {}, + "outputs": [], + "source": [ + "%%time\n", + "src = ds[\"RAINRATE\"]\n", + "generate_weights_file(gdf, src, \"data_3/weights.pkl\", crosswalk_dict_key=\"id\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d2101a68-2365-41db-a226-e70f661ed57a", + "metadata": {}, + "outputs": [], + "source": [ + "# !pip install ipdb\n", + "def calc_zonal_stats_weights(\n", + " src: xr.DataArray,\n", + " weights_filepath: str,\n", + ") -> pd.DataFrame:\n", + " \"\"\"Calculates zonal stats\"\"\"\n", + "\n", + " # Open weights dict from pickle\n", + " # This could probably be done once and passed as a reference.\n", + " with open(weights_filepath, 'rb') as f:\n", + " crosswalk_dict = pickle.load(f)\n", + "\n", + " r_array = src.values[0]\n", + " r_array[r_array == src.rio.nodata] = np.nan\n", + "\n", + " mean_dict = {}\n", + " for key, value in crosswalk_dict.items():\n", + " mean_dict[key] = np.nanmean(r_array[value])\n", + "\n", + " df = pd.DataFrame.from_dict(mean_dict,\n", + " orient='index',\n", + " columns=['value'])\n", + "\n", + " df.reset_index(inplace=True, names=\"catchment_id\")\n", + "\n", + " # This should not be needed, but without memory usage grows\n", + " del crosswalk_dict\n", + " del f\n", + " gc.collect()\n", + "\n", + " return df" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cc543b89-9335-4a30-bdae-83c30293d6dd", + "metadata": {}, + "outputs": [], + "source": [ + "%%time\n", + "calc_zonal_stats_weights(src, \"data_3/weights.pkl\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fa8c71d3-2fd1-46eb-8fee-d3f0ddb00929", + "metadata": {}, + "outputs": [], + "source": [ + "def get_forcing_dict_RTIway(\n", + " pickle_file, # This would be a Feature list for parallel calling -- \n", + " # if there is a stored weights file, we use it \n", + " # (checking for an optional flag to force re-creation of the weights...)\n", + " folder_prefix,\n", + " file_list,\n", + "):\n", + "\n", + " var = \"RAINRATE\"\n", + " reng = \"rasterio\"\n", + " filehandles = [xr.open_dataset(folder_prefix / f, engine=reng)[var] for f in file_list]\n", + " # filehandles = [get_dataset(\"data/\" + f, use_cache=True) for f in file_list]\n", + " stats = []\n", + "\n", + " for _i, f in enumerate(filehandles):\n", + " print(f\"{_i}, {round(_i/len(file_list), 2)*100}\".ljust(40), end=\"\\r\")\n", + " stats.append(calc_zonal_stats_weights(f, pickle_file))\n", + "\n", + " [f.close() for f in filehandles]\n", + " return stats\n", + "\n" + ] + }, + { + "cell_type": "code", 
+ "execution_count": null, + "id": "129e1681-d89c-4636-a487-0432fe7e283c", + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "def get_forcing_dict_RTIway2(\n", + " pickle_file, # This would be a Feature list for parallel calling -- \n", + " # if there is a stored weights file, we use it \n", + " # (checking for an optional flag to force re-creation of the weights...)\n", + " gpkg_divides,\n", + " folder_prefix,\n", + " filelist,\n", + " var_list,\n", + "):\n", + " reng = \"rasterio\"\n", + " pick_val = \"value\"\n", + "\n", + " df_dict = {}\n", + " for _v in var_list:\n", + " df_dict[_v] = pd.DataFrame(index=gpkg_divides.index)\n", + "\n", + " ds_list = []\n", + " for _i, _nc_file in enumerate(filelist):\n", + " # _nc_file = (\"nwm.t00z.medium_range.forcing.f001.conus.nc\")\n", + " _full_nc_file = folder_prefix.joinpath(_nc_file)\n", + "\n", + " with xr.open_dataset(_full_nc_file, engine=reng) as _xds:\n", + " #_xds = ds_list[_i]\n", + " print(f\"{_i}, {round(_i/len(filelist), 5)*100}\".ljust(40), end=\"\\r\")\n", + " for _v in var_list:\n", + " _src = _xds[_v]\n", + "\n", + " # import ipdb; ipdb.set_trace()\n", + " _df_zonal_stats = (calc_zonal_stats_weights(_src, pickle_file))\n", + " # if adding statistics back to original GeoDataFrame\n", + " # gdf3 = pd.concat([gpkg_divides, _df_zonal_stats], axis=1)\n", + " df_dict[_v][_xds.time.values[0]] = _df_zonal_stats[pick_val]\n", + "\n", + " [_xds.close() for _xds in ds_list]\n", + "\n", + " return df_dict\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8459aaa9-11ef-40d7-bced-1217ab59a2f5", + "metadata": {}, + "outputs": [], + "source": [ + "%%time\n", + "import time\n", + "folder_prefix = Path(\"data\")\n", + "list_of_files = [\n", + " f\"nwm.t12z.medium_range.forcing.f{_r:03}.conus.nc\" for _r in range(1, 241)\n", + "]\n", + "\n", + "f_03 = \"03w/nextgen_03W.gpkg\"\n", + "gpkg_divides = gpd.read_file(f_03, layer=\"divides\")\n", + "var_list = [\n", + " \"U2D\",\n", + " \"V2D\",\n", + " \"LWDOWN\",\n", + " \"RAINRATE\",\n", + " \"T2D\",\n", + " \"Q2D\",\n", + " \"PSFC\",\n", + " \"SWDOWN\",\n", + "]\n", + "\n", + "\n", + "# file_list = list_of_files[0:18]\n", + "file_list = list_of_files[0:3]\n", + "file_list = list_of_files[0:]\n", + "\n", + "pickle_file = \"data_3/weights.pkl\"\n", + "\n", + "start_time = time.time()\n", + "print(f\"Working on the new way\")\n", + "fd2 = get_forcing_dict_RTIway2(\n", + " pickle_file,\n", + " gpkg_divides,\n", + " folder_prefix,\n", + " file_list,\n", + " var_list,\n", + ")\n", + "print(time.time() - start_time)\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d37d2590-1de1-4e9c-85f2-9c7a1172f07c", + "metadata": {}, + "outputs": [], + "source": [ + "# %%time\n", + "# if 1 == 1:\n", + "# _f = folder_prefix / list_of_files[0]\n", + "# src = xr.open_dataset(_f)[\"RAINRATE\"]\n", + "# generate_weights_file(gdf, src, \"data_3/weights_alt.pkl\", crosswalk_dict_key=\"id\")\n", + " " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ea5887b5-b637-4642-bd47-ade6118e6532", + "metadata": {}, + "outputs": [], + "source": [ + "len(fd2)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "61e4d101-7973-4e64-8619-4ca6ab9a0a18", + "metadata": {}, + "outputs": [], + "source": [ + "fd2[\"U2D\"]\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d3885ed0-8b83-46af-afc9-75639ff8b7c3", + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd\n", + "\n", + "# pcp_var = 
pd.read_csv(\"RAINRATE.csv\", index_col=0)#.rename(\"APCP_surface\")\n", + "# lw_var = pd.read_csv(\"LWDOWN.csv\", index_col=0)#.rename(\"DLWRF_surface\")\n", + "# sw_var = pd.read_csv(\"SWDOWN.csv\", index_col=0)#.rename(\"DSWRF_surface\")\n", + "# sp_var = pd.read_csv(\"PSFC.csv\", index_col=0)#.rename(\"SPFH_2maboveground\")\n", + "# tmp_var = pd.read_csv(\"T2D.csv\", index_col=0)#.rename(\"TMP_2maboveground\")\n", + "# u2d_var = pd.read_csv(\"U2D.csv\", index_col=0)#.rename(\"UGRD_10maboveground\")\n", + "# v2d_var = pd.read_csv(\"V2D.csv\", index_col=0)#.rename(\"VGRD_10maboveground\")\n", + "# pcp_var2 = pd.read_csv(\"RAINRATE.csv\", index_col=0)#.rename(\"precip_rate\") ##BROKEN!!\n", + "pcp_var = fd2[\"RAINRATE\"] \n", + "lw_var = fd2[\"LWDOWN\"]\n", + "sw_var = fd2[\"SWDOWN\"]\n", + "sp_var = fd2[\"PSFC\"]\n", + "tmp_var = fd2[\"T2D\"]\n", + "u2d_var = fd2[\"U2D\"]\n", + "v2d_var = fd2[\"V2D\"]\n", + "pcp_var2 = fd2[\"RAINRATE\"]\n", + "\n", + "for _i in range(0,40000):\n", + " #_i = 0\n", + " try:\n", + " pcp_var_0 = pcp_var.transpose()[_i].rename(\"APCP_surface\")\n", + " lw_var_0 = lw_var.transpose()[_i].rename(\"DLWRF_surface\")\n", + " sw_var_0 = sw_var.transpose()[_i].rename(\"DSWRF_surface\")\n", + " sp_var_0 = sp_var.transpose()[_i].rename(\"SPFH_2maboveground\")\n", + " tmp_var_0 = tmp_var.transpose()[_i].rename(\"TMP_2maboveground\")\n", + " u2d_var_0 = u2d_var.transpose()[_i].rename(\"UGRD_10maboveground\")\n", + " v2d_var_0 = v2d_var.transpose()[_i].rename(\"VGRD_10maboveground\")\n", + " pcp_var2_0 = pcp_var2.transpose()[_i].rename(\"precip_rate\") ##BROKEN!!\n", + "\n", + " d = pd.concat([pcp_var_0, lw_var_0, sw_var_0, sp_var_0, tmp_var_0, u2d_var_0, v2d_var_0, pcp_var2_0], axis=1)\n", + " d.index.name = \"time\"\n", + "\n", + "\n", + " d.to_csv(f\"input_data_240/cat03w_{_i:07}.csv\")\n", + " except:\n", + " print(f\"no data for watershed {_i}\", end=\"\\t\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "358d9a99-2016-4c51-b65c-e9cd92fce358", + "metadata": {}, + "outputs": [], + "source": [ + "## Make a shell script string to rename the csvs...\n", + "gpkg_divides[\"id\"]\n", + "for _i, cat_id in enumerate(gpkg_divides[\"id\"]):\n", + " print(f\"mv cat03w_{_i:07}.csv cat03w_{cat_id}.csv\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "52383127-cc94-49cc-a0ba-3d3e2057fc08", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/ngen_forcing/defs.py b/ngen_forcing/defs.py new file mode 100644 index 0000000..58b2c1d --- /dev/null +++ b/ngen_forcing/defs.py @@ -0,0 +1,18 @@ +import rasterio.mask as riomask + + +def polymask(dataset, invert=False, all_touched=False): + def _polymask(poly): + return riomask.raster_geometry_mask( + dataset, [poly], invert=invert, all_touched=all_touched, crop=True + ) + + return _polymask + + +def xr_read_window(ds, window, mask=None): + data = ds.isel(window) + if mask is None: + return data + else: + return data.where(mask) diff --git a/ngen_forcing/process_nwm_forcing_to_ngen.py b/ngen_forcing/process_nwm_forcing_to_ngen.py new file mode 100644 
diff --git a/ngen_forcing/process_nwm_forcing_to_ngen.py b/ngen_forcing/process_nwm_forcing_to_ngen.py
new file mode 100644
index 0000000..2a34f9a
--- /dev/null
+++ b/ngen_forcing/process_nwm_forcing_to_ngen.py
@@ -0,0 +1,194 @@
+from defs import xr_read_window, polymask
+from rasterio import _io, windows
+import concurrent.futures
+import xarray as xr
+
+
+class MemoryDataset(_io.MemoryDataset, windows.WindowMethodsMixin):
+    """In-memory rasterio dataset with window helper methods mixed in."""
+
+
+# Leftover scratch code, kept commented out (gpkg_divides is not defined
+# in this module):
+# def junk():
+#     flist = gpkg_divides.geometry.to_list()
+#     polys = flist
+
+
+def get_forcing_dict_newway(
+    feature_list,
+    folder_prefix,
+    file_list,
+):
+
+    reng = "rasterio"
+    _xds = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng)
+    _template_arr = _xds.U2D.values
+    _u2d = MemoryDataset(
+        _template_arr,
+        transform=_xds.U2D.rio.transform(),
+        gcps=None,
+        rpcs=None,
+        crs=None,
+        copy=False,
+    )
+
+    filehandles = [xr.open_dataset(folder_prefix.joinpath(f)) for f in file_list]
+    stats = []
+    for i, m in enumerate(map(polymask(_u2d), feature_list)):
+        print(f"{i}, {round(i / len(feature_list) * 100, 5)}".ljust(40), end="\r")
+        mask, _, window = m
+        mask = xr.DataArray(mask, dims=["y", "x"])
+        winslices = dict(zip(["y", "x"], window.toslices()))
+        for f in filehandles:
+            cropped = xr_read_window(f, winslices, mask=mask)
+            stats.append(cropped.mean())
+    for f in filehandles:
+        f.close()
+
+    return stats
+
+
+def get_forcing_dict_newway_parallel(
+    feature_list,
+    folder_prefix,
+    file_list,
+    para="thread",
+    para_n=2,
+):
+
+    reng = "rasterio"
+    _xds = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng)
+    _template_arr = _xds.U2D.values
+
+    _u2d = MemoryDataset(
+        _template_arr,
+        transform=_xds.U2D.rio.transform(),
+        gcps=None,
+        rpcs=None,
+        crs=None,
+        copy=False,
+    )
+    filehandles = [xr.open_dataset(folder_prefix.joinpath(f)) for f in file_list]
+
+    if para == "process":
+        pool = concurrent.futures.ProcessPoolExecutor
+    else:  # default to threads
+        pool = concurrent.futures.ThreadPoolExecutor
+
+    with pool(max_workers=para_n) as executor:
+        stats = []
+        future_list = []
+
+        for i, m in enumerate(map(polymask(_u2d), feature_list)):
+            print(f"{i}, {round(i / len(feature_list) * 100, 5)}".ljust(40), end="\r")
+            mask, _, window = m
+            mask = xr.DataArray(mask, dims=["y", "x"])
+            winslices = dict(zip(["y", "x"], window.toslices()))
+            for f in filehandles:
+                future = executor.submit(xr_read_window, f, winslices, mask=mask)
+                # cropped = xr_read_window(f, winslices, mask=mask)
+                # stats.append(cropped.mean())
+                future_list.append(future)
+        for _f in concurrent.futures.as_completed(future_list):
+            stats.append(_f.result().mean())
+
+    for f in filehandles:
+        f.close()
+    return stats
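+
+
+# Editor's note (assumption, not tested here): with para="process" the open
+# xarray datasets submitted to the executor must be picklable, which open
+# file handles often are not. If that bites, a sketch like this opens each
+# file inside the worker instead of sharing handles across processes:
+#
+# def _window_mean(path, winslices):
+#     with xr.open_dataset(path) as ds:
+#         return xr_read_window(ds, winslices).mean()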
+
+
+def get_forcing_dict_newway_inverted(
+    feature_list,
+    folder_prefix,
+    file_list,
+):
+
+    reng = "rasterio"
+    _xds = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng)
+    _template_arr = _xds.U2D.values
+    _u2d = MemoryDataset(
+        _template_arr,
+        transform=_xds.U2D.rio.transform(),
+        gcps=None,
+        rpcs=None,
+        crs=None,
+        copy=False,
+    )
+
+    filehandles = [xr.open_dataset(folder_prefix.joinpath(f)) for f in file_list]
+    stats = []
+    mask_win_list = []
+
+    for i, m in enumerate(map(polymask(_u2d), feature_list)):
+        print(f"{i}, {round(i / len(feature_list) * 100, 5)}".ljust(40), end="\r")
+        mask, _, window = m
+        mask = xr.DataArray(mask, dims=["y", "x"])
+        winslices = dict(zip(["y", "x"], window.toslices()))
+        mask_win_list.append((mask, winslices))
+
+    for _i, f in enumerate(filehandles):
+        print(f"{_i}, {round(_i / len(file_list) * 100, 2)}".ljust(40), end="\r")
+        for _m, _w in mask_win_list:
+            cropped = xr_read_window(f, _w, mask=_m)
+            stats.append(cropped.mean())
+
+    for f in filehandles:
+        f.close()
+    return stats
+
+
+def get_forcing_dict_newway_inverted_parallel(
+    feature_list,
+    folder_prefix,
+    file_list,
+    para="thread",
+    para_n=2,
+):
+
+    reng = "rasterio"
+    _xds = xr.open_dataset(folder_prefix.joinpath(file_list[0]), engine=reng)
+    _template_arr = _xds.U2D.values
+    _u2d = MemoryDataset(
+        _template_arr,
+        transform=_xds.U2D.rio.transform(),
+        gcps=None,
+        rpcs=None,
+        crs=None,
+        copy=False,
+    )
+
+    mask_win_list = []
+    for i, m in enumerate(map(polymask(_u2d), feature_list)):
+        print(f"{i}, {round(i / len(feature_list) * 100, 5)}".ljust(40), end="\r")
+        mask, _, window = m
+        mask = xr.DataArray(mask, dims=["y", "x"])
+        winslices = dict(zip(["y", "x"], window.toslices()))
+        mask_win_list.append((mask, winslices))
+
+    filehandles = [xr.open_dataset(folder_prefix.joinpath(f)) for f in file_list]
+    stats = []
+    future_list = []
+
+    if para == "process":
+        pool = concurrent.futures.ProcessPoolExecutor
+    else:  # default to threads
+        pool = concurrent.futures.ThreadPoolExecutor
+
+    with pool(max_workers=para_n) as executor:
+
+        for _i, f in enumerate(filehandles):
+            print(f"{_i}, {round(_i / len(file_list) * 100, 2)}".ljust(40), end="\r")
+            for _m, _w in mask_win_list:
+                future = executor.submit(xr_read_window, f, _w, mask=_m)
+                # cropped = xr_read_window(f, _w, mask=_m)
+                # stats.append(cropped.mean())
+                future_list.append(future)
+        for _f in concurrent.futures.as_completed(future_list):
+            stats.append(_f.result().mean())
+
+    for f in filehandles:
+        f.close()
+    return stats
diff --git a/ngen_forcing/test_process_nwm_forcing_to_ngen.py b/ngen_forcing/test_process_nwm_forcing_to_ngen.py
new file mode 100644
index 0000000..5fe24a3
--- /dev/null
+++ b/ngen_forcing/test_process_nwm_forcing_to_ngen.py
@@ -0,0 +1,198 @@
+# import rioxarray as rxr
+import xarray as xr
+import geopandas as gpd
+from rasterstats import zonal_stats
+
+# import rasterio
+import pandas as pd
+
+import time
+
+from process_nwm_forcing_to_ngen import (
+    get_forcing_dict_newway,
+    get_forcing_dict_newway_parallel,
+    get_forcing_dict_newway_inverted,
+    get_forcing_dict_newway_inverted_parallel,
+)
+
+from pathlib import Path
+import warnings
+
+warnings.simplefilter("ignore")
+
+# Read forcing files
+# Generate a list of files
+
+# TODO: Add looping through lists of forcing files;
+# see "listofnwmfilenames.py" in the data_access_examples repository.
+# Integer values for runinput, varinput, etc. are listed at the top of that
+# file, and an example is given in the `main` function.
+
+# import listofnwmfilenames
+# create_file_list(
+#     runinput,
+#     varinput,
+#     geoinput,
+#     meminput,
+#     start_date,
+#     end_date,
+#     fcst_cycle,
+# )
+
+"""
+A set of test files can be generated by downloading these files:
+wget -P data -c https://storage.googleapis.com/national-water-model/nwm.20220824/forcing_medium_range/nwm.t12z.medium_range.forcing.f001.conus.nc
+wget -P data -c https://storage.googleapis.com/national-water-model/nwm.20220824/forcing_medium_range/nwm.t12z.medium_range.forcing.f002.conus.nc
+wget -P data -c https://storage.googleapis.com/national-water-model/nwm.20220824/forcing_medium_range/nwm.t12z.medium_range.forcing.f003.conus.nc
+wget -P 03w -c https://nextgen-hydrofabric.s3.amazonaws.com/v1.2/nextgen_03W.gpkg
+"""
+
+
+# NOTE: zonal_stats re-rasterizes the polygons on every call (once per
+# variable per file), which is the main reason this approach is slow.
+def get_forcing_dict(
+    gpkg_divides,
+    folder_prefix,
+    filelist,
+    var_list,
+):
+    reng = "rasterio"
+    sum_stat = "mean"
+
+    df_dict = {}
+    for _v in var_list:
+        df_dict[_v] = pd.DataFrame(index=gpkg_divides.index)
+
+    ds_list = []
+    for _nc_file in filelist:
+        # _nc_file = ("nwm.t00z.medium_range.forcing.f001.conus.nc")
+        _full_nc_file = folder_prefix.joinpath(_nc_file)
+        ds_list.append(xr.open_dataset(_full_nc_file, engine=reng))
+
+    for _i, _nc_file in enumerate(filelist):
+        _xds = ds_list[_i]
+        print(f"{_i}, {round(_i / len(filelist) * 100, 5)}".ljust(40), end="\r")
+        for _v in var_list:
+            _src = _xds[_v]
+            _aff2 = _src.rio.transform()
+            _arr2 = _src.values[0]
+
+            _df_zonal_stats = pd.DataFrame(
+                zonal_stats(gpkg_divides, _arr2, affine=_aff2)
+            )
+            # if adding statistics back to original GeoDataFrame
+            # gdf3 = pd.concat([gpkg_divides, _df_zonal_stats], axis=1)
+            df_dict[_v][_xds.time.values[0]] = _df_zonal_stats[sum_stat]
+
+    for _xds in ds_list:
+        _xds.close()
+
+    return df_dict
+
+
+# TODO: Convert the output to CSV with something like
+# `gdf3.to_csv`
+
+
+def main():
+
+    folder_prefix = Path("data")
+    list_of_files = [
+        f"nwm.t12z.medium_range.forcing.f{_r:03}.conus.nc" for _r in range(1, 241)
+    ]
+
+    # Read basin boundary file
+    f_03 = "03w/nextgen_03W.gpkg"
+    gpkg_divides = gpd.read_file(f_03, layer="divides")
+    var_list = [
+        "U2D",
+        "V2D",
+        "LWDOWN",
+        "RAINRATE",
+        "T2D",
+        "Q2D",
+        "PSFC",
+        "SWDOWN",
+    ]
+
+    # file_list = list_of_files[0:30]
+    # gpkg_subset = gpkg_divides[0:2000]
+    file_list = list_of_files[0:3]
+    gpkg_subset = gpkg_divides[0:200]
+    feature_list = gpkg_subset.geometry.to_list()
+
+    # This way is extremely slow for anything more than a few files;
+    # comment it out for larger runs.
+    start_time = time.time()
+    print("Working on the old (slow) way")
+    fd1 = get_forcing_dict(
+        gpkg_subset,
+        folder_prefix,
+        file_list,
+        var_list,
+    )
+    print(time.time() - start_time)
+
+    start_time = time.time()
+    print("Working on the new way")
+    fd2 = get_forcing_dict_newway(
+        feature_list,
+        folder_prefix,
+        file_list,
+    )
+    print(time.time() - start_time)
+
+    start_time = time.time()
+    print("Working on the new way with threading parallel.")
+    fd3 = get_forcing_dict_newway_parallel(
+        feature_list,
+        folder_prefix,
+        file_list,
+        para="thread",
+        para_n=16,
+    )
+    print(time.time() - start_time)
+
+    start_time = time.time()
+    print("Working on the new way with process parallel.")
+    fd3 = get_forcing_dict_newway_parallel(
+        feature_list,
+        folder_prefix,
+        file_list,
+        para="process",
+        para_n=16,
+    )
+    print(time.time() - start_time)
+
+    start_time = time.time()
+    print("Working on the new way with loops reversed.")
+    fd4 = get_forcing_dict_newway_inverted(
+        feature_list,
+        folder_prefix,
+        file_list,
+    )
+    print(time.time() - start_time)
+
+    start_time = time.time()
+    print("Working on the new way with loops reversed with threading parallel.")
+    fd4 = get_forcing_dict_newway_inverted_parallel(
+        feature_list,
+        folder_prefix,
+        file_list,
+        para="thread",
+        para_n=16,
+    )
+    print(time.time() - start_time)
+
+    start_time = time.time()
+    print("Working on the new way with loops reversed with process parallel.")
+    fd4 = get_forcing_dict_newway_inverted_parallel(
+        feature_list,
+        folder_prefix,
+        file_list,
+        para="process",
+        para_n=16,
+    )
+    print(time.time() - start_time)
+
+
+if __name__ == "__main__":
+    main()