-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstranded_assets_combination_script.py
More file actions
147 lines (124 loc) · 5.28 KB
/
stranded_assets_combination_script.py
File metadata and controls
147 lines (124 loc) · 5.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import json
import os
import pandas as pd
import re
from typing import Any
# Matches simulation output filenames of the form
# "stranded_assets_from_<service>_to_grid_<hours>_priority_<GP|SG>_hpc_run_..._info_file.json".
GRID_FILENAME_REGEX = re.compile(
    r"stranded_assets_from_(?P<service_hours>\d*)_to_grid_(?P<grid_hours>\d*)_priority_(?P<grid_priority>GP|SG)_hpc_run_.*_info_file.json"
)
# Matches scenario names of the form "BW_UP_<grid hours>_<GP|SG>".
GRID_REGEX = re.compile(r"BW_UP_(?P<grid_hours>\d*)_(?P<grid_priority>.*)")


def parse_optimisation_data_dict(filedata: dict[str, Any]) -> dict[str, Any]:
    """Flatten an optimisation output file into a single result dictionary.

    Merges the cumulative results with the environmental, technical and
    financial appraisals, the criteria and the system details of
    ``iteration_0``, then augments the result with the grid parameters
    parsed from the scenario name.

    NOTE: the returned dictionary aliases (and mutates) the
    ``cumulative_results`` sub-dictionary of *filedata*.

    :param filedata: Parsed JSON contents of an optimisation output file.
    :return: Flat dictionary of appraisal values plus the keys
        ``prioritise_self_generation`` and ``grid_hours``.
    """
    iteration_data = filedata["system_appraisals"]["iteration_0"]
    temp_dict: dict[str, Any] = iteration_data["cumulative_results"]
    for section in (
        "environmental_appraisal",
        "technical_appraisal",
        "financial_appraisal",
        "criteria",
        "system_details",
    ):
        temp_dict.update(iteration_data[section])
    # Drop the input-file listing if present; it isn't a result value.
    temp_dict.pop("input_files", None)
    grid_match = GRID_REGEX.match(filedata["scenario"]["name"])
    # "GP" means grid-prioritised; anything else (i.e. "SG") prioritises
    # self-generation.
    temp_dict["prioritise_self_generation"] = (
        grid_match.group("grid_priority") != "GP"
    )
    temp_dict["grid_hours"] = float(grid_match.group("grid_hours"))
    return temp_dict
def parse_simulation_data_dict(
    filedata: dict[str, Any], filename: str
) -> dict[str, Any]:
    """Flatten a simulation output file into a single result dictionary.

    Merges the environmental, technical and financial appraisals and the
    criteria of ``simulation_1``, then augments the result with the grid
    parameters recovered from the output *filename*.

    NOTE: the returned dictionary aliases (and mutates) the
    ``environmental_appraisal`` sub-dictionary of *filedata*.

    :param filedata: Parsed JSON contents of a simulation output file.
    :param filename: Name of the output file, which encodes the scenario
        parameters (service hours, grid hours, grid priority).
    :return: Flat dictionary of appraisal values plus the keys
        ``prioritise_self_generation``, ``grid_hours`` and ``service_hours``.
    """
    temp_dict = filedata["simulation_1"]["system_appraisal"]["environmental_appraisal"]
    temp_dict.update(
        filedata["simulation_1"]["system_appraisal"]["technical_appraisal"]
    )
    temp_dict.update(
        filedata["simulation_1"]["system_appraisal"]["financial_appraisal"]
    )
    temp_dict.update(filedata["simulation_1"]["system_appraisal"]["criteria"])
    grid_match = GRID_FILENAME_REGEX.match(filename)
    # Try the alternative naming scheme if this fails.
    if grid_match is None:
        alternative_regex = re.compile(
            r"(?P<grid_hours>\d*)_hour_grid_only_info_file.json"
        )
        grid_match = alternative_regex.match(filename)
    # `Match.group` raises IndexError when the named group does not exist in
    # the pattern that matched (the alternative scheme has no priority group).
    try:
        temp_dict["prioritise_self_generation"] = (
            True if grid_match.group("grid_priority") == "SG" else False
        )
    except IndexError:
        # Assume grid-priority if not specified.
        temp_dict["prioritise_self_generation"] = False
    temp_dict["grid_hours"] = float(grid_match.group("grid_hours"))
    try:
        temp_dict["service_hours"] = float(grid_match.group("service_hours"))
    except IndexError:
        # Fall back to the grid hours when no service hours are encoded in
        # the filename (the alternative scheme has no service-hours group).
        temp_dict["service_hours"] = temp_dict["grid_hours"]
    return temp_dict
def generate_combined_results_frame(results_dirname: str) -> pd.DataFrame:
    """Combine all optimisation outputs under a directory into one frame.

    Reads ``optimisation_output_1.json`` from every sub-directory of
    ``<results_dirname>/optimisation_outputs`` and stacks the parsed
    results, one row per output file (indexed by the file path).

    :param results_dirname: Directory containing the optimisation outputs.
    :return: Data frame with one row per optimisation output file.
    """
    base_dir = os.path.join(results_dirname, "optimisation_outputs")
    parsed_results: dict[str, Any] = {}
    for run_dirname in os.listdir(base_dir):
        output_path = os.path.join(base_dir, run_dirname, "optimisation_output_1.json")
        with open(output_path, "r", encoding="UTF-8") as json_file:
            contents = json.load(json_file)
        parsed_results[output_path] = parse_optimisation_data_dict(contents)
    # One column per file; transpose so each file becomes a row.
    return pd.DataFrame(parsed_results).transpose()
def generate_combined_simulations_results_frame(results_dirname: str) -> pd.DataFrame:
    """Combine all simulation outputs under a directory into one frame.

    Reads every regular file in ``<results_dirname>/simulation_results``
    and stacks the parsed results, one row per output file (indexed by the
    file name).

    :param results_dirname: Directory containing the simulation results.
    :return: Data frame with one row per simulation output file.
    """
    sub_dirname = os.path.join(results_dirname, "simulation_results")
    parsed_results: dict[str, Any] = {}
    for entry in os.listdir(sub_dirname):
        filepath = os.path.join(sub_dirname, entry)
        # Skip directories.
        if not os.path.isfile(filepath):
            continue
        # Parse the contents and include.
        with open(filepath, "r", encoding="UTF-8") as json_file:
            contents = json.load(json_file)
        parsed_results[entry] = parse_simulation_data_dict(contents, entry)
    # One column per file; transpose so each file becomes a row.
    return pd.DataFrame(parsed_results).transpose()
def main():
    """Combine optimisation and simulation results into CSV files.

    Writes one ``combined_<dirname>.csv`` per optimisation results
    directory, then one combined CSV per grid-only simulation directory.
    """
    # Parse optimisation results, one combined CSV per results directory.
    for results_dirname in (
        "stranded_assets_results_3",
        "stranded_assets_results_4_unsubsidised_grid",
    ):
        optimisation_frame = generate_combined_results_frame(results_dirname)
        with open(f"combined_{results_dirname}.csv", "w", encoding="UTF-8") as f:
            optimisation_frame.to_csv(f)

    # Parse the grid-only simulations: input directory -> output CSV name.
    simulation_jobs = {
        "stranded_assets_results_from_13_hour_grid": (
            "combined_stranded_assets_results_from_13_hour_grid_simulations.csv"
        ),
        "stranded_assets_grid_only_results": (
            "combined_stranded_assets_grid_only_results.csv"
        ),
    }
    for input_dirname, output_filename in simulation_jobs.items():
        simulation_frame = generate_combined_simulations_results_frame(input_dirname)
        with open(output_filename, "w", encoding="UTF-8") as f:
            simulation_frame.to_csv(f)


if __name__ == "__main__":
    main()