diff --git a/config/config.default.yaml b/config/config.default.yaml index ac838bcfc6..0cd5cf8595 100644 --- a/config/config.default.yaml +++ b/config/config.default.yaml @@ -688,6 +688,12 @@ sector: dh_areas: buffer: 1000 handle_missing_countries: fill + subnodes: + enable: false + n_subnodes: 10 + countries: [] + demand_column: Dem_GWh + label_column: Label heat_pump_sources: urban central: - air diff --git a/config/schema.json b/config/schema.json index f8de41cf8e..4ab45e34bc 100644 --- a/config/schema.json +++ b/config/schema.json @@ -3921,6 +3921,39 @@ "additionalProperties": true, "description": "District heating areas settings.", "type": "object" + }, + "subnodes": { + "description": "Configuration for `sector.district_heating.subnodes` settings.", + "properties": { + "enable": { + "default": false, + "description": "Enable subnodes in district heating sector.", + "type": "boolean" + }, + "n_subnodes": { + "default": 10, + "description": "Number of largest district heating subnodes that are explicitly represented in the network.", + "type": "integer" + }, + "countries": { + "default": [], + "description": "List of country codes to consider for district heating subnodes. 
If empty, all countries are considered.", + "items": { + "type": "string" + }, + "type": "array" + }, + "demand_column": { + "default": "Dem_GWh", + "description": "Name of the column in the single-system level data to use for subnodes.", + "type": "string" + }, + "label_column": { + "default": "Label", + "description": "Name of the column in the single-system level data to use for subnode labels.", + "type": "string" + } + } } } }, @@ -5966,6 +5999,39 @@ "additionalProperties": true, "description": "District heating areas settings.", "type": "object" + }, + "subnodes": { + "description": "Configuration for `sector.district_heating.subnodes` settings.", + "properties": { + "enable": { + "default": false, + "description": "Enable subnodes in district heating sector.", + "type": "boolean" + }, + "n_subnodes": { + "default": 10, + "description": "Number of largest district heating subnodes that are explicitly represented in the network.", + "type": "integer" + }, + "countries": { + "default": [], + "description": "List of country codes to consider for district heating subnodes. If empty, all countries are considered.", + "items": { + "type": "string" + }, + "type": "array" + }, + "demand_column": { + "default": "Dem_GWh", + "description": "Name of the column in the single-system level data to use for subnodes.", + "type": "string" + }, + "label_column": { + "default": "Label", + "description": "Name of the column in the single-system level data to use for subnode labels.", + "type": "string" + } + } } } }, @@ -7483,6 +7549,39 @@ } } }, + "_SubnodesConfig": { + "description": "Configuration for `sector.district_heating.subnodes` settings.", + "properties": { + "enable": { + "default": false, + "description": "Enable subnodes in district heating sector.", + "type": "boolean" + }, + "n_subnodes": { + "default": 10, + "description": "Number of largest district heating subnodes that are explicitly represented in the network.", + "type": "integer" + }, + "countries": { + "default": [], + "description": "List of country codes to consider for district heating subnodes. If empty, all countries are considered.", + "items": { + "type": "string" + }, + "type": "array" + }, + "demand_column": { + "default": "Dem_GWh", + "description": "Name of the column in the single-system level data to use for subnodes.", + "type": "string" + }, + "label_column": { + "default": "Label", + "description": "Name of the column in the single-system level data to use for subnode labels.", + "type": "string" + } + } + }, + "_TechnologyMappingConfig": { + "description": "Configuration for `electricity.estimate_renewable_capacities.technology_mapping` settings.", + "properties": { @@ -10031,6 +10130,39 @@ "additionalProperties": true, "description": "District heating areas settings.", "type": "object" + }, + "subnodes": { + "description": "Configuration for `sector.district_heating.subnodes` settings.", + "properties": { + "enable": { + "default": false, + "description": "Enable subnodes in district heating sector.", + "type": "boolean" + }, + "n_subnodes": { + "default": 10, + "description": "Number of largest district heating subnodes that are explicitly represented in the network.", + "type": "integer" + }, + "countries": { + "default": [], + "description": "List of country codes to consider for district heating subnodes. 
If empty, all countries are considered.", + "items": { + "type": "string" + }, + "type": "array" + }, + "demand_column": { + "default": "Dem_GWh", + "description": "Name of the column in the single-system level data to use for subnodes.", + "type": "string" + }, + "label_column": { + "default": "Label", + "description": "Name of the column in the single-system level data to use for subnode labels.", + "type": "string" + } + } } } }, diff --git a/config/test/config.overnight.yaml b/config/test/config.overnight.yaml index e0508de4b7..ac32c37b68 100644 --- a/config/test/config.overnight.yaml +++ b/config/test/config.overnight.yaml @@ -74,6 +74,10 @@ sector: - 10 - 21 district_heating: + subnodes: + enable: true + n_subnodes: 3 + countries: ['BE'] ptes: supplemental_heating: enable: true diff --git a/doc/release_notes.rst b/doc/release_notes.rst index 552e9d1508..32b89dd124 100644 --- a/doc/release_notes.rst +++ b/doc/release_notes.rst @@ -9,6 +9,9 @@ Release Notes Upcoming Release ================ +* Added option to model n largest district heating systems (according to demand in #1516) explicitly. Enable via ``sector: district_heating: + subnodes: enable: true``. + * Remove snakemake's slurm plugin from windows installations (https://github.com/PyPSA/pypsa-eur/pull/2009). * Added Xpress solver configuration options (``xpress-default`` and ``xpress-gpu``) with barrier method settings optimized for large-scale linear programming problems. 
diff --git a/rules/build_sector.smk b/rules/build_sector.smk index 046eae8243..0b4c441fc6 100755 --- a/rules/build_sector.smk +++ b/rules/build_sector.smk @@ -3,6 +3,14 @@ # SPDX-License-Identifier: MIT +def input_regions_onshore_district_heating(w): + """Return extended regions file when subnodes enabled, else standard regions.""" + if config_provider("sector", "district_heating", "subnodes", "enable")(w): + return resources("regions_onshore_base_s_{clusters}_subnodes.geojson") + else: + return resources("regions_onshore_base_s_{clusters}.geojson") + + rule build_population_layouts: message: "Building population layout data (total, urban, rural) from NUTS3 shapes and World Bank statistics" @@ -205,7 +213,7 @@ rule build_temperature_profiles: drop_leap_day=config_provider("enable", "drop_leap_day"), input: pop_layout=resources("pop_layout_total.nc"), - regions_onshore=resources("regions_onshore_base_s_{clusters}.geojson"), + regions_onshore=input_regions_onshore_district_heating, cutout=lambda w: input_cutout( w, config_provider("sector", "heat_demand_cutout")(w) ), @@ -274,7 +282,7 @@ rule build_central_heating_temperature_profiles: energy_totals_year=config_provider("energy", "energy_totals_year"), input: temp_air_total=resources("temp_air_total_base_s_{clusters}.nc"), - regions_onshore=resources("regions_onshore_base_s_{clusters}.geojson"), + regions_onshore=input_regions_onshore_district_heating, output: central_heating_forward_temperature_profiles=resources( "central_heating_forward_temperature_profiles_base_s_{clusters}_{planning_horizons}.nc" @@ -341,7 +349,7 @@ rule build_geothermal_heat_potential: isi_heat_potentials=rules.retrieve_geothermal_heat_utilisation_potentials.output[ "isi_heat_potentials" ], - regions_onshore=resources("regions_onshore_base_s_{clusters}.geojson"), + regions_onshore=input_regions_onshore_district_heating, lau_regions=rules.retrieve_lau_regions.output["zip"], output: heat_source_power=resources( @@ -413,7 +421,7 @@ rule 
build_ates_potentials: input: aquifer_shapes_shp=rules.retrieve_aquifer_data_bgr.output["aquifer_shapes"][0], dh_areas=resources("dh_areas_base_s_{clusters}.geojson"), - regions_onshore=resources("regions_onshore_base_s_{clusters}.geojson"), + regions_onshore=input_regions_onshore_district_heating, central_heating_forward_temperature_profiles=resources( "central_heating_forward_temperature_profiles_base_s_{clusters}_{planning_horizons}.nc" ), @@ -477,6 +485,8 @@ def input_hera_data(w) -> dict[str, str]: rule build_river_heat_potential: + message: + "Building river water heat potential for {wildcards.clusters} clusters" params: drop_leap_day=config_provider("enable", "drop_leap_day"), snapshots=config_provider("snapshots"), @@ -486,7 +496,7 @@ rule build_river_heat_potential: enable_heat_source_maps=config_provider("plotting", "enable_heat_source_maps"), input: unpack(input_hera_data), - regions_onshore=resources("regions_onshore_base_s_{clusters}.geojson"), + regions_onshore=input_regions_onshore_district_heating, dh_areas=resources("dh_areas_base_s_{clusters}.geojson"), output: heat_source_power=resources( @@ -610,6 +620,8 @@ def input_seawater_temperature(w) -> dict[str, str]: rule build_sea_heat_potential: + message: + "Building sea water heat potential for {wildcards.clusters} clusters" params: drop_leap_day=config_provider("enable", "drop_leap_day"), snapshots=config_provider("snapshots"), @@ -619,7 +631,7 @@ rule build_sea_heat_potential: input: # seawater_temperature=lambda w: input_seawater_temperature(w), unpack(input_seawater_temperature), - regions_onshore=resources("regions_onshore_base_s_{clusters}.geojson"), + regions_onshore=input_regions_onshore_district_heating, dh_areas=resources("dh_areas_base_s_{clusters}.geojson"), output: heat_source_temperature=resources("temp_sea_water_base_s_{clusters}.nc"), @@ -663,7 +675,7 @@ rule build_cop_profiles: central_heating_return_temperature_profiles=resources( 
"central_heating_return_temperature_profiles_base_s_{clusters}_{planning_horizons}.nc" ), - regions_onshore=resources("regions_onshore_base_s_{clusters}.geojson"), + regions_onshore=input_regions_onshore_district_heating, output: cop_profiles=resources("cop_profiles_base_s_{clusters}_{planning_horizons}.nc"), resources: @@ -700,7 +712,7 @@ rule build_ptes_operations: central_heating_return_temperature_profiles=resources( "central_heating_return_temperature_profiles_base_s_{clusters}_{planning_horizons}.nc" ), - regions_onshore=resources("regions_onshore_base_s_{clusters}.geojson"), + regions_onshore=input_regions_onshore_district_heating, output: ptes_direct_utilisation_profiles=resources( "ptes_direct_utilisation_profiles_base_s_{clusters}_{planning_horizons}.nc" @@ -722,8 +734,6 @@ rule build_ptes_operations: rule build_direct_heat_source_utilisation_profiles: - message: - "Building direct heat source utilization profiles for industrial applications for {wildcards.clusters} clusters and {wildcards.planning_horizons} planning horizon" params: direct_utilisation_heat_sources=config_provider( "sector", "district_heating", "direct_utilisation_heat_sources" @@ -763,7 +773,7 @@ rule build_solar_thermal_profiles: solar_thermal=config_provider("solar_thermal"), input: pop_layout=resources("pop_layout_total.nc"), - regions_onshore=resources("regions_onshore_base_s_{clusters}.geojson"), + regions_onshore=input_regions_onshore_district_heating, cutout=lambda w: input_cutout(w, config_provider("solar_thermal", "cutout")(w)), output: solar_thermal=resources("solar_thermal_total_base_s_{clusters}.nc"), @@ -822,7 +832,7 @@ if (COUNTRY_HDD_DATASET := dataset_version("country_hdd"))["source"] in ["build" cutouts=["cutouts/europe-1940-2024-era5.nc"], country_shapes=resources("country_shapes.geojson"), output: - era5_hdd=f"{COUNTRY_HDD_DATASET['folder']}/era5-HDD-per-country.csv", + era5_hdd=f"{COUNTRY_HDD_DATASET["folder"]}/era5-HDD-per-country.csv", log: 
logs("build_country_hdd.log"), benchmark: @@ -837,7 +847,7 @@ rule build_heat_totals: message: "Building heat totals" input: - hdd=f"{COUNTRY_HDD_DATASET['folder']}/era5-HDD-per-country.csv", + hdd=f"{COUNTRY_HDD_DATASET["folder"]}/era5-HDD-per-country.csv", energy_totals=resources("energy_totals.csv"), output: heat_totals=resources("heat_totals.csv"), @@ -1361,8 +1371,8 @@ rule build_transport_demand: "pop_weighted_energy_totals_s_{clusters}.csv" ), transport_data=resources("transport_data.csv"), - traffic_data_KFZ=f"{MOBILITY_PROFILES_DATASET['folder']}/kfz.csv", - traffic_data_Pkw=f"{MOBILITY_PROFILES_DATASET['folder']}/pkw.csv", + traffic_data_KFZ=f"{MOBILITY_PROFILES_DATASET["folder"]}/kfz.csv", + traffic_data_Pkw=f"{MOBILITY_PROFILES_DATASET["folder"]}/pkw.csv", temp_air_total=resources("temp_air_total_base_s_{clusters}.nc"), output: transport_demand=resources("transport_demand_s_{clusters}.csv"), @@ -1439,6 +1449,103 @@ rule build_existing_heating_distribution: "../scripts/build_existing_heating_distribution.py" +rule identify_district_heating_subnodes: + message: + "Identifying district heating subnodes and extending onshore regions for {wildcards.clusters} clusters" + params: + countries=config_provider("countries"), + subnode_countries=config_provider( + "sector", "district_heating", "subnodes", "countries" + ), + n_subnodes=config_provider( + "sector", "district_heating", "subnodes", "n_subnodes" + ), + demand_column=config_provider( + "sector", "district_heating", "subnodes", "demand_column" + ), + label_column=config_provider( + "sector", "district_heating", "subnodes", "label_column" + ), + input: + dh_areas=resources("dh_areas_base_s_{clusters}.geojson"), + regions_onshore=resources("regions_onshore_base_s_{clusters}.geojson"), + output: + dh_subnodes=resources("dh_subnodes_base_s_{clusters}.geojson"), + regions_onshore_extended=resources( + "regions_onshore_base_s_{clusters}_subnodes.geojson" + ), + resources: + mem_mb=2000, + log: + 
logs("identify_district_heating_subnodes_s_{clusters}.log"), + benchmark: + benchmarks("identify_district_heating_subnodes/s_{clusters}") + script: + "../scripts/identify_district_heating_subnodes.py" + + +rule prepare_district_heating_subnodes: + message: + "Preparing district heating subnode demand data for {wildcards.clusters} clusters and {wildcards.planning_horizons} planning horizon" + params: + district_heating_loss=config_provider( + "sector", "district_heating", "district_heating_loss" + ), + reduce_space_heat_exogenously=config_provider( + "sector", "reduce_space_heat_exogenously" + ), + reduce_space_heat_exogenously_factor=config_provider( + "sector", "reduce_space_heat_exogenously_factor" + ), + energy_totals_year=config_provider("energy", "energy_totals_year"), + input: + dh_subnodes=resources("dh_subnodes_base_s_{clusters}.geojson"), + pop_layout=resources("pop_layout_base_s_{clusters}.csv"), + district_heat_share=resources( + "district_heat_share_base_s_{clusters}_{planning_horizons}.csv" + ), + pop_weighted_energy_totals=resources( + "pop_weighted_energy_totals_s_{clusters}.csv" + ), + pop_weighted_heat_totals=resources("pop_weighted_heat_totals_s_{clusters}.csv"), + heating_efficiencies=resources("heating_efficiencies.csv"), + industrial_demand=resources( + "industrial_energy_demand_base_s_{clusters}_{planning_horizons}.csv" + ), + hourly_heat_demand=resources("hourly_heat_demand_total_base_s_{clusters}.nc"), + heat_dsm_profile=resources( + "residential_heat_dsm_profile_total_base_s_{clusters}.csv" + ), + output: + district_heat_share_subnodes=resources( + "district_heat_share_subnodes_base_s_{clusters}_{planning_horizons}.csv" + ), + pop_weighted_energy_totals_subnodes=resources( + "pop_weighted_energy_totals_subnodes_s_{clusters}_{planning_horizons}.csv" + ), + pop_weighted_heat_totals_subnodes=resources( + "pop_weighted_heat_totals_subnodes_s_{clusters}_{planning_horizons}.csv" + ), + industrial_demand_subnodes=resources( + 
"industrial_energy_demand_subnodes_base_s_{clusters}_{planning_horizons}.csv" + ), + hourly_heat_demand_subnodes=resources( + "hourly_heat_demand_total_subnodes_base_s_{clusters}_{planning_horizons}.nc" + ), + heat_dsm_profile_subnodes=resources( + "residential_heat_dsm_profile_total_subnodes_base_s_{clusters}_{planning_horizons}.csv" + ), + threads: 1 + resources: + mem_mb=4000, + log: + logs("prepare_district_heating_subnodes_{clusters}_{planning_horizons}.log"), + benchmark: + benchmarks("prepare_district_heating_subnodes/s_{clusters}_{planning_horizons}") + script: + "../scripts/prepare_district_heating_subnodes.py" + + rule time_aggregation: message: "Performing time series aggregation for temporal resolution reduction for {wildcards.clusters} clusters and {wildcards.opts} electric options and {wildcards.sector_opts} sector options" @@ -1596,17 +1703,31 @@ rule prepare_sector_network: ), network=resources("networks/base_s_{clusters}_elec_{opts}.nc"), eurostat=rules.retrieve_eurostat_balances.output["directory"], - pop_weighted_energy_totals=resources( - "pop_weighted_energy_totals_s_{clusters}.csv" + pop_weighted_energy_totals=lambda w: ( + resources( + "pop_weighted_energy_totals_subnodes_s_{clusters}_{planning_horizons}.csv" + ) + if config_provider("sector", "district_heating", "subnodes", "enable")(w) + else resources("pop_weighted_energy_totals_s_{clusters}.csv") + ), + pop_weighted_heat_totals=lambda w: ( + resources( + "pop_weighted_heat_totals_subnodes_s_{clusters}_{planning_horizons}.csv" + ) + if config_provider("sector", "district_heating", "subnodes", "enable")(w) + else resources("pop_weighted_heat_totals_s_{clusters}.csv") ), - pop_weighted_heat_totals=resources("pop_weighted_heat_totals_s_{clusters}.csv"), shipping_demand=resources("shipping_demand_s_{clusters}.csv"), transport_demand=resources("transport_demand_s_{clusters}.csv"), transport_data=resources("transport_data_s_{clusters}.csv"), 
avail_profile=resources("avail_profile_s_{clusters}.csv"), dsm_profile=resources("dsm_profile_s_{clusters}.csv"), - heat_dsm_profile=resources( - "residential_heat_dsm_profile_total_base_s_{clusters}.csv" + heat_dsm_profile=lambda w: ( + resources( + "residential_heat_dsm_profile_total_subnodes_base_s_{clusters}_{planning_horizons}.csv" + ) + if config_provider("sector", "district_heating", "subnodes", "enable")(w) + else resources("residential_heat_dsm_profile_total_base_s_{clusters}.csv") ), co2_totals_name=resources("co2_totals.csv"), co2=rules.retrieve_ghg_emissions.output["csv"], @@ -1614,7 +1735,7 @@ rule prepare_sector_network: "biomass_potentials_s_{clusters}_{planning_horizons}.csv" ), costs=lambda w: ( - resources(f"costs_{config_provider('costs', 'year')(w)}_processed.csv") + resources(f"costs_{config_provider("costs", "year")(w)}_processed.csv") if config_provider("foresight")(w) == "overnight" else resources("costs_{planning_horizons}_processed.csv") ), @@ -1622,17 +1743,38 @@ rule prepare_sector_network: busmap_s=resources("busmap_base_s.csv"), busmap=resources("busmap_base_s_{clusters}.csv"), clustered_pop_layout=resources("pop_layout_base_s_{clusters}.csv"), - industrial_demand=resources( - "industrial_energy_demand_base_s_{clusters}_{planning_horizons}.csv" + industrial_demand=lambda w: ( + resources( + "industrial_energy_demand_subnodes_base_s_{clusters}_{planning_horizons}.csv" + ) + if config_provider("sector", "district_heating", "subnodes", "enable")(w) + else resources( + "industrial_energy_demand_base_s_{clusters}_{planning_horizons}.csv" + ) ), - hourly_heat_demand_total=resources( - "hourly_heat_demand_total_base_s_{clusters}.nc" + hourly_heat_demand_total=lambda w: ( + resources( + "hourly_heat_demand_total_subnodes_base_s_{clusters}_{planning_horizons}.nc" + ) + if config_provider("sector", "district_heating", "subnodes", "enable")(w) + else resources("hourly_heat_demand_total_base_s_{clusters}.nc") ), industrial_production=resources( 
"industrial_production_base_s_{clusters}_{planning_horizons}.csv" ), - district_heat_share=resources( - "district_heat_share_base_s_{clusters}_{planning_horizons}.csv" + district_heat_share=lambda w: ( + resources( + "district_heat_share_subnodes_base_s_{clusters}_{planning_horizons}.csv" + ) + if config_provider("sector", "district_heating", "subnodes", "enable")(w) + else resources( + "district_heat_share_base_s_{clusters}_{planning_horizons}.csv" + ) + ), + dh_subnodes=lambda w: ( + resources("dh_subnodes_base_s_{clusters}.geojson") + if config_provider("sector", "district_heating", "subnodes", "enable")(w) + else [] ), heating_efficiencies=resources("heating_efficiencies.csv"), temp_soil_total=resources("temp_soil_total_base_s_{clusters}.nc"), diff --git a/scripts/identify_district_heating_subnodes.py b/scripts/identify_district_heating_subnodes.py new file mode 100644 index 0000000000..b20ba632a6 --- /dev/null +++ b/scripts/identify_district_heating_subnodes.py @@ -0,0 +1,235 @@ +# SPDX-FileCopyrightText: Contributors to PyPSA-Eur +# +# SPDX-License-Identifier: MIT +""" +Selects n largest district heating systems from dh_areas, creates extended +onshore regions, and outputs subnode metadata for downstream rules. +""" + +import logging + +import geopandas as gpd +import pandas as pd + +from scripts._helpers import configure_logging, set_scenario_config + +logger = logging.getLogger(__name__) + + +def _find_containing_region(point, regions): + """ + Find the region containing a point, or the nearest region if not contained. 
+ + Parameters + ---------- + point : shapely.geometry.Point + The point to locate + regions : geopandas.GeoDataFrame + GeoDataFrame of regions with geometry column, indexed by region name + + Returns + ------- + str + Name (index) of the containing or nearest region + """ + containing = regions[regions.contains(point)] + if len(containing) > 0: + return containing.index[0] + distances = regions.geometry.distance(point) + return distances.idxmin() + + +def identify_largest_district_heating_systems( + dh_areas: gpd.GeoDataFrame, + regions_onshore: gpd.GeoDataFrame, + n_subnodes: int, + countries: list[str], + demand_column: str = "Dem_GWh", + label_column: str = "Label", +) -> gpd.GeoDataFrame: + """ + Selects the largest DH systems from dh_areas based on demand, filters by + country, and maps each system to its containing onshore region. + + Parameters + ---------- + dh_areas : geopandas.GeoDataFrame + District heating areas with geometry and demand data + regions_onshore : geopandas.GeoDataFrame + Onshore regions indexed by cluster name + n_subnodes : int + Number of largest DH systems to select + countries : list[str] + List of country codes to filter DH areas + demand_column : str, optional + Column name containing demand values in GWh/a, by default "Dem_GWh" + label_column : str, optional + Column name containing DH system labels, by default "Label" + + Returns + ------- + geopandas.GeoDataFrame + Selected subnodes with columns: name, cluster, subnode_label, + yearly_heat_demand_MWh, country, geometry + """ + if "country" in dh_areas.columns: + dh_areas = dh_areas[dh_areas["country"].isin(countries)].copy() + logger.info(f"Filtered to {len(dh_areas)} DH areas in {countries}") + else: + logger.warning("No 'country' column in dh_areas") + + dh_areas_valid = dh_areas.dropna(subset=[demand_column]) + if len(dh_areas_valid) == 0: + logger.warning("No valid DH areas found") + return gpd.GeoDataFrame() + + subnodes = dh_areas_valid.nlargest(n_subnodes, 
demand_column).copy() + logger.info( + f"Selected {len(subnodes)} largest DH systems ({subnodes[demand_column].sum():.1f} GWh/a)" + ) + + subnodes["yearly_heat_demand_MWh"] = subnodes[demand_column] * 1e3 + + if subnodes.crs != regions_onshore.crs: + subnodes = subnodes.to_crs(regions_onshore.crs) + + subnodes["centroid"] = subnodes.geometry.centroid + subnodes["cluster"] = subnodes["centroid"].apply( + lambda p: _find_containing_region(p, regions_onshore) + ) + + subnodes["subnode_label"] = subnodes[label_column].fillna("DH").astype(str) + subnodes["subnode_label"] = ( + subnodes["subnode_label"] + .str.replace(r"[^\w\s-]", "", regex=True) + .str.strip() + .str.replace(r"\s+", "_", regex=True) + ) + subnodes["name"] = subnodes["cluster"] + " " + subnodes["subnode_label"] + + # Handle duplicate names + duplicates = subnodes["name"].duplicated(keep=False) + if duplicates.any(): + for name in subnodes.loc[duplicates, "name"].unique(): + mask = subnodes["name"] == name + subnodes.loc[mask, "name"] = [f"{name}_{i}" for i in range(mask.sum())] + + return subnodes[ + [ + "name", + "cluster", + "subnode_label", + "yearly_heat_demand_MWh", + "country", + "geometry", + ] + ].copy() + + +def extend_regions_onshore( + regions_onshore: gpd.GeoDataFrame, + subnodes: gpd.GeoDataFrame, +) -> gpd.GeoDataFrame: + """ + Extend onshore regions with subnode geometries cut from parent clusters. + + For each subnode, cuts its geometry from the parent cluster and adds + the subnode as a new region entry. 
+ + Parameters + ---------- + regions_onshore : geopandas.GeoDataFrame + Original onshore regions indexed by cluster name + subnodes : geopandas.GeoDataFrame + Subnodes with name, cluster, and geometry columns + + Returns + ------- + geopandas.GeoDataFrame + Extended regions with subnodes added and parent geometries updated + """ + if len(subnodes) == 0: + return regions_onshore.copy() + + subnodes_crs = subnodes.to_crs(regions_onshore.crs) + regions_extended = regions_onshore.copy() + + for cluster in subnodes_crs["cluster"].unique(): + cluster_subnodes = subnodes_crs[subnodes_crs["cluster"] == cluster] + subnode_union = cluster_subnodes.union_all() + + if cluster in regions_extended.index: + original_geom = regions_extended.loc[cluster, "geometry"] + regions_extended.loc[cluster, "geometry"] = original_geom.difference( + subnode_union + ) + + subnode_entries = subnodes_crs[["name", "geometry"]].set_index("name") + return pd.concat([regions_extended, subnode_entries]) + + +if __name__ == "__main__": + if "snakemake" not in globals(): + from scripts._helpers import mock_snakemake + + snakemake = mock_snakemake( + "identify_district_heating_subnodes", + clusters=48, + ) + + configure_logging(snakemake) + set_scenario_config(snakemake) + + countries = snakemake.params.get("countries", []) + subnode_countries = snakemake.params.get("subnode_countries", None) + n_subnodes = snakemake.params.get("n_subnodes", 40) + demand_column = snakemake.params.get("demand_column", "Dem_GWh") + label_column = snakemake.params.get("label_column", "Label") + + if subnode_countries: + invalid = set(subnode_countries) - set(countries) + if invalid: + logger.warning(f"Invalid subnode_countries {invalid} ignored") + effective_subnode_countries = [c for c in subnode_countries if c in countries] + else: + effective_subnode_countries = countries + + logger.info(f"Identifying {n_subnodes} subnodes from {effective_subnode_countries}") + + dh_areas = gpd.read_file(snakemake.input.dh_areas) + 
regions_onshore = gpd.read_file(snakemake.input.regions_onshore).set_index("name") + + subnodes = identify_largest_district_heating_systems( + dh_areas, + regions_onshore, + n_subnodes, + effective_subnode_countries, + demand_column, + label_column, + ) + + if len(subnodes) == 0: + logger.warning("No subnodes identified") + subnodes = gpd.GeoDataFrame( + columns=[ + "name", + "cluster", + "subnode_label", + "yearly_heat_demand_MWh", + "country", + "geometry", + ], + crs=regions_onshore.crs, + ) + + regions_extended = extend_regions_onshore(regions_onshore, subnodes) + + subnodes.to_file(snakemake.output.dh_subnodes, driver="GeoJSON") + regions_extended.to_file( + snakemake.output.regions_onshore_extended, driver="GeoJSON" + ) + + if len(subnodes) > 0: + logger.info( + f"Saved {len(subnodes)} subnodes ({subnodes['yearly_heat_demand_MWh'].sum() / 1e6:.2f} TWh/a)" + ) diff --git a/scripts/lib/validation/config/sector.py b/scripts/lib/validation/config/sector.py index 1c554073b4..a723c0df13 100644 --- a/scripts/lib/validation/config/sector.py +++ b/scripts/lib/validation/config/sector.py @@ -15,6 +15,31 @@ from scripts.lib.validation.config._base import ConfigModel +class _SubnodesConfig(BaseModel): + """Configuration for `sector.district_heating.subnodes` settings.""" + + enable: bool = Field( + False, + description="Enable subnodes in district heating sector.", + ) + n_subnodes: int = Field( + 10, + description="Number of largest district heating subnodes that are explicitly represented in the network.", + ) + countries: list[str] = Field( + [], + description="List of country codes to consider for district heating subnodes. If empty, all countries are considered.", + ) + demand_column: str = Field( + "Dem_GWh", + description="Name of the column in the single-system level data to use for subnodes.", + ) + label_column: str = Field( + "Label", + description="Name of the column in the single-system level data to use for subnode labels.", + ) + + class _DistrictHeatingConfig(ConfigModel): """Configuration for `sector.district_heating` settings.""" @@ -117,6 +142,10 @@ class _DistrictHeatingConfig(ConfigModel): default_factory=lambda: {"buffer": 1000, "handle_missing_countries": "fill"}, description="District heating areas settings.", ) + subnodes: _SubnodesConfig = Field( + default_factory=_SubnodesConfig, + description="Configuration options for explicit representation of largest district heating systems as subnodes.", + ) class _ResidentialHeatDsmConfig(BaseModel): diff --git a/scripts/prepare_district_heating_subnodes.py b/scripts/prepare_district_heating_subnodes.py new file mode 100644 index 0000000000..f039108293 --- /dev/null +++ b/scripts/prepare_district_heating_subnodes.py @@ -0,0 +1,824 @@ +# SPDX-FileCopyrightText: Contributors to PyPSA-Eur +# +# SPDX-License-Identifier: MIT +""" +Prepare district heating subnode demand data for prepare_sector_network. + +Extends energy/heat totals, district heat shares, and time-series data for subnodes. +""" + +import logging + +import geopandas as gpd +import pandas as pd +import xarray as xr + +from scripts._helpers import configure_logging, set_scenario_config + +logger = logging.getLogger(__name__) + + +def _to_scalar(val): + """ + Convert a pandas Series, DataFrame, or array-like to a scalar float. 
+ + Parameters + ---------- + val : scalar, pandas.Series, pandas.DataFrame, or array-like + Value to convert + + Returns + ------- + float + Scalar float value, or 0.0 if input is None + """ + if isinstance(val, pd.Series): + val = val.iloc[0] + if isinstance(val, pd.DataFrame): + val = val.iloc[0, 0] + if hasattr(val, "item"): + val = val.item() + return float(val) if val is not None else 0.0 + + +def _get_row(df, idx): + """ + Get a row from DataFrame as Series, handling duplicate indices. + + Parameters + ---------- + df : pandas.DataFrame + DataFrame to extract row from + idx : hashable + Index label of the row to extract + + Returns + ------- + pandas.Series + Row data as Series. If index is duplicated, returns first occurrence. + """ + result = df.loc[idx] + return result.iloc[0] if isinstance(result, pd.DataFrame) else result + + +def extend_district_heat_share( + district_heat_share: pd.DataFrame, + subnodes: gpd.GeoDataFrame, + pop_weighted_energy_totals: pd.DataFrame, + pop_weighted_heat_totals: pd.DataFrame, + heating_efficiencies: pd.DataFrame, + district_heating_loss: float, + space_heat_reduction: float = 0.0, +) -> pd.DataFrame: + """ + Extend district heat share DataFrame to include subnodes. + + Scales subnode demands to match GIS data. If the sum of subnode demands + exceeds the cluster's district heating demand, subnodes are scaled + proportionally and the parent cluster's demand is set to zero. 
+ + Parameters + ---------- + district_heat_share : pandas.DataFrame + District heat shares indexed by cluster name + subnodes : geopandas.GeoDataFrame + Subnodes with cluster, name, and yearly_heat_demand_MWh columns + pop_weighted_energy_totals : pandas.DataFrame + Population-weighted energy totals by cluster + pop_weighted_heat_totals : pandas.DataFrame + Population-weighted heat totals by cluster + heating_efficiencies : pandas.DataFrame + Heating efficiencies by country code + district_heating_loss : float + District heating network loss factor (e.g., 0.15 for 15% loss) + space_heat_reduction : float, optional + Fraction of space heating demand reduction, by default 0.0 + + Returns + ------- + pandas.DataFrame + Extended district heat share with subnodes added + """ + if len(subnodes) == 0: + return district_heat_share.copy() + + extended_share = district_heat_share.copy() + + if "district fraction before subnodes" not in extended_share.columns: + extended_share["district fraction before subnodes"] = extended_share[ + "district fraction of node" + ].copy() + + extended_share["parent_node"] = extended_share.index.to_series() + + for cluster in subnodes["cluster"].unique(): + cluster_subnodes = subnodes[subnodes["cluster"] == cluster] + + if cluster not in pop_weighted_energy_totals.index: + logger.warning(f"Cluster {cluster} not in energy totals, skipping") + continue + + ct = cluster[:2] + cluster_energy = _get_row(pop_weighted_energy_totals, cluster) + cluster_heat = _get_row(pop_weighted_heat_totals, cluster) + + # Calculate useful heat: space from heat_totals, water from energy_totals + total_useful_heat_mwh = 0.0 + for sector in ["residential", "services"]: + for use in ["water", "space"]: + col = f"total {sector} {use}" + eff_col = f"total {sector} {use} efficiency" + + # Get efficiency + if ( + eff_col in heating_efficiencies.columns + and ct in heating_efficiencies.index + ): + eff = _to_scalar(heating_efficiencies.loc[ct, eff_col]) + else: + eff = 1.0 
+ + # Space from heat_totals, water from energy_totals + if use == "space": + if col in cluster_heat.index: + val = _to_scalar(cluster_heat[col]) + total_useful_heat_mwh += ( + val * eff * (1 - space_heat_reduction) * 1e6 + ) + else: # water + if col in cluster_energy.index: + val = _to_scalar(cluster_energy[col]) + total_useful_heat_mwh += val * eff * 1e6 + + if total_useful_heat_mwh == 0.0: + logger.warning( + f"Zero heat demand for cluster {cluster}, skipping its subnodes" + ) + continue + + # Get cluster's current district heating demand (before subnodes) + cluster_dist_fraction = extended_share.loc[cluster, "district fraction of node"] + cluster_dh_demand_mwh = ( + cluster_dist_fraction * total_useful_heat_mwh * (1 + district_heating_loss) + ) + + # Sum of all subnode demands for this cluster + total_subnode_demand = cluster_subnodes["yearly_heat_demand_MWh"].sum() + + # Check if aggregate subnode demand exceeds cluster's district heating demand + if total_subnode_demand > cluster_dh_demand_mwh: + scale_factor = cluster_dh_demand_mwh / total_subnode_demand + logger.warning( + f"Cluster {cluster}: Sum of subnode demands ({total_subnode_demand / 1e6:.3f} TWh) " + f"exceeds cluster's district heating demand ({cluster_dh_demand_mwh / 1e6:.3f} TWh). " + f"Scaling subnode demands by {scale_factor:.2%}. Parent node demand set to zero." + ) + # After scaling, all district heating demand goes to subnodes + remaining_cluster_demand_mwh = 0.0 + else: + scale_factor = 1.0 + remaining_cluster_demand_mwh = cluster_dh_demand_mwh - total_subnode_demand + + # Process each subnode in the cluster + for _, subnode in cluster_subnodes.iterrows(): + name = subnode["name"] + demand_mwh = subnode["yearly_heat_demand_MWh"] * scale_factor + + # Calculate district fraction for subnode + # Note: For subnodes, district fraction should be 1.0 because: + # 1. Subnodes are 100% urban central heat (no rural or decentral portion) + # 2. 
The subnode's demand is already embedded in pop_weighted_energy_totals + # (scaled by extend_pop_weighted_energy_totals) + # 3. Using dist_fraction < 1.0 would double-scale the demand + # + # The "original district heat share" stores the proportion of parent's + # district heating demand that this subnode represents + original_share = demand_mwh / ( + total_useful_heat_mwh * (1 + district_heating_loss) + ) + + # Add entry for subnode with parent_node mapping for resource bus lookups + extended_share.loc[name] = { + "original district heat share": original_share, + "district fraction of node": 1.0, # Subnodes are 100% urban central + "urban fraction": 1.0, # Subnodes are 100% urban by definition + "district fraction before subnodes": 0.0, # New entry, no pre-subnode value + "parent_node": cluster, # Parent cluster for resource bus lookups + } + + # Update parent cluster's district fraction + new_cluster_fraction = remaining_cluster_demand_mwh / ( + total_useful_heat_mwh * (1 + district_heating_loss) + ) + extended_share.loc[cluster, "district fraction of node"] = new_cluster_fraction + + return extended_share + + +def extend_pop_weighted_energy_totals( + pop_weighted_energy_totals: pd.DataFrame, + pop_weighted_heat_totals: pd.DataFrame, + subnodes: gpd.GeoDataFrame, + heating_efficiencies: pd.DataFrame, + district_heating_loss: float, + space_heat_reduction: float = 0.0, +) -> pd.DataFrame: + """ + Extend population-weighted energy totals to include subnodes. + + Creates entries for each subnode by scaling the parent cluster's energy + profile based on the subnode's GIS demand data. Uses heat_totals for space + heating (due to .update() in prepare_sector_network) and energy_totals for + water heating when computing useful heat. 
+ + Parameters + ---------- + pop_weighted_energy_totals : pandas.DataFrame + Population-weighted energy totals indexed by cluster name + pop_weighted_heat_totals : pandas.DataFrame + Population-weighted heat totals indexed by cluster name + subnodes : geopandas.GeoDataFrame + Subnodes with cluster, name, and yearly_heat_demand_MWh columns + heating_efficiencies : pandas.DataFrame + Heating efficiencies by country code + district_heating_loss : float + District heating network loss factor + space_heat_reduction : float, optional + Fraction of space heating demand reduction, by default 0.0 + + Returns + ------- + pandas.DataFrame + Extended energy totals with subnodes added + """ + if len(subnodes) == 0: + return pop_weighted_energy_totals.copy() + + extended_totals = pop_weighted_energy_totals.copy() + + for _, subnode in subnodes.iterrows(): + cluster = subnode["cluster"] + name = subnode["name"] + demand_mwh = subnode["yearly_heat_demand_MWh"] + + if cluster not in extended_totals.index: + logger.warning(f"Cluster {cluster} not found, skipping subnode {name}") + continue + + ct = cluster[:2] + subnode_totals = _get_row(extended_totals, cluster).copy() + + # Calculate useful heat: space from heat_totals, water from energy_totals + cluster_useful_heat = 0.0 + cluster_heat_row = _get_row(pop_weighted_heat_totals, cluster) + cluster_energy_row = _get_row(pop_weighted_energy_totals, cluster) + + for sector in ["residential", "services"]: + for use in ["water", "space"]: + col = f"total {sector} {use}" + eff_col = f"total {sector} {use} efficiency" + + # Get efficiency + if ( + eff_col in heating_efficiencies.columns + and ct in heating_efficiencies.index + ): + eff = _to_scalar(heating_efficiencies.loc[ct, eff_col]) + else: + eff = 1.0 + + # Space comes from heat_totals (after .update()), water from energy_totals + if use == "space": + if col in cluster_heat_row.index: + val = _to_scalar(cluster_heat_row[col]) + cluster_useful_heat += ( + val * eff * (1 - 
space_heat_reduction) * 1e6 + ) + else: # water + if col in cluster_energy_row.index: + val = _to_scalar(cluster_energy_row[col]) + cluster_useful_heat += val * eff * 1e6 + + if cluster_useful_heat == 0.0: + continue + + # Scale factor: subnode_useful_heat * (1 + dh_loss) = GIS_demand + scale_factor = demand_mwh / (cluster_useful_heat * (1 + district_heating_loss)) + + # Scale all heat-related columns (water, space, heat) + heat_cols = [ + col + for col in subnode_totals.index + if any(x in col for x in ["water", "space", "heat"]) + ] + for col in heat_cols: + cluster_val = _to_scalar(extended_totals.loc[cluster, col]) + subnode_totals[col] = cluster_val * scale_factor + + # Add subnode entry + extended_totals.loc[name] = subnode_totals + + # District fraction is reduced in extend_district_heat_share, not here + + return extended_totals + + +def extend_pop_weighted_heat_totals( + pop_weighted_heat_totals: pd.DataFrame, + pop_weighted_energy_totals: pd.DataFrame, + subnodes: gpd.GeoDataFrame, + heating_efficiencies: pd.DataFrame, + district_heating_loss: float, + space_heat_reduction: float = 0.0, +) -> pd.DataFrame: + """ + Extend population-weighted heat totals to include subnodes. + + Creates entries for each subnode by scaling the parent cluster's heat + profile. Uses the same scaling methodology as extend_pop_weighted_energy_totals + since heat_totals.update() overwrites space columns in energy_totals. 
+ + Parameters + ---------- + pop_weighted_heat_totals : pandas.DataFrame + Population-weighted heat totals indexed by cluster name + pop_weighted_energy_totals : pandas.DataFrame + Population-weighted energy totals indexed by cluster name + subnodes : geopandas.GeoDataFrame + Subnodes with cluster, name, and yearly_heat_demand_MWh columns + heating_efficiencies : pandas.DataFrame + Heating efficiencies by country code + district_heating_loss : float + District heating network loss factor + space_heat_reduction : float, optional + Fraction of space heating demand reduction, by default 0.0 + + Returns + ------- + pandas.DataFrame + Extended heat totals with subnodes added + """ + if len(subnodes) == 0: + return pop_weighted_heat_totals.copy() + + extended_totals = pop_weighted_heat_totals.copy() + + for _, subnode in subnodes.iterrows(): + cluster = subnode["cluster"] + name = subnode["name"] + demand_mwh = subnode["yearly_heat_demand_MWh"] + ct = cluster[:2] + + if cluster not in extended_totals.index: + logger.warning(f"Cluster {cluster} not in heat totals, skipping {name}") + continue + + # Copy cluster's heat profile + subnode_totals = _get_row(extended_totals, cluster).copy() + + # Calculate cluster's total useful heat demand using the SAME formula as + # extend_pop_weighted_energy_totals and prepare_sector_network.build_heat_demand + # This includes: (space * eff * (1-reduction) + water * eff) + cluster_useful_heat = 0.0 + + # Get energy totals for water heating (not in heat_totals) + cluster_energy = _get_row(pop_weighted_energy_totals, cluster) + + for sector in ["residential", "services"]: + for use in ["water", "space"]: + col = f"total {sector} {use}" + eff_col = f"total {sector} {use} efficiency" + + # Get efficiency + if ( + eff_col in heating_efficiencies.columns + and ct in heating_efficiencies.index + ): + eff = _to_scalar(heating_efficiencies.loc[ct, eff_col]) + else: + eff = 1.0 + + # Space comes from heat_totals, water from energy_totals + if use 
== "space": + if col in subnode_totals.index: + val = _to_scalar(subnode_totals[col]) + cluster_useful_heat += ( + val * eff * (1 - space_heat_reduction) * 1e6 + ) + else: # water + if col in cluster_energy.index: + val = _to_scalar(cluster_energy[col]) + cluster_useful_heat += val * eff * 1e6 + + if cluster_useful_heat == 0.0: + continue + + # Scale factor: subnode_useful_heat * (1 + dh_loss) = GIS_demand + # So: scale * cluster_useful_heat * (1 + dh_loss) = demand_mwh + scale_factor = demand_mwh / (cluster_useful_heat * (1 + district_heating_loss)) + + # Scale all columns in heat_totals + for col in subnode_totals.index: + cluster_val = _to_scalar(extended_totals.loc[cluster, col]) + subnode_totals[col] = cluster_val * scale_factor + + # Add subnode entry + extended_totals.loc[name] = subnode_totals + + # NOTE: We do NOT reduce the cluster's values here! + # The cluster's district fraction is reduced in extend_district_heat_share + # to account for the demand moving to subnodes. + + return extended_totals + + +def extend_industrial_demand( + industrial_demand: pd.DataFrame, + subnodes: gpd.GeoDataFrame, + district_heat_share: pd.DataFrame, + pop_weighted_energy_totals: pd.DataFrame, + heating_efficiencies: pd.DataFrame, + district_heating_loss: float, +) -> pd.DataFrame: + """ + Extend industrial demand to include subnodes based on parent cluster's LT heat ratio. + + Allocates low-temperature heat for industry to subnodes proportionally based on + the ratio of LT industry heat to total district heating demand in the parent cluster. 
+ + Parameters + ---------- + industrial_demand : pandas.DataFrame + Industrial demand by sector indexed by cluster name + subnodes : geopandas.GeoDataFrame + Subnodes with cluster, name, and yearly_heat_demand_MWh columns + district_heat_share : pandas.DataFrame + District heat shares indexed by cluster name + pop_weighted_energy_totals : pandas.DataFrame + Population-weighted energy totals indexed by cluster name + heating_efficiencies : pandas.DataFrame + Heating efficiencies by country code + district_heating_loss : float + District heating network loss factor + + Returns + ------- + pandas.DataFrame + Extended industrial demand with subnodes added and parent values reduced + """ + if len(subnodes) == 0: + return industrial_demand.copy() + + extended_demand = industrial_demand.copy() + + # Group subnodes by cluster + for cluster in subnodes["cluster"].unique(): + cluster_subnodes = subnodes[subnodes["cluster"] == cluster] + + if cluster not in industrial_demand.index: + logger.warning(f"Cluster {cluster} not in industrial demand, skipping") + continue + + if cluster not in pop_weighted_energy_totals.index: + logger.warning(f"Cluster {cluster} not in energy totals, skipping") + continue + + if cluster not in district_heat_share.index: + logger.warning(f"Cluster {cluster} not in district heat share, skipping") + continue + + ct = cluster[:2] + + # Get cluster's LT industry heat demand (TWh) + cluster_lt_heat_twh = _to_scalar( + industrial_demand.loc[cluster, "low-temperature heat"] + ) + + if cluster_lt_heat_twh == 0.0: + # No LT industry heat in this cluster - subnodes only get urban central heat + for _, subnode in cluster_subnodes.iterrows(): + name = subnode["name"] + # Create subnode entry with zeros for all columns + extended_demand.loc[name] = 0.0 + continue + + # Calculate cluster's total heat demand (MWh) + cluster_energy = _get_row(pop_weighted_energy_totals, cluster) + total_heat_demand_mwh = 0.0 + for sector in ["residential", "services"]: + for use 
in ["water", "space"]: + col = f"total {sector} {use}" + if col in cluster_energy.index: + eff_col = f"total {sector} {use} efficiency" + if ( + eff_col in heating_efficiencies.columns + and ct in heating_efficiencies.index + ): + eff = _to_scalar(heating_efficiencies.loc[ct, eff_col]) + else: + eff = 1.0 + val = _to_scalar(cluster_energy[col]) + total_heat_demand_mwh += val * eff * 1e6 + + if total_heat_demand_mwh == 0.0: + continue + + # Get cluster's district heating demand (MWh) including losses + cluster_dist_fraction = _to_scalar( + district_heat_share.loc[cluster, "district fraction of node"] + ) + cluster_dh_demand_mwh = ( + cluster_dist_fraction * total_heat_demand_mwh * (1 + district_heating_loss) + ) + + if cluster_dh_demand_mwh == 0.0: + continue + + # Compute ratio of LT industry heat to total district heating demand + cluster_lt_heat_mwh = cluster_lt_heat_twh * 1e6 + lt_industry_ratio = cluster_lt_heat_mwh / cluster_dh_demand_mwh + + # Cap at 1.0 (LT industry heat cannot exceed district heating demand) + lt_industry_ratio = min(lt_industry_ratio, 1.0) + + logger.info( + f"Cluster {cluster}: LT industry heat ratio = {lt_industry_ratio:.2%} " + f"(LT={cluster_lt_heat_twh:.3f} TWh, DH={cluster_dh_demand_mwh / 1e6:.3f} TWh)" + ) + + # Sum of subnode demands for capping + total_subnode_demand = cluster_subnodes["yearly_heat_demand_MWh"].sum() + + # Scale factor if subnodes exceed cluster district heating demand + if total_subnode_demand > cluster_dh_demand_mwh: + scale_factor = cluster_dh_demand_mwh / total_subnode_demand + else: + scale_factor = 1.0 + + # Process each subnode + total_subnode_lt_twh = 0.0 + for _, subnode in cluster_subnodes.iterrows(): + name = subnode["name"] + subnode_dh_mwh = subnode["yearly_heat_demand_MWh"] * scale_factor + + # Split subnode district heating demand: part to LT industry + subnode_lt_mwh = subnode_dh_mwh * lt_industry_ratio + subnode_lt_twh = subnode_lt_mwh / 1e6 + + # Create subnode entry with LT heat for industry + # 
Initialize with zeros for all columns + extended_demand.loc[name] = 0.0 + extended_demand.loc[name, "low-temperature heat"] = subnode_lt_twh + + total_subnode_lt_twh += subnode_lt_twh + + # Reduce parent cluster's LT industry heat + remaining_lt_twh = max(0, cluster_lt_heat_twh - total_subnode_lt_twh) + extended_demand.loc[cluster, "low-temperature heat"] = remaining_lt_twh + + logger.info( + f"Cluster {cluster}: Moved {total_subnode_lt_twh:.3f} TWh LT heat to subnodes, " + f"remaining {remaining_lt_twh:.3f} TWh" + ) + + return extended_demand + + +def extend_hourly_heat_demand( + hourly_heat_demand: xr.Dataset, + subnodes: gpd.GeoDataFrame, +) -> xr.Dataset: + """ + Extend hourly heat demand Dataset to include subnodes. + + Each subnode inherits the hourly heat profile of its parent cluster. + + Parameters + ---------- + hourly_heat_demand : xarray.Dataset + Hourly heat demand profiles with 'node' coordinate + subnodes : geopandas.GeoDataFrame + Subnodes with name and cluster columns + + Returns + ------- + xarray.Dataset + Extended hourly heat demand with subnodes added + """ + if len(subnodes) == 0: + return hourly_heat_demand + + # The dataset uses 'node' as the coordinate name + base_nodes = pd.Index(hourly_heat_demand.coords["node"].values) + + # Build list of new datasets for subnodes + new_datasets = [] + for _, subnode in subnodes.iterrows(): + name = subnode["name"] + cluster = subnode["cluster"] + if cluster in base_nodes: + # Copy parent's hourly profile for all variables + parent_data = hourly_heat_demand.sel(node=cluster) + new_datasets.append(parent_data.assign_coords(node=name)) + + if not new_datasets: + return hourly_heat_demand + + # Concatenate + new_data = xr.concat(new_datasets, dim="node") + result = xr.concat([hourly_heat_demand, new_data], dim="node") + + # Ensure clean coordinate + result_nodes = [str(n) for n in result.coords["node"].values] + result = result.assign_coords(node=result_nodes) + + return result + + +def 
extend_heat_dsm_profile( + dsm_profile: pd.DataFrame, + subnodes: gpd.GeoDataFrame, +) -> pd.DataFrame: + """ + Extend heat demand-side management profile to include subnodes. + + Each subnode inherits the DSM profile of its parent cluster. + + Parameters + ---------- + dsm_profile : pandas.DataFrame + Heat DSM profiles with cluster names as columns + subnodes : geopandas.GeoDataFrame + Subnodes with name and cluster columns + + Returns + ------- + pandas.DataFrame + Extended DSM profile with subnodes added as new columns + """ + if len(subnodes) == 0: + return dsm_profile.copy() + + extended = dsm_profile.copy() + base_nodes = dsm_profile.columns + + for _, subnode in subnodes.iterrows(): + name = subnode["name"] + cluster = subnode["cluster"] + if cluster in base_nodes: + extended[name] = dsm_profile[cluster] + + return extended + + +if __name__ == "__main__": + if "snakemake" not in globals(): + from scripts._helpers import mock_snakemake + + snakemake = mock_snakemake( + "prepare_district_heating_subnodes", + clusters=48, + planning_horizons=2050, + ) + + configure_logging(snakemake) # pylint: disable=E0606 + set_scenario_config(snakemake) + + # Load configuration + district_heating_loss = snakemake.params.get("district_heating_loss", 0.15) + energy_totals_year = int(snakemake.params.get("energy_totals_year", 2019)) + + # Get space heat reduction factor + investment_year = int(snakemake.wildcards.planning_horizons) + reduce_space_heat = snakemake.params.get("reduce_space_heat_exogenously", False) + if reduce_space_heat: + reduction_factors = snakemake.params.get( + "reduce_space_heat_exogenously_factor", {} + ) + # Get the value for the current investment year, or closest available + if isinstance(reduction_factors, dict): + if investment_year in reduction_factors: + space_heat_reduction = reduction_factors[investment_year] + else: + # Find the closest year + years = sorted(reduction_factors.keys()) + closest_year = min(years, key=lambda y: abs(y - 
investment_year)) + space_heat_reduction = reduction_factors[closest_year] + else: + space_heat_reduction = float(reduction_factors) + else: + space_heat_reduction = 0.0 + + logger.info(f"Using space heat reduction factor: {space_heat_reduction}") + logger.info(f"Using energy totals year: {energy_totals_year}") + + # Load input data + subnodes = gpd.read_file(snakemake.input.dh_subnodes) + + if len(subnodes) == 0: + logger.info("No subnodes found, creating pass-through outputs") + else: + logger.info(f"Processing {len(subnodes)} subnodes") + + pop_layout = pd.read_csv(snakemake.input.pop_layout, index_col=0) + district_heat_share = pd.read_csv(snakemake.input.district_heat_share, index_col=0) + pop_weighted_energy_totals = pd.read_csv( + snakemake.input.pop_weighted_energy_totals, index_col=0 + ) + pop_weighted_heat_totals = pd.read_csv( + snakemake.input.pop_weighted_heat_totals, index_col=0 + ) + # Load heating efficiencies for the energy_totals_year (same as prepare_sector_network) + heating_efficiencies = pd.read_csv( + snakemake.input.heating_efficiencies, index_col=[1, 0] + ).loc[energy_totals_year] + industrial_demand = pd.read_csv(snakemake.input.industrial_demand, index_col=0) + + # Load time-series data + hourly_heat_demand = xr.open_dataset(snakemake.input.hourly_heat_demand) + dsm_profile = pd.read_csv( + snakemake.input.heat_dsm_profile, header=1, index_col=0, parse_dates=True + ) + + # Extend district heat share (handles demand capping and adds parent_node column) + district_heat_share_extended = extend_district_heat_share( + district_heat_share=district_heat_share, + subnodes=subnodes, + pop_weighted_energy_totals=pop_weighted_energy_totals, + pop_weighted_heat_totals=pop_weighted_heat_totals, + heating_efficiencies=heating_efficiencies, + district_heating_loss=district_heating_loss, + space_heat_reduction=space_heat_reduction, + ) + + # Extend population-weighted energy totals + pop_weighted_energy_totals_extended = 
extend_pop_weighted_energy_totals( + pop_weighted_energy_totals=pop_weighted_energy_totals, + pop_weighted_heat_totals=pop_weighted_heat_totals, + subnodes=subnodes, + heating_efficiencies=heating_efficiencies, + district_heating_loss=district_heating_loss, + space_heat_reduction=space_heat_reduction, + ) + + # Extend population-weighted heat totals (same scaling as energy_totals) + pop_weighted_heat_totals_extended = extend_pop_weighted_heat_totals( + pop_weighted_heat_totals=pop_weighted_heat_totals, + pop_weighted_energy_totals=pop_weighted_energy_totals, + subnodes=subnodes, + heating_efficiencies=heating_efficiencies, + district_heating_loss=district_heating_loss, + space_heat_reduction=space_heat_reduction, + ) + + # Extend industrial demand (LT heat for industry in subnodes) + industrial_demand_extended = extend_industrial_demand( + industrial_demand=industrial_demand, + subnodes=subnodes, + district_heat_share=district_heat_share, # Use ORIGINAL + pop_weighted_energy_totals=pop_weighted_energy_totals, + heating_efficiencies=heating_efficiencies, + district_heating_loss=district_heating_loss, + ) + + # Extend time-series data + hourly_heat_demand_extended = extend_hourly_heat_demand( + hourly_heat_demand, subnodes + ) + dsm_profile_extended = extend_heat_dsm_profile(dsm_profile, subnodes) + + # Save outputs + district_heat_share_extended.to_csv(snakemake.output.district_heat_share_subnodes) + logger.info( + f"Saved extended district heat share to {snakemake.output.district_heat_share_subnodes}" + ) + + pop_weighted_energy_totals_extended.to_csv( + snakemake.output.pop_weighted_energy_totals_subnodes + ) + pop_weighted_heat_totals_extended.to_csv( + snakemake.output.pop_weighted_heat_totals_subnodes + ) + + industrial_demand_extended.to_csv(snakemake.output.industrial_demand_subnodes) + logger.info( + f"Saved extended industrial demand to {snakemake.output.industrial_demand_subnodes}" + ) + + # Save extended time-series + 
hourly_heat_demand_extended.to_netcdf(snakemake.output.hourly_heat_demand_subnodes) + logger.info( + f"Saved extended hourly heat demand to {snakemake.output.hourly_heat_demand_subnodes}" + ) + + dsm_profile_extended.to_csv(snakemake.output.heat_dsm_profile_subnodes) + logger.info( + f"Saved extended DSM profile to {snakemake.output.heat_dsm_profile_subnodes}" + ) + + # Log summary + if len(subnodes) > 0: + total_subnode_demand = subnodes["yearly_heat_demand_MWh"].sum() + logger.info( + f"Processed {len(subnodes)} subnodes with total demand of {total_subnode_demand / 1e6:.2f} TWh/a" + ) diff --git a/scripts/prepare_sector_network.py b/scripts/prepare_sector_network.py index 0c4184b01f..4530b6bf3e 100755 --- a/scripts/prepare_sector_network.py +++ b/scripts/prepare_sector_network.py @@ -49,16 +49,29 @@ logger = logging.getLogger(__name__) -def define_spatial(nodes, options): +def define_spatial(nodes, options, district_heating_nodes=None): """ Namespace for spatial. Parameters ---------- - nodes : list-like + nodes : pd.Index + Base/parent nodes (from pop_layout.index). These are the main cluster nodes + where resource buses (gas, biomass, etc.) and generation are located. + options : dict + Sector options dictionary + district_heating_nodes : pd.Index, optional + Extended nodes including subnodes for district heating (from district_heat_info.index). + If None, defaults to nodes (no subnodes case). 
""" + # If no subnodes provided, district_heating_nodes equals nodes + if district_heating_nodes is None: + district_heating_nodes = nodes - spatial.nodes = nodes + spatial.nodes = nodes # Parent/base nodes only + spatial.district_heating_nodes = ( + district_heating_nodes # Includes subnodes if enabled + ) # biomass @@ -1212,10 +1225,23 @@ def add_methanol_reforming_cc(n, costs): ) -def add_dac(n, costs): +def add_dac(n, costs, pop_layout, district_heat_info): heat_carriers = ["urban central heat", "services urban decentral heat"] heat_buses = n.buses.index[n.buses.carrier.isin(heat_carriers)] - locations = n.buses.location[heat_buses] + locations_incl_subnodes = n.buses.location[heat_buses] + # Map to parent clusters for CO2 bus lookup + # Use parent_node from district_heat_info if available; otherwise nodes are their own parent + if "parent_node" in district_heat_info.columns: + locations = pd.Index( + [ + district_heat_info.loc[loc, "parent_node"] + if loc in district_heat_info.index + else loc + for loc in locations_incl_subnodes + ] + ) + else: + locations = locations_incl_subnodes electricity_input = ( costs.at["direct air capture", "electricity-input"] @@ -1924,16 +1950,6 @@ def add_storage_and_grids( logger.info( "Add natural gas infrastructure, incl. LNG terminals, production, storage and entry-points." 
) - - add_carrier_buses( - n=n, - carrier="gas", - costs=costs, - spatial=spatial, - options=options, - cf_industry=None, - ) - gas_pipes = pd.read_csv(clustered_gas_network_file, index_col=0) if options["H2_retrofit"]: @@ -2323,8 +2339,13 @@ def add_EVs( unit="MWh_el", ) + # Ensure p_set columns align with spatial.nodes + p_set = p_set.reindex(columns=spatial.nodes) + # Calculate temperature-corrected efficiency car_efficiency = options["transport_electric_efficiency"] + # Reindex temperature to spatial.nodes to ensure alignment + temperature = temperature.reindex(columns=spatial.nodes) efficiency = get_temp_efficency( car_efficiency, temperature, @@ -2720,32 +2741,9 @@ def build_heat_demand( n, hourly_heat_demand_file, pop_weighted_energy_totals, heating_efficiencies ): """ - Build heat demand time series and adjust electricity load to account for electric heating. - - Parameters - ---------- - n : pypsa.Network - The PyPSA network container object - hourly_heat_demand_file : str - Path to netCDF file containing hourly heat demand data - pop_weighted_energy_totals : pd.DataFrame - Population-weighted energy totals containing columns for total and - electricity consumption for different sectors and uses - heating_efficiencies : dict - Dictionary mapping sector and use combinations to their heating efficiencies - - Returns - ------- - pd.DataFrame - Heat demand time series with hierarchical columns for different sectors - and uses (residential/services, water/space) + Build heat demand time series and adjust electricity load for electric heating. - Notes - ----- - The function: - - Constructs heat demand profiles for different sectors and uses - - Adjusts the electricity load profiles by subtracting electric heating - - Modifies the network object in-place by updating n.loads_t.p_set + When subnodes are enabled, input files already include subnode profiles. 
""" heat_demand_shape = ( xr.open_dataset(hourly_heat_demand_file).to_dataframe().unstack(level=1) @@ -2775,11 +2773,16 @@ def build_heat_demand( electric_heat_supply = pd.concat(electric_heat_supply, axis=1) # subtract from electricity load since heat demand already in heat_demand + # Only subtract for nodes that exist in n.loads (base nodes, not subnodes) electric_nodes = n.loads.index[n.loads.carrier == "electricity"] - n.loads_t.p_set[electric_nodes] = ( - n.loads_t.p_set[electric_nodes] - - electric_heat_supply.T.groupby(level=1).sum().T[electric_nodes] + existing_nodes = electric_heat_supply.columns.get_level_values(1).intersection( + electric_nodes ) + if len(existing_nodes) > 0: + n.loads_t.p_set[existing_nodes] = ( + n.loads_t.p_set[existing_nodes] + - electric_heat_supply.T.groupby(level=1).sum().T[existing_nodes] + ) return heat_demand @@ -2797,7 +2800,7 @@ def add_heat( ates_recovery_factor: float, enable_ates: bool, ates_marginal_cost_charger: float, - district_heat_share_file: str, + district_heat_info: pd.DataFrame, solar_thermal_total_file: str, retro_cost_file: str, floor_area_file: str, @@ -2887,7 +2890,6 @@ def add_heat( cop = xr.open_dataarray(cop_profiles_file) direct_heat_profile = xr.open_dataarray(direct_heat_source_utilisation_profile_file) - district_heat_info = pd.read_csv(district_heat_share_file, index_col=0) dist_fraction = district_heat_info["district fraction of node"] urban_fraction = district_heat_info["urban fraction"] @@ -2916,16 +2918,25 @@ def add_heat( heat_system.central_or_decentral ] if heat_system == HeatSystem.URBAN_CENTRAL: - nodes = dist_fraction.index[dist_fraction > 0] + heat_nodes = dist_fraction.index[dist_fraction > 0] + # Map subnodes to their parent cluster for resource/electricity buses + if "parent_node" in district_heat_info.columns: + parent_of_subnode = pd.Series( + district_heat_info.loc[heat_nodes, "parent_node"].values, + index=heat_nodes, + ) + else: + parent_of_subnode = pd.Series(heat_nodes, 
index=heat_nodes) else: - nodes = pop_layout.index + heat_nodes = pop_layout.index + parent_of_subnode = pd.Series(heat_nodes, index=heat_nodes) n.add("Carrier", f"{heat_system} heat") n.add( "Bus", - nodes + f" {heat_system.value} heat", - location=nodes, + heat_nodes + f" {heat_system.value} heat", + location=heat_nodes, carrier=f"{heat_system.value} heat", unit="MWh_th", ) @@ -2934,9 +2945,9 @@ def add_heat( if options["heat_vent"][heat_system.system_type.value]: n.add( "Generator", - nodes + f" {heat_system} heat vent", - bus=nodes + f" {heat_system} heat", - location=nodes, + heat_nodes + f" {heat_system} heat vent", + bus=heat_nodes + f" {heat_system} heat", + location=heat_nodes, carrier=f"{heat_system} heat vent", p_nom_extendable=True, p_max_pu=0, @@ -2947,7 +2958,8 @@ def add_heat( ## Add heat load factor = heat_system.heat_demand_weighting( - urban_fraction=urban_fraction[nodes], dist_fraction=dist_fraction[nodes] + urban_fraction=urban_fraction[heat_nodes], + dist_fraction=dist_fraction[heat_nodes], ) if heat_system != HeatSystem.URBAN_CENTRAL: heat_load = ( @@ -2959,25 +2971,22 @@ def add_heat( ] .T.groupby(level=1) .sum() - .T[nodes] + .T[heat_nodes] .multiply(factor) ) else: - heat_load = ( - heat_demand.T.groupby(level=1) - .sum() - .T[nodes] - .multiply( - factor * (1 + options["district_heating"]["district_heating_loss"]) - ) + # Urban central heat_demand includes subnodes if enabled + heat_demand_grouped = heat_demand.T.groupby(level=1).sum().T + heat_load = heat_demand_grouped[heat_nodes].multiply( + factor * (1 + options["district_heating"]["district_heating_loss"]) ) n.add( "Load", - nodes, + heat_nodes, suffix=f" {heat_system} heat", - bus=nodes + f" {heat_system} heat", + bus=heat_nodes + f" {heat_system} heat", carrier=f"{heat_system} heat", p_set=heat_load.loc[n.snapshots], ) @@ -2988,23 +2997,22 @@ def add_heat( HeatSystem.URBAN_CENTRAL, ]: factor = heat_system.heat_demand_weighting( - urban_fraction=urban_fraction[nodes], 
dist_fraction=dist_fraction[nodes] + urban_fraction=urban_fraction[heat_nodes], + dist_fraction=dist_fraction[heat_nodes], ) - heat_dsm_profile = pd.read_csv( + heat_dsm_profile_raw = pd.read_csv( heat_dsm_profile_file, header=1, index_col=0, parse_dates=True, - )[nodes].reindex(n.snapshots) + ) + heat_dsm_profile = heat_dsm_profile_raw[heat_nodes].reindex(n.snapshots) - e_nom = ( - heat_demand[["residential space"]] - .T.groupby(level=1) - .sum() - .T[nodes] - .multiply(factor) + heat_demand_res_space = ( + heat_demand[["residential space"]].T.groupby(level=1).sum().T ) + e_nom = heat_demand_res_space[heat_nodes].multiply(factor) heat_dsm_restriction_value = options["residential_heat"]["dsm"][ "restriction_value" @@ -3022,9 +3030,9 @@ def add_heat( # Thermal (standing) losses of buildings assumed to be the same as decentralized water tanks n.add( "Store", - nodes, + heat_nodes, suffix=f" {heat_system} heat dsm", - bus=nodes + f" {heat_system} heat", + bus=heat_nodes + f" {heat_system} heat", carrier=f"{heat_system} heat dsm", standing_loss=costs.at[ "decentral water tank storage", "standing_losses" @@ -3043,8 +3051,8 @@ def add_heat( n.add( "Bus", - nodes + f" {heat_system} water tanks", - location=nodes, + heat_nodes + f" {heat_system} water tanks", + location=heat_nodes, carrier=f"{heat_system} water tanks", unit="MWh_th", ) @@ -3056,10 +3064,10 @@ def add_heat( n.add( "Link", - nodes, + heat_nodes, suffix=f" {heat_system} water tanks charger", - bus0=nodes + f" {heat_system} heat", - bus1=nodes + f" {heat_system} water tanks", + bus0=heat_nodes + f" {heat_system} heat", + bus1=heat_nodes + f" {heat_system} water tanks", efficiency=costs.at[ heat_system.central_or_decentral + " water tank charger", "efficiency", @@ -3074,10 +3082,10 @@ def add_heat( n.add( "Link", - nodes, + heat_nodes, suffix=f" {heat_system} water tanks discharger", - bus0=nodes + f" {heat_system} water tanks", - bus1=nodes + f" {heat_system} heat", + bus0=heat_nodes + f" {heat_system} water 
tanks", + bus1=heat_nodes + f" {heat_system} heat", carrier=f"{heat_system} water tanks discharger", efficiency=costs.at[ heat_system.central_or_decentral + " water tank discharger", @@ -3090,14 +3098,15 @@ def add_heat( ) n.links.loc[ - nodes + f" {heat_system} water tanks charger", "energy to power ratio" + heat_nodes + f" {heat_system} water tanks charger", + "energy to power ratio", ] = energy_to_power_ratio_water_tanks n.add( "Store", - nodes, + heat_nodes, suffix=f" {heat_system} water tanks", - bus=nodes + f" {heat_system} water tanks", + bus=heat_nodes + f" {heat_system} water tanks", e_cyclic=True, e_nom_extendable=True, carrier=f"{heat_system} water tanks", @@ -3120,8 +3129,8 @@ def add_heat( n.add( "Bus", - nodes + f" {heat_system} water pits", - location=nodes, + heat_nodes + f" {heat_system} water pits", + location=heat_nodes, carrier=f"{heat_system} water pits", unit="MWh_th", ) @@ -3132,10 +3141,10 @@ def add_heat( n.add( "Link", - nodes, + heat_nodes, suffix=f" {heat_system} water pits charger", - bus0=nodes + f" {heat_system} heat", - bus1=nodes + f" {heat_system} water pits", + bus0=heat_nodes + f" {heat_system} heat", + bus1=heat_nodes + f" {heat_system} water pits", efficiency=costs.at[ "central water pit charger", "efficiency", @@ -3151,9 +3160,9 @@ def add_heat( if options["district_heating"]["ptes"]["supplemental_heating"][ "enable" ]: + ptes_supp_data = xr.open_dataarray(ptes_direct_utilisation_profile) ptes_supplemental_heating_required = ( - xr.open_dataarray(ptes_direct_utilisation_profile) - .sel(name=nodes) + ptes_supp_data.sel(name=heat_nodes) .to_pandas() .reindex(index=n.snapshots) ) @@ -3162,10 +3171,10 @@ def add_heat( n.add( "Link", - nodes, + heat_nodes, suffix=f" {heat_system} water pits discharger", - bus0=nodes + f" {heat_system} water pits", - bus1=nodes + f" {heat_system} heat", + bus0=heat_nodes + f" {heat_system} water pits", + bus1=heat_nodes + f" {heat_system} heat", carrier=f"{heat_system} water pits discharger", 
efficiency=costs.at[ "central water pit discharger", @@ -3176,15 +3185,15 @@ def add_heat( lifetime=costs.at["central water pit storage", "lifetime"], ) n.links.loc[ - nodes + f" {heat_system} water pits charger", + heat_nodes + f" {heat_system} water pits charger", "energy to power ratio", ] = energy_to_power_ratio_water_pit if options["district_heating"]["ptes"]["dynamic_capacity"]: - # Load pre-calculated e_max_pu profiles + # Load pre-calculated e_max_pu profiles (pre-extended for subnodes) e_max_pu_data = xr.open_dataarray(ptes_e_max_pu_file) e_max_pu = ( - e_max_pu_data.sel(name=nodes) + e_max_pu_data.sel(name=heat_nodes) .to_pandas() .reindex(index=n.snapshots) ) @@ -3193,9 +3202,9 @@ def add_heat( n.add( "Store", - nodes, + heat_nodes, suffix=f" {heat_system} water pits", - bus=nodes + f" {heat_system} water pits", + bus=heat_nodes + f" {heat_system} water pits", e_cyclic=True, e_nom_extendable=True, e_max_pu=e_max_pu, @@ -3213,17 +3222,17 @@ def add_heat( n.add( "Bus", - nodes + f" {heat_system} aquifer thermal energy storage", - location=nodes, + heat_nodes + f" {heat_system} aquifer thermal energy storage", + location=heat_nodes, carrier=f"{heat_system} aquifer thermal energy storage", unit="MWh_th", ) n.add( "Link", - nodes + f" {heat_system} aquifer thermal energy storage charger", - bus0=nodes + f" {heat_system} heat", - bus1=nodes + f" {heat_system} aquifer thermal energy storage", + heat_nodes + f" {heat_system} aquifer thermal energy storage charger", + bus0=heat_nodes + f" {heat_system} heat", + bus1=heat_nodes + f" {heat_system} aquifer thermal energy storage", efficiency=1.0, carrier=f"{heat_system} aquifer thermal energy storage charger", p_nom_extendable=True, @@ -3236,9 +3245,10 @@ def add_heat( n.add( "Link", - nodes + f" {heat_system} aquifer thermal energy storage discharger", - bus1=nodes + f" {heat_system} heat", - bus0=nodes + f" {heat_system} aquifer thermal energy storage", + heat_nodes + + f" {heat_system} aquifer thermal energy 
storage discharger", + bus1=heat_nodes + f" {heat_system} heat", + bus0=heat_nodes + f" {heat_system} aquifer thermal energy storage", efficiency=1.0, carrier=f"{heat_system} aquifer thermal energy storage discharger", p_nom_extendable=True, @@ -3248,15 +3258,17 @@ def add_heat( / 2, ) - ates_e_nom_max = pd.read_csv(ates_e_nom_max, index_col=0)["ates_potential"] + ates_e_nom_max_data = pd.read_csv(ates_e_nom_max, index_col=0)[ + "ates_potential" + ] n.add( "Store", - nodes, + heat_nodes, suffix=f" {heat_system} aquifer thermal energy storage", - bus=nodes + f" {heat_system} aquifer thermal energy storage", + bus=heat_nodes + f" {heat_system} aquifer thermal energy storage", e_cyclic=True, e_nom_extendable=True, - e_nom_max=ates_e_nom_max[nodes], + e_nom_max=ates_e_nom_max_data[heat_nodes], carrier=f"{heat_system} aquifer thermal energy storage", standing_loss=1 - ates_recovery_factor ** (1 / 8760), lifetime=costs.at["central geothermal heat source", "lifetime"], @@ -3266,32 +3278,34 @@ def add_heat( for heat_source in params.heat_pump_sources[heat_system.system_type.value]: costs_name_heat_pump = heat_system.heat_pump_costs_name(heat_source) - cop_heat_pump = ( - cop.sel( - heat_system=heat_system.system_type.value, - heat_source=heat_source, - name=nodes, + if options["time_dep_hp_cop"]: + cop_heat_pump = ( + cop.sel( + heat_system=heat_system.system_type.value, + heat_source=heat_source, + name=heat_nodes, + ) + .to_pandas() + .reindex(index=n.snapshots) ) - .to_pandas() - .reindex(index=n.snapshots) - if options["time_dep_hp_cop"] - else costs.loc[[costs_name_heat_pump], ["efficiency"]] - ) + else: + cop_heat_pump = costs.loc[[costs_name_heat_pump], ["efficiency"]] if heat_source in params.limited_heat_sources: - # get potential - p_max_source = pd.read_csv( + p_max_source_raw = pd.read_csv( heat_source_profile_files[heat_source], index_col=0, parse_dates=True, - ).squeeze()[nodes] + ).squeeze() - # if only dimension is nodes, convert series to dataframe with 
columns as nodes and index as snapshots + p_max_source = p_max_source_raw[heat_nodes] + + # if only dimension is heat_nodes, convert series to dataframe with columns as heat_nodes and index as snapshots if p_max_source.ndim == 1: p_max_source = pd.DataFrame( [p_max_source] * len(n.snapshots), index=n.snapshots, - columns=nodes, + columns=heat_nodes, ) # add resource @@ -3299,8 +3313,8 @@ def add_heat( n.add("Carrier", heat_carrier) n.add( "Bus", - nodes, - location=nodes, + heat_nodes, + location=heat_nodes, suffix=f" {heat_carrier}", carrier=heat_carrier, ) @@ -3326,9 +3340,9 @@ def add_heat( n.add( "Generator", - nodes, + heat_nodes, suffix=f" {heat_carrier}", - bus=nodes + f" {heat_carrier}", + bus=heat_nodes + f" {heat_carrier}", carrier=heat_carrier, p_nom_extendable=True, capital_cost=capital_cost, @@ -3339,11 +3353,11 @@ def add_heat( # add heat pump converting source heat + electricity to urban central heat n.add( "Link", - nodes, + heat_nodes, suffix=f" {heat_system} {heat_source} heat pump", - bus0=nodes + f" {heat_system} heat", - bus1=nodes, - bus2=nodes + f" {heat_carrier}", + bus0=heat_nodes + f" {heat_system} heat", + bus1=parent_of_subnode.values, + bus2=heat_nodes + f" {heat_carrier}", carrier=f"{heat_system} {heat_source} heat pump", efficiency=(1 / cop_heat_pump.clip(lower=0.001)).squeeze(), efficiency2=(1 - (1 / cop_heat_pump.clip(lower=0.001))).squeeze(), @@ -3358,11 +3372,12 @@ def add_heat( ) if heat_source in params.direct_utilisation_heat_sources: - # 1 if source temperature exceeds forward temperature, 0 otherwise: + # Direct heat utilisation profiles already include subnode-specific data + # (computed from temperature profiles that use extended regions file) efficiency_direct_utilisation = ( direct_heat_profile.sel( heat_source=heat_source, - name=nodes, + name=heat_nodes, ) .to_pandas() .reindex(index=n.snapshots) @@ -3370,10 +3385,10 @@ def add_heat( # add link for direct usage of heat source when source temperature exceeds forward 
temperature n.add( "Link", - nodes, + heat_nodes, suffix=f" {heat_system} {heat_source} heat direct utilisation", - bus0=nodes + f" {heat_carrier}", - bus1=nodes + f" {heat_system} heat", + bus0=heat_nodes + f" {heat_carrier}", + bus1=heat_nodes + f" {heat_system} heat", efficiency=efficiency_direct_utilisation, carrier=f"{heat_system} {heat_source} heat direct utilisation", p_nom_extendable=True, @@ -3402,11 +3417,11 @@ def add_heat( ): n.add( "Link", - nodes, + heat_nodes, suffix=f" {heat_system} {heat_source} heat pump", - bus0=nodes + f" {heat_system} heat", - bus1=nodes, - bus2=nodes + f" {heat_system} water pits", + bus0=heat_nodes + f" {heat_system} heat", + bus1=parent_of_subnode.values, + bus2=heat_nodes + f" {heat_system} water pits", carrier=f"{heat_system} {heat_source} heat pump", efficiency=(1 / (cop_heat_pump - 1).clip(lower=0.001)).squeeze(), efficiency2=(1 - 1 / cop_heat_pump.clip(lower=0.001)).squeeze(), @@ -3423,10 +3438,10 @@ def add_heat( else: n.add( "Link", - nodes, + heat_nodes, suffix=f" {heat_system} {heat_source} heat pump", - bus0=nodes + f" {heat_system} heat", - bus1=nodes, + bus0=heat_nodes + f" {heat_system} heat", + bus1=parent_of_subnode.values, carrier=f"{heat_system} {heat_source} heat pump", efficiency=(1 / cop_heat_pump.clip(lower=0.001)).squeeze(), capital_cost=costs.at[costs_name_heat_pump, "capital_cost"] @@ -3444,9 +3459,9 @@ def add_heat( n.add( "Link", - nodes + f" {heat_system} resistive heater", - bus0=nodes, - bus1=nodes + f" {heat_system} heat", + heat_nodes + f" {heat_system} resistive heater", + bus0=parent_of_subnode.values, + bus1=heat_nodes + f" {heat_system} heat", carrier=f"{heat_system} resistive heater", efficiency=costs.at[key, "efficiency"], capital_cost=costs.at[key, "efficiency"] @@ -3459,12 +3474,17 @@ def add_heat( if options["boilers"]: key = f"{heat_system.central_or_decentral} gas boiler" + # Map subnodes to parent cluster gas buses + gas_bus_for_subnodes = spatial.gas.df.loc[ + 
parent_of_subnode.values, "nodes" + ].values + n.add( "Link", - nodes + f" {heat_system} gas boiler", + heat_nodes + f" {heat_system} gas boiler", p_nom_extendable=True, - bus0=spatial.gas.df.loc[nodes, "nodes"].values, - bus1=nodes + f" {heat_system} heat", + bus0=gas_bus_for_subnodes, + bus1=heat_nodes + f" {heat_system} heat", bus2="co2 atmosphere", carrier=f"{heat_system} gas boiler", efficiency=costs.at[key, "efficiency"], @@ -3480,16 +3500,16 @@ def add_heat( n.add( "Generator", - nodes, + heat_nodes, suffix=f" {heat_system} solar thermal collector", - bus=nodes + f" {heat_system} heat", + bus=heat_nodes + f" {heat_system} heat", carrier=f"{heat_system} solar thermal", p_nom_extendable=True, capital_cost=costs.at[ heat_system.central_or_decentral + " solar thermal", "capital_cost" ] * overdim_factor, - p_max_pu=solar_thermal[nodes], + p_max_pu=solar_thermal[heat_nodes], lifetime=costs.at[ heat_system.central_or_decentral + " solar thermal", "lifetime" ], @@ -3502,12 +3522,16 @@ def add_heat( # Solid biomass CHP is added in add_biomass continue fuel_nodes = getattr(spatial, fuel).df + # Map subnodes to parent cluster fuel buses + fuel_bus_for_subnodes = fuel_nodes.loc[ + parent_of_subnode.values, "nodes" + ].values n.add( "Link", - nodes + f" urban central {fuel} CHP", - bus0=fuel_nodes.loc[nodes, "nodes"].values, - bus1=nodes, - bus2=nodes + " urban central heat", + heat_nodes + f" urban central {fuel} CHP", + bus0=fuel_bus_for_subnodes, + bus1=parent_of_subnode.values, + bus2=heat_nodes + " urban central heat", bus3="co2 atmosphere", carrier=f"urban central {fuel} CHP", p_nom_extendable=True, @@ -3521,14 +3545,18 @@ def add_heat( lifetime=costs.at["central gas CHP", "lifetime"], ) + # CO2 storage bus for subnodes + co2_bus_for_subnodes = spatial.co2.df.loc[ + parent_of_subnode.values, "nodes" + ].values n.add( "Link", - nodes + f" urban central {fuel} CHP CC", - bus0=fuel_nodes.loc[nodes, "nodes"].values, - bus1=nodes, - bus2=nodes + " urban central heat", + 
heat_nodes + f" urban central {fuel} CHP CC", + bus0=fuel_bus_for_subnodes, + bus1=parent_of_subnode.values, + bus2=heat_nodes + " urban central heat", bus3="co2 atmosphere", - bus4=spatial.co2.df.loc[nodes, "nodes"].values, + bus4=co2_bus_for_subnodes, carrier=f"urban central {fuel} CHP CC", p_nom_extendable=True, capital_cost=costs.at["central gas CHP", "capital_cost"] @@ -3566,11 +3594,11 @@ def add_heat( ): n.add( "Link", - nodes + f" {heat_system} micro gas CHP", + heat_nodes + f" {heat_system} micro gas CHP", p_nom_extendable=True, - bus0=spatial.gas.df.loc[nodes, "nodes"].values, - bus1=nodes, - bus2=nodes + f" {heat_system} heat", + bus0=spatial.gas.df.loc[parent_of_subnode.values, "nodes"].values, + bus1=parent_of_subnode.values, + bus2=heat_nodes + f" {heat_system} heat", bus3="co2 atmosphere", carrier=heat_system.value + " micro gas CHP", efficiency=costs.at["micro CHP", "efficiency"], @@ -3785,6 +3813,7 @@ def add_biomass( spatial, cf_industry, pop_layout, + district_heat_info, biomass_potentials_file, biomass_transport_costs_file=None, nyears=1, @@ -4248,22 +4277,33 @@ def add_biomass( ) # AC buses with district heating - urban_central = n.buses.index[n.buses.carrier == "urban central heat"] + urban_central_incl_subnodes = n.buses.index[n.buses.carrier == "urban central heat"] if ( - not urban_central.empty + not urban_central_incl_subnodes.empty and options["chp"]["enable"] and ("solid biomass" in options["chp"]["fuel"]) ): - urban_central = urban_central.str[: -len(" urban central heat")] + urban_central_incl_subnodes = urban_central_incl_subnodes.str[ + : -len(" urban central heat") + ] + # Map subnodes to parent clusters for resource bus lookups + if "parent_node" in district_heat_info.columns: + urban_central = pd.Index( + district_heat_info.loc[ + urban_central_incl_subnodes, "parent_node" + ].values + ) + else: + urban_central = urban_central_incl_subnodes key = "central solid biomass CHP" n.add( "Link", - urban_central + " urban central solid 
biomass CHP", + urban_central_incl_subnodes + " urban central solid biomass CHP", bus0=spatial.biomass.df.loc[urban_central, "nodes"].values, bus1=urban_central, - bus2=urban_central + " urban central heat", + bus2=urban_central_incl_subnodes + " urban central heat", carrier="urban central solid biomass CHP", p_nom_extendable=True, capital_cost=costs.at[key, "capital_cost"] * costs.at[key, "efficiency"], @@ -4275,10 +4315,10 @@ def add_biomass( n.add( "Link", - urban_central + " urban central solid biomass CHP CC", + urban_central_incl_subnodes + " urban central solid biomass CHP CC", bus0=spatial.biomass.df.loc[urban_central, "nodes"].values, - bus1=urban_central, - bus2=urban_central + " urban central heat", + bus1=urban_central.values, + bus2=urban_central_incl_subnodes + " urban central heat", bus3="co2 atmosphere", bus4=spatial.co2.df.loc[urban_central, "nodes"].values, carrier="urban central solid biomass CHP CC", @@ -4565,17 +4605,6 @@ def add_industry( - Process emission handling """ logger.info("Add industrial demand") - - # Ensure the gas carrier bus exists before adding any gas-for-industry links. - add_carrier_buses( - n=n, - carrier="gas", - costs=costs, - spatial=spatial, - options=options, - cf_industry=None, - ) - # add oil buses for shipping, aviation and naptha for industry add_carrier_buses( n, @@ -4601,12 +4630,6 @@ def add_industry( # 1e6 to convert TWh to MWh industrial_demand = pd.read_csv(industrial_demand_file, index_col=0) * 1e6 * nyears - if not options["biomass"]: - raise ValueError( - "Industry demand includes solid biomass, but `sector.biomass` is disabled. " - "Enable `sector: {biomass: true}` in config." 
- ) - n.add( "Bus", spatial.biomass.industry, @@ -5003,9 +5026,19 @@ def add_industry( ) # TODO simplify bus expression + # For LT industry heat, we need to use all nodes that have urban central heat buses + # This includes subnodes when subnodes are enabled + lt_heat_nodes = [ + node + for node in industrial_demand.index + if node + " urban central heat" in n.buses.index + or node + " services urban decentral heat" in n.buses.index + ] + lt_heat_nodes = pd.Index(lt_heat_nodes) + n.add( "Load", - nodes, + lt_heat_nodes, suffix=" low-temperature heat for industry", bus=[ ( @@ -5013,10 +5046,10 @@ def add_industry( if node + " urban central heat" in n.buses.index else node + " services urban decentral heat" ) - for node in nodes + for node in lt_heat_nodes ], carrier="low-temperature heat for industry", - p_set=industrial_demand.loc[nodes, "low-temperature heat"] / nhours, + p_set=industrial_demand.loc[lt_heat_nodes, "low-temperature heat"] / nhours, ) # remove today's industrial electricity demand by scaling down total electricity demand @@ -5473,72 +5506,102 @@ def add_waste_heat( options["use_fischer_tropsch_waste_heat"] and "Fischer-Tropsch" in link_carriers ): - n.links.loc[urban_central + " Fischer-Tropsch", "bus3"] = ( - urban_central + " urban central heat" - ) - n.links.loc[urban_central + " Fischer-Tropsch", "efficiency3"] = ( - 0.95 - n.links.loc[urban_central + " Fischer-Tropsch", "efficiency"] - ) * options["use_fischer_tropsch_waste_heat"] + # Filter to only nodes that have Fischer-Tropsch links + ft_nodes = urban_central[ + (urban_central + " Fischer-Tropsch").isin(n.links.index) + ] + if not ft_nodes.empty: + n.links.loc[ft_nodes + " Fischer-Tropsch", "bus3"] = ( + ft_nodes + " urban central heat" + ) + n.links.loc[ft_nodes + " Fischer-Tropsch", "efficiency3"] = ( + 0.95 - n.links.loc[ft_nodes + " Fischer-Tropsch", "efficiency"] + ) * options["use_fischer_tropsch_waste_heat"] # Sabatier process waste heat if options["use_methanation_waste_heat"] and 
"Sabatier" in link_carriers: - n.links.loc[urban_central + " Sabatier", "bus3"] = ( - urban_central + " urban central heat" - ) - n.links.loc[urban_central + " Sabatier", "efficiency3"] = ( - 0.95 - n.links.loc[urban_central + " Sabatier", "efficiency"] - ) * options["use_methanation_waste_heat"] + # Filter to only nodes that have Sabatier links + sabatier_nodes = urban_central[ + (urban_central + " Sabatier").isin(n.links.index) + ] + if not sabatier_nodes.empty: + n.links.loc[sabatier_nodes + " Sabatier", "bus3"] = ( + sabatier_nodes + " urban central heat" + ) + n.links.loc[sabatier_nodes + " Sabatier", "efficiency3"] = ( + 0.95 - n.links.loc[sabatier_nodes + " Sabatier", "efficiency"] + ) * options["use_methanation_waste_heat"] # Haber-Bosch process waste heat if options["use_haber_bosch_waste_heat"] and "Haber-Bosch" in link_carriers: - n.links.loc[urban_central + " Haber-Bosch", "bus3"] = ( - urban_central + " urban central heat" - ) - total_energy_input = ( - cf_industry["MWh_H2_per_tNH3_electrolysis"] - + cf_industry["MWh_elec_per_tNH3_electrolysis"] - ) / cf_industry["MWh_NH3_per_tNH3"] - electricity_input = ( - cf_industry["MWh_elec_per_tNH3_electrolysis"] - / cf_industry["MWh_NH3_per_tNH3"] - ) - n.links.loc[urban_central + " Haber-Bosch", "efficiency3"] = ( - 0.15 * total_energy_input / electricity_input - ) * options["use_haber_bosch_waste_heat"] + # Filter to only nodes that have Haber-Bosch links + hb_nodes = urban_central[ + (urban_central + " Haber-Bosch").isin(n.links.index) + ] + if not hb_nodes.empty: + n.links.loc[hb_nodes + " Haber-Bosch", "bus3"] = ( + hb_nodes + " urban central heat" + ) + total_energy_input = ( + cf_industry["MWh_H2_per_tNH3_electrolysis"] + + cf_industry["MWh_elec_per_tNH3_electrolysis"] + ) / cf_industry["MWh_NH3_per_tNH3"] + electricity_input = ( + cf_industry["MWh_elec_per_tNH3_electrolysis"] + / cf_industry["MWh_NH3_per_tNH3"] + ) + n.links.loc[hb_nodes + " Haber-Bosch", "efficiency3"] = ( + 0.15 * total_energy_input / 
electricity_input + ) * options["use_haber_bosch_waste_heat"] # Methanolisation waste heat if ( options["use_methanolisation_waste_heat"] and "methanolisation" in link_carriers ): - n.links.loc[urban_central + " methanolisation", "bus4"] = ( - urban_central + " urban central heat" - ) - n.links.loc[urban_central + " methanolisation", "efficiency4"] = ( - costs.at["methanolisation", "heat-output"] - / costs.at["methanolisation", "hydrogen-input"] - ) * options["use_methanolisation_waste_heat"] + # Filter to only nodes that have methanolisation links + methanol_nodes = urban_central[ + (urban_central + " methanolisation").isin(n.links.index) + ] + if not methanol_nodes.empty: + n.links.loc[methanol_nodes + " methanolisation", "bus4"] = ( + methanol_nodes + " urban central heat" + ) + n.links.loc[methanol_nodes + " methanolisation", "efficiency4"] = ( + costs.at["methanolisation", "heat-output"] + / costs.at["methanolisation", "hydrogen-input"] + ) * options["use_methanolisation_waste_heat"] # Electrolysis waste heat if ( options["use_electrolysis_waste_heat"] and "H2 Electrolysis" in link_carriers ): - n.links.loc[urban_central + " H2 Electrolysis", "bus2"] = ( - urban_central + " urban central heat" - ) - n.links.loc[urban_central + " H2 Electrolysis", "efficiency2"] = ( - 0.84 - n.links.loc[urban_central + " H2 Electrolysis", "efficiency"] - ) * options["use_electrolysis_waste_heat"] + # Filter to only nodes that have H2 Electrolysis links + elec_nodes = urban_central[ + (urban_central + " H2 Electrolysis").isin(n.links.index) + ] + if not elec_nodes.empty: + n.links.loc[elec_nodes + " H2 Electrolysis", "bus2"] = ( + elec_nodes + " urban central heat" + ) + n.links.loc[elec_nodes + " H2 Electrolysis", "efficiency2"] = ( + 0.84 - n.links.loc[elec_nodes + " H2 Electrolysis", "efficiency"] + ) * options["use_electrolysis_waste_heat"] # Fuel cell waste heat if options["use_fuel_cell_waste_heat"] and "H2 Fuel Cell" in link_carriers: - n.links.loc[urban_central + " H2 
Fuel Cell", "bus2"] = ( - urban_central + " urban central heat" - ) - n.links.loc[urban_central + " H2 Fuel Cell", "efficiency2"] = ( - 0.95 - n.links.loc[urban_central + " H2 Fuel Cell", "efficiency"] - ) * options["use_fuel_cell_waste_heat"] + # Filter to only nodes that have H2 Fuel Cell links + fc_nodes = urban_central[ + (urban_central + " H2 Fuel Cell").isin(n.links.index) + ] + if not fc_nodes.empty: + n.links.loc[fc_nodes + " H2 Fuel Cell", "bus2"] = ( + fc_nodes + " urban central heat" + ) + n.links.loc[fc_nodes + " H2 Fuel Cell", "efficiency2"] = ( + 0.95 - n.links.loc[fc_nodes + " H2 Fuel Cell", "efficiency"] + ) * options["use_fuel_cell_waste_heat"] def add_agriculture( @@ -6327,7 +6390,14 @@ def add_import_options( year = int(snakemake.params["energy_totals_year"]) heating_efficiencies = pd.read_csv(fn, index_col=[1, 0]).loc[year] - spatial = define_spatial(pop_layout.index, options) + # Load district heat share early to get district_heating_nodes for define_spatial + # This file includes both parent nodes and subnodes (if subnodes are enabled) + district_heat_info = pd.read_csv(snakemake.input.district_heat_share, index_col=0) + district_heating_nodes = district_heat_info.index + + spatial = define_spatial( + pop_layout.index, options, district_heating_nodes=district_heating_nodes + ) if snakemake.params.foresight in ["myopic", "perfect"]: add_lifetime_wind_solar(n, costs) @@ -6416,7 +6486,7 @@ def add_import_options( ], enable_ates=snakemake.params.sector["district_heating"]["ates"]["enable"], ptes_direct_utilisation_profile=snakemake.input.ptes_direct_utilisation_profiles, - district_heat_share_file=snakemake.input.district_heat_share, + district_heat_info=district_heat_info, solar_thermal_total_file=snakemake.input.solar_thermal_total, retro_cost_file=snakemake.input.retro_cost, floor_area_file=snakemake.input.floor_area, @@ -6443,6 +6513,7 @@ def add_import_options( spatial=spatial, cf_industry=cf_industry, pop_layout=pop_layout, + 
district_heat_info=district_heat_info, biomass_potentials_file=snakemake.input.biomass_potentials, biomass_transport_costs_file=snakemake.input.biomass_transport_costs, nyears=nyears, @@ -6504,7 +6575,7 @@ def add_import_options( ) if options["dac"]: - add_dac(n, costs) + add_dac(n, costs, pop_layout, district_heat_info) if not options["electricity_transmission_grid"]: decentral(n)