diff --git a/sdk/cosmos/azure-cosmos-tests/CONNECT_TIMEOUT_TESTING_README.md b/sdk/cosmos/azure-cosmos-tests/CONNECT_TIMEOUT_TESTING_README.md
index 7895b856b331..26368bffd455 100644
--- a/sdk/cosmos/azure-cosmos-tests/CONNECT_TIMEOUT_TESTING_README.md
+++ b/sdk/cosmos/azure-cosmos-tests/CONNECT_TIMEOUT_TESTING_README.md
@@ -38,7 +38,7 @@ docker run --rm --cap-add=NET_ADMIN --memory 8g \
java -DCOSMOS.THINCLIENT_ENABLED=true \
-DCOSMOS.THINCLIENT_CONNECTION_TIMEOUT_IN_SECONDS=1 \
-DCOSMOS.HTTP2_ENABLED=true \
- org.testng.TestNG /workspace/azure-cosmos-tests/src/test/resources/manual-thinclient-network-delay-testng.xml \
+ org.testng.TestNG /workspace/azure-cosmos-tests/src/test/resources/manual-http-network-fault-testng.xml \
-verbose 2
'
```
diff --git a/sdk/cosmos/azure-cosmos-tests/NETWORK_DELAY_TESTING_README.md b/sdk/cosmos/azure-cosmos-tests/NETWORK_DELAY_TESTING_README.md
index c304c6e06657..798e4df3954a 100644
--- a/sdk/cosmos/azure-cosmos-tests/NETWORK_DELAY_TESTING_README.md
+++ b/sdk/cosmos/azure-cosmos-tests/NETWORK_DELAY_TESTING_README.md
@@ -76,7 +76,7 @@ docker run --rm --cap-add=NET_ADMIN --memory 8g \
-DACCOUNT_KEY=$ACCOUNT_KEY \
-DCOSMOS.THINCLIENT_ENABLED=true \
-DCOSMOS.HTTP2_ENABLED=true \
- org.testng.TestNG /workspace/azure-cosmos-tests/src/test/resources/manual-thinclient-network-delay-testng.xml \
+ org.testng.TestNG /workspace/azure-cosmos-tests/src/test/resources/manual-http-network-fault-testng.xml \
-verbose 2
'
```
diff --git a/sdk/cosmos/azure-cosmos-tests/pom.xml b/sdk/cosmos/azure-cosmos-tests/pom.xml
index 71bf282c4066..0662705a84ca 100644
--- a/sdk/cosmos/azure-cosmos-tests/pom.xml
+++ b/sdk/cosmos/azure-cosmos-tests/pom.xml
@@ -857,6 +857,30 @@ Licensed under the MIT License.
+
+ manual-http-network-fault
+
+ manual-http-network-fault
+
+
+
+
+ org.apache.maven.plugins
+ maven-failsafe-plugin
+ 3.5.3
+
+
+ src/test/resources/manual-http-network-fault-testng.xml
+
+
+ true
+ true
+
+
+
+
+
+
fi-thinclient-multi-region
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectTimeoutBifurcationTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectTimeoutBifurcationTests.java
index e93b4cf56353..1c82ccbeee5a 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectTimeoutBifurcationTests.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectTimeoutBifurcationTests.java
@@ -45,8 +45,8 @@
* - Metadata requests → GW V1 endpoint (port 443) → CONNECT_TIMEOUT_MILLIS = 45s (unchanged)
*
* HOW TO RUN:
- * 1. Group "manual-thinclient-network-delay" — NOT included in CI.
- * 2. Docker container with --cap-add=NET_ADMIN, JDK 21, .m2 mounted.
+ * 1. Group "manual-http-network-fault" — runs in dedicated CI pipeline stage.
+ * 2. Runs natively on Linux VMs (with sudo) or in Docker (with --cap-add=NET_ADMIN).
* 3. Tests self-manage iptables rules (add/remove) — no manual intervention.
* 4. See CONNECT_TIMEOUT_TESTING_README.md for full setup and run instructions.
*
@@ -61,8 +61,12 @@ public class Http2ConnectTimeoutBifurcationTests extends FaultInjectionTestBase
private CosmosAsyncContainer cosmosAsyncContainer;
private TestObject seedItem;
- private static final String TEST_GROUP = "manual-thinclient-network-delay";
+ private static final String TEST_GROUP = "manual-http-network-fault";
private static final long TEST_TIMEOUT = 180_000;
+ // Network interface detected at runtime — eth0 for Docker, or the default route interface on CI VMs.
+ private String networkInterface;
+ // Prefix for privileged commands: empty string in Docker (runs as root), "sudo " on CI VMs.
+ private String sudoPrefix;
@Factory(dataProvider = "clientBuildersWithGatewayAndHttp2")
public Http2ConnectTimeoutBifurcationTests(CosmosClientBuilder clientBuilder) {
@@ -72,6 +76,11 @@ public Http2ConnectTimeoutBifurcationTests(CosmosClientBuilder clientBuilder) {
@BeforeClass(groups = {TEST_GROUP}, timeOut = TIMEOUT)
public void beforeClass() {
+ // Detect whether we're running as root (Docker) or need sudo (CI VM)
+ this.sudoPrefix = "root".equals(System.getProperty("user.name")) ? "" : "sudo ";
+ this.networkInterface = detectNetworkInterface();
+ logger.info("Network interface: {}, sudo: {}", this.networkInterface, !this.sudoPrefix.isEmpty());
+
System.setProperty("COSMOS.THINCLIENT_ENABLED", "true");
// Use the default THINCLIENT_CONNECTION_TIMEOUT_IN_SECONDS (5s) — no override.
// Tests are designed around the 5s default to match production behavior.
@@ -92,7 +101,9 @@ public void beforeClass() {
@AfterClass(groups = {TEST_GROUP}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true)
public void afterClass() {
// Safety: remove any leftover iptables rules
- removeIptablesDropOnPort(10250);
+ if (sudoPrefix != null) {
+ removeIptablesDropOnPort(10250);
+ }
System.clearProperty("COSMOS.THINCLIENT_ENABLED");
safeClose(this.client);
}
@@ -113,23 +124,24 @@ public void afterClass() {
* @param port10250DelayMs delay for port 10250 traffic (thin client data plane)
*/
private void addPerPortDelay(int port443DelayMs, int port10250DelayMs) {
+ String iface = networkInterface;
String[] cmds = {
// Create root prio qdisc with 3 bands
- "tc qdisc add dev eth0 root handle 1: prio bands 3",
+ sudoPrefix + "tc qdisc add dev " + iface + " root handle 1: prio bands 3",
// Band 1 (handle 1:1): delay for port 443
- String.format("tc qdisc add dev eth0 parent 1:1 handle 10: netem delay %dms", port443DelayMs),
+ String.format("%stc qdisc add dev %s parent 1:1 handle 10: netem delay %dms", sudoPrefix, iface, port443DelayMs),
// Band 2 (handle 1:2): delay for port 10250
- String.format("tc qdisc add dev eth0 parent 1:2 handle 20: netem delay %dms", port10250DelayMs),
+ String.format("%stc qdisc add dev %s parent 1:2 handle 20: netem delay %dms", sudoPrefix, iface, port10250DelayMs),
// Band 3 (handle 1:3): no delay (default for all other traffic)
- "tc qdisc add dev eth0 parent 1:3 handle 30: pfifo_fast",
+ sudoPrefix + "tc qdisc add dev " + iface + " parent 1:3 handle 30: pfifo_fast",
// Mark port 443 packets with mark 1
- "iptables -t mangle -A OUTPUT -p tcp --dport 443 -j MARK --set-mark 1",
+ sudoPrefix + "iptables -t mangle -A OUTPUT -p tcp --dport 443 -j MARK --set-mark 1",
// Mark port 10250 packets with mark 2
- "iptables -t mangle -A OUTPUT -p tcp --dport 10250 -j MARK --set-mark 2",
+ sudoPrefix + "iptables -t mangle -A OUTPUT -p tcp --dport 10250 -j MARK --set-mark 2",
// Route mark 1 → band 1 (port 443 delay)
- "tc filter add dev eth0 parent 1:0 protocol ip prio 1 handle 1 fw flowid 1:1",
+ sudoPrefix + "tc filter add dev " + iface + " parent 1:0 protocol ip prio 1 handle 1 fw flowid 1:1",
// Route mark 2 → band 2 (port 10250 delay)
- "tc filter add dev eth0 parent 1:0 protocol ip prio 2 handle 2 fw flowid 1:2",
+ sudoPrefix + "tc filter add dev " + iface + " parent 1:0 protocol ip prio 2 handle 2 fw flowid 1:2",
};
for (String cmd : cmds) {
@@ -156,27 +168,26 @@ private void addPerPortDelay(int port443DelayMs, int port10250DelayMs) {
* @param port10250SynDelayMs SYN delay for port 10250 (thin client data plane)
*/
private void addPerPortSynDelay(int port443SynDelayMs, int port10250SynDelayMs) {
+ String iface = networkInterface;
String[] cmds = {
// Create root prio qdisc with 3 bands
- "tc qdisc add dev eth0 root handle 1: prio bands 3",
+ sudoPrefix + "tc qdisc add dev " + iface + " root handle 1: prio bands 3",
// Band 1 (handle 1:1): delay for port 443 SYN
- String.format("tc qdisc add dev eth0 parent 1:1 handle 10: netem delay %dms", port443SynDelayMs),
+ String.format("%stc qdisc add dev %s parent 1:1 handle 10: netem delay %dms", sudoPrefix, iface, port443SynDelayMs),
// Band 2 (handle 1:2): delay for port 10250 SYN
- String.format("tc qdisc add dev eth0 parent 1:2 handle 20: netem delay %dms", port10250SynDelayMs),
+ String.format("%stc qdisc add dev %s parent 1:2 handle 20: netem delay %dms", sudoPrefix, iface, port10250SynDelayMs),
// Band 3 (handle 1:3): no delay (default for all other traffic including non-SYN)
- "tc qdisc add dev eth0 parent 1:3 handle 30: pfifo_fast",
+ sudoPrefix + "tc qdisc add dev " + iface + " parent 1:3 handle 30: pfifo_fast",
// Mark ONLY SYN packets (initial TCP connect) to port 443 with mark 1
- "iptables -t mangle -A OUTPUT -p tcp --dport 443 --tcp-flags SYN,ACK,FIN,RST SYN -j MARK --set-mark 1",
+ sudoPrefix + "iptables -t mangle -A OUTPUT -p tcp --dport 443 --tcp-flags SYN,ACK,FIN,RST SYN -j MARK --set-mark 1",
// Mark ONLY SYN packets to port 10250 with mark 2
- "iptables -t mangle -A OUTPUT -p tcp --dport 10250 --tcp-flags SYN,ACK,FIN,RST SYN -j MARK --set-mark 2",
+ sudoPrefix + "iptables -t mangle -A OUTPUT -p tcp --dport 10250 --tcp-flags SYN,ACK,FIN,RST SYN -j MARK --set-mark 2",
// Route mark 1 → band 1 (port 443 SYN delay)
- "tc filter add dev eth0 parent 1:0 protocol ip prio 1 handle 1 fw flowid 1:1",
+ sudoPrefix + "tc filter add dev " + iface + " parent 1:0 protocol ip prio 1 handle 1 fw flowid 1:1",
// Route mark 2 → band 2 (port 10250 SYN delay)
- "tc filter add dev eth0 parent 1:0 protocol ip prio 2 handle 2 fw flowid 1:2",
+ sudoPrefix + "tc filter add dev " + iface + " parent 1:0 protocol ip prio 2 handle 2 fw flowid 1:2",
// CRITICAL: Catch-all filter → band 3 (no delay) for ALL unmarked traffic.
- // Without this, prio qdisc's default priomap sends unmarked packets to band 1
- // (the delay band), which delays TLS/HTTP/ACK traffic and causes spurious failures.
- "tc filter add dev eth0 parent 1:0 protocol ip prio 99 u32 match u32 0 0 flowid 1:3",
+ sudoPrefix + "tc filter add dev " + iface + " parent 1:0 protocol ip prio 99 u32 match u32 0 0 flowid 1:3",
};
for (String cmd : cmds) {
@@ -191,9 +202,10 @@ private void addPerPortSynDelay(int port443SynDelayMs, int port10250SynDelayMs)
* Removes all per-port delay rules (tc qdisc + iptables mangle marks).
*/
private void removePerPortDelay() {
+ String iface = networkInterface;
String[] cmds = {
- "tc qdisc del dev eth0 root 2>/dev/null",
- "iptables -t mangle -F OUTPUT 2>/dev/null",
+ sudoPrefix + "tc qdisc del dev " + iface + " root 2>/dev/null",
+ sudoPrefix + "iptables -t mangle -F OUTPUT 2>/dev/null",
};
for (String cmd : cmds) {
@@ -218,9 +230,11 @@ private void executeShellCommand(String cmd) {
if (exit != 0) {
try (BufferedReader err = new BufferedReader(new InputStreamReader(p.getErrorStream()))) {
String errMsg = err.readLine();
- logger.warn("Command failed (exit={}): {} — {}", exit, cmd, errMsg);
+ fail("Command failed (exit=" + exit + "): " + cmd + " — " + errMsg);
}
}
+ } catch (AssertionError e) {
+ throw e;
} catch (Exception e) {
logger.error("Failed to execute: {}", cmd, e);
fail("Shell command failed: " + cmd + " — " + e.getMessage());
@@ -238,7 +252,7 @@ private void executeShellCommand(String cmd) {
*/
private void addIptablesDropOnPort(int port) {
String cmd = String.format(
- "iptables -A OUTPUT -p tcp --dport %d --tcp-flags SYN,ACK,FIN,RST SYN -j DROP", port);
+ "%siptables -A OUTPUT -p tcp --dport %d --tcp-flags SYN,ACK,FIN,RST SYN -j DROP", sudoPrefix, port);
logger.info(">>> Adding iptables DROP SYN rule: {}", cmd);
try {
Process p = Runtime.getRuntime().exec(new String[]{"sh", "-c", cmd});
@@ -262,7 +276,7 @@ private void addIptablesDropOnPort(int port) {
*/
private void removeIptablesDropOnPort(int port) {
String cmd = String.format(
- "iptables -D OUTPUT -p tcp --dport %d --tcp-flags SYN,ACK,FIN,RST SYN -j DROP", port);
+ "%siptables -D OUTPUT -p tcp --dport %d --tcp-flags SYN,ACK,FIN,RST SYN -j DROP", sudoPrefix, port);
logger.info(">>> Removing iptables DROP SYN rule: {}", cmd);
try {
Process p = Runtime.getRuntime().exec(new String[]{"sh", "-c", cmd});
@@ -277,6 +291,28 @@ private void removeIptablesDropOnPort(int port) {
}
}
+ /**
+ * Detects the default-route network interface.
+ * In Docker this is typically eth0. On CI VMs it may be eth0, ens5, etc.
+ * Falls back to "eth0" if detection fails.
+ */
+ private static String detectNetworkInterface() {
+ try {
+ Process p = Runtime.getRuntime().exec(new String[]{"sh", "-c",
+ "ip route show default | awk '{print $5}' | head -1"});
+ p.waitFor();
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
+ String iface = reader.readLine();
+ if (iface != null && !iface.isEmpty() && !iface.contains("@")) {
+ return iface.trim();
+ }
+ }
+ } catch (Exception e) {
+ // fall through
+ }
+ return "eth0";
+ }
+
// ========================================================================
// Tests
// ========================================================================
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectionLifecycleTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectionLifecycleTests.java
index 9c849a8ce044..601d9d06cc32 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectionLifecycleTests.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/Http2ConnectionLifecycleTests.java
@@ -53,10 +53,11 @@
* does NOT close the parent TCP connection.
*
* HOW TO RUN:
- * 1. Group "manual-thinclient-network-delay" — NOT included in CI.
- * 2. Docker container with --cap-add=NET_ADMIN, JDK 21, .m2 mounted.
+ * 1. Group "manual-http-network-fault" — NOT included in standard CI test suites.
+ * 2. Runs natively on Linux VMs (with sudo) or in Docker (with --cap-add=NET_ADMIN).
* 3. Tests self-manage tc netem (add/remove delay) — no manual intervention.
- * 4. See NETWORK_DELAY_TESTING_README.md for full setup and run instructions.
+ * 4. Tests self-skip if tc is not available (e.g., on Windows or non-privileged Linux).
+ * 5. See NETWORK_DELAY_TESTING_README.md for full setup and run instructions.
*
* DESIGN:
* - No creates during tests. One seed item created in beforeClass (via shared container).
@@ -70,12 +71,13 @@ public class Http2ConnectionLifecycleTests extends FaultInjectionTestBase {
private CosmosAsyncContainer cosmosAsyncContainer;
private TestObject seedItem;
- private static final String TEST_GROUP = "manual-thinclient-network-delay";
+ private static final String TEST_GROUP = "manual-http-network-fault";
// 3 minutes per test — enough for warmup + delay + retries + cross-region failover + recovery read
private static final long TEST_TIMEOUT = 180_000;
- // Hardcode eth0 — Docker always uses eth0. detectNetworkInterface() fails during active delay
- // because `tc qdisc show dev eth0` hangs, and the fallback returns `eth0@if23` which tc rejects.
- private static final String NETWORK_INTERFACE = "eth0";
+ // Network interface detected at runtime — eth0 for Docker, or the default route interface on CI VMs.
+ private String networkInterface;
+ // Prefix for privileged commands: empty string in Docker (runs as root), "sudo " on CI VMs.
+ private String sudoPrefix;
@Factory(dataProvider = "clientBuildersWithGatewayAndHttp2")
public Http2ConnectionLifecycleTests(CosmosClientBuilder clientBuilder) {
@@ -85,6 +87,29 @@ public Http2ConnectionLifecycleTests(CosmosClientBuilder clientBuilder) {
@BeforeClass(groups = {TEST_GROUP}, timeOut = TIMEOUT)
public void beforeClass() {
+ // Detect whether we're running as root (Docker) or need sudo (CI VM)
+ this.sudoPrefix = "root".equals(System.getProperty("user.name")) ? "" : "sudo ";
+
+ // Detect the default-route network interface
+ this.networkInterface = detectNetworkInterface();
+ logger.info("Network interface: {}, sudo: {}", this.networkInterface, !this.sudoPrefix.isEmpty());
+
+ // Verify tc (traffic control) is available — fail-fast if not
+ try {
+ Process p = Runtime.getRuntime().exec(new String[]{"sh", "-c", sudoPrefix + "tc qdisc show dev " + networkInterface});
+ int exit = p.waitFor();
+ if (exit != 0) {
+ try (BufferedReader err = new BufferedReader(new InputStreamReader(p.getErrorStream()))) {
+ String errMsg = err.readLine();
+ fail("tc not available on " + networkInterface + " (exit=" + exit + "): " + errMsg);
+ }
+ }
+ } catch (AssertionError e) {
+ throw e;
+ } catch (Exception e) {
+ fail("tc not available: " + e.getMessage());
+ }
+
System.setProperty("COSMOS.THINCLIENT_ENABLED", "true");
// Seed one item using a temporary client. The shared container is created by @BeforeSuite.
@@ -118,7 +143,10 @@ public void beforeMethod() {
*/
@AfterMethod(groups = {TEST_GROUP}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true)
public void afterMethod() {
- removeNetworkDelay();
+ if (sudoPrefix != null) {
+ removeNetworkDelay();
+ removePacketDrop();
+ }
safeClose(this.client);
this.client = null;
this.cosmosAsyncContainer = null;
@@ -127,8 +155,12 @@ public void afterMethod() {
@AfterClass(groups = {TEST_GROUP}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true)
public void afterClass() {
- removeNetworkDelay();
+ if (sudoPrefix != null) {
+ removeNetworkDelay();
+ removePacketDrop();
+ }
System.clearProperty("COSMOS.THINCLIENT_ENABLED");
+ System.clearProperty("COSMOS.HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS");
}
// ========================================================================
@@ -265,19 +297,18 @@ private void assertNoGatewayTimeout(CosmosDiagnostics diagnostics, String contex
}
/**
- * Applies a tc netem delay to all outbound traffic on the Docker container's network interface.
+ * Applies a tc netem delay to all outbound traffic on the network interface.
* This delays ALL packets (including TCP handshake, HTTP/2 frames, and TLS records) by the
* specified duration, causing reactor-netty's ReadTimeoutHandler to fire on H2 stream channels
* when the delay exceeds the configured responseTimeout.
*
- *
Requires {@code --cap-add=NET_ADMIN} on the Docker container. Fails the test immediately
- * if the {@code tc} command is not available or returns a non-zero exit code.
+ * Requires root (Docker with {@code --cap-add=NET_ADMIN}) or passwordless sudo (CI VM).
+ * Fails the test immediately if {@code tc} command fails.
*
* @param delayMs the delay in milliseconds to inject (e.g., 8000 for an 8-second delay)
*/
private void addNetworkDelay(int delayMs) {
- String iface = NETWORK_INTERFACE;
- String cmd = String.format("tc qdisc add dev %s root netem delay %dms", iface, delayMs);
+ String cmd = String.format("%stc qdisc add dev %s root netem delay %dms", sudoPrefix, networkInterface, delayMs);
logger.info(">>> Adding network delay: {}", cmd);
try {
Process p = Runtime.getRuntime().exec(new String[]{"sh", "-c", cmd});
@@ -285,10 +316,10 @@ private void addNetworkDelay(int delayMs) {
if (exit != 0) {
try (BufferedReader err = new BufferedReader(new InputStreamReader(p.getErrorStream()))) {
String errMsg = err.readLine();
- logger.warn("tc add failed (exit={}): {}", exit, errMsg);
+ fail("tc add failed (exit=" + exit + "): " + errMsg);
}
} else {
- logger.info(">>> Network delay active: {}ms on {}", delayMs, iface);
+ logger.info(">>> Network delay active: {}ms on {}", delayMs, networkInterface);
}
} catch (Exception e) {
logger.error("Failed to add network delay", e);
@@ -297,7 +328,7 @@ private void addNetworkDelay(int delayMs) {
}
/**
- * Removes any tc netem qdisc from the Docker container's network interface, restoring
+ * Removes any tc netem qdisc from the network interface, restoring
* normal network behavior. This is called in {@code finally} blocks after each test and
* in {@code @AfterMethod} and {@code @AfterClass} as a safety net.
*
@@ -305,8 +336,7 @@ private void addNetworkDelay(int delayMs) {
* Does not fail the test on error — the priority is cleanup, not assertion.
*/
private void removeNetworkDelay() {
- String iface = NETWORK_INTERFACE;
- String cmd = String.format("tc qdisc del dev %s root netem", iface);
+ String cmd = String.format("%stc qdisc del dev %s root", sudoPrefix, networkInterface);
logger.info(">>> Removing network delay: {}", cmd);
try {
Process p = Runtime.getRuntime().exec(new String[]{"sh", "-c", cmd});
@@ -314,7 +344,10 @@ private void removeNetworkDelay() {
if (exit == 0) {
logger.info(">>> Network delay removed");
} else {
- logger.warn("tc del returned exit={} (may already be removed)", exit);
+ try (BufferedReader err = new BufferedReader(new InputStreamReader(p.getErrorStream()))) {
+ String errMsg = err.readLine();
+ logger.warn("tc del returned exit={}: {} (may already be removed)", exit, errMsg);
+ }
}
} catch (Exception e) {
logger.warn("Failed to remove network delay: {}", e.getMessage());
@@ -364,10 +397,10 @@ public void connectionReuseAfterRealNettyTimeout() throws Exception {
.isNotNull();
assertContainsGatewayTimeout(delayedDiagnostics, "delayed read");
- // Brief pause to let TCP retransmission settle after netem qdisc deletion
- Thread.sleep(1000);
+ // Brief pause to let TCP retransmission settle after netem qdisc deletion.
+ Thread.sleep(3000);
- // Recovery read — assert no timeout, low latency, and same parent channel
+ // Recovery read — delay is removed, should succeed cleanly
CosmosDiagnostics recoveryDiagnostics = this.performDocumentOperation(
this.cosmosAsyncContainer, OperationType.Read, seedItem, false);
CosmosDiagnosticsContext recoveryCtx = recoveryDiagnostics.getDiagnosticsContext();
@@ -379,7 +412,7 @@ public void connectionReuseAfterRealNettyTimeout() throws Exception {
h2ParentChannelIdBeforeDelay.equals(h2ParentChannelIdAfterDelay),
recoveryCtx.getDuration().toMillis());
AssertionsForClassTypes.assertThat(recoveryCtx.getDuration())
- .as("Recovery read should complete within 10s (allows one 6s ReadTimeout retry + TCP stabilization)")
+ .as("Recovery read should complete within 10s (delay removed, no retries expected)")
.isLessThan(Duration.ofSeconds(10));
assertThat(h2ParentChannelIdAfterDelay)
.as("H2 parent NioSocketChannel should survive ReadTimeoutException on Http2StreamChannel")
@@ -757,4 +790,306 @@ public void parentChannelSurvivesE2ECancelWithoutReadTimeout() throws Exception
.as("H2 stream channels are never reused (RFC 9113 §5.1.1) — stream ID should differ from warmup")
.isNotEqualTo(warmupStreamChannelId);
}
+
+ // ========================================================================
+ // Connection Max Lifetime Tests
+ // ========================================================================
+
+ /**
+ * Proves that a connection is rotated after maxLifeTime expires.
+ * Sets COSMOS.HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS=15 (short lifetime for testing).
+ * Establishes a connection, captures parentChannelId, waits for the lifetime + background
+ * sweep interval to elapse, then performs another read and asserts the parentChannelId changed.
+ */
+ @Test(groups = {TEST_GROUP}, timeOut = TEST_TIMEOUT)
+ public void connectionRotatedAfterMaxLifetimeExpiry() throws Exception {
+ System.setProperty("COSMOS.HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS", "15");
+ try {
+ safeClose(this.client);
+ this.client = getClientBuilder().buildAsyncClient();
+ this.cosmosAsyncContainer = getSharedMultiPartitionCosmosContainerWithIdAsPartitionKey(this.client);
+
+ String initialParentChannelId = establishH2ConnectionAndGetParentChannelId();
+ logger.info("Initial parentChannelId: {}", initialParentChannelId);
+
+ long startTime = System.currentTimeMillis();
+ long waitMs = 50_000;
+ String latestParentChannelId = initialParentChannelId;
+
+ while (System.currentTimeMillis() - startTime < waitMs) {
+ Thread.sleep(5_000);
+ latestParentChannelId = readAndGetParentChannelId();
+ logger.info("Elapsed={}s parentChannelId={} (changed={})",
+ (System.currentTimeMillis() - startTime) / 1000,
+ latestParentChannelId,
+ !latestParentChannelId.equals(initialParentChannelId));
+ if (!latestParentChannelId.equals(initialParentChannelId)) {
+ break;
+ }
+ }
+
+ logger.info("RESULT: initial={}, final={}, ROTATED={}",
+ initialParentChannelId, latestParentChannelId,
+ !initialParentChannelId.equals(latestParentChannelId));
+ assertThat(latestParentChannelId)
+ .as("After max lifetime (15s + jitter), connection should be rotated to a new parentChannelId")
+ .isNotEqualTo(initialParentChannelId);
+ } finally {
+ System.clearProperty("COSMOS.HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS");
+ }
+ }
+
+ /**
+ * Proves that per-connection jitter staggers eviction — not all connections expire at once.
+ * Creates multiple H2 parent connections via concurrent requests, sets a short maxLifeTime (15s),
+ * then observes that connections are evicted at different times.
+ */
+ @Test(groups = {TEST_GROUP}, timeOut = TEST_TIMEOUT)
+ public void perConnectionJitterStaggersEviction() throws Exception {
+ System.setProperty("COSMOS.HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS", "15");
+ try {
+ safeClose(this.client);
+ this.client = getClientBuilder().buildAsyncClient();
+ this.cosmosAsyncContainer = getSharedMultiPartitionCosmosContainerWithIdAsPartitionKey(this.client);
+
+ int concurrentRequests = 100;
+ Set initialParentChannelIds = ConcurrentHashMap.newKeySet();
+
+ for (int wave = 0; wave < 3; wave++) {
+ Flux.range(0, concurrentRequests)
+ .flatMap(i -> this.cosmosAsyncContainer.readItem(
+ seedItem.getId(), new PartitionKey(seedItem.getId()), TestObject.class)
+ .doOnSuccess(response -> {
+ try {
+ String parentId = extractParentChannelId(response.getDiagnostics());
+ if (parentId != null) {
+ initialParentChannelIds.add(parentId);
+ }
+ } catch (Exception e) {
+ logger.warn("Failed to extract parentChannelId", e);
+ }
+ }), concurrentRequests)
+ .collectList()
+ .block();
+ if (initialParentChannelIds.size() > 1) {
+ break;
+ }
+ }
+
+ logger.info("Initial parent channels: {} (count={})", initialParentChannelIds, initialParentChannelIds.size());
+ assertThat(initialParentChannelIds)
+ .as("Concurrent reads should create multiple parent H2 channels")
+ .hasSizeGreaterThan(1);
+
+ Thread.sleep(20_000);
+
+ Set midpointParentChannelIds = ConcurrentHashMap.newKeySet();
+ Flux.range(0, concurrentRequests)
+ .flatMap(i -> this.cosmosAsyncContainer.readItem(
+ seedItem.getId(), new PartitionKey(seedItem.getId()), TestObject.class)
+ .doOnSuccess(response -> {
+ try {
+ String parentId = extractParentChannelId(response.getDiagnostics());
+ if (parentId != null) {
+ midpointParentChannelIds.add(parentId);
+ }
+ } catch (Exception e) {
+ logger.warn("Failed to extract parentChannelId", e);
+ }
+ }), concurrentRequests)
+ .collectList()
+ .block();
+
+ Set survivedChannels = new HashSet<>(initialParentChannelIds);
+ survivedChannels.retainAll(midpointParentChannelIds);
+ Set newChannels = new HashSet<>(midpointParentChannelIds);
+ newChannels.removeAll(initialParentChannelIds);
+
+ logger.info("RESULT: initial={} (count={}), midpoint={} (count={}), survived={}, new={}",
+ initialParentChannelIds, initialParentChannelIds.size(),
+ midpointParentChannelIds, midpointParentChannelIds.size(),
+ survivedChannels, newChannels);
+
+ assertThat(midpointParentChannelIds)
+ .as("Pool should still be functional at midpoint")
+ .isNotEmpty();
+ } finally {
+ System.clearProperty("COSMOS.HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS");
+ }
+ }
+
+ /**
+ * Proves that when a connection is silently degraded (packets dropped, no TCP RST),
+ * the PING health check detects the degradation (no ACK received within timeout),
+ * the eviction predicate evicts the connection, and the next request succeeds on a new connection.
+ *
+ * Configuration:
+ * - Max lifetime = 600s (intentionally HIGH — we don't want lifetime to trigger eviction)
+ * - PING interval = 3s (send probes frequently)
+ * - PING ACK timeout = 10s (short — evict quickly when ACKs stop arriving)
+ * - Blackhole duration = 25s (PING ACK timeout 10s + background sweep 5s + margin)
+ */
+ @Test(groups = {TEST_GROUP}, timeOut = TEST_TIMEOUT)
+ public void degradedConnectionEvictedByPingHealthCheck() throws Exception {
+ // High max lifetime so it can't trigger eviction — only PING staleness should evict
+ System.setProperty("COSMOS.HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS", "600");
+ System.setProperty("COSMOS.HTTP2_PING_INTERVAL_IN_SECONDS", "3");
+ System.setProperty("COSMOS.HTTP2_PING_ACK_TIMEOUT_IN_SECONDS", "10");
+ System.setProperty("COSMOS.HTTP2_ENABLED", "true");
+ try {
+ safeClose(this.client);
+ this.client = getClientBuilder().buildAsyncClient();
+ this.cosmosAsyncContainer = getSharedMultiPartitionCosmosContainerWithIdAsPartitionKey(this.client);
+
+ String initialParentChannelId = establishH2ConnectionAndGetParentChannelId();
+ logger.info("Initial parentChannelId: {}", initialParentChannelId);
+
+ // Blackhole traffic — PINGs sent but no ACKs return
+ addPacketDrop();
+ logger.info("Waiting 25s for PING ACK timeout (10s) + background sweep (5s) + margin...");
+ Thread.sleep(25_000);
+ removePacketDrop();
+ Thread.sleep(2_000);
+
+ CosmosEndToEndOperationLatencyPolicyConfig e2ePolicy =
+ new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(30)).build();
+ CosmosItemRequestOptions opts = new CosmosItemRequestOptions();
+ opts.setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy);
+
+ CosmosItemResponse response = this.cosmosAsyncContainer.readItem(
+ seedItem.getId(), new PartitionKey(seedItem.getId()), opts, TestObject.class).block();
+
+ assertThat(response).as("Recovery read must succeed").isNotNull();
+ assertThat(response.getStatusCode()).as("Recovery read status code").isEqualTo(200);
+
+ String recoveryParentChannelId = extractParentChannelId(response.getDiagnostics());
+ logger.info("RESULT: initial={}, recovery={}, ROTATED={}",
+ initialParentChannelId, recoveryParentChannelId,
+ !initialParentChannelId.equals(recoveryParentChannelId));
+
+ assertThat(recoveryParentChannelId)
+ .as("Recovery read must use a new parentChannelId — degraded connection evicted by PING health check")
+ .isNotNull()
+ .isNotEmpty()
+ .isNotEqualTo(initialParentChannelId);
+ } finally {
+ removePacketDrop();
+ System.clearProperty("COSMOS.HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS");
+ System.clearProperty("COSMOS.HTTP2_PING_INTERVAL_IN_SECONDS");
+ System.clearProperty("COSMOS.HTTP2_PING_ACK_TIMEOUT_IN_SECONDS");
+ System.clearProperty("COSMOS.HTTP2_ENABLED");
+ }
+ }
+
+ /**
+ * Proves that when a connection exceeds its jittered max lifetime AND the network is healthy
+ * (PING ACKs are still arriving), the max lifetime eviction still triggers.
+ * This is the safety-net — connections shouldn't live forever even if PINGs succeed.
+ */
+ @Test(groups = {TEST_GROUP}, timeOut = TEST_TIMEOUT)
+ public void connectionEvictedAfterMaxLifetimeEvenWithHealthyPings() throws Exception {
+ System.setProperty("COSMOS.HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS", "15");
+ System.setProperty("COSMOS.HTTP2_PING_INTERVAL_IN_SECONDS", "3");
+ System.setProperty("COSMOS.HTTP2_PING_ACK_TIMEOUT_IN_SECONDS", "60");
+ try {
+ safeClose(this.client);
+ this.client = getClientBuilder().buildAsyncClient();
+ this.cosmosAsyncContainer = getSharedMultiPartitionCosmosContainerWithIdAsPartitionKey(this.client);
+
+ String initialParentChannelId = establishH2ConnectionAndGetParentChannelId();
+ logger.info("Initial parentChannelId: {}", initialParentChannelId);
+
+ // No blackhole — PINGs succeed. Just wait for max lifetime (15s + jitter + sweep margin)
+ logger.info("Waiting 50s for max lifetime (15s) + jitter (up to 30s) + background sweep...");
+ Thread.sleep(50_000);
+
+ CosmosEndToEndOperationLatencyPolicyConfig e2ePolicy =
+ new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(30)).build();
+ CosmosItemRequestOptions opts = new CosmosItemRequestOptions();
+ opts.setCosmosEndToEndOperationLatencyPolicyConfig(e2ePolicy);
+
+ CosmosItemResponse response = this.cosmosAsyncContainer.readItem(
+ seedItem.getId(), new PartitionKey(seedItem.getId()), opts, TestObject.class).block();
+
+ assertThat(response).as("Recovery read must succeed").isNotNull();
+ assertThat(response.getStatusCode()).as("Recovery read status code").isEqualTo(200);
+
+ String recoveryParentChannelId = extractParentChannelId(response.getDiagnostics());
+ logger.info("RESULT: initial={}, recovery={}, ROTATED={}",
+ initialParentChannelId, recoveryParentChannelId,
+ !initialParentChannelId.equals(recoveryParentChannelId));
+
+ assertThat(recoveryParentChannelId)
+ .as("Recovery read must use a new parentChannelId — max lifetime eviction still works with healthy PINGs")
+ .isNotNull()
+ .isNotEmpty()
+ .isNotEqualTo(initialParentChannelId);
+ } finally {
+ System.clearProperty("COSMOS.HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS");
+ System.clearProperty("COSMOS.HTTP2_PING_INTERVAL_IN_SECONDS");
+ System.clearProperty("COSMOS.HTTP2_PING_ACK_TIMEOUT_IN_SECONDS");
+ }
+ }
+
+ // ========================================================================
+ // iptables helpers for silent degradation (packet drop, no RST)
+ // ========================================================================
+
+ private void addPacketDrop() {
+ String cmd = String.format("%siptables -A OUTPUT -p tcp --dport 10250 -j DROP", sudoPrefix);
+ logger.info(">>> Adding packet drop: {}", cmd);
+ try {
+ Process p = Runtime.getRuntime().exec(new String[]{"sh", "-c", cmd});
+ int exit = p.waitFor();
+ if (exit != 0) {
+ try (BufferedReader err = new BufferedReader(new InputStreamReader(p.getErrorStream()))) {
+ String errMsg = err.readLine();
+ fail("iptables add failed (exit=" + exit + "): " + errMsg);
+ }
+ } else {
+ logger.info(">>> Packet drop active on port 10250");
+ }
+ } catch (Exception e) {
+ logger.error("Failed to add packet drop", e);
+ fail("Could not add packet drop via iptables: " + e.getMessage());
+ }
+ }
+
+ private void removePacketDrop() {
+ String cmd = String.format("%siptables -D OUTPUT -p tcp --dport 10250 -j DROP", sudoPrefix);
+ logger.info(">>> Removing packet drop: {}", cmd);
+ try {
+ Process p = Runtime.getRuntime().exec(new String[]{"sh", "-c", cmd});
+ int exit = p.waitFor();
+ if (exit == 0) {
+ logger.info(">>> Packet drop removed");
+ } else {
+ logger.warn("iptables del returned exit={} (may already be removed)", exit);
+ }
+ } catch (Exception e) {
+ logger.warn("Failed to remove packet drop: {}", e.getMessage());
+ }
+ }
+
+ /**
+ * Detects the default-route network interface.
+ * In Docker this is typically eth0. On CI VMs it may be eth0, ens5, etc.
+ * Falls back to "eth0" if detection fails.
+ */
+ private static String detectNetworkInterface() {
+ try {
+ Process p = Runtime.getRuntime().exec(new String[]{"sh", "-c",
+ "ip route show default | awk '{print $5}' | head -1"});
+ p.waitFor();
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
+ String iface = reader.readLine();
+ if (iface != null && !iface.isEmpty() && !iface.contains("@")) {
+ return iface.trim();
+ }
+ }
+ } catch (Exception e) {
+ // fall through
+ }
+ return "eth0";
+ }
}
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java
index bcdd41b615fb..0437c42348b5 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java
@@ -297,7 +297,7 @@ public CosmosAsyncDatabase getDatabase(String id) {
@BeforeSuite(groups = {"thinclient", "fast", "long", "direct", "multi-region", "multi-master", "flaky-multi-master", "emulator",
"emulator-vnext", "split", "query", "cfp-split", "circuit-breaker-misc-gateway", "circuit-breaker-misc-direct",
- "circuit-breaker-read-all-read-many", "fi-multi-master", "long-emulator", "fi-thinclient-multi-region", "fi-thinclient-multi-master", "multi-region-strong"}, timeOut = SUITE_SETUP_TIMEOUT)
+ "circuit-breaker-read-all-read-many", "fi-multi-master", "long-emulator", "fi-thinclient-multi-region", "fi-thinclient-multi-master", "multi-region-strong", "manual-http-network-fault"}, timeOut = SUITE_SETUP_TIMEOUT)
public void beforeSuite() {
logger.info("beforeSuite Started");
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/resources/manual-http-network-fault-testng.xml b/sdk/cosmos/azure-cosmos-tests/src/test/resources/manual-http-network-fault-testng.xml
new file mode 100644
index 000000000000..b6127237e422
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/resources/manual-http-network-fault-testng.xml
@@ -0,0 +1,16 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java
index d8380bd94fff..2d14f40382b9 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java
@@ -141,6 +141,20 @@ public class Configs {
private static final Duration MAX_IDLE_CONNECTION_TIMEOUT = Duration.ofSeconds(60);
private static final Duration CONNECTION_ACQUIRE_TIMEOUT = Duration.ofSeconds(45);
private static final String REACTOR_NETTY_CONNECTION_POOL_NAME = "reactor-netty-connection-pool";
+
+ // HTTP connection max lifetime — forces periodic connection rotation for DNS re-resolution and load redistribution.
+ private static final int DEFAULT_HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS = 300; // 5 minutes
+ private static final String HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS = "COSMOS.HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS";
+ public static final int HTTP_CONNECTION_MAX_LIFETIME_JITTER_IN_SECONDS = 30;
+
+ // HTTP/2 PING health check — detects silently degraded connections (packet black-hole, half-open TCP).
+ // Interval: how often to send PING frames on each parent H2 connection.
+ // Timeout: if no PING ACK is received within this duration, the connection is considered unhealthy.
+ private static final int DEFAULT_HTTP2_PING_INTERVAL_IN_SECONDS = 10;
+ private static final String HTTP2_PING_INTERVAL_IN_SECONDS = "COSMOS.HTTP2_PING_INTERVAL_IN_SECONDS";
+ private static final int DEFAULT_HTTP2_PING_ACK_TIMEOUT_IN_SECONDS = 30;
+ private static final String HTTP2_PING_ACK_TIMEOUT_IN_SECONDS = "COSMOS.HTTP2_PING_ACK_TIMEOUT_IN_SECONDS";
+
private static final int DEFAULT_HTTP_RESPONSE_TIMEOUT_IN_SECONDS = 60;
private static final int DEFAULT_QUERY_PLAN_RESPONSE_TIMEOUT_IN_SECONDS = 5;
private static final int DEFAULT_ADDRESS_REFRESH_RESPONSE_TIMEOUT_IN_SECONDS = 5;
@@ -639,6 +653,24 @@ public static int getHttpResponseTimeoutInSeconds() {
return getJVMConfigAsInt(HTTP_RESPONSE_TIMEOUT_IN_SECONDS, DEFAULT_HTTP_RESPONSE_TIMEOUT_IN_SECONDS);
}
+ public static int getHttpConnectionMaxLifetimeInSeconds() {
+ return getJVMConfigAsInt(
+ HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS,
+ DEFAULT_HTTP_CONNECTION_MAX_LIFETIME_IN_SECONDS);
+ }
+
+ public static int getHttp2PingIntervalInSeconds() {
+ return getJVMConfigAsInt(
+ HTTP2_PING_INTERVAL_IN_SECONDS,
+ DEFAULT_HTTP2_PING_INTERVAL_IN_SECONDS);
+ }
+
+ public static int getHttp2PingAckTimeoutInSeconds() {
+ return getJVMConfigAsInt(
+ HTTP2_PING_ACK_TIMEOUT_IN_SECONDS,
+ DEFAULT_HTTP2_PING_ACK_TIMEOUT_IN_SECONDS);
+ }
+
public static int getQueryPlanResponseTimeoutInSeconds() {
return getJVMConfigAsInt(QUERY_PLAN_RESPONSE_TIMEOUT_IN_SECONDS, DEFAULT_QUERY_PLAN_RESPONSE_TIMEOUT_IN_SECONDS);
}
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2PingHealthHandler.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2PingHealthHandler.java
new file mode 100644
index 000000000000..84ce2066ab42
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/Http2PingHealthHandler.java
@@ -0,0 +1,186 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.azure.cosmos.implementation.http;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http2.DefaultHttp2PingFrame;
+import io.netty.handler.codec.http2.Http2PingFrame;
+import io.netty.util.AttributeKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * HTTP/2 PING-based health checker for parent HTTP/2 connections.
+ *
+ * Installed on the parent TCP channel (not child H2 streams). Periodically sends
+ * HTTP/2 PING frames and tracks last ACK timestamp. The connection pool's
+ * eviction predicate reads {@link #LAST_PING_ACK_NANOS} to determine liveness.
+ *
+ * Lifecycle: one instance per parent H2 connection. The handler is guarded by
+ * {@link #HANDLER_INSTALLED} to prevent duplicate installation when multiple
+ * child streams are opened on the same parent.
+ */
+public class Http2PingHealthHandler extends ChannelDuplexHandler {
+
+ private static final Logger logger = LoggerFactory.getLogger(Http2PingHealthHandler.class);
+
+ /**
+ * Nano timestamp of the last PING ACK received on this parent channel.
+ * Updated ONLY when an Http2PingFrame with ack=true arrives — NOT on arbitrary reads.
+ * Http2FrameCodec propagates PING ACK frames to downstream handlers (addLast position).
+ * When the network is blackholed, no PING ACKs arrive, this goes stale, eviction triggers.
+ */
+ public static final AttributeKey LAST_PING_ACK_NANOS =
+ AttributeKey.valueOf("cosmos.h2.lastPingAckNanos");
+
+ /**
+ * Guard attribute to prevent duplicate handler installation on the same parent channel.
+ */
+ static final AttributeKey HANDLER_INSTALLED =
+ AttributeKey.valueOf("cosmos.h2.pingHealthInstalled");
+
+ static final String HANDLER_NAME = "cosmos.h2PingHealth";
+
+ private final long pingIntervalMs;
+ private final long pingContent;
+ private ScheduledFuture> pingTask;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ /**
+ * @param pingIntervalMs interval between PING frames in milliseconds
+ */
+ public Http2PingHealthHandler(long pingIntervalMs) {
+ this.pingIntervalMs = pingIntervalMs;
+ // Fixed PING payload — readable as "cosmos" in hex
+ this.pingContent = 0xC0_5D_B0_01L;
+ }
+
+ @Override
+ public void handlerAdded(ChannelHandlerContext ctx) {
+ // Seed the last-ack timestamp so the eviction predicate doesn't immediately
+ // consider a brand-new connection as "PING-stale"
+ ctx.channel().attr(LAST_PING_ACK_NANOS).set(System.nanoTime());
+
+ // The handler is installed from a child stream's doOnConnected, so the parent
+ // channel is already active — channelActive() won't fire. Start the schedule now.
+ if (ctx.channel().isActive()) {
+ schedulePing(ctx);
+ }
+ }
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ // Fallback: if handler is added before the channel becomes active (unlikely
+ // with the current parent-install pattern, but correct for completeness)
+ schedulePing(ctx);
+ super.channelActive(ctx);
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ cancelPing();
+ super.channelInactive(ctx);
+ }
+
+ @Override
+ public void handlerRemoved(ChannelHandlerContext ctx) {
+ cancelPing();
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ if (msg instanceof Http2PingFrame) {
+ Http2PingFrame pingFrame = (Http2PingFrame) msg;
+ if (pingFrame.ack()) {
+ ctx.channel().attr(LAST_PING_ACK_NANOS).set(System.nanoTime());
+ if (logger.isDebugEnabled()) {
+ logger.debug("HTTP/2 PING ACK received on channel {}",
+ ctx.channel().id().asShortText());
+ }
+ }
+ }
+ // Always propagate — don't consume frames
+ super.channelRead(ctx, msg);
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ logger.warn("Http2PingHealthHandler error on channel {}: {}",
+ ctx.channel().id().asShortText(), cause.getMessage());
+ ctx.fireExceptionCaught(cause);
+ }
+
+ private void schedulePing(ChannelHandlerContext ctx) {
+ if (closed.get() || !ctx.channel().isActive() || this.pingTask != null) {
+ return;
+ }
+ // Use the channel's event loop to avoid threading issues
+ this.pingTask = ctx.channel().eventLoop().scheduleAtFixedRate(
+ () -> sendPing(ctx),
+ pingIntervalMs,
+ pingIntervalMs,
+ TimeUnit.MILLISECONDS
+ );
+ }
+
+ private void sendPing(ChannelHandlerContext ctx) {
+ if (!ctx.channel().isActive()) {
+ cancelPing();
+ return;
+ }
+ DefaultHttp2PingFrame pingFrame = new DefaultHttp2PingFrame(pingContent, false);
+ // Use channel().writeAndFlush() — NOT ctx.writeAndFlush().
+ // Our handler is at addLast (after Http2FrameCodec). ctx.writeAndFlush() sends outbound
+ // from our position toward the network, BYPASSING the codec (frames aren't encoded).
+ // channel().writeAndFlush() starts from the pipeline tail, going through ALL handlers
+ // including Http2FrameCodec which encodes the PingFrame to HTTP/2 binary wire format.
+ ctx.channel().writeAndFlush(pingFrame).addListener(future -> {
+ if (!future.isSuccess()) {
+ logger.debug("HTTP/2 PING send failed on channel {}: {}",
+ ctx.channel().id().asShortText(),
+ future.cause() != null ? future.cause().getMessage() : "unknown");
+ } else if (logger.isTraceEnabled()) {
+ logger.trace("HTTP/2 PING sent on channel {}", ctx.channel().id().asShortText());
+ }
+ });
+ }
+
+ private void cancelPing() {
+ if (closed.compareAndSet(false, true) && this.pingTask != null) {
+ this.pingTask.cancel(false);
+ }
+ }
+
+ /**
+ * Installs this handler on the parent H2 channel if not already installed.
+ * Safe to call from any child stream's doOnConnected callback — will navigate
+ * to the parent channel and install exactly once.
+ *
+ * @param childChannel the child H2 stream channel from doOnConnected
+ * @param pingIntervalMs PING interval in milliseconds
+ */
+ public static void installOnParentIfAbsent(Channel channel, long pingIntervalMs) {
+ // In reactor-netty H2 mode, doOnConnected fires for the PARENT TCP channel
+ // (not child streams). channel.parent() is null because we're already on the parent.
+ // For child streams (if they ever fire), navigate to parent.
+ Channel targetChannel = channel.parent() != null ? channel.parent() : channel;
+
+ // Atomic check-and-set to prevent duplicate installation from concurrent doOnConnected callbacks
+ if (Boolean.TRUE.equals(targetChannel.attr(HANDLER_INSTALLED).getAndSet(Boolean.TRUE))) {
+ return; // already installed
+ }
+
+ targetChannel.pipeline().addLast(HANDLER_NAME, new Http2PingHealthHandler(pingIntervalMs));
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Installed Http2PingHealthHandler on channel {} with {}ms interval",
+ targetChannel.id().asShortText(), pingIntervalMs);
+ }
+ }
+}
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpClient.java
index 4f1830908623..46a34aaabfe4 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpClient.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpClient.java
@@ -5,11 +5,13 @@
import com.azure.cosmos.Http2ConnectionConfig;
import com.azure.cosmos.implementation.Configs;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
+import io.netty.channel.Channel;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.Http2AllocationStrategy;
import reactor.netty.resources.ConnectionProvider;
import java.time.Duration;
+import java.util.concurrent.ThreadLocalRandom;
/**
* A generic interface for sending HTTP requests and getting responses.
@@ -53,6 +55,51 @@ static HttpClient createFixed(HttpClientConfig httpClientConfig) {
}
fixedConnectionProviderBuilder.pendingAcquireTimeout(httpClientConfig.getConnectionAcquireTimeout());
fixedConnectionProviderBuilder.maxIdleTime(httpClientConfig.getMaxIdleConnectionTimeout());
+
+ int maxLifetimeSeconds = Configs.getHttpConnectionMaxLifetimeInSeconds();
+ int pingAckTimeoutSeconds = Configs.getHttp2PingAckTimeoutInSeconds();
+ if (maxLifetimeSeconds > 0 || pingAckTimeoutSeconds > 0) {
+ long baseMaxLifeMs = maxLifetimeSeconds > 0 ? maxLifetimeSeconds * 1000L : 0;
+ int jitterRangeSeconds = Configs.HTTP_CONNECTION_MAX_LIFETIME_JITTER_IN_SECONDS; // [1, jitterRange] seconds
+ long maxIdleTimeMs = httpClientConfig.getMaxIdleConnectionTimeout().toMillis();
+ long pingAckTimeoutNanos = pingAckTimeoutSeconds > 0 ? pingAckTimeoutSeconds * 1_000_000_000L : 0;
+
+ fixedConnectionProviderBuilder.evictionPredicate((connection, metadata) -> {
+ // Phase 0: Dead channel — always evict
+ if (!connection.channel().isActive()) {
+ return true;
+ }
+
+ // Phase 1: Idle timeout — unchanged from default behavior
+ if (maxIdleTimeMs > 0 && metadata.idleTime() > maxIdleTimeMs) {
+ return true;
+ }
+
+ // Phase 2: PING liveness — if PING ACK is stale, connection is silently degraded
+ if (pingAckTimeoutNanos > 0) {
+ Channel parentChannel = connection.channel();
+ Long lastAckNanos = parentChannel.hasAttr(Http2PingHealthHandler.LAST_PING_ACK_NANOS)
+ ? parentChannel.attr(Http2PingHealthHandler.LAST_PING_ACK_NANOS).get()
+ : null;
+ if (lastAckNanos != null && System.nanoTime() - lastAckNanos > pingAckTimeoutNanos) {
+ return true;
+ }
+ }
+
+ // Phase 3: Lifetime with jitter — connection exceeded its jittered max lifetime
+ // Jitter is per-evaluation with 1s granularity in [1s, jitterRange]. ThreadLocalRandom
+ // is lock-free and safe for concurrent eviction sweeps across event loops.
+ if (baseMaxLifeMs > 0) {
+ int connJitterMs = ThreadLocalRandom.current().nextInt(1, jitterRangeSeconds + 1) * 1000;
+ return metadata.lifeTime() > (baseMaxLifeMs + connJitterMs);
+ }
+
+ return false;
+ });
+
+ fixedConnectionProviderBuilder.evictInBackground(Duration.ofSeconds(5));
+ }
+
if (Configs.isNettyHttpClientMetricsEnabled()) {
fixedConnectionProviderBuilder.metrics(true);
}
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java
index 3e7f763caeb8..307ba356513a 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java
@@ -164,6 +164,15 @@ private void configureChannelPipelineHandlers() {
"customHeaderCleaner",
new Http2ResponseHeaderCleanerHandler());
}
+
+ // Install HTTP/2 PING health checker on the parent TCP channel.
+ // doOnConnected fires for the parent H2 connection; installOnParentIfAbsent
+ // ensures the handler is added exactly once per parent connection.
+ int pingIntervalSeconds = Configs.getHttp2PingIntervalInSeconds();
+ if (pingIntervalSeconds > 0) {
+ Http2PingHealthHandler.installOnParentIfAbsent(
+ connection.channel(), pingIntervalSeconds * 1000L);
+ }
}));
}
}
diff --git a/sdk/cosmos/live-http-network-fault-platform-matrix.json b/sdk/cosmos/live-http-network-fault-platform-matrix.json
new file mode 100644
index 000000000000..05efd056736d
--- /dev/null
+++ b/sdk/cosmos/live-http-network-fault-platform-matrix.json
@@ -0,0 +1,18 @@
+{
+ "displayNames": {
+ "-Pmanual-http-network-fault": "HttpNetworkFault",
+ "Session": "",
+ "ubuntu": ""
+ },
+ "include": [
+ {
+ "DESIRED_CONSISTENCIES": "[\"Session\"]",
+ "ACCOUNT_CONSISTENCY": "Session",
+ "ArmTemplateParameters": "@{ enableMultipleWriteLocations = $false; defaultConsistencyLevel = 'Session' }",
+ "ProfileFlag": [ "-Pmanual-http-network-fault" ],
+ "Agent": {
+ "ubuntu": { "OSVmImage": "env:LINUXVMIMAGE", "Pool": "env:LINUXPOOL" }
+ }
+ }
+ ]
+}
diff --git a/sdk/cosmos/tests.yml b/sdk/cosmos/tests.yml
index 69d782fcc9a0..1a146454ead3 100644
--- a/sdk/cosmos/tests.yml
+++ b/sdk/cosmos/tests.yml
@@ -163,6 +163,48 @@ extends:
- name: AdditionalArgs
value: '-DCOSMOS.CLIENT_LEAK_DETECTION_ENABLED=true -DACCOUNT_HOST=$(thin-client-canary-multi-writer-session-endpoint) -DACCOUNT_KEY=$(thin-client-canary-multi-writer-session-key) -DCOSMOS.THINCLIENT_ENABLED=true'
+ # Network fault injection tests (tc netem, iptables) — runs natively on Linux CI VMs.
+ # PreSteps ensure iproute2 (tc) and iptables are installed and sch_netem kernel module is loaded.
+ # Tests use sudo for tc/iptables commands. Fail-fast if tc is not available.
+ # Tests run sequentially (MaxParallel: 1) to avoid tc/iptables interference between tests.
+ - template: /eng/pipelines/templates/stages/archetype-sdk-tests-isolated.yml
+ parameters:
+ TestName: 'Cosmos_Live_Test_HttpNetworkFault'
+ CloudConfig:
+ Public:
+ ServiceConnection: azure-sdk-tests-cosmos
+ PreSteps:
+ - script: |
+ sudo apt-get update -qq && sudo apt-get install -y -qq iproute2 iptables
+ sudo modprobe sch_netem || true
+ tc -Version && echo "tc available" || echo "tc not found"
+ displayName: 'Install tc (iproute2) and iptables for network fault injection'
+ MatrixConfigs:
+ - Name: Cosmos_live_test_http_network_fault
+ Path: sdk/cosmos/live-http-network-fault-platform-matrix.json
+ Selection: all
+ GenerateVMJobs: true
+ MatrixReplace:
+ - .*Version=1.2(1|5)/1.17
+ ServiceDirectory: cosmos
+ Artifacts:
+ - name: azure-cosmos
+ groupId: com.azure
+ safeName: azurecosmos
+ AdditionalModules:
+ - name: azure-cosmos-tests
+ groupId: com.azure
+ - name: azure-cosmos-benchmark
+ groupId: com.azure
+ TimeoutInMinutes: 30
+ MaxParallel: 1
+ TestGoals: 'verify'
+ TestOptions: '$(ProfileFlag) $(AdditionalArgs) -DskipCompile=true -DskipTestCompile=true -DcreateSourcesJar=false'
+ TestResultsFiles: '**/junitreports/TEST-*.xml'
+ AdditionalVariables:
+ - name: AdditionalArgs
+ value: '-DCOSMOS.CLIENT_LEAK_DETECTION_ENABLED=true -DACCOUNT_HOST=$(thinclient-test-endpoint) -DACCOUNT_KEY=$(thinclient-test-key) -DCOSMOS.THINCLIENT_ENABLED=true -DCOSMOS.HTTP2_ENABLED=true'
+
- template: /eng/pipelines/templates/stages/archetype-sdk-tests-isolated.yml
parameters:
TestName: 'Spring_Data_Cosmos_Integration'