diff --git a/docs/CHANGELOG.asciidoc b/docs/CHANGELOG.asciidoc index 916d929bc..5c2485db4 100644 --- a/docs/CHANGELOG.asciidoc +++ b/docs/CHANGELOG.asciidoc @@ -32,7 +32,8 @@ === Bug Fixes -* Report RSS in bytes instead of pages. (See {ml-pull}[#2917].) +* Truncate oversized field values to prevent autodetect process crash. (See {ml-pull}2929[#2929], {es-pull}143180[#143180], issue: {ml-issue}2796[#2796].) +* Report RSS in bytes instead of pages. (See {ml-pull}2917[#2917].) === Enhancements diff --git a/include/api/CAnomalyJob.h b/include/api/CAnomalyJob.h index 279a59793..fd1cc4bc8 100644 --- a/include/api/CAnomalyJob.h +++ b/include/api/CAnomalyJob.h @@ -392,6 +392,18 @@ class API_EXPORT CAnomalyJob : public CDataProcessor { core_t::TTime time, const TStrStrUMap& dataRowFields); + //! Prepare field values with truncation handling. + //! Extracts field values from \p dataRowFields, truncates oversized values, + //! and populates \p fieldValues with pointers to either original or truncated values. + //! \param fieldNames The names of fields to extract + //! \param dataRowFields The data row containing field values + //! \param fieldValues Output vector of pointers to field values + //! \param truncatedCopies Storage for truncated copies (must remain valid while fieldValues is used) + static void prepareTruncatedFieldValues(const TStrVec& fieldNames, + const TStrStrUMap& dataRowFields, + model::CAnomalyDetector::TStrCPtrVec& fieldValues, + TStrVec& truncatedCopies); + //! Parses a control message requesting that model state be persisted. //! Extracts optional arguments to be used for persistence. static bool parsePersistControlMessageArgs(const std::string& controlMessageArgs, diff --git a/include/core/CLoggerThrottler.h b/include/core/CLoggerThrottler.h index c6a450512..58b89c4d8 100644 --- a/include/core/CLoggerThrottler.h +++ b/include/core/CLoggerThrottler.h @@ -30,7 +30,8 @@ namespace core { //! This is thread safe but uses a very simple strategy: all accesses to a single //! hash map are sychronised. We assume that log throttling is only applied to //! messages which normally occur infrequently; for example, this is only currently -//! applied to WARN and ERROR level logging (see LogMacros.h). So there will be +//! applied to WARN, ERROR, and throttled INFO (LOG_INFO_THROTTLED) logging +//! (see LogMacros.h). So there will be //! little contention. Furthermore, the overhead of locking and unlocking the mutex //! should be neglible compared to the work done if the log line were actually //! emitted. So this should actually give a significant performance improvement diff --git a/include/core/LogMacros.h b/include/core/LogMacros.h index 0c84a88d2..a66a6f5dd 100644 --- a/include/core/LogMacros.h +++ b/include/core/LogMacros.h @@ -83,6 +83,24 @@ BOOST_LOG_STREAM_SEV(ml::core::CLogger::instance().logger(), ml::core::CLogger::E_Info) \ LOG_LOCATION_INFO \ message +#ifdef LOG_INFO_THROTTLED +#undef LOG_INFO_THROTTLED +#endif +#define LOG_INFO_THROTTLED(message) \ + do { \ + std::size_t countOfInfoMessages; \ + bool skipInfoMessage; \ + std::tie(countOfInfoMessages, skipInfoMessage) = \ + ml::core::CLogger::instance().throttler().skip(__FILE__, __LINE__); \ + if (skipInfoMessage == false) { \ + BOOST_LOG_STREAM_SEV(ml::core::CLogger::instance().logger(), \ + ml::core::CLogger::E_Info) \ + LOG_LOCATION_INFO \ + message << (countOfInfoMessages > 1 \ + ? " | repeated [" + std::to_string(countOfInfoMessages) + "]" \ + : ""); \ + } \ + } while (0) #ifdef LOG_WARN #undef LOG_WARN #endif diff --git a/include/model/CFieldValueTruncator.h b/include/model/CFieldValueTruncator.h new file mode 100644 index 000000000..6c0b94aa6 --- /dev/null +++ b/include/model/CFieldValueTruncator.h @@ -0,0 +1,144 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the following additional limitation. Functionality enabled by the + * files subject to the Elastic License 2.0 may only be used in production when + * invoked by an Elasticsearch process with a license key installed that permits + * use of machine learning features. You may not use this file except in + * compliance with the Elastic License 2.0 and the foregoing additional + * limitation. + */ +#ifndef INCLUDED_ml_model_CFieldValueTruncator_h +#define INCLUDED_ml_model_CFieldValueTruncator_h + +#include + +#include + +#include +#include + +namespace ml { +namespace model { + +//! \brief Enforces term field length constraints with collision prevention. +//! +//! In the anomaly detection domain, term fields (by, over, partition, influencer) +//! are categorical identifiers that must satisfy two invariants: +//! 1. **Bounded Length** - Prevent memory amplification and OOM crashes +//! 2. **Unique Identity** - Distinct field values must remain distinguishable +//! +//! Values exceeding MAX_FIELD_VALUE_LENGTH (256 chars) are transformed using +//! collision-safe truncation: +//! - Retain PREFIX_LENGTH (239) characters of original value +//! - Append HASH_SEPARATOR ('$') +//! - Append HASH_HEX_DIGITS (16) character hex hash of complete original value +//! +//! Format: "$" +//! Example: "very_long_field_value_that_exceeds_limit_(...)$a1b2c3d4e5f67890" +//! +//! The 256-character limit aligns with Elasticsearch's ignore_above default +//! for keyword fields. The hash suffix ensures data integrity while maintaining +//! human readability (first 239 characters visible) and compatibility with +//! prefix-based filtering. +class MODEL_EXPORT CFieldValueTruncator { +public: + //! Maximum length for term fields in anomaly detection. + static constexpr std::size_t MAX_FIELD_VALUE_LENGTH = 256; + + //! Collision prevention format components + static constexpr char HASH_SEPARATOR = '$'; + static constexpr std::size_t HASH_HEX_DIGITS = 16; // 16 hex chars = full 64-bit hash + static constexpr std::size_t HASH_SUFFIX_LENGTH = 1 /* separator */ + HASH_HEX_DIGITS; // 17 total + + //! Content prefix length (readable portion after truncation) + static constexpr std::size_t PREFIX_LENGTH = MAX_FIELD_VALUE_LENGTH - HASH_SUFFIX_LENGTH; // 239 + + // Domain invariants (enforced at compile-time) + static_assert(PREFIX_LENGTH + HASH_SUFFIX_LENGTH == MAX_FIELD_VALUE_LENGTH, + "Term field format invariant: prefix + suffix = total length"); + static_assert(PREFIX_LENGTH >= 200, + "Readable prefix must be substantial for human comprehension"); + + //! Check if a term field value exceeds the domain constraint. + //! \return true if the value requires length enforcement + static bool needsTruncation(const std::string& value) { + return value.size() > MAX_FIELD_VALUE_LENGTH; + } + + //! Enforce term field length constraint in-place. + //! Applies collision-safe truncation for values exceeding the limit. + //! \param[in,out] value Field value to constrain + //! \return true if truncation was applied, false if already within limit + static bool truncate(std::string& value) { + if (needsTruncation(value) == false) { + return false; + } + + std::string originalValue = std::move(value); + value.assign(originalValue, 0, PREFIX_LENGTH); + appendCollisionPreventionSuffix(originalValue, value); + + return true; + } + + //! Enforce term field length constraint, returning constrained copy. + //! \param value Original field value + //! \return Copy with length constraint enforced + static std::string truncated(const std::string& value) { + if (needsTruncation(value) == false) { + return value; + } + + std::string result; + result.reserve(MAX_FIELD_VALUE_LENGTH); + result.assign(value, 0, PREFIX_LENGTH); + appendCollisionPreventionSuffix(value, result); + + return result; + } + +private: + //! \brief Hash encoding for collision prevention. + //! + //! Encapsulates the technical details of hash computation and formatting. + //! Separated from domain logic for clarity and testability. + struct HashEncoding { + //! Compute collision-resistant identity hash. + //! Uses safeMurmurHash64 (endian-neutral) for state persistence safety. + static std::uint64_t compute(const std::string& value) { + return core::CHashing::safeMurmurHash64(value.data(), + static_cast(value.size()), + 0); // Fixed seed for determinism + } + + //! Format 64-bit hash as zero-padded lowercase hex string. + //! \param hash The hash value to format + //! \param[out] buffer Must be at least HASH_HEX_DIGITS + 1 bytes + //! \return Pointer to null-terminated hex string in buffer + static const char* toHex(std::uint64_t hash, char* buffer) { + // %016llx produces 16-char zero-padded lowercase hex (full 64 bits) + std::snprintf(buffer, HASH_HEX_DIGITS + 1, "%016llx", + static_cast(hash)); + return buffer; + } + }; + + //! Append collision-prevention suffix: separator + hash. + //! \param originalValue Complete untruncated value for hash computation + //! \param[in,out] prefix Truncated prefix to which suffix is appended + static void appendCollisionPreventionSuffix(const std::string& originalValue, + std::string& prefix) { + std::uint64_t identityHash = HashEncoding::compute(originalValue); + + prefix.reserve(MAX_FIELD_VALUE_LENGTH); + prefix.push_back(HASH_SEPARATOR); + + char hashHexBuffer[HASH_HEX_DIGITS + 1]; + prefix.append(HashEncoding::toHex(identityHash, hashHexBuffer)); + } +}; +} +} + +#endif // INCLUDED_ml_model_CFieldValueTruncator_h diff --git a/lib/api/CAnomalyJob.cc b/lib/api/CAnomalyJob.cc index d7321cd2a..374becaf3 100644 --- a/lib/api/CAnomalyJob.cc +++ b/lib/api/CAnomalyJob.cc @@ -28,6 +28,7 @@ #include #include +#include #include #include #include @@ -1700,15 +1701,43 @@ const std::string* CAnomalyJob::fieldValue(const std::string& fieldName, return !fieldName.empty() && fieldValue.empty() ? nullptr : &fieldValue; } +void CAnomalyJob::prepareTruncatedFieldValues(const TStrVec& fieldNames, + const TStrStrUMap& dataRowFields, + model::CAnomalyDetector::TStrCPtrVec& fieldValues, + TStrVec& truncatedCopies) { + + fieldValues.reserve(fieldNames.size()); + // Reserve ensures no reallocation invalidates pointers stored in fieldValues. + truncatedCopies.reserve(fieldNames.size()); + + for (const auto& fieldName : fieldNames) { + const std::string* value = fieldValue(fieldName, dataRowFields); + if (value != nullptr && model::CFieldValueTruncator::needsTruncation(*value)) { + truncatedCopies.push_back(model::CFieldValueTruncator::truncated(*value)); + fieldValues.push_back(&truncatedCopies.back()); + + std::string escapedFieldName = fieldName; + core::CStringUtils::escape('\\', "\n\r\t", escapedFieldName); + LOG_INFO_THROTTLED( + << "Field '" << escapedFieldName + << "' value (length=" << value->size() << ", prefix='" + << value->substr(0, std::min(50, value->size())) + << "...') exceeds " << model::CFieldValueTruncator::MAX_FIELD_VALUE_LENGTH + << " characters and has been truncated with collision-safe hash suffix"); + } else { + fieldValues.push_back(value); + } + } +} + void CAnomalyJob::addRecord(const TAnomalyDetectorPtr& detector, core_t::TTime time, const TStrStrUMap& dataRowFields) { model::CAnomalyDetector::TStrCPtrVec fieldValues; + TStrVec truncatedCopies; const TStrVec& fieldNames = detector->fieldsOfInterest(); - fieldValues.reserve(fieldNames.size()); - for (const auto& fieldName : fieldNames) { - fieldValues.push_back(fieldValue(fieldName, dataRowFields)); - } + + prepareTruncatedFieldValues(fieldNames, dataRowFields, fieldValues, truncatedCopies); detector->addRecord(time, fieldValues); } diff --git a/lib/api/CDataProcessor.cc b/lib/api/CDataProcessor.cc index 93db8c475..cb796e19a 100644 --- a/lib/api/CDataProcessor.cc +++ b/lib/api/CDataProcessor.cc @@ -15,6 +15,8 @@ #include #include +#include + namespace ml { namespace api { @@ -49,7 +51,12 @@ std::string CDataProcessor::debugPrintRecord(const TStrStrUMap& dataRowFields) { fieldValues.push_back(','); } fieldNames.append(rowIter->first); - fieldValues.append(rowIter->second); + const std::string& val = rowIter->second; + if (model::CFieldValueTruncator::needsTruncation(val)) { + fieldValues.append(model::CFieldValueTruncator::truncated(val)); + } else { + fieldValues.append(val); + } } result << fieldNames << core_t::LINE_ENDING << fieldValues; diff --git a/lib/api/unittest/CAnomalyJobTest.cc b/lib/api/unittest/CAnomalyJobTest.cc index d5384327e..3e9a9da8f 100644 --- a/lib/api/unittest/CAnomalyJobTest.cc +++ b/lib/api/unittest/CAnomalyJobTest.cc @@ -17,6 +17,7 @@ #include #include +#include #include #include @@ -1205,4 +1206,95 @@ BOOST_AUTO_TEST_CASE(testHierarchicalResultsNormalizerShouldIncreaseMemoryUsage) resourceMonitor.forceRefreshAll(); BOOST_TEST_REQUIRE(resourceMonitor.totalMemory() < memoryUsageBeforeUnregister); } + +BOOST_AUTO_TEST_CASE(testOversizedFieldValuesTruncated) { + // Verify that addRecord (via prepareTruncatedFieldValues) truncates oversized + // by/influencer values before they enter the model. We assert on persisted + // state because that reflects what the detector stored; if addRecord did + // not truncate, the full value would appear here. + model::CLimits limits; + api::CAnomalyJobConfig jobConfig = CTestAnomalyJob::makeSimpleJobConfig( + "count", "", "by_field", "", "", {"influencer_field"}); + + model::CAnomalyDetectorModelConfig modelConfig = + model::CAnomalyDetectorModelConfig::defaultConfig(BUCKET_SIZE); + std::stringstream outputStrm; + core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm); + + CTestAnomalyJob job("job", limits, jobConfig, modelConfig, wrappedOutputStream); + + std::string const oversizedValue(1000, 'x'); + CTestAnomalyJob::TStrStrUMap dataRows{{"time", "1000"}, + {"by_field", oversizedValue}, + {"influencer_field", oversizedValue}}; + + BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); + BOOST_REQUIRE_EQUAL(uint64_t(1), job.numRecordsHandled()); + + // Advance past bucket boundary so results are output and state can be persisted. + CTestAnomalyJob::TStrStrUMap advanceRows{{"time", "5000"}, + {"by_field", oversizedValue}, + {"influencer_field", oversizedValue}}; + BOOST_TEST_REQUIRE(job.handleRecord(advanceRows)); + BOOST_REQUIRE_EQUAL(uint64_t(2), job.numRecordsHandled()); + + std::ostringstream* strm{nullptr}; + api::CSingleStreamDataAdder::TOStreamP ptr{strm = new std::ostringstream()}; + api::CSingleStreamDataAdder persister{ptr}; + BOOST_TEST_REQUIRE(job.persistStateInForeground(persister, "")); + std::string const persistedState{strm->str()}; + + // Full oversized value must not be in state (addRecord truncated before store). + BOOST_TEST_REQUIRE(persistedState.find(oversizedValue) == std::string::npos); + // Persisted state must contain the truncated form produced by input truncation. + std::string const expectedTruncated = model::CFieldValueTruncator::truncated(oversizedValue); + BOOST_TEST_REQUIRE(persistedState.find(expectedTruncated) != std::string::npos); +} + +BOOST_AUTO_TEST_CASE(testNormalFieldValuesNotTruncated) { + model::CLimits limits; + api::CAnomalyJobConfig jobConfig = CTestAnomalyJob::makeSimpleJobConfig( + "count", "", "by_field", "", "", {"influencer_field"}); + + model::CAnomalyDetectorModelConfig modelConfig = + model::CAnomalyDetectorModelConfig::defaultConfig(BUCKET_SIZE); + std::stringstream outputStrm; + core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm); + + CTestAnomalyJob job("job", limits, jobConfig, modelConfig, wrappedOutputStream); + + std::string const normalValue("normal_value"); + CTestAnomalyJob::TStrStrUMap dataRows{ + {"time", "1000"}, {"by_field", normalValue}, {"influencer_field", normalValue}}; + + BOOST_TEST_REQUIRE(job.handleRecord(dataRows)); + BOOST_REQUIRE_EQUAL(uint64_t(1), job.numRecordsHandled()); + + // Advance past bucket boundary so results are output and state can be persisted. + CTestAnomalyJob::TStrStrUMap advanceRows{ + {"time", "5000"}, {"by_field", normalValue}, {"influencer_field", normalValue}}; + BOOST_TEST_REQUIRE(job.handleRecord(advanceRows)); + BOOST_REQUIRE_EQUAL(uint64_t(2), job.numRecordsHandled()); + + std::ostringstream* strm{nullptr}; + api::CSingleStreamDataAdder::TOStreamP ptr{strm = new std::ostringstream()}; + api::CSingleStreamDataAdder persister{ptr}; + BOOST_TEST_REQUIRE(job.persistStateInForeground(persister, "")); + std::string const persistedState{strm->str()}; + + BOOST_TEST_REQUIRE(persistedState.find(normalValue) != std::string::npos); +} + +BOOST_AUTO_TEST_CASE(testDebugPrintRecordTruncatesLongValues) { + api::CDataProcessor::TStrStrUMap record; + record["field1"] = std::string(1000, 'x'); + record["field2"] = "short"; + std::string result = api::CDataProcessor::debugPrintRecord(record); + // truncated() produces prefix + '$' + 16 hex chars; full 1000-char value not present + BOOST_TEST_REQUIRE(result.find(std::string(1000, 'x')) == std::string::npos); + BOOST_TEST_REQUIRE(result.find(model::CFieldValueTruncator::HASH_SEPARATOR) != + std::string::npos); + BOOST_TEST_REQUIRE(result.size() < 500); +} + BOOST_AUTO_TEST_SUITE_END() diff --git a/lib/model/CBucketGatherer.cc b/lib/model/CBucketGatherer.cc index cdffd8d23..70e9f9511 100644 --- a/lib/model/CBucketGatherer.cc +++ b/lib/model/CBucketGatherer.cc @@ -23,6 +23,7 @@ #include #include +#include #include #include @@ -115,7 +116,9 @@ bool restoreInfluencerPersonAttributeCounts(core::CStateRestoreTraverser& traver const std::string& name = traverser.name(); RESTORE_BUILT_IN(PERSON_UID_TAG, person) RESTORE_BUILT_IN(ATTRIBUTE_UID_TAG, attribute) - RESTORE_NO_ERROR(INFLUENCER_TAG, influence = traverser.value()) + RESTORE_NO_ERROR(INFLUENCER_TAG, influence = traverser.value(); + CFieldValueTruncator::truncate(influence)) + if (name == COUNT_TAG) { if (core::CStringUtils::stringToType(traverser.value(), count) == false) { LOG_ERROR(<< "Failed to restore COUNT_TAG, got " << traverser.value()); diff --git a/lib/model/CDynamicStringIdRegistry.cc b/lib/model/CDynamicStringIdRegistry.cc index 9974e8e15..c293f664d 100644 --- a/lib/model/CDynamicStringIdRegistry.cc +++ b/lib/model/CDynamicStringIdRegistry.cc @@ -18,6 +18,7 @@ #include #include +#include #include #include @@ -251,7 +252,9 @@ bool CDynamicStringIdRegistry::acceptRestoreTraverser(core::CStateRestoreTravers do { const std::string& name = traverser.name(); if (name == NAMES_TAG) { - m_Names.emplace_back(traverser.value()); + std::string value = traverser.value(); + CFieldValueTruncator::truncate(value); + m_Names.emplace_back(std::move(value)); } else if (name == FREE_NAMES_TAG) { if (!core::CPersistUtils::restore(FREE_NAMES_TAG, m_FreeUids, traverser)) { return false; diff --git a/lib/model/CEventRateBucketGatherer.cc b/lib/model/CEventRateBucketGatherer.cc index a01ddc9cd..600719089 100644 --- a/lib/model/CEventRateBucketGatherer.cc +++ b/lib/model/CEventRateBucketGatherer.cc @@ -27,6 +27,7 @@ #include #include +#include #include #include @@ -684,6 +685,7 @@ bool restoreInfluencerUniqueStrings(core::CStateRestoreTraverser& traverser, const std::string& name = traverser.name(); if (name == DICTIONARY_WORD_TAG) { key = traverser.value(); + CFieldValueTruncator::truncate(key); } else if (name == UNIQUE_WORD_TAG) { CUniqueStringFeatureData::TWord value; if (value.fromDelimited(traverser.value()) == false) { diff --git a/lib/model/CGathererTools.cc b/lib/model/CGathererTools.cc index 378e0ddd2..2c9e48414 100644 --- a/lib/model/CGathererTools.cc +++ b/lib/model/CGathererTools.cc @@ -23,6 +23,8 @@ #include #include +#include + #include namespace ml { @@ -89,6 +91,7 @@ struct SInfluencerSumSerializer { const std::string& name = traverser.name(); if (name == SUM_MAP_KEY_TAG) { key = traverser.value(); + CFieldValueTruncator::truncate(key); } else if (name == SUM_MAP_VALUE_TAG) { if (core::CStringUtils::stringToType(traverser.value(), map[key]) == false) { LOG_ERROR(<< "Invalid sum in " << traverser.value()); diff --git a/lib/model/unittest/CDynamicStringIdRegistryTest.cc b/lib/model/unittest/CDynamicStringIdRegistryTest.cc index 1d4aa1698..b060340dd 100644 --- a/lib/model/unittest/CDynamicStringIdRegistryTest.cc +++ b/lib/model/unittest/CDynamicStringIdRegistryTest.cc @@ -15,6 +15,7 @@ #include #include +#include #include #include @@ -109,4 +110,34 @@ BOOST_AUTO_TEST_CASE(testPersist) { BOOST_REQUIRE_EQUAL(restoredJson.str(), origJson.str()); } +BOOST_AUTO_TEST_CASE(testRestoreTruncatesOversizedNames) { + CResourceMonitor resourceMonitor; + CDynamicStringIdRegistry registry("person", counter_t::E_TSADNumberNewPeople, + counter_t::E_TSADNumberNewPeopleNotAllowed, + counter_t::E_TSADNumberNewPeopleRecycled); + + bool addedPerson = false; + std::string shortName("foo"); + std::string oversizedName(1000, 'x'); + registry.addName(shortName, 0, resourceMonitor, addedPerson); + registry.addName(oversizedName, 0, resourceMonitor, addedPerson); + + std::ostringstream origJson; + core::CJsonStatePersistInserter::persist( + origJson, std::bind_front(&CDynamicStringIdRegistry::acceptPersistInserter, ®istry)); + + std::istringstream is("{\"topLevel\" : " + origJson.str() + "}"); + core::CJsonStateRestoreTraverser traverser(is); + CDynamicStringIdRegistry restoredRegistry("person", counter_t::E_TSADNumberNewPeople, + counter_t::E_TSADNumberNewPeopleNotAllowed, + counter_t::E_TSADNumberNewPeopleRecycled); + traverser.traverseSubLevel(std::bind_front( + &CDynamicStringIdRegistry::acceptRestoreTraverser, &restoredRegistry)); + + BOOST_REQUIRE_EQUAL(2, restoredRegistry.numberNames()); + BOOST_REQUIRE_EQUAL(shortName, restoredRegistry.name(0, "")); + BOOST_REQUIRE_EQUAL(CFieldValueTruncator::MAX_FIELD_VALUE_LENGTH, + restoredRegistry.name(1, "").size()); +} + BOOST_AUTO_TEST_SUITE_END() diff --git a/lib/model/unittest/CEventRateDataGathererTest.cc b/lib/model/unittest/CEventRateDataGathererTest.cc index 859d0e204..2a231a6cb 100644 --- a/lib/model/unittest/CEventRateDataGathererTest.cc +++ b/lib/model/unittest/CEventRateDataGathererTest.cc @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -1891,4 +1892,81 @@ BOOST_FIXTURE_TEST_CASE(testDiurnalFeatures, CDiurnalTestFixture) { } } +BOOST_FIXTURE_TEST_CASE(testRestoreTruncatesOversizedInfluencerValues, CTestFixture) { + // Verify that oversized influencer field values persisted in old state + // are truncated on restore. This exercises truncation in: + // - CBucketGatherer::restoreInfluencerPersonAttributeCounts (Finding 7) + // - CEventRateBucketGatherer::restoreInfluencerUniqueStrings (Finding 8) + + constexpr core_t::TTime startTime = 0; + constexpr core_t::TTime bucketLength = 600; + SModelParams params(bucketLength); + params.s_LatencyBuckets = 2; + + TFeatureVec features; + features.push_back(model_t::E_IndividualUniqueCountByBucketAndPerson); + TStrVec influencerFieldNames{"IF1"}; + + CDataGatherer gatherer = CDataGathererBuilder(model_t::E_EventRate, features, + params, key, startTime) + .personFieldName("P") + .valueFieldName("V") + .influenceFieldNames(influencerFieldNames) + .build(); + + BOOST_REQUIRE_EQUAL(0, addPerson(gatherer, m_ResourceMonitor, "p", "v", 1)); + + // Add arrivals with an oversized influencer value (bypasses CAnomalyJob input truncation). + std::string const oversizedInfluencer(500, 'x'); + addArrival(gatherer, m_ResourceMonitor, startTime + 1, "p", "val1", oversizedInfluencer); + addArrival(gatherer, m_ResourceMonitor, startTime + 2, "p", "val2", oversizedInfluencer); + + // Persist — the JSON will contain the oversized influencer value. + std::ostringstream origJson; + core::CJsonStatePersistInserter::persist( + origJson, [&gatherer](core::CJsonStatePersistInserter& inserter) { + gatherer.acceptPersistInserter(inserter); + }); + + // Sanity check: the persisted JSON contains the full oversized value. + BOOST_TEST_REQUIRE(origJson.str().find(oversizedInfluencer) != std::string::npos); + + // Restore from persisted JSON — truncation should apply. + std::istringstream origJsonStrm{"{\"topLevel\" : " + origJson.str() + "}"}; + core::CJsonStateRestoreTraverser traverser(origJsonStrm); + + CBucketGatherer::SBucketGathererInitData bucketGathererInitData{ + EMPTY_STRING, EMPTY_STRING, EMPTY_STRING, EMPTY_STRING, {}, 0, 0}; + CDataGatherer restoredGatherer(model_t::E_EventRate, model_t::E_None, params, + EMPTY_STRING, key, bucketGathererInitData, traverser); + + // Persist restored gatherer — should NOT contain the oversized value. + std::ostringstream restoredJson; + core::CJsonStatePersistInserter::persist( + restoredJson, [&restoredGatherer](core::CJsonStatePersistInserter& inserter) { + restoredGatherer.acceptPersistInserter(inserter); + }); + + // The full 500-char string must no longer appear (it was truncated to 256). + BOOST_TEST_REQUIRE(restoredJson.str().find(oversizedInfluencer) == std::string::npos); + // Restore-path truncation must produce the same format as CFieldValueTruncator::truncated. + std::string const expectedTruncated = + model::CFieldValueTruncator::truncated(oversizedInfluencer); + BOOST_TEST_REQUIRE(restoredJson.str().find(expectedTruncated) != std::string::npos); + + // Verify idempotency: restore again and persist — should be identical. + std::istringstream restoredJsonStrm{"{\"topLevel\" : " + restoredJson.str() + "}"}; + core::CJsonStateRestoreTraverser traverser2(restoredJsonStrm); + CDataGatherer restoredGatherer2(model_t::E_EventRate, model_t::E_None, params, + EMPTY_STRING, key, bucketGathererInitData, traverser2); + + std::ostringstream restoredJson2; + core::CJsonStatePersistInserter::persist( + restoredJson2, [&restoredGatherer2](core::CJsonStatePersistInserter& inserter) { + restoredGatherer2.acceptPersistInserter(inserter); + }); + + BOOST_REQUIRE_EQUAL(restoredJson.str(), restoredJson2.str()); +} + BOOST_AUTO_TEST_SUITE_END() diff --git a/lib/model/unittest/CFieldValueTruncatorTest.cc b/lib/model/unittest/CFieldValueTruncatorTest.cc new file mode 100644 index 000000000..b17a33b7c --- /dev/null +++ b/lib/model/unittest/CFieldValueTruncatorTest.cc @@ -0,0 +1,154 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the following additional limitation. Functionality enabled by the + * files subject to the Elastic License 2.0 may only be used in production when + * invoked by an Elasticsearch process with a license key installed that permits + * use of machine learning features. You may not use this file except in + * compliance with the Elastic License 2.0 and the foregoing additional + * limitation. + */ + +#include + +#include + +BOOST_AUTO_TEST_SUITE(CFieldValueTruncatorTest) + +using namespace ml; +using namespace model; + +// ============================================================================ +// Constraint Enforcement Behavior +// ============================================================================ + +BOOST_AUTO_TEST_CASE(testShortValueUnchanged) { + std::string value("short"); + BOOST_REQUIRE_EQUAL(false, CFieldValueTruncator::truncate(value)); + BOOST_REQUIRE_EQUAL("short", value); +} + +BOOST_AUTO_TEST_CASE(testValueAtExactLimitUnchanged) { + std::string value(CFieldValueTruncator::MAX_FIELD_VALUE_LENGTH, 'x'); + BOOST_REQUIRE_EQUAL(false, CFieldValueTruncator::truncate(value)); + BOOST_REQUIRE_EQUAL(CFieldValueTruncator::MAX_FIELD_VALUE_LENGTH, value.size()); +} + +BOOST_AUTO_TEST_CASE(testOversizedValueEnforcedTo256Chars) { + std::string value(1000, 'x'); + BOOST_REQUIRE_EQUAL(true, CFieldValueTruncator::truncate(value)); + BOOST_REQUIRE_EQUAL(CFieldValueTruncator::MAX_FIELD_VALUE_LENGTH, value.size()); +} + +BOOST_AUTO_TEST_CASE(testEmptyValueUnchanged) { + std::string value; + BOOST_REQUIRE_EQUAL(false, CFieldValueTruncator::truncate(value)); + BOOST_REQUIRE_EQUAL(0, value.size()); +} + +BOOST_AUTO_TEST_CASE(testConstOverloadPreservesOriginal) { + const std::string longValue(1000, 'x'); + std::string result = CFieldValueTruncator::truncated(longValue); + BOOST_REQUIRE_EQUAL(CFieldValueTruncator::MAX_FIELD_VALUE_LENGTH, result.size()); + BOOST_REQUIRE_EQUAL(1000, longValue.size()); +} + +BOOST_AUTO_TEST_CASE(testConstOverloadShortValueReturnsSame) { + const std::string shortValue("short"); + std::string result = CFieldValueTruncator::truncated(shortValue); + BOOST_REQUIRE_EQUAL("short", result); +} + +BOOST_AUTO_TEST_CASE(testNeedsTruncation) { + BOOST_REQUIRE_EQUAL(false, CFieldValueTruncator::needsTruncation("short")); + BOOST_REQUIRE_EQUAL(false, CFieldValueTruncator::needsTruncation("")); + BOOST_REQUIRE_EQUAL(false, CFieldValueTruncator::needsTruncation(std::string( + CFieldValueTruncator::MAX_FIELD_VALUE_LENGTH, 'x'))); + BOOST_REQUIRE_EQUAL(true, CFieldValueTruncator::needsTruncation(std::string( + CFieldValueTruncator::MAX_FIELD_VALUE_LENGTH + 1, 'x'))); + BOOST_REQUIRE_EQUAL(true, CFieldValueTruncator::needsTruncation(std::string(1000, 'x'))); +} + +// ============================================================================ +// Hash Suffix Format Validation +// ============================================================================ + +BOOST_AUTO_TEST_CASE(testTruncatedValueHasCorrectFormat) { + std::string value(1000, 'x'); + std::string result = CFieldValueTruncator::truncated(value); + + // Format: 239 prefix + '$' + 16 hex chars = 256 total + BOOST_REQUIRE_EQUAL(256, result.size()); + BOOST_REQUIRE_EQUAL(CFieldValueTruncator::HASH_SEPARATOR, result[239]); + + // Prefix should match original + BOOST_REQUIRE_EQUAL(0, result.compare(0, 239, value, 0, 239)); + + // Hash portion should be lowercase hex digits + for (std::size_t i = 240; i < 256; ++i) { + BOOST_REQUIRE(std::isxdigit(result[i])); + BOOST_REQUIRE((result[i] >= '0' && result[i] <= '9') || + (result[i] >= 'a' && result[i] <= 'f')); + } +} + +BOOST_AUTO_TEST_CASE(testInPlaceTruncationPreservesFormat) { + std::string value(1000, 'z'); + bool wasTruncated = CFieldValueTruncator::truncate(value); + + BOOST_REQUIRE_EQUAL(true, wasTruncated); + BOOST_REQUIRE_EQUAL(256, value.size()); + BOOST_REQUIRE_EQUAL(CFieldValueTruncator::HASH_SEPARATOR, value[239]); + + // Verify hash portion is valid hex + for (std::size_t i = 240; i < 256; ++i) { + BOOST_REQUIRE(std::isxdigit(value[i])); + } +} + +// ============================================================================ +// Collision Prevention (Data Integrity) +// ============================================================================ + +BOOST_AUTO_TEST_CASE(testDistinctValuesProduceDistinctResults) { + std::string prefix(239, 'x'); + std::string value1 = prefix + std::string(1000, 'A'); + std::string value2 = prefix + std::string(1000, 'B'); + + std::string truncated1 = CFieldValueTruncator::truncated(value1); + std::string truncated2 = CFieldValueTruncator::truncated(value2); + + // Same prefix (239 chars + separator) + BOOST_REQUIRE_EQUAL(truncated1.substr(0, 240), truncated2.substr(0, 240)); + + // But different hash suffixes prevent collision + BOOST_REQUIRE_NE(truncated1.substr(240), truncated2.substr(240)); + BOOST_REQUIRE_NE(truncated1, truncated2); +} + +BOOST_AUTO_TEST_CASE(testCollisionsPreventedByHashSuffix) { + // Two values differing only after position 256 (original collision case) + std::string value1(300, 'x'); + value1.replace(280, 20, "AAAAAAAAAAAAAAAAAAAA"); + + std::string value2(300, 'x'); + value2.replace(280, 20, "BBBBBBBBBBBBBBBBBBBB"); + + std::string truncated1 = CFieldValueTruncator::truncated(value1); + std::string truncated2 = CFieldValueTruncator::truncated(value2); + + // Must be distinct despite identical first 239 chars + BOOST_REQUIRE_NE(truncated1, truncated2); + BOOST_REQUIRE_EQUAL(CFieldValueTruncator::MAX_FIELD_VALUE_LENGTH, truncated1.size()); + BOOST_REQUIRE_EQUAL(CFieldValueTruncator::MAX_FIELD_VALUE_LENGTH, truncated2.size()); +} + +BOOST_AUTO_TEST_CASE(testDeterministicHashing) { + std::string value(1000, 'y'); + std::string result1 = CFieldValueTruncator::truncated(value); + std::string result2 = CFieldValueTruncator::truncated(value); + + BOOST_REQUIRE_EQUAL(result1, result2); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/lib/model/unittest/CMakeLists.txt b/lib/model/unittest/CMakeLists.txt index 8e6d6dcf4..e8f64ac6b 100644 --- a/lib/model/unittest/CMakeLists.txt +++ b/lib/model/unittest/CMakeLists.txt @@ -22,6 +22,7 @@ set (SRCS CDetectionRuleTest.cc CDetectorEqualizerTest.cc CDynamicStringIdRegistryTest.cc + CFieldValueTruncatorTest.cc CEventRateAnomalyDetectorTest.cc CEventRateDataGathererTest.cc CEventRateModelTest.cc diff --git a/lib/model/unittest/CMetricDataGathererTest.cc b/lib/model/unittest/CMetricDataGathererTest.cc index 76feaf054..041389789 100644 --- a/lib/model/unittest/CMetricDataGathererTest.cc +++ b/lib/model/unittest/CMetricDataGathererTest.cc @@ -18,6 +18,7 @@ #include #include +#include #include #include #include @@ -111,6 +112,24 @@ void addArrival(CDataGatherer& gatherer, gatherer.addArrival(fieldValues, eventData, resourceMonitor); } +void addArrival(CDataGatherer& gatherer, + CResourceMonitor& resourceMonitor, + core_t::TTime time, + const std::string& person, + double value, + const std::string& influencer) { + CDataGatherer::TStrCPtrVec fieldValues; + fieldValues.push_back(&person); + fieldValues.push_back(influencer.empty() ? nullptr : &influencer); + std::string const valueAsString(core::CStringUtils::typeToString(value)); + fieldValues.push_back(&valueAsString); + + CEventData eventData; + eventData.time(time); + + gatherer.addArrival(fieldValues, eventData, resourceMonitor); +} + void addArrival(CDataGatherer& gatherer, CResourceMonitor& resourceMonitor, core_t::TTime time, @@ -1843,4 +1862,84 @@ BOOST_FIXTURE_TEST_CASE(testVarp, CTestFixture) { } } +BOOST_FIXTURE_TEST_CASE(testRestoreTruncatesOversizedInfluencerSums, CTestFixture) { + // Verify that oversized influencer keys in CGathererTools::SInfluencerSumSerializer + // are truncated on restore. This exercises truncation in the metric sum gatherer's + // influencer bucket sum restore path. + + constexpr core_t::TTime startTime = 0; + constexpr core_t::TTime bucketLength = 600; + SModelParams params(bucketLength); + params.s_LatencyBuckets = 2; + params.s_SampleCountFactor = 1; + params.s_SampleQueueGrowthFactor = 0.1; + + TFeatureVec features; + features.push_back(model_t::E_IndividualSumByBucketAndPerson); + TStrVec const influencerNames{"i1"}; + + CDataGatherer gatherer = + CDataGathererBuilder(model_t::E_Metric, features, params, KEY, startTime) + .influenceFieldNames(influencerNames) + .sampleCountOverride(2U) + .build(); + + BOOST_REQUIRE_EQUAL(0, addPerson("p", gatherer, m_ResourceMonitor, 1)); + + // Add arrivals with an oversized influencer value (bypasses CAnomalyJob input truncation). + std::string const oversizedInfluencer(500, 'y'); + addArrival(gatherer, m_ResourceMonitor, startTime + 1, "p", 1.0, oversizedInfluencer); + addArrival(gatherer, m_ResourceMonitor, startTime + 2, "p", 2.0, oversizedInfluencer); + + // Advance past the first bucket so influencer sums are flushed to the persistable queue. + gatherer.timeNow(startTime + bucketLength); + + // Persist — the JSON will contain the oversized influencer value. + std::ostringstream origJson; + core::CJsonStatePersistInserter::persist( + origJson, [&gatherer](core::CJsonStatePersistInserter& inserter) { + gatherer.acceptPersistInserter(inserter); + }); + + // Sanity check: the persisted JSON contains the full oversized value. + BOOST_TEST_REQUIRE(origJson.str().find(oversizedInfluencer) != std::string::npos); + + // Restore from persisted JSON — truncation should apply. + std::istringstream origJsonStrm{"{\"topLevel\" : " + origJson.str() + "}"}; + core::CJsonStateRestoreTraverser traverser(origJsonStrm); + + CBucketGatherer::SBucketGathererInitData bucketGathererInitData{ + EMPTY_STRING, EMPTY_STRING, EMPTY_STRING, EMPTY_STRING, {}, 0, 0}; + CDataGatherer restoredGatherer(model_t::E_Metric, model_t::E_None, params, EMPTY_STRING, + KEY, bucketGathererInitData, traverser); + + // Persist restored gatherer — should NOT contain the oversized value. + std::ostringstream restoredJson; + core::CJsonStatePersistInserter::persist( + restoredJson, [&restoredGatherer](core::CJsonStatePersistInserter& inserter) { + restoredGatherer.acceptPersistInserter(inserter); + }); + + // The full 500-char string must no longer appear (it was truncated to 256). + BOOST_TEST_REQUIRE(restoredJson.str().find(oversizedInfluencer) == std::string::npos); + // Restore-path truncation must produce the same format as CFieldValueTruncator::truncated. + std::string const expectedTruncated = + model::CFieldValueTruncator::truncated(oversizedInfluencer); + BOOST_TEST_REQUIRE(restoredJson.str().find(expectedTruncated) != std::string::npos); + + // Verify idempotency: restore again and persist — should be identical. + std::istringstream restoredJsonStrm{"{\"topLevel\" : " + restoredJson.str() + "}"}; + core::CJsonStateRestoreTraverser traverser2(restoredJsonStrm); + CDataGatherer restoredGatherer2(model_t::E_Metric, model_t::E_None, params, EMPTY_STRING, + KEY, bucketGathererInitData, traverser2); + + std::ostringstream restoredJson2; + core::CJsonStatePersistInserter::persist( + restoredJson2, [&restoredGatherer2](core::CJsonStatePersistInserter& inserter) { + restoredGatherer2.acceptPersistInserter(inserter); + }); + + BOOST_REQUIRE_EQUAL(restoredJson.str(), restoredJson2.str()); +} + BOOST_AUTO_TEST_SUITE_END()