From bd2577112a286e13826f1093a17677b856c37616 Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Fri, 13 Mar 2026 21:23:33 -0400 Subject: [PATCH 1/7] HTTP/2 PING health check + connection max lifetime eviction with jitter --- .../CONNECT_TIMEOUT_TESTING_README.md | 2 +- sdk/cosmos/azure-cosmos-tests/pom.xml | 24 ++ .../Http2ConnectTimeoutBifurcationTests.java | 4 +- .../Http2ConnectionLifecycleTests.java | 294 +++++++++++++++++- .../com/azure/cosmos/rx/TestSuiteBase.java | 2 +- .../manual-http-network-fault-testng.xml | 16 + .../azure/cosmos/implementation/Configs.java | 35 +++ .../http/Http2PingHealthHandler.java | 182 +++++++++++ .../implementation/http/HttpClient.java | 47 +++ .../http/ReactorNettyClient.java | 13 + ...ve-http-network-fault-platform-matrix.json | 17 + 11 files changed, 630 insertions(+), 6 deletions(-) create mode 100644 sdk/cosmos/azure-cosmos-tests/src/test/resources/manual-http-network-fault-testng.xml create mode 100644 sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2PingHealthHandler.java create mode 100644 sdk/cosmos/live-http-network-fault-platform-matrix.json diff --git a/sdk/cosmos/azure-cosmos-tests/CONNECT_TIMEOUT_TESTING_README.md b/sdk/cosmos/azure-cosmos-tests/CONNECT_TIMEOUT_TESTING_README.md index 7895b856b331..26368bffd455 100644 --- a/sdk/cosmos/azure-cosmos-tests/CONNECT_TIMEOUT_TESTING_README.md +++ b/sdk/cosmos/azure-cosmos-tests/CONNECT_TIMEOUT_TESTING_README.md @@ -38,7 +38,7 @@ docker run --rm --cap-add=NET_ADMIN --memory 8g \ java -DCOSMOS.THINCLIENT_ENABLED=true \ -DCOSMOS.THINCLIENT_CONNECTION_TIMEOUT_IN_SECONDS=1 \ -DCOSMOS.HTTP2_ENABLED=true \ - org.testng.TestNG /workspace/azure-cosmos-tests/src/test/resources/manual-thinclient-network-delay-testng.xml \ + org.testng.TestNG /workspace/azure-cosmos-tests/src/test/resources/manual-http-network-fault-testng.xml \ -verbose 2 ' ``` diff --git a/sdk/cosmos/azure-cosmos-tests/pom.xml b/sdk/cosmos/azure-cosmos-tests/pom.xml index 71bf282c4066..0662705a84ca 100644 --- a/sdk/cosmos/azure-cosmos-tests/pom.xml +++ b/sdk/cosmos/azure-cosmos-tests/pom.xml @@ -857,6 +857,30 @@ Licensed under the MIT License. + + manual-http-network-fault + + manual-http-network-fault + + + + + org.apache.maven.plugins + maven-failsafe-plugin + 3.5.3 + + + src/test/resources/manual-http-network-fault-testng.xml + + + true + true + + + + + + fi-thinclient-multi-region diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectTimeoutBifurcationTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectTimeoutBifurcationTests.java index e93b4cf56353..765be45b86c8 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectTimeoutBifurcationTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectTimeoutBifurcationTests.java @@ -45,7 +45,7 @@ * - Metadata requests → GW V1 endpoint (port 443) → CONNECT_TIMEOUT_MILLIS = 45s (unchanged) * * HOW TO RUN: - * 1. Group "manual-thinclient-network-delay" — NOT included in CI. + * 1. Group "manual-http-network-fault" — NOT included in CI. * 2. Docker container with --cap-add=NET_ADMIN, JDK 21, .m2 mounted. * 3. Tests self-manage iptables rules (add/remove) — no manual intervention. * 4. See CONNECT_TIMEOUT_TESTING_README.md for full setup and run instructions. @@ -61,7 +61,7 @@ public class Http2ConnectTimeoutBifurcationTests extends FaultInjectionTestBase private CosmosAsyncContainer cosmosAsyncContainer; private TestObject seedItem; - private static final String TEST_GROUP = "manual-thinclient-network-delay"; + private static final String TEST_GROUP = "manual-http-network-fault"; private static final long TEST_TIMEOUT = 180_000; @Factory(dataProvider = "clientBuildersWithGatewayAndHttp2") diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectionLifecycleTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectionLifecycleTests.java index 9c849a8ce044..32f138020abd 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectionLifecycleTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectionLifecycleTests.java @@ -53,7 +53,7 @@ * does NOT close the parent TCP connection. *

* HOW TO RUN: - * 1. Group "manual-thinclient-network-delay" — NOT included in CI. + * 1. Group "manual-http-network-fault" — NOT included in CI. * 2. Docker container with --cap-add=NET_ADMIN, JDK 21, .m2 mounted. * 3. Tests self-manage tc netem (add/remove delay) — no manual intervention. * 4. See NETWORK_DELAY_TESTING_README.md for full setup and run instructions. @@ -70,7 +70,7 @@ public class Http2ConnectionLifecycleTests extends FaultInjectionTestBase { private CosmosAsyncContainer cosmosAsyncContainer; private TestObject seedItem; - private static final String TEST_GROUP = "manual-thinclient-network-delay"; + private static final String TEST_GROUP = "manual-http-network-fault"; // 3 minutes per test — enough for warmup + delay + retries + cross-region failover + recovery read private static final long TEST_TIMEOUT = 180_000; // Hardcode eth0 — Docker always uses eth0. detectNetworkInterface() fails during active delay @@ -119,6 +119,7 @@ public void beforeMethod() { @AfterMethod(groups = {TEST_GROUP}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) public void afterMethod() { removeNetworkDelay(); + removePacketDrop(); safeClose(this.client); this.client = null; this.cosmosAsyncContainer = null; @@ -128,7 +129,9 @@ public void afterMethod() { @AfterClass(groups = {TEST_GROUP}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) public void afterClass() { removeNetworkDelay(); + removePacketDrop(); System.clearProperty("COSMOS.THINCLIENT_ENABLED"); + System.clearProperty("COSMOS.HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS"); } // ======================================================================== @@ -757,4 +760,291 @@ public void parentChannelSurvivesE2ECancelWithoutReadTimeout() throws Exception .as("H2 stream channels are never reused (RFC 9113 §5.1.1) — stream ID should differ from warmup") .isNotEqualTo(warmupStreamChannelId); } + + // ======================================================================== + // Connection Max Lifetime Tests + // ======================================================================== + + /** + * Proves that a connection is rotated after maxLifeTime expires. + * Sets COSMOS.HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS=15 (short lifetime for testing). + * Establishes a connection, captures parentChannelId, waits for the lifetime + background + * sweep interval to elapse, then performs another read and asserts the parentChannelId changed. + */ + @Test(groups = {TEST_GROUP}, timeOut = TEST_TIMEOUT) + public void connectionRotatedAfterMaxLifetimeExpiry() throws Exception { + System.setProperty("COSMOS.HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS", "15"); + try { + safeClose(this.client); + this.client = getClientBuilder().buildAsyncClient(); + this.cosmosAsyncContainer = getSharedMultiPartitionCosmosContainerWithIdAsPartitionKey(this.client); + + String initialParentChannelId = establishH2ConnectionAndGetParentChannelId(); + logger.info("Initial parentChannelId: {}", initialParentChannelId); + + long startTime = System.currentTimeMillis(); + long waitMs = 50_000; + String latestParentChannelId = initialParentChannelId; + + while (System.currentTimeMillis() - startTime < waitMs) { + Thread.sleep(5_000); + latestParentChannelId = readAndGetParentChannelId(); + logger.info("Elapsed={}s parentChannelId={} (changed={})", + (System.currentTimeMillis() - startTime) / 1000, + latestParentChannelId, + !latestParentChannelId.equals(initialParentChannelId)); + if (!latestParentChannelId.equals(initialParentChannelId)) { + break; + } + } + + logger.info("RESULT: initial={}, final={}, ROTATED={}", + initialParentChannelId, latestParentChannelId, + !initialParentChannelId.equals(latestParentChannelId)); + assertThat(latestParentChannelId) + .as("After max lifetime (15s + jitter), connection should be rotated to a new parentChannelId") + .isNotEqualTo(initialParentChannelId); + } finally { + System.clearProperty("COSMOS.HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS"); + } + } + + /** + * Proves that per-connection jitter staggers eviction — not all connections expire at once. + * Creates multiple H2 parent connections via concurrent requests, sets a short maxLifeTime (15s), + * then observes that connections are evicted at different times. + */ + @Test(groups = {TEST_GROUP}, timeOut = TEST_TIMEOUT) + public void perConnectionJitterStaggersEviction() throws Exception { + System.setProperty("COSMOS.HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS", "15"); + try { + safeClose(this.client); + this.client = getClientBuilder().buildAsyncClient(); + this.cosmosAsyncContainer = getSharedMultiPartitionCosmosContainerWithIdAsPartitionKey(this.client); + + int concurrentRequests = 100; + Set initialParentChannelIds = ConcurrentHashMap.newKeySet(); + + for (int wave = 0; wave < 3; wave++) { + Flux.range(0, concurrentRequests) + .flatMap(i -> this.cosmosAsyncContainer.readItem( + seedItem.getId(), new PartitionKey(seedItem.getId()), TestObject.class) + .doOnSuccess(response -> { + try { + String parentId = extractParentChannelId(response.getDiagnostics()); + if (parentId != null) { + initialParentChannelIds.add(parentId); + } + } catch (Exception e) { + logger.warn("Failed to extract parentChannelId", e); + } + }), concurrentRequests) + .collectList() + .block(); + if (initialParentChannelIds.size() > 1) { + break; + } + } + + logger.info("Initial parent channels: {} (count={})", initialParentChannelIds, initialParentChannelIds.size()); + assertThat(initialParentChannelIds) + .as("Concurrent reads should create multiple parent H2 channels") + .hasSizeGreaterThan(1); + + Thread.sleep(20_000); + + Set midpointParentChannelIds = ConcurrentHashMap.newKeySet(); + Flux.range(0, concurrentRequests) + .flatMap(i -> this.cosmosAsyncContainer.readItem( + seedItem.getId(), new PartitionKey(seedItem.getId()), TestObject.class) + .doOnSuccess(response -> { + try { + String parentId = extractParentChannelId(response.getDiagnostics()); + if (parentId != null) { + midpointParentChannelIds.add(parentId); + } + } catch (Exception e) { + logger.warn("Failed to extract parentChannelId", e); + } + }), concurrentRequests) + .collectList() + .block(); + + Set survivedChannels = new HashSet<>(initialParentChannelIds); + survivedChannels.retainAll(midpointParentChannelIds); + Set newChannels = new HashSet<>(midpointParentChannelIds); + newChannels.removeAll(initialParentChannelIds); + + logger.info("RESULT: initial={} (count={}), midpoint={} (count={}), survived={}, new={}", + initialParentChannelIds, initialParentChannelIds.size(), + midpointParentChannelIds, midpointParentChannelIds.size(), + survivedChannels, newChannels); + + assertThat(midpointParentChannelIds) + .as("Pool should still be functional at midpoint") + .isNotEmpty(); + } finally { + System.clearProperty("COSMOS.HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS"); + } + } + + /** + * Proves that when a connection is silently degraded (packets dropped, no TCP RST), + * the PING health check detects the degradation (no ACK received within timeout), + * the eviction predicate evicts the connection, and the next request succeeds on a new connection. + * + * Configuration: + * - Max lifetime = 600s (intentionally HIGH — we don't want lifetime to trigger eviction) + * - PING interval = 3s (send probes frequently) + * - PING ACK timeout = 10s (short — evict quickly when ACKs stop arriving) + * - Blackhole duration = 25s (PING ACK timeout 10s + background sweep 5s + margin) + */ + @Test(groups = {TEST_GROUP}, timeOut = TEST_TIMEOUT) + public void degradedConnectionEvictedByPingHealthCheck() throws Exception { + // High max lifetime so it can't trigger eviction — only PING staleness should evict + System.setProperty("COSMOS.HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS", "600"); + System.setProperty("COSMOS.HTTP2_PING_INTERVAL_IN_SECONDS", "3"); + System.setProperty("COSMOS.HTTP2_PING_ACK_TIMEOUT_IN_SECONDS", "10"); + System.setProperty("COSMOS.HTTP2_ENABLED", "true"); + try { + safeClose(this.client); + this.client = getClientBuilder().buildAsyncClient(); + this.cosmosAsyncContainer = getSharedMultiPartitionCosmosContainerWithIdAsPartitionKey(this.client); + + String initialParentChannelId = establishH2ConnectionAndGetParentChannelId(); + logger.info("Initial parentChannelId: {}", initialParentChannelId); + + // Diagnostic: check if PING handler installed on parent channel + // Use reflection or diagnostics to verify H2 config + logger.info("PING_DIAG: HTTP2_ENABLED={}, PING_INTERVAL={}, PING_ACK_TIMEOUT={}", + System.getProperty("COSMOS.HTTP2_ENABLED"), + System.getProperty("COSMOS.HTTP2_PING_INTERVAL_IN_SECONDS"), + System.getProperty("COSMOS.HTTP2_PING_ACK_TIMEOUT_IN_SECONDS")); + + // Blackhole traffic — PINGs sent but no ACKs return + addPacketDrop(); + logger.info("Waiting 25s for PING ACK timeout (10s) + background sweep (5s) + margin..."); + Thread.sleep(25_000); + removePacketDrop(); + Thread.sleep(2_000); + + CosmosEndToEndOperationLatencyPolicyConfig e2ePolicy = + new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(30)).build(); + CosmosItemRequestOptions opts = new CosmosItemRequestOptions(); + opts.setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy); + + CosmosItemResponse response = this.cosmosAsyncContainer.readItem( + seedItem.getId(), new PartitionKey(seedItem.getId()), opts, TestObject.class).block(); + + assertThat(response).as("Recovery read must succeed").isNotNull(); + assertThat(response.getStatusCode()).as("Recovery read status code").isEqualTo(200); + + String recoveryParentChannelId = extractParentChannelId(response.getDiagnostics()); + logger.info("RESULT: initial={}, recovery={}, ROTATED={}", + initialParentChannelId, recoveryParentChannelId, + !initialParentChannelId.equals(recoveryParentChannelId)); + + assertThat(recoveryParentChannelId) + .as("Recovery read must use a new parentChannelId — degraded connection evicted by PING health check") + .isNotNull() + .isNotEmpty() + .isNotEqualTo(initialParentChannelId); + } finally { + removePacketDrop(); + System.clearProperty("COSMOS.HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS"); + System.clearProperty("COSMOS.HTTP2_PING_INTERVAL_IN_SECONDS"); + System.clearProperty("COSMOS.HTTP2_PING_ACK_TIMEOUT_IN_SECONDS"); + System.clearProperty("COSMOS.HTTP2_ENABLED"); + } + } + + /** + * Proves that when a connection exceeds its jittered max lifetime AND the network is healthy + * (PING ACKs are still arriving), the max lifetime eviction still triggers. + * This is the safety-net — connections shouldn't live forever even if PINGs succeed. + */ + @Test(groups = {TEST_GROUP}, timeOut = TEST_TIMEOUT) + public void connectionEvictedAfterMaxLifetimeEvenWithHealthyPings() throws Exception { + System.setProperty("COSMOS.HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS", "15"); + System.setProperty("COSMOS.HTTP2_PING_INTERVAL_IN_SECONDS", "3"); + System.setProperty("COSMOS.HTTP2_PING_ACK_TIMEOUT_IN_SECONDS", "60"); + try { + safeClose(this.client); + this.client = getClientBuilder().buildAsyncClient(); + this.cosmosAsyncContainer = getSharedMultiPartitionCosmosContainerWithIdAsPartitionKey(this.client); + + String initialParentChannelId = establishH2ConnectionAndGetParentChannelId(); + logger.info("Initial parentChannelId: {}", initialParentChannelId); + + // No blackhole — PINGs succeed. Just wait for max lifetime (15s + jitter + sweep margin) + logger.info("Waiting 50s for max lifetime (15s) + jitter (up to 30s) + background sweep..."); + Thread.sleep(50_000); + + CosmosEndToEndOperationLatencyPolicyConfig e2ePolicy = + new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(30)).build(); + CosmosItemRequestOptions opts = new CosmosItemRequestOptions(); + opts.setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy); + + CosmosItemResponse response = this.cosmosAsyncContainer.readItem( + seedItem.getId(), new PartitionKey(seedItem.getId()), opts, TestObject.class).block(); + + assertThat(response).as("Recovery read must succeed").isNotNull(); + assertThat(response.getStatusCode()).as("Recovery read status code").isEqualTo(200); + + String recoveryParentChannelId = extractParentChannelId(response.getDiagnostics()); + logger.info("RESULT: initial={}, recovery={}, ROTATED={}", + initialParentChannelId, recoveryParentChannelId, + !initialParentChannelId.equals(recoveryParentChannelId)); + + assertThat(recoveryParentChannelId) + .as("Recovery read must use a new parentChannelId — max lifetime eviction still works with healthy PINGs") + .isNotNull() + .isNotEmpty() + .isNotEqualTo(initialParentChannelId); + } finally { + System.clearProperty("COSMOS.HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS"); + System.clearProperty("COSMOS.HTTP2_PING_INTERVAL_IN_SECONDS"); + System.clearProperty("COSMOS.HTTP2_PING_ACK_TIMEOUT_IN_SECONDS"); + } + } + + // ======================================================================== + // iptables helpers for silent degradation (packet drop, no RST) + // ======================================================================== + + private void addPacketDrop() { + String cmd = "iptables -A OUTPUT -p tcp --dport 10250 -j DROP"; + logger.info(">>> Adding packet drop: {}", cmd); + try { + Process p = Runtime.getRuntime().exec(new String[]{"sh", "-c", cmd}); + int exit = p.waitFor(); + if (exit != 0) { + try (BufferedReader err = new BufferedReader(new InputStreamReader(p.getErrorStream()))) { + String errMsg = err.readLine(); + logger.warn("iptables add failed (exit={}): {}", exit, errMsg); + } + } else { + logger.info(">>> Packet drop active on port 10250"); + } + } catch (Exception e) { + logger.error("Failed to add packet drop", e); + fail("Could not add packet drop via iptables: " + e.getMessage()); + } + } + + private void removePacketDrop() { + String cmd = "iptables -D OUTPUT -p tcp --dport 10250 -j DROP"; + logger.info(">>> Removing packet drop: {}", cmd); + try { + Process p = Runtime.getRuntime().exec(new String[]{"sh", "-c", cmd}); + int exit = p.waitFor(); + if (exit == 0) { + logger.info(">>> Packet drop removed"); + } else { + logger.warn("iptables del returned exit={} (may already be removed)", exit); + } + } catch (Exception e) { + logger.warn("Failed to remove packet drop: {}", e.getMessage()); + } + } } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java index bcdd41b615fb..0437c42348b5 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java @@ -297,7 +297,7 @@ public CosmosAsyncDatabase getDatabase(String id) { @BeforeSuite(groups = {"thinclient", "fast", "long", "direct", "multi-region", "multi-master", "flaky-multi-master", "emulator", "emulator-vnext", "split", "query", "cfp-split", "circuit-breaker-misc-gateway", "circuit-breaker-misc-direct", - "circuit-breaker-read-all-read-many", "fi-multi-master", "long-emulator", "fi-thinclient-multi-region", "fi-thinclient-multi-master", "multi-region-strong"}, timeOut = SUITE_SETUP_TIMEOUT) + "circuit-breaker-read-all-read-many", "fi-multi-master", "long-emulator", "fi-thinclient-multi-region", "fi-thinclient-multi-master", "multi-region-strong", "manual-http-network-fault"}, timeOut = SUITE_SETUP_TIMEOUT) public void beforeSuite() { logger.info("beforeSuite Started"); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/resources/manual-http-network-fault-testng.xml b/sdk/cosmos/azure-cosmos-tests/src/test/resources/manual-http-network-fault-testng.xml new file mode 100644 index 000000000000..b6127237e422 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/resources/manual-http-network-fault-testng.xml @@ -0,0 +1,16 @@ + + + + + + + + + + + + + + + + diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java index d8380bd94fff..4b8023d7f2f6 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java @@ -141,6 +141,23 @@ public class Configs { private static final Duration MAX_IDLE_CONNECTION_TIMEOUT = Duration.ofSeconds(60); private static final Duration CONNECTION_ACQUIRE_TIMEOUT = Duration.ofSeconds(45); private static final String REACTOR_NETTY_CONNECTION_POOL_NAME = "reactor-netty-connection-pool"; + + // HTTP connection max lifetime — forces periodic connection rotation for DNS re-resolution and load redistribution. + private static final int DEFAULT_HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS = 300; // 5 minutes + private static final String HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS = "COSMOS.HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS"; + private static final String HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS_VARIABLE = "COSMOS_HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS"; + public static final int HTTP_CONNECTION_MAX_LIFETIME_JITTER_IN_SECONDS = 30; + + // HTTP/2 PING health check — detects silently degraded connections (packet black-hole, half-open TCP). + // Interval: how often to send PING frames on each parent H2 connection. + // Timeout: if no PING ACK is received within this duration, the connection is considered unhealthy. + private static final int DEFAULT_HTTP2_PING_INTERVAL_IN_SECONDS = 10; + private static final String HTTP2_PING_INTERVAL_IN_SECONDS = "COSMOS.HTTP2_PING_INTERVAL_IN_SECONDS"; + private static final String HTTP2_PING_INTERVAL_IN_SECONDS_VARIABLE = "COSMOS_HTTP2_PING_INTERVAL_IN_SECONDS"; + private static final int DEFAULT_HTTP2_PING_ACK_TIMEOUT_IN_SECONDS = 30; + private static final String HTTP2_PING_ACK_TIMEOUT_IN_SECONDS = "COSMOS.HTTP2_PING_ACK_TIMEOUT_IN_SECONDS"; + private static final String HTTP2_PING_ACK_TIMEOUT_IN_SECONDS_VARIABLE = "COSMOS_HTTP2_PING_ACK_TIMEOUT_IN_SECONDS"; + private static final int DEFAULT_HTTP_RESPONSE_TIMEOUT_IN_SECONDS = 60; private static final int DEFAULT_QUERY_PLAN_RESPONSE_TIMEOUT_IN_SECONDS = 5; private static final int DEFAULT_ADDRESS_REFRESH_RESPONSE_TIMEOUT_IN_SECONDS = 5; @@ -639,6 +656,24 @@ public static int getHttpResponseTimeoutInSeconds() { return getJVMConfigAsInt(HTTP_RESPONSE_TIMEOUT_IN_SECONDS, DEFAULT_HTTP_RESPONSE_TIMEOUT_IN_SECONDS); } + public static int getHttpConnectionMaxLifetimeInSeconds() { + return getJVMConfigAsInt( + HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS, + DEFAULT_HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS); + } + + public static int getHttp2PingIntervalInSeconds() { + return getJVMConfigAsInt( + HTTP2_PING_INTERVAL_IN_SECONDS, + DEFAULT_HTTP2_PING_INTERVAL_IN_SECONDS); + } + + public static int getHttp2PingAckTimeoutInSeconds() { + return getJVMConfigAsInt( + HTTP2_PING_ACK_TIMEOUT_IN_SECONDS, + DEFAULT_HTTP2_PING_ACK_TIMEOUT_IN_SECONDS); + } + public static int getQueryPlanResponseTimeoutInSeconds() { return getJVMConfigAsInt(QUERY_PLAN_RESPONSE_TIMEOUT_IN_SECONDS, DEFAULT_QUERY_PLAN_RESPONSE_TIMEOUT_IN_SECONDS); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2PingHealthHandler.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2PingHealthHandler.java new file mode 100644 index 000000000000..06ab1b663b74 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2PingHealthHandler.java @@ -0,0 +1,182 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.implementation.http; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http2.DefaultHttp2PingFrame; +import io.netty.handler.codec.http2.Http2PingFrame; +import io.netty.util.AttributeKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * HTTP/2 PING-based health checker for parent HTTP/2 connections. + *

+ * Installed on the parent TCP channel (not child H2 streams). Periodically sends + * HTTP/2 PING frames and tracks last ACK timestamp. The connection pool's + * eviction predicate reads {@link #LAST_PING_ACK_NANOS} to determine liveness. + *

+ * Lifecycle: one instance per parent H2 connection. The handler is guarded by + * {@link #HANDLER_INSTALLED} to prevent duplicate installation when multiple + * child streams are opened on the same parent. + */ +public class Http2PingHealthHandler extends ChannelDuplexHandler { + + private static final Logger logger = LoggerFactory.getLogger(Http2PingHealthHandler.class); + + /** + * Nano timestamp of the last PING ACK received on this parent channel. + * Updated ONLY when an Http2PingFrame with ack=true arrives — NOT on arbitrary reads. + * Http2FrameCodec propagates PING ACK frames to downstream handlers (addLast position). + * When the network is blackholed, no PING ACKs arrive, this goes stale, eviction triggers. + */ + public static final AttributeKey LAST_PING_ACK_NANOS = + AttributeKey.valueOf("cosmos.h2.lastPingAckNanos"); + + /** + * Guard attribute to prevent duplicate handler installation on the same parent channel. + */ + static final AttributeKey HANDLER_INSTALLED = + AttributeKey.valueOf("cosmos.h2.pingHealthInstalled"); + + static final String HANDLER_NAME = "cosmos.h2PingHealth"; + + private final long pingIntervalMs; + private final long pingContent; + private ScheduledFuture pingTask; + private final AtomicBoolean closed = new AtomicBoolean(false); + + /** + * @param pingIntervalMs interval between PING frames in milliseconds + */ + public Http2PingHealthHandler(long pingIntervalMs) { + this.pingIntervalMs = pingIntervalMs; + // Fixed PING payload — readable as "cosmos" in hex + this.pingContent = 0xC0_5D_B0_01L; + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) { + // Seed the last-ack timestamp so the eviction predicate doesn't immediately + // consider a brand-new connection as "PING-stale" + ctx.channel().attr(LAST_PING_ACK_NANOS).set(System.nanoTime()); + + // The handler is installed from a child stream's doOnConnected, so the parent + // channel is already active — channelActive() won't fire. Start the schedule now. + if (ctx.channel().isActive()) { + schedulePing(ctx); + } + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + // Fallback: if handler is added before the channel becomes active (unlikely + // with the current parent-install pattern, but correct for completeness) + schedulePing(ctx); + super.channelActive(ctx); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + cancelPing(); + super.channelInactive(ctx); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) { + cancelPing(); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + logger.info("channelRead on parent {}: {}", ctx.channel().id().asShortText(), msg.getClass().getSimpleName()); + if (msg instanceof Http2PingFrame) { + Http2PingFrame pingFrame = (Http2PingFrame) msg; + if (pingFrame.ack()) { + ctx.channel().attr(LAST_PING_ACK_NANOS).set(System.nanoTime()); + logger.info("HTTP/2 PING ACK on channel {}", ctx.channel().id().asShortText()); + } + } + // Always propagate — don't consume frames + super.channelRead(ctx, msg); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + logger.warn("Http2PingHealthHandler error on channel {}: {}", + ctx.channel().id().asShortText(), cause.getMessage()); + ctx.fireExceptionCaught(cause); + } + + private void schedulePing(ChannelHandlerContext ctx) { + if (closed.get() || !ctx.channel().isActive() || this.pingTask != null) { + return; + } + // Use the channel's event loop to avoid threading issues + this.pingTask = ctx.channel().eventLoop().scheduleAtFixedRate( + () -> sendPing(ctx), + pingIntervalMs, + pingIntervalMs, + TimeUnit.MILLISECONDS + ); + } + + private void sendPing(ChannelHandlerContext ctx) { + if (!ctx.channel().isActive()) { + cancelPing(); + return; + } + DefaultHttp2PingFrame pingFrame = new DefaultHttp2PingFrame(pingContent, false); + // Use channel().writeAndFlush() — NOT ctx.writeAndFlush(). + // Our handler is at addLast (after Http2FrameCodec). ctx.writeAndFlush() sends outbound + // from our position toward the network, BYPASSING the codec (frames aren't encoded). + // channel().writeAndFlush() starts from the pipeline tail, going through ALL handlers + // including Http2FrameCodec which encodes the PingFrame to HTTP/2 binary wire format. + ctx.channel().writeAndFlush(pingFrame).addListener(future -> { + if (!future.isSuccess()) { + logger.debug("HTTP/2 PING send failed on channel {}: {}", + ctx.channel().id().asShortText(), + future.cause() != null ? future.cause().getMessage() : "unknown"); + } else if (logger.isTraceEnabled()) { + logger.trace("HTTP/2 PING sent on channel {}", ctx.channel().id().asShortText()); + } + }); + } + + private void cancelPing() { + if (closed.compareAndSet(false, true) && this.pingTask != null) { + this.pingTask.cancel(false); + } + } + + /** + * Installs this handler on the parent H2 channel if not already installed. + * Safe to call from any child stream's doOnConnected callback — will navigate + * to the parent channel and install exactly once. + * + * @param childChannel the child H2 stream channel from doOnConnected + * @param pingIntervalMs PING interval in milliseconds + */ + public static void installOnParentIfAbsent(Channel channel, long pingIntervalMs) { + // In reactor-netty H2 mode, doOnConnected fires for the PARENT TCP channel + // (not child streams). channel.parent() is null because we're already on the parent. + // For child streams (if they ever fire), navigate to parent. + Channel targetChannel = channel.parent() != null ? channel.parent() : channel; + + if (Boolean.TRUE.equals(targetChannel.attr(HANDLER_INSTALLED).get())) { + return; // already installed + } + + targetChannel.attr(HANDLER_INSTALLED).set(Boolean.TRUE); + targetChannel.pipeline().addLast(HANDLER_NAME, new Http2PingHealthHandler(pingIntervalMs)); + + logger.info("Installed Http2PingHealthHandler on channel {} with {}ms interval. Pipeline: {}", + targetChannel.id().asShortText(), pingIntervalMs, targetChannel.pipeline().names()); + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpClient.java index 4f1830908623..f4b6f78647c8 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpClient.java @@ -5,11 +5,13 @@ import com.azure.cosmos.Http2ConnectionConfig; import com.azure.cosmos.implementation.Configs; import com.azure.cosmos.implementation.ImplementationBridgeHelpers; +import io.netty.channel.Channel; import reactor.core.publisher.Mono; import reactor.netty.http.client.Http2AllocationStrategy; import reactor.netty.resources.ConnectionProvider; import java.time.Duration; +import java.util.concurrent.ThreadLocalRandom; /** * A generic interface for sending HTTP requests and getting responses. @@ -53,6 +55,51 @@ static HttpClient createFixed(HttpClientConfig httpClientConfig) { } fixedConnectionProviderBuilder.pendingAcquireTimeout(httpClientConfig.getConnectionAcquireTimeout()); fixedConnectionProviderBuilder.maxIdleTime(httpClientConfig.getMaxIdleConnectionTimeout()); + + int maxLifetimeSeconds = Configs.getHttpConnectionMaxLifetimeInSeconds(); + int pingAckTimeoutSeconds = Configs.getHttp2PingAckTimeoutInSeconds(); + if (maxLifetimeSeconds > 0 || pingAckTimeoutSeconds > 0) { + long baseMaxLifeMs = maxLifetimeSeconds > 0 ? maxLifetimeSeconds * 1000L : 0; + int jitterRangeSeconds = Configs.HTTP_CONNECTION_MAX_LIFETIME_JITTER_IN_SECONDS; // [1, jitterRange] seconds + long maxIdleTimeMs = httpClientConfig.getMaxIdleConnectionTimeout().toMillis(); + long pingAckTimeoutNanos = pingAckTimeoutSeconds > 0 ? pingAckTimeoutSeconds * 1_000_000_000L : 0; + + fixedConnectionProviderBuilder.evictionPredicate((connection, metadata) -> { + // Phase 0: Dead channel — always evict + if (!connection.channel().isActive()) { + return true; + } + + // Phase 1: Idle timeout — unchanged from default behavior + if (maxIdleTimeMs > 0 && metadata.idleTime() > maxIdleTimeMs) { + return true; + } + + // Phase 2: PING liveness — if PING ACK is stale, connection is silently degraded + if (pingAckTimeoutNanos > 0) { + Channel parentChannel = connection.channel(); + if (parentChannel.hasAttr(Http2PingHealthHandler.LAST_PING_ACK_NANOS)) { + long lastAckNanos = parentChannel.attr(Http2PingHealthHandler.LAST_PING_ACK_NANOS).get(); + if (System.nanoTime() - lastAckNanos > pingAckTimeoutNanos) { + return true; + } + } + } + + // Phase 3: Lifetime with jitter — connection exceeded its jittered max lifetime + // Jitter is per-evaluation with 1s granularity in [1s, jitterRange]. ThreadLocalRandom + // is lock-free and safe for concurrent eviction sweeps across event loops. + if (baseMaxLifeMs > 0) { + int connJitterMs = ThreadLocalRandom.current().nextInt(1, jitterRangeSeconds + 1) * 1000; + return metadata.lifeTime() > (baseMaxLifeMs + connJitterMs); + } + + return false; + }); + + fixedConnectionProviderBuilder.evictInBackground(Duration.ofSeconds(5)); + } + if (Configs.isNettyHttpClientMetricsEnabled()) { fixedConnectionProviderBuilder.metrics(true); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java index 3e7f763caeb8..d99cfad14826 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java @@ -164,6 +164,19 @@ private void configureChannelPipelineHandlers() { "customHeaderCleaner", new Http2ResponseHeaderCleanerHandler()); } + + // Install HTTP/2 PING health checker on the parent TCP channel. + // doOnConnected fires for each child H2 stream; installOnParentIfAbsent + // ensures the handler is added exactly once per parent connection. + int pingIntervalSeconds = Configs.getHttp2PingIntervalInSeconds(); + logger.info("doOnConnected: channel={}, parent={}, pingInterval={}s", + connection.channel().id().asShortText(), + connection.channel().parent() != null ? connection.channel().parent().id().asShortText() : "null", + pingIntervalSeconds); + if (pingIntervalSeconds > 0) { + Http2PingHealthHandler.installOnParentIfAbsent( + connection.channel(), pingIntervalSeconds * 1000L); + } })); } } diff --git a/sdk/cosmos/live-http-network-fault-platform-matrix.json b/sdk/cosmos/live-http-network-fault-platform-matrix.json new file mode 100644 index 000000000000..1a78e86a1c33 --- /dev/null +++ b/sdk/cosmos/live-http-network-fault-platform-matrix.json @@ -0,0 +1,17 @@ +{ + "displayNames": { + "-Pmanual-http-network-fault": "HttpNetworkFault", + "Session": "", + "ubuntu": "" + }, + "include": [ + { + "DESIRED_CONSISTENCIES": "[\"Session\"]", + "ACCOUNT_CONSISTENCY": "Session", + "ProfileFlag": [ "-Pmanual-http-network-fault" ], + "Agent": { + "ubuntu": { "OSVmImage": "env:LINUXVMIMAGE", "Pool": "env:LINUXPOOL" } + } + } + ] +} From 5717ea66e2fcbfea33bf4d28b6c78000f5aec598 Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Fri, 13 Mar 2026 21:31:42 -0400 Subject: [PATCH 2/7] CI pipeline + README updates for HTTP network fault tests --- .../NETWORK_DELAY_TESTING_README.md | 2 +- sdk/cosmos/tests.yml | 35 +++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos-tests/NETWORK_DELAY_TESTING_README.md b/sdk/cosmos/azure-cosmos-tests/NETWORK_DELAY_TESTING_README.md index c304c6e06657..798e4df3954a 100644 --- a/sdk/cosmos/azure-cosmos-tests/NETWORK_DELAY_TESTING_README.md +++ b/sdk/cosmos/azure-cosmos-tests/NETWORK_DELAY_TESTING_README.md @@ -76,7 +76,7 @@ docker run --rm --cap-add=NET_ADMIN --memory 8g \ -DACCOUNT_KEY=$ACCOUNT_KEY \ -DCOSMOS.THINCLIENT_ENABLED=true \ -DCOSMOS.HTTP2_ENABLED=true \ - org.testng.TestNG /workspace/azure-cosmos-tests/src/test/resources/manual-thinclient-network-delay-testng.xml \ + org.testng.TestNG /workspace/azure-cosmos-tests/src/test/resources/manual-http-network-fault-testng.xml \ -verbose 2 ' ``` diff --git a/sdk/cosmos/tests.yml b/sdk/cosmos/tests.yml index 69d782fcc9a0..73a67a596a27 100644 --- a/sdk/cosmos/tests.yml +++ b/sdk/cosmos/tests.yml @@ -163,6 +163,41 @@ extends: - name: AdditionalArgs value: '-DCOSMOS.CLIENT_LEAK_DETECTION_ENABLED=true -DACCOUNT_HOST=$(thin-client-canary-multi-writer-session-endpoint) -DACCOUNT_KEY=$(thin-client-canary-multi-writer-session-key) -DCOSMOS.THINCLIENT_ENABLED=true' + # Network fault injection tests (tc netem, iptables) — requires Linux VM with NET_ADMIN. + # Tests run sequentially (MaxParallel: 1) to avoid tc/iptables interference between tests. + # No Docker needed — tc and iptables are native on the Linux CI VM. + - template: /eng/pipelines/templates/stages/archetype-sdk-tests-isolated.yml + parameters: + TestName: 'Cosmos_Live_Test_HttpNetworkFault' + CloudConfig: + Public: + ServiceConnection: azure-sdk-tests-cosmos + MatrixConfigs: + - Name: Cosmos_live_test_http_network_fault + Path: sdk/cosmos/live-http-network-fault-platform-matrix.json + Selection: all + GenerateVMJobs: true + MatrixReplace: + - .*Version=1.2(1|5)/1.17 + ServiceDirectory: cosmos + Artifacts: + - name: azure-cosmos + groupId: com.azure + safeName: azurecosmos + AdditionalModules: + - name: azure-cosmos-tests + groupId: com.azure + - name: azure-cosmos-benchmark + groupId: com.azure + TimeoutInMinutes: 30 + MaxParallel: 1 + TestGoals: 'verify' + TestOptions: '$(ProfileFlag) $(AdditionalArgs) -DskipCompile=true -DskipTestCompile=true -DcreateSourcesJar=false' + TestResultsFiles: '**/junitreports/TEST-*.xml' + AdditionalVariables: + - name: AdditionalArgs + value: '-DCOSMOS.CLIENT_LEAK_DETECTION_ENABLED=true -DACCOUNT_HOST=$(thinclient-test-endpoint) -DACCOUNT_KEY=$(thinclient-test-key) -DCOSMOS.THINCLIENT_ENABLED=true -DCOSMOS.HTTP2_ENABLED=true' + - template: /eng/pipelines/templates/stages/archetype-sdk-tests-isolated.yml parameters: TestName: 'Spring_Data_Cosmos_Integration' From d0398143089e04624879eac1bd9b7c6d47621836 Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Sat, 14 Mar 2026 09:03:56 -0400 Subject: [PATCH 3/7] Adding HTTP/2 ping and HTTP connection lifecycle capabilities. --- .../Http2ConnectionLifecycleTests.java | 7 ------- .../com/azure/cosmos/implementation/Configs.java | 3 --- .../implementation/http/Http2PingHealthHandler.java | 12 ++++++++---- .../implementation/http/ReactorNettyClient.java | 6 +----- .../live-http-network-fault-platform-matrix.json | 1 + 5 files changed, 10 insertions(+), 19 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectionLifecycleTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectionLifecycleTests.java index 32f138020abd..f30acb6df39b 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectionLifecycleTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectionLifecycleTests.java @@ -914,13 +914,6 @@ public void degradedConnectionEvictedByPingHealthCheck() throws Exception { String initialParentChannelId = establishH2ConnectionAndGetParentChannelId(); logger.info("Initial parentChannelId: {}", initialParentChannelId); - // Diagnostic: check if PING handler installed on parent channel - // Use reflection or diagnostics to verify H2 config - logger.info("PING_DIAG: HTTP2_ENABLED={}, PING_INTERVAL={}, PING_ACK_TIMEOUT={}", - System.getProperty("COSMOS.HTTP2_ENABLED"), - System.getProperty("COSMOS.HTTP2_PING_INTERVAL_IN_SECONDS"), - System.getProperty("COSMOS.HTTP2_PING_ACK_TIMEOUT_IN_SECONDS")); - // Blackhole traffic — PINGs sent but no ACKs return addPacketDrop(); logger.info("Waiting 25s for PING ACK timeout (10s) + background sweep (5s) + margin..."); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java index 4b8023d7f2f6..2d14f40382b9 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java @@ -145,7 +145,6 @@ public class Configs { // HTTP connection max lifetime — forces periodic connection rotation for DNS re-resolution and load redistribution. private static final int DEFAULT_HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS = 300; // 5 minutes private static final String HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS = "COSMOS.HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS"; - private static final String HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS_VARIABLE = "COSMOS_HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS"; public static final int HTTP_CONNECTION_MAX_LIFETIME_JITTER_IN_SECONDS = 30; // HTTP/2 PING health check — detects silently degraded connections (packet black-hole, half-open TCP). @@ -153,10 +152,8 @@ public class Configs { // Timeout: if no PING ACK is received within this duration, the connection is considered unhealthy. private static final int DEFAULT_HTTP2_PING_INTERVAL_IN_SECONDS = 10; private static final String HTTP2_PING_INTERVAL_IN_SECONDS = "COSMOS.HTTP2_PING_INTERVAL_IN_SECONDS"; - private static final String HTTP2_PING_INTERVAL_IN_SECONDS_VARIABLE = "COSMOS_HTTP2_PING_INTERVAL_IN_SECONDS"; private static final int DEFAULT_HTTP2_PING_ACK_TIMEOUT_IN_SECONDS = 30; private static final String HTTP2_PING_ACK_TIMEOUT_IN_SECONDS = "COSMOS.HTTP2_PING_ACK_TIMEOUT_IN_SECONDS"; - private static final String HTTP2_PING_ACK_TIMEOUT_IN_SECONDS_VARIABLE = "COSMOS_HTTP2_PING_ACK_TIMEOUT_IN_SECONDS"; private static final int DEFAULT_HTTP_RESPONSE_TIMEOUT_IN_SECONDS = 60; private static final int DEFAULT_QUERY_PLAN_RESPONSE_TIMEOUT_IN_SECONDS = 5; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2PingHealthHandler.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2PingHealthHandler.java index 06ab1b663b74..12ce8ba2d5ea 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2PingHealthHandler.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2PingHealthHandler.java @@ -95,12 +95,14 @@ public void handlerRemoved(ChannelHandlerContext ctx) { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - logger.info("channelRead on parent {}: {}", ctx.channel().id().asShortText(), msg.getClass().getSimpleName()); if (msg instanceof Http2PingFrame) { Http2PingFrame pingFrame = (Http2PingFrame) msg; if (pingFrame.ack()) { ctx.channel().attr(LAST_PING_ACK_NANOS).set(System.nanoTime()); - logger.info("HTTP/2 PING ACK on channel {}", ctx.channel().id().asShortText()); + if (logger.isDebugEnabled()) { + logger.debug("HTTP/2 PING ACK received on channel {}", + ctx.channel().id().asShortText()); + } } } // Always propagate — don't consume frames @@ -176,7 +178,9 @@ public static void installOnParentIfAbsent(Channel channel, long pingIntervalMs) targetChannel.attr(HANDLER_INSTALLED).set(Boolean.TRUE); targetChannel.pipeline().addLast(HANDLER_NAME, new Http2PingHealthHandler(pingIntervalMs)); - logger.info("Installed Http2PingHealthHandler on channel {} with {}ms interval. Pipeline: {}", - targetChannel.id().asShortText(), pingIntervalMs, targetChannel.pipeline().names()); + if (logger.isDebugEnabled()) { + logger.debug("Installed Http2PingHealthHandler on channel {} with {}ms interval", + targetChannel.id().asShortText(), pingIntervalMs); + } } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java index d99cfad14826..307ba356513a 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java @@ -166,13 +166,9 @@ private void configureChannelPipelineHandlers() { } // Install HTTP/2 PING health checker on the parent TCP channel. - // doOnConnected fires for each child H2 stream; installOnParentIfAbsent + // doOnConnected fires for the parent H2 connection; installOnParentIfAbsent // ensures the handler is added exactly once per parent connection. int pingIntervalSeconds = Configs.getHttp2PingIntervalInSeconds(); - logger.info("doOnConnected: channel={}, parent={}, pingInterval={}s", - connection.channel().id().asShortText(), - connection.channel().parent() != null ? connection.channel().parent().id().asShortText() : "null", - pingIntervalSeconds); if (pingIntervalSeconds > 0) { Http2PingHealthHandler.installOnParentIfAbsent( connection.channel(), pingIntervalSeconds * 1000L); diff --git a/sdk/cosmos/live-http-network-fault-platform-matrix.json b/sdk/cosmos/live-http-network-fault-platform-matrix.json index 1a78e86a1c33..05efd056736d 100644 --- a/sdk/cosmos/live-http-network-fault-platform-matrix.json +++ b/sdk/cosmos/live-http-network-fault-platform-matrix.json @@ -8,6 +8,7 @@ { "DESIRED_CONSISTENCIES": "[\"Session\"]", "ACCOUNT_CONSISTENCY": "Session", + "ArmTemplateParameters": "@{ enableMultipleWriteLocations = $false; defaultConsistencyLevel = 'Session' }", "ProfileFlag": [ "-Pmanual-http-network-fault" ], "Agent": { "ubuntu": { "OSVmImage": "env:LINUXVMIMAGE", "Pool": "env:LINUXPOOL" } From 4974ddfb8dcbb602db0e6bbaf13b44ffeb5de617 Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Sat, 14 Mar 2026 12:47:58 -0400 Subject: [PATCH 4/7] Adding HTTP/2 ping and HTTP connection lifecycle capabilities. --- .../Http2ConnectionLifecycleTests.java | 95 ++++++++++++++----- sdk/cosmos/tests.yml | 11 ++- 2 files changed, 81 insertions(+), 25 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectionLifecycleTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectionLifecycleTests.java index f30acb6df39b..762f0a85cd30 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectionLifecycleTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectionLifecycleTests.java @@ -53,10 +53,11 @@ * does NOT close the parent TCP connection. *

* HOW TO RUN: - * 1. Group "manual-http-network-fault" — NOT included in CI. - * 2. Docker container with --cap-add=NET_ADMIN, JDK 21, .m2 mounted. + * 1. Group "manual-http-network-fault" — NOT included in standard CI test suites. + * 2. Runs natively on Linux VMs (with sudo) or in Docker (with --cap-add=NET_ADMIN). * 3. Tests self-manage tc netem (add/remove delay) — no manual intervention. - * 4. See NETWORK_DELAY_TESTING_README.md for full setup and run instructions. + * 4. Tests self-skip if tc is not available (e.g., on Windows or non-privileged Linux). + * 5. See NETWORK_DELAY_TESTING_README.md for full setup and run instructions. *

* DESIGN: * - No creates during tests. One seed item created in beforeClass (via shared container). @@ -73,9 +74,10 @@ public class Http2ConnectionLifecycleTests extends FaultInjectionTestBase { private static final String TEST_GROUP = "manual-http-network-fault"; // 3 minutes per test — enough for warmup + delay + retries + cross-region failover + recovery read private static final long TEST_TIMEOUT = 180_000; - // Hardcode eth0 — Docker always uses eth0. detectNetworkInterface() fails during active delay - // because `tc qdisc show dev eth0` hangs, and the fallback returns `eth0@if23` which tc rejects. - private static final String NETWORK_INTERFACE = "eth0"; + // Network interface detected at runtime — eth0 for Docker, or the default route interface on CI VMs. + private String networkInterface; + // Prefix for privileged commands: empty string in Docker (runs as root), "sudo " on CI VMs. + private String sudoPrefix; @Factory(dataProvider = "clientBuildersWithGatewayAndHttp2") public Http2ConnectionLifecycleTests(CosmosClientBuilder clientBuilder) { @@ -85,6 +87,29 @@ public Http2ConnectionLifecycleTests(CosmosClientBuilder clientBuilder) { @BeforeClass(groups = {TEST_GROUP}, timeOut = TIMEOUT) public void beforeClass() { + // Detect whether we're running as root (Docker) or need sudo (CI VM) + this.sudoPrefix = "root".equals(System.getProperty("user.name")) ? "" : "sudo "; + + // Detect the default-route network interface + this.networkInterface = detectNetworkInterface(); + logger.info("Network interface: {}, sudo: {}", this.networkInterface, !this.sudoPrefix.isEmpty()); + + // Verify tc (traffic control) is available — fail-fast if not + try { + Process p = Runtime.getRuntime().exec(new String[]{"sh", "-c", sudoPrefix + "tc qdisc show dev " + networkInterface}); + int exit = p.waitFor(); + if (exit != 0) { + try (BufferedReader err = new BufferedReader(new InputStreamReader(p.getErrorStream()))) { + String errMsg = err.readLine(); + fail("tc not available on " + networkInterface + " (exit=" + exit + "): " + errMsg); + } + } + } catch (AssertionError e) { + throw e; + } catch (Exception e) { + fail("tc not available: " + e.getMessage()); + } + System.setProperty("COSMOS.THINCLIENT_ENABLED", "true"); // Seed one item using a temporary client. The shared container is created by @BeforeSuite. @@ -118,8 +143,10 @@ public void beforeMethod() { */ @AfterMethod(groups = {TEST_GROUP}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) public void afterMethod() { - removeNetworkDelay(); - removePacketDrop(); + if (sudoPrefix != null) { + removeNetworkDelay(); + removePacketDrop(); + } safeClose(this.client); this.client = null; this.cosmosAsyncContainer = null; @@ -128,8 +155,10 @@ public void afterMethod() { @AfterClass(groups = {TEST_GROUP}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) public void afterClass() { - removeNetworkDelay(); - removePacketDrop(); + if (sudoPrefix != null) { + removeNetworkDelay(); + removePacketDrop(); + } System.clearProperty("COSMOS.THINCLIENT_ENABLED"); System.clearProperty("COSMOS.HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS"); } @@ -268,19 +297,18 @@ private void assertNoGatewayTimeout(CosmosDiagnostics diagnostics, String contex } /** - * Applies a tc netem delay to all outbound traffic on the Docker container's network interface. + * Applies a tc netem delay to all outbound traffic on the network interface. * This delays ALL packets (including TCP handshake, HTTP/2 frames, and TLS records) by the * specified duration, causing reactor-netty's ReadTimeoutHandler to fire on H2 stream channels * when the delay exceeds the configured responseTimeout. * - *

Requires {@code --cap-add=NET_ADMIN} on the Docker container. Fails the test immediately - * if the {@code tc} command is not available or returns a non-zero exit code.

+ *

Requires root (Docker with {@code --cap-add=NET_ADMIN}) or passwordless sudo (CI VM). + * Fails the test immediately if {@code tc} command fails.

* * @param delayMs the delay in milliseconds to inject (e.g., 8000 for an 8-second delay) */ private void addNetworkDelay(int delayMs) { - String iface = NETWORK_INTERFACE; - String cmd = String.format("tc qdisc add dev %s root netem delay %dms", iface, delayMs); + String cmd = String.format("%stc qdisc add dev %s root netem delay %dms", sudoPrefix, networkInterface, delayMs); logger.info(">>> Adding network delay: {}", cmd); try { Process p = Runtime.getRuntime().exec(new String[]{"sh", "-c", cmd}); @@ -288,10 +316,10 @@ private void addNetworkDelay(int delayMs) { if (exit != 0) { try (BufferedReader err = new BufferedReader(new InputStreamReader(p.getErrorStream()))) { String errMsg = err.readLine(); - logger.warn("tc add failed (exit={}): {}", exit, errMsg); + fail("tc add failed (exit=" + exit + "): " + errMsg); } } else { - logger.info(">>> Network delay active: {}ms on {}", delayMs, iface); + logger.info(">>> Network delay active: {}ms on {}", delayMs, networkInterface); } } catch (Exception e) { logger.error("Failed to add network delay", e); @@ -300,7 +328,7 @@ private void addNetworkDelay(int delayMs) { } /** - * Removes any tc netem qdisc from the Docker container's network interface, restoring + * Removes any tc netem qdisc from the network interface, restoring * normal network behavior. This is called in {@code finally} blocks after each test and * in {@code @AfterMethod} and {@code @AfterClass} as a safety net. * @@ -308,8 +336,7 @@ private void addNetworkDelay(int delayMs) { * Does not fail the test on error — the priority is cleanup, not assertion.

*/ private void removeNetworkDelay() { - String iface = NETWORK_INTERFACE; - String cmd = String.format("tc qdisc del dev %s root netem", iface); + String cmd = String.format("%stc qdisc del dev %s root netem", sudoPrefix, networkInterface); logger.info(">>> Removing network delay: {}", cmd); try { Process p = Runtime.getRuntime().exec(new String[]{"sh", "-c", cmd}); @@ -1006,7 +1033,7 @@ public void connectionEvictedAfterMaxLifetimeEvenWithHealthyPings() throws Excep // ======================================================================== private void addPacketDrop() { - String cmd = "iptables -A OUTPUT -p tcp --dport 10250 -j DROP"; + String cmd = String.format("%siptables -A OUTPUT -p tcp --dport 10250 -j DROP", sudoPrefix); logger.info(">>> Adding packet drop: {}", cmd); try { Process p = Runtime.getRuntime().exec(new String[]{"sh", "-c", cmd}); @@ -1014,7 +1041,7 @@ private void addPacketDrop() { if (exit != 0) { try (BufferedReader err = new BufferedReader(new InputStreamReader(p.getErrorStream()))) { String errMsg = err.readLine(); - logger.warn("iptables add failed (exit={}): {}", exit, errMsg); + fail("iptables add failed (exit=" + exit + "): " + errMsg); } } else { logger.info(">>> Packet drop active on port 10250"); @@ -1026,7 +1053,7 @@ private void addPacketDrop() { } private void removePacketDrop() { - String cmd = "iptables -D OUTPUT -p tcp --dport 10250 -j DROP"; + String cmd = String.format("%siptables -D OUTPUT -p tcp --dport 10250 -j DROP", sudoPrefix); logger.info(">>> Removing packet drop: {}", cmd); try { Process p = Runtime.getRuntime().exec(new String[]{"sh", "-c", cmd}); @@ -1040,4 +1067,26 @@ private void removePacketDrop() { logger.warn("Failed to remove packet drop: {}", e.getMessage()); } } + + /** + * Detects the default-route network interface. + * In Docker this is typically eth0. On CI VMs it may be eth0, ens5, etc. + * Falls back to "eth0" if detection fails. + */ + private static String detectNetworkInterface() { + try { + Process p = Runtime.getRuntime().exec(new String[]{"sh", "-c", + "ip route show default | awk '{print $5}' | head -1"}); + p.waitFor(); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()))) { + String iface = reader.readLine(); + if (iface != null && !iface.isEmpty() && !iface.contains("@")) { + return iface.trim(); + } + } + } catch (Exception e) { + // fall through + } + return "eth0"; + } } diff --git a/sdk/cosmos/tests.yml b/sdk/cosmos/tests.yml index 73a67a596a27..af86d0e7d3e5 100644 --- a/sdk/cosmos/tests.yml +++ b/sdk/cosmos/tests.yml @@ -163,15 +163,22 @@ extends: - name: AdditionalArgs value: '-DCOSMOS.CLIENT_LEAK_DETECTION_ENABLED=true -DACCOUNT_HOST=$(thin-client-canary-multi-writer-session-endpoint) -DACCOUNT_KEY=$(thin-client-canary-multi-writer-session-key) -DCOSMOS.THINCLIENT_ENABLED=true' - # Network fault injection tests (tc netem, iptables) — requires Linux VM with NET_ADMIN. + # Network fault injection tests (tc netem, iptables) — runs natively on Linux CI VMs. + # PreSteps ensure iproute2 (tc) and iptables are installed and sch_netem kernel module is loaded. + # Tests use sudo for tc/iptables commands. Self-skip if tc is not available. # Tests run sequentially (MaxParallel: 1) to avoid tc/iptables interference between tests. - # No Docker needed — tc and iptables are native on the Linux CI VM. - template: /eng/pipelines/templates/stages/archetype-sdk-tests-isolated.yml parameters: TestName: 'Cosmos_Live_Test_HttpNetworkFault' CloudConfig: Public: ServiceConnection: azure-sdk-tests-cosmos + PreSteps: + - script: | + sudo apt-get update -qq && sudo apt-get install -y -qq iproute2 iptables + sudo modprobe sch_netem || true + tc -Version && echo "tc available" || echo "tc not found" + displayName: 'Install tc (iproute2) and iptables for network fault injection' MatrixConfigs: - Name: Cosmos_live_test_http_network_fault Path: sdk/cosmos/live-http-network-fault-platform-matrix.json From 2ff4f43e96105e9d82e88f5d8f58f0547d31f6d7 Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Sat, 14 Mar 2026 13:37:26 -0400 Subject: [PATCH 5/7] Adding HTTP/2 ping and HTTP connection lifecycle capabilities. --- .../Http2ConnectTimeoutBifurcationTests.java | 90 +++++++++++++------ .../Http2ConnectionLifecycleTests.java | 5 +- sdk/cosmos/tests.yml | 2 +- 3 files changed, 67 insertions(+), 30 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectTimeoutBifurcationTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectTimeoutBifurcationTests.java index 765be45b86c8..1c82ccbeee5a 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectTimeoutBifurcationTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectTimeoutBifurcationTests.java @@ -45,8 +45,8 @@ * - Metadata requests → GW V1 endpoint (port 443) → CONNECT_TIMEOUT_MILLIS = 45s (unchanged) * * HOW TO RUN: - * 1. Group "manual-http-network-fault" — NOT included in CI. - * 2. Docker container with --cap-add=NET_ADMIN, JDK 21, .m2 mounted. + * 1. Group "manual-http-network-fault" — runs in dedicated CI pipeline stage. + * 2. Runs natively on Linux VMs (with sudo) or in Docker (with --cap-add=NET_ADMIN). * 3. Tests self-manage iptables rules (add/remove) — no manual intervention. * 4. See CONNECT_TIMEOUT_TESTING_README.md for full setup and run instructions. * @@ -63,6 +63,10 @@ public class Http2ConnectTimeoutBifurcationTests extends FaultInjectionTestBase private static final String TEST_GROUP = "manual-http-network-fault"; private static final long TEST_TIMEOUT = 180_000; + // Network interface detected at runtime — eth0 for Docker, or the default route interface on CI VMs. + private String networkInterface; + // Prefix for privileged commands: empty string in Docker (runs as root), "sudo " on CI VMs. + private String sudoPrefix; @Factory(dataProvider = "clientBuildersWithGatewayAndHttp2") public Http2ConnectTimeoutBifurcationTests(CosmosClientBuilder clientBuilder) { @@ -72,6 +76,11 @@ public Http2ConnectTimeoutBifurcationTests(CosmosClientBuilder clientBuilder) { @BeforeClass(groups = {TEST_GROUP}, timeOut = TIMEOUT) public void beforeClass() { + // Detect whether we're running as root (Docker) or need sudo (CI VM) + this.sudoPrefix = "root".equals(System.getProperty("user.name")) ? "" : "sudo "; + this.networkInterface = detectNetworkInterface(); + logger.info("Network interface: {}, sudo: {}", this.networkInterface, !this.sudoPrefix.isEmpty()); + System.setProperty("COSMOS.THINCLIENT_ENABLED", "true"); // Use the default THINCLIENT_CONNECTION_TIMEOUT_IN_SECONDS (5s) — no override. // Tests are designed around the 5s default to match production behavior. @@ -92,7 +101,9 @@ public void beforeClass() { @AfterClass(groups = {TEST_GROUP}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) public void afterClass() { // Safety: remove any leftover iptables rules - removeIptablesDropOnPort(10250); + if (sudoPrefix != null) { + removeIptablesDropOnPort(10250); + } System.clearProperty("COSMOS.THINCLIENT_ENABLED"); safeClose(this.client); } @@ -113,23 +124,24 @@ public void afterClass() { * @param port10250DelayMs delay for port 10250 traffic (thin client data plane) */ private void addPerPortDelay(int port443DelayMs, int port10250DelayMs) { + String iface = networkInterface; String[] cmds = { // Create root prio qdisc with 3 bands - "tc qdisc add dev eth0 root handle 1: prio bands 3", + sudoPrefix + "tc qdisc add dev " + iface + " root handle 1: prio bands 3", // Band 1 (handle 1:1): delay for port 443 - String.format("tc qdisc add dev eth0 parent 1:1 handle 10: netem delay %dms", port443DelayMs), + String.format("%stc qdisc add dev %s parent 1:1 handle 10: netem delay %dms", sudoPrefix, iface, port443DelayMs), // Band 2 (handle 1:2): delay for port 10250 - String.format("tc qdisc add dev eth0 parent 1:2 handle 20: netem delay %dms", port10250DelayMs), + String.format("%stc qdisc add dev %s parent 1:2 handle 20: netem delay %dms", sudoPrefix, iface, port10250DelayMs), // Band 3 (handle 1:3): no delay (default for all other traffic) - "tc qdisc add dev eth0 parent 1:3 handle 30: pfifo_fast", + sudoPrefix + "tc qdisc add dev " + iface + " parent 1:3 handle 30: pfifo_fast", // Mark port 443 packets with mark 1 - "iptables -t mangle -A OUTPUT -p tcp --dport 443 -j MARK --set-mark 1", + sudoPrefix + "iptables -t mangle -A OUTPUT -p tcp --dport 443 -j MARK --set-mark 1", // Mark port 10250 packets with mark 2 - "iptables -t mangle -A OUTPUT -p tcp --dport 10250 -j MARK --set-mark 2", + sudoPrefix + "iptables -t mangle -A OUTPUT -p tcp --dport 10250 -j MARK --set-mark 2", // Route mark 1 → band 1 (port 443 delay) - "tc filter add dev eth0 parent 1:0 protocol ip prio 1 handle 1 fw flowid 1:1", + sudoPrefix + "tc filter add dev " + iface + " parent 1:0 protocol ip prio 1 handle 1 fw flowid 1:1", // Route mark 2 → band 2 (port 10250 delay) - "tc filter add dev eth0 parent 1:0 protocol ip prio 2 handle 2 fw flowid 1:2", + sudoPrefix + "tc filter add dev " + iface + " parent 1:0 protocol ip prio 2 handle 2 fw flowid 1:2", }; for (String cmd : cmds) { @@ -156,27 +168,26 @@ private void addPerPortDelay(int port443DelayMs, int port10250DelayMs) { * @param port10250SynDelayMs SYN delay for port 10250 (thin client data plane) */ private void addPerPortSynDelay(int port443SynDelayMs, int port10250SynDelayMs) { + String iface = networkInterface; String[] cmds = { // Create root prio qdisc with 3 bands - "tc qdisc add dev eth0 root handle 1: prio bands 3", + sudoPrefix + "tc qdisc add dev " + iface + " root handle 1: prio bands 3", // Band 1 (handle 1:1): delay for port 443 SYN - String.format("tc qdisc add dev eth0 parent 1:1 handle 10: netem delay %dms", port443SynDelayMs), + String.format("%stc qdisc add dev %s parent 1:1 handle 10: netem delay %dms", sudoPrefix, iface, port443SynDelayMs), // Band 2 (handle 1:2): delay for port 10250 SYN - String.format("tc qdisc add dev eth0 parent 1:2 handle 20: netem delay %dms", port10250SynDelayMs), + String.format("%stc qdisc add dev %s parent 1:2 handle 20: netem delay %dms", sudoPrefix, iface, port10250SynDelayMs), // Band 3 (handle 1:3): no delay (default for all other traffic including non-SYN) - "tc qdisc add dev eth0 parent 1:3 handle 30: pfifo_fast", + sudoPrefix + "tc qdisc add dev " + iface + " parent 1:3 handle 30: pfifo_fast", // Mark ONLY SYN packets (initial TCP connect) to port 443 with mark 1 - "iptables -t mangle -A OUTPUT -p tcp --dport 443 --tcp-flags SYN,ACK,FIN,RST SYN -j MARK --set-mark 1", + sudoPrefix + "iptables -t mangle -A OUTPUT -p tcp --dport 443 --tcp-flags SYN,ACK,FIN,RST SYN -j MARK --set-mark 1", // Mark ONLY SYN packets to port 10250 with mark 2 - "iptables -t mangle -A OUTPUT -p tcp --dport 10250 --tcp-flags SYN,ACK,FIN,RST SYN -j MARK --set-mark 2", + sudoPrefix + "iptables -t mangle -A OUTPUT -p tcp --dport 10250 --tcp-flags SYN,ACK,FIN,RST SYN -j MARK --set-mark 2", // Route mark 1 → band 1 (port 443 SYN delay) - "tc filter add dev eth0 parent 1:0 protocol ip prio 1 handle 1 fw flowid 1:1", + sudoPrefix + "tc filter add dev " + iface + " parent 1:0 protocol ip prio 1 handle 1 fw flowid 1:1", // Route mark 2 → band 2 (port 10250 SYN delay) - "tc filter add dev eth0 parent 1:0 protocol ip prio 2 handle 2 fw flowid 1:2", + sudoPrefix + "tc filter add dev " + iface + " parent 1:0 protocol ip prio 2 handle 2 fw flowid 1:2", // CRITICAL: Catch-all filter → band 3 (no delay) for ALL unmarked traffic. - // Without this, prio qdisc's default priomap sends unmarked packets to band 1 - // (the delay band), which delays TLS/HTTP/ACK traffic and causes spurious failures. - "tc filter add dev eth0 parent 1:0 protocol ip prio 99 u32 match u32 0 0 flowid 1:3", + sudoPrefix + "tc filter add dev " + iface + " parent 1:0 protocol ip prio 99 u32 match u32 0 0 flowid 1:3", }; for (String cmd : cmds) { @@ -191,9 +202,10 @@ private void addPerPortSynDelay(int port443SynDelayMs, int port10250SynDelayMs) * Removes all per-port delay rules (tc qdisc + iptables mangle marks). */ private void removePerPortDelay() { + String iface = networkInterface; String[] cmds = { - "tc qdisc del dev eth0 root 2>/dev/null", - "iptables -t mangle -F OUTPUT 2>/dev/null", + sudoPrefix + "tc qdisc del dev " + iface + " root 2>/dev/null", + sudoPrefix + "iptables -t mangle -F OUTPUT 2>/dev/null", }; for (String cmd : cmds) { @@ -218,9 +230,11 @@ private void executeShellCommand(String cmd) { if (exit != 0) { try (BufferedReader err = new BufferedReader(new InputStreamReader(p.getErrorStream()))) { String errMsg = err.readLine(); - logger.warn("Command failed (exit={}): {} — {}", exit, cmd, errMsg); + fail("Command failed (exit=" + exit + "): " + cmd + " — " + errMsg); } } + } catch (AssertionError e) { + throw e; } catch (Exception e) { logger.error("Failed to execute: {}", cmd, e); fail("Shell command failed: " + cmd + " — " + e.getMessage()); @@ -238,7 +252,7 @@ private void executeShellCommand(String cmd) { */ private void addIptablesDropOnPort(int port) { String cmd = String.format( - "iptables -A OUTPUT -p tcp --dport %d --tcp-flags SYN,ACK,FIN,RST SYN -j DROP", port); + "%siptables -A OUTPUT -p tcp --dport %d --tcp-flags SYN,ACK,FIN,RST SYN -j DROP", sudoPrefix, port); logger.info(">>> Adding iptables DROP SYN rule: {}", cmd); try { Process p = Runtime.getRuntime().exec(new String[]{"sh", "-c", cmd}); @@ -262,7 +276,7 @@ private void addIptablesDropOnPort(int port) { */ private void removeIptablesDropOnPort(int port) { String cmd = String.format( - "iptables -D OUTPUT -p tcp --dport %d --tcp-flags SYN,ACK,FIN,RST SYN -j DROP", port); + "%siptables -D OUTPUT -p tcp --dport %d --tcp-flags SYN,ACK,FIN,RST SYN -j DROP", sudoPrefix, port); logger.info(">>> Removing iptables DROP SYN rule: {}", cmd); try { Process p = Runtime.getRuntime().exec(new String[]{"sh", "-c", cmd}); @@ -277,6 +291,28 @@ private void removeIptablesDropOnPort(int port) { } } + /** + * Detects the default-route network interface. + * In Docker this is typically eth0. On CI VMs it may be eth0, ens5, etc. + * Falls back to "eth0" if detection fails. + */ + private static String detectNetworkInterface() { + try { + Process p = Runtime.getRuntime().exec(new String[]{"sh", "-c", + "ip route show default | awk '{print $5}' | head -1"}); + p.waitFor(); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()))) { + String iface = reader.readLine(); + if (iface != null && !iface.isEmpty() && !iface.contains("@")) { + return iface.trim(); + } + } + } catch (Exception e) { + // fall through + } + return "eth0"; + } + // ======================================================================== // Tests // ======================================================================== diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectionLifecycleTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectionLifecycleTests.java index 762f0a85cd30..43fd6203baec 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectionLifecycleTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectionLifecycleTests.java @@ -394,8 +394,9 @@ public void connectionReuseAfterRealNettyTimeout() throws Exception { .isNotNull(); assertContainsGatewayTimeout(delayedDiagnostics, "delayed read"); - // Brief pause to let TCP retransmission settle after netem qdisc deletion - Thread.sleep(1000); + // Brief pause to let TCP retransmission settle after netem qdisc deletion. + // On CI VMs, kernel queue draining may take longer than in Docker. + Thread.sleep(3000); // Recovery read — assert no timeout, low latency, and same parent channel CosmosDiagnostics recoveryDiagnostics = this.performDocumentOperation( diff --git a/sdk/cosmos/tests.yml b/sdk/cosmos/tests.yml index af86d0e7d3e5..1a146454ead3 100644 --- a/sdk/cosmos/tests.yml +++ b/sdk/cosmos/tests.yml @@ -165,7 +165,7 @@ extends: # Network fault injection tests (tc netem, iptables) — runs natively on Linux CI VMs. # PreSteps ensure iproute2 (tc) and iptables are installed and sch_netem kernel module is loaded. - # Tests use sudo for tc/iptables commands. Self-skip if tc is not available. + # Tests use sudo for tc/iptables commands. Fail-fast if tc is not available. # Tests run sequentially (MaxParallel: 1) to avoid tc/iptables interference between tests. - template: /eng/pipelines/templates/stages/archetype-sdk-tests-isolated.yml parameters: From ed2264b6886f1be775ad6fd938772db9284149f9 Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Sat, 14 Mar 2026 15:51:20 -0400 Subject: [PATCH 6/7] Adding HTTP/2 ping and HTTP connection lifecycle capabilities. --- .../Http2ConnectionLifecycleTests.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectionLifecycleTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectionLifecycleTests.java index 43fd6203baec..601d9d06cc32 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectionLifecycleTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectionLifecycleTests.java @@ -336,7 +336,7 @@ private void addNetworkDelay(int delayMs) { * Does not fail the test on error — the priority is cleanup, not assertion.

*/ private void removeNetworkDelay() { - String cmd = String.format("%stc qdisc del dev %s root netem", sudoPrefix, networkInterface); + String cmd = String.format("%stc qdisc del dev %s root", sudoPrefix, networkInterface); logger.info(">>> Removing network delay: {}", cmd); try { Process p = Runtime.getRuntime().exec(new String[]{"sh", "-c", cmd}); @@ -344,7 +344,10 @@ private void removeNetworkDelay() { if (exit == 0) { logger.info(">>> Network delay removed"); } else { - logger.warn("tc del returned exit={} (may already be removed)", exit); + try (BufferedReader err = new BufferedReader(new InputStreamReader(p.getErrorStream()))) { + String errMsg = err.readLine(); + logger.warn("tc del returned exit={}: {} (may already be removed)", exit, errMsg); + } } } catch (Exception e) { logger.warn("Failed to remove network delay: {}", e.getMessage()); @@ -395,10 +398,9 @@ public void connectionReuseAfterRealNettyTimeout() throws Exception { assertContainsGatewayTimeout(delayedDiagnostics, "delayed read"); // Brief pause to let TCP retransmission settle after netem qdisc deletion. - // On CI VMs, kernel queue draining may take longer than in Docker. Thread.sleep(3000); - // Recovery read — assert no timeout, low latency, and same parent channel + // Recovery read — delay is removed, should succeed cleanly CosmosDiagnostics recoveryDiagnostics = this.performDocumentOperation( this.cosmosAsyncContainer, OperationType.Read, seedItem, false); CosmosDiagnosticsContext recoveryCtx = recoveryDiagnostics.getDiagnosticsContext(); @@ -410,7 +412,7 @@ public void connectionReuseAfterRealNettyTimeout() throws Exception { h2ParentChannelIdBeforeDelay.equals(h2ParentChannelIdAfterDelay), recoveryCtx.getDuration().toMillis()); AssertionsForClassTypes.assertThat(recoveryCtx.getDuration()) - .as("Recovery read should complete within 10s (allows one 6s ReadTimeout retry + TCP stabilization)") + .as("Recovery read should complete within 10s (delay removed, no retries expected)") .isLessThan(Duration.ofSeconds(10)); assertThat(h2ParentChannelIdAfterDelay) .as("H2 parent NioSocketChannel should survive ReadTimeoutException on Http2StreamChannel") From 283b12fde2e913427f78f7c7b41ebaaffabd83eb Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Sat, 14 Mar 2026 18:22:43 -0400 Subject: [PATCH 7/7] Adding HTTP/2 ping and HTTP connection lifecycle capabilities. --- .../implementation/http/Http2PingHealthHandler.java | 4 ++-- .../azure/cosmos/implementation/http/HttpClient.java | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2PingHealthHandler.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2PingHealthHandler.java index 12ce8ba2d5ea..84ce2066ab42 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2PingHealthHandler.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2PingHealthHandler.java @@ -171,11 +171,11 @@ public static void installOnParentIfAbsent(Channel channel, long pingIntervalMs) // For child streams (if they ever fire), navigate to parent. Channel targetChannel = channel.parent() != null ? channel.parent() : channel; - if (Boolean.TRUE.equals(targetChannel.attr(HANDLER_INSTALLED).get())) { + // Atomic check-and-set to prevent duplicate installation from concurrent doOnConnected callbacks + if (Boolean.TRUE.equals(targetChannel.attr(HANDLER_INSTALLED).getAndSet(Boolean.TRUE))) { return; // already installed } - targetChannel.attr(HANDLER_INSTALLED).set(Boolean.TRUE); targetChannel.pipeline().addLast(HANDLER_NAME, new Http2PingHealthHandler(pingIntervalMs)); if (logger.isDebugEnabled()) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpClient.java index f4b6f78647c8..46a34aaabfe4 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpClient.java @@ -78,11 +78,11 @@ static HttpClient createFixed(HttpClientConfig httpClientConfig) { // Phase 2: PING liveness — if PING ACK is stale, connection is silently degraded if (pingAckTimeoutNanos > 0) { Channel parentChannel = connection.channel(); - if (parentChannel.hasAttr(Http2PingHealthHandler.LAST_PING_ACK_NANOS)) { - long lastAckNanos = parentChannel.attr(Http2PingHealthHandler.LAST_PING_ACK_NANOS).get(); - if (System.nanoTime() - lastAckNanos > pingAckTimeoutNanos) { - return true; - } + Long lastAckNanos = parentChannel.hasAttr(Http2PingHealthHandler.LAST_PING_ACK_NANOS) + ? parentChannel.attr(Http2PingHealthHandler.LAST_PING_ACK_NANOS).get() + : null; + if (lastAckNanos != null && System.nanoTime() - lastAckNanos > pingAckTimeoutNanos) { + return true; } }