From 3125744755ec7520c0a98e723c64d012a2692cb8 Mon Sep 17 00:00:00 2001 From: Philipp Geier Date: Thu, 7 May 2026 12:54:25 +0000 Subject: [PATCH] Fixes and metadata changes to make ocean work with mtg2 keys --- src/multio/action/encode-mtg2/EncodeMtg2.cc | 9 +- src/multio/action/mask/Mask.cc | 4 +- .../single-field-sink/SingleFieldSink.cc | 2 +- .../action/statistics-mtg2/OperationWindow.cc | 24 +- .../action/statistics-mtg2/Statistics.cc | 59 ++- .../statistics-mtg2/TemporalStatistics.cc | 4 + .../statistics-mtg2/TemporalStatistics.h | 2 + .../cfg/StatisticsConfiguration.cc | 5 +- .../statistics-mtg2/cfg/StatisticsOptions.cc | 13 +- .../statistics-mtg2/cfg/StatisticsOptions.h | 6 +- .../period-updaters/DayPeriodUpdater.h | 6 +- .../period-updaters/HourPeriodUpdater.h | 4 +- .../period-updaters/MonthPeriodUpdater.h | 4 +- .../period-updaters/PeriodUpdater.h | 2 +- src/multio/action/transport/Transport.cc | 33 +- src/multio/action/transport/Transport.h | 2 +- src/multio/domain/Mask.cc | 2 +- .../multio-replay-nemo-capi-partial-agg.cc | 4 +- src/multio/tools/multio-replay-nemo-capi.cc | 4 +- src/multio/tools/multio-replay-nemo-fapi.f90 | 2 +- .../action/statistics-mtg2/CMakeLists.txt | 7 + .../ClimDTNoInitialCondition.cc | 405 ++++++++++++++++++ 22 files changed, 508 insertions(+), 95 deletions(-) create mode 100644 tests/multio/action/statistics-mtg2/ClimDTNoInitialCondition.cc diff --git a/src/multio/action/encode-mtg2/EncodeMtg2.cc b/src/multio/action/encode-mtg2/EncodeMtg2.cc index a15d3a56f..0466b442f 100644 --- a/src/multio/action/encode-mtg2/EncodeMtg2.cc +++ b/src/multio/action/encode-mtg2/EncodeMtg2.cc @@ -46,8 +46,8 @@ std::unique_ptr encode(metkit::mars2grib::Mars2Grib& T* values, size_t size, const dm::FullMarsRecord& marsRec, const dm::MiscRecord& miscRec) { const auto mars = dm::dumpRecord(marsRec); - const auto misc = dm::dumpRecord(miscRec); - + const auto misc = dm::dumpUnscopedRecord(miscRec); + if (!cache) { return encoder.encode(values, size, mars, 
misc); } @@ -88,10 +88,6 @@ void EncodeMtg2::executeImpl(Message msg) { // Apply mappings auto mappingResult = mars2mars::applyMappings(mars2mars::allRules(), marsRec, miscRec); - // Dump (mapped) mars and misc keys to local configurations - const auto mars = dm::dumpRecord(marsRec); - const auto misc = dm::dumpUnscopedRecord(miscRec); - executeNext(dispatchPrecisionTag(msg.precision(), [&](auto pt) { using Precision = typename decltype(pt)::type; msg.payload().acquire(); @@ -123,7 +119,6 @@ void EncodeMtg2::executeImpl(Message msg) { eckit::Buffer buf{sample->messageSize()}; sample->copyInto(reinterpret_cast(buf.data()), buf.size()); - // TODO(pgeier) write mapped metadata return Message{Message::Header{Message::Tag::Field, Peer{msg.source()}, Peer{msg.destination()}, dm::dumpRecord(marsRec)}, std::move(buf)}; diff --git a/src/multio/action/mask/Mask.cc b/src/multio/action/mask/Mask.cc index 94bdc58ca..041b8aa62 100644 --- a/src/multio/action/mask/Mask.cc +++ b/src/multio/action/mask/Mask.cc @@ -69,8 +69,8 @@ message::Message Mask::createMasked(message::Message msg) const { } message::Metadata& md = msg.modifyMetadata(); - md.set("missingValue", missingValue_); - md.set("bitmapPresent", true); + md.set("misc-missingValue", missingValue_); + md.set("misc-bitmapPresent", true); return msg; } diff --git a/src/multio/action/single-field-sink/SingleFieldSink.cc b/src/multio/action/single-field-sink/SingleFieldSink.cc index 04dc783f5..4497a3543 100644 --- a/src/multio/action/single-field-sink/SingleFieldSink.cc +++ b/src/multio/action/single-field-sink/SingleFieldSink.cc @@ -65,7 +65,7 @@ void SingleFieldSink::write(Message msg) { std::ostringstream oss; - oss << rootPath_ << msg.metadata().get("level") << "::" << paramOrId + oss << rootPath_ << msg.metadata().get("levelist") << "::" << paramOrId << "::" << msg.metadata().get("step"); eckit::LocalConfiguration config; diff --git a/src/multio/action/statistics-mtg2/OperationWindow.cc 
b/src/multio/action/statistics-mtg2/OperationWindow.cc index ebd44edfc..007186035 100644 --- a/src/multio/action/statistics-mtg2/OperationWindow.cc +++ b/src/multio/action/statistics-mtg2/OperationWindow.cc @@ -61,7 +61,8 @@ eckit::DateTime yyyymmdd_hhmmss2DateTime(uint64_t yyyymmdd, uint64_t hhmmss) { OperationWindow make_window(const std::unique_ptr& periodUpdater, const StatisticsConfiguration& cfg) { // Note: A subtraction eckit::DateTime - eckit::Second yields eckit::Second instead of eckit::DateTime - // We do our calculations based on a difference since an arbitrary epoch (1st of January in the year 0) as a workarounds + // We do our calculations based on a difference since an arbitrary epoch (1st of January in the year 0) as a + // workarounds eckit::DateTime epoch{eckit::Date{0000, 01, 01}, eckit::Time{00, 00, 00}}; eckit::Second deltaCurr = cfg.curr() - epoch; eckit::Second deltaStart = deltaCurr - eckit::Second{cfg.timespan().value_or(0) * 3600.0}; @@ -70,7 +71,8 @@ OperationWindow make_window(const std::unique_ptr& periodUpdater, eckit::DateTime startPoint{periodUpdater->computeWinStartTime(epoch + deltaStart)}; eckit::DateTime creationPoint{periodUpdater->computeWinCreationTime(epoch + deltaStart)}; eckit::DateTime endPoint{periodUpdater->computeWinEndTime(startPoint)}; - return OperationWindow{epochPoint, startPoint, creationPoint, endPoint, cfg.timeIncrementInSeconds(), cfg.options().windowType()}; + return OperationWindow{ + epochPoint, startPoint, creationPoint, endPoint, cfg.timeIncrementInSeconds(), cfg.options().windowType()}; }; OperationWindow load_window(std::shared_ptr& IOmanager, const StatisticsOptions& opt) { @@ -195,21 +197,21 @@ bool OperationWindow::isWithin(const eckit::DateTime& dt) const { } bool OperationWindow::gtLowerBound(const eckit::DateTime& dt, bool throw_error) const { - if (throw_error && creationPoint_ >= dt) { + if (throw_error && startPoint_ >= dt) { std::ostringstream os; os << *this << " : " << dt << " is outside of 
current period : lower Bound violation" << std::endl; throw eckit::SeriousBug(os.str(), Here()); } - return dt > creationPoint_; + return dt > startPoint_; }; bool OperationWindow::geLowerBound(const eckit::DateTime& dt, bool throw_error) const { - if (throw_error && creationPoint_ > dt) { + if (throw_error && startPoint_ > dt) { std::ostringstream os; os << *this << " : " << dt << " is outside of current period : lower Bound violation" << std::endl; throw eckit::SeriousBug(os.str(), Here()); } - return dt >= creationPoint_; + return dt >= startPoint_; }; bool OperationWindow::leUpperBound(const eckit::DateTime& dt, bool throw_error) const { @@ -463,7 +465,8 @@ void OperationWindow::serialize(IOBuffer& currState, const std::string& fname, c outFile << "timeIncrementInSeconds_ :: " << timeIncrementInSeconds_ << std::endl; outFile << "count_ :: " << count_ << std::endl; outFile << "counts_.size() :: " << counts_.size() << std::endl; - outFile << "windowType_ :: " << (windowType_ == WindowType::ForwardOffset ? "forward-offset" : "backward-offset") << std::endl; + outFile << "windowType_ :: " + << (windowType_ == WindowType::ForwardOffset ? 
"forward-offset" : "backward-offset") << std::endl; outFile.close(); } @@ -495,7 +498,7 @@ void OperationWindow::serialize(IOBuffer& currState, const std::string& fname, c const size_t countsSize = counts_.size(); currState[17] = static_cast(countsSize); for (size_t i = 0; i < countsSize; ++i) { - currState[i+18] = static_cast(counts_[i]); + currState[i + 18] = static_cast(counts_[i]); } currState.computeChecksum(); @@ -520,7 +523,7 @@ void OperationWindow::deserialize(const IOBuffer& currState, const std::string& const auto countsSize = static_cast(currState[17]); counts_.resize(countsSize); for (size_t i = 0; i < countsSize; ++i) { - counts_[i] = static_cast(currState[i+18]); + counts_[i] = static_cast(currState[i + 18]); } if (opt.debugRestart()) { @@ -535,7 +538,8 @@ void OperationWindow::deserialize(const IOBuffer& currState, const std::string& outFile << "timeIncrementInSeconds_ :: " << timeIncrementInSeconds_ << std::endl; outFile << "count_ :: " << count_ << std::endl; outFile << "counts_.size() :: " << counts_.size() << std::endl; - outFile << "windowType_ :: " << (windowType_ == WindowType::ForwardOffset ? "forward-offset" : "backward-offset") << std::endl; + outFile << "windowType_ :: " + << (windowType_ == WindowType::ForwardOffset ? 
"forward-offset" : "backward-offset") << std::endl; outFile.close(); } diff --git a/src/multio/action/statistics-mtg2/Statistics.cc b/src/multio/action/statistics-mtg2/Statistics.cc index c85cb08f1..e314a2bb9 100644 --- a/src/multio/action/statistics-mtg2/Statistics.cc +++ b/src/multio/action/statistics-mtg2/Statistics.cc @@ -45,7 +45,8 @@ Statistics::Statistics(const ComponentConfiguration& compConf) : operationMapping_{StatisticsOperationMapping::makeStatisticsOperationMapping()}, IOmanager_{StatisticsIOFactory::instance().build(opt_.restartLib(), opt_.restartPath(), opt_.restartPrefix())} {} -std::string Statistics::generateRestartNameFromFlush(const message::Message& msg, const FlushMetadataKeys& flush) const { +std::string Statistics::generateRestartNameFromFlush(const message::Message& msg, + const FlushMetadataKeys& flush) const { std::string folderName; @@ -241,8 +242,6 @@ bool Statistics::HasRestartKey(const std::string& key) { } - - void Statistics::updateLatestDateTime(const StatisticsConfiguration& cfg) { std::ostringstream tmp; @@ -318,10 +317,10 @@ void Statistics::executeImpl(message::Message msg) { } // The incomming message must occur AFTER the current point in the window! - if (cfg.curr() <= ts.cwin().currPoint()) { + if (cfg.curr() < ts.cwin().currPoint()) { std::ostringstream os; - os << "Current time is before or equal to the current point in the window :: " << cfg.curr() << " > " - << ts.cwin().currPoint() << std::endl; + os << "Current time is before or equal to the current point in the window :: " << cfg.curr() + << " <= " << ts.cwin().currPoint() << ". 
Message: " << msg << std::endl; throw eckit::SeriousBug(os.str(), Here()); } @@ -331,6 +330,7 @@ void Statistics::executeImpl(message::Message msg) { ts.updateWindow(msg, cfg); } + // Update data ts.updateData(msg, cfg); @@ -412,7 +412,10 @@ void Statistics::emitStatistics(TemporalStatistics& ts, message::Peer source, me auto opname = (*it)->operation(); if (opname != "instant") { if (currentLoop == 1) { - const std::int64_t timespan = ts.win().currPointInHours() - ts.win().creationPointInHours(); + const std::int64_t timespan + = ts.win().currPointInHours() + - ((ts.periodUpdater().timeUnit() == "month") ? ts.win().creationPointInHours() + : ts.win().startPointInHours()); dm::dumpEntry(dm::TIMESPAN, dm::TIMESPAN.makeEntry(timespan), md); paramMapping_.applyMapping(md, opname, !opt_.disableStrictMapping()); } @@ -430,7 +433,9 @@ void Statistics::emitStatistics(TemporalStatistics& ts, message::Peer source, me Here()); } // Squash means we don't map (already done in previous loop), but extend the timespan - timespan.set(ts.win().currPointInHours() - ts.win().creationPointInHours()); + timespan.set(ts.win().currPointInHours() + - ((ts.periodUpdater().timeUnit() == "month") ? ts.win().creationPointInHours() + : ts.win().startPointInHours())); dm::dumpEntry(dm::TIMESPAN, timespan, md); } else { @@ -460,29 +465,41 @@ void Statistics::emitStatistics(TemporalStatistics& ts, message::Peer source, me // For instant fields or on flushes, timespan is not set yet if (!lengthOfWindow.isSet()) { - // The window spaws between creationPoint to endPoint + // The window spans from startingPoint (via creationPoint) to endPoint. // Prev & Current point describe the last updated data points. - // In this case we are explicitly interested in creation to current point - lengthOfWindow.set(ts.win().currPointInHours() - ts.win().creationPointInHours()); + // CreationPoint describes the time at which the window is created - this can lie within a window (i.e. 
+ mid of a day, or month) whereas the startingPoint is explicitly the start of the window that can + // then be at time 0 of a day or explicitly the first day of a month etc... + if (ts.periodUpdater().timeUnit() == "month") { + // For months we emit from the creation point - if a simulation is started mid-month, not the + // whole month should be considered. + lengthOfWindow.set(ts.win().currPointInHours() - ts.win().creationPointInHours()); + } + else { + // For days we emit from the starting point - if the initial condition is not sent (i.e. for + // ocean), the window often starts at hour 1 instead of 0. In this case we explicitly want it to + // start at 0 although the first data arrived at hour 1. + lengthOfWindow.set(ts.win().currPointInHours() - ts.win().startPointInHours()); + } } dm::dumpEntry(dm::STEP, dm::STEP.makeEntry(lengthOfWindow.get().toHours()), md); - // We explicitly take the creation point - alternative would be the start point. - // The start point may be different for the first window, i.e. if the simulation starts in the mid of a month. - // To not confuse the output, we explicitly just output the window for which data has been received. - // As discussed with DGOV and scientist, half months are typically not of interest and should be ignored. - // Some additional mechanism has to make sure that these do not occur in the output (i.e. additional action). - auto dt = ts.win().creationPoint(); + // We explicitly take the creation point for months and the start point for days (read comment above). + // The start point may be different for the first window, i.e. if the simulation starts in the middle of a + // month. To not confuse the output, we explicitly just output the window for which data has been + // received. As discussed with DGOV and scientists, half months are typically not of interest and should + // be ignored. Some additional mechanism has to make sure that these do not occur in the output (i.e. + // additional action). 
+ auto dt = (ts.periodUpdater().timeUnit() == "month") ? ts.win().creationPoint() : ts.win().startPoint(); dm::dumpEntry(dm::DATE, dm::DATE.makeEntry(dt.date().yyyymmdd()), md); - dm::dumpEntry(dm::TIME, dm::TIME.makeEntry(dt.time().hhmmss()), md); // Official MARS time is in hhmm, in multio hhmmss is used + dm::dumpEntry(dm::TIME, dm::TIME.makeEntry(dt.time().hhmmss()), + md); // Official MARS time is in hhmm, in multio hhmmss is used break; } } - for (const auto& kv : opt_.setMetadata()) { - md.set(kv.first, kv.second); - } + md.updateOverwrite(opt_.setMetadata()); (*it)->compute(payload, cfg); executeNext( diff --git a/src/multio/action/statistics-mtg2/TemporalStatistics.cc b/src/multio/action/statistics-mtg2/TemporalStatistics.cc index b176ef674..bda2597b1 100644 --- a/src/multio/action/statistics-mtg2/TemporalStatistics.cc +++ b/src/multio/action/statistics-mtg2/TemporalStatistics.cc @@ -41,6 +41,10 @@ TemporalStatistics::TemporalStatistics(std::shared_ptr& IOmanager, LOG_DEBUG_LIB(LibMultio) << opt.logPrefix() << " *** Load restart files" << std::endl; } +const PeriodUpdater& TemporalStatistics::periodUpdater() const { + return *periodUpdater_.get(); +} + void TemporalStatistics::dump(std::shared_ptr& IOmanager, const StatisticsOptions& opt) const { LOG_DEBUG_LIB(LibMultio) << opt.logPrefix() << " *** Dump restart files" << std::endl; diff --git a/src/multio/action/statistics-mtg2/TemporalStatistics.h b/src/multio/action/statistics-mtg2/TemporalStatistics.h index f99fe5d8f..2e22a07f9 100644 --- a/src/multio/action/statistics-mtg2/TemporalStatistics.h +++ b/src/multio/action/statistics-mtg2/TemporalStatistics.h @@ -40,6 +40,8 @@ class TemporalStatistics { message::Metadata& metadata(); void print(std::ostream& os) const; + + const PeriodUpdater& periodUpdater() const; private: std::unique_ptr periodUpdater_; diff --git a/src/multio/action/statistics-mtg2/cfg/StatisticsConfiguration.cc b/src/multio/action/statistics-mtg2/cfg/StatisticsConfiguration.cc index 
f74708a4e..7f81ee559 100644 --- a/src/multio/action/statistics-mtg2/cfg/StatisticsConfiguration.cc +++ b/src/multio/action/statistics-mtg2/cfg/StatisticsConfiguration.cc @@ -72,10 +72,9 @@ OutputTimeReference readOutputTimeReference(const FieldMetadataKeys& md, const S if (!stream) { // Look for stream in options const auto& omd = opt.setMetadata(); - auto it = std::find_if(omd.begin(), omd.end(), [](const auto& pair) { return pair.first == "stream"; }); - if (it != omd.end()) { - stream = it->second; + if (auto it = omd.find("stream"); it != omd.end()) { + stream = it->second.get(); } } diff --git a/src/multio/action/statistics-mtg2/cfg/StatisticsOptions.cc b/src/multio/action/statistics-mtg2/cfg/StatisticsOptions.cc index fc216b24b..f55d3ad59 100644 --- a/src/multio/action/statistics-mtg2/cfg/StatisticsOptions.cc +++ b/src/multio/action/statistics-mtg2/cfg/StatisticsOptions.cc @@ -138,18 +138,11 @@ std::optional parseOutputTimeRef(const eckit::LocalConfigur throw eckit::UserError(os.str(), Here()); } -std::vector> parseSetMetadata(const eckit::LocalConfiguration& cfg) { +message::Metadata parseSetMetadata(const eckit::LocalConfiguration& cfg) { if (!cfg.has("set-metadata")) { return {}; } - - auto subCfg = cfg.getSubConfiguration("set-metadata"); - std::vector> res; - for (auto key : subCfg.keys()) { - auto value = subCfg.getString(key); - res.emplace_back(std::pair(key, value)); - } - return res; + return message::toMetadata(cfg.getSubConfiguration("set-metadata")); } @@ -214,7 +207,7 @@ bool StatisticsOptions::disableStrictMapping() const { bool StatisticsOptions::disableSquashing() const { return disableSquashing_; } -const std::vector>& StatisticsOptions::setMetadata() const { +const message::Metadata& StatisticsOptions::setMetadata() const { return setMetadata_; } diff --git a/src/multio/action/statistics-mtg2/cfg/StatisticsOptions.h b/src/multio/action/statistics-mtg2/cfg/StatisticsOptions.h index a3467154b..6689f6fc7 100644 --- 
a/src/multio/action/statistics-mtg2/cfg/StatisticsOptions.h +++ b/src/multio/action/statistics-mtg2/cfg/StatisticsOptions.h @@ -6,6 +6,8 @@ #include "eckit/config/LocalConfiguration.h" +#include "multio/message/Metadata.h" + namespace multio::action::statistics_mtg2 { @@ -45,7 +47,7 @@ class StatisticsOptions { const bool disableStrictMapping_; const bool disableSquashing_; - const std::vector> setMetadata_; + const message::Metadata setMetadata_; const std::optional outputTimeReference_; @@ -69,7 +71,7 @@ class StatisticsOptions { bool disableStrictMapping() const; bool disableSquashing() const; - const std::vector>& setMetadata() const; + const message::Metadata& setMetadata() const; std::optional outputTimeReference() const; }; diff --git a/src/multio/action/statistics-mtg2/period-updaters/DayPeriodUpdater.h b/src/multio/action/statistics-mtg2/period-updaters/DayPeriodUpdater.h index e45be38b1..4d2ebd88d 100644 --- a/src/multio/action/statistics-mtg2/period-updaters/DayPeriodUpdater.h +++ b/src/multio/action/statistics-mtg2/period-updaters/DayPeriodUpdater.h @@ -34,11 +34,7 @@ class DayPeriodUpdater final : public PeriodUpdater { return os.str(); }; - const std::string timeUnit() const { - std::ostringstream os; - os << "day"; - return os.str(); - }; + const std::string timeUnit() const { return "day"; }; eckit::DateTime computeWinStartTime(const eckit::DateTime& nextTime) const { const auto& d = nextTime.date(); diff --git a/src/multio/action/statistics-mtg2/period-updaters/HourPeriodUpdater.h b/src/multio/action/statistics-mtg2/period-updaters/HourPeriodUpdater.h index a018970fa..2af951887 100644 --- a/src/multio/action/statistics-mtg2/period-updaters/HourPeriodUpdater.h +++ b/src/multio/action/statistics-mtg2/period-updaters/HourPeriodUpdater.h @@ -33,9 +33,7 @@ class HourPeriodUpdater final : public PeriodUpdater { }; const std::string timeUnit() const { - std::ostringstream os; - os << "hour"; - return os.str(); + return "hour"; }; eckit::DateTime 
computeWinStartTime(const eckit::DateTime& nextTime) const { diff --git a/src/multio/action/statistics-mtg2/period-updaters/MonthPeriodUpdater.h b/src/multio/action/statistics-mtg2/period-updaters/MonthPeriodUpdater.h index dfa861f2f..265f0a586 100644 --- a/src/multio/action/statistics-mtg2/period-updaters/MonthPeriodUpdater.h +++ b/src/multio/action/statistics-mtg2/period-updaters/MonthPeriodUpdater.h @@ -29,9 +29,7 @@ class MonthPeriodUpdater final : public PeriodUpdater { }; const std::string timeUnit() const { - std::ostringstream os; - os << "month"; - return os.str(); + return "month"; }; eckit::DateTime computeWinStartTime(const eckit::DateTime& nextTime) const { diff --git a/src/multio/action/statistics-mtg2/period-updaters/PeriodUpdater.h b/src/multio/action/statistics-mtg2/period-updaters/PeriodUpdater.h index 2776234ee..67077a400 100644 --- a/src/multio/action/statistics-mtg2/period-updaters/PeriodUpdater.h +++ b/src/multio/action/statistics-mtg2/period-updaters/PeriodUpdater.h @@ -87,4 +87,4 @@ class PeriodUpdater { } }; -} // namespace multio::action::statistics_mtg2 \ No newline at end of file +} // namespace multio::action::statistics_mtg2 diff --git a/src/multio/action/transport/Transport.cc b/src/multio/action/transport/Transport.cc index c7acf9ec5..b3f1a6117 100644 --- a/src/multio/action/transport/Transport.cc +++ b/src/multio/action/transport/Transport.cc @@ -32,7 +32,7 @@ std::vector getHashKeys(const eckit::Configuration& conf) { if (conf.has("hash-keys")) { return conf.getStringVector("hash-keys"); } - return std::vector{"category", "name", "level"}; + return std::vector{"param", "levtype", "levelist"}; } } // namespace @@ -77,36 +77,29 @@ void Transport::print(std::ostream& os) const { message::Peer Transport::chooseServer(const message::Metadata& metadata) { ASSERT_MSG(serverCount_ > 0, "No server to choose from"); - auto getMetadataValue = [&](const std::string& hashKey) -> const message::MetadataValue& { + auto hashMetadataValue = 
[&](const std::string& hashKey) -> size_t { auto searchHashKey = metadata.find(hashKey); if (searchHashKey == metadata.end()) { std::ostringstream os; os << "The hash key \"" << hashKey << "\" is not defined in the metadata object: " << metadata << std::endl; throw multio::transport::TransportException(os.str(), Here()); } - return searchHashKey->second; + return std::hash{}(searchHashKey->second); }; auto constructHash = [&]() { - std::ostringstream os; - + size_t ret = 0; for (const std::string& s : hashKeys_) { - getMetadataValue(s).visit(eckit::Overloaded{ - [&s](const auto& v) -> util::IfTypeNotOf { - throw message::MetadataWrongTypeException(s, Here()); - }, - [&os](const auto& v) -> util::IfTypeOf { os << v; }, - }); + ret = util::hashAppend(ret, hashMetadataValue(s)); } - return os.str(); + return ret; }; switch (distType_) { case DistributionType::hashed_cyclic: { - std::string hashString = constructHash(); ASSERT(usedServerCount_ <= serverCount_); - auto offset = std::hash{}(hashString) % usedServerCount_; + auto offset = constructHash() % usedServerCount_; auto id = (serverId_ + offset) % serverCount_; ASSERT(id < serverPeers_.size()); @@ -114,18 +107,18 @@ message::Peer Transport::chooseServer(const message::Metadata& metadata) { return *serverPeers_[id]; } case DistributionType::hashed_to_single: { - std::string hashString = constructHash(); - auto id = std::hash{}(hashString) % serverCount_; + auto id = constructHash() % serverCount_; ASSERT(id < serverPeers_.size()); return *serverPeers_[id]; } case DistributionType::even: { - std::string hashString = constructHash(); + auto hash = constructHash(); - if (destinations_.find(hashString) != end(destinations_)) { - return destinations_.at(hashString); + if (auto searchDest = destinations_.find(hash); searchDest != end(destinations_)) { + return searchDest->second; + ; } auto it = std::min_element(begin(counters_), end(counters_)); @@ -137,7 +130,7 @@ message::Peer Transport::chooseServer(const 
message::Metadata& metadata) { ++counters_[id]; auto dest = *serverPeers_[id]; - destinations_[hashString] = *serverPeers_[id]; + destinations_[hash] = *serverPeers_[id]; return dest; } diff --git a/src/multio/action/transport/Transport.h b/src/multio/action/transport/Transport.h index 1d24cad0e..2411b6e4b 100644 --- a/src/multio/action/transport/Transport.h +++ b/src/multio/action/transport/Transport.h @@ -48,7 +48,7 @@ class Transport : public Action { // Distribute fields message::Peer chooseServer(const message::Metadata& metadata); - std::map destinations_; + std::map destinations_; std::vector counters_; enum class DistributionType : unsigned diff --git a/src/multio/domain/Mask.cc b/src/multio/domain/Mask.cc index f1b1a635e..c87bdaca0 100644 --- a/src/multio/domain/Mask.cc +++ b/src/multio/domain/Mask.cc @@ -26,7 +26,7 @@ Mask& Mask::instance() { } std::string Mask::key(const message::Metadata& md) { - return "(" + md.get("domain") + "," + std::to_string(md.get("level")) + ")"; + return "(" + md.get("domain") + "," + std::to_string(md.get("levelist")) + ")"; } void Mask::add(message::Message msg) { diff --git a/src/multio/tools/multio-replay-nemo-capi-partial-agg.cc b/src/multio/tools/multio-replay-nemo-capi-partial-agg.cc index 010ed9824..46bf5d45e 100644 --- a/src/multio/tools/multio-replay-nemo-capi-partial-agg.cc +++ b/src/multio/tools/multio-replay-nemo-capi-partial-agg.cc @@ -262,7 +262,7 @@ void MultioReplayNemoCApi::writeMasks() { multio_metadata_set_string(md, "category", "ocean-mask"); multio_metadata_set_int(md, "misc-globalSize", globalSize_); - multio_metadata_set_int(md, "level", level_); + multio_metadata_set_int(md, "levelist", level_); multio_metadata_set_bool(md, "toAllServers", true); multio_write_mask(multio_handle, md, masks.data(), masks.size()); @@ -309,7 +309,7 @@ void MultioReplayNemoCApi::writeFields() { // Set reused fields once at the beginning multio_metadata_set_string(md, "category", "ocean-2d"); multio_metadata_set_int(md, 
"misc-globalSize", globalSize_); - multio_metadata_set_int(md, "level", level_); + multio_metadata_set_int(md, "levelist", level_); multio_metadata_set_int(md, "step", step_); multio_metadata_set_bool(md, "bitmapPresent", false); diff --git a/src/multio/tools/multio-replay-nemo-capi.cc b/src/multio/tools/multio-replay-nemo-capi.cc index 901db9319..564e3e93c 100644 --- a/src/multio/tools/multio-replay-nemo-capi.cc +++ b/src/multio/tools/multio-replay-nemo-capi.cc @@ -262,7 +262,7 @@ void MultioReplayNemoCApi::writeMasks() { multio_metadata_set_string(md, "category", "ocean-mask"); // Global size has been set through parametrization // multio_metadata_set_int(md, "globalSize", globalSize_); - multio_metadata_set_int(md, "level", level_); + multio_metadata_set_int(md, "levelist", level_); multio_metadata_set_bool(md, "toAllServers", true); multio_write_mask(multio_handle, md, masks.data(), masks.size()); @@ -310,7 +310,7 @@ void MultioReplayNemoCApi::writeFields() { multio_metadata_set_string(md, "category", "ocean-2d"); // globalSize has been set through parametrization // multio_metadata_set_int(md, "globalSize", globalSize_); - multio_metadata_set_int(md, "level", level_); + multio_metadata_set_int(md, "levelist", level_); multio_metadata_set_int(md, "step", step_); // To mimic nemoV4; it will be overwritten diff --git a/src/multio/tools/multio-replay-nemo-fapi.f90 b/src/multio/tools/multio-replay-nemo-fapi.f90 index b7b9a5d2a..9c4dc30af 100644 --- a/src/multio/tools/multio-replay-nemo-fapi.f90 +++ b/src/multio/tools/multio-replay-nemo-fapi.f90 @@ -380,7 +380,7 @@ subroutine write_fields(mio, rank, client_count, nemo_parameters, grib_param_id, if (cerr /= MULTIO_SUCCESS) ERROR STOP 20 cerr = md%set_int("misc-globalSize", global_size) if (cerr /= MULTIO_SUCCESS) ERROR STOP 21 - cerr = md%set_int("level", level) + cerr = md%set_int("levelist", level) if (cerr /= MULTIO_SUCCESS) ERROR STOP 22 cerr = md%set_int("step", step) if (cerr /= MULTIO_SUCCESS) ERROR STOP 23 
diff --git a/tests/multio/action/statistics-mtg2/CMakeLists.txt b/tests/multio/action/statistics-mtg2/CMakeLists.txt index 907496711..bc433f68a 100644 --- a/tests/multio/action/statistics-mtg2/CMakeLists.txt +++ b/tests/multio/action/statistics-mtg2/CMakeLists.txt @@ -64,4 +64,11 @@ ecbuild_add_test( LIBS multio-action-debug-sink multio-action-statistics-mtg2 ) +ecbuild_add_test( + TARGET ${PREFIX}_climdt_no_initialcondition + SOURCES ClimDTNoInitialCondition.cc + NO_AS_NEEDED + LIBS multio-action-debug-sink multio-action-statistics-mtg2 multio-action-encode-mtg2 multio-action-print +) + add_subdirectory(operations) diff --git a/tests/multio/action/statistics-mtg2/ClimDTNoInitialCondition.cc b/tests/multio/action/statistics-mtg2/ClimDTNoInitialCondition.cc new file mode 100644 index 000000000..7abc06a5d --- /dev/null +++ b/tests/multio/action/statistics-mtg2/ClimDTNoInitialCondition.cc @@ -0,0 +1,405 @@ +/* + * (C) Copyright 2025- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. + */ + + +/// @author Kevin Nobel + + +#include "eckit/io/Buffer.h" +#include "eckit/testing/Test.h" + +#include "metkit/codes/api/CodesAPI.h" + +#include "../../MultioTestEnvironment.h" + +/// This file contains daily and monthly statistics. +/// This primarily tests current behaviour for ocean output. 
+/// +/// Test param: 262100 -> 263100 (average) + +namespace multio::test::statistics_mtg2 { + +using multio::message::Message; +using multio::message::Metadata; +using multio::test::MultioTestEnvironment; + +struct SampleParams { + std::int64_t paramIn; + std::int64_t paramOut; + std::string levtype; + std::int64_t levelist; +}; + + +message::Metadata mkMd(const SampleParams& p, int64_t step) { + return {{"param", p.paramIn}, // Input param id (the tests expect paramOut after statistics) + {"levelist", p.levelist}, + {"date", 19880101}, + {"time", 0}, + {"step", step}, + {"levtype", p.levtype}, + {"grid", "eORCA1_T"}, + {"activity", "baseline"}, + {"class", "d1"}, + {"dataset", "climate-dt"}, + {"experiment", "hist"}, + {"expver", "j36u"}, + {"generation", 2}, + {"model", "IFS-NEMO"}, + {"realization", 1}, + {"resolution", "standard"}, + {"type", "fc"}, + {"packing", "ccsds"}, + {"misc-subCentre", 1003}, + {"misc-generatingProcessIdentifier", 156}, + {"misc-timeIncrementInSeconds", 3600}, + {"misc-precision", "double"}}; +} + +static const std::vector params{{262100, 263100, "o2d", 1}, {262507, 263507, "o3d", 67}}; + +CASE("hourly to daily - no initial condition") { + for (const auto& p : params) { + const auto plan = R"json({ + "name": "hourly to daily", + "actions": [ + { + "type": "statistics-mtg2", + "output-frequency": "1d", + "operations": [ "average" ], + "options": { + "set-metadata": { + "stream": "clte" + } + } + }, + { + "type": "debug-sink" + } + ] + })json"; + auto env = MultioTestEnvironment(plan); + std::vector payloadData(1024, 1.23); + + EXPECT_EQUAL(env.debugSink().size(), 0); + + // No initial condition: steps start at 1 instead of 0 + for (std::int64_t step = 1; step <= 24 * (31 + 29 + 31); ++step) { + auto md = mkMd(p, step); + + eckit::Buffer payload{payloadData.data(), sizeof(double) * payloadData.size()}; + Message msg{{Message::Tag::Field, {}, {}, std::move(md)}, std::move(payload)}; + EXPECT_NO_THROW(env.process(std::move(msg))); + } + + // Expect last message not to be flushed, hence we subtract 1 + 
EXPECT_EQUAL(env.debugSink().size(), 31 + 29 + 31 - 1); + + // Send a flush last-step to trigger emitting the statistics message + EXPECT_NO_THROW(env.process({{Message::Tag::Flush, {}, {}, {{"flushKind", "last-step"}}}})); + + // Now we got our last message plus additional flush message + EXPECT_EQUAL(env.debugSink().size(), 31 + 29 + 31 + 1); + + + for (auto [month, daysInMonth] : std::vector>{{1, 31}, {2, 29}, {3, 31}}) { + for (std::int64_t day = 1; day <= daysInMonth; ++day) { + EXPECT_EQUAL(env.debugSink().front().payload().size() / sizeof(double), payloadData.size()); + + auto md = env.debugSink().front().metadata(); + EXPECT_EQUAL(p.paramOut, md.get("param")); + EXPECT_EQUAL(24, md.get("step")); + EXPECT_EQUAL(24, md.get("timespan")); + int64_t date = 19880000 + month * 100 + day; + EXPECT_EQUAL(date, md.get("date")); + EXPECT_EQUAL(0, md.get("time")); + EXPECT_EQUAL("clte", md.get("stream")); + EXPECT(std::nullopt == md.getOpt("stattype")); + + EXPECT_EQUAL(1003, md.get("misc-subCentre")); + EXPECT_EQUAL(156, md.get("misc-generatingProcessIdentifier")); + EXPECT_EQUAL(3600, md.get("misc-timeIncrementInSeconds")); + + env.debugSink().pop(); + } + } + + EXPECT(env.debugSink().front().tag() == Message::Tag::Flush); + env.debugSink().pop(); + EXPECT_EQUAL(env.debugSink().size(), 0); + } +} + + +CASE("hourly to daily to monthly - no initial condition") { + for (const auto& p : params) { + const auto plan = R"json({ + "name": "hourly to daily", + "actions": [ + { + "type": "statistics-mtg2", + "output-frequency": "1d", + "operations": [ "average" ], + "options": { + "set-metadata": { + "stream": "clte" + } + } + }, + { + "type": "statistics-mtg2", + "output-frequency": "1m", + "operations": [ "average" ], + "options": { + "set-metadata": { + "stream": "clmn", + "misc-timeIncrementInSeconds": 86400 + } + } + }, + { + "type": "debug-sink" + } + ] + })json"; + auto env = MultioTestEnvironment(plan); + std::vector payloadData(1024, 1.23); + + 
EXPECT_EQUAL(env.debugSink().size(), 0); + + // No initia + for (std::int64_t step = 1; step <= 24 * (31 + 29 + 31); ++step) { + auto md = mkMd(p, step); + + eckit::Buffer payload{payloadData.data(), sizeof(double) * payloadData.size()}; + Message msg{{Message::Tag::Field, {}, {}, std::move(md)}, std::move(payload)}; + EXPECT_NO_THROW(env.process(std::move(msg))); + } + + // Expect last message not to be flushed, hence we expect just two messages for two month + EXPECT_EQUAL(env.debugSink().size(), 2); + + // Send a flush last-step to trigger emitting the statistics message + EXPECT_NO_THROW(env.process({{Message::Tag::Flush, {}, {}, {{"flushKind", "last-step"}}}})); + + // Now we got our last message plus additional flush message + EXPECT_EQUAL(env.debugSink().size(), 4); + + + for (auto [month, daysInMonth] : std::vector>{{1, 31}, {2, 29}, {3, 31}}) { + EXPECT_EQUAL(env.debugSink().front().payload().size() / sizeof(double), payloadData.size()); + + auto md = env.debugSink().front().metadata(); + EXPECT_EQUAL(p.paramOut, md.get("param")); + int64_t timespan = daysInMonth * 24; + EXPECT_EQUAL(timespan, md.get("step")); + EXPECT_EQUAL(timespan, md.get("timespan")); + + int64_t date = 19880001 + month * 100; + EXPECT_EQUAL(date, md.get("date")); + EXPECT_EQUAL(0, md.get("time")); + EXPECT_EQUAL("clmn", md.get("stream")); + EXPECT(std::nullopt == md.getOpt("stattype")); + + EXPECT_EQUAL(1003, md.get("misc-subCentre")); + EXPECT_EQUAL(156, md.get("misc-generatingProcessIdentifier")); + EXPECT_EQUAL(3600 * 24, md.get("misc-timeIncrementInSeconds")); + env.debugSink().pop(); + } + + EXPECT(env.debugSink().front().tag() == Message::Tag::Flush); + env.debugSink().pop(); + EXPECT_EQUAL(env.debugSink().size(), 0); + } +} + + +CASE("hourly to daily - no initial condition - with encoding") { + for (const auto& p : params) { + const auto plan = R"json({ + "name": "hourly to daily", + "actions": [ + { + "type": "statistics-mtg2", + "output-frequency": "1d", + "operations": [ 
"average" ], + "options": { + "set-metadata": { + "stream": "clte" + } + } + }, + { + "type": "encode-mtg2", + "cached": true + }, + { + "type": "debug-sink" + } + ] + })json"; + auto env = MultioTestEnvironment(plan); + std::vector payloadData(1024, 1.23); + + EXPECT_EQUAL(env.debugSink().size(), 0); + + // No initia + for (std::int64_t step = 1; step <= 24 * (31 + 29 + 31); ++step) { + auto md = mkMd(p, step); + + eckit::Buffer payload{payloadData.data(), sizeof(double) * payloadData.size()}; + Message msg{{Message::Tag::Field, {}, {}, std::move(md)}, std::move(payload)}; + EXPECT_NO_THROW(env.process(std::move(msg))); + } + + // Expect last message not to be flushed, hence we substract -1 + EXPECT_EQUAL(env.debugSink().size(), 31 + 29 + 31 - 1); + + // Send a flush last-step to trigger emitting the statistics message + EXPECT_NO_THROW(env.process({{Message::Tag::Flush, {}, {}, {{"flushKind", "last-step"}}}})); + + // Now we got our last message plus additional flush message + EXPECT_EQUAL(env.debugSink().size(), 31 + 29 + 31 + 1); + + + for (auto [month, daysInMonth] : std::vector>{{1, 31}, {2, 29}, {3, 31}}) { + for (std::int64_t day = 1; day <= daysInMonth; ++day) { + auto codesHandle = metkit::codes::codesHandleFromMessage( + {static_cast(env.debugSink().front().payload().data()), + env.debugSink().front().payload().size()}); + + auto md = env.debugSink().front().metadata(); + EXPECT_EQUAL(p.paramOut, md.get("param")); + EXPECT_EQUAL(p.paramOut, codesHandle->getLong("param")); + EXPECT_EQUAL(24, md.get("step")); + EXPECT_EQUAL(24, codesHandle->getLong("step")); + int64_t date = 19880000 + month * 100 + day; + EXPECT_EQUAL(date, md.get("date")); + EXPECT_EQUAL(date, codesHandle->getLong("date")); + EXPECT_EQUAL(0, md.get("time")); + EXPECT_EQUAL(0, codesHandle->getLong("time")); + EXPECT_EQUAL("clte", md.get("stream")); + EXPECT_EQUAL("clte", codesHandle->getString("stream")); + EXPECT(std::nullopt == md.getOpt("stattype")); + + EXPECT_EQUAL(1003, 
codesHandle->getLong("subCentre")); + EXPECT_EQUAL(156, codesHandle->getLong("generatingProcessIdentifier")); + EXPECT_EQUAL(3600, codesHandle->getLong("timeIncrement")); + + env.debugSink().pop(); + } + } + + EXPECT(env.debugSink().front().tag() == Message::Tag::Flush); + env.debugSink().pop(); + EXPECT_EQUAL(env.debugSink().size(), 0); + } +} + + +CASE("hourly to daily to monthly - no initial condition - with encoding") { + for (const auto& p : params) { + const auto plan = R"json({ + "name": "hourly to daily", + "actions": [ + { + "type": "statistics-mtg2", + "output-frequency": "1d", + "operations": [ "average" ], + "options": { + "set-metadata": { + "stream": "clte" + } + } + }, + { + "type": "statistics-mtg2", + "output-frequency": "1m", + "operations": [ "average" ], + "options": { + "set-metadata": { + "stream": "clmn", + "misc-timeIncrementInSeconds": 86400 + } + } + }, + { + "type": "encode-mtg2", + "cached": true + }, + { + "type": "debug-sink" + } + ] + })json"; + auto env = MultioTestEnvironment(plan); + std::vector payloadData(1024, 1.23); + + EXPECT_EQUAL(env.debugSink().size(), 0); + + // No initia + for (std::int64_t step = 1; step <= 24 * (31 + 29 + 31); ++step) { + auto md = mkMd(p, step); + + eckit::Buffer payload{payloadData.data(), sizeof(double) * payloadData.size()}; + Message msg{{Message::Tag::Field, {}, {}, std::move(md)}, std::move(payload)}; + EXPECT_NO_THROW(env.process(std::move(msg))); + } + + // Expect last message not to be flushed, hence we expect just two messages for two month + EXPECT_EQUAL(env.debugSink().size(), 2); + + // Send a flush last-step to trigger emitting the statistics message + EXPECT_NO_THROW(env.process({{Message::Tag::Flush, {}, {}, {{"flushKind", "last-step"}}}})); + + // Now we got our last message plus additional flush message + EXPECT_EQUAL(env.debugSink().size(), 4); + + + for (auto [month, daysInMonth] : std::vector>{{1, 31}, {2, 29}, {3, 31}}) { + auto codesHandle = metkit::codes::codesHandleFromMessage( 
+ {static_cast(env.debugSink().front().payload().data()), + env.debugSink().front().payload().size()}); + + auto md = env.debugSink().front().metadata(); + EXPECT_EQUAL(p.paramOut, md.get("param")); + EXPECT_EQUAL(p.paramOut, codesHandle->getLong("param")); + int64_t timespan = daysInMonth * 24; + EXPECT_EQUAL(timespan, md.get("step")); + EXPECT_EQUAL(timespan, codesHandle->getLong("step")); + EXPECT_EQUAL(timespan, md.get("step")); + EXPECT_EQUAL(timespan, codesHandle->getLong("step")); + int64_t date = 19880001 + month * 100; + EXPECT_EQUAL(date, md.get("date")); + EXPECT_EQUAL(date, codesHandle->getLong("date")); + EXPECT_EQUAL(0, md.get("time")); + EXPECT_EQUAL(0, codesHandle->getLong("time")); + EXPECT_EQUAL("clmn", md.get("stream")); + EXPECT_EQUAL("clmn", codesHandle->getString("stream")); + EXPECT(std::nullopt == md.getOpt("stattype")); + + EXPECT_EQUAL(1003, codesHandle->getLong("subCentre")); + EXPECT_EQUAL(156, codesHandle->getLong("generatingProcessIdentifier")); + EXPECT_EQUAL(3600 * 24, codesHandle->getLong("timeIncrement")); + + env.debugSink().pop(); + } + + EXPECT(env.debugSink().front().tag() == Message::Tag::Flush); + env.debugSink().pop(); + EXPECT_EQUAL(env.debugSink().size(), 0); + } +} + +} // namespace multio::test::statistics_mtg2 + +int main(int argc, char** argv) { + return eckit::testing::run_tests(argc, argv); +}