diff --git a/communication/src/main/java/datadog/communication/BackendApiFactory.java b/communication/src/main/java/datadog/communication/BackendApiFactory.java index 3ce78b88c22..f944d95eb5c 100644 --- a/communication/src/main/java/datadog/communication/BackendApiFactory.java +++ b/communication/src/main/java/datadog/communication/BackendApiFactory.java @@ -24,6 +24,11 @@ public BackendApiFactory(Config config, SharedCommunicationObjects sharedCommuni } public @Nullable BackendApi createBackendApi(Intake intake) { + return createBackendApi(intake, null, true); + } + + public @Nullable BackendApi createBackendApi( + Intake intake, @Nullable String preferredEvpProxyEndpoint, boolean responseCompression) { HttpRetryPolicy.Factory retryPolicyFactory = new HttpRetryPolicy.Factory(5, 100, 2.0, true); if (intake.isAgentlessEnabled(config)) { @@ -49,6 +54,15 @@ public BackendApiFactory(Config config, SharedCommunicationObjects sharedCommuni if (featuresDiscovery.supportsEvpProxy()) { String traceId = config.getIdGenerationStrategy().generateTraceId().toString(); String evpProxyEndpoint = featuresDiscovery.getEvpProxyEndpoint(); + if (preferredEvpProxyEndpoint != null + && featuresDiscovery.supportsEvpProxyEndpoint(preferredEvpProxyEndpoint)) { + evpProxyEndpoint = preferredEvpProxyEndpoint; + } + log.debug( + "Creating EVP proxy client for {} using endpoint {} with responseCompression={}", + intake, + evpProxyEndpoint, + responseCompression); HttpUrl evpProxyUrl = sharedCommunicationObjects.agentUrl.resolve(evpProxyEndpoint); String subdomain = intake.getUrlPrefix(); return new EvpProxyApi( @@ -57,7 +71,7 @@ public BackendApiFactory(Config config, SharedCommunicationObjects sharedCommuni subdomain, retryPolicyFactory, sharedCommunicationObjects.agentHttpClient, - true); + responseCompression); } log.warn( diff --git a/communication/src/main/java/datadog/communication/EvpProxy.java b/communication/src/main/java/datadog/communication/EvpProxy.java new file mode 100644 index 00000000000..2a88a6d9fdf --- /dev/null +++ b/communication/src/main/java/datadog/communication/EvpProxy.java @@ -0,0 +1,12 @@ +package datadog.communication; + +/** Shared EVP proxy constants. */ +public final class EvpProxy { + + public static final String SUBDOMAIN_HEADER = "X-Datadog-EVP-Subdomain"; + + /** EVP uncompressed request-body limit in bytes. */ + public static final int PAYLOAD_SIZE_LIMIT_BYTES = 5 * 1024 * 1024; + + private EvpProxy() {} +} diff --git a/communication/src/main/java/datadog/communication/EvpProxyApi.java b/communication/src/main/java/datadog/communication/EvpProxyApi.java index 83037ab9663..b00838c68f3 100644 --- a/communication/src/main/java/datadog/communication/EvpProxyApi.java +++ b/communication/src/main/java/datadog/communication/EvpProxyApi.java @@ -20,7 +20,6 @@ public class EvpProxyApi implements BackendApi { private static final Logger log = LoggerFactory.getLogger(EvpProxyApi.class); private static final String API_VERSION = "v2"; - private static final String X_DATADOG_EVP_SUBDOMAIN_HEADER = "X-Datadog-EVP-Subdomain"; private static final String X_DATADOG_TRACE_ID_HEADER = "x-datadog-trace-id"; private static final String X_DATADOG_PARENT_ID_HEADER = "x-datadog-parent-id"; private static final String ACCEPT_ENCODING_HEADER = "Accept-Encoding"; @@ -62,7 +61,7 @@ public T post( Request.Builder requestBuilder = new Request.Builder() .url(url) - .addHeader(X_DATADOG_EVP_SUBDOMAIN_HEADER, subdomain) + .addHeader(EvpProxy.SUBDOMAIN_HEADER, subdomain) .addHeader(X_DATADOG_TRACE_ID_HEADER, traceId) .addHeader(X_DATADOG_PARENT_ID_HEADER, traceId); @@ -79,6 +78,11 @@ public T post( } final Request request = requestBuilder.post(requestBody).build(); + log.debug( + "Posting EVP request to {} with responseCompression={} requestCompression={}", + url, + responseCompression, + requestCompression); try (okhttp3.Response response = OkHttpUtils.sendWithRetries(httpClient, retryPolicyFactory, request)) { diff --git a/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java b/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java index 55929c73f51..6b05ad82533 100644 --- a/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java +++ b/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java @@ -98,6 +98,7 @@ private static class State { String debuggerSnapshotEndpoint; String debuggerDiagnosticsEndpoint; String evpProxyEndpoint; + Set evpProxyEndpoints = emptySet(); String version; String telemetryProxyEndpoint; Set peerTags = emptySet(); @@ -288,6 +289,7 @@ private boolean processInfoResponse(State newState, String response) { break; } } + newState.evpProxyEndpoints = unmodifiableSet(endpoints); for (String endpoint : telemetryProxyEndpoints) { if (containsEndpoint(endpoints, endpoint)) { @@ -426,6 +428,10 @@ public String getEvpProxyEndpoint() { return discoveryState.evpProxyEndpoint; } + public boolean supportsEvpProxyEndpoint(String endpoint) { + return containsEndpoint(discoveryState.evpProxyEndpoints, endpoint); + } + public HttpUrl buildUrl(String endpoint) { return agentBaseUrl.resolve(endpoint); } diff --git a/dd-trace-api/src/main/java/datadog/trace/api/config/FeatureFlaggingConfig.java b/dd-trace-api/src/main/java/datadog/trace/api/config/FeatureFlaggingConfig.java index 28151f88864..5f567908254 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/config/FeatureFlaggingConfig.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/config/FeatureFlaggingConfig.java @@ -3,4 +3,12 @@ public class FeatureFlaggingConfig { public static final String FLAGGING_PROVIDER_ENABLED = "experimental.flagging.provider.enabled"; + + /** + * Killswitch for the EVP {@code flagevaluation} emission path. Default: enabled. Disabling it + * turns off EVP flag-evaluation counts while leaving the OTel {@code feature_flag.evaluations} + * metric path untouched. Maps to {@code DD_FLAGGING_EVALUATION_COUNTS_ENABLED}. + */ + public static final String FLAGGING_EVALUATION_COUNTS_ENABLED = + "flagging.evaluation.counts.enabled"; } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDEvpProxyApi.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDEvpProxyApi.java index e911a980c31..9ec4eca5f4e 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDEvpProxyApi.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDEvpProxyApi.java @@ -3,6 +3,7 @@ import static datadog.trace.common.writer.DDIntakeWriter.DEFAULT_INTAKE_TIMEOUT; import static datadog.trace.common.writer.DDIntakeWriter.DEFAULT_INTAKE_VERSION; +import datadog.communication.EvpProxy; import datadog.communication.http.HttpRetryPolicy; import datadog.communication.http.OkHttpUtils; import datadog.trace.api.civisibility.InstrumentationBridge; @@ -26,7 +27,6 @@ public class DDEvpProxyApi extends RemoteApi { private static final Logger log = LoggerFactory.getLogger(DDEvpProxyApi.class); - private static final String DD_EVP_SUBDOMAIN_HEADER = "X-Datadog-EVP-Subdomain"; private static final String CONTENT_ENCODING_HEADER = "Content-Encoding"; private static final String GZIP_CONTENT_TYPE = "gzip"; @@ -131,7 +131,7 @@ public Response sendSerializedTraces(Payload payload) { Request.Builder builder = new Request.Builder() .url(proxiedApiUrl) - .addHeader(DD_EVP_SUBDOMAIN_HEADER, subdomain) + .addHeader(EvpProxy.SUBDOMAIN_HEADER, subdomain) .tag(OkHttpUtils.CustomListener.class, telemetryListener); if (isCompressionEnabled()) { diff --git a/dd-trace-core/src/main/java/datadog/trace/llmobs/writer/ddintake/LLMObsSpanMapper.java b/dd-trace-core/src/main/java/datadog/trace/llmobs/writer/ddintake/LLMObsSpanMapper.java index 7849052b9d3..1f14502cc27 100644 --- a/dd-trace-core/src/main/java/datadog/trace/llmobs/writer/ddintake/LLMObsSpanMapper.java +++ b/dd-trace-core/src/main/java/datadog/trace/llmobs/writer/ddintake/LLMObsSpanMapper.java @@ -2,6 +2,7 @@ import static datadog.communication.http.OkHttpUtils.gzippedMsgpackRequestBodyOf; +import datadog.communication.EvpProxy; import datadog.communication.serialization.GrowableBuffer; import datadog.communication.serialization.Writable; import datadog.communication.serialization.msgpack.MsgPackWriter; @@ -99,7 +100,7 @@ public class LLMObsSpanMapper implements RemoteMapper { private int spansWritten; public LLMObsSpanMapper() { - this(5 << 20); + this(EvpProxy.PAYLOAD_SIZE_LIMIT_BYTES); } private LLMObsSpanMapper(int size) { diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/writer/ddintake/DDEvpProxyApiTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/writer/ddintake/DDEvpProxyApiTest.groovy index fd797695d99..52ad90a9514 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/writer/ddintake/DDEvpProxyApiTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/writer/ddintake/DDEvpProxyApiTest.groovy @@ -2,6 +2,7 @@ package datadog.trace.common.writer.ddintake import com.fasterxml.jackson.core.type.TypeReference import com.fasterxml.jackson.databind.ObjectMapper +import datadog.communication.EvpProxy import datadog.communication.serialization.ByteBufferConsumer import datadog.communication.serialization.FlushingBuffer import datadog.communication.serialization.msgpack.MsgPackWriter @@ -64,7 +65,7 @@ class DDEvpProxyApiTest extends DDCoreSpecification { clientResponse.status().present clientResponse.status().asInt == 200 agentEvpProxy.getLastRequest().path == path - agentEvpProxy.getLastRequest().getHeader(DDEvpProxyApi.DD_EVP_SUBDOMAIN_HEADER) == intakeSubdomain + agentEvpProxy.getLastRequest().getHeader(EvpProxy.SUBDOMAIN_HEADER) == intakeSubdomain cleanup: agentEvpProxy.close() @@ -100,7 +101,7 @@ class DDEvpProxyApiTest extends DDCoreSpecification { clientResponse.status().present clientResponse.status().asInt == 200 agentEvpProxy.getLastRequest().path == path - agentEvpProxy.getLastRequest().getHeader(DDEvpProxyApi.DD_EVP_SUBDOMAIN_HEADER) == intakeSubdomain + agentEvpProxy.getLastRequest().getHeader(EvpProxy.SUBDOMAIN_HEADER) == intakeSubdomain cleanup: agentEvpProxy.close() diff --git a/internal-api/src/main/java/datadog/trace/api/telemetry/CoreMetricCollector.java b/internal-api/src/main/java/datadog/trace/api/telemetry/CoreMetricCollector.java index d33fcf1529d..d09dc2f2ac0 100644 --- a/internal-api/src/main/java/datadog/trace/api/telemetry/CoreMetricCollector.java +++ b/internal-api/src/main/java/datadog/trace/api/telemetry/CoreMetricCollector.java @@ -29,6 +29,14 @@ private CoreMetricCollector() { this.metricsQueue = new ArrayBlockingQueue<>(RAW_QUEUE_SIZE); } + public void count(String metricName, long value, String tag) { + if (value <= 0) { + return; + } + this.metricsQueue.offer( + new CoreMetric(METRIC_NAMESPACE, true, metricName, "count", value, tag)); + } + @Override public void prepareMetrics() { // Collect span metrics diff --git a/internal-api/src/main/java/datadog/trace/util/AgentThreadFactory.java b/internal-api/src/main/java/datadog/trace/util/AgentThreadFactory.java index 752adb8899d..6001a221a13 100644 --- a/internal-api/src/main/java/datadog/trace/util/AgentThreadFactory.java +++ b/internal-api/src/main/java/datadog/trace/util/AgentThreadFactory.java @@ -66,7 +66,9 @@ public enum AgentThread { LLMOBS_EVALS_PROCESSOR("dd-llmobs-evals-processor"), - FEATURE_FLAG_EXPOSURE_PROCESSOR("dd-ffe-exposure-processor"); + FEATURE_FLAG_EXPOSURE_PROCESSOR("dd-ffe-exposure-processor"), + + FEATURE_FLAG_EVALUATION_PROCESSOR("dd-ffe-evaluation-processor"); public final String threadName; diff --git a/internal-api/src/test/groovy/datadog/trace/api/telemetry/CoreMetricCollectorTest.groovy b/internal-api/src/test/groovy/datadog/trace/api/telemetry/CoreMetricCollectorTest.groovy index 205da2bd0f5..5d0b920bfb9 100644 --- a/internal-api/src/test/groovy/datadog/trace/api/telemetry/CoreMetricCollectorTest.groovy +++ b/internal-api/src/test/groovy/datadog/trace/api/telemetry/CoreMetricCollectorTest.groovy @@ -53,4 +53,24 @@ class CoreMetricCollectorTest extends DDSpecification { collector.prepareMetrics() collector.drain().size() == limit } + + def "direct count core metric"() { + setup: + def collector = CoreMetricCollector.getInstance() + collector.drain() + + when: + collector.count('flagevaluation.rows.dropped', 3, 'reason:queue_overflow') + def metrics = collector.drain() + + then: + metrics.size() == 1 + + def metric = metrics[0] + metric.type == 'count' + metric.value == 3 + metric.namespace == 'tracers' + metric.metricName == 'flagevaluation.rows.dropped' + metric.tags == ['reason:queue_overflow'] + } } diff --git a/metadata/supported-configurations.json b/metadata/supported-configurations.json index 09498b2beda..77446820e3a 100644 --- a/metadata/supported-configurations.json +++ b/metadata/supported-configurations.json @@ -1489,6 +1489,14 @@ "aliases": [] } ], + "DD_FLAGGING_EVALUATION_COUNTS_ENABLED": [ + { + "version": "A", + "type": "boolean", + "default": "true", + "aliases": [] + } + ], "DD_FORCE_CLEAR_TEXT_HTTP_FOR_INTAKE_CLIENT": [ { "version": "A", diff --git a/products/feature-flagging/feature-flagging-agent/src/main/java/com/datadog/featureflag/FeatureFlaggingSystem.java b/products/feature-flagging/feature-flagging-agent/src/main/java/com/datadog/featureflag/FeatureFlaggingSystem.java index 02689767bad..d50bdaf92ee 100644 --- a/products/feature-flagging/feature-flagging-agent/src/main/java/com/datadog/featureflag/FeatureFlaggingSystem.java +++ b/products/feature-flagging/feature-flagging-agent/src/main/java/com/datadog/featureflag/FeatureFlaggingSystem.java @@ -2,6 +2,8 @@ import datadog.communication.ddagent.SharedCommunicationObjects; import datadog.trace.api.Config; +import datadog.trace.api.config.FeatureFlaggingConfig; +import datadog.trace.api.featureflag.flagevaluation.FlagEvaluationWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -11,6 +13,7 @@ public class FeatureFlaggingSystem { private static volatile RemoteConfigService CONFIG_SERVICE; private static volatile ExposureWriter EXPOSURE_WRITER; + private static volatile FlagEvaluationWriter FLAG_EVAL_WRITER; private FeatureFlaggingSystem() {} @@ -27,10 +30,31 @@ public static void start(final SharedCommunicationObjects sco) { EXPOSURE_WRITER = new ExposureWriterImpl(sco, config); EXPOSURE_WRITER.init(); + // EVP flagevaluation writer — gated by the killswitch + // DD_FLAGGING_EVALUATION_COUNTS_ENABLED (default: on), read through the tracer config system. + final boolean evalCountsEnabled = + config + .configProvider() + .getBoolean(FeatureFlaggingConfig.FLAGGING_EVALUATION_COUNTS_ENABLED, true); + if (evalCountsEnabled) { + final FlagEvaluationWriterImpl evalWriter = new FlagEvaluationWriterImpl(sco, config); + evalWriter.start(); // registers with FeatureFlaggingGateway + FLAG_EVAL_WRITER = evalWriter; + LOGGER.debug("Flag evaluation EVP writer started"); + } else { + LOGGER.debug( + "Flag evaluation EVP writer disabled ({}=false)", + FeatureFlaggingConfig.FLAGGING_EVALUATION_COUNTS_ENABLED); + } + LOGGER.debug("Feature Flagging system started"); } public static void stop() { + if (FLAG_EVAL_WRITER != null) { + FLAG_EVAL_WRITER.close(); // also deregisters from gateway + FLAG_EVAL_WRITER = null; + } if (EXPOSURE_WRITER != null) { EXPOSURE_WRITER.close(); EXPOSURE_WRITER = null; diff --git a/products/feature-flagging/feature-flagging-api/src/main/java/datadog/trace/api/openfeature/DDEvaluator.java b/products/feature-flagging/feature-flagging-api/src/main/java/datadog/trace/api/openfeature/DDEvaluator.java index 91c0aafdc7a..762ffe5997c 100644 --- a/products/feature-flagging/feature-flagging-api/src/main/java/datadog/trace/api/openfeature/DDEvaluator.java +++ b/products/feature-flagging/feature-flagging-api/src/main/java/datadog/trace/api/openfeature/DDEvaluator.java @@ -387,11 +387,15 @@ private static ProviderEvaluation resolveVariant( + e.getMessage()); } + // Stamp eval-time at the resolution point so first/last_evaluation reflect evaluation time, + // not hook-fire time. Passed to the hook via provider metadata "dd.eval.timestamp_ms". + final long evalTimestampMs = System.currentTimeMillis(); final ImmutableMetadata.ImmutableMetadataBuilder metadataBuilder = ImmutableMetadata.builder() .addString("flagKey", flag.key) .addString("variationType", flag.variationType.name()) - .addString("allocationKey", allocation.key); + .addString("allocationKey", allocation.key) + .addLong("dd.eval.timestamp_ms", evalTimestampMs); final ProviderEvaluation result = ProviderEvaluation.builder() .value(mappedValue) diff --git a/products/feature-flagging/feature-flagging-api/src/main/java/datadog/trace/api/openfeature/FlagEvalLoggingHook.java b/products/feature-flagging/feature-flagging-api/src/main/java/datadog/trace/api/openfeature/FlagEvalLoggingHook.java new file mode 100644 index 00000000000..6e682cd151a --- /dev/null +++ b/products/feature-flagging/feature-flagging-api/src/main/java/datadog/trace/api/openfeature/FlagEvalLoggingHook.java @@ -0,0 +1,117 @@ +package datadog.trace.api.openfeature; + +import datadog.trace.api.featureflag.FeatureFlaggingGateway; +import datadog.trace.api.featureflag.flagevaluation.FlagEvalEvent; +import datadog.trace.api.featureflag.flagevaluation.FlagEvaluationWriter; +import dev.openfeature.sdk.FlagEvaluationDetails; +import dev.openfeature.sdk.Hook; +import dev.openfeature.sdk.HookContext; +import dev.openfeature.sdk.ImmutableMetadata; +import java.util.Collections; +import java.util.Map; + +/** + * OpenFeature {@code Hook} that captures flag evaluation events for EVP {@code flagevaluation} + * emission. + * + *

Contract: {@code finallyAfter} does ONLY cheap scalar extraction + a non-blocking offer to the + * writer's bounded queue. No inline aggregation on the hook thread. + * + *

This hook is registered alongside the existing OTel {@link FlagEvalMetricsHook} — it does NOT + * replace it (the existing OTel metrics hook is left unchanged). + * + *

The writer is resolved lazily from {@link FeatureFlaggingGateway#getFlagEvalWriter()} on each + * call, so the hook is always safe to register — if the writer is absent (killswitch off or not yet + * started) it is a no-op. + */ +class FlagEvalLoggingHook implements Hook { + + /** + * Singleton instance: always registered when the provider is created; harmless when writer=null + * (killswitch off or not yet started). + */ + static final FlagEvalLoggingHook INSTANCE = new FlagEvalLoggingHook<>(null); + + /** + * Optional injected writer (test-only). When non-null, bypasses the gateway lookup. Production + * instances use {@code null} (resolved via gateway). + */ + private final FlagEvaluationWriter injectedWriter; + + /** Production constructor — resolves writer from gateway. */ + FlagEvalLoggingHook() { + this.injectedWriter = null; + } + + /** Test-only constructor — injects a writer directly, bypassing the gateway. */ + FlagEvalLoggingHook(final FlagEvaluationWriter writer) { + this.injectedWriter = writer; + } + + /** + * Cheap capture + non-blocking enqueue only. Runs at the {@code finally} stage so it covers + * success, error, and default-value paths. + */ + @Override + public void finallyAfter( + final HookContext ctx, + final FlagEvaluationDetails details, + final Map hints) { + // Resolve writer: prefer injected (test), then gateway + final FlagEvaluationWriter w = + injectedWriter != null ? injectedWriter : FeatureFlaggingGateway.getFlagEvalWriter(); + if (w == null || details == null) { + return; + } + try { + // Cheap scalar extraction — no JSON, no map lookups beyond metadata.asMap() + final String flagKey = details.getFlagKey(); + final ImmutableMetadata metadata = details.getFlagMetadata(); + + // allocationKey: "allocationKey" (camelCase) — consistent with FlagEvalMetricsHook.java + final String allocationKey = metadata != null ? metadata.getString("allocationKey") : null; + + // eval-time: from flag metadata "dd.eval.timestamp_ms" (Long), fallback to hook-fire time. + // ImmutableMetadata.getLong available since sdk 1.4+. + final Long evalTimeObj = metadata != null ? metadata.getLong("dd.eval.timestamp_ms") : null; + final long evalTimeMs = evalTimeObj != null ? evalTimeObj : System.currentTimeMillis(); + + // variant: the OpenFeature variant key (same source as the OTel FlagEvalMetricsHook), NOT the + // evaluated value. A null variant means no variant was selected (runtime default). + final String variant = details.getVariant(); + + // error message: prefer the human-readable message; fall back to the error code name when + // the message is empty (some providers populate only the code). null on success. + String errorMessage = details.getErrorMessage(); + if ((errorMessage == null || errorMessage.isEmpty()) && details.getErrorCode() != null) { + errorMessage = details.getErrorCode().name(); + } + if (errorMessage != null && errorMessage.isEmpty()) { + errorMessage = null; + } + + // targetingKey from evaluation context + final String targetingKey = + ctx != null && ctx.getCtx() != null ? ctx.getCtx().getTargetingKey() : null; + + // attrs: flatten EvaluationContext attributes for the full-tier canonical key + final Map attrs = extractAttrs(ctx); + + w.enqueue( + new FlagEvalEvent( + flagKey, variant, allocationKey, targetingKey, errorMessage, evalTimeMs, attrs)); + } catch (Exception e) { + // Never let EVP recording break flag evaluation + } + } + + /** Extracts converted, flattened attributes from the evaluation context. */ + private Map extractAttrs(final HookContext ctx) { + if (ctx == null || ctx.getCtx() == null) { + return Collections.emptyMap(); + } + final Map attrs = DDEvaluator.flattenContext(ctx.getCtx()); + attrs.remove("targetingKey"); + return attrs; + } +} diff --git a/products/feature-flagging/feature-flagging-api/src/main/java/datadog/trace/api/openfeature/FlagEvalHook.java b/products/feature-flagging/feature-flagging-api/src/main/java/datadog/trace/api/openfeature/FlagEvalMetricsHook.java similarity index 91% rename from products/feature-flagging/feature-flagging-api/src/main/java/datadog/trace/api/openfeature/FlagEvalHook.java rename to products/feature-flagging/feature-flagging-api/src/main/java/datadog/trace/api/openfeature/FlagEvalMetricsHook.java index 1132602a53f..a3e0dbfc83c 100644 --- a/products/feature-flagging/feature-flagging-api/src/main/java/datadog/trace/api/openfeature/FlagEvalHook.java +++ b/products/feature-flagging/feature-flagging-api/src/main/java/datadog/trace/api/openfeature/FlagEvalMetricsHook.java @@ -7,11 +7,11 @@ import dev.openfeature.sdk.ImmutableMetadata; import java.util.Map; -class FlagEvalHook implements Hook { +class FlagEvalMetricsHook implements Hook { private final FlagEvalMetrics metrics; - FlagEvalHook(FlagEvalMetrics metrics) { + FlagEvalMetricsHook(FlagEvalMetrics metrics) { this.metrics = metrics; } diff --git a/products/feature-flagging/feature-flagging-api/src/main/java/datadog/trace/api/openfeature/Provider.java b/products/feature-flagging/feature-flagging-api/src/main/java/datadog/trace/api/openfeature/Provider.java index c492ef49c69..7476f13fb7a 100644 --- a/products/feature-flagging/feature-flagging-api/src/main/java/datadog/trace/api/openfeature/Provider.java +++ b/products/feature-flagging/feature-flagging-api/src/main/java/datadog/trace/api/openfeature/Provider.java @@ -16,6 +16,7 @@ import dev.openfeature.sdk.exceptions.OpenFeatureError; import dev.openfeature.sdk.exceptions.ProviderNotReadyError; import java.lang.reflect.Constructor; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; @@ -34,7 +35,7 @@ public class Provider extends EventProvider implements Metadata { private final AtomicReference initializationState = new AtomicReference<>(InitializationState.NOT_STARTED); private final FlagEvalMetrics flagEvalMetrics; - private final FlagEvalHook flagEvalHook; + private final FlagEvalMetricsHook flagEvalMetricsHook; public Provider() { this(DEFAULT_OPTIONS, null); @@ -48,17 +49,17 @@ public Provider(final Options options) { this.options = options; this.evaluator = evaluator; FlagEvalMetrics metrics = null; - FlagEvalHook hook = null; + FlagEvalMetricsHook hook = null; try { metrics = new FlagEvalMetrics(); - hook = new FlagEvalHook(metrics); + hook = new FlagEvalMetricsHook(metrics); } catch (LinkageError | Exception e) { // FlagEvalMetrics logs the detailed error when it can load but OTel SDK init fails. // This outer catch fires when the class itself can't load (OTel API absent entirely). log.warn("Evaluation metrics unavailable — OTel classes not on classpath", e); } this.flagEvalMetrics = metrics; - this.flagEvalHook = hook; + this.flagEvalMetricsHook = hook; } @Override @@ -168,10 +169,14 @@ private Evaluator buildEvaluator() throws Exception { @Override public List getProviderHooks() { - if (flagEvalHook == null) { - return Collections.emptyList(); + final List hooks = new ArrayList<>(2); + if (flagEvalMetricsHook != null) { + hooks.add(flagEvalMetricsHook); } - return Collections.singletonList(flagEvalHook); + // EVP flagevaluation hook: always registered; no-op when writer is absent (killswitch off). + // Writer is resolved lazily from FeatureFlaggingGateway.getFlagEvalWriter() on each call. + hooks.add(FlagEvalLoggingHook.INSTANCE); + return Collections.unmodifiableList(hooks); } @Override diff --git a/products/feature-flagging/feature-flagging-api/src/test/java/datadog/trace/api/openfeature/FlagEvalLoggingHookTest.java b/products/feature-flagging/feature-flagging-api/src/test/java/datadog/trace/api/openfeature/FlagEvalLoggingHookTest.java new file mode 100644 index 00000000000..2e75108d236 --- /dev/null +++ b/products/feature-flagging/feature-flagging-api/src/test/java/datadog/trace/api/openfeature/FlagEvalLoggingHookTest.java @@ -0,0 +1,368 @@ +package datadog.trace.api.openfeature; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; + +import datadog.trace.api.featureflag.flagevaluation.FlagEvalEvent; +import datadog.trace.api.featureflag.flagevaluation.FlagEvaluationWriter; +import dev.openfeature.sdk.ErrorCode; +import dev.openfeature.sdk.FlagEvaluationDetails; +import dev.openfeature.sdk.FlagValueType; +import dev.openfeature.sdk.HookContext; +import dev.openfeature.sdk.ImmutableMetadata; +import dev.openfeature.sdk.MutableContext; +import dev.openfeature.sdk.Reason; +import dev.openfeature.sdk.Value; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.Test; + +/** + * Unit tests for {@link FlagEvalLoggingHook}: cheap capture, non-blocking enqueue, eval-time + * metadata, absent-variant detection, and killswitch-via-writer-null behaviour. + */ +class FlagEvalLoggingHookTest { + + // ---- helpers ---- + + /** + * Creates a writer that captures the enqueued event for assertion. Uses an anonymous class since + * FlagEvaluationWriter has multiple abstract methods. + */ + private FlagEvaluationWriter capturingWriter(final AtomicReference ref) { + return new FlagEvaluationWriter() { + @Override + public void enqueue(final FlagEvalEvent event) { + ref.set(event); + } + + @Override + public void start() {} + + @Override + public void close() {} + }; + } + + private static FlagEvalLoggingHook hookWithWriter(final FlagEvaluationWriter writer) { + return new FlagEvalLoggingHook<>(writer); + } + + private static FlagEvaluationDetails details( + final String flagKey, + final Object value, + final String variant, + final String reason, + final ImmutableMetadata metadata) { + final FlagEvaluationDetails.FlagEvaluationDetailsBuilder builder = + FlagEvaluationDetails.builder().flagKey(flagKey).value(value).reason(reason); + if (variant != null) { + builder.variant(variant); + } + if (metadata != null) { + builder.flagMetadata(metadata); + } + return builder.build(); + } + + private static HookContext hookCtxWithTargetingKey( + final String flagKey, final String targetingKey) { + final MutableContext ctx = new MutableContext(targetingKey); + return HookContext.builder() + .flagKey(flagKey) + .type(FlagValueType.STRING) + .defaultValue("default") + .ctx(ctx) + .build(); + } + + // ---- test: hook calls writer.enqueue once with flagKey, variant, allocationKey ---- + + @Test + void finallyAfterEnqueuesEventWithAllBasicFields() { + final AtomicReference captured = new AtomicReference<>(); + final FlagEvalLoggingHook hook = hookWithWriter(capturingWriter(captured)); + + final FlagEvaluationDetails det = + details( + "my-flag", + "on-value", + "on", + Reason.TARGETING_MATCH.name(), + ImmutableMetadata.builder().addString("allocationKey", "alloc-1").build()); + + hook.finallyAfter(null, det, Collections.emptyMap()); + + assertNotNull(captured.get(), "writer.enqueue must be called once"); + final FlagEvalEvent e = captured.get(); + assertEquals("my-flag", e.flagKey); + assertEquals("on", e.variant, "variant must be the OpenFeature variant key"); + assertEquals("alloc-1", e.allocationKey); + } + + // ---- variant comes from details.getVariant(), NOT details.getValue() ---- + + @Test + void variantIsTheVariantKeyNotTheEvaluatedValue() { + final AtomicReference captured = new AtomicReference<>(); + final FlagEvalLoggingHook hook = hookWithWriter(capturingWriter(captured)); + + // value and variant DIFFER, so a value-vs-variant mistake is detectable. + final FlagEvaluationDetails det = + details( + "g1-flag", + "the-evaluated-value", // value + "the-variant-key", // variant + Reason.TARGETING_MATCH.name(), + null); + + hook.finallyAfter(null, det, Collections.emptyMap()); + + assertNotNull(captured.get()); + assertEquals( + "the-variant-key", + captured.get().variant, + "variant must be sourced from details.getVariant(), not details.getValue()"); + } + + // ---- test: evalTimeMs from metadata "dd.eval.timestamp_ms" ---- + + @Test + void evalTimeMsComesFromMetadataWhenPresent() { + final AtomicReference captured = new AtomicReference<>(); + final FlagEvalLoggingHook hook = hookWithWriter(capturingWriter(captured)); + + final long expectedTimestamp = 1_700_000_000_000L; + final FlagEvaluationDetails det = + details( + "ts-flag", + "v", + "v", + Reason.SPLIT.name(), + ImmutableMetadata.builder() + .addString("allocationKey", "a") + .addLong("dd.eval.timestamp_ms", expectedTimestamp) + .build()); + + hook.finallyAfter(null, det, Collections.emptyMap()); + + assertNotNull(captured.get()); + assertEquals( + expectedTimestamp, + captured.get().evalTimeMs, + "evalTimeMs must come from dd.eval.timestamp_ms metadata when present"); + } + + // ---- test: evalTimeMs falls back to System.currentTimeMillis() when absent ---- + + @Test + void evalTimeMsFallsBackToCurrentTimeWhenMetadataAbsent() { + final AtomicReference captured = new AtomicReference<>(); + final FlagEvalLoggingHook hook = hookWithWriter(capturingWriter(captured)); + + final long before = System.currentTimeMillis(); + final FlagEvaluationDetails det = + details("ts-flag", "v", "v", Reason.SPLIT.name(), null); + + hook.finallyAfter(null, det, Collections.emptyMap()); + + final long after = System.currentTimeMillis(); + assertNotNull(captured.get()); + final long ts = captured.get().evalTimeMs; + assertTrue( + ts >= before && ts <= after, + "evalTimeMs must fall back to hook-fire time when metadata absent. got: " + ts); + } + + // ---- test: absent variant -> variant is null -> runtime default ---- + + @Test + void absentVariantProducesNullVariant() { + final AtomicReference captured = new AtomicReference<>(); + final FlagEvalLoggingHook hook = hookWithWriter(capturingWriter(captured)); + + // A runtime default returns the default value but no variant. + final FlagEvaluationDetails det = + details("def-flag", "default-value", null, Reason.DEFAULT.name(), null); + + hook.finallyAfter(null, det, Collections.emptyMap()); + + assertNotNull(captured.get()); + assertNull(captured.get().variant, "Absent variant must stay null (runtime default)"); + } + + // ---- test: error message captured from details (error object support) ---- + + @Test + void errorMessageCapturedFromDetails() { + final AtomicReference captured = new AtomicReference<>(); + final FlagEvalLoggingHook hook = hookWithWriter(capturingWriter(captured)); + + final FlagEvaluationDetails det = + FlagEvaluationDetails.builder() + .flagKey("err-flag") + .value("default") + .reason(Reason.ERROR.name()) + .errorCode(ErrorCode.TYPE_MISMATCH) + .errorMessage("value does not match declared type") + .build(); + + hook.finallyAfter(null, det, Collections.emptyMap()); + + assertNotNull(captured.get()); + assertEquals( + "value does not match declared type", + captured.get().errorMessage, + "errorMessage must be captured from the evaluation details"); + } + + // ---- test: error code used as fallback message when error message is empty ---- + + @Test + void errorCodeUsedAsFallbackWhenMessageEmpty() { + final AtomicReference captured = new AtomicReference<>(); + final FlagEvalLoggingHook hook = hookWithWriter(capturingWriter(captured)); + + final FlagEvaluationDetails det = + FlagEvaluationDetails.builder() + .flagKey("err-flag") + .value("default") + .reason(Reason.ERROR.name()) + .errorCode(ErrorCode.FLAG_NOT_FOUND) + .build(); + + hook.finallyAfter(null, det, Collections.emptyMap()); + + assertNotNull(captured.get()); + assertEquals( + "FLAG_NOT_FOUND", + captured.get().errorMessage, + "error code name must be used when no error message is present"); + } + + // ---- test: success path has no error message ---- + + @Test + void successPathHasNullErrorMessage() { + final AtomicReference captured = new AtomicReference<>(); + final FlagEvalLoggingHook hook = hookWithWriter(capturingWriter(captured)); + + final FlagEvaluationDetails det = + details("ok-flag", "v", "v", Reason.TARGETING_MATCH.name(), null); + + hook.finallyAfter(null, det, Collections.emptyMap()); + + assertNotNull(captured.get()); + assertNull(captured.get().errorMessage, "success path must have no error message"); + } + + // ---- test: hook does NO aggregation on the hook thread ---- + + @Test + void finallyAfterOnlyCallsEnqueueNoOtherWriterMethods() { + final FlagEvaluationWriter writer = mock(FlagEvaluationWriter.class); + final FlagEvalLoggingHook hook = hookWithWriter(writer); + + final FlagEvaluationDetails det = + details("flag", "v", "v", Reason.TARGETING_MATCH.name(), null); + + hook.finallyAfter(null, det, Collections.emptyMap()); + + // Exactly one enqueue call, no start/close/aggregate + verify(writer, times(1)).enqueue(any(FlagEvalEvent.class)); + verify(writer, never()).close(); + verify(writer, never()).start(); + } + + // ---- test: writer=null -> no-op (killswitch off / not yet started) ---- + + @Test + void writerNullIsNoOp() { + final FlagEvalLoggingHook hook = hookWithWriter(null); + final FlagEvaluationDetails det = + details("flag", "v", "v", Reason.TARGETING_MATCH.name(), null); + + // Must not throw; nothing is enqueued + hook.finallyAfter(null, det, Collections.emptyMap()); + } + + // ---- test: details=null -> no-op ---- + + @Test + void detailsNullIsNoOp() { + final FlagEvaluationWriter writer = mock(FlagEvaluationWriter.class); + final FlagEvalLoggingHook hook = hookWithWriter(writer); + + // Should not throw + hook.finallyAfter(null, null, Collections.emptyMap()); + + verifyNoInteractions(writer); + } + + // ---- test: targetingKey extracted from evaluation context ---- + + @Test + void targetingKeyExtractedFromContext() { + final AtomicReference captured = new AtomicReference<>(); + final FlagEvalLoggingHook hook = hookWithWriter(capturingWriter(captured)); + + final FlagEvaluationDetails det = + details("ctx-flag", "v", "v", Reason.SPLIT.name(), null); + + final HookContext hookCtx = hookCtxWithTargetingKey("ctx-flag", "user-42"); + + hook.finallyAfter(hookCtx, det, Collections.emptyMap()); + + assertNotNull(captured.get()); + assertEquals( + "user-42", + captured.get().targetingKey, + "targetingKey must be extracted from the evaluation context"); + } + + @Test + void contextAttributesAreFlattenedAndConvertedBeforeEnqueue() { + final AtomicReference captured = new AtomicReference<>(); + final FlagEvalLoggingHook hook = hookWithWriter(capturingWriter(captured)); + + final Map profile = new HashMap<>(); + profile.put("tier", "gold"); + final Map attributes = new HashMap<>(); + attributes.put("score", 42); + attributes.put("profile", profile); + final MutableContext context = + new MutableContext(Value.objectToValue(attributes).asStructure().asMap()); + context.setTargetingKey("user-42"); + + final HookContext hookCtx = + HookContext.builder() + .flagKey("ctx-flag") + .type(FlagValueType.STRING) + .defaultValue("default") + .ctx(context) + .build(); + final FlagEvaluationDetails det = + details("ctx-flag", "v", "v", Reason.TARGETING_MATCH.name(), null); + + hook.finallyAfter(hookCtx, det, Collections.emptyMap()); + + assertNotNull(captured.get()); + assertEquals(42, captured.get().attrs.get("score")); + assertEquals("gold", captured.get().attrs.get("profile.tier")); + assertFalse(captured.get().attrs.containsKey("targetingKey")); + assertTrue( + captured.get().attrs.values().stream().noneMatch(Value.class::isInstance), + "context attrs must contain converted scalar values, not OpenFeature Value wrappers"); + } +} diff --git a/products/feature-flagging/feature-flagging-api/src/test/java/datadog/trace/api/openfeature/FlagEvalHookTest.java b/products/feature-flagging/feature-flagging-api/src/test/java/datadog/trace/api/openfeature/FlagEvalMetricsHookTest.java similarity index 89% rename from products/feature-flagging/feature-flagging-api/src/test/java/datadog/trace/api/openfeature/FlagEvalHookTest.java rename to products/feature-flagging/feature-flagging-api/src/test/java/datadog/trace/api/openfeature/FlagEvalMetricsHookTest.java index 8ed17d91cbb..322a06d2de7 100644 --- a/products/feature-flagging/feature-flagging-api/src/test/java/datadog/trace/api/openfeature/FlagEvalHookTest.java +++ b/products/feature-flagging/feature-flagging-api/src/test/java/datadog/trace/api/openfeature/FlagEvalMetricsHookTest.java @@ -13,12 +13,12 @@ import java.util.Collections; import org.junit.jupiter.api.Test; -class FlagEvalHookTest { +class FlagEvalMetricsHookTest { @Test void finallyAfterRecordsBasicEvaluation() { FlagEvalMetrics metrics = mock(FlagEvalMetrics.class); - FlagEvalHook hook = new FlagEvalHook(metrics); + FlagEvalMetricsHook hook = new FlagEvalMetricsHook(metrics); FlagEvaluationDetails details = FlagEvaluationDetails.builder() @@ -44,7 +44,7 @@ void finallyAfterRecordsBasicEvaluation() { @Test void finallyAfterRecordsErrorEvaluation() { FlagEvalMetrics metrics = mock(FlagEvalMetrics.class); - FlagEvalHook hook = new FlagEvalHook(metrics); + FlagEvalMetricsHook hook = new FlagEvalMetricsHook(metrics); FlagEvaluationDetails details = FlagEvaluationDetails.builder() @@ -68,7 +68,7 @@ void finallyAfterRecordsErrorEvaluation() { @Test void finallyAfterHandlesNullFlagMetadata() { FlagEvalMetrics metrics = mock(FlagEvalMetrics.class); - FlagEvalHook hook = new FlagEvalHook(metrics); + FlagEvalMetricsHook hook = new FlagEvalMetricsHook(metrics); FlagEvaluationDetails details = FlagEvaluationDetails.builder() @@ -87,7 +87,7 @@ void finallyAfterHandlesNullFlagMetadata() { @Test void finallyAfterHandlesNullVariantAndReason() { FlagEvalMetrics metrics = mock(FlagEvalMetrics.class); - FlagEvalHook hook = new FlagEvalHook(metrics); + FlagEvalMetricsHook hook = new FlagEvalMetricsHook(metrics); FlagEvaluationDetails details = FlagEvaluationDetails.builder().flagKey("my-flag").value("default").build(); @@ -100,7 +100,7 @@ void finallyAfterHandlesNullVariantAndReason() { @Test void finallyAfterNeverThrows() { FlagEvalMetrics metrics = mock(FlagEvalMetrics.class); - FlagEvalHook hook = new FlagEvalHook(metrics); + FlagEvalMetricsHook hook = new FlagEvalMetricsHook(metrics); // Should not throw even with completely null inputs hook.finallyAfter(null, null, null); @@ -110,7 +110,7 @@ void finallyAfterNeverThrows() { @Test void finallyAfterIsNoOpWhenMetricsIsNull() { - FlagEvalHook hook = new FlagEvalHook(null); + FlagEvalMetricsHook hook = new FlagEvalMetricsHook(null); FlagEvaluationDetails details = FlagEvaluationDetails.builder() diff --git a/products/feature-flagging/feature-flagging-api/src/test/java/datadog/trace/api/openfeature/ProviderTest.java b/products/feature-flagging/feature-flagging-api/src/test/java/datadog/trace/api/openfeature/ProviderTest.java index 27d4dd5d2b5..b99cc01c92f 100644 --- a/products/feature-flagging/feature-flagging-api/src/test/java/datadog/trace/api/openfeature/ProviderTest.java +++ b/products/feature-flagging/feature-flagging-api/src/test/java/datadog/trace/api/openfeature/ProviderTest.java @@ -326,12 +326,14 @@ protected Class loadEvaluatorClass() throws ClassNotFoundException { } @Test - public void testGetProviderHooksReturnsFlagEvalHook() { + public void testGetProviderHooksReturnsFlagEvalMetricsHook() { Provider provider = new Provider(new Options().initTimeout(10, MILLISECONDS), mock(Evaluator.class)); List hooks = provider.getProviderHooks(); - assertThat(hooks.size(), equalTo(1)); - assertThat(hooks.get(0) instanceof FlagEvalHook, equalTo(true)); + // Two hooks: OTel FlagEvalMetricsHook (index 0) + FlagEvalLoggingHook (index 1) + assertThat(hooks.size(), equalTo(2)); + assertThat(hooks.get(0) instanceof FlagEvalMetricsHook, equalTo(true)); + assertThat(hooks.get(1) instanceof FlagEvalLoggingHook, equalTo(true)); } @Test @@ -343,9 +345,8 @@ public void testShutdownCleansUpMetrics() throws Exception { provider.initialize(null); provider.shutdown(); verify(evaluator).shutdown(); - // After shutdown, getProviderHooks still returns a list (hook is still present but metrics is - // shut down) - assertThat(provider.getProviderHooks().size(), equalTo(1)); + // After shutdown, getProviderHooks still returns a list with both OTel + logging hooks + assertThat(provider.getProviderHooks().size(), equalTo(2)); } public interface EvaluateMethod { diff --git a/products/feature-flagging/feature-flagging-bootstrap/src/main/java/datadog/trace/api/featureflag/FeatureFlaggingGateway.java b/products/feature-flagging/feature-flagging-bootstrap/src/main/java/datadog/trace/api/featureflag/FeatureFlaggingGateway.java index b9d73ffa7ab..a0856fc0ab3 100644 --- a/products/feature-flagging/feature-flagging-bootstrap/src/main/java/datadog/trace/api/featureflag/FeatureFlaggingGateway.java +++ b/products/feature-flagging/feature-flagging-bootstrap/src/main/java/datadog/trace/api/featureflag/FeatureFlaggingGateway.java @@ -1,6 +1,7 @@ package datadog.trace.api.featureflag; import datadog.trace.api.featureflag.exposure.ExposureEvent; +import datadog.trace.api.featureflag.flagevaluation.FlagEvaluationWriter; import datadog.trace.api.featureflag.ufc.v1.ServerConfiguration; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; @@ -19,6 +20,15 @@ public interface ExposureListener extends Consumer {} private static final AtomicReference CURRENT_CONFIG = new AtomicReference<>(); + /** + * The active EVP flagevaluation writer. Registered by {@code FlagEvaluationWriterImpl.start()} + * when the killswitch {@code DD_FLAGGING_EVALUATION_COUNTS_ENABLED} is on (default). Read by + * {@code FlagEvalLoggingHook} to route evaluations into the two-tier aggregator. {@code null} + * when the EVP path is disabled. + */ + private static final AtomicReference FLAG_EVAL_WRITER = + new AtomicReference<>(); + private FeatureFlaggingGateway() {} public static void addConfigListener(final ConfigListener listener) { @@ -49,4 +59,23 @@ public static void removeExposureListener(final ExposureListener listener) { public static void dispatch(final ExposureEvent event) { EXPOSURE_LISTENERS.forEach(listener -> listener.accept(event)); } + + /** + * Registers the active EVP flagevaluation writer. Called by {@code + * FlagEvaluationWriterImpl.start()} when the feature is enabled. Replaces any previously + * registered writer. + * + * @param writer the writer to register, or {@code null} to deregister + */ + public static void setFlagEvalWriter(final FlagEvaluationWriter writer) { + FLAG_EVAL_WRITER.set(writer); + } + + /** + * Returns the active EVP flagevaluation writer, or {@code null} when disabled (killswitch off or + * not yet started). + */ + public static FlagEvaluationWriter getFlagEvalWriter() { + return FLAG_EVAL_WRITER.get(); + } } diff --git a/products/feature-flagging/feature-flagging-bootstrap/src/main/java/datadog/trace/api/featureflag/flagevaluation/FlagEvalEvent.java b/products/feature-flagging/feature-flagging-bootstrap/src/main/java/datadog/trace/api/featureflag/flagevaluation/FlagEvalEvent.java new file mode 100644 index 00000000000..5d7a3588492 --- /dev/null +++ b/products/feature-flagging/feature-flagging-bootstrap/src/main/java/datadog/trace/api/featureflag/flagevaluation/FlagEvalEvent.java @@ -0,0 +1,76 @@ +package datadog.trace.api.featureflag.flagevaluation; + +import java.util.Collections; +import java.util.Map; + +/** + * Lightweight data record capturing a single flag evaluation for EVP flagevaluation emission. + * + *

This is the currency passed from the {@code FlagEvalLoggingHook} (feature-flagging-api) to the + * {@code FlagEvaluationWriter} (feature-flagging-lib) via a non-blocking bounded queue. + * + *

All fields captured at hook-fire time on the evaluation thread. No aggregation happens here. + */ +public final class FlagEvalEvent { + + /** The feature flag key. Never null. */ + public final String flagKey; + + /** + * The evaluated variant/value as a string. {@code null} means the default value was returned + * (runtime default). + */ + public final String variant; + + /** The allocation key from flag metadata ("allocationKey"). May be null. */ + public final String allocationKey; + + /** The targeting key from the evaluation context. May be null. */ + public final String targetingKey; + + /** + * The evaluation error message when the evaluation failed, else {@code null}. Sourced from the + * OpenFeature evaluation details (error message, falling back to the error code). + */ + public final String errorMessage; + + /** + * Evaluation timestamp in milliseconds since epoch. Stamped at eval-entry time from flag metadata + * key {@code "dd.eval.timestamp_ms"}, or falls back to hook-fire time when absent. This ensures + * first/last_evaluation reflect evaluation time, not hook-fire time. + */ + public final long evalTimeMs; + + /** + * Flattened evaluation context attributes. Used for the full-tier canonical context key. May be + * empty but never null. + */ + public final Map attrs; + + public FlagEvalEvent( + final String flagKey, + final String variant, + final String allocationKey, + final String targetingKey, + final long evalTimeMs, + final Map attrs) { + this(flagKey, variant, allocationKey, targetingKey, null, evalTimeMs, attrs); + } + + public FlagEvalEvent( + final String flagKey, + final String variant, + final String allocationKey, + final String targetingKey, + final String errorMessage, + final long evalTimeMs, + final Map attrs) { + this.flagKey = flagKey; + this.variant = variant; + this.allocationKey = allocationKey; + this.targetingKey = targetingKey; + this.errorMessage = errorMessage; + this.evalTimeMs = evalTimeMs; + this.attrs = attrs != null ? attrs : Collections.emptyMap(); + } +} diff --git a/products/feature-flagging/feature-flagging-bootstrap/src/main/java/datadog/trace/api/featureflag/flagevaluation/FlagEvaluationWriter.java b/products/feature-flagging/feature-flagging-bootstrap/src/main/java/datadog/trace/api/featureflag/flagevaluation/FlagEvaluationWriter.java new file mode 100644 index 00000000000..9bfe40d11ca --- /dev/null +++ b/products/feature-flagging/feature-flagging-bootstrap/src/main/java/datadog/trace/api/featureflag/flagevaluation/FlagEvaluationWriter.java @@ -0,0 +1,27 @@ +package datadog.trace.api.featureflag.flagevaluation; + +/** + * Defines an EVP flagevaluation writer responsible for aggregating flag evaluation events and + * flushing them to the EVP proxy. + * + *

Implementations must use a background thread (serializing handler) for aggregation and + * transport. The {@link #enqueue(FlagEvalEvent)} method must be non-blocking and callable from the + * OpenFeature hook thread without backpressure. + */ +public interface FlagEvaluationWriter extends AutoCloseable { + + /** + * Non-blocking enqueue of a flag evaluation event. May silently drop the event if the internal + * bounded queue is full (best-effort, observable via drop counter). + * + * @param event the flag evaluation event captured at hook-fire time + */ + void enqueue(FlagEvalEvent event); + + /** Starts the background serializing thread. Must be called once after construction. */ + void start(); + + /** Stops the background thread and releases resources. */ + @Override + void close(); +} diff --git a/products/feature-flagging/feature-flagging-lib/build.gradle.kts b/products/feature-flagging/feature-flagging-lib/build.gradle.kts index 3291e239d40..7ea6674ece6 100644 --- a/products/feature-flagging/feature-flagging-lib/build.gradle.kts +++ b/products/feature-flagging/feature-flagging-lib/build.gradle.kts @@ -1,6 +1,7 @@ plugins { `java-library` id("dd-trace-java.version-file") + id("me.champeau.jmh") } apply(from = "$rootDir/gradle/java.gradle") @@ -28,3 +29,8 @@ dependencies { testImplementation(project(":utils:test-utils")) testImplementation(project(":dd-java-agent:testing")) } + +jmh { + jmhVersion = libs.versions.jmh.get() + duplicateClassesStrategy = DuplicatesStrategy.EXCLUDE +} diff --git a/products/feature-flagging/feature-flagging-lib/src/jmh/java/com/datadog/featureflag/FlagEvaluationHotPathBenchmark.java b/products/feature-flagging/feature-flagging-lib/src/jmh/java/com/datadog/featureflag/FlagEvaluationHotPathBenchmark.java new file mode 100644 index 00000000000..996e38bcd39 --- /dev/null +++ b/products/feature-flagging/feature-flagging-lib/src/jmh/java/com/datadog/featureflag/FlagEvaluationHotPathBenchmark.java @@ -0,0 +1,98 @@ +package com.datadog.featureflag; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; + +import datadog.communication.BackendApiFactory; +import datadog.trace.api.Config; +import datadog.trace.api.featureflag.flagevaluation.FlagEvalEvent; +import java.util.HashMap; +import java.util.Map; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Hot-path benchmark for EVP {@code flagevaluation} recording. + * + *

The OpenFeature {@code Finally} hook runs synchronously on the caller's evaluation thread, so + * the cost it charges the user's evaluation must stay flat. This benchmark isolates the two stages: + * + *

    + *
  • {@code evalThreadCapture}: what the evaluation thread pays — building the lightweight + * {@link FlagEvalEvent} snapshot (the hook's scalar/shallow capture) plus the non-blocking + * {@code enqueue}. This is the cost that must stay flat under load. + *
  • {@code workerAggregate}: the deferred work that runs off the evaluation thread on the + * background worker — deterministic context prune + canonical-context key + two-tier map + * aggregation. Measured so the off-thread cost is characterized too. + *
+ * + *

Run: {@code ./gradlew :products:feature-flagging:feature-flagging-lib:jmh} (optionally {@code + * -PjmhIncludes=FlagEvaluationHotPathBenchmark}). Use {@code -prof gc} via JMH args for allocs. + */ +@State(Scope.Benchmark) +@Warmup(iterations = 3, time = 2, timeUnit = SECONDS) +@Measurement(iterations = 5, time = 1, timeUnit = SECONDS) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(NANOSECONDS) +@Fork(value = 1) +public class FlagEvaluationHotPathBenchmark { + + private Map attrs; + + // Eval-thread stage: a writer whose bounded queue is large and never drained, so enqueue cost is + // measured without a competing consumer thread. + private FlagEvaluationWriterImpl writer; + + // Worker stage: the aggregating handler, exercised directly (off the eval thread in production). + private FlagEvaluationWriterImpl.SerializingHandlerForTest handler; + + @Setup(Level.Iteration) + public void setUp() { + attrs = new HashMap<>(); + attrs.put("tier", "enterprise"); + attrs.put("region", "us-east-1"); + attrs.put("seats", 42); + attrs.put("beta", Boolean.TRUE); + + final Config config = Config.get(); + // The factory is never invoked here (neither the writer worker nor the handler is started), + // so a plain factory suffices; createBackendApi() is only called from run(). + final BackendApiFactory factory = new BackendApiFactory(config, null); + final Map ddContext = new HashMap<>(); + ddContext.put("service", "bench-service"); + handler = FlagEvaluationWriterImpl.createHandlerForTest(factory, ddContext); + + // Capacity large enough that the benchmark never overflows within a measurement window. + writer = new FlagEvaluationWriterImpl(1 << 20, Long.MAX_VALUE, NANOSECONDS, factory, config); + } + + /** Eval-thread cost: build the capture snapshot + non-blocking enqueue. */ + @Benchmark + public void evalThreadCapture(final Blackhole blackhole) { + final FlagEvalEvent ev = + new FlagEvalEvent( + "checkout-flag", "treatment", "alloc-7", "user-123", null, 1_700_000_000_000L, attrs); + writer.enqueue(ev); + blackhole.consume(ev); + } + + /** Off-thread worker cost: deterministic prune + canonical key + two-tier aggregation. */ + @Benchmark + public void workerAggregate(final Blackhole blackhole) { + final FlagEvalEvent ev = + new FlagEvalEvent( + "checkout-flag", "treatment", "alloc-7", "user-123", null, 1_700_000_000_000L, attrs); + handler.aggregateEvent(ev); + blackhole.consume(handler.fullTier.size()); + } +} diff --git a/products/feature-flagging/feature-flagging-lib/src/main/java/com/datadog/featureflag/ExposureWriterImpl.java b/products/feature-flagging/feature-flagging-lib/src/main/java/com/datadog/featureflag/ExposureWriterImpl.java index 3783ae914f3..598f3c3c949 100644 --- a/products/feature-flagging/feature-flagging-lib/src/main/java/com/datadog/featureflag/ExposureWriterImpl.java +++ b/products/feature-flagging/feature-flagging-lib/src/main/java/com/datadog/featureflag/ExposureWriterImpl.java @@ -4,24 +4,18 @@ import static datadog.trace.util.AgentThreadFactory.newAgentThread; import static java.util.concurrent.TimeUnit.SECONDS; -import com.squareup.moshi.JsonAdapter; -import com.squareup.moshi.Moshi; import datadog.common.queue.MessagePassingBlockingQueue; import datadog.common.queue.Queues; -import datadog.communication.BackendApi; import datadog.communication.BackendApiFactory; import datadog.communication.ddagent.SharedCommunicationObjects; import datadog.trace.api.Config; import datadog.trace.api.featureflag.FeatureFlaggingGateway; import datadog.trace.api.featureflag.exposure.ExposureEvent; import datadog.trace.api.featureflag.exposure.ExposuresRequest; -import datadog.trace.api.intake.Intake; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import okhttp3.RequestBody; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,6 +25,7 @@ public class ExposureWriterImpl implements ExposureWriter { private static final int DEFAULT_CAPACITY = 1 << 16; // 65536 elements private static final int DEFAULT_FLUSH_INTERVAL_IN_SECONDS = 1; private static final int FLUSH_THRESHOLD = 100; + private static final String EXPOSURES_ROUTE = "exposures"; private final MessagePassingBlockingQueue queue; private final Thread serializerThread; @@ -46,21 +41,13 @@ public ExposureWriterImpl(final SharedCommunicationObjects sco, final Config con final SharedCommunicationObjects sco, final Config config) { this.queue = Queues.mpscBlockingConsumerArrayQueue(capacity); - final Map context = new HashMap<>(4); - context.put("service", config.getServiceName() == null ? "unknown" : config.getServiceName()); - if (config.getEnv() != null) { - context.put("env", config.getEnv()); - } - if (config.getVersion() != null) { - context.put("version", config.getVersion()); - } final ExposureSerializingHandler serializer = new ExposureSerializingHandler( new BackendApiFactory(config, sco), queue, flushInterval, timeUnit, - context, + FeatureFlagEvpContext.from(config), this::close); this.serializerThread = newAgentThread(FEATURE_FLAG_EXPOSURE_PROCESSOR, serializer); } @@ -89,10 +76,7 @@ private static class ExposureSerializingHandler implements Runnable { private final long ticksRequiredToFlush; private long lastTicks; - private final JsonAdapter jsonAdapter; - private final BackendApiFactory backendApiFactory; - private BackendApi evp; - + private final FeatureFlagEvpPublisher evpPublisher; private final Map context; private final ExposureCache cache; @@ -108,8 +92,7 @@ public ExposureSerializingHandler( final Runnable errorCallback) { this.queue = queue; this.cache = new LRUExposureCache(queue.capacity()); - this.jsonAdapter = new Moshi.Builder().build().adapter(ExposuresRequest.class); - this.backendApiFactory = backendApiFactory; + this.evpPublisher = new FeatureFlagEvpPublisher<>(backendApiFactory, ExposuresRequest.class); this.context = context; this.lastTicks = System.nanoTime(); @@ -122,8 +105,7 @@ public ExposureSerializingHandler( @Override public void run() { - evp = backendApiFactory.createBackendApi(Intake.EVENT_PLATFORM); - if (evp == null) { + if (!evpPublisher.start()) { errorCallback.run(); throw new IllegalArgumentException("EVP Proxy not available"); } @@ -169,10 +151,7 @@ protected void flushIfNecessary() { if (shouldFlush()) { try { final ExposuresRequest exposures = new ExposuresRequest(this.context, this.buffer); - final String reqBod = jsonAdapter.toJson(exposures); - final RequestBody requestBody = - RequestBody.create(okhttp3.MediaType.parse("application/json"), reqBod); - evp.post("exposures", requestBody, stream -> null, null, false); + evpPublisher.post(EXPOSURES_ROUTE, exposures); this.buffer.clear(); } catch (Exception e) { LOGGER.error("Could not submit exposures", e); diff --git a/products/feature-flagging/feature-flagging-lib/src/main/java/com/datadog/featureflag/FeatureFlagEvpContext.java b/products/feature-flagging/feature-flagging-lib/src/main/java/com/datadog/featureflag/FeatureFlagEvpContext.java new file mode 100644 index 00000000000..c964efa6c7f --- /dev/null +++ b/products/feature-flagging/feature-flagging-lib/src/main/java/com/datadog/featureflag/FeatureFlagEvpContext.java @@ -0,0 +1,22 @@ +package com.datadog.featureflag; + +import datadog.trace.api.Config; +import java.util.HashMap; +import java.util.Map; + +final class FeatureFlagEvpContext { + + private FeatureFlagEvpContext() {} + + static Map from(final Config config) { + final Map context = new HashMap<>(4); + context.put("service", config.getServiceName() == null ? "unknown" : config.getServiceName()); + if (config.getEnv() != null) { + context.put("env", config.getEnv()); + } + if (config.getVersion() != null) { + context.put("version", config.getVersion()); + } + return context; + } +} diff --git a/products/feature-flagging/feature-flagging-lib/src/main/java/com/datadog/featureflag/FeatureFlagEvpPublisher.java b/products/feature-flagging/feature-flagging-lib/src/main/java/com/datadog/featureflag/FeatureFlagEvpPublisher.java new file mode 100644 index 00000000000..b98ca414729 --- /dev/null +++ b/products/feature-flagging/feature-flagging-lib/src/main/java/com/datadog/featureflag/FeatureFlagEvpPublisher.java @@ -0,0 +1,72 @@ +package com.datadog.featureflag; + +import com.squareup.moshi.JsonAdapter; +import com.squareup.moshi.Moshi; +import datadog.communication.BackendApi; +import datadog.communication.BackendApiFactory; +import datadog.trace.api.intake.Intake; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import okhttp3.MediaType; +import okhttp3.RequestBody; + +final class FeatureFlagEvpPublisher { + + private static final MediaType JSON = MediaType.parse("application/json"); + + private final BackendApiFactory backendApiFactory; + private final String preferredEvpProxyEndpoint; + private final boolean responseCompression; + private final JsonAdapter jsonAdapter; + private BackendApi evp; + + FeatureFlagEvpPublisher(final BackendApiFactory backendApiFactory, final Class requestType) { + this(backendApiFactory, requestType, null, true); + } + + FeatureFlagEvpPublisher( + final BackendApiFactory backendApiFactory, + final Class requestType, + final String preferredEvpProxyEndpoint, + final boolean responseCompression) { + this.backendApiFactory = backendApiFactory; + this.preferredEvpProxyEndpoint = preferredEvpProxyEndpoint; + this.responseCompression = responseCompression; + this.jsonAdapter = new Moshi.Builder().build().adapter(requestType); + } + + boolean start() { + if (evp == null) { + evp = + preferredEvpProxyEndpoint == null && responseCompression + ? backendApiFactory.createBackendApi(Intake.EVENT_PLATFORM) + : backendApiFactory.createBackendApi( + Intake.EVENT_PLATFORM, preferredEvpProxyEndpoint, responseCompression); + } + return evp != null; + } + + void post(final String route, final T request) throws IOException { + post(route, serialize(request)); + } + + byte[] serialize(final T request) { + return utf8Bytes(jsonAdapter.toJson(request)); + } + + void post(final String route, final byte[] json) throws IOException { + if (!start()) { + throw new IllegalStateException("EVP Proxy not available"); + } + final RequestBody requestBody = RequestBody.create(JSON, json); + evp.post(route, requestBody, stream -> null, null, false); + } + + static byte[] utf8Bytes(final String json) { + try { + return json.getBytes("UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new AssertionError("UTF-8 must be available", e); + } + } +} diff --git a/products/feature-flagging/feature-flagging-lib/src/main/java/com/datadog/featureflag/FlagEvaluationWriterImpl.java b/products/feature-flagging/feature-flagging-lib/src/main/java/com/datadog/featureflag/FlagEvaluationWriterImpl.java new file mode 100644 index 00000000000..75a614eed29 --- /dev/null +++ b/products/feature-flagging/feature-flagging-lib/src/main/java/com/datadog/featureflag/FlagEvaluationWriterImpl.java @@ -0,0 +1,1050 @@ +package com.datadog.featureflag; + +import static datadog.communication.ddagent.DDAgentFeaturesDiscovery.V2_EVP_PROXY_ENDPOINT; +import static datadog.trace.util.AgentThreadFactory.AgentThread.FEATURE_FLAG_EVALUATION_PROCESSOR; +import static datadog.trace.util.AgentThreadFactory.newAgentThread; +import static java.util.concurrent.TimeUnit.SECONDS; + +import com.squareup.moshi.JsonAdapter; +import com.squareup.moshi.Moshi; +import com.squareup.moshi.Types; +import datadog.common.queue.MessagePassingBlockingQueue; +import datadog.common.queue.Queues; +import datadog.communication.BackendApiFactory; +import datadog.communication.EvpProxy; +import datadog.communication.ddagent.SharedCommunicationObjects; +import datadog.trace.api.Config; +import datadog.trace.api.featureflag.FeatureFlaggingGateway; +import datadog.trace.api.featureflag.flagevaluation.FlagEvalEvent; +import datadog.trace.api.featureflag.flagevaluation.FlagEvaluationWriter; +import datadog.trace.api.telemetry.CoreMetricCollector; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.io.ByteArrayOutputStream; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * EVP {@code flagevaluation} writer for Java. + * + *

Uses the same EVP publisher path as {@link ExposureWriterImpl}, with two-tier aggregation + * replacing the single-exposure buffer. Routes to {@code /evp_proxy/v2/api/v2/flagevaluation}. + * + *

Two-tier aggregation contract: + * + *

    + *
  • Two-tier aggregation: full → degraded → drop-counted. + *
  • Full key: (flagKey, variant, allocationKey, runtimeDefault, errorMessage, targetingKey, + * canonical-context-key). + *
  • Degraded key: (flagKey, variant, allocationKey, runtimeDefault, errorMessage) — no + * targetingKey/context. + *
  • Canonical context key: sorted entries, type-tagged length-delimited encoding — NOT a hash + * (collision-safe, comparable string identity). + *
  • Context pruning: deterministic (sort before cut), ≤256 fields, string values ≤256 chars; + * the pruned attributes are what gets aggregated and serialized. + *
  • Caps: globalCap=131072, perFlagCap=10000, degradedCap=32768. + *
  • Eval-time: min/max of firstEvalMs/lastEvalMs across events in the same bucket. + *
  • Runtime default: absent variant means {@code runtimeDefaultUsed=true}. + *
  • Flush interval: 10 seconds. + *
  • Queue: bounded MessagePassingBlockingQueue (capacity 2^16), non-blocking offer; on overflow + * the event is dropped and the {@code droppedQueueOverflow} counter is incremented and + * surfaced on flush. + *
  • Shutdown: {@link #close()} drains the queue and performs a final flush before the worker + * thread exits. + *
+ */ +public class FlagEvaluationWriterImpl implements FlagEvaluationWriter { + + private static final Logger LOGGER = LoggerFactory.getLogger(FlagEvaluationWriterImpl.class); + + static final int DEFAULT_CAPACITY = 1 << 16; // 65536 elements + static final int FLUSH_INTERVAL_SECONDS = 10; + static final int EVAL_SCALE_TARGET_FLAGS = 2_500; + static final int EVAL_SCALE_FULL_BUCKETS_PER_FLAG = 50; + static final int EVAL_SCALE_USERS_PER_FLAG = 1_000; + static final int EVAL_SCALE_PER_FLAG_HEADROOM_MULTIPLIER = 10; + static final int EVAL_SCALE_DEGRADED_BUCKETS_PER_FLAG = 10; + static final int EVAL_SCALE_FULL_BUCKET_TARGET = + EVAL_SCALE_TARGET_FLAGS * EVAL_SCALE_FULL_BUCKETS_PER_FLAG; + static final int EVAL_SCALE_PER_FLAG_BUCKET_TARGET = + EVAL_SCALE_PER_FLAG_HEADROOM_MULTIPLIER * EVAL_SCALE_USERS_PER_FLAG; + static final int EVAL_SCALE_DEGRADED_BUCKET_TARGET = + EVAL_SCALE_TARGET_FLAGS * EVAL_SCALE_DEGRADED_BUCKETS_PER_FLAG; + static final int GLOBAL_CAP = 131_072; + static final int PER_FLAG_CAP = EVAL_SCALE_PER_FLAG_BUCKET_TARGET; + static final int DEGRADED_CAP = 32_768; + static final int FLAG_EVALUATION_PAYLOAD_SIZE_LIMIT_BYTES = EvpProxy.PAYLOAD_SIZE_LIMIT_BYTES; + static final String FLAG_EVALUATION_DROPPED_METRIC = "flagevaluation.rows.dropped"; + static final String FLAG_EVALUATION_DEGRADED_METRIC = "flagevaluation.rows.degraded"; + static final String FLAG_EVALUATION_SPLITS_METRIC = "flagevaluation.payload.splits"; + static final String DROP_REASON_QUEUE_OVERFLOW = "queue_overflow"; + static final String DROP_REASON_DEGRADED_CAP = "degraded_cap"; + static final String DROP_REASON_PAYLOAD_LIMIT = "payload_limit"; + static final String DEGRADED_REASON_CARDINALITY_CAP = "cardinality_cap"; + static final String DEGRADED_REASON_PAYLOAD_LIMIT = "payload_limit"; + private static final String FLAG_EVALUATION_ROUTE = "flagevaluation"; + private static final byte[] FLAG_EVALUATION_PAYLOAD_SUFFIX = + FeatureFlagEvpPublisher.utf8Bytes("]}"); + private static final byte[] JSON_COMMA = FeatureFlagEvpPublisher.utf8Bytes(","); + private static final JsonAdapter FLAG_EVALUATION_EVENT_JSON_ADAPTER; + private static final JsonAdapter> CONTEXT_JSON_ADAPTER; + private static final CoreMetricCollector CORE_METRICS = CoreMetricCollector.getInstance(); + + static { + final Moshi moshi = new Moshi.Builder().build(); + FLAG_EVALUATION_EVENT_JSON_ADAPTER = moshi.adapter(FlagEvaluationEvent.class); + final Type contextType = Types.newParameterizedType(Map.class, String.class, String.class); + CONTEXT_JSON_ADAPTER = moshi.adapter(contextType); + } + + // Context pruning limits: max fields and max string value length + static final int MAX_CONTEXT_FIELDS = 256; + static final int MAX_FIELD_LENGTH = 256; + + // Type tags for canonical context key (type-tagged length-delimited encoding) + private static final byte CTX_TAG_STRING = 's'; + private static final byte CTX_TAG_BOOL = 'b'; + private static final byte CTX_TAG_INT = 'i'; + private static final byte CTX_TAG_LONG = 'l'; + private static final byte CTX_TAG_FLOAT = 'f'; + private static final byte CTX_TAG_DOUBLE = 'd'; + private static final byte CTX_TAG_OTHER = 'o'; + + private final MessagePassingBlockingQueue queue; + private final FlagEvaluationSerializingHandler serializer; + private final Thread serializerThread; + + private static void countMetric(final String metricName, final long value, final String reason) { + if (value <= 0) { + return; + } + CORE_METRICS.count(metricName, value, reason == null ? null : "reason:" + reason); + } + + /** + * Observable counter for events dropped because the bounded hand-off queue was full when the hook + * tried to enqueue (backpressure). Incremented on the hook thread, surfaced on flush. + */ + private final AtomicLong droppedQueueOverflow = new AtomicLong(0); + + public FlagEvaluationWriterImpl(final SharedCommunicationObjects sco, final Config config) { + this(DEFAULT_CAPACITY, FLUSH_INTERVAL_SECONDS, SECONDS, sco, config); + } + + FlagEvaluationWriterImpl( + final int capacity, + final long flushInterval, + final TimeUnit timeUnit, + final SharedCommunicationObjects sco, + final Config config) { + this(capacity, flushInterval, timeUnit, new BackendApiFactory(config, sco), config); + } + + /** Package-private constructor allowing a {@link BackendApiFactory} to be injected for tests. */ + FlagEvaluationWriterImpl( + final int capacity, + final long flushInterval, + final TimeUnit timeUnit, + final BackendApiFactory backendApiFactory, + final Config config) { + this.queue = Queues.mpscBlockingConsumerArrayQueue(capacity); + this.serializer = + new FlagEvaluationSerializingHandler( + backendApiFactory, + queue, + flushInterval, + timeUnit, + FeatureFlagEvpContext.from(config), + droppedQueueOverflow, + this::close); + this.serializerThread = newAgentThread(FEATURE_FLAG_EVALUATION_PROCESSOR, serializer); + } + + @Override + public void start() { + // Register with the gateway so FlagEvalLoggingHook can route evaluations to this writer + FeatureFlaggingGateway.setFlagEvalWriter(this); + this.serializerThread.start(); + } + + /** Test seam: starts the worker thread WITHOUT registering with the global gateway. */ + void startForTest() { + this.serializerThread.start(); + } + + /** Test seam: current full-tier bucket count in the worker's aggregator. */ + int aggregatorFullTierSizeForTest() { + return serializer.fullTier.size(); + } + + @Override + public void close() { + // Deregister from the gateway so no new events are enqueued. + FeatureFlaggingGateway.setFlagEvalWriter(null); + if (!this.serializerThread.isAlive()) { + return; + } + // Ask the worker to drain the queue and final-flush, then interrupt to wake it from poll(). + serializer.requestShutdown(); + this.serializerThread.interrupt(); + try { + // Bounded wait for the worker's final flush so queued events are not lost on shutdown. + this.serializerThread.join(TimeUnit.SECONDS.toMillis(5)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + @Override + public void enqueue(final FlagEvalEvent event) { + if (event == null) { + return; + } + // Non-blocking offer. Bound the context snapshot before the queue, then count overflow so loss + // is observable rather than silent; the count is surfaced on the next flush. + final FlagEvalEvent boundedEvent = + new FlagEvalEvent( + event.flagKey, + event.variant, + event.allocationKey, + event.targetingKey, + event.errorMessage, + event.evalTimeMs, + pruneContext(event.attrs)); + if (!queue.offer(boundedEvent)) { + droppedQueueOverflow.incrementAndGet(); + } + } + + /** Returns the count of events dropped due to queue-overflow backpressure (observable). */ + long droppedQueueOverflow() { + return droppedQueueOverflow.get(); + } + + /** Test seam: returns one queued event without starting the worker. */ + FlagEvalEvent pollQueuedEventForTest() { + return queue.poll(); + } + + /** Test seam: flushes serializer state without starting the worker. */ + void flushForTest() { + serializer.flush(); + } + + // ---- Deterministic context pruning ---- + + /** + * Produces the deterministically-pruned context map used for both the canonical key and the + * serialized payload. Keys are sorted FIRST, then the first {@link #MAX_CONTEXT_FIELDS} + * non-oversized values are kept, so two logically-identical contexts always prune to the same + * subset (and therefore the same canonical key). Oversized string values (>{@link + * #MAX_FIELD_LENGTH} chars) are skipped. Returns an empty map for null/empty input. + */ + static Map pruneContext(final Map attrs) { + if (attrs == null || attrs.isEmpty()) { + return java.util.Collections.emptyMap(); + } + // TreeMap gives a deterministic, sorted ordering BEFORE we apply the field cap. + final TreeMap out = new TreeMap<>(); + final TreeMap sorted = new TreeMap<>(attrs); + int count = 0; + for (final Map.Entry entry : sorted.entrySet()) { + if (count >= MAX_CONTEXT_FIELDS) { + break; + } + final Object v = entry.getValue(); + if (v instanceof String && ((String) v).length() > MAX_FIELD_LENGTH) { + continue; // skip oversized string values + } + out.put(entry.getKey(), v); + count++; + } + return out; + } + + // ---- Canonical context key ---- + // Sorted entries, type-tagged length-delimited encoding. NOT a hash (collision-safe string key). + // Implementation mirrors dd-trace-go/openfeature/flagevaluation.go canonicalContextKey(). + + /** + * Builds the canonical context key from the already-pruned context map for the full-tier bucket + * identity. Expects a pruned, sorted map (e.g. produced by {@link #pruneContext(Map)}); iterating + * a {@link TreeMap} yields keys in sorted order, so the encoding is deterministic. + * + *

Returns an empty string for null/empty context. + */ + static String canonicalContextKey(final Map prunedAttrs) { + if (prunedAttrs == null || prunedAttrs.isEmpty()) { + return ""; + } + // Ensure deterministic ordering even if a non-sorted map was passed. + final Map sorted = + (prunedAttrs instanceof TreeMap) ? prunedAttrs : new TreeMap<>(prunedAttrs); + final StringBuilder sb = new StringBuilder(); + for (final Map.Entry entry : sorted.entrySet()) { + appendLengthDelimited(sb, entry.getKey()); + appendContextValue(sb, entry.getValue()); + } + return sb.toString(); + } + + private static void appendLengthDelimited(final StringBuilder sb, final String s) { + // Encode as big-endian 8-hex-char length prefix + raw chars (deterministic, unambiguous + // field boundary regardless of value content). + sb.append(String.format("%08x", (long) s.length())); + sb.append(s); + } + + private static void appendContextValue(final StringBuilder sb, final Object v) { + if (v instanceof Boolean) { + sb.append((char) CTX_TAG_BOOL); + appendLengthDelimited(sb, v.toString()); + } else if (v instanceof Long) { + sb.append((char) CTX_TAG_LONG); + appendLengthDelimited(sb, v.toString()); + } else if (v instanceof Integer) { + sb.append((char) CTX_TAG_INT); + appendLengthDelimited(sb, v.toString()); + } else if (v instanceof Float) { + sb.append((char) CTX_TAG_FLOAT); + appendLengthDelimited(sb, v.toString()); + } else if (v instanceof Double) { + sb.append((char) CTX_TAG_DOUBLE); + appendLengthDelimited(sb, v.toString()); + } else if (v instanceof String) { + sb.append((char) CTX_TAG_STRING); + appendLengthDelimited(sb, (String) v); + } else { + sb.append((char) CTX_TAG_OTHER); + appendLengthDelimited(sb, v == null ? "" : v.toString()); + } + } + + // ---- Data classes ---- + + /** Aggregation bucket for a single (full or degraded) key. */ + static class EvalBucket { + long count; + long firstEvalMs; + long lastEvalMs; + boolean runtimeDefaultUsed; + String flagKey; + String variant; + String allocationKey; + String targetingKey; + String errorMessage; + Map prunedAttrs; // pruned context for serialization (full tier only) + + EvalBucket( + final String flagKey, + final String variant, + final String allocationKey, + final String targetingKey, + final String errorMessage, + final long evalTimeMs, + final boolean runtimeDefaultUsed, + final Map prunedAttrs) { + this.flagKey = flagKey; + this.variant = variant; + this.allocationKey = allocationKey; + this.targetingKey = targetingKey; + this.errorMessage = errorMessage; + this.firstEvalMs = evalTimeMs; + this.lastEvalMs = evalTimeMs; + this.count = 1; + this.runtimeDefaultUsed = runtimeDefaultUsed; + this.prunedAttrs = prunedAttrs; + } + + /** Number of context fields stored in this bucket (after pruning). */ + int prunedContextFieldCount() { + return prunedAttrs == null ? 0 : prunedAttrs.size(); + } + + void merge(final long evalTimeMs, final boolean isDefault) { + count++; + if (evalTimeMs < firstEvalMs) firstEvalMs = evalTimeMs; + if (evalTimeMs > lastEvalMs) lastEvalMs = evalTimeMs; + if (isDefault) runtimeDefaultUsed = true; + } + } + + /** Snapshot produced by {@link SerializingHandlerForTest#drainAndAggregate()} for tests. */ + static class AggregatedState { + final Map fullTier; + final Map degradedTier; + final long droppedDegradedOverflow; + + AggregatedState( + final Map fullTier, + final Map degradedTier, + final long droppedDegradedOverflow) { + this.fullTier = fullTier; + this.degradedTier = degradedTier; + this.droppedDegradedOverflow = droppedDegradedOverflow; + } + } + + // ---- Serializing handler (background thread logic) ---- + + static class FlagEvaluationSerializingHandler implements Runnable { + private final MessagePassingBlockingQueue queue; + private final long ticksRequiredToFlush; + + @SuppressFBWarnings( + value = "AT_NONATOMIC_64BIT_PRIMITIVE", + justification = "the field is confined to the single serializer thread") + private long lastTicks; + + private final FeatureFlagEvpPublisher evpPublisher; + final Map context; + private final AtomicLong droppedQueueOverflow; + private final Runnable errorCallback; + private final int payloadSizeLimitBytes; + + // Two-tier aggregation maps (accessed only from the serializer thread; package-private for + // test) + final Map fullTier = new HashMap<>(); + final Map degradedTier = new HashMap<>(); + + // Per-flag full-tier count tracking + final Map perFlagCount = new HashMap<>(); + + // Global full-tier count + @SuppressFBWarnings( + value = "AT_STALE_THREAD_WRITE_OF_PRIMITIVE", + justification = "the field is confined to the single serializer thread") + int globalFullCount = 0; + + // Observable counter for events dropped when both tiers are at capacity + final AtomicLong droppedDegradedOverflow = new AtomicLong(0); + + // Shutdown coordination: set by close(), drives a final drain+flush before the worker exits. + private final AtomicBoolean shutdownRequested = new AtomicBoolean(false); + private final CountDownLatch finalFlushDone = new CountDownLatch(1); + + FlagEvaluationSerializingHandler( + final BackendApiFactory backendApiFactory, + final MessagePassingBlockingQueue queue, + final long flushInterval, + final TimeUnit timeUnit, + final Map context, + final AtomicLong droppedQueueOverflow, + final Runnable errorCallback) { + this( + backendApiFactory, + queue, + flushInterval, + timeUnit, + context, + droppedQueueOverflow, + errorCallback, + FLAG_EVALUATION_PAYLOAD_SIZE_LIMIT_BYTES); + } + + FlagEvaluationSerializingHandler( + final BackendApiFactory backendApiFactory, + final MessagePassingBlockingQueue queue, + final long flushInterval, + final TimeUnit timeUnit, + final Map context, + final AtomicLong droppedQueueOverflow, + final Runnable errorCallback, + final int payloadSizeLimitBytes) { + this.queue = queue; + this.evpPublisher = + new FeatureFlagEvpPublisher<>( + backendApiFactory, FlagEvaluationsRequest.class, V2_EVP_PROXY_ENDPOINT, false); + this.context = context; + this.droppedQueueOverflow = droppedQueueOverflow; + this.payloadSizeLimitBytes = payloadSizeLimitBytes; + this.lastTicks = System.nanoTime(); + this.ticksRequiredToFlush = timeUnit.toNanos(flushInterval); + this.errorCallback = errorCallback; + LOGGER.debug("starting flag evaluation serializer"); + } + + /** Signals the worker to drain the queue and perform a final flush before exiting. */ + void requestShutdown() { + shutdownRequested.set(true); + } + + @Override + public void run() { + if (!evpPublisher.start()) { + finalFlushDone.countDown(); + errorCallback.run(); + throw new IllegalArgumentException("EVP Proxy not available"); + } + try { + runDutyCycle(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + // On exit (interrupt or shutdown request), drain everything still buffered and flush it so + // queued events are not lost on shutdown. + drainAndFlush(); + finalFlushDone.countDown(); + } + LOGGER.debug("flag evaluation processor worker exited."); + } + + private void runDutyCycle() throws InterruptedException { + final Thread thread = Thread.currentThread(); + while (!thread.isInterrupted() && !shutdownRequested.get()) { + FlagEvalEvent event; + while ((event = queue.poll(100, TimeUnit.MILLISECONDS)) != null) { + aggregateEvent(event); + } + flushIfNecessary(); + } + } + + /** Drains all remaining queued events and performs a final flush. Used on shutdown. */ + void drainAndFlush() { + FlagEvalEvent event; + while ((event = queue.poll()) != null) { + aggregateEvent(event); + } + flush(); + } + + // ---- Aggregation logic ---- + + /** Routes an event into the full tier or degraded tier, or drops and counts on overflow. */ + void aggregateEvent(final FlagEvalEvent event) { + final boolean isDefault = event.variant == null; + + // Prune the context deterministically ONCE; both the canonical key and the stored payload + // use this pruned view (never the raw attrs). + final Map prunedAttrs = pruneContext(event.attrs); + final String ctxKey = canonicalContextKey(prunedAttrs); + final String fullKey = buildFullKey(event, ctxKey); + + // Try full tier + if (fullTier.containsKey(fullKey)) { + fullTier.get(fullKey).merge(event.evalTimeMs, isDefault); + return; + } + + // Check full-tier caps before inserting + final int flagCount = perFlagCount.getOrDefault(event.flagKey, 0); + if (globalFullCount < GLOBAL_CAP && flagCount < PER_FLAG_CAP) { + final EvalBucket bucket = + new EvalBucket( + event.flagKey, + event.variant, + event.allocationKey, + event.targetingKey, + event.errorMessage, + event.evalTimeMs, + isDefault, + prunedAttrs); + fullTier.put(fullKey, bucket); + globalFullCount++; + perFlagCount.put(event.flagKey, flagCount + 1); + return; + } + + // Full tier saturated — route to degraded tier + final String degradedKey = buildDegradedKey(event); + if (degradedTier.containsKey(degradedKey)) { + degradedTier.get(degradedKey).merge(event.evalTimeMs, isDefault); + return; + } + + if (degradedTier.size() < DEGRADED_CAP) { + final EvalBucket bucket = + new EvalBucket( + event.flagKey, + event.variant, + event.allocationKey, + null, // degraded omits targetingKey + event.errorMessage, + event.evalTimeMs, + isDefault, + null); // degraded omits context + degradedTier.put(degradedKey, bucket); + return; + } + + // Both tiers saturated — drop and count + droppedDegradedOverflow.incrementAndGet(); + } + + private static String buildFullKey(final FlagEvalEvent event, final String ctxKey) { + return event.flagKey + + '\0' + + nullToEmpty(event.variant) + + '\0' + + nullToEmpty(event.allocationKey) + + '\0' + + (event.variant == null ? "1" : "0") + + '\0' + + nullToEmpty(event.errorMessage) + + '\0' + + nullToEmpty(event.targetingKey) + + '\0' + + ctxKey; + } + + private static String buildDegradedKey(final FlagEvalEvent event) { + return event.flagKey + + '\0' + + nullToEmpty(event.variant) + + '\0' + + nullToEmpty(event.allocationKey) + + '\0' + + (event.variant == null ? "1" : "0") + + '\0' + + nullToEmpty(event.errorMessage); + } + + private static String nullToEmpty(final String s) { + return s != null ? s : ""; + } + + // ---- Flush logic ---- + + void flushIfNecessary() { + if (fullTier.isEmpty() && degradedTier.isEmpty() && droppedQueueOverflow.get() == 0) { + return; + } + if (shouldFlush()) { + flush(); + } + } + + void flush() { + // Surface backpressure (queue-overflow) drops as an observable warning even when there is + // nothing else to flush. + final long qDrops = droppedQueueOverflow.getAndSet(0); + countMetric(FLAG_EVALUATION_DROPPED_METRIC, qDrops, DROP_REASON_QUEUE_OVERFLOW); + if (qDrops > 0) { + LOGGER.warn( + "flag evaluation queue full — dropped {} evaluation(s) under backpressure" + + " (best-effort telemetry)", + qDrops); + } + final long dgDrops = droppedDegradedOverflow.getAndSet(0); + countMetric(FLAG_EVALUATION_DROPPED_METRIC, dgDrops, DROP_REASON_DEGRADED_CAP); + if (dgDrops > 0) { + LOGGER.warn( + "degraded aggregation tier full — dropped {} evaluation(s); raise degraded cap" + + " (best-effort telemetry)", + dgDrops); + } + + if (fullTier.isEmpty() && degradedTier.isEmpty()) { + return; + } + try { + countMetric( + FLAG_EVALUATION_DEGRADED_METRIC, + degradedEvaluationCount(), + DEGRADED_REASON_CARDINALITY_CAP); + final List events = buildEventList(); + if (events.isEmpty()) { + return; + } + final EncodedFlagEvaluationPayloads payloads = buildPayloads(events); + countMetric( + FLAG_EVALUATION_DROPPED_METRIC, + payloads.droppedPayloadLimit, + DROP_REASON_PAYLOAD_LIMIT); + countMetric( + FLAG_EVALUATION_DEGRADED_METRIC, + payloads.degradedPayloadLimit, + DEGRADED_REASON_PAYLOAD_LIMIT); + if (payloads.bodies.size() > 1) { + countMetric(FLAG_EVALUATION_SPLITS_METRIC, payloads.bodies.size() - 1, null); + } + if (payloads.droppedPayloadLimit > 0) { + LOGGER.warn( + "flag evaluation payload too large — dropped {} evaluation(s)" + + " (best-effort telemetry)", + payloads.droppedPayloadLimit); + } + for (final byte[] payload : payloads.bodies) { + evpPublisher.post(FLAG_EVALUATION_ROUTE, payload); + } + fullTier.clear(); + degradedTier.clear(); + perFlagCount.clear(); + globalFullCount = 0; + lastTicks = System.nanoTime(); + } catch (Exception e) { + LOGGER.error("Could not submit flag evaluations", e); + } + } + + private long degradedEvaluationCount() { + long count = 0; + for (final EvalBucket bucket : degradedTier.values()) { + count += bucket.count; + } + return count; + } + + private List buildEventList() { + final long flushTimeMs = System.currentTimeMillis(); + final List events = + new ArrayList<>(fullTier.size() + degradedTier.size()); + for (final EvalBucket bucket : fullTier.values()) { + events.add(FlagEvaluationEvent.fromBucket(bucket, true, flushTimeMs)); + } + for (final EvalBucket bucket : degradedTier.values()) { + events.add(FlagEvaluationEvent.fromBucket(bucket, false, flushTimeMs)); + } + return events; + } + + private EncodedFlagEvaluationPayloads buildPayloads(final List events) { + final byte[] prefix = flagEvaluationPayloadPrefix(context); + EncodedFlagEvaluationPayloadBuilder current = new EncodedFlagEvaluationPayloadBuilder(prefix); + final List payloads = new ArrayList<>(); + long droppedPayloadLimit = 0; + long degradedPayloadLimit = 0; + + for (final FlagEvaluationEvent event : events) { + byte[] eventBytes = encodeFlagEvaluationEvent(event); + + if (!current.canAdd(eventBytes, payloadSizeLimitBytes) && !current.isEmpty()) { + payloads.add(current.toByteArray()); + current = new EncodedFlagEvaluationPayloadBuilder(prefix); + } + + if (current.canAdd(eventBytes, payloadSizeLimitBytes)) { + current.add(eventBytes); + continue; + } + + final FlagEvaluationEvent degraded = event.withoutTargetingKeyAndContext(); + if (degraded != null) { + eventBytes = encodeFlagEvaluationEvent(degraded); + if (!current.canAdd(eventBytes, payloadSizeLimitBytes) && !current.isEmpty()) { + payloads.add(current.toByteArray()); + current = new EncodedFlagEvaluationPayloadBuilder(prefix); + } + if (current.canAdd(eventBytes, payloadSizeLimitBytes)) { + current.add(eventBytes); + degradedPayloadLimit += event.evaluation_count; + continue; + } + } + + droppedPayloadLimit += event.evaluation_count; + } + + if (!current.isEmpty()) { + payloads.add(current.toByteArray()); + } + return new EncodedFlagEvaluationPayloads(payloads, droppedPayloadLimit, degradedPayloadLimit); + } + + private static byte[] flagEvaluationPayloadPrefix(final Map context) { + return FeatureFlagEvpPublisher.utf8Bytes( + "{\"context\":" + CONTEXT_JSON_ADAPTER.toJson(context) + ",\"flagEvaluations\":["); + } + + private static byte[] encodeFlagEvaluationEvent(final FlagEvaluationEvent event) { + return FeatureFlagEvpPublisher.utf8Bytes(FLAG_EVALUATION_EVENT_JSON_ADAPTER.toJson(event)); + } + + private static final class EncodedFlagEvaluationPayloads { + private final List bodies; + private final long droppedPayloadLimit; + private final long degradedPayloadLimit; + + private EncodedFlagEvaluationPayloads( + final List bodies, + final long droppedPayloadLimit, + final long degradedPayloadLimit) { + this.bodies = bodies; + this.droppedPayloadLimit = droppedPayloadLimit; + this.degradedPayloadLimit = degradedPayloadLimit; + } + } + + private static final class EncodedFlagEvaluationPayloadBuilder { + private final byte[] prefix; + private final List events = new ArrayList<>(); + private int eventBytes; + + private EncodedFlagEvaluationPayloadBuilder(final byte[] prefix) { + this.prefix = prefix; + } + + private boolean isEmpty() { + return events.isEmpty(); + } + + private boolean canAdd(final byte[] event, final int payloadSizeLimitBytes) { + return sizeWith(event) <= payloadSizeLimitBytes; + } + + private int sizeWith(final byte[] event) { + return prefix.length + + FLAG_EVALUATION_PAYLOAD_SUFFIX.length + + eventBytes + + event.length + + events.size(); + } + + private void add(final byte[] event) { + events.add(event); + eventBytes += event.length; + } + + private byte[] toByteArray() { + final int size = prefix.length + FLAG_EVALUATION_PAYLOAD_SUFFIX.length + eventBytes; + final ByteArrayOutputStream out = new ByteArrayOutputStream(size + events.size()); + out.write(prefix, 0, prefix.length); + for (int i = 0; i < events.size(); i++) { + if (i > 0) { + out.write(JSON_COMMA, 0, JSON_COMMA.length); + } + final byte[] event = events.get(i); + out.write(event, 0, event.length); + } + out.write(FLAG_EVALUATION_PAYLOAD_SUFFIX, 0, FLAG_EVALUATION_PAYLOAD_SUFFIX.length); + return out.toByteArray(); + } + } + + private boolean shouldFlush() { + final long nanoTime = System.nanoTime(); + final long ticks = nanoTime - lastTicks; + if (ticks > ticksRequiredToFlush) { + lastTicks = nanoTime; + return true; + } + return false; + } + } + + // ---- Test-seam inner class (package-private) ---- + + /** + * Test-accessible handler that exposes {@link #drainAndAggregate()} and {@link #flush()} without + * starting a real background thread. + */ + static class SerializingHandlerForTest extends FlagEvaluationSerializingHandler { + + SerializingHandlerForTest(final BackendApiFactory factory, final Map context) { + this(factory, context, FLAG_EVALUATION_PAYLOAD_SIZE_LIMIT_BYTES); + } + + SerializingHandlerForTest( + final BackendApiFactory factory, + final Map context, + final int payloadSizeLimitBytes) { + super( + factory, + Queues.mpscBlockingConsumerArrayQueue(DEFAULT_CAPACITY), + Long.MAX_VALUE, // effectively never auto-flush + TimeUnit.NANOSECONDS, + context, + new AtomicLong(0), + () -> {}, + payloadSizeLimitBytes); + } + + private final List staged = new ArrayList<>(); + + /** Adds an event to the staged list (simulates hook enqueue). */ + void add(final FlagEvalEvent event) { + staged.add(event); + } + + /** Aggregates all staged events and returns the current aggregation state. */ + AggregatedState drainAndAggregate() { + for (final FlagEvalEvent e : staged) { + aggregateEvent(e); + } + staged.clear(); + return new AggregatedState( + new HashMap<>(fullTier), new HashMap<>(degradedTier), droppedDegradedOverflow.get()); + } + + /** Simulates filling the full tier to GLOBAL_CAP by injecting synthetic distinct buckets. */ + void simulateFullTierAtCap() { + for (int i = globalFullCount; i < GLOBAL_CAP; i++) { + final String key = "synthetic-full-" + i; + fullTier.put(key, new EvalBucket(key, "on", "alloc", null, null, 1L, false, null)); + globalFullCount++; + perFlagCount.merge(key, 1, Integer::sum); + } + } + + /** + * Simulates filling the degraded tier to DEGRADED_CAP by injecting synthetic distinct buckets. + */ + void simulateDegradedTierAtCap() { + for (int i = degradedTier.size(); i < DEGRADED_CAP; i++) { + final String key = "synthetic-dg-" + i; + degradedTier.put(key, new EvalBucket(key, "on", "alloc", null, null, 1L, false, null)); + } + } + } + + /** Factory method for test use — creates a SerializingHandlerForTest. */ + static SerializingHandlerForTest createHandlerForTest( + final BackendApiFactory factory, final Map context) { + return new SerializingHandlerForTest(factory, context); + } + + static SerializingHandlerForTest createHandlerForTest( + final BackendApiFactory factory, + final Map context, + final int payloadSizeLimitBytes) { + return new SerializingHandlerForTest(factory, context, payloadSizeLimitBytes); + } + + // ---- Request/response DTOs for JSON serialization ---- + // Structure mirrors the flageval-worker batched event payload: + // variant/allocation/flag are {"key": ...} objects, error is {"message": ...}, and per-event + // context nests evaluation attributes under {"evaluation": {...}}. + + static class FlagEvaluationsRequest { + public final Map context; + public final List flagEvaluations; + + FlagEvaluationsRequest( + final Map context, final List flagEvaluations) { + this.context = context; + this.flagEvaluations = flagEvaluations; + } + } + + static class FlagEvaluationEvent { + public final long timestamp; + public final FlagKeyObject flag; + public final long first_evaluation; + public final long last_evaluation; + public final long evaluation_count; + public final KeyObject variant; // null = omitted (degraded tier or runtime default) + public final KeyObject allocation; // null = omitted + public final String targeting_key; // null = omitted (degraded tier) + public final Boolean runtime_default_used; // null = omitted when false + public final EventContext context; // null = omitted (degraded tier) + public final ErrorObject error; // null = omitted (no error) + + FlagEvaluationEvent( + final long timestamp, + final String flagKey, + final long firstEvalMs, + final long lastEvalMs, + final long count, + final String variant, + final String allocation, + final String targetingKey, + final boolean runtimeDefaultUsed, + final String errorMessage, + final Map evaluationAttrs) { + this.timestamp = timestamp; + this.flag = new FlagKeyObject(flagKey); + this.first_evaluation = firstEvalMs; + this.last_evaluation = lastEvalMs; + this.evaluation_count = count; + this.variant = (variant != null && !variant.isEmpty()) ? new KeyObject(variant) : null; + this.allocation = + (allocation != null && !allocation.isEmpty()) ? new KeyObject(allocation) : null; + this.targeting_key = targetingKey; + this.runtime_default_used = runtimeDefaultUsed ? Boolean.TRUE : null; + this.context = + (evaluationAttrs != null && !evaluationAttrs.isEmpty()) + ? new EventContext(evaluationAttrs) + : null; + this.error = + (errorMessage != null && !errorMessage.isEmpty()) ? new ErrorObject(errorMessage) : null; + } + + static FlagEvaluationEvent fromBucket( + final EvalBucket bucket, final boolean isFullTier, final long flushTimeMs) { + return new FlagEvaluationEvent( + flushTimeMs, + bucket.flagKey, + bucket.firstEvalMs, + bucket.lastEvalMs, + bucket.count, + bucket.variant, + bucket.allocationKey, + isFullTier ? bucket.targetingKey : null, // degraded omits targetingKey + bucket.runtimeDefaultUsed, + bucket.errorMessage, + isFullTier ? bucket.prunedAttrs : null); // degraded omits context + } + + FlagEvaluationEvent withoutTargetingKeyAndContext() { + if (targeting_key == null && context == null) { + return null; + } + return new FlagEvaluationEvent( + timestamp, + flag.key, + first_evaluation, + last_evaluation, + evaluation_count, + keyOf(variant), + keyOf(allocation), + null, + Boolean.TRUE.equals(runtime_default_used), + messageOf(error), + null); + } + } + + private static String keyOf(final KeyObject object) { + return object == null ? null : object.key; + } + + private static String messageOf(final ErrorObject object) { + return object == null ? null : object.message; + } + + /** {@code {"key": "..."}} object for variant / allocation. */ + static class KeyObject { + public final String key; + + KeyObject(final String key) { + this.key = key; + } + } + + /** {@code {"key": "..."}} object for the flag. */ + static class FlagKeyObject { + public final String key; + + FlagKeyObject(final String key) { + this.key = key; + } + } + + /** {@code {"message": "..."}} object for an evaluation error. */ + static class ErrorObject { + public final String message; + + ErrorObject(final String message) { + this.message = message; + } + } + + /** Per-event {@code context} object: evaluation attributes nest under {@code evaluation}. */ + static class EventContext { + public final Map evaluation; + + EventContext(final Map evaluation) { + this.evaluation = evaluation; + } + } +} diff --git a/products/feature-flagging/feature-flagging-lib/src/test/java/com/datadog/featureflag/FlagEvaluationWriterImplTest.java b/products/feature-flagging/feature-flagging-lib/src/test/java/com/datadog/featureflag/FlagEvaluationWriterImplTest.java new file mode 100644 index 00000000000..00e14e25f90 --- /dev/null +++ b/products/feature-flagging/feature-flagging-lib/src/test/java/com/datadog/featureflag/FlagEvaluationWriterImplTest.java @@ -0,0 +1,790 @@ +package com.datadog.featureflag; + +import static datadog.communication.ddagent.DDAgentFeaturesDiscovery.V2_EVP_PROXY_ENDPOINT; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.squareup.moshi.JsonAdapter; +import com.squareup.moshi.Moshi; +import com.squareup.moshi.Types; +import datadog.communication.BackendApi; +import datadog.communication.BackendApiFactory; +import datadog.trace.api.featureflag.flagevaluation.FlagEvalEvent; +import datadog.trace.api.intake.Intake; +import datadog.trace.api.telemetry.CoreMetricCollector; +import datadog.trace.api.telemetry.MetricCollector; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import okhttp3.RequestBody; +import okio.Buffer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Unit tests for FlagEvaluationWriterImpl: two-tier aggregation, canonical context key, caps, + * deterministic pruning, worker payload shape, and EVP transport. + */ +class FlagEvaluationWriterImplTest { + + // Moshi adapter for parsing the emitted JSON back into a Map for structural assertions. + private static final JsonAdapter> JSON_MAP; + private static final long REALISTIC_EVAL_MS = 1_760_000_000_000L; + + static { + final Moshi moshi = new Moshi.Builder().build(); + final Type type = Types.newParameterizedType(Map.class, String.class, Object.class); + JSON_MAP = moshi.adapter(type); + } + + // ---- helpers ---- + + @BeforeEach + void clearCoreMetricsBefore() { + clearCoreMetrics(); + } + + @AfterEach + void clearCoreMetricsAfter() { + clearCoreMetrics(); + } + + private void clearCoreMetrics() { + CoreMetricCollector.getInstance().drain(); + } + + private static FlagEvalEvent event( + String flagKey, + String variant, + String allocationKey, + String targetingKey, + long evalTimeMs, + Map attrs) { + return new FlagEvalEvent(flagKey, variant, allocationKey, targetingKey, evalTimeMs, attrs); + } + + private static FlagEvalEvent errorEvent(String flagKey, String errorMessage, long evalTimeMs) { + return new FlagEvalEvent( + flagKey, null, null, null, errorMessage, evalTimeMs, Collections.emptyMap()); + } + + private static FlagEvalEvent simpleEvent(String flagKey, String variant) { + return event(flagKey, variant, "alloc1", "user-1", 1000L, Collections.emptyMap()); + } + + private static String repeat(char c, int count) { + char[] chars = new char[count]; + Arrays.fill(chars, c); + return new String(chars); + } + + private TestWriterSetup buildTestWriter(BackendApi mockEvp) { + return buildTestWriter( + mockEvp, FlagEvaluationWriterImpl.FLAG_EVALUATION_PAYLOAD_SIZE_LIMIT_BYTES); + } + + private TestWriterSetup buildTestWriter(BackendApi mockEvp, int payloadSizeLimitBytes) { + BackendApiFactory factory = mock(BackendApiFactory.class); + when(factory.createBackendApi(any())).thenReturn(mockEvp); + when(factory.createBackendApi(any(), any(), eq(false))).thenReturn(mockEvp); + + Map context = new HashMap<>(); + context.put("service", "test-service"); + + FlagEvaluationWriterImpl.SerializingHandlerForTest handler = + FlagEvaluationWriterImpl.createHandlerForTest(factory, context, payloadSizeLimitBytes); + + return new TestWriterSetup(handler, mockEvp, factory); + } + + static class TestWriterSetup { + final FlagEvaluationWriterImpl.SerializingHandlerForTest handler; + final BackendApi mockEvp; + final BackendApiFactory factory; + + TestWriterSetup( + FlagEvaluationWriterImpl.SerializingHandlerForTest handler, + BackendApi mockEvp, + BackendApiFactory factory) { + this.handler = handler; + this.mockEvp = mockEvp; + this.factory = factory; + } + } + + static class CapturedJson { + final String raw; + final Map parsed; + + CapturedJson(final String raw, final Map parsed) { + this.raw = raw; + this.parsed = parsed; + } + } + + /** Captures the JSON body posted by a flush(). */ + private CapturedJson flushAndCapture(TestWriterSetup setup) throws Exception { + final List captured = flushAndCaptureAll(setup); + assertEquals(1, captured.size(), "Expected exactly one posted payload"); + return captured.get(0); + } + + /** Captures all JSON bodies posted by a flush(). */ + private List flushAndCaptureAll(TestWriterSetup setup) throws Exception { + final List captured = new ArrayList<>(); + when(setup.mockEvp.post(eq("flagevaluation"), any(RequestBody.class), any(), any(), eq(false))) + .thenAnswer( + inv -> { + captured.add(inv.getArgument(1)); + return null; + }); + setup.handler.drainAndAggregate(); + setup.handler.flush(); + final List json = new ArrayList<>(); + for (final RequestBody body : captured) { + json.add(readJson(body)); + } + return json; + } + + private CapturedJson readJson(final RequestBody body) throws Exception { + assertNotNull(body, "RequestBody must have been posted"); + Buffer buf = new Buffer(); + body.writeTo(buf); + String raw = buf.readUtf8(); + return new CapturedJson(raw, JSON_MAP.fromJson(raw)); + } + + private static long metricSum( + final Collection metrics, + final String metricName, + final String tag) { + long sum = 0; + for (final MetricCollector.Metric metric : metrics) { + if (!metricName.equals(metric.metricName)) { + continue; + } + if (tag == null) { + if (!metric.tags.isEmpty()) { + continue; + } + } else if (!metric.tags.contains(tag)) { + continue; + } + sum += metric.value.longValue(); + } + return sum; + } + + /** Captures the JSON body posted by a flush(), parsed into a Map. */ + private Map flushAndCaptureJson(TestWriterSetup setup) throws Exception { + return flushAndCapture(setup).parsed; + } + + // ---- test: two identical events -> one full-tier bucket, count 2, first <= last ---- + + @Test + void identicalEventsAggregateIntoOneBucketWithCount2() { + BackendApi mockEvp = mock(BackendApi.class); + TestWriterSetup setup = buildTestWriter(mockEvp); + + setup.handler.add(event("flag-a", "on", "alloc1", "user-1", 1000L, Collections.emptyMap())); + setup.handler.add(event("flag-a", "on", "alloc1", "user-1", 2000L, Collections.emptyMap())); + + FlagEvaluationWriterImpl.AggregatedState state = setup.handler.drainAndAggregate(); + assertEquals( + 1, state.fullTier.size(), "Identical events must produce exactly 1 full-tier bucket"); + FlagEvaluationWriterImpl.EvalBucket bucket = state.fullTier.values().iterator().next(); + assertEquals(2, bucket.count, "Count must be 2"); + assertEquals(1000L, bucket.firstEvalMs, "first_evaluation must be min(1000, 2000)=1000"); + assertEquals(2000L, bucket.lastEvalMs, "last_evaluation must be max(1000, 2000)=2000"); + assertTrue(bucket.firstEvalMs <= bucket.lastEvalMs); + } + + // ---- variant comes from the OpenFeature variant, distinct from the evaluated value ---- + + @Test + void variantIsTheVariantKeyNotTheEvaluatedValue() throws Exception { + BackendApi mockEvp = mock(BackendApi.class); + TestWriterSetup setup = buildTestWriter(mockEvp); + + // The event already carries the variant key ("on"); the evaluated value ("on-value") is + // intentionally NOT part of the event — the hook is responsible for sourcing variant from + // details.getVariant(). Here we assert the emitted variant.key == the variant key, never a + // value. + setup.handler.add(event("flag-v", "on", "alloc1", "user-1", 1000L, Collections.emptyMap())); + + Map json = flushAndCaptureJson(setup); + Map ev = firstEvent(json); + Map variantObj = (Map) ev.get("variant"); + assertNotNull(variantObj, "variant object must be present"); + assertEquals("on", variantObj.get("key"), "variant.key must be the variant key"); + } + + // ---- type-tagged canonical key distinguishes int 1 vs string "1" ---- + + @Test + void differentValueTypesProduceDifferentBuckets() { + BackendApi mockEvp = mock(BackendApi.class); + TestWriterSetup setup = buildTestWriter(mockEvp); + + Map attrsInt = new HashMap<>(); + attrsInt.put("score", 1); + Map attrsStr = new HashMap<>(); + attrsStr.put("score", "1"); + + setup.handler.add(event("flag-b", "on", "alloc1", "user-1", 1000L, attrsInt)); + setup.handler.add(event("flag-b", "on", "alloc1", "user-1", 1000L, attrsStr)); + + FlagEvaluationWriterImpl.AggregatedState state = setup.handler.drainAndAggregate(); + assertEquals( + 2, + state.fullTier.size(), + "int 1 vs String \"1\" must produce 2 distinct buckets (type-tagged canonical key)"); + } + + // ---- full-tier overflow past GLOBAL_CAP routes to degraded ---- + + @Test + void globalCapOverflowRoutesToDegradedTier() { + BackendApi mockEvp = mock(BackendApi.class); + TestWriterSetup setup = buildTestWriter(mockEvp); + + setup.handler.simulateFullTierAtCap(); + setup.handler.add(simpleEvent("extra-flag", "on")); + + FlagEvaluationWriterImpl.AggregatedState state = setup.handler.drainAndAggregate(); + assertTrue(state.degradedTier.size() > 0, "Overflow past GLOBAL_CAP must route to degraded"); + assertEquals(0, state.droppedDegradedOverflow, "No drops yet (degraded not full)"); + } + + // ---- degraded-tier overflow increments the observable dropped counter ---- + + @Test + void degradedCapOverflowIncrementsDroppedCounter() { + BackendApi mockEvp = mock(BackendApi.class); + TestWriterSetup setup = buildTestWriter(mockEvp); + + setup.handler.simulateFullTierAtCap(); + setup.handler.simulateDegradedTierAtCap(); + setup.handler.add(simpleEvent("drop-flag", "on")); + + FlagEvaluationWriterImpl.AggregatedState state = setup.handler.drainAndAggregate(); + assertTrue( + state.droppedDegradedOverflow > 0, + "Beyond DEGRADED_CAP must increment droppedDegradedOverflow counter"); + } + + @Test + void degradedCapOverflowTelemetryIsEmittedOnFlush() { + BackendApi mockEvp = mock(BackendApi.class); + TestWriterSetup setup = buildTestWriter(mockEvp); + + setup.handler.droppedDegradedOverflow.addAndGet(3); + setup.handler.flush(); + + final Collection metrics = + CoreMetricCollector.getInstance().drain(); + assertEquals( + 3, + metricSum( + metrics, + FlagEvaluationWriterImpl.FLAG_EVALUATION_DROPPED_METRIC, + "reason:" + FlagEvaluationWriterImpl.DROP_REASON_DEGRADED_CAP)); + } + + // ---- queue-overflow backpressure increments an observable drop counter on enqueue ---- + + @Test + void queueOverflowIncrementsObservableDropCounter() { + // Tiny capacity so we can overflow deterministically. Capacity must be a power of two. + BackendApi mockEvp = mock(BackendApi.class); + BackendApiFactory factory = mock(BackendApiFactory.class); + when(factory.createBackendApi(any())).thenReturn(mockEvp); + when(factory.createBackendApi(any(), any(), eq(false))).thenReturn(mockEvp); + FlagEvaluationWriterImpl writer = + new FlagEvaluationWriterImpl(2, 10L, java.util.concurrent.TimeUnit.SECONDS, factory, cfg()); + + // Never start() the worker, so nothing drains the queue; offers past capacity must be counted. + for (int i = 0; i < 100; i++) { + writer.enqueue(simpleEvent("of-flag", "on")); + } + + assertTrue( + writer.droppedQueueOverflow() > 0, + "queue-overflow drops must be counted in the observable droppedQueueOverflow counter"); + + final long queueDrops = writer.droppedQueueOverflow(); + writer.flushForTest(); + final Collection metrics = + CoreMetricCollector.getInstance().drain(); + assertEquals( + queueDrops, + metricSum( + metrics, + FlagEvaluationWriterImpl.FLAG_EVALUATION_DROPPED_METRIC, + "reason:" + FlagEvaluationWriterImpl.DROP_REASON_QUEUE_OVERFLOW), + "queue-overflow telemetry must report the dropped evaluation count"); + } + + private static datadog.trace.api.Config cfg() { + datadog.trace.api.Config config = mock(datadog.trace.api.Config.class); + when(config.getServiceName()).thenReturn("test-service"); + return config; + } + + // ---- enqueue does NOT aggregate (no buckets formed) — aggregation is the worker's job ---- + + @Test + void enqueueDoesNotAggregateOnTheCallingThread() { + BackendApi mockEvp = mock(BackendApi.class); + BackendApiFactory factory = mock(BackendApiFactory.class); + when(factory.createBackendApi(any())).thenReturn(mockEvp); + when(factory.createBackendApi(any(), any(), eq(false))).thenReturn(mockEvp); + FlagEvaluationWriterImpl writer = + new FlagEvaluationWriterImpl( + 16, Long.MAX_VALUE, java.util.concurrent.TimeUnit.NANOSECONDS, factory, cfg()); + + // Do NOT start the worker. enqueue() must only offer to the queue; the aggregation maps stay + // empty because nothing on the calling thread aggregates. + writer.enqueue(simpleEvent("g2-flag", "on")); + writer.enqueue(simpleEvent("g2-flag", "on")); + + assertEquals( + 0, + writer.aggregatorFullTierSizeForTest(), + "enqueue must not build aggregation buckets on the calling thread"); + assertEquals(0, writer.droppedQueueOverflow(), "no overflow expected for 2 events in cap 16"); + } + + @Test + void enqueueQueuesPrunedContextSnapshotBeforeBuffering() { + BackendApi mockEvp = mock(BackendApi.class); + BackendApiFactory factory = mock(BackendApiFactory.class); + when(factory.createBackendApi(any())).thenReturn(mockEvp); + when(factory.createBackendApi(any(), any(), eq(false))).thenReturn(mockEvp); + FlagEvaluationWriterImpl writer = + new FlagEvaluationWriterImpl( + 16, Long.MAX_VALUE, java.util.concurrent.TimeUnit.NANOSECONDS, factory, cfg()); + + Map rawAttrs = new HashMap<>(); + rawAttrs.put("oversized", repeat('x', 300)); + for (int i = 0; i < 300; i++) { + rawAttrs.put(String.format("k%03d", i), "v" + i); + } + + writer.enqueue(event("bounded-flag", "on", "alloc1", "user-1", 1000L, rawAttrs)); + + FlagEvalEvent queued = writer.pollQueuedEventForTest(); + assertNotNull(queued, "enqueue must offer one event to the bounded queue"); + assertEquals(256, queued.attrs.size(), "queued attrs must be pruned before buffering"); + assertFalse(queued.attrs.containsKey("oversized"), "oversized string must not be queued"); + assertTrue(queued.attrs.containsKey("k000"), "deterministic first sorted key survives"); + assertFalse(queued.attrs.containsKey("k299"), "fields beyond the 256-field cap are not queued"); + } + + // ---- close() drains the queue and final-flushes before the worker exits ---- + + @Test + void closeDrainsAndFinalFlushesQueuedEvents() throws Exception { + final java.util.concurrent.CountDownLatch posted = new java.util.concurrent.CountDownLatch(1); + final RequestBody[] captured = {null}; + BackendApi mockEvp = mock(BackendApi.class); + when(mockEvp.post(eq("flagevaluation"), any(RequestBody.class), any(), any(), eq(false))) + .thenAnswer( + inv -> { + captured[0] = inv.getArgument(1); + posted.countDown(); + return null; + }); + BackendApiFactory factory = mock(BackendApiFactory.class); + when(factory.createBackendApi(any())).thenReturn(mockEvp); + when(factory.createBackendApi(any(), any(), eq(false))).thenReturn(mockEvp); + + // Large flush interval so the ONLY flush that can happen is the shutdown drain-flush. + FlagEvaluationWriterImpl writer = + new FlagEvaluationWriterImpl( + 64, + java.util.concurrent.TimeUnit.DAYS.toSeconds(1), + java.util.concurrent.TimeUnit.SECONDS, + factory, + cfg()); + writer.startForTest(); + writer.enqueue(simpleEvent("shutdown-flag", "on")); + + // close() must drain + final-flush, not just interrupt. + writer.close(); + + assertTrue( + posted.await(5, java.util.concurrent.TimeUnit.SECONDS), + "close() must drain the queue and final-flush before exit"); + assertNotNull(captured[0], "a flag evaluation payload must have been posted on shutdown"); + Buffer buf = new Buffer(); + captured[0].writeTo(buf); + @SuppressWarnings("unchecked") + Map json = JSON_MAP.fromJson(buf.readUtf8()); + assertNotNull( + eventForFlag(json, "shutdown-flag"), "the queued event must be in the final flush"); + } + + @Test + void emittedTimestampUsesFlushTimeAndFirstLastUseEvaluationTime() throws Exception { + BackendApi mockEvp = mock(BackendApi.class); + TestWriterSetup setup = buildTestWriter(mockEvp); + + setup.handler.add( + event("ts-flag", "on", "alloc1", "user-1", REALISTIC_EVAL_MS, Collections.emptyMap())); + setup.handler.add( + event("ts-flag", "on", "alloc1", "user-1", REALISTIC_EVAL_MS + 10, Collections.emptyMap())); + + long beforeFlush = System.currentTimeMillis(); + Map json = flushAndCaptureJson(setup); + long afterFlush = System.currentTimeMillis(); + Map ev = firstEvent(json); + long timestamp = ((Number) ev.get("timestamp")).longValue(); + assertTrue( + timestamp >= beforeFlush && timestamp <= afterFlush, "event timestamp must use flush time"); + assertEquals((double) REALISTIC_EVAL_MS, ((Number) ev.get("first_evaluation")).doubleValue()); + assertEquals( + (double) (REALISTIC_EVAL_MS + 10), ((Number) ev.get("last_evaluation")).doubleValue()); + } + + // ---- absent variant -> runtimeDefaultUsed=true ---- + + @Test + void absentVariantSetsRuntimeDefaultUsed() { + BackendApi mockEvp = mock(BackendApi.class); + TestWriterSetup setup = buildTestWriter(mockEvp); + + setup.handler.add(event("flag-c", null, "alloc1", "user-1", 1000L, Collections.emptyMap())); + + FlagEvaluationWriterImpl.AggregatedState state = setup.handler.drainAndAggregate(); + assertEquals(1, state.fullTier.size()); + FlagEvaluationWriterImpl.EvalBucket bucket = state.fullTier.values().iterator().next(); + assertTrue(bucket.runtimeDefaultUsed, "Absent variant must set runtimeDefaultUsed=true"); + } + + // ---- degraded event omits targeting_key + context ---- + + @Test + void degradedTierEventOmitsTargetingKeyAndContext() throws Exception { + BackendApi mockEvp = mock(BackendApi.class); + TestWriterSetup setup = buildTestWriter(mockEvp); + + setup.handler.degradedTier.put( + "dg-flag", + new FlagEvaluationWriterImpl.EvalBucket( + "dg-flag", "on", "alloc1", null, null, 1000L, false, null)); + + Map json = flushAndCaptureJson(setup); + Map dg = eventForFlag(json, "dg-flag"); + assertNotNull(dg, "degraded event must be emitted"); + assertNull(dg.get("targeting_key"), "Degraded event must omit targeting_key"); + assertNull(dg.get("context"), "Degraded event must omit context"); + final Collection metrics = + CoreMetricCollector.getInstance().drain(); + assertEquals( + 1, + metricSum( + metrics, + FlagEvaluationWriterImpl.FLAG_EVALUATION_DEGRADED_METRIC, + "reason:" + FlagEvaluationWriterImpl.DEGRADED_REASON_CARDINALITY_CAP)); + } + + // ---- context >256 fields is pruned to <=256 and stored pruned ---- + + @Test + void contextExceeding256FieldsIsPrunedToStoredPrunedAttrs() { + BackendApi mockEvp = mock(BackendApi.class); + TestWriterSetup setup = buildTestWriter(mockEvp); + + Map hugeAttrs = new HashMap<>(); + for (int i = 0; i < 300; i++) { + hugeAttrs.put("key" + i, "v" + i); + } + setup.handler.add(event("flag-d", "on", "alloc1", "user-1", 1000L, hugeAttrs)); + + FlagEvaluationWriterImpl.AggregatedState state = setup.handler.drainAndAggregate(); + assertEquals(1, state.fullTier.size()); + FlagEvaluationWriterImpl.EvalBucket bucket = state.fullTier.values().iterator().next(); + assertEquals( + 256, + bucket.prunedContextFieldCount(), + "Context must be pruned to exactly 256 fields and the PRUNED attrs stored"); + assertEquals(256, bucket.prunedAttrs.size(), "stored prunedAttrs map must be the pruned view"); + } + + // ---- pruning is deterministic — same input -> same kept subset across runs ---- + + @Test + void pruningIsDeterministicSortBeforeCut() { + Map attrs = new HashMap<>(); + for (int i = 0; i < 300; i++) { + attrs.put(String.format("k%03d", i), "v" + i); + } + Map p1 = FlagEvaluationWriterImpl.pruneContext(attrs); + Map p2 = FlagEvaluationWriterImpl.pruneContext(new HashMap<>(attrs)); + assertEquals(256, p1.size()); + assertEquals(p1.keySet(), p2.keySet(), "Pruned key set must be deterministic across calls"); + // Sorted-before-cut means the first 256 sorted keys survive: k000..k255, never k256+. + assertTrue(p1.containsKey("k000"), "lowest sorted key must survive"); + assertTrue(p1.containsKey("k255"), "256th sorted key must survive"); + assertFalse(p1.containsKey("k256"), "257th sorted key must be cut"); + assertFalse(p1.containsKey("k299"), "highest sorted key must be cut"); + } + + // ---- string context value >256 chars is skipped from the pruned attrs ---- + + @Test + void contextValueExceeding256CharsIsSkippedFromPrunedAttrs() { + BackendApi mockEvp = mock(BackendApi.class); + TestWriterSetup setup = buildTestWriter(mockEvp); + + StringBuilder longValBuilder = new StringBuilder(300); + for (int i = 0; i < 300; i++) longValBuilder.append('x'); + String longVal = longValBuilder.toString(); + + Map attrs = new HashMap<>(); + attrs.put("long-val", longVal); // >256 chars + attrs.put("short-val", "ok"); + setup.handler.add(event("flag-e", "on", "alloc1", "user-1", 1000L, attrs)); + + FlagEvaluationWriterImpl.AggregatedState state = setup.handler.drainAndAggregate(); + assertEquals(1, state.fullTier.size()); + FlagEvaluationWriterImpl.EvalBucket bucket = state.fullTier.values().iterator().next(); + assertFalse( + bucket.prunedAttrs.containsKey("long-val"), + "Oversized string value must be skipped from the pruned attrs"); + assertTrue(bucket.prunedAttrs.containsKey("short-val"), "Normal value must survive pruning"); + } + + // ---- flush posts to the "flagevaluation" endpoint ---- + + @Test + void flushPostsToFlagevaluationEndpoint() throws Exception { + BackendApi mockEvp = mock(BackendApi.class); + TestWriterSetup setup = buildTestWriter(mockEvp); + + setup.handler.add(event("flag-f", "on", "alloc1", "user-1", 1000L, Collections.emptyMap())); + setup.handler.drainAndAggregate(); + setup.handler.flush(); + + verify(setup.factory) + .createBackendApi(eq(Intake.EVENT_PLATFORM), eq(V2_EVP_PROXY_ENDPOINT), eq(false)); + verify(mockEvp).post(eq("flagevaluation"), any(RequestBody.class), any(), any(), eq(false)); + } + + @Test + void flushSplitsPayloadsByEncodedSize() throws Exception { + final int limit = 1_100; + BackendApi mockEvp = mock(BackendApi.class); + TestWriterSetup setup = buildTestWriter(mockEvp, limit); + + for (int i = 0; i < 4; i++) { + Map attrs = new HashMap<>(); + attrs.put("payload", repeat('x', 180)); + setup.handler.add(event("split-flag-" + i, "on", "alloc1", "user-" + i, 1000L + i, attrs)); + } + + List captured = flushAndCaptureAll(setup); + + assertTrue(captured.size() > 1, "flush must split oversized batches into multiple payloads"); + int events = 0; + for (CapturedJson payload : captured) { + assertTrue( + FeatureFlagEvpPublisher.utf8Bytes(payload.raw).length <= limit, + "every emitted payload must stay under the configured byte limit"); + events += eventCount(payload.parsed); + } + assertEquals(4, events, "splitting must preserve every event"); + final Collection metrics = + CoreMetricCollector.getInstance().drain(); + assertEquals( + captured.size() - 1, + metricSum(metrics, FlagEvaluationWriterImpl.FLAG_EVALUATION_SPLITS_METRIC, null)); + } + + @Test + void oversizedFullPayloadRowIsDegradedBeforeDrop() throws Exception { + final int limit = 512; + BackendApi mockEvp = mock(BackendApi.class); + TestWriterSetup setup = buildTestWriter(mockEvp, limit); + + Map attrs = new HashMap<>(); + for (int i = 0; i < 4; i++) { + attrs.put("payload-" + i, repeat('x', 200)); + } + setup.handler.add(event("oversized-full", "on", "alloc1", "user-1", 1000L, attrs)); + setup.handler.add(event("oversized-full", "on", "alloc1", "user-1", 2000L, attrs)); + + List captured = flushAndCaptureAll(setup); + + assertEquals(1, captured.size(), "degraded row should still be posted"); + assertTrue( + FeatureFlagEvpPublisher.utf8Bytes(captured.get(0).raw).length <= limit, + "degraded payload must fit under the configured byte limit"); + Map ev = firstEvent(captured.get(0).parsed); + assertEquals(2.0, ((Number) ev.get("evaluation_count")).doubleValue()); + assertNull(ev.get("targeting_key"), "degraded event must omit targeting_key"); + assertNull(ev.get("context"), "degraded event must omit context"); + final Collection metrics = + CoreMetricCollector.getInstance().drain(); + assertEquals( + 2, + metricSum( + metrics, + FlagEvaluationWriterImpl.FLAG_EVALUATION_DEGRADED_METRIC, + "reason:" + FlagEvaluationWriterImpl.DEGRADED_REASON_PAYLOAD_LIMIT)); + } + + @Test + void oversizedDegradedPayloadRowIsDropped() throws Exception { + final int limit = 128; + BackendApi mockEvp = mock(BackendApi.class); + TestWriterSetup setup = buildTestWriter(mockEvp, limit); + + Map attrs = new HashMap<>(); + attrs.put("payload", "x"); + setup.handler.add(event(repeat('f', 512), "on", "alloc1", "user-1", 1000L, attrs)); + setup.handler.add(event(repeat('f', 512), "on", "alloc1", "user-1", 2000L, attrs)); + + List captured = flushAndCaptureAll(setup); + + assertTrue(captured.isEmpty(), "row must be dropped when degraded JSON still exceeds limit"); + final Collection metrics = + CoreMetricCollector.getInstance().drain(); + assertEquals( + 2, + metricSum( + metrics, + FlagEvaluationWriterImpl.FLAG_EVALUATION_DROPPED_METRIC, + "reason:" + FlagEvaluationWriterImpl.DROP_REASON_PAYLOAD_LIMIT)); + } + + @Test + void capSizingUsesNamedScaleConstants() { + assertEquals(125_000, FlagEvaluationWriterImpl.EVAL_SCALE_FULL_BUCKET_TARGET); + assertEquals(10_000, FlagEvaluationWriterImpl.EVAL_SCALE_PER_FLAG_BUCKET_TARGET); + assertEquals(25_000, FlagEvaluationWriterImpl.EVAL_SCALE_DEGRADED_BUCKET_TARGET); + assertEquals(131_072, FlagEvaluationWriterImpl.GLOBAL_CAP); + assertEquals(10_000, FlagEvaluationWriterImpl.PER_FLAG_CAP); + assertEquals(32_768, FlagEvaluationWriterImpl.DEGRADED_CAP); + } + + // ---- emitted full-tier payload uses the flageval-worker wire shape ---- + + @Test + void fullTierPayloadUsesWorkerWireShape() throws Exception { + BackendApi mockEvp = mock(BackendApi.class); + TestWriterSetup setup = buildTestWriter(mockEvp); + + Map attrs = new HashMap<>(); + attrs.put("region", "us-east-1"); + setup.handler.add(event("my-flag", "on", "alloc-x", "user-1", REALISTIC_EVAL_MS, attrs)); + + CapturedJson captured = flushAndCapture(setup); + + Map json = captured.parsed; + Map ev = firstEvent(json); + assertObjectWithKey(ev.get("variant"), "on", "variant must serialize as {\"key\":...}"); + assertObjectWithKey( + ev.get("allocation"), "alloc-x", "allocation must serialize as {\"key\":...}"); + assertObjectWithKey(ev.get("flag"), "my-flag", "flag must serialize as {\"key\":...}"); + // context must nest under "evaluation", not raw at top level (additionalProperties:false). + Map ctx = (Map) ev.get("context"); + assertNotNull(ctx, "full-tier context must be present"); + Map evalAttrs = (Map) ctx.get("evaluation"); + assertNotNull(evalAttrs, "context.evaluation must hold the attributes"); + assertEquals("us-east-1", evalAttrs.get("region")); + } + + // ---- error path serializes error as {"message":...} ---- + + @Test + void errorPayloadSerializesErrorObject() throws Exception { + BackendApi mockEvp = mock(BackendApi.class); + TestWriterSetup setup = buildTestWriter(mockEvp); + + setup.handler.add(errorEvent("err-flag", "type mismatch", REALISTIC_EVAL_MS)); + + CapturedJson captured = flushAndCapture(setup); + + Map json = captured.parsed; + Map ev = firstEvent(json); + Map error = (Map) ev.get("error"); + assertNotNull(error, "error must be present on the error path"); + assertEquals( + "type mismatch", error.get("message"), "error must serialize as {\"message\":...}"); + // runtime default: absent variant -> runtime_default_used true. + assertEquals(Boolean.TRUE, ev.get("runtime_default_used")); + } + + @Test + void workerPayloadDoesNotEmitTopLevelReason() throws Exception { + BackendApi mockEvp = mock(BackendApi.class); + TestWriterSetup setup = buildTestWriter(mockEvp); + + setup.handler.add( + event("reason-schema-flag", "on", "alloc-x", "user-1", REALISTIC_EVAL_MS, new HashMap<>())); + + CapturedJson captured = flushAndCapture(setup); + Map ev = firstEvent(captured.parsed); + assertFalse(ev.containsKey("reason"), "payload must not emit hidden reason cardinality"); + } + + @Test + void flagEvalEventDoesNotCarryReason() { + boolean hasReasonField = + Arrays.stream(FlagEvalEvent.class.getDeclaredFields()) + .anyMatch(field -> field.getName().equals("reason")); + + assertFalse(hasReasonField, "EVP event snapshots must not carry hidden reason cardinality"); + } + + private static void assertObjectWithKey(Object o, String expectedKey, String msg) { + assertTrue(o instanceof Map, msg + " (must be a JSON object, not a bare string)"); + assertEquals(expectedKey, ((Map) o).get("key"), msg); + } + + @SuppressWarnings("unchecked") + private static Map firstEvent(Map batch) { + List events = (List) batch.get("flagEvaluations"); + assertNotNull(events, "flagEvaluations array must be present"); + assertFalse(events.isEmpty(), "flagEvaluations must not be empty"); + return (Map) events.get(0); + } + + @SuppressWarnings("unchecked") + private static int eventCount(Map batch) { + List events = (List) batch.get("flagEvaluations"); + assertNotNull(events, "flagEvaluations array must be present"); + return events.size(); + } + + @SuppressWarnings("unchecked") + private static Map eventForFlag(Map batch, String flagKey) { + List events = (List) batch.get("flagEvaluations"); + for (Object o : events) { + Map ev = (Map) o; + Map flag = (Map) ev.get("flag"); + if (flag != null && flagKey.equals(flag.get("key"))) { + return ev; + } + } + return null; + } +}