From 959687d46255cde8edc3934c1a04640c04bb01eb Mon Sep 17 00:00:00 2001 From: Fabian Thorsen Date: Wed, 11 Mar 2026 14:32:20 +0100 Subject: [PATCH] add jules logging bridge --- Project.toml | 2 + src/JulES.jl | 15 +- src/io.jl | 304 ++++++++++++++++++++------------------- src/python_logger.jl | 53 +++++++ src/run_jules_wrapper.jl | 184 ++++++++++++------------ src/run_serial.jl | 117 +++++++-------- src/scenariomodelling.jl | 40 +++--- 7 files changed, 388 insertions(+), 327 deletions(-) create mode 100644 src/python_logger.jl diff --git a/Project.toml b/Project.toml index 7eca464..8d5a2f9 100644 --- a/Project.toml +++ b/Project.toml @@ -15,6 +15,8 @@ Statistics = "10745b16-79ce-11e8-11f9-7d13ad32a3b2" Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" TuLiPa = "970f5c25-cd7d-4f04-b50d-7a4fe2af6639" YAML = "ddb6d928-2868-570f-bddf-ab3f9cf99eb6" +PythonCall = "6099a3de-0909-46bc-b1f4-468b9a2dfc0d" +Logging = "56ddb016-857b-54e1-b83d-db4d58db5568" [weakdeps] OrdinaryDiffEq = "1dea7af3-3e70-54e6-95c3-0bf5283fa5ed" diff --git a/src/JulES.jl b/src/JulES.jl index b485f9b..374ec1d 100644 --- a/src/JulES.jl +++ b/src/JulES.jl @@ -1,5 +1,13 @@ module JulES +macro debugtime(msg, expr) + quote + local stats = Base.@timed $(esc(expr)) + @debug $msg elapsed_s = round(stats.time, digits=3) bytes = stats.bytes gctime_s = round(stats.gctime, digits=3) + stats.value + end +end + import TuLiPa using Distributed @@ -7,11 +15,11 @@ using Dates using Statistics using Clustering using Distributions -using DataFrames +using DataFrames using JSON using YAML using HDF5 - +using Logging # Used by ifm #using ComponentArrays #using Interpolations @@ -25,7 +33,8 @@ using HDF5 # using OptimizationBBO # using Zygote -include("abstract_types.jl") +include("python_logger.jl") +include("abstract_types.jl") include("dimension_types.jl") include("ifm.jl") include("generic_io.jl") diff --git a/src/io.jl b/src/io.jl index e40ba9b..6bebe91 100644 --- a/src/io.jl +++ b/src/io.jl @@ -10,7 +10,7 @@ struct 
DefaultJulESInput <: AbstractJulESInput datayear::Int weatheryear::Int onlysubsystemmodel::Bool # TODO: can probably remove this - + steps::Int steplength::Millisecond simstarttime::TuLiPa.ProbTime @@ -22,29 +22,27 @@ struct DefaultJulESInput <: AbstractJulESInput phaseindelta::Millisecond phaseinsteps::Int - horizons::Dict{Tuple{TermName, CommodityName}, TuLiPa.Horizon} + horizons::Dict{Tuple{TermName,CommodityName},TuLiPa.Horizon} function DefaultJulESInput(config, dataset, datayear, weatheryear) mainconfig = config["main"] settings = config[mainconfig["settings"]] numcores = mainconfig["numcores"] cores = collect(1:numcores) - + onlysubsystemmodel = false if !haskey(settings["problems"], "prognosis") && !haskey(settings["problems"], "endvalue") && haskey(settings["problems"], "stochastic") && !haskey(settings["problems"], "clearing") onlysubsystemmodel = true end - println("Time parameters") - @time timeparams = get_timeparams(mainconfig, settings, datayear, weatheryear) + @debugtime "Time parameters done" timeparams = get_timeparams(mainconfig, settings, datayear, weatheryear) steps, steplength, simstarttime, scenmod_data, tnormaltype, tphaseintype, phaseinoffset, phaseindelta, phaseinsteps = timeparams - println("Handle elements") - @time begin + @debugtime "Handle elements" begin elements = dataset["elements"] add_scenariotimeperiod_int!(elements, settings["time"]["weatheryearstart"], settings["time"]["weatheryearstop"]) - + if haskey(dataset, "elements_ppp") elements_ppp = dataset["elements_ppp"] add_scenariotimeperiod_int!(elements_ppp, settings["time"]["weatheryearstart"], settings["time"]["weatheryearstop"]) @@ -61,7 +59,7 @@ struct DefaultJulESInput <: AbstractJulESInput if !onlysubsystemmodel for element in elements if element.typename == TuLiPa.GLOBALENEQKEY - enekvglobaldict[split(element.instancename,"GlobalEneq_")[2]] = element.value["Value"] + enekvglobaldict[split(element.instancename, "GlobalEneq_")[2]] = element.value["Value"] end end end @@ 
-111,7 +109,7 @@ function get_distribution_method_mp(input::DefaultJulESInput, default::String="b settings = get_settings(input) # Retrieve the distribution method value if !get_onlysubsystemmodel(input) - method = get(settings["problems"]["stochastic"],"distribution_method_mp", default) + method = get(settings["problems"]["stochastic"], "distribution_method_mp", default) else method = "core_main" end @@ -129,10 +127,10 @@ Withmp is the original method where scenarios are distributed on the same core a """ function get_distribution_method_sp(input::DefaultJulESInput, default::String="withmp") settings = get_settings(input) - + # Retrieve the distribution method value if !get_onlysubsystemmodel(input) - method = get(settings["problems"]["stochastic"],"distribution_method_sp", default) + method = get(settings["problems"]["stochastic"], "distribution_method_sp", default) else method = "even" end @@ -195,39 +193,39 @@ function get_ifm_weights(input::DefaultJulESInput) return w else - return Dict{String, Dict{String, Float64}}() + return Dict{String,Dict{String,Float64}}() end -end +end function get_datascenarios(datayear::Int64, weatheryear::Int64, weekstart::Int64, datanumscen::Int64, simtimetype::String) # Standard time for market clearing - perfect information so simple time type - datasimtime = TuLiPa.getisoyearstart(datayear) + Week(weekstart-1) - weathersimtime = TuLiPa.getisoyearstart(weatheryear) + Week(weekstart-1) + datasimtime = TuLiPa.getisoyearstart(datayear) + Week(weekstart - 1) + weathersimtime = TuLiPa.getisoyearstart(weatheryear) + Week(weekstart - 1) simtime = get_tnormal(simtimetype, datasimtime, weathersimtime) # Make scenariooffset for all uncertainty scenarios datascenarios = Vector{WeatherScenario}(undef, datanumscen) for scen in 1:datanumscen - weatherscenariotime = TuLiPa.getisoyearstart(weatheryear + scen - 1) + Week(weekstart-1) + weatherscenariotime = TuLiPa.getisoyearstart(weatheryear + scen - 1) + Week(weekstart - 1) weatheroffset = 
weatherscenariotime - weathersimtime - datascenarios[scen] = WeatherScenario(weatheroffset, 1/datanumscen, scen) + datascenarios[scen] = WeatherScenario(weatheroffset, 1 / datanumscen, scen) end return (simtime, NoScenarioModellingMethod(datascenarios)) end function get_timeparams(mainconfig::Dict, settings::Dict, datayear::Int, weatheryear::Int) weekstart = mainconfig["weekstart"] - + weatheryearstart = settings["time"]["weatheryearstart"] weatheryearstop = settings["time"]["weatheryearstop"] datanumscen = weatheryearstop - weatheryearstart # scenarios to consider uncertainty for - + simulationyears = mainconfig["simulationyears"] extrasteps = mainconfig["extrasteps"] steplength = get_steplength(settings) - steps = Int(ceil((TuLiPa.getisoyearstart(datayear + simulationyears) - TuLiPa.getisoyearstart(datayear)).value/steplength.value) + extrasteps); - + steps = Int(ceil((TuLiPa.getisoyearstart(datayear + simulationyears) - TuLiPa.getisoyearstart(datayear)).value / steplength.value) + extrasteps) + # Phasein settings phaseinoffset = steplength # phase in straight away from second stage scenarios if haskey(settings["time"]["probtime"], "phaseintime") @@ -297,8 +295,8 @@ function get_scentime(simtime::TuLiPa.ProbTime, scenario::AbstractScenario, inpu end function add_scenariotimeperiod_int!(elements::Vector{TuLiPa.DataElement}, start::Int, stop::Int) - push!(elements, TuLiPa.getelement(TuLiPa.TIMEPERIOD_CONCEPT, "ScenarioTimePeriod", "ScenarioTimePeriod", - ("Start", TuLiPa.getisoyearstart(start)), ("Stop", TuLiPa.getisoyearstart(stop)))) + push!(elements, TuLiPa.getelement(TuLiPa.TIMEPERIOD_CONCEPT, "ScenarioTimePeriod", "ScenarioTimePeriod", + ("Start", TuLiPa.getisoyearstart(start)), ("Stop", TuLiPa.getisoyearstart(stop)))) return end @@ -377,7 +375,7 @@ function get_simperiod(input::AbstractJulESInput) steplength = get_steplength(input) skipmed = Millisecond(Hour(0)) skipmax_steps = input.settings["time"]["skipmax"]::Int - skipmax = 
Millisecond(Hour(steplength*(skipmax_steps-1))) + skipmax = Millisecond(Hour(steplength * (skipmax_steps - 1))) return (t, N, steplength, skipmed, skipmax) end @@ -392,9 +390,9 @@ end function get_aggzonecopl(aggzone::Dict) aggzonecopl = Dict() - for (k,v) in aggzone + for (k, v) in aggzone for vv in v - aggzonecopl["PowerBalance_" * vv] = "PowerBalance_" * k + aggzonecopl["PowerBalance_"*vv] = "PowerBalance_" * k end end @@ -445,9 +443,9 @@ end # ------------------------------------------------------------------------------------------- function get_horizons(settings, datayear) - horizons = Dict{Tuple{TermName, CommodityName}, TuLiPa.Horizon}() + horizons = Dict{Tuple{TermName,CommodityName},TuLiPa.Horizon}() commoditites = settings["horizons"]["commodities"] - n_durations = Dict{Tuple{TermName, CommodityName}, Tuple{Int, Millisecond}}() + n_durations = Dict{Tuple{TermName,CommodityName},Tuple{Int,Millisecond}}() for term in keys(settings["horizons"]) if !(term in ["commodities", "shrinkable"]) @@ -566,7 +564,7 @@ function parse_duration(config, namestart) end end end - println(config) + @error "Duration key not found in config" namestart = namestart keys = collect(keys(config)) error("Key $namestart not in config") end @@ -599,7 +597,7 @@ mutable struct DefaultJulESOutput <: AbstractJulESOutput hydrolevels::Array{Float64} batterylevels::Array{Float64} othervalues::Dict - + modelobjects::Dict powerbalances::Vector rhsterms::Vector @@ -626,14 +624,14 @@ mutable struct DefaultJulESOutput <: AbstractJulESOutput actualQ::Matrix{Float64} function DefaultJulESOutput(input) - return new(Dict(),Dict(),Dict(),Dict(),[], - Dict(), - [],[],[],[],[],[],[], - [],[], - [],[],[],[],[],[],Dict(), - Dict(),[],[],[],[],[],Dict(),[],[],Dict(),[],[],Dict(),Dict(), - [],[], - [],[],[],Matrix{Float64}(undef, (0,0)),[],Matrix{Float64}(undef, (0,0))) + return new(Dict(), Dict(), Dict(), Dict(), [], + Dict(), + [], [], [], [], [], [], [], + [], [], + [], [], [], [], [], [], Dict(), + 
Dict(), [], [], [], [], [], Dict(), [], [], Dict(), [], [], Dict(), Dict(), + [], [], + [], [], [], Matrix{Float64}(undef, (0, 0)), [], Matrix{Float64}(undef, (0, 0))) end end @@ -675,9 +673,9 @@ function init_local_output() for (subix, core) in db.dist_mp if settings["results"]["storagevalues"] if has_headlosscost(settings["problems"]["stochastic"]["master"]) - num_storagevalues = get_numscen_stoch(db.input)*2 + 2 # scenarios + master operative + master operative after headlosscost adjustment + num_storagevalues = get_numscen_stoch(db.input) * 2 + 2 # scenarios + master operative + master operative after headlosscost adjustment else - num_storagevalues = get_numscen_stoch(db.input)*2 + 1 # scenarios + master operative + num_storagevalues = get_numscen_stoch(db.input) * 2 + 1 # scenarios + master operative end if haskey(settings["problems"], "clearing") num_storagevalues += 2 @@ -768,7 +766,7 @@ end function collect_ifm_u0(stepnr) db = get_local_db() - d = Dict{String, Vector{Float64}}() + d = Dict{String,Vector{Float64}}() for core in get_cores(db.input) fetched = fetch(@spawnat core local_collect_ifm_u0(stepnr)) if fetched isa RemoteException @@ -784,7 +782,7 @@ end function local_collect_ifm_u0(stepnr) db = get_local_db() - d = Dict{String, Vector{Float64}}() + d = Dict{String,Vector{Float64}}() for (name, core) in db.dist_ifm if core == db.core (stored_stepnr, u0) = db.div[IFM_DB_STATE_KEY][name] @@ -797,7 +795,7 @@ end function collect_ifm_Q(stepnr) db = get_local_db() - d = Dict{String, Float64}() + d = Dict{String,Float64}() for core in get_cores(db.input) fetched = fetch(@spawnat core local_collect_ifm_Q(stepnr)) if fetched isa RemoteException @@ -813,7 +811,7 @@ end function local_collect_ifm_Q(stepnr) db = get_local_db() - d = Dict{String, Float64}() + d = Dict{String,Float64}() for (name, core) in db.dist_ifm if core == db.core (stored_stepnr, Q) = db.div[IFM_DB_FLOW_KEY][name] @@ -857,7 +855,7 @@ function collect_ifm_allQ_local(stoch_scenixs, name, 
stepnr) for (_name, core) in db.dist_ifm if (core == db.core) && (_name == name) for (i, scenix) in enumerate(stoch_scenixs) - Q_sum .+= db.ifm_output[name][scenix][2]*get_probability(db.scenmod_stoch.scenarios[i]) + Q_sum .+= db.ifm_output[name][scenix][2] * get_probability(db.scenmod_stoch.scenarios[i]) end return Q_sum end @@ -926,7 +924,7 @@ function update_output(t::TuLiPa.ProbTime, stepnr::Int) db.output.timing_sp[(scenix, subix)][stepnr, :] .= fetch(f) @spawnat core reset_maintiming_sp(scenix, subix) end - end + end if has_result_scenarios(settings) db.output.scenweights_sim[stepnr, :] .= [get_probability(scen) for scen in get_scenarios(db.scenmod_sim)] @@ -936,9 +934,9 @@ function update_output(t::TuLiPa.ProbTime, stepnr::Int) if has_result_storagevalues(settings) for (subix, core) in db.dist_mp if has_headlosscost(settings["problems"]["stochastic"]["master"]) - dim = get_numscen_stoch(db.input)*2 + 2 # scenarios + master operative + master operative after headlosscost adjustment + dim = get_numscen_stoch(db.input) * 2 + 2 # scenarios + master operative + master operative after headlosscost adjustment else - dim = get_numscen_stoch(db.input)*2 + 1 # scenarios + master operative + dim = get_numscen_stoch(db.input) * 2 + 1 # scenarios + master operative end if has_result_storagevalues_all_problems(settings) || !haskey(settings["problems"], "clearing") f = @spawnat core get_storagevalues_stoch(subix) @@ -1013,7 +1011,7 @@ function update_output(t::TuLiPa.ProbTime, stepnr::Int) if settings["results"]["mainresults"] == "all" resultobjects = TuLiPa.getobjects(prob_results) # collect results for all areas else - resultobjects = TuLiPa.getpowerobjects(db.output.modelobjects, settings["results"]["mainresults"]); # only collect results for one area + resultobjects = TuLiPa.getpowerobjects(db.output.modelobjects, settings["results"]["mainresults"]) # only collect results for one area end powerbalances, rhsterms, rhstermbalances, plants, plantbalances, plantarrows, 
demands, demandbalances, demandarrows, hydrostorages, batterystorages = TuLiPa.order_result_objects(resultobjects, true) @@ -1029,28 +1027,28 @@ function update_output(t::TuLiPa.ProbTime, stepnr::Int) db.output.hydrostorages = hydrostorages db.output.batterystorages = batterystorages - db.output.prices = zeros(Int(numperiods_powerhorizon*steps), length(db.output.powerbalances)) - db.output.rhstermvalues = zeros(Int(numperiods_powerhorizon*steps), length(db.output.rhsterms)) - db.output.production = zeros(Int(numperiods_powerhorizon*steps), length(db.output.plants)) - db.output.consumption = zeros(Int(numperiods_powerhorizon*steps), length(db.output.demands)) - db.output.hydrolevels = zeros(Int(numperiods_hydrohorizon*steps), length(db.output.hydrostorages)) - db.output.batterylevels = zeros(Int(numperiods_powerhorizon*steps), length(db.output.batterystorages)) + db.output.prices = zeros(Int(numperiods_powerhorizon * steps), length(db.output.powerbalances)) + db.output.rhstermvalues = zeros(Int(numperiods_powerhorizon * steps), length(db.output.rhsterms)) + db.output.production = zeros(Int(numperiods_powerhorizon * steps), length(db.output.plants)) + db.output.consumption = zeros(Int(numperiods_powerhorizon * steps), length(db.output.demands)) + db.output.hydrolevels = zeros(Int(numperiods_hydrohorizon * steps), length(db.output.hydrostorages)) + db.output.batterylevels = zeros(Int(numperiods_powerhorizon * steps), length(db.output.batterystorages)) if haskey(settings["results"], "otherterms") otherinfo = settings["results"]["otherterms"] otherobjects, otherbalances = TuLiPa.order_result_objects_other(resultobjects, otherinfo) db.output.otherobjects = otherobjects db.output.otherbalances = otherbalances - + for key in keys(otherinfo) db.output.othervalues[key] = Dict() - + for commodity in keys(otherinfo[key]) horizon = TuLiPa.get_horizon_commodity(resultobjects, commodity) if key == "RHSTerms" - db.output.othervalues[key][commodity] = 
zeros(TuLiPa.getnumperiods(horizon)*steps, length(otherobjects[key][commodity])) + db.output.othervalues[key][commodity] = zeros(TuLiPa.getnumperiods(horizon) * steps, length(otherobjects[key][commodity])) elseif key == "Vars" - db.output.othervalues[key][commodity] = zeros(TuLiPa.getnumperiods(horizon)*steps, length(otherobjects[key][commodity])) + db.output.othervalues[key][commodity] = zeros(TuLiPa.getnumperiods(horizon) * steps, length(otherobjects[key][commodity])) end end end @@ -1059,10 +1057,10 @@ function update_output(t::TuLiPa.ProbTime, stepnr::Int) if has_ifm_results(db.input) db = get_local_db() steplength = get_steplength(db) - steplength_days = steplength/Day(1) + steplength_days = steplength / Day(1) pred_days = length(db.ifm_output[db.output.ifm_stations[1]][1][2]) - num_values = Int(pred_days + steplength_days*(get_steps(db)-1)) - ifm_rhsterm_to_station = db.input.dataset["ifm_rhsterm_to_station"] + num_values = Int(pred_days + steplength_days * (get_steps(db) - 1)) + ifm_rhsterm_to_station = db.input.dataset["ifm_rhsterm_to_station"] db.output.actualQ = zeros(Float64, (length(db.output.ifm_stations), num_values)) delta = TuLiPa.MsTimeDelta(Day(1)) @@ -1070,7 +1068,7 @@ function update_output(t::TuLiPa.ProbTime, stepnr::Int) profile = get_profile_from_resultobjects_rhsterm(resultobjects, station, ifm_rhsterm_to_station) if !isnothing(profile) for j in 1:num_values - start = t + Day(j-1) + start = t + Day(j - 1) db.output.actualQ[i, j] = TuLiPa.getweightedaverage(profile, TuLiPa.getscenariotime(start), delta) end end @@ -1078,16 +1076,16 @@ function update_output(t::TuLiPa.ProbTime, stepnr::Int) end end - if stepnr == 2 + if stepnr == 2 db.output.statenames = collect(keys(db.startstates)) db.output.statematrix = zeros(length(values(db.startstates)), Int(steps)) end if stepnr != 1 - db.output.statematrix[:,stepnr-1] .= collect(values(db.startstates)) + db.output.statematrix[:, stepnr-1] .= collect(values(db.startstates)) end - powerrange = 
Int(numperiods_powerhorizon*(stepnr-1)+1):Int(numperiods_powerhorizon*(stepnr)) - hydrorange = Int(numperiods_hydrohorizon*(stepnr-1)+1):Int(numperiods_hydrohorizon*(stepnr)) + powerrange = Int(numperiods_powerhorizon * (stepnr - 1) + 1):Int(numperiods_powerhorizon * (stepnr)) + hydrorange = Int(numperiods_hydrohorizon * (stepnr - 1) + 1):Int(numperiods_hydrohorizon * (stepnr)) TuLiPa.get_results!(prob_results, db.output.prices, db.output.rhstermvalues, db.output.production, db.output.consumption, db.output.hydrolevels, db.output.batterylevels, db.output.powerbalances, db.output.rhsterms, db.output.plants, db.output.plantbalances, db.output.plantarrows, db.output.demands, db.output.demandbalances, db.output.demandarrows, db.output.hydrostorages, db.output.batterystorages, db.output.modelobjects, powerrange, hydrorange, periodduration_power, t) if haskey(settings["results"], "otherterms") @@ -1098,8 +1096,8 @@ function update_output(t::TuLiPa.ProbTime, stepnr::Int) collect_interval = get_result_prices_ppp(settings) if collect_interval != 0 - if (stepnr-1) % collect_interval == 0 - collect_step = div((stepnr-1), collect_interval) + 1 + if (stepnr - 1) % collect_interval == 0 + collect_step = div((stepnr - 1), collect_interval) + 1 futures = [] stoch_scenixs = [scenario.parentscenario for scenario in db.scenmod_stoch.scenarios] @@ -1174,7 +1172,7 @@ function get_ppp_prices(scenix, bids) delta_long += TuLiPa.getduration(TuLiPa.gettimedelta(horizon_long, t)).value ppp.div["deltas_long"][t] = delta_long for (i, bid) in enumerate(bids) - ppp.div["prices_long"][i,t] = TuLiPa.getcondual(ppp.longprob, bid, t) + ppp.div["prices_long"][i, t] = TuLiPa.getcondual(ppp.longprob, bid, t) end end @@ -1187,7 +1185,7 @@ function get_ppp_prices(scenix, bids) delta_med += TuLiPa.getduration(TuLiPa.gettimedelta(horizon_med, t)).value ppp.div["deltas_med"][t] = delta_med for (i, bid) in enumerate(bids) - ppp.div["prices_med"][i,t] = TuLiPa.getcondual(ppp.medprob, bid, t) + 
ppp.div["prices_med"][i, t] = TuLiPa.getcondual(ppp.medprob, bid, t) end end @@ -1200,7 +1198,7 @@ function get_ppp_prices(scenix, bids) delta_short += TuLiPa.getduration(TuLiPa.gettimedelta(horizon_short, t)).value ppp.div["deltas_short"][t] = delta_short for (i, bid) in enumerate(bids) - ppp.div["prices_short"][i,t] = TuLiPa.getcondual(ppp.shortprob, bid, t) + ppp.div["prices_short"][i, t] = TuLiPa.getcondual(ppp.shortprob, bid, t) end end @@ -1240,15 +1238,15 @@ end function get_output_storagevalues(output, steplength, skipmax) db = get_local_db() settings = get_settings(db) - + if has_result_storagevalues(settings) f = @spawnat db.core_main get_output_storagevalues_local(steplength, skipmax) storagenames, storagevalues, shorts, scenarionames, skipfactor = fetch(f) - + if has_headlosscost(settings["problems"]["stochastic"]["master"]) - dim = get_numscen_stoch(db.input)*2 + 2 # scenarios + master operative + master operative after headlosscost adjustment + dim = get_numscen_stoch(db.input) * 2 + 2 # scenarios + master operative + master operative after headlosscost adjustment else - dim = get_numscen_stoch(db.input)*2 + 1 # scenarios + master operative + dim = get_numscen_stoch(db.input) * 2 + 1 # scenarios + master operative end output["storagenames"] = [sn * "_sv" for sn in storagenames] output["storagevalues_main"] = -cat(storagevalues..., dims=3)[:, dim+1, :] @@ -1303,14 +1301,14 @@ function get_output_storagevalues_local(steplength, skipmax) end end - skipfactor = (skipmax+Millisecond(steplength))/Millisecond(steplength) + skipfactor = (skipmax + Millisecond(steplength)) / Millisecond(steplength) return (storagenames, storagevalues, shorts, scenarionames, skipfactor) end function get_storagenames_from_subix(subix) db = get_local_db() - + storagenames = String[] for (j, statevar) in enumerate(db.mp[subix].cuts.statevars) push!(storagenames, TuLiPa.getinstancename(first(TuLiPa.getvarout(statevar)))) @@ -1321,7 +1319,7 @@ end function 
get_output_ppp_prices(output) db = get_local_db() settings = get_settings(db) - + if get_result_prices_ppp(settings) != 0 f = @spawnat db.core_main get_output_prices_ppp_local() ret = fetch(f) @@ -1329,7 +1327,7 @@ function get_output_ppp_prices(output) throw(ret) end balancenames, pl, dl, pm, dm, ps, ds = ret - + output["balancenames_ppp"] = balancenames output["prices_long"] = pl output["deltas_long"] = dl @@ -1355,6 +1353,18 @@ function get_output_prices_ppp_local() return balancenames, pl, dl, pm, dm, ps, ds end + +function log_df(label, df) + buf = IOBuffer() + show(IOContext(buf, :limit => false, :displaysize => (50, 200)), df) + @info label table = String(take!(buf)) +end +function log_df_wide(label, df; chunksize=8) + cols = names(df) + for (i, chunk) in enumerate(Iterators.partition(cols, chunksize)) + log_df("$label ($i)", df[!, collect(chunk)]) + end +end function get_output_memory(output) db = get_local_db() settings = get_settings(db) @@ -1362,7 +1372,7 @@ function get_output_memory(output) if has_result_memory(settings) cores = get_cores(db) names = ["sum_unique", "core", "input", "output", "horizons", "dummyobjects", "dummyobjects_ppp", "startstates", "subsystems", "subsystems_evp", "subsystems_stoch", "scenmod_sim", "scenmod_stoch", "ifm", "ppp", "prices_ppp", "evp", "mp", "sp", "cp", "dist_ifm", "dist_ppp", "dist_evp", "dist_mp", "dist_sp", "core_main", "div", "ifm_output", "ifm_derived", "sum"] - + results = zeros(length(names), length(cores)) futures = [] @sync for core in cores @@ -1380,7 +1390,7 @@ function get_output_memory(output) end df = insertcols(df, 1, :core_id => names) - println(df) + log_df("Memory usage (MB)", df) end return @@ -1393,7 +1403,7 @@ function get_output_memory_local() for (i, field) in enumerate(fieldnames(typeof(db))) field_value = getfield(db, field) field_memory_size = Base.summarysize(field_value) / 1e6 - values[i + 1] = field_memory_size + values[i+1] = field_memory_size end values[end] = sum(values[1:end-1]) @@ 
-1429,9 +1439,9 @@ function get_output_timing_local(data, steplength, skipmax) settings = get_settings(db) if has_result_times(settings) - skipfactor = (skipmax+Millisecond(steplength))/Millisecond(steplength) - factors = [skipfactor,skipfactor,1] - + skipfactor = (skipmax + Millisecond(steplength)) / Millisecond(steplength) + factors = [skipfactor, skipfactor, 1] + timing_cp = get_timing_cp_local() timings_ppp = [] @@ -1441,7 +1451,7 @@ function get_output_timing_local(data, steplength, skipmax) if haskey(settings["problems"], "prognosis") dims = (get_steps(db), 3, 3, length(timings_ppp)) timings_ppp1 = reshape(cat(timings_ppp..., dims=4), dims) - timings_ppp2 = transpose(dropdims(mean(timings_ppp1,dims=(1,4)),dims=(1,4))).*factors + timings_ppp2 = transpose(dropdims(mean(timings_ppp1, dims=(1, 4)), dims=(1, 4))) .* factors else timings_ppp2 = zeros(3, 3) end @@ -1454,19 +1464,19 @@ function get_output_timing_local(data, steplength, skipmax) push!(df_evp, [scenix, subix, values[1], values[2], values[3], core, fetch(f)]) end df_evp[!, :other] = df_evp[!, :total] - df_evp[!, :solve] - df_evp[!, :update] - df_evp[df_evp.skipmed .== true, [:update, :solve, :total]] .= df_evp[df_evp.skipmed .== true, [:update, :solve, :total]] .* skipfactor + df_evp[df_evp.skipmed.==true, [:update, :solve, :total]] .= df_evp[df_evp.skipmed.==true, [:update, :solve, :total]] .* skipfactor if nrow(df_evp) != 0 - df_evp_subix = combine(groupby(df_evp, [:subix]), - :update => sum => :evp_u, - :solve => sum => :evp_s, - :other => sum => :evp_o, - :total => sum => :evp_tot) + df_evp_subix = combine(groupby(df_evp, [:subix]), + :update => sum => :evp_u, + :solve => sum => :evp_s, + :other => sum => :evp_o, + :total => sum => :evp_tot) timings_evp = mean.(eachcol(select(df_evp_subix, Not([:subix, :evp_o])))) - df_evp_core = combine(groupby(df_evp, [:core]), - :update => sum => :evp_u, - :solve => sum => :evp_s, - :other => sum => :evp_o, - :total => sum => :evp_tot) + df_evp_core = 
combine(groupby(df_evp, [:core]), + :update => sum => :evp_u, + :solve => sum => :evp_s, + :other => sum => :evp_o, + :total => sum => :evp_tot) else df_evp_subix = DataFrame([name => [] for name in ["subix", "evp_u", "evp_s", "evp_o", "evp_tot"]]) timings_evp = [0.0, 0.0, 0.0] @@ -1481,15 +1491,15 @@ function get_output_timing_local(data, steplength, skipmax) push!(df_mp, [subix, values[1], values[2], values[3], values[4], values[5], core, fetch(f)]) end df_mp[!, :mp_tot] = df_mp[!, :mp_s] + df_mp[!, :mp_u] + df_mp[!, :mp_fin] + df_mp[!, :mp_o] - df_mp[df_mp.skipmed .== true, [:mp_u, :mp_s, :mp_fin, :mp_o, :mp_tot, :bend_it]] .= df_mp[df_mp.skipmed .== true, [:mp_u, :mp_s, :mp_fin, :mp_o, :mp_tot, :bend_it]] .* skipfactor + df_mp[df_mp.skipmed.==true, [:mp_u, :mp_s, :mp_fin, :mp_o, :mp_tot, :bend_it]] .= df_mp[df_mp.skipmed.==true, [:mp_u, :mp_s, :mp_fin, :mp_o, :mp_tot, :bend_it]] .* skipfactor if nrow(df_mp) != 0 timings_mp = mean.(eachcol(select(df_mp, Not([:subix, :core, :skipmed, :mp_fin, :mp_o, :bend_it])))) - df_mp_core = combine(groupby(df_mp, [:core]), - :mp_u => sum => :mp_u, - :mp_s => sum => :mp_s, - :mp_fin => sum => :mp_fin, - :mp_o => sum => :mp_o, - :mp_tot => sum => :mp_tot) + df_mp_core = combine(groupby(df_mp, [:core]), + :mp_u => sum => :mp_u, + :mp_s => sum => :mp_s, + :mp_fin => sum => :mp_fin, + :mp_o => sum => :mp_o, + :mp_tot => sum => :mp_tot) else timings_mp = [0.0, 0.0, 0.0] df_mp_core = DataFrame([name => [] for name in ["core", "mp_u", "mp_s", "mp_fin", "mp_o", "mp_tot"]]) @@ -1502,19 +1512,19 @@ function get_output_timing_local(data, steplength, skipmax) push!(df_sp, [scenix, subix, values[1], values[2], values[3], core, fetch(f)]) end df_sp[!, :total] = df_sp[!, :solve] + df_sp[!, :update] + df_sp[!, :other] - df_sp[df_sp.skipmed .== true, [:update, :solve, :other, :total]] .= df_sp[df_sp.skipmed .== true, [:update, :solve, :other, :total]] .* skipfactor + df_sp[df_sp.skipmed.==true, [:update, :solve, :other, :total]] .= 
df_sp[df_sp.skipmed.==true, [:update, :solve, :other, :total]] .* skipfactor if nrow(df_sp) != 0 - df_sp_subix = combine(groupby(df_sp, [:subix]), - :update => sum => :sp_u, - :solve => sum => :sp_s, - :other => sum => :sp_o, - :total => sum => :sp_tot) + df_sp_subix = combine(groupby(df_sp, [:subix]), + :update => sum => :sp_u, + :solve => sum => :sp_s, + :other => sum => :sp_o, + :total => sum => :sp_tot) timings_sp = mean.(eachcol(select(df_sp_subix, Not([:subix, :sp_o])))) - df_sp_core = combine(groupby(df_sp, [:core]), - :update => sum => :sp_u, - :solve => sum => :sp_s, - :other => sum => :sp_o, - :total => sum => :sp_tot) + df_sp_core = combine(groupby(df_sp, [:core]), + :update => sum => :sp_u, + :solve => sum => :sp_s, + :other => sum => :sp_o, + :total => sum => :sp_tot) else df_sp_subix = DataFrame([name => [] for name in ["subix", "sp_u", "sp_s", "sp_o", "sp_tot"]]) timings_sp = [0.0, 0.0, 0.0] @@ -1522,27 +1532,27 @@ function get_output_timing_local(data, steplength, skipmax) end # TODO: df_sp_scen - df_subix = outerjoin(df_evp_subix, df_mp, df_sp_subix, on = :subix) + df_subix = outerjoin(df_evp_subix, df_mp, df_sp_subix, on=:subix) df_subix = coalesce.(df_subix, 0.0) df_subix[!, :tot] = df_subix[!, :evp_tot] + df_subix[!, :mp_tot] + df_subix[!, :sp_tot] df_subix = sort(df_subix, :tot, rev=true) df_subix = df_subix[!, [:subix, :tot, :evp_tot, :mp_tot, :sp_tot, :evp_u, :evp_s, :evp_o, :bend_it, :mp_u, :mp_s, :mp_fin, :mp_o, :sp_u, :sp_s, :sp_o]] - df_core = outerjoin(df_evp_core, df_mp_core, df_sp_core, on = :core) + df_core = outerjoin(df_evp_core, df_mp_core, df_sp_core, on=:core) df_core = coalesce.(df_core, 0.0) df_core[!, :tot] = df_core[!, :evp_tot] + df_core[!, :mp_tot] + df_core[!, :sp_tot] df_core = sort(df_core, :tot, rev=true) df_core = df_core[!, [:core, :tot, :evp_tot, :mp_tot, :sp_tot, :evp_u, :evp_s, :evp_o, :mp_u, :mp_s, :mp_fin, :mp_o, :sp_u, :sp_s, :sp_o]] if haskey(settings["problems"], "clearing") - all = vcat(timings_ppp2, 
reshape(timings_evp,1,3), reshape(timings_mp,1,3), reshape(timings_sp,1,3), mean(timing_cp, dims=1)) - df = DataFrame(model=["long","med","short","evp","mp","sp","clearing"], update=all[:,1], solve=all[:,2], total=all[:,3]) + all = vcat(timings_ppp2, reshape(timings_evp, 1, 3), reshape(timings_mp, 1, 3), reshape(timings_sp, 1, 3), mean(timing_cp, dims=1)) + df = DataFrame(model=["long", "med", "short", "evp", "mp", "sp", "clearing"], update=all[:, 1], solve=all[:, 2], total=all[:, 3]) df[!, :other] = df[!, :total] - df[!, :solve] - df[!, :update] - display(df[!, [1, 2, 3, 5, 4]]) + log_df("Timing breakdown", df[!, [1, 2, 3, 5, 4]]) end - display(df_core) - display(df_subix) + log_df_wide("Core timing", df_core) + log_df_wide("Subix timing", df_subix) end # # display number of elements of each type per subsystem # unique_types = ["subix", "name_first_element", "name_second_element", "total_count"] # @@ -1559,13 +1569,13 @@ function get_output_timing_local(data, steplength, skipmax) # display(df_sub_element_type) if settings["results"]["times"] - if haskey(settings["problems"], "prognosis") + if haskey(settings["problems"], "prognosis") data["prognosistimes"] = timings_ppp1 end - if haskey(settings["problems"], "endvalue") + if haskey(settings["problems"], "endvalue") data["endvaluetimes"] = Matrix{Float64}(df_evp) end - if haskey(settings["problems"], "stochastic") + if haskey(settings["problems"], "stochastic") data["mptimes"] = Matrix{Float64}(df_mp) data["sptimes"] = Matrix{Float64}(df_sp) end @@ -1604,7 +1614,7 @@ function get_output_main_local() if get_onlysubsystemmodel(db.input) startstates_main = get_startstates_from_mp() else - startstates_main = get_startstates_from_cp() + startstates_main = get_startstates_from_cp() end for (k, v) in startstates_main db.startstates[k] = v @@ -1614,7 +1624,7 @@ function get_output_main_local() db.output.statenames = collect(keys(db.startstates)) db.output.statematrix = collect(values(db.startstates)) else - 
db.output.statematrix[:,steps] .= collect(values(db.startstates)) + db.output.statematrix[:, steps] .= collect(values(db.startstates)) end if haskey(settings["results"], "mainresults") @@ -1636,7 +1646,7 @@ function get_output_main_local() end # Only keep rhsterms that have at least one value (TODO: Do the same for sypply and demands) - rhstermtotals = dropdims(sum(db.output.rhstermvalues,dims=1),dims=1) + rhstermtotals = dropdims(sum(db.output.rhstermvalues, dims=1), dims=1) rhstermsupplyidx = [] rhstermdemandidx = [] @@ -1649,20 +1659,20 @@ function get_output_main_local() end # Put rhsterms together with supplies and demands - rhstermsupplyvalues = db.output.rhstermvalues[:,rhstermsupplyidx] - rhstermdemandvalues = db.output.rhstermvalues[:,rhstermdemandidx]*-1 + rhstermsupplyvalues = db.output.rhstermvalues[:, rhstermsupplyidx] + rhstermdemandvalues = db.output.rhstermvalues[:, rhstermdemandidx] * -1 rhstermsupplynames = [TuLiPa.getinstancename(rhsterm) for rhsterm in db.output.rhsterms[rhstermsupplyidx]] rhstermsupplybalancenames = [split(TuLiPa.getinstancename(r), "PowerBalance_")[2] for r in db.output.rhstermbalances[rhstermsupplyidx]] rhstermdemandnames = [TuLiPa.getinstancename(rhsterm) for rhsterm in db.output.rhsterms[rhstermdemandidx]] rhstermdemandbalancenames = [split(TuLiPa.getinstancename(r), "PowerBalance_")[2] for r in db.output.rhstermbalances[rhstermdemandidx]] - supplynames = [[TuLiPa.getinstancename(plant) for plant in db.output.plants];rhstermsupplynames] - supplybalancenames = [[split(TuLiPa.getinstancename(p), "PowerBalance_")[2] for p in db.output.plantbalances];rhstermsupplybalancenames] - supplyvalues = hcat(db.output.production,rhstermsupplyvalues) + supplynames = [[TuLiPa.getinstancename(plant) for plant in db.output.plants]; rhstermsupplynames] + supplybalancenames = [[split(TuLiPa.getinstancename(p), "PowerBalance_")[2] for p in db.output.plantbalances]; rhstermsupplybalancenames] + supplyvalues = hcat(db.output.production, 
rhstermsupplyvalues) - demandnames = [[TuLiPa.getinstancename(demand) for demand in db.output.demands];rhstermdemandnames] - demandbalancenames = [[split(TuLiPa.getinstancename(p), "PowerBalance_")[2] for p in db.output.demandbalances];rhstermdemandbalancenames] + demandnames = [[TuLiPa.getinstancename(demand) for demand in db.output.demands]; rhstermdemandnames] + demandbalancenames = [[split(TuLiPa.getinstancename(p), "PowerBalance_")[2] for p in db.output.demandbalances]; rhstermdemandbalancenames] demandvalues = hcat(db.output.consumption, rhstermdemandvalues) # Prepare for plotting results @@ -1672,17 +1682,17 @@ function get_output_main_local() # Convert reservoir filling to TWh hydrolevels1 = copy(db.output.hydrolevels) - for (i,hydroname) in enumerate(hydronames) + for (i, hydroname) in enumerate(hydronames) if haskey(TuLiPa.getbalance(db.output.modelobjects[db.output.hydrostorages[i]]).metadata, TuLiPa.GLOBALENEQKEY) - hydrolevels1[:,i] .= hydrolevels1[:,i]*TuLiPa.getbalance(db.output.modelobjects[db.output.hydrostorages[i]]).metadata[TuLiPa.GLOBALENEQKEY] + hydrolevels1[:, i] .= hydrolevels1[:, i] * TuLiPa.getbalance(db.output.modelobjects[db.output.hydrostorages[i]]).metadata[TuLiPa.GLOBALENEQKEY] end end # Indexes TODO: Replace with generic (for each commodity, and for state) dim = get_outputindex(mainconfig, get_datayear(db), get_weatheryear(db)) - x1 = [TuLiPa.getisoyearstart(dim) + Week(mainconfig["weekstart"]-1) + periodduration_power*(t-1) for t in 1:first(size(supplyvalues))] # power/load resolution - x2 = [TuLiPa.getisoyearstart(dim) + Week(mainconfig["weekstart"]-1) + periodduration_hydro*(t-1) for t in 1:first(size(hydrolevels1))]; # reservoir resolution - x3 = [TuLiPa.getisoyearstart(dim) + Week(mainconfig["weekstart"]-1) + steplength*(t-1) for t in 1:steps]; # state resolution + x1 = [TuLiPa.getisoyearstart(dim) + Week(mainconfig["weekstart"] - 1) + periodduration_power * (t - 1) for t in 1:first(size(supplyvalues))] # power/load resolution + 
x2 = [TuLiPa.getisoyearstart(dim) + Week(mainconfig["weekstart"] - 1) + periodduration_hydro * (t - 1) for t in 1:first(size(hydrolevels1))] # reservoir resolution + x3 = [TuLiPa.getisoyearstart(dim) + Week(mainconfig["weekstart"] - 1) + steplength * (t - 1) for t in 1:steps] # state resolution outputformat = mainconfig["outputformat"] if outputformat != "juliadict" @@ -1698,18 +1708,18 @@ function get_output_main_local() data["resnames"] = hydronames data["resmatrix"] = hydrolevels1 - data["resindex"] = x2 + data["resindex"] = x2 if has_result_hydrolevels_water(settings) data["resmatrix_water"] = db.output.hydrolevels end data["batnames"] = batterynames data["batmatrix"] = db.output.batterylevels - data["batindex"] = x1 + data["batindex"] = x1 data["statenames"] = db.output.statenames data["statematrix"] = permutedims(db.output.statematrix) - data["stateindex"] = x3 + data["stateindex"] = x3 data["supplyvalues"] = supplyvalues data["supplynames"] = supplynames @@ -1722,8 +1732,8 @@ function get_output_main_local() if haskey(settings["results"], "otherterms") for key in keys(db.output.othervalues) for commodity in keys(db.output.othervalues[key]) - data["othernames_" * key * "_" * commodity] = [TuLiPa.getinstancename(id) for id in db.output.otherobjects[key][commodity]] |> Vector{String} - data["othervalues_" * key * "_" * commodity] = db.output.othervalues[key][commodity] + data["othernames_"*key*"_"*commodity] = [TuLiPa.getinstancename(id) for id in db.output.otherobjects[key][commodity]] |> Vector{String} + data["othervalues_"*key*"_"*commodity] = db.output.othervalues[key][commodity] end end end diff --git a/src/python_logger.jl b/src/python_logger.jl new file mode 100644 index 0000000..f2e61d3 --- /dev/null +++ b/src/python_logger.jl @@ -0,0 +1,53 @@ +using PythonCall +using Logging + +const _py_logging = Ref{Py}() +function py_logging() + if !isassigned(_py_logging) + _py_logging[] = pyimport("logging") + end + return _py_logging[] +end + +const _min_level = 
Ref{LogLevel}(Info) + +struct PythonLogBridge <: AbstractLogger end +Logging.shouldlog(::PythonLogBridge, level, _module, group, id) = true +Logging.min_enabled_level(::PythonLogBridge) = _min_level[] +Logging.catch_exceptions(::PythonLogBridge) = true + +function Logging.handle_message( + ::PythonLogBridge, level, message, _module, group, id, filepath, line; + kwargs... +) + log = py_logging() + name = _module !== nothing ? string(_module) : "julia" + pylevel = if level == Debug + 10 + elseif level == Info + 20 + elseif level == Warn + 30 + elseif level == Error + 40 + else + 50 + end + extra = pydict(Dict(string(k) => string(v) for (k, v) in kwargs)) + log.getLogger(name).log(pylevel, string(message), extra=extra) +end + +function use_python_logging!() + log = py_logging() + pylevel = pyconvert(Int, log.root.level) + _min_level[] = if pylevel <= 10 + Debug + elseif pylevel <= 20 + Info + elseif pylevel <= 30 + Warn + else + Error + end + global_logger(PythonLogBridge()) +end \ No newline at end of file diff --git a/src/run_jules_wrapper.jl b/src/run_jules_wrapper.jl index b5f2b08..bdededa 100644 --- a/src/run_jules_wrapper.jl +++ b/src/run_jules_wrapper.jl @@ -1,99 +1,103 @@ function getdataset(config, names, filename_clearing, filename_aggregated) - settings = config[config["main"]["settings"]] - - sti_dataset = joinpath(config["main"]["outputpath"]) - - clearing = JulES.JSON.parsefile(joinpath(sti_dataset, filename_clearing)) - clearing = JulES.TuLiPa.getelements(clearing) - - aggregated = JulES.JSON.parsefile(joinpath(sti_dataset, filename_aggregated)) - aggregated = JulES.TuLiPa.getelements(aggregated) - - timevectors = JulES.JSON.parsefile(joinpath(sti_dataset, names["FILENAME_DATAELEMENTS_TIMEVECTORS"] )) - timevectors = JulES.TuLiPa.getelements(timevectors, sti_dataset) - - elements = vcat(clearing, timevectors) - elements_ppp = vcat(aggregated, timevectors) - - storage_mapping = JulES.JSON.parsefile( - joinpath(sti_dataset, 
names["FILENAME_STORAGE_MAPPING"]), - dicttype=Dict{String, String}, - ) - - startmag_aggregated = JulES.JSON.parsefile( - joinpath(sti_dataset, names["FILENAME_START_STORAGES_AGGREGATED"]), - dicttype=Dict{String, Float64}, - ) - - startmag_clearing = JulES.JSON.parsefile( - joinpath(sti_dataset, names["FILENAME_START_STORAGES_CLEARING"]), - dicttype=Dict{String, Float64}, - ) - - return Dict( - "elements" => elements, - "elements_ppp" => elements_ppp, - "detailedrescopl" => storage_mapping, - "startmagdict" => startmag_clearing, - "aggstartmagdict" => startmag_aggregated, - ) + settings = config[config["main"]["settings"]] + + sti_dataset = joinpath(config["main"]["outputpath"]) + + clearing = JulES.JSON.parsefile(joinpath(sti_dataset, filename_clearing)) + clearing = JulES.TuLiPa.getelements(clearing) + + aggregated = JulES.JSON.parsefile(joinpath(sti_dataset, filename_aggregated)) + aggregated = JulES.TuLiPa.getelements(aggregated) + + timevectors = JulES.JSON.parsefile(joinpath(sti_dataset, names["FILENAME_DATAELEMENTS_TIMEVECTORS"])) + timevectors = JulES.TuLiPa.getelements(timevectors, sti_dataset) + + elements = vcat(clearing, timevectors) + elements_ppp = vcat(aggregated, timevectors) + + storage_mapping = JulES.JSON.parsefile( + joinpath(sti_dataset, names["FILENAME_STORAGE_MAPPING"]), + dicttype=Dict{String,String}, + ) + + startmag_aggregated = JulES.JSON.parsefile( + joinpath(sti_dataset, names["FILENAME_START_STORAGES_AGGREGATED"]), + dicttype=Dict{String,Float64}, + ) + + startmag_clearing = JulES.JSON.parsefile( + joinpath(sti_dataset, names["FILENAME_START_STORAGES_CLEARING"]), + dicttype=Dict{String,Float64}, + ) + + return Dict( + "elements" => elements, + "elements_ppp" => elements_ppp, + "detailedrescopl" => storage_mapping, + "startmagdict" => startmag_clearing, + "aggstartmagdict" => startmag_aggregated, + ) end function load_ifm_dep() - if myid() == 1 - function ensure_packages(pkgs::Vector{String}) - deps = values(Pkg.dependencies()) - 
not_installed = filter(pkg -> !any(d -> d.name == pkg, deps), pkgs) - if !isempty(not_installed) - println("Installing missing packages: ", join(not_installed, ", ")) - Pkg.add(not_installed) - else - println("All packages already installed.") - end - end - ensure_packages(["OrdinaryDiffEq", "ComponentArrays", "Interpolations", "JLD2"]) - end - - @everywhere begin - Pkg.instantiate() - Base.eval(Main, :(using OrdinaryDiffEq)) - Base.eval(Main, :(using ComponentArrays)) - Base.eval(Main, :(using Interpolations)) - Base.eval(Main, :(using JLD2)) - end + if myid() == 1 + function ensure_packages(pkgs::Vector{String}) + deps = values(Pkg.dependencies()) + not_installed = filter(pkg -> !any(d -> d.name == pkg, deps), pkgs) + if !isempty(not_installed) + @info "Installing missing packages: $(join(not_installed, ", "))" + Pkg.add(not_installed) + else + @info "All packages already installed." + end + end + ensure_packages(["OrdinaryDiffEq", "ComponentArrays", "Interpolations", "JLD2"]) + end + + @everywhere begin + Pkg.instantiate() + Base.eval(Main, :(using OrdinaryDiffEq)) + Base.eval(Main, :(using ComponentArrays)) + Base.eval(Main, :(using Interpolations)) + Base.eval(Main, :(using JLD2)) + end end function run_jules( - config_path, - datayear, - weatheryear, - outputpath, - JulESNames, - filename_clearing, - filename_aggregated - ) - - config = YAML.load_file(config_path) - - dataset = getdataset( - config, - JulESNames, - filename_clearing, - filename_aggregated - ) - - input = JulES.DefaultJulESInput(config, dataset, datayear, weatheryear) - - has_ifm_results(input) && load_ifm_dep() - - @time data = JulES.run_serial(input) - println("Total serial time above") - - println("Save output") - @time h5open(outputpath, "w") do file - for (k,v) in data - println(k) - write(file, k, v) - end + config_path, + datayear, + weatheryear, + outputpath, + JulESNames, + filename_clearing, + filename_aggregated +) + + @info "Starting JulES run" datayear = datayear weatheryear = 
weatheryear outputpath = outputpath workers = nworkers() + + config = YAML.load_file(config_path) + + dataset = getdataset( + config, + JulESNames, + filename_clearing, + filename_aggregated + ) + + input = JulES.DefaultJulESInput(config, dataset, datayear, weatheryear) + + if has_ifm_results(input) + @info "Loading IFM dependencies" + load_ifm_dep() + else + @debug "IFM dependency loading skipped" reason = "direct mode" + end + + @debugtime "Run serial" data = JulES.run_serial(input) + @debugtime "Save output" h5open(outputpath, "w") do file + for (k, v) in data + @debug k + write(file, k, v) end + end end diff --git a/src/run_serial.jl b/src/run_serial.jl index a0b5bb1..27d8671 100644 --- a/src/run_serial.jl +++ b/src/run_serial.jl @@ -33,8 +33,8 @@ function run_serial(input::AbstractJulESInput) skipmed = Millisecond(0) end end - println(string("\nThe simulation took: ", round(totaltime/60; digits=2), " minutes")) - println(string("Time usage per simulation step: ", round(totaltime/steps; digits=2), " seconds\n")) + @info string("The simulation took: ", round(totaltime / 60; digits=2), " minutes") + @info string("Time usage per simulation step: ", round(totaltime / steps; digits=2), " seconds\n") output = get_output_final(steplength, skipmax) cleanup_jules(input) @@ -44,10 +44,12 @@ end function init_jules(input::AbstractJulESInput) (t, steps, steplength, skipmed, skipmax) = get_simperiod(input) + @info "Initialized JulES simulation" start_time = t steps = steps steplength_ms = steplength skipmax_ms = skipmax.value workers = nworkers() + init_extensions(input) init_databases(input) - + return (t, steps, steplength, skipmed, skipmax) end @@ -98,62 +100,53 @@ function init_databases(input::AbstractJulESInput) cores = get_cores(input) firstcore = first(cores) - println("Add local dbs") - @time begin + @debugtime "Add local dbs" begin @sync for core in cores @spawnat core create_local_db() end end - println("Add local cores") - @time begin + @debugtime "Add local 
cores" begin @sync for core in cores @spawnat core add_local_core(core) end end - println("Add local input") - @time begin + @debugtime "Add local input" begin @sync for core in cores @spawnat core add_local_input(input) end end - println("Add local dummyobjects") - @time begin + @debugtime "Add local dummyobjects" begin @sync for core in cores @spawnat core add_local_dummyobjects() end end - println("Add local subsystems") - @time begin + @debugtime "Add local subsystems" begin wait(@spawnat firstcore add_local_subsystems()) end - println("Add local scenmod") - @time begin + @debugtime "Add local scenmod" begin @sync for core in cores @spawnat core add_local_scenariomodelling() end end - + # will calculate distribution on core c and then # transfer this data to all other cores - println("Add local problem distribution") - @time begin + @debugtime "Add local problem distribution" begin wait(@spawnat firstcore add_local_problem_distribution()) end - println("Add local horizons") - @time begin + @debugtime "Add local horizons" begin @sync for core in cores @spawnat core add_local_horizons() end end - println("Add local problems") - @time begin + @debugtime "Add local problems" begin @sync for core in cores @spawnat core add_local_problems() end @@ -164,8 +157,7 @@ function init_databases(input::AbstractJulESInput) end end - println("Add local output") - @time begin + @debugtime "Add local output" begin add_local_output() end return @@ -226,8 +218,8 @@ function add_local_dummyobjects() end (dummyobjects, dummydeps) = TuLiPa.getmodelobjects(elements, validate=true, deps=true) aggzonedict = Dict() - for (k,v) in get_aggzone(get_settings(db)) - aggzonedict[TuLiPa.Id(TuLiPa.BALANCE_CONCEPT,"PowerBalance_" * k)] = [dummyobjects[TuLiPa.Id(TuLiPa.BALANCE_CONCEPT,"PowerBalance_" * vv)] for vv in v] + for (k, v) in get_aggzone(get_settings(db)) + aggzonedict[TuLiPa.Id(TuLiPa.BALANCE_CONCEPT, "PowerBalance_" * k)] = [dummyobjects[TuLiPa.Id(TuLiPa.BALANCE_CONCEPT, 
"PowerBalance_" * vv)] for vv in v] end TuLiPa.aggzone!(dummyobjects, aggzonedict) db.dummyobjects = (dummyobjects, dummydeps) @@ -328,14 +320,14 @@ function create_subsystems(db) push!(subsystems, subsystem) end num_shortterm = length(subsystems) - println("Number of shortterm storagesystems $num_shortterm") + @info "Number of shortterm storagesystems $num_shortterm" longtermstoragesystems = TuLiPa.getlongtermstoragesystems(storagesystems, Hour(settings["subsystems"]["shorttermstoragecutoff_hours"])) for storagesystem in longtermstoragesystems commodities = get_commodities_from_storagesystem(storagesystem) if length(commodities) == 1 continue # TODO: error and fix dataset linvasselv and vakkerjordvatn have two subsystems, one not connected to power market, send liste til Carl - end + end # all = Set() # for obj in storagesystem @@ -419,7 +411,7 @@ function create_subsystems(db) horizonterm_stoch = get_term_ppp(get_horizons(db.input), commodities, longstochduration) priceareas = get_priceareas(storagesystem) - skipmed_impact = true + skipmed_impact = true if has_longevduration(settings) longevduration = parse_duration(settings["subsystems"], "longevduration") horizonterm_evp = get_term_ppp(get_horizons(db.input), commodities, longevduration) @@ -430,9 +422,9 @@ function create_subsystems(db) end push!(subsystems, subsystem) end - println("Number of longterm storagesystems $(length(subsystems)-num_shortterm)") + @info "Number of longterm storagesystems $(length(subsystems)-num_shortterm)" num_ignored = length(shorttermstoragesystems) + length(longtermstoragesystems) - length(subsystems) - println("Number of ignored storagesystems not connected to power $num_ignored") + @info "Number of ignored storagesystems not connected to power $num_ignored" else error("getsubsystem() not implemented for $(method)") end @@ -447,7 +439,7 @@ function has_longevduration(settings) return true end end - return false + return false end # Which time resolution (short, med, long) should 
we use horizons and prices from @@ -464,7 +456,7 @@ function get_term_ppp(horizons, commodities, duration) end horizon_long = horizons[(LongTermName, dummycommodity)] @assert duration <= TuLiPa.getduration(horizon_long) # TODO: also account for slack in case of reuse of watervalues - return LongTermName + return LongTermName end function get_filtered_dependencies(elements, dependencies) @@ -495,7 +487,7 @@ function get_element_from_obj(dataelements::Vector{TuLiPa.DataElement}, obj::Any end end end - + function get_commodities_from_dataelements(elements::Vector{TuLiPa.DataElement}) commodities = CommodityName[] for element in elements @@ -534,7 +526,7 @@ end function set_local_subsystems(subsystems, subsystems_evp, subsystems_stoch) db = get_local_db() - + db.subsystems = subsystems db.subsystems_evp = subsystems_evp db.subsystems_stoch = subsystems_stoch @@ -605,20 +597,20 @@ function add_local_problem_distribution() if !get_onlysubsystemmodel(db.input) db.dist_ppp = get_dist_ppp(db.input) - println(db.dist_ppp) + @debug db.dist_ppp db.dist_evp = get_dist_evp(db.input, db.subsystems_evp) - println(db.dist_evp) + @debug db.dist_evp end db.dist_ifm = get_dist_ifm(db.input) - println(db.dist_ifm) + @debug db.dist_ifm (dist_mp, dist_sp) = get_dist_stoch(db.input, db.subsystems_stoch) - println(dist_mp) - println(dist_sp) + @debug dist_mp + @debug dist_sp db.dist_sp = dist_sp db.dist_mp = dist_mp db.core_main = get_core_main(db.input) - println(db.core_main) + @debug db.core_main dists = (db.dist_ifm, db.dist_ppp, db.dist_evp, db.dist_sp, db.dist_mp, db.core_main) @@ -642,7 +634,7 @@ function set_local_dists(dists) db.dist_sp = dist_sp db.dist_mp = dist_mp db.core_main = core_main - + return end @@ -661,7 +653,7 @@ function add_local_horizons() db = get_local_db() horizons = get_horizons(db.input) - d = Dict{Tuple{ScenarioIx, TermName, CommodityName}, TuLiPa.Horizon}() + d = Dict{Tuple{ScenarioIx,TermName,CommodityName},TuLiPa.Horizon}() if 
!get_onlysubsystemmodel(db.input) for (scenarioix, ownercore) in db.dist_ppp @@ -743,26 +735,23 @@ function step_jules(t, steplength, stepnr, skipmed) cores = get_cores(db) firstcore = first(cores) - println(t) - println("Garbage collection") - @time begin + @debug t stepnr = stepnr + @debugtime "Garbage collection" begin if mod(stepnr, 20) == 0 @sync for core in cores @spawnat core GC.gc() end end end - - println("Startstates") - @time begin + + @debugtime "Startstates" begin @sync for core in cores @spawnat core update_startstates(stepnr, t) end end - - println("Scenario modelling") - @time begin - if stepnr == 1 + + @debugtime "Scenario modelling" begin + if stepnr == 1 wait(@spawnat firstcore update_scenmod_sim()) end # TODO: Add option to do scenariomodelling per individual or group of subsystem (e.g per area, commodity ...) @@ -770,8 +759,7 @@ function step_jules(t, steplength, stepnr, skipmed) wait(@spawnat firstcore update_scenmod_stoch(t, skipmed)) end - println("Solve inflow models") - @time begin + @debugtime "Solve inflow models" begin @sync for core in cores @spawnat core solve_ifm(t, stepnr) end @@ -785,8 +773,7 @@ function step_jules(t, steplength, stepnr, skipmed) end end - println("Solve price prognosis problems") - @time begin + @debugtime "Solve price prognosis problems" begin @sync for core in cores @spawnat core solve_ppp(t, steplength, stepnr, skipmed) end @@ -795,23 +782,20 @@ function step_jules(t, steplength, stepnr, skipmed) @spawnat core synchronize_horizons(skipmed) end end - - println("End value problems") - @time begin + + @debugtime "End value problems" begin @sync for core in cores @spawnat core solve_evp(t, stepnr, skipmed) end end - println("Subsystem problems") - @time begin + @debugtime "Subsystem problems" begin @sync for core in cores @spawnat core solve_stoch(t, stepnr, skipmed) end end - println("Clearing problem") - @time begin + @debugtime "Clearing problem" begin if !get_onlysubsystemmodel(db.input) @sync for core in cores 
@spawnat core solve_cp(t, stepnr, skipmed) @@ -819,11 +803,10 @@ function step_jules(t, steplength, stepnr, skipmed) end end - println("Update output") - @time begin + @debugtime "Update output" begin wait(@spawnat db.core_main update_output(t, stepnr)) end - + # do dynamic load balancing here return end diff --git a/src/scenariomodelling.jl b/src/scenariomodelling.jl index 768474f..0072bc9 100644 --- a/src/scenariomodelling.jl +++ b/src/scenariomodelling.jl @@ -5,12 +5,12 @@ See abstract_types.jl for more """ struct NothingScenarioModellingMethod <: AbstractScenarioModellingMethod end -mutable struct NoScenarioModellingMethod{T <: AbstractScenario} <: AbstractScenarioModellingMethod +mutable struct NoScenarioModellingMethod{T<:AbstractScenario} <: AbstractScenarioModellingMethod scenarios::Vector{T} end # mutable struct ResidualLoadMethod <: ScenarioModellingMethod # choose scenario based on residual load (also energy inflow) # end -mutable struct InflowClusteringMethod{T <: AbstractScenario} <: AbstractScenarioModellingMethod +mutable struct InflowClusteringMethod{T<:AbstractScenario} <: AbstractScenarioModellingMethod scenarios::Vector{T} inflowfactors::Vector{Float64} objects::Vector @@ -24,7 +24,7 @@ mutable struct InflowClusteringMethod{T <: AbstractScenario} <: AbstractScenario end get_parts(method::InflowClusteringMethod{WeatherScenario}) = method.parts -mutable struct SumInflowQuantileMethod{T <: AbstractScenario} <: AbstractScenarioModellingMethod +mutable struct SumInflowQuantileMethod{T<:AbstractScenario} <: AbstractScenarioModellingMethod scenarios::Vector{T} inflowfactors::Vector{Float64} objects::Vector @@ -58,7 +58,7 @@ function set_changes(scenmod::NoScenarioModellingMethod, changes::Vector{Weather scenmod.scenarios = changes return end -function set_changes(scenmod::Union{SumInflowQuantileMethod{WeatherScenario},InflowClusteringMethod{WeatherScenario}}, changes::Tuple{Vector{WeatherScenario}, Vector{Float64}}) +function 
set_changes(scenmod::Union{SumInflowQuantileMethod{WeatherScenario},InflowClusteringMethod{WeatherScenario}}, changes::Tuple{Vector{WeatherScenario},Vector{Float64}}) scenarios, inflowfactors = changes scenmod.scenarios = scenarios @@ -66,7 +66,7 @@ function set_changes(scenmod::Union{SumInflowQuantileMethod{WeatherScenario},Inf return end -get_inflowfactors(scenmod::AbstractScenarioModellingMethod) = [1/length(scenmod.scenarios) for s in 1:length(scenmod.scenarios)] +get_inflowfactors(scenmod::AbstractScenarioModellingMethod) = [1 / length(scenmod.scenarios) for s in 1:length(scenmod.scenarios)] get_inflowfactors(scenmod::Union{SumInflowQuantileMethod{WeatherScenario},InflowClusteringMethod{WeatherScenario}}) = scenmod.inflowfactors """Choose scenarios from a larger set based on a method, and calculate weights and other parameters that result from the scenario modelling""" @@ -75,7 +75,7 @@ function choose_scenarios!(scenmod::SumInflowQuantileMethod{WeatherScenario}, sc scenariooptions = get_scenarios(scenmodmethodoptions) weightsoptions = [get_probability(scenario) for scenario in scenariooptions] factoroptions = get_inflowfactors(scenmodmethodoptions) - + # Calculate total energy inflow in the system for the scenariodelta totalsumenergyinflow = zeros(length(scenariooptions)) for obj in scenmod.objects @@ -88,7 +88,7 @@ function choose_scenarios!(scenmod::SumInflowQuantileMethod{WeatherScenario}, sc for rhsterm in TuLiPa.getrhsterms(obj) for (i, scenario) in enumerate(scenariooptions) time = get_scentnormal(simtime, scenario, input) - totalsumenergyinflow[i] += TuLiPa.getparamvalue(rhsterm, time, scenmod.scendelta)*enekvglobal*factoroptions[i] + totalsumenergyinflow[i] += TuLiPa.getparamvalue(rhsterm, time, scenmod.scendelta) * enekvglobal * factoroptions[i] end end end @@ -99,7 +99,7 @@ function choose_scenarios!(scenmod::SumInflowQuantileMethod{WeatherScenario}, sc n = fit(Normal, totalsumenergyinflow, weightsoptions) quantiles = [i for i in 
1-scenmod.maxquantile:(2*scenmod.maxquantile-1)/(numscen-1):scenmod.maxquantile] # get quantiles from maxquantile qvalues = quantile.(n, quantiles) # get quantile values from distribution - + # Could also use probability density of the quantiles in the calculation of the weights if scenmod.usedensity d = pdf.(n, qvalues) # get probability density for each quantile @@ -110,21 +110,21 @@ function choose_scenarios!(scenmod::SumInflowQuantileMethod{WeatherScenario}, sc # How much should the inflow in the scenario be adjusted so that it is similar to the quantile? for i in 1:numscen qvalue = qvalues[i] - idx = findmin(abs.(totalsumenergyinflow.-qvalue))[2] + idx = findmin(abs.(totalsumenergyinflow .- qvalue))[2] scenmod.scenarios[i] = deepcopy(scenariooptions[idx]) - scenmod.inflowfactors[i] = qvalue/totalsumenergyinflow[idx] + scenmod.inflowfactors[i] = qvalue / totalsumenergyinflow[idx] end # How much should each scenario be weighted - combination of weighting function and probability density x = collect(-numscen+1:2:numscen-1) y = (scenmod.a .* x .^ 2 .+ x .* scenmod.b .+ scenmod.c) .* d for i in 1:numscen - scenmod.scenarios[i].p_weather = y[i]/sum(y) + scenmod.scenarios[i].p_weather = y[i] / sum(y) end scenariosum = sum([scenario.p_weather for scenario in scenmod.scenarios]) if !isapprox(scenariosum, 1, atol=0.0001) - println([scenario.p_weather for scenario in scenmod.scenarios]) + @error "Scenario probabilities do not sum to 1" scenarios = [scenario.p_weather for scenario in scenmod.scenarios] scenariosum = scenariosum error("Sum of scenarios not 1, it is $(scenariosum)") end return @@ -138,10 +138,10 @@ function choose_scenarios!(scenmod::InflowClusteringMethod{WeatherScenario}, sce # Calculate total energy inflow in the system for each part of the scenariodelta sumenergyinflow = zeros(length(scenariooptions)) - partsumenergyinflow = zeros(scenmod.parts ,length(scenariooptions)) - scendeltapart = scenmod.scendelta/scenmod.parts + partsumenergyinflow = 
zeros(scenmod.parts, length(scenariooptions)) + scendeltapart = scenmod.scendelta / scenmod.parts - parts = scenmod.parts + parts = scenmod.parts for obj in scenmod.objects if obj isa TuLiPa.BaseBalance @@ -153,9 +153,9 @@ function choose_scenarios!(scenmod::InflowClusteringMethod{WeatherScenario}, sce for rhsterm in TuLiPa.getrhsterms(obj) for (i, scenario) in enumerate(scenariooptions) time = get_scentnormal(simtime, scenario, input) - sumenergyinflow[i] += TuLiPa.getparamvalue(rhsterm, time, scenmod.scendelta)*enekvglobal*inflowfactoroptions[i] + sumenergyinflow[i] += TuLiPa.getparamvalue(rhsterm, time, scenmod.scendelta) * enekvglobal * inflowfactoroptions[i] for j in 1:parts - partsumenergyinflow[j,i] += TuLiPa.getparamvalue(rhsterm, time + scendeltapart*(j-1), scendeltapart)*enekvglobal*inflowfactoroptions[i] + partsumenergyinflow[j, i] += TuLiPa.getparamvalue(rhsterm, time + scendeltapart * (j - 1), scendeltapart) * enekvglobal * inflowfactoroptions[i] end end end @@ -181,19 +181,19 @@ function choose_scenarios!(scenmod::InflowClusteringMethod{WeatherScenario}, sce # Scenario in middle of cluster clustersumenergyinflows = sumenergyinflow[idxs] meanclustersumenergyinflow = mean(clustersumenergyinflows) - clusteridx = findmin(x->abs(x-meanclustersumenergyinflow), clustersumenergyinflows)[2] + clusteridx = findmin(x -> abs(x - meanclustersumenergyinflow), clustersumenergyinflows)[2] totalidx = findfirst(x -> x == clustersumenergyinflows[clusteridx], sumenergyinflow) scenmod.scenarios[i] = deepcopy(scenariooptions[totalidx]) # Adjust scenario to represent actual middle of cluster - scenmod.inflowfactors[i] = meanclustersumenergyinflow/sumenergyinflow[totalidx] + scenmod.inflowfactors[i] = meanclustersumenergyinflow / sumenergyinflow[totalidx] # Weight based on amount of scenarios in cluster and weight of options scenmod.scenarios[i].p_weather = sum(weightsoptions[idxs]) end scenariosum = sum([scenario.p_weather for scenario in scenmod.scenarios]) if 
!isapprox(scenariosum, 1, atol=0.0001) - println([scenario.p_weather for scenario in scenmod.scenarios]) + @error "Scenario probabilities do not sum to 1" scenarios = [scenario.p_weather for scenario in scenmod.scenarios] scenariosum = scenariosum error("Sum of scenarios not 1, it is $(scenariosum)") end