diff --git a/products/feature-flagging/feature-flagging-agent/build.gradle.kts b/products/feature-flagging/feature-flagging-agent/build.gradle.kts index 902ae5a6023..1dcadde035f 100644 --- a/products/feature-flagging/feature-flagging-agent/build.gradle.kts +++ b/products/feature-flagging/feature-flagging-agent/build.gradle.kts @@ -16,6 +16,8 @@ dependencies { api(project(":products:feature-flagging:feature-flagging-lib")) api(project(":internal-api")) + testImplementation(libs.bundles.junit5) + testImplementation(libs.bundles.mockito) testImplementation(project(":utils:test-utils")) testRuntimeOnly(project(":dd-trace-core")) } diff --git a/products/feature-flagging/feature-flagging-agent/src/test/groovy/com/datadog/featureflag/FeatureFlaggingSystemTest.groovy b/products/feature-flagging/feature-flagging-agent/src/test/groovy/com/datadog/featureflag/FeatureFlaggingSystemTest.groovy deleted file mode 100644 index 32510d313d0..00000000000 --- a/products/feature-flagging/feature-flagging-agent/src/test/groovy/com/datadog/featureflag/FeatureFlaggingSystemTest.groovy +++ /dev/null @@ -1,60 +0,0 @@ -package com.datadog.featureflag - -import datadog.communication.ddagent.DDAgentFeaturesDiscovery -import datadog.communication.ddagent.SharedCommunicationObjects -import datadog.remoteconfig.Capabilities -import datadog.remoteconfig.ConfigurationDeserializer -import datadog.remoteconfig.ConfigurationPoller -import datadog.remoteconfig.Product -import datadog.trace.api.Config -import datadog.trace.test.util.DDSpecification -import okhttp3.HttpUrl - -class FeatureFlaggingSystemTest extends DDSpecification { - - void 'test feature flag system initialization'() { - setup: - final poller = Mock(ConfigurationPoller) - final discovery = Stub(DDAgentFeaturesDiscovery) { - discoverIfOutdated() >> {} - supportsEvpProxy() >> { return true } - } - final sco = Stub(SharedCommunicationObjects) { - configurationPoller(_ as Config) >> poller - featuresDiscovery(_ as Config) >> discovery - } - sco.featuresDiscovery = discovery - sco.agentUrl = HttpUrl.get('http://localhost') - - when: - FeatureFlaggingSystem.start(sco) - - then: - 1 * poller.addCapabilities(Capabilities.CAPABILITY_FFE_FLAG_CONFIGURATION_RULES) - 1 * poller.addListener(Product.FFE_FLAGS, _ as ConfigurationDeserializer, _) - 1 * poller.start() - - when: - FeatureFlaggingSystem.stop() - - then: - 1 * poller.removeCapabilities(Capabilities.CAPABILITY_FFE_FLAG_CONFIGURATION_RULES) - 1 * poller.removeListeners(Product.FFE_FLAGS) - 1 * poller.stop() - } - - void 'test that remote config is required'() { - setup: - injectSysConfig('remote_configuration.enabled', 'false') - final sco = Mock(SharedCommunicationObjects) - - when: - FeatureFlaggingSystem.start(sco) - - then: - thrown(IllegalStateException) - - cleanup: - FeatureFlaggingSystem.stop() - } -} diff --git a/products/feature-flagging/feature-flagging-agent/src/test/java/com/datadog/featureflag/FeatureFlaggingSystemTest.java b/products/feature-flagging/feature-flagging-agent/src/test/java/com/datadog/featureflag/FeatureFlaggingSystemTest.java new file mode 100644 index 00000000000..f7534278520 --- /dev/null +++ b/products/feature-flagging/feature-flagging-agent/src/test/java/com/datadog/featureflag/FeatureFlaggingSystemTest.java @@ -0,0 +1,64 @@ +package com.datadog.featureflag; + +import static datadog.trace.api.config.RemoteConfigConfig.REMOTE_CONFIGURATION_ENABLED; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import datadog.communication.ddagent.DDAgentFeaturesDiscovery; +import datadog.communication.ddagent.SharedCommunicationObjects; +import datadog.remoteconfig.Capabilities; +import datadog.remoteconfig.ConfigurationDeserializer; +import datadog.remoteconfig.ConfigurationPoller; +import datadog.remoteconfig.Product; +import datadog.trace.api.Config; +import datadog.trace.junit.utils.config.WithConfig; +import okhttp3.HttpUrl; +import okhttp3.OkHttpClient; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +class FeatureFlaggingSystemTest { + + @AfterEach + void stopFeatureFlaggingSystem() { + FeatureFlaggingSystem.stop(); + } + + @Test + void testFeatureFlagSystemInitialization() { + ConfigurationPoller poller = mock(ConfigurationPoller.class); + DDAgentFeaturesDiscovery discovery = mock(DDAgentFeaturesDiscovery.class); + SharedCommunicationObjects sharedCommunicationObjects = mock(SharedCommunicationObjects.class); + when(discovery.supportsEvpProxy()).thenReturn(true); + when(discovery.getEvpProxyEndpoint()).thenReturn("/evp_proxy/"); + when(sharedCommunicationObjects.configurationPoller(any(Config.class))).thenReturn(poller); + when(sharedCommunicationObjects.featuresDiscovery(any(Config.class))).thenReturn(discovery); + sharedCommunicationObjects.agentUrl = HttpUrl.get("http://localhost"); + sharedCommunicationObjects.agentHttpClient = new OkHttpClient.Builder().build(); + + FeatureFlaggingSystem.start(sharedCommunicationObjects); + + verify(poller).addCapabilities(Capabilities.CAPABILITY_FFE_FLAG_CONFIGURATION_RULES); + verify(poller).addListener(eq(Product.FFE_FLAGS), any(ConfigurationDeserializer.class), any()); + verify(poller).start(); + + FeatureFlaggingSystem.stop(); + + verify(poller).removeCapabilities(Capabilities.CAPABILITY_FFE_FLAG_CONFIGURATION_RULES); + verify(poller).removeListeners(Product.FFE_FLAGS); + verify(poller).stop(); + } + + @Test + @WithConfig(key = REMOTE_CONFIGURATION_ENABLED, value = "false") + void testThatRemoteConfigIsRequired() { + SharedCommunicationObjects sharedCommunicationObjects = mock(SharedCommunicationObjects.class); + + assertThrows( + IllegalStateException.class, () -> FeatureFlaggingSystem.start(sharedCommunicationObjects)); + } +} diff --git a/products/feature-flagging/feature-flagging-bootstrap/build.gradle.kts b/products/feature-flagging/feature-flagging-bootstrap/build.gradle.kts index d2d68b59ddf..d8e945a5565 100644 --- a/products/feature-flagging/feature-flagging-bootstrap/build.gradle.kts +++ b/products/feature-flagging/feature-flagging-bootstrap/build.gradle.kts @@ -30,5 +30,7 @@ extra["excludedClassesCoverage"] = listOf( ) dependencies { + testImplementation(libs.bundles.junit5) + testImplementation(libs.bundles.mockito) testImplementation(project(":utils:test-utils")) } diff --git a/products/feature-flagging/feature-flagging-bootstrap/src/test/groovy/datadog/trace/api/featureflag/FeatureFlaggingGatewayTest.groovy b/products/feature-flagging/feature-flagging-bootstrap/src/test/groovy/datadog/trace/api/featureflag/FeatureFlaggingGatewayTest.groovy deleted file mode 100644 index 6dd02e3f96e..00000000000 --- a/products/feature-flagging/feature-flagging-bootstrap/src/test/groovy/datadog/trace/api/featureflag/FeatureFlaggingGatewayTest.groovy +++ /dev/null @@ -1,76 +0,0 @@ -package datadog.trace.api.featureflag - -import datadog.trace.api.featureflag.exposure.ExposureEvent -import datadog.trace.api.featureflag.ufc.v1.ServerConfiguration -import spock.lang.Specification - -class FeatureFlaggingGatewayTest extends Specification { - - void 'test attaching a config listener'() { - given: - def listener = Mock(FeatureFlaggingGateway.ConfigListener) - final first = Stub(ServerConfiguration) - final second = Stub(ServerConfiguration) - - when: - FeatureFlaggingGateway.addConfigListener(listener) - FeatureFlaggingGateway.dispatch(first) - - then: - 1 * listener.accept(first) - 0 * _ - - when: - FeatureFlaggingGateway.dispatch(second) - - then: - 1 * listener.accept(second) - 0 * _ - - - cleanup: - FeatureFlaggingGateway.removeConfigListener(listener) - } - - void 'test attaching a listener after configured'() { - given: - def listener = Mock(FeatureFlaggingGateway.ConfigListener) - final first = Stub(ServerConfiguration) - - when: - FeatureFlaggingGateway.dispatch(first) - FeatureFlaggingGateway.addConfigListener(listener) - - then: - 1 * listener.accept(first) - 0 * _ - - cleanup: - FeatureFlaggingGateway.removeConfigListener(listener) - } - - void 'test attaching an exposure listener'() { - given: - def listener = Mock(FeatureFlaggingGateway.ExposureListener) - final first = Stub(ExposureEvent) - final second = Stub(ExposureEvent) - - when: - FeatureFlaggingGateway.addExposureListener(listener) - FeatureFlaggingGateway.dispatch(first) - - then: - 1 * listener.accept(first) - 0 * _ - - when: - FeatureFlaggingGateway.dispatch(second) - - then: - 1 * listener.accept(second) - 0 * _ - - cleanup: - FeatureFlaggingGateway.removeExposureListener(listener) - } -} diff --git a/products/feature-flagging/feature-flagging-bootstrap/src/test/java/datadog/trace/api/featureflag/FeatureFlaggingGatewayTest.java b/products/feature-flagging/feature-flagging-bootstrap/src/test/java/datadog/trace/api/featureflag/FeatureFlaggingGatewayTest.java new file mode 100644 index 00000000000..9eb6c35a5e4 --- /dev/null +++ b/products/feature-flagging/feature-flagging-bootstrap/src/test/java/datadog/trace/api/featureflag/FeatureFlaggingGatewayTest.java @@ -0,0 +1,83 @@ +package datadog.trace.api.featureflag; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +import datadog.trace.api.featureflag.exposure.ExposureEvent; +import datadog.trace.api.featureflag.ufc.v1.ServerConfiguration; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class FeatureFlaggingGatewayTest { + + @BeforeEach + @AfterEach + void resetCurrentConfiguration() { + FeatureFlaggingGateway.dispatch((ServerConfiguration) null); + } + + @Test + void testAttachingAConfigListener() { + FeatureFlaggingGateway.ConfigListener listener = + mock(FeatureFlaggingGateway.ConfigListener.class); + ServerConfiguration first = mock(ServerConfiguration.class); + ServerConfiguration second = mock(ServerConfiguration.class); + + try { + FeatureFlaggingGateway.addConfigListener(listener); + FeatureFlaggingGateway.dispatch(first); + + verify(listener).accept(first); + verifyNoMoreInteractions(listener); + + FeatureFlaggingGateway.dispatch(second); + + verify(listener).accept(second); + verifyNoMoreInteractions(listener); + } finally { + FeatureFlaggingGateway.removeConfigListener(listener); + } + } + + @Test + void testAttachingAListenerAfterConfigured() { + FeatureFlaggingGateway.ConfigListener listener = + mock(FeatureFlaggingGateway.ConfigListener.class); + ServerConfiguration first = mock(ServerConfiguration.class); + + try { + FeatureFlaggingGateway.dispatch(first); + FeatureFlaggingGateway.addConfigListener(listener); + + verify(listener).accept(first); + verifyNoMoreInteractions(listener); + } finally { + FeatureFlaggingGateway.removeConfigListener(listener); + } + } + + @Test + void testAttachingAnExposureListener() { + FeatureFlaggingGateway.ExposureListener listener = + mock(FeatureFlaggingGateway.ExposureListener.class); + ExposureEvent first = mock(ExposureEvent.class); + ExposureEvent second = mock(ExposureEvent.class); + + try { + FeatureFlaggingGateway.addExposureListener(listener); + FeatureFlaggingGateway.dispatch(first); + + verify(listener).accept(first); + verifyNoMoreInteractions(listener); + + FeatureFlaggingGateway.dispatch(second); + + verify(listener).accept(second); + verifyNoMoreInteractions(listener); + } finally { + FeatureFlaggingGateway.removeExposureListener(listener); + } + } +} diff --git a/products/feature-flagging/feature-flagging-lib/src/test/groovy/com/datadog/featureflag/ExposureWriterTests.groovy b/products/feature-flagging/feature-flagging-lib/src/test/groovy/com/datadog/featureflag/ExposureWriterTests.groovy deleted file mode 100644 index 960102b8da2..00000000000 --- a/products/feature-flagging/feature-flagging-lib/src/test/groovy/com/datadog/featureflag/ExposureWriterTests.groovy +++ /dev/null @@ -1,317 +0,0 @@ -package com.datadog.featureflag - -import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer -import static java.util.concurrent.TimeUnit.MILLISECONDS - -import com.squareup.moshi.Moshi -import datadog.communication.ddagent.DDAgentFeaturesDiscovery -import datadog.communication.ddagent.SharedCommunicationObjects -import datadog.trace.agent.test.server.http.TestHttpServer -import datadog.trace.api.Config -import datadog.trace.api.IdGenerationStrategy -import datadog.trace.api.featureflag.FeatureFlaggingGateway -import datadog.trace.api.featureflag.exposure.Allocation -import datadog.trace.api.featureflag.exposure.ExposureEvent -import datadog.trace.api.featureflag.exposure.ExposuresRequest -import datadog.trace.api.featureflag.exposure.Flag -import datadog.trace.api.featureflag.exposure.Subject -import datadog.trace.api.featureflag.exposure.Variant -import datadog.trace.test.util.DDSpecification -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.ConcurrentLinkedQueue -import java.util.concurrent.CountDownLatch -import java.util.concurrent.Executors -import okhttp3.HttpUrl -import okhttp3.OkHttpClient -import okio.Okio -import spock.lang.AutoCleanup -import spock.lang.Shared -import spock.util.concurrent.PollingConditions - -class ExposureWriterTests extends DDSpecification { - - @Shared - protected final Queue requests = new ConcurrentLinkedQueue<>() - - @Shared - protected final Set failed = Collections.newSetFromMap(new ConcurrentHashMap()) - - @Shared - @AutoCleanup - protected TestHttpServer server = httpServer { - final adapter = new Moshi.Builder().build().adapter(ExposuresRequest) - handlers { - prefix("/evp_proxy/api/v2/exposures") { - final exposuresRequest = adapter.fromJson(Okio.buffer(Okio.source(new ByteArrayInputStream(request.body)))) - final serviceName = exposuresRequest.context.service - final failForever = serviceName == 'fail-forever' - final fail = serviceName.startsWith('fail') && (failed.add(serviceName) || failForever) - if (fail) { - response.status(500).send('Boom!!!') - } else { - requests.add(exposuresRequest) - response.status(200).send('OK') - } - } - } - } - - @Shared - protected PollingConditions poll = new PollingConditions(timeout: 5) - - @Shared - protected SharedCommunicationObjects sco = Stub(SharedCommunicationObjects) { - featuresDiscovery(_ as Config) >> { - return Mock(DDAgentFeaturesDiscovery) { - supportsEvpProxy() >> true - getEvpProxyEndpoint() >> '/evp_proxy/' - } - } - }.tap { - agentUrl = HttpUrl.get(server.address) - agentHttpClient = new OkHttpClient.Builder().build() - } - - void cleanup() { - requests.clear() - failed.clear() - } - - void 'test exposure event writes'() { - setup: - def config = mockConfig(service, env, version) - def exposures = (1..5).collect { buildExposure() } - def writer = new ExposureWriterImpl(1 << 4, 100, MILLISECONDS, sco, config) - writer.init() - - when: - exposures.each { writer.accept(it) } - - then: - poll.eventually { - assert !requests.empty - requests.each { - assert it.context.service == service ?: 'unknown' - if (env) { - assert it.context.env == env - } - if (version) { - assert it.context.version == version.toString() - } - } - final received = requests*.exposures.flatten() as List - assertExposures(received, exposures) - } - - cleanup: - writer.close() - - where: - service | env | version - null | null | null - 'test-service' | 'test' | '23' - 'test-service' | null | '23' - 'test-service' | 'test' | null - } - - void 'test lru cache'() { - setup: - def config = mockConfig('test-service') - def exposures = (0..5).collect { buildExposure() } - def writer = new ExposureWriterImpl(1 << 4, 100, MILLISECONDS, sco, config) - writer.init() - - when: 'populating the cache' - exposures.each { writer.accept(it) } - - then: 'all events are written' - new PollingConditions(timeout: 1).eventually { - requests*.exposures.flatten().size() == exposures.size() - } - - when: 'publishing duplicate events' - exposures.each { writer.accept(it) } - - then: 'no events are written' - MILLISECONDS.sleep(300) // wait until a flush happens - requests*.exposures.flatten().size() == exposures.size() - - when: 'a new event is generated' - writer.accept(buildExposure()) - - then: 'oldest event is evicted and the new one is submitted' - poll.eventually { - requests*.exposures.flatten().size() == exposures.size() + 1 - } - - cleanup: - writer.close() - } - - void 'test high load scenario'() { - setup: - def config = mockConfig('test-service') - def exposuresPerThread = 100 - def random = new Random() - def threads = Runtime.runtime.availableProcessors() - def executor = Executors.newFixedThreadPool(threads) - def exposures = (1..(threads * exposuresPerThread)).collect { - buildExposure() - } - def latch = new CountDownLatch(1) - def writer = new ExposureWriterImpl(sco, config) - writer.init() - - when: - def futures = exposures.collate(exposuresPerThread).collect { partition -> - executor.submit { - latch.await() - partition.each { - MILLISECONDS.sleep(random.nextInt(2)) - writer.accept(it) - } - return true - } - } - latch.countDown() // start threads - - then: - futures.each { it.get() } // wait for all threads to finish - poll.eventually { - final received = requests*.exposures.flatten() as List - assertExposures(received, exposures) - } - - cleanup: - writer.close() - executor.shutdownNow() - } - - void 'test failures are retried'() { - setup: - def config = mockConfig(serviceName) - def writer = new ExposureWriterImpl(1 << 4, 100, MILLISECONDS, sco, config) - writer.init() - - when: - writer.accept(buildExposure()) - - then: - MILLISECONDS.sleep(500) // wait for a flush to happen - final found = requests.find { it.context.service == serviceName } - if (finallyFail) { - assert found == null: requests - } else { - assert found != null: requests - } - - cleanup: - writer.close() - - where: - serviceName | finallyFail - 'fail-once' | false - 'fail-forever' | true - } - - void 'test writer stops receiving exposures if evp proxy is not available'() { - given: - final sco = Stub(SharedCommunicationObjects) { - featuresDiscovery(_ as Config) >> { - return Mock(DDAgentFeaturesDiscovery) { - supportsEvpProxy() >> false - } - } - } - def writer = new ExposureWriterImpl(sco, Config.get()) - - when: - writer.init() - - then: - poll.eventually { - assert !writer.serializerThread.isAlive() - } - - when: - FeatureFlaggingGateway.dispatch(buildExposure()) - - then: - writer.queue.size() == 0 - - cleanup: - writer.close() - } - - private Config mockConfig(String serviceName, String env = 'test', String version = '0.0.0') { - return Mock(Config) { - getIdGenerationStrategy() >> IdGenerationStrategy.fromName("RANDOM") - getServiceName() >> serviceName - getEnv() >> env - getVersion() >> version - } - } - - private static void assertExposures(final List receivedExposures, final List expectedExposures) { - assert receivedExposures.size() == expectedExposures.size() - final received = new TreeSet(ExposureWriterTests::compare) - received.addAll(expectedExposures) - assert received.containsAll(expectedExposures) - } - - private static int compare(final ExposureEvent a, final ExposureEvent b) { - if (a.is(b)) { - return 0 - } - if (a == null) { - return -1 - } - if (b == null) { - return 1 - } - - def result = a.timestamp <=> b.timestamp - if (result) { - return result - } - - result = (a.flag?.key ?: '') <=> (b.flag?.key ?: '') - if (result) { - return result - } - - result = (a.variant?.key ?: '') <=> (b.variant?.key ?: '') - if (result) { - return result - } - - result = (a.allocation?.key ?: '') <=> (b.allocation?.key ?: '') - if (result) { - return result - } - - result = (a.subject?.id ?: '') <=> (b.subject?.id ?: '') - if (result) { - return result - } - - final aEntry = a.subject?.attributes?.entrySet()?.iterator()?.next() - final bEntry = b.subject?.attributes?.entrySet()?.iterator()?.next() - result = (aEntry?.key ?: '') <=> (bEntry?.key ?: '') - if (result) { - return result - } - return (aEntry?.value?.toString() ?: '') <=> (bEntry?.value?.toString() ?: '') - } - - private static ExposureEvent buildExposure() { - final idx = UUID.randomUUID().toString() - return new ExposureEvent( - System.currentTimeMillis(), - new Allocation("Allocation_$idx"), - new Flag("Flag_$idx"), - new Variant("Variant_$idx"), - new Subject("Subject_$idx", [("key_$idx".toString()): "value_$idx".toString()]) - ) - } -} diff --git a/products/feature-flagging/feature-flagging-lib/src/test/groovy/com/datadog/featureflag/LRUExposureCacheTest.groovy b/products/feature-flagging/feature-flagging-lib/src/test/groovy/com/datadog/featureflag/LRUExposureCacheTest.groovy deleted file mode 100644 index fa8bd14c3bd..00000000000 --- a/products/feature-flagging/feature-flagging-lib/src/test/groovy/com/datadog/featureflag/LRUExposureCacheTest.groovy +++ /dev/null @@ -1,256 +0,0 @@ -package com.datadog.featureflag - -import datadog.trace.api.featureflag.exposure.Allocation -import datadog.trace.api.featureflag.exposure.ExposureEvent -import datadog.trace.api.featureflag.exposure.Flag -import datadog.trace.api.featureflag.exposure.Subject -import datadog.trace.api.featureflag.exposure.Variant -import spock.lang.Specification - -class LRUExposureCacheTest extends Specification { - - void 'test adding elements'() { - given: - final cache = new LRUExposureCache(5) - final event = createEvent('flag', 'subject', 'variant', 'allocation') - - when: - final added = cache.add(event) - - then: - added - cache.size() == 1 - } - - void 'test adding duplicate events returns false'() { - given: - final cache = new LRUExposureCache(5) - final event = createEvent('flag', 'subject', 'variant', 'allocation') - - when: - cache.add(event) - final duplicateAdded = cache.add(event) - - then: - !duplicateAdded - cache.size() == 1 - } - - void 'test adding events with same key but different details updates cache'() { - given: - final cache = new LRUExposureCache(5) - final event1 = createEvent('flag', 'subject', 'variant1', 'allocation1') - final event2 = createEvent('flag', 'subject', 'variant2', 'allocation2') - final key = new ExposureCache.Key(event1) - - when: - final added1 = cache.add(event1) - final added2 = cache.add(event2) - final retrieved = cache.get(key) - - then: - added1 - added2 - cache.size() == 1 - retrieved.variant == 'variant2' - retrieved.allocation == 'allocation2' - } - - void 'test LRU eviction when capacity exceeded'() { - given: - final cache = new LRUExposureCache(2) - final event1 = createEvent('flag1', 'subject1', 'variant1', 'allocation1') - final event2 = createEvent('flag2', 'subject2', 'variant2', 'allocation2') - final event3 = createEvent('flag3', 'subject3', 'variant3', 'allocation3') - final key1 = new ExposureCache.Key(event1) - final key3 = new ExposureCache.Key(event3) - - when: - cache.add(event1) - cache.add(event2) - cache.add(event3) - - then: - cache.size() == 2 - cache.get(key1) == null // event1 should be evicted - cache.get(key3) != null // event3 should be present - cache.get(key3).variant == 'variant3' - cache.get(key3).allocation == 'allocation3' - } - - void 'test single capacity cache'() { - given: - final cache = new LRUExposureCache(1) - final event1 = createEvent('flag1', 'subject1', 'variant1', 'allocation1') - final event2 = createEvent('flag2', 'subject2', 'variant2', 'allocation2') - - when: - cache.add(event1) - cache.add(event2) - - then: - cache.size() == 1 - } - - void 'test zero capacity cache'() { - given: - final cache = new LRUExposureCache(0) - final event = createEvent('flag', 'subject', 'variant', 'allocation') - - when: - final added = cache.add(event) - - then: - added - cache.size() == 0 - } - - void 'test empty cache size'() { - given: - final cache = new LRUExposureCache(5) - - expect: - cache.size() == 0 - } - - void 'test multiple additions with same flag different subjects'() { - given: - final cache = new LRUExposureCache(10) - final events = [] - for (int i = 0; i < 5; i++) { - events << createEvent('flag', "subject${i}", 'variant', 'allocation') - } - - when: - def results = events.collect { cache.add(it) } - - then: - results.every { it == true } - cache.size() == 5 - } - - void 'test multiple additions with same subject different flags'() { - given: - final cache = new LRUExposureCache(10) - final events = [] - for (int i = 0; i < 5; i++) { - events << createEvent("flag${i}", 'subject', 'variant', 'allocation') - } - - when: - def results = events.collect { cache.add(it) } - - then: - results.every { it == true } - cache.size() == 5 - } - - void 'test key equality with null values'() { - given: - final cache = new LRUExposureCache(5) - final event1 = new ExposureEvent( - System.currentTimeMillis(), - new Allocation('allocation'), - new Flag(null), - new Variant('variant'), - new Subject(null, [:]) - ) - final event2 = new ExposureEvent( - System.currentTimeMillis(), - new Allocation('allocation'), - new Flag(null), - new Variant('variant'), - new Subject(null, [:]) - ) - - when: - cache.add(event1) - final duplicateAdded = cache.add(event2) - - then: - !duplicateAdded - cache.size() == 1 - } - - void 'test updating existing key maintains LRU position'() { - given: - final cache = new LRUExposureCache(3) - final event1 = createEvent('flag1', 'subject1', 'variant1', 'allocation1') - final event2 = createEvent('flag2', 'subject2', 'variant2', 'allocation2') - final event3 = createEvent('flag3', 'subject3', 'variant3', 'allocation3') - final event1Updated = createEvent('flag1', 'subject1', 'variant2', 'allocation2') - final event4 = createEvent('flag4', 'subject4', 'variant4', 'allocation4') - final key1 = new ExposureCache.Key(event1) - final key2 = new ExposureCache.Key(event2) - final key4 = new ExposureCache.Key(event4) - - when: - cache.add(event1) - cache.add(event2) - cache.add(event3) - cache.add(event1Updated) // Updates event1, moves to most recent - cache.add(event4) // Should evict event2, not event1 - - then: - cache.size() == 3 - cache.get(key1) != null // event1 should be updated and present - cache.get(key1).variant == 'variant2' // verify it was updated - cache.get(key1).allocation == 'allocation2' - cache.get(key2) == null // event2 should be evicted - cache.get(key4) != null // event4 should be present - cache.get(key4).variant == 'variant4' - } - - void 'test duplicate exposure keeps subject hot in LRU order'() { - given: - final cache = new LRUExposureCache(3) - final event1 = createEvent('flag1', 'subject1', 'variant1', 'allocation1') - final event2 = createEvent('flag2', 'subject2', 'variant2', 'allocation2') - final event3 = createEvent('flag3', 'subject3', 'variant3', 'allocation3') - // same key + same details as event1: will go through the "duplicate" path - final event1Duplicate = createEvent('flag1', 'subject1', 'variant1', 'allocation1') - final event4 = createEvent('flag4', 'subject4', 'variant4', 'allocation4') - - final key1 = new ExposureCache.Key(event1) - final key2 = new ExposureCache.Key(event2) - final key4 = new ExposureCache.Key(event4) - - when: - // Fill cache - def added1 = cache.add(event1) - def added2 = cache.add(event2) - def added3 = cache.add(event3) - - // Duplicate exposure for subject1: should *not* change size, but *should* bump recency - def duplicateAdded = cache.add(event1Duplicate) - - // Now push over capacity: the least recently used *non-hot* entry (event2) should be evicted - def added4 = cache.add(event4) - - then: - added1 - added2 - added3 - !duplicateAdded // dedup correctly - added4 - - cache.size() == 3 - - cache.get(key1) != null // hot subject1 should still be present - cache.get(key2) == null // subject2 should be evicted - cache.get(key4) != null // newest subject4 should be present - - cache.get(key1).variant == 'variant1' - cache.get(key1).allocation == 'allocation1' - } - - private static ExposureEvent createEvent(String flag, String subject, String variant, String allocation) { - return new ExposureEvent( - System.currentTimeMillis(), - new Allocation(allocation), - new Flag(flag), - new Variant(variant), - new Subject(subject, [:]) - ) - } -} diff --git a/products/feature-flagging/feature-flagging-lib/src/test/java/com/datadog/featureflag/ExposureWriterTests.java b/products/feature-flagging/feature-flagging-lib/src/test/java/com/datadog/featureflag/ExposureWriterTests.java new file mode 100644 index 00000000000..4236ea009fb --- /dev/null +++ b/products/feature-flagging/feature-flagging-lib/src/test/java/com/datadog/featureflag/ExposureWriterTests.java @@ -0,0 +1,455 @@ +package com.datadog.featureflag; + +import static java.util.Collections.singletonMap; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.squareup.moshi.JsonAdapter; +import com.squareup.moshi.Moshi; +import datadog.common.queue.MessagePassingBlockingQueue; +import datadog.communication.ddagent.DDAgentFeaturesDiscovery; +import datadog.communication.ddagent.SharedCommunicationObjects; +import datadog.trace.agent.test.server.http.JavaTestHttpServer; +import datadog.trace.api.Config; +import datadog.trace.api.IdGenerationStrategy; +import datadog.trace.api.featureflag.FeatureFlaggingGateway; +import datadog.trace.api.featureflag.exposure.Allocation; +import datadog.trace.api.featureflag.exposure.ExposureEvent; +import datadog.trace.api.featureflag.exposure.ExposuresRequest; +import datadog.trace.api.featureflag.exposure.Flag; +import datadog.trace.api.featureflag.exposure.Subject; +import datadog.trace.api.featureflag.exposure.Variant; +import java.io.ByteArrayInputStream; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Random; +import java.util.Set; +import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import okhttp3.HttpUrl; +import okhttp3.OkHttpClient; +import okio.Okio; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.tabletest.junit.TableTest; + +class ExposureWriterTests { + + private static final Queue REQUESTS = new ConcurrentLinkedQueue<>(); + private static final Set FAILED = + Collections.newSetFromMap(new ConcurrentHashMap()); + + private static JavaTestHttpServer server; + private static SharedCommunicationObjects sharedCommunicationObjects; + + @BeforeAll + static void startServer() { + JsonAdapter adapter = + new Moshi.Builder().build().adapter(ExposuresRequest.class); + server = + JavaTestHttpServer.httpServer( + s -> + s.handlers( + h -> + h.prefix( + "/evp_proxy/api/v2/exposures", + api -> { + ExposuresRequest exposuresRequest = + adapter.fromJson( + Okio.buffer( + Okio.source( + new ByteArrayInputStream( + api.getRequest().getBody())))); + String serviceName = exposuresRequest.context.get("service"); + boolean failForever = "fail-forever".equals(serviceName); + boolean fail = + serviceName.startsWith("fail") + && (FAILED.add(serviceName) || failForever); + if (fail) { + api.getResponse().status(500).send("Boom!!!"); + } else { + REQUESTS.add(exposuresRequest); + api.getResponse().status(200).send("OK"); + } + }))); + sharedCommunicationObjects = sharedCommunicationObjects(true); + } + + @AfterAll + static void stopServer() { + if (server != null) { + server.close(); + } + } + + @AfterEach + void cleanup() { + REQUESTS.clear(); + FAILED.clear(); + } + + @TableTest({ + "service | env | version", + " | | ", + "'test-service' | 'test' | '23' ", + "'test-service' | | '23' ", + "'test-service' | 'test' | " + }) + void testExposureEventWrites(String service, String env, String version) throws Exception { + Config config = mockConfig(service, env, version); + List exposures = buildExposures(5); + + try (ExposureWriterImpl writer = + new ExposureWriterImpl(1 << 4, 100, MILLISECONDS, sharedCommunicationObjects, config)) { + writer.init(); + for (ExposureEvent exposure : exposures) { + writer.accept(exposure); + } + + eventually( + () -> { + assertFalse(REQUESTS.isEmpty()); + for (ExposuresRequest request : REQUESTS) { + assertContext(request.context, service, env, version); + } + assertExposures(allExposures(), exposures); + }, + 5000); + } + } + + @Test + void testLruCache() throws Exception { + Config config = mockConfig("test-service"); + List exposures = buildExposures(6); + + try (ExposureWriterImpl writer = + new ExposureWriterImpl(1 << 4, 100, MILLISECONDS, sharedCommunicationObjects, config)) { + writer.init(); + // populating the cache + for (ExposureEvent exposure : exposures) { + writer.accept(exposure); + } + + // all events are written + eventually(() -> assertEquals(exposures.size(), allExposures().size()), 1000); + + // publishing duplicate events + for (ExposureEvent exposure : exposures) { + writer.accept(exposure); + } + + // no events are written + MILLISECONDS.sleep(300); // wait until a flush happens + assertEquals(exposures.size(), allExposures().size()); + + // a new event is generated + writer.accept(buildExposure()); + + // oldest event is evicted and the new one is submitted + eventually(() -> assertEquals(exposures.size() + 1, allExposures().size()), 5000); + } + } + + @Test + void testHighLoadScenario() throws Exception { + Config config = mockConfig("test-service"); + int exposuresPerThread = 100; + Random random = new Random(); + int threads = Runtime.getRuntime().availableProcessors(); + ExecutorService executor = Executors.newFixedThreadPool(threads); + List exposures = buildExposures(threads * exposuresPerThread); + CountDownLatch latch = new CountDownLatch(1); + + try (ExposureWriterImpl writer = new ExposureWriterImpl(sharedCommunicationObjects, config)) { + writer.init(); + List> futures = new ArrayList<>(); + for (int index = 0; index < exposures.size(); index += exposuresPerThread) { + List partition = + exposures.subList(index, Math.min(index + exposuresPerThread, exposures.size())); + futures.add( + executor.submit( + () -> { + latch.await(); + for (ExposureEvent exposure : partition) { + MILLISECONDS.sleep(random.nextInt(2)); + writer.accept(exposure); + } + return true; + })); + } + latch.countDown(); // start threads + + for (Future future : futures) { + assertTrue(future.get()); // wait for all threads to finish + } + eventually(() -> assertExposures(allExposures(), exposures), 5000); + } finally { + executor.shutdownNow(); + } + } + + @TableTest({ + "serviceName | finallyFail", + "'fail-once' | false ", + "'fail-forever' | true " + }) + void testFailuresAreRetried(String serviceName, boolean finallyFail) throws Exception { + Config config = mockConfig(serviceName); + + try (ExposureWriterImpl writer = + new ExposureWriterImpl(1 << 4, 100, MILLISECONDS, sharedCommunicationObjects, config)) { + writer.init(); + writer.accept(buildExposure()); + + if (finallyFail) { + MILLISECONDS.sleep(500); // wait for a flush to happen + assertNull(findRequest(serviceName), REQUESTS.toString()); + } else { + eventually(() -> assertNotNull(findRequest(serviceName), REQUESTS.toString()), 5000); + } + } + } + + @Test + void testWriterStopsReceivingExposuresIfEvpProxyIsNotAvailable() throws Exception { + SharedCommunicationObjects sharedCommunicationObjects = sharedCommunicationObjects(false); + + try (ExposureWriterImpl writer = + new ExposureWriterImpl(sharedCommunicationObjects, Config.get())) { + writer.init(); + Thread serializerThread = getField(writer, "serializerThread", Thread.class); + eventually(() -> assertFalse(serializerThread.isAlive()), 5000); + + FeatureFlaggingGateway.dispatch(buildExposure()); + + MessagePassingBlockingQueue queue = + getField(writer, "queue", MessagePassingBlockingQueue.class); + assertEquals(0, queue.size()); + } + } + + private static Config mockConfig(String serviceName) { + return mockConfig(serviceName, "test", "0.0.0"); + } + + private static Config mockConfig(String serviceName, String env, String version) { + Config config = mock(Config.class); + when(config.getIdGenerationStrategy()).thenReturn(IdGenerationStrategy.fromName("RANDOM")); + when(config.getServiceName()).thenReturn(serviceName); + when(config.getEnv()).thenReturn(env); + when(config.getVersion()).thenReturn(version); + return config; + } + + private static SharedCommunicationObjects sharedCommunicationObjects(boolean evpProxyAvailable) { + DDAgentFeaturesDiscovery discovery = mock(DDAgentFeaturesDiscovery.class); + when(discovery.supportsEvpProxy()).thenReturn(evpProxyAvailable); + if (evpProxyAvailable) { + when(discovery.getEvpProxyEndpoint()).thenReturn("/evp_proxy/"); + } + + SharedCommunicationObjects sharedCommunicationObjects = new SharedCommunicationObjects(); + sharedCommunicationObjects.setFeaturesDiscovery(discovery); + sharedCommunicationObjects.agentUrl = HttpUrl.get(server.getAddress()); + sharedCommunicationObjects.agentHttpClient = new OkHttpClient.Builder().build(); + return sharedCommunicationObjects; + } + + private static void assertContext( + Map context, String service, String env, String version) { + assertEquals(service == null ? "unknown" : service, context.get("service")); + assertOptionalContextValue(context, "env", env); + assertOptionalContextValue(context, "version", version); + } + + private static void assertOptionalContextValue( + Map context, String key, String value) { + if (value == null) { + assertFalse(context.containsKey(key)); + } else { + assertEquals(value, context.get(key)); + } + } + + private static ExposuresRequest findRequest(String serviceName) { + for (ExposuresRequest request : REQUESTS) { + if (serviceName.equals(request.context.get("service"))) { + return request; + } + } + return null; + } + + private static List allExposures() { + List exposures = new ArrayList<>(); + for (ExposuresRequest request : REQUESTS) { + exposures.addAll(request.exposures); + } + return exposures; + } + + private static void assertExposures( + List receivedExposures, List expectedExposures) { + assertEquals(expectedExposures.size(), receivedExposures.size()); + TreeSet received = new TreeSet<>(ExposureWriterTests::compare); + received.addAll(receivedExposures); + assertTrue(received.containsAll(expectedExposures)); + } + + private static int compare(ExposureEvent first, ExposureEvent second) { + if (first == second) { + return 0; + } + if (first == null) { + return -1; + } + if (second == null) { + return 1; + } + + int result = Long.compare(first.timestamp, second.timestamp); + if (result != 0) { + return result; + } + + result = compareNullableString(first.flag == null ? null : first.flag.key, second.flag); + if (result != 0) { + return result; + } + + result = + compareNullableString(first.variant == null ? null : first.variant.key, second.variant); + if (result != 0) { + return result; + } + + result = + compareNullableString( + first.allocation == null ? null : first.allocation.key, second.allocation); + if (result != 0) { + return result; + } + + result = compareNullableString(first.subject == null ? null : first.subject.id, second.subject); + if (result != 0) { + return result; + } + + Map.Entry firstEntry = firstEntry(first.subject); + Map.Entry secondEntry = firstEntry(second.subject); + result = + compareNullableString( + firstEntry == null ? null : firstEntry.getKey(), + secondEntry == null ? null : secondEntry.getKey()); + if (result != 0) { + return result; + } + return compareNullableString( + firstEntry == null ? null : String.valueOf(firstEntry.getValue()), + secondEntry == null ? null : String.valueOf(secondEntry.getValue())); + } + + private static int compareNullableString(String first, Flag second) { + return compareNullableString(first, second == null ? null : second.key); + } + + private static int compareNullableString(String first, Variant second) { + return compareNullableString(first, second == null ? null : second.key); + } + + private static int compareNullableString(String first, Allocation second) { + return compareNullableString(first, second == null ? null : second.key); + } + + private static int compareNullableString(String first, Subject second) { + return compareNullableString(first, second == null ? null : second.id); + } + + private static int compareNullableString(String first, String second) { + String firstValue = first == null ? "" : first; + String secondValue = second == null ? "" : second; + return firstValue.compareTo(secondValue); + } + + private static Map.Entry firstEntry(Subject subject) { + if (subject == null || subject.attributes == null) { + return null; + } + Iterator> iterator = subject.attributes.entrySet().iterator(); + return iterator.hasNext() ? iterator.next() : null; + } + + private static List buildExposures(int count) { + List exposures = new ArrayList<>(); + for (int index = 0; index < count; index++) { + exposures.add(buildExposure()); + } + return exposures; + } + + private static ExposureEvent buildExposure() { + String id = UUID.randomUUID().toString(); + return new ExposureEvent( + System.currentTimeMillis(), + new Allocation("Allocation_" + id), + new Flag("Flag_" + id), + new Variant("Variant_" + id), + new Subject("Subject_" + id, singletonMap("key_" + id, (Object) ("value_" + id)))); + } + + private static T getField(Object target, String fieldName, Class fieldType) + throws NoSuchFieldException, IllegalAccessException { + Field field = target.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + return fieldType.cast(field.get(target)); + } + + private static void eventually(ThrowingRunnable assertion, long timeoutMillis) throws Exception { + long deadline = System.nanoTime() + MILLISECONDS.toNanos(timeoutMillis); + AssertionError lastAssertionError = null; + Exception lastException = null; + while (System.nanoTime() <= deadline) { + try { + assertion.run(); + return; + } catch (AssertionError error) { + lastAssertionError = error; + } catch (Exception exception) { + lastException = exception; + } + MILLISECONDS.sleep(20); + } + if (lastAssertionError != null) { + throw lastAssertionError; + } + if (lastException != null) { + throw lastException; + } + throw new AssertionError("condition was not satisfied before timeout"); + } + + @FunctionalInterface + private interface ThrowingRunnable { + void run() throws Exception; + } +} diff --git a/products/feature-flagging/feature-flagging-lib/src/test/java/com/datadog/featureflag/LRUExposureCacheTest.java b/products/feature-flagging/feature-flagging-lib/src/test/java/com/datadog/featureflag/LRUExposureCacheTest.java new file mode 100644 index 00000000000..43c4c2d40f0 --- /dev/null +++ b/products/feature-flagging/feature-flagging-lib/src/test/java/com/datadog/featureflag/LRUExposureCacheTest.java @@ -0,0 +1,244 @@ +package com.datadog.featureflag; + +import static java.util.Collections.emptyMap; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import datadog.trace.api.featureflag.exposure.Allocation; +import datadog.trace.api.featureflag.exposure.ExposureEvent; +import datadog.trace.api.featureflag.exposure.Flag; +import datadog.trace.api.featureflag.exposure.Subject; +import datadog.trace.api.featureflag.exposure.Variant; +import java.util.ArrayList; +import java.util.List; +import org.junit.jupiter.api.Test; + +class LRUExposureCacheTest { + + @Test + void testAddingElements() { + LRUExposureCache cache = new LRUExposureCache(5); + ExposureEvent event = createEvent("flag", "subject", "variant", "allocation"); + + boolean added = cache.add(event); + + assertTrue(added); + assertEquals(1, cache.size()); + } + + @Test + void testAddingDuplicateEventsReturnsFalse() { + LRUExposureCache cache = new LRUExposureCache(5); + ExposureEvent event = createEvent("flag", "subject", "variant", "allocation"); + + cache.add(event); + boolean duplicateAdded = cache.add(event); + + assertFalse(duplicateAdded); + assertEquals(1, cache.size()); + } + + @Test + void testAddingEventsWithSameKeyButDifferentDetailsUpdatesCache() { + LRUExposureCache cache = new LRUExposureCache(5); + ExposureEvent event1 = createEvent("flag", "subject", "variant1", "allocation1"); + ExposureEvent event2 = createEvent("flag", "subject", "variant2", "allocation2"); + ExposureCache.Key key = new ExposureCache.Key(event1); + + boolean added1 = cache.add(event1); + boolean added2 = cache.add(event2); + ExposureCache.Value retrieved = cache.get(key); + + assertTrue(added1); + assertTrue(added2); + assertEquals(1, cache.size()); + assertEquals("variant2", retrieved.variant); + assertEquals("allocation2", retrieved.allocation); + } + + @Test + void testLruEvictionWhenCapacityExceeded() { + LRUExposureCache cache = new LRUExposureCache(2); + ExposureEvent event1 = createEvent("flag1", "subject1", "variant1", "allocation1"); + ExposureEvent event2 = createEvent("flag2", "subject2", "variant2", "allocation2"); + ExposureEvent event3 = createEvent("flag3", "subject3", "variant3", "allocation3"); + ExposureCache.Key key1 = new ExposureCache.Key(event1); + ExposureCache.Key key3 = new ExposureCache.Key(event3); + + cache.add(event1); + cache.add(event2); + cache.add(event3); + + assertEquals(2, cache.size()); + assertNull(cache.get(key1)); // event1 should be evicted + assertNotNull(cache.get(key3)); // event3 should be present + assertEquals("variant3", cache.get(key3).variant); + assertEquals("allocation3", cache.get(key3).allocation); + } + + @Test + void testSingleCapacityCache() { + LRUExposureCache cache = new LRUExposureCache(1); + ExposureEvent event1 = createEvent("flag1", "subject1", "variant1", "allocation1"); + ExposureEvent event2 = createEvent("flag2", "subject2", "variant2", "allocation2"); + + cache.add(event1); + cache.add(event2); + + assertEquals(1, cache.size()); + } + + @Test + void testZeroCapacityCache() { + LRUExposureCache cache = new LRUExposureCache(0); + ExposureEvent event = createEvent("flag", "subject", "variant", "allocation"); + + boolean added = cache.add(event); + + assertTrue(added); + assertEquals(0, cache.size()); + } + + @Test + void testEmptyCacheSize() { + LRUExposureCache cache = new LRUExposureCache(5); + + assertEquals(0, cache.size()); + } + + @Test + void testMultipleAdditionsWithSameFlagDifferentSubjects() { + LRUExposureCache cache = new LRUExposureCache(10); + List events = new ArrayList<>(); + for (int index = 0; index < 5; index++) { + events.add(createEvent("flag", "subject" + index, "variant", "allocation")); + } + + for (ExposureEvent event : events) { + assertTrue(cache.add(event)); + } + + assertEquals(5, cache.size()); + } + + @Test + void testMultipleAdditionsWithSameSubjectDifferentFlags() { + LRUExposureCache cache = new LRUExposureCache(10); + List events = new ArrayList<>(); + for (int index = 0; index < 5; index++) { + events.add(createEvent("flag" + index, "subject", "variant", "allocation")); + } + + for (ExposureEvent event : events) { + assertTrue(cache.add(event)); + } + + assertEquals(5, cache.size()); + } + + @Test + void testKeyEqualityWithNullValues() { + LRUExposureCache cache = new LRUExposureCache(5); + ExposureEvent event1 = + new ExposureEvent( + System.currentTimeMillis(), + new Allocation("allocation"), + new Flag(null), + new Variant("variant"), + new Subject(null, emptyMap())); + ExposureEvent event2 = + new ExposureEvent( + System.currentTimeMillis(), + new Allocation("allocation"), + new Flag(null), + new Variant("variant"), + new Subject(null, emptyMap())); + + cache.add(event1); + boolean duplicateAdded = cache.add(event2); + + assertFalse(duplicateAdded); + assertEquals(1, cache.size()); + } + + @Test + void testUpdatingExistingKeyMaintainsLruPosition() { + LRUExposureCache cache = new LRUExposureCache(3); + ExposureEvent event1 = createEvent("flag1", "subject1", "variant1", "allocation1"); + ExposureEvent event2 = createEvent("flag2", "subject2", "variant2", "allocation2"); + ExposureEvent event3 = createEvent("flag3", "subject3", "variant3", "allocation3"); + ExposureEvent event1Updated = createEvent("flag1", "subject1", "variant2", "allocation2"); + ExposureEvent event4 = createEvent("flag4", "subject4", "variant4", "allocation4"); + ExposureCache.Key key1 = new ExposureCache.Key(event1); + ExposureCache.Key key2 = new ExposureCache.Key(event2); + ExposureCache.Key key4 = new ExposureCache.Key(event4); + + cache.add(event1); + cache.add(event2); + cache.add(event3); + cache.add(event1Updated); // Updates event1, moves to most recent + cache.add(event4); // Should evict event2, not event1 + + assertEquals(3, cache.size()); + assertNotNull(cache.get(key1)); // event1 should be updated and present + assertEquals("variant2", cache.get(key1).variant); // verify it was updated + assertEquals("allocation2", cache.get(key1).allocation); + assertNull(cache.get(key2)); // event2 should be evicted + assertNotNull(cache.get(key4)); // event4 should be present + assertEquals("variant4", cache.get(key4).variant); + } + + @Test + void testDuplicateExposureKeepsSubjectHotInLruOrder() { + LRUExposureCache cache = new LRUExposureCache(3); + ExposureEvent event1 = createEvent("flag1", "subject1", "variant1", "allocation1"); + ExposureEvent event2 = createEvent("flag2", "subject2", "variant2", "allocation2"); + ExposureEvent event3 = createEvent("flag3", "subject3", "variant3", "allocation3"); + // same key + same details as event1: will go through the "duplicate" path + ExposureEvent event1Duplicate = createEvent("flag1", "subject1", "variant1", "allocation1"); + ExposureEvent event4 = createEvent("flag4", "subject4", "variant4", "allocation4"); + + ExposureCache.Key key1 = new ExposureCache.Key(event1); + ExposureCache.Key key2 = new ExposureCache.Key(event2); + ExposureCache.Key key4 = new ExposureCache.Key(event4); + + // Fill cache + boolean added1 = cache.add(event1); + boolean added2 = cache.add(event2); + boolean added3 = cache.add(event3); + + // Duplicate exposure for subject1: should *not* change size, but *should* bump recency + boolean duplicateAdded = cache.add(event1Duplicate); + + // Now push over capacity: the least recently used *non-hot* entry (event2) should be evicted + boolean added4 = cache.add(event4); + + assertTrue(added1); + assertTrue(added2); + assertTrue(added3); + assertFalse(duplicateAdded); // dedup correctly + assertTrue(added4); + + assertEquals(3, cache.size()); + + assertNotNull(cache.get(key1)); // hot subject1 should still be present + assertNull(cache.get(key2)); // subject2 should be evicted + assertNotNull(cache.get(key4)); // newest subject4 should be present + + assertEquals("variant1", cache.get(key1).variant); + assertEquals("allocation1", cache.get(key1).allocation); + } + + private static ExposureEvent createEvent( + String flag, String subject, String variant, String allocation) { + return new ExposureEvent( + System.currentTimeMillis(), + new Allocation(allocation), + new Flag(flag), + new Variant(variant), + new Subject(subject, emptyMap())); + } +}