Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@

=== Bug Fixes

* Truncate oversized field values to prevent autodetect process crash. (See {ml-pull}2929[#2929], {es-pull}143180[#143180], issue: {ml-issue}2796[#2796].)
* Report RSS in bytes instead of pages. (See {ml-pull}2917[#2917].)

=== Enhancements

Expand Down
12 changes: 12 additions & 0 deletions include/api/CAnomalyJob.h
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,18 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
core_t::TTime time,
const TStrStrUMap& dataRowFields);

//! Prepare field values with truncation handling.
//! Extracts field values from \p dataRowFields, truncates oversized values,
//! and populates \p fieldValues with pointers to either original or truncated values.
//!
//! One entry is appended to \p fieldValues for every name in \p fieldNames,
//! in the same order. An entry is whatever fieldValue() returned for that
//! name (which can be nullptr) unless the value needed truncation, in which
//! case the entry points at a copy appended to \p truncatedCopies.
//! A throttled INFO message is logged for each value that is truncated.
//!
//! \param fieldNames The names of fields to extract
//! \param dataRowFields The data row containing field values
//! \param fieldValues Output vector of pointers to field values
//! \param truncatedCopies Storage for truncated copies (must remain valid while fieldValues is used)
static void prepareTruncatedFieldValues(const TStrVec& fieldNames,
const TStrStrUMap& dataRowFields,
model::CAnomalyDetector::TStrCPtrVec& fieldValues,
TStrVec& truncatedCopies);

//! Parses a control message requesting that model state be persisted.
//! Extracts optional arguments to be used for persistence.
static bool parsePersistControlMessageArgs(const std::string& controlMessageArgs,
Expand Down
3 changes: 2 additions & 1 deletion include/core/CLoggerThrottler.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ namespace core {
//! This is thread safe but uses a very simple strategy: all accesses to a single
//! hash map are synchronised. We assume that log throttling is only applied to
//! messages which normally occur infrequently; for example, this is only currently
//! applied to WARN, ERROR, and throttled INFO (LOG_INFO_THROTTLED) logging
//! (see LogMacros.h). So there will be
//! little contention. Furthermore, the overhead of locking and unlocking the mutex
//! should be negligible compared to the work done if the log line were actually
//! emitted. So this should actually give a significant performance improvement
Expand Down
18 changes: 18 additions & 0 deletions include/core/LogMacros.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,24 @@
BOOST_LOG_STREAM_SEV(ml::core::CLogger::instance().logger(), ml::core::CLogger::E_Info) \
LOG_LOCATION_INFO \
message
// LOG_INFO_THROTTLED(message): INFO level logging that consults the logger's
// throttler before emitting anything.
//
// The throttler's skip() is called with the __FILE__ and __LINE__ of the
// macro's use and yields a (count, skip) pair. When skip is false the message
// is written at CLogger::E_Info and, if count is greater than one, the text
// " | repeated [<count>]" is appended. When skip is true nothing is streamed,
// so the expressions in `message` are not evaluated.
//
// `message` must begin with a stream insertion, e.g.
//     LOG_INFO_THROTTLED(<< "value = " << value);
//
// The expansion uses std::tie and std::to_string, so <tuple> and <string> must
// be visible wherever the macro is used. The do { } while (0) wrapper makes
// the expansion a single statement.
//
// Any earlier definition of the macro is removed first.
#ifdef LOG_INFO_THROTTLED
#undef LOG_INFO_THROTTLED
#endif
#define LOG_INFO_THROTTLED(message) \
do { \
std::size_t countOfInfoMessages; \
bool skipInfoMessage; \
std::tie(countOfInfoMessages, skipInfoMessage) = \
ml::core::CLogger::instance().throttler().skip(__FILE__, __LINE__); \
if (skipInfoMessage == false) { \
BOOST_LOG_STREAM_SEV(ml::core::CLogger::instance().logger(), \
ml::core::CLogger::E_Info) \
LOG_LOCATION_INFO \
message << (countOfInfoMessages > 1 \
? " | repeated [" + std::to_string(countOfInfoMessages) + "]" \
: ""); \
} \
} while (0)
#ifdef LOG_WARN
#undef LOG_WARN
#endif
Expand Down
144 changes: 144 additions & 0 deletions include/model/CFieldValueTruncator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the following additional limitation. Functionality enabled by the
* files subject to the Elastic License 2.0 may only be used in production when
* invoked by an Elasticsearch process with a license key installed that permits
* use of machine learning features. You may not use this file except in
* compliance with the Elastic License 2.0 and the foregoing additional
* limitation.
*/
#ifndef INCLUDED_ml_model_CFieldValueTruncator_h
#define INCLUDED_ml_model_CFieldValueTruncator_h

#include <core/CHashing.h>

#include <model/ImportExport.h>

#include <cstdio>
#include <string>

namespace ml {
namespace model {

//! \brief Enforces term field length constraints with collision prevention.
//!
//! In the anomaly detection domain, term fields (by, over, partition, influencer)
//! are categorical identifiers that must satisfy two invariants:
//! 1. **Bounded Length** - Prevent memory amplification and OOM crashes
//! 2. **Unique Identity** - Distinct field values must remain distinguishable
//!
//! Values exceeding MAX_FIELD_VALUE_LENGTH (256 chars, counted with std::string::size()) are transformed using
//! collision-safe truncation:
//! - Retain PREFIX_LENGTH (239) characters of original value
//! - Append HASH_SEPARATOR ('$')
//! - Append HASH_HEX_DIGITS (16) character hex hash of complete original value
//!
//! Format: "<prefix_239_chars>$<hash_16_hex_chars>"
//! Example: "very_long_field_value_that_exceeds_limit_(...)$a1b2c3d4e5f67890"
//!
//! The 256-character limit aligns with Elasticsearch's ignore_above default
//! for keyword fields. The hash suffix ensures data integrity while maintaining
//! human readability (first 239 characters visible) and compatibility with
//! prefix-based filtering.
class MODEL_EXPORT CFieldValueTruncator {
public:
//! Maximum length for term fields in anomaly detection.
//! Compared against std::string::size() by needsTruncation(); a value of
//! exactly this length is left untouched.
static constexpr std::size_t MAX_FIELD_VALUE_LENGTH = 256;

//! Collision prevention format components
//! HASH_SEPARATOR is placed between the retained prefix and the hex hash.
static constexpr char HASH_SEPARATOR = '$';
//! Number of hexadecimal digits used to render the hash.
static constexpr std::size_t HASH_HEX_DIGITS = 16; // 16 hex chars = full 64-bit hash
//! Length of everything appended after the prefix (separator plus hash digits).
static constexpr std::size_t HASH_SUFFIX_LENGTH = 1 /* separator */ + HASH_HEX_DIGITS; // 17 total

//! Content prefix length (readable portion after truncation)
//! Chosen so that prefix plus suffix is exactly MAX_FIELD_VALUE_LENGTH.
static constexpr std::size_t PREFIX_LENGTH = MAX_FIELD_VALUE_LENGTH - HASH_SUFFIX_LENGTH; // 239

// Domain invariants (enforced at compile-time)
// These fail the build if the constants above are changed inconsistently.
static_assert(PREFIX_LENGTH + HASH_SUFFIX_LENGTH == MAX_FIELD_VALUE_LENGTH,
"Term field format invariant: prefix + suffix = total length");
static_assert(PREFIX_LENGTH >= 200,
"Readable prefix must be substantial for human comprehension");

//! Report whether a term field value is over the permitted length.
//! \param value Term field value to inspect
//! \return true only when value.size() is strictly greater than
//! MAX_FIELD_VALUE_LENGTH
static bool needsTruncation(const std::string& value) {
    const std::size_t length = value.size();
    return MAX_FIELD_VALUE_LENGTH < length;
}

//! Shorten a term field value in place if it is over the length limit.
//! A value within the limit is left exactly as it was. An oversized value
//! is replaced by its first PREFIX_LENGTH characters followed by the
//! collision-prevention suffix computed from the complete original value.
//! \param[in,out] value Field value to constrain
//! \return true if \p value was rewritten, false if it was already within
//! the limit
static bool truncate(std::string& value) {
    if (needsTruncation(value)) {
        // The suffix is derived from the whole original text, so hold on to
        // it in a local while value is rebuilt from the prefix.
        std::string untruncated(std::move(value));
        value.assign(untruncated, 0, PREFIX_LENGTH);
        appendCollisionPreventionSuffix(untruncated, value);
        return true;
    }
    return false;
}

//! Enforce term field length constraint, returning constrained copy.
//!
//! The result is always returned by value, so a value that is already
//! within the limit is copied unchanged. This is deliberate: some callers
//! do not own the input and therefore cannot use the in-place truncate().
//!
//! NOTE(review): the prefix is taken at a fixed offset into the std::string,
//! so a character encoded as several bytes could be split at the cut -
//! confirm whether inputs or downstream consumers need that handled.
//! \param value Original field value
//! \return Copy with length constraint enforced
static std::string truncated(const std::string& value) {
if (needsTruncation(value) == false) {
return value;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is copying the string value, right? (because the return type is string)
Is that intentional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. It creates a copy because truncation is sometimes used at places where the caller does not own the input. Hence, having only in-place truncation is insufficient.

}

std::string result;
result.reserve(MAX_FIELD_VALUE_LENGTH);
result.assign(value, 0, PREFIX_LENGTH);
appendCollisionPreventionSuffix(value, result);

return result;
}

private:
//! \brief Hash computation and rendering used for the truncation suffix.
//!
//! Keeps the mechanics of hashing a value and printing the hash apart from
//! the truncation rules of the enclosing class.
struct HashEncoding {
    //! Hash the complete \p value with core::CHashing::safeMurmurHash64.
    //! The seed is always 0 so that the hash of a given value does not
    //! vary between calls.
    //! NOTE(review): the length is narrowed to int for the hash call -
    //! confirm values can never be large enough for that to matter.
    //! \param value The text to hash
    //! \return The 64-bit hash of \p value
    static std::uint64_t compute(const std::string& value) {
        const int lengthForHash = static_cast<int>(value.size());
        return core::CHashing::safeMurmurHash64(value.data(), lengthForHash, 0);
    }

    //! Render \p hash as HASH_HEX_DIGITS lowercase hexadecimal digits,
    //! padded on the left with zeros.
    //! \param hash The hash value to format
    //! \param[out] buffer Receives the digits and a terminating null, so it
    //! must hold at least HASH_HEX_DIGITS + 1 characters
    //! \return \p buffer
    static const char* toHex(std::uint64_t hash, char* buffer) {
        // Widen explicitly so the argument matches the %llx conversion
        // whatever the underlying type of std::uint64_t is.
        const unsigned long long widened = static_cast<unsigned long long>(hash);
        std::snprintf(buffer, HASH_HEX_DIGITS + 1, "%016llx", widened);
        return buffer;
    }
};

//! Append collision-prevention suffix: separator + hash.
//! \param originalValue Complete untruncated value for hash computation
//! \param[in,out] prefix Truncated prefix to which suffix is appended
static void appendCollisionPreventionSuffix(const std::string& originalValue,
std::string& prefix) {
std::uint64_t identityHash = HashEncoding::compute(originalValue);

prefix.reserve(MAX_FIELD_VALUE_LENGTH);
prefix.push_back(HASH_SEPARATOR);

char hashHexBuffer[HASH_HEX_DIGITS + 1];
prefix.append(HashEncoding::toHex(identityHash, hashHexBuffer));
}
};
}
}

#endif // INCLUDED_ml_model_CFieldValueTruncator_h
37 changes: 33 additions & 4 deletions lib/api/CAnomalyJob.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <maths/common/CIntegerTools.h>
#include <maths/common/COrderings.h>

#include <model/CFieldValueTruncator.h>
#include <model/CHierarchicalResultsAggregator.h>
#include <model/CHierarchicalResultsPopulator.h>
#include <model/CHierarchicalResultsProbabilityFinalizer.h>
Expand Down Expand Up @@ -1700,15 +1701,43 @@ const std::string* CAnomalyJob::fieldValue(const std::string& fieldName,
return !fieldName.empty() && fieldValue.empty() ? nullptr : &fieldValue;
}

// Build the list of field value pointers handed to a detector, substituting
// truncated copies for values that exceed the term field length limit.
//
// One pointer is appended to fieldValues per entry of fieldNames, in order:
//  - whatever fieldValue() returned (possibly nullptr) when no truncation is
//    needed;
//  - otherwise a pointer to a truncated copy appended to truncatedCopies.
//
// truncatedCopies must outlive every use of fieldValues. Pointers into
// truncatedCopies that were taken before this call are not preserved, because
// reserving capacity may reallocate it.
void CAnomalyJob::prepareTruncatedFieldValues(const TStrVec& fieldNames,
                                              const TStrStrUMap& dataRowFields,
                                              model::CAnomalyDetector::TStrCPtrVec& fieldValues,
                                              TStrVec& truncatedCopies) {
    // How much of an oversized value is quoted in the log message.
    constexpr std::size_t LOGGED_PREFIX_LENGTH{50};

    // Reserve room for the worst case ON TOP OF what the vectors already
    // hold. Reserving only fieldNames.size() would let push_back reallocate
    // truncatedCopies when it is non-empty on entry, leaving the pointers
    // just stored in fieldValues dangling.
    fieldValues.reserve(fieldValues.size() + fieldNames.size());
    truncatedCopies.reserve(truncatedCopies.size() + fieldNames.size());

    for (const auto& fieldName : fieldNames) {
        const std::string* value = fieldValue(fieldName, dataRowFields);
        if (value == nullptr ||
            model::CFieldValueTruncator::needsTruncation(*value) == false) {
            fieldValues.push_back(value);
            continue;
        }

        truncatedCopies.push_back(model::CFieldValueTruncator::truncated(*value));
        fieldValues.push_back(&truncatedCopies.back());

        std::string escapedFieldName = fieldName;
        core::CStringUtils::escape('\\', "\n\r\t", escapedFieldName);
        // The value comes from the input data just like the field name, so
        // give the quoted prefix the same escaping before it is logged.
        // substr() clamps the count to the available length itself.
        std::string escapedPrefix = value->substr(0, LOGGED_PREFIX_LENGTH);
        core::CStringUtils::escape('\\', "\n\r\t", escapedPrefix);
        LOG_INFO_THROTTLED(
            << "Field '" << escapedFieldName
            << "' value (length=" << value->size() << ", prefix='" << escapedPrefix
            << "...') exceeds " << model::CFieldValueTruncator::MAX_FIELD_VALUE_LENGTH
            << " characters and has been truncated with collision-safe hash suffix");
    }
}

// Pass one input record to a detector.
//
// The values of the detector's fields of interest are looked up in
// dataRowFields and oversized values are replaced by truncated copies before
// the detector sees them.
//
// fieldValues is populated exclusively by prepareTruncatedFieldValues().
// Filling it here as well would add every value twice, with the first,
// untruncated set being the one the detector reads.
void CAnomalyJob::addRecord(const TAnomalyDetectorPtr& detector,
                            core_t::TTime time,
                            const TStrStrUMap& dataRowFields) {
    // fieldValues may point into truncatedCopies, so both must stay alive
    // until detector->addRecord() has returned.
    TStrVec truncatedCopies;
    model::CAnomalyDetector::TStrCPtrVec fieldValues;

    prepareTruncatedFieldValues(detector->fieldsOfInterest(), dataRowFields,
                                fieldValues, truncatedCopies);

    detector->addRecord(time, fieldValues);
}
Expand Down
9 changes: 8 additions & 1 deletion lib/api/CDataProcessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#include <core/CStringUtils.h>
#include <core/CTimeUtils.h>

#include <model/CFieldValueTruncator.h>

namespace ml {
namespace api {

Expand Down Expand Up @@ -49,7 +51,12 @@ std::string CDataProcessor::debugPrintRecord(const TStrStrUMap& dataRowFields) {
fieldValues.push_back(',');
}
fieldNames.append(rowIter->first);
fieldValues.append(rowIter->second);
const std::string& val = rowIter->second;
if (model::CFieldValueTruncator::needsTruncation(val)) {
fieldValues.append(model::CFieldValueTruncator::truncated(val));
} else {
fieldValues.append(val);
}
}

result << fieldNames << core_t::LINE_ENDING << fieldValues;
Expand Down
92 changes: 92 additions & 0 deletions lib/api/unittest/CAnomalyJobTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <model/CAnomalyDetectorModelConfig.h>
#include <model/CDataGatherer.h>
#include <model/CFieldValueTruncator.h>
#include <model/CLimits.h>

#include <api/CAnomalyJobConfig.h>
Expand Down Expand Up @@ -1205,4 +1206,95 @@ BOOST_AUTO_TEST_CASE(testHierarchicalResultsNormalizerShouldIncreaseMemoryUsage)
resourceMonitor.forceRefreshAll();
BOOST_TEST_REQUIRE(resourceMonitor.totalMemory() < memoryUsageBeforeUnregister);
}

BOOST_AUTO_TEST_CASE(testOversizedFieldValuesTruncated) {
    // Oversized by/influencer values must be truncated by addRecord (through
    // prepareTruncatedFieldValues) before the model stores them. The check is
    // made against persisted state since that shows what the detector kept:
    // without truncation the full value would be found there.
    std::string const tooLong(1000, 'x');
    std::string const expectedStored = model::CFieldValueTruncator::truncated(tooLong);

    model::CLimits limits;
    api::CAnomalyJobConfig jobConfig = CTestAnomalyJob::makeSimpleJobConfig(
        "count", "", "by_field", "", "", {"influencer_field"});
    model::CAnomalyDetectorModelConfig modelConfig =
        model::CAnomalyDetectorModelConfig::defaultConfig(BUCKET_SIZE);

    std::stringstream outputStrm;
    core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm);

    CTestAnomalyJob job("job", limits, jobConfig, modelConfig, wrappedOutputStream);

    auto makeRow = [&tooLong](const std::string& time) {
        return CTestAnomalyJob::TStrStrUMap{
            {"time", time}, {"by_field", tooLong}, {"influencer_field", tooLong}};
    };

    CTestAnomalyJob::TStrStrUMap firstRow = makeRow("1000");
    BOOST_TEST_REQUIRE(job.handleRecord(firstRow));
    BOOST_REQUIRE_EQUAL(uint64_t(1), job.numRecordsHandled());

    // A later timestamp moves past the bucket boundary so that results are
    // written and there is state to persist.
    CTestAnomalyJob::TStrStrUMap laterRow = makeRow("5000");
    BOOST_TEST_REQUIRE(job.handleRecord(laterRow));
    BOOST_REQUIRE_EQUAL(uint64_t(2), job.numRecordsHandled());

    // The persister takes ownership of the stream; the raw pointer is kept
    // only to read back what was written.
    std::ostringstream* stateStream = new std::ostringstream();
    api::CSingleStreamDataAdder::TOStreamP ownedStream{stateStream};
    api::CSingleStreamDataAdder persister{ownedStream};
    BOOST_TEST_REQUIRE(job.persistStateInForeground(persister, ""));
    std::string const persistedState = stateStream->str();

    // The untruncated value must be absent from the state ...
    bool const fullValueStored = persistedState.find(tooLong) != std::string::npos;
    BOOST_TEST_REQUIRE(fullValueStored == false);
    // ... and its truncated form must be present.
    bool const truncatedValueStored = persistedState.find(expectedStored) !=
                                      std::string::npos;
    BOOST_TEST_REQUIRE(truncatedValueStored);
}

BOOST_AUTO_TEST_CASE(testNormalFieldValuesNotTruncated) {
    // Counterpart of the oversized-value test: a short by/influencer value
    // must be found unchanged in the persisted state.
    std::string const shortValue("normal_value");

    model::CLimits limits;
    api::CAnomalyJobConfig jobConfig = CTestAnomalyJob::makeSimpleJobConfig(
        "count", "", "by_field", "", "", {"influencer_field"});
    model::CAnomalyDetectorModelConfig modelConfig =
        model::CAnomalyDetectorModelConfig::defaultConfig(BUCKET_SIZE);

    std::stringstream outputStrm;
    core::CJsonOutputStreamWrapper wrappedOutputStream(outputStrm);

    CTestAnomalyJob job("job", limits, jobConfig, modelConfig, wrappedOutputStream);

    auto makeRow = [&shortValue](const std::string& time) {
        return CTestAnomalyJob::TStrStrUMap{
            {"time", time}, {"by_field", shortValue}, {"influencer_field", shortValue}};
    };

    CTestAnomalyJob::TStrStrUMap firstRow = makeRow("1000");
    BOOST_TEST_REQUIRE(job.handleRecord(firstRow));
    BOOST_REQUIRE_EQUAL(uint64_t(1), job.numRecordsHandled());

    // A later timestamp moves past the bucket boundary so that results are
    // written and there is state to persist.
    CTestAnomalyJob::TStrStrUMap laterRow = makeRow("5000");
    BOOST_TEST_REQUIRE(job.handleRecord(laterRow));
    BOOST_REQUIRE_EQUAL(uint64_t(2), job.numRecordsHandled());

    // The persister takes ownership of the stream; the raw pointer is kept
    // only to read back what was written.
    std::ostringstream* stateStream = new std::ostringstream();
    api::CSingleStreamDataAdder::TOStreamP ownedStream{stateStream};
    api::CSingleStreamDataAdder persister{ownedStream};
    BOOST_TEST_REQUIRE(job.persistStateInForeground(persister, ""));
    std::string const persistedState = stateStream->str();

    bool const valueStored = persistedState.find(shortValue) != std::string::npos;
    BOOST_TEST_REQUIRE(valueStored);
}

BOOST_AUTO_TEST_CASE(testDebugPrintRecordTruncatesLongValues) {
    // debugPrintRecord must not echo an oversized value in full: the output
    // should contain the truncation separator and stay short.
    std::string const longValue(1000, 'x');

    api::CDataProcessor::TStrStrUMap record;
    record["field1"] = longValue;
    record["field2"] = "short";

    std::string const printed = api::CDataProcessor::debugPrintRecord(record);

    // truncated() yields prefix + '$' + 16 hex chars, so the complete
    // 1000-character value cannot appear.
    bool const hasFullValue = printed.find(longValue) != std::string::npos;
    bool const hasSeparator = printed.find(model::CFieldValueTruncator::HASH_SEPARATOR) !=
                              std::string::npos;

    BOOST_TEST_REQUIRE(hasFullValue == false);
    BOOST_TEST_REQUIRE(hasSeparator);
    BOOST_TEST_REQUIRE(printed.size() < 500);
}

BOOST_AUTO_TEST_SUITE_END()
Loading