diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/BackoffPolicy.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/BackoffPolicy.java index bca04a2a596c2..963b5ec82389c 100644 --- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/BackoffPolicy.java +++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/BackoffPolicy.java @@ -26,7 +26,9 @@ /** * Backoff configuration for broker reconnection attempts. * - *
The delay for attempt {@code n} is {@code min(initialInterval * multiplier^(n-1), maxInterval)}. + *
The base delay for attempt {@code n} is {@code min(initialInterval * multiplier^(n-1), maxInterval)}. + * A symmetric random jitter of {@code ±jitterPercent/2} is applied to each delay (including the + * first one) to spread out concurrent retries. * *
Use {@link #fixed(Duration, Duration)} or {@link #exponential(Duration, Duration)} for * the common cases, or {@link #builder()} to configure all knobs explicitly. @@ -35,23 +37,31 @@ @ToString public final class BackoffPolicy { + /** Default jitter percentage applied when not explicitly specified. */ + public static final double DEFAULT_JITTER_PERCENT = 10.0; + private final Duration initialInterval; private final Duration maxInterval; private final double multiplier; + private final double jitterPercent; - private BackoffPolicy(Duration initialInterval, Duration maxInterval, double multiplier) { + private BackoffPolicy(Duration initialInterval, Duration maxInterval, double multiplier, double jitterPercent) { Objects.requireNonNull(initialInterval, "initialInterval must not be null"); Objects.requireNonNull(maxInterval, "maxInterval must not be null"); if (multiplier < 1.0) { throw new IllegalArgumentException("multiplier must be >= 1.0"); } + if (jitterPercent < 0 || jitterPercent > 100) { + throw new IllegalArgumentException("jitterPercent must be in [0, 100]"); + } this.initialInterval = initialInterval; this.maxInterval = maxInterval; this.multiplier = multiplier; + this.jitterPercent = jitterPercent; } /** - * @return the delay before the first reconnection attempt + * @return the base delay before the first reconnection attempt */ public Duration initialInterval() { return initialInterval; @@ -72,25 +82,33 @@ public double multiplier() { } /** - * Create a fixed backoff (no increase between retries). + * @return the symmetric jitter percentage applied to each delay; {@code 0} means no jitter + */ + public double jitterPercent() { + return jitterPercent; + } + + /** + * Create a fixed backoff (no increase between retries) with the default jitter. * - * @param initialInterval the constant delay between reconnection attempts + * @param initialInterval the constant base delay between reconnection attempts * @param maxInterval the maximum delay between reconnection attempts - * @return a {@link BackoffPolicy} with a multiplier of 1.0 + * @return a {@link BackoffPolicy} with a multiplier of 1.0 and the default jitter */ public static BackoffPolicy fixed(Duration initialInterval, Duration maxInterval) { - return new BackoffPolicy(initialInterval, maxInterval, 1.0); + return new BackoffPolicy(initialInterval, maxInterval, 1.0, DEFAULT_JITTER_PERCENT); } /** - * Create an exponential backoff with the given bounds and a default multiplier of 2. + * Create an exponential backoff with the given bounds, a default multiplier of 2 and the + * default jitter. * - * @param initialInterval the delay before the first reconnection attempt + * @param initialInterval the base delay before the first reconnection attempt * @param maxInterval the maximum delay between reconnection attempts - * @return a {@link BackoffPolicy} with a multiplier of 2.0 + * @return a {@link BackoffPolicy} with a multiplier of 2.0 and the default jitter */ public static BackoffPolicy exponential(Duration initialInterval, Duration maxInterval) { - return new BackoffPolicy(initialInterval, maxInterval, 2.0); + return new BackoffPolicy(initialInterval, maxInterval, 2.0, DEFAULT_JITTER_PERCENT); } /** @@ -107,6 +125,7 @@ public static final class Builder { private Duration initialInterval; private Duration maxInterval; private double multiplier = 2.0; + private double jitterPercent = DEFAULT_JITTER_PERCENT; private Builder() { } @@ -145,11 +164,25 @@ public Builder multiplier(double multiplier) { return this; } + /** + * Symmetric jitter percentage applied to each returned delay. The actual jitter is the + * base delay multiplied by a uniform random factor in + * {@code [1 - jitterPercent/200, 1 + jitterPercent/200)}. Default is {@code 10.0}; set to + * {@code 0} to disable jitter. + * + * @param jitterPercent the jitter percentage, must be in {@code [0, 100]} + * @return this builder + */ + public Builder jitterPercent(double jitterPercent) { + this.jitterPercent = jitterPercent; + return this; + } + /** * @return a new {@link BackoffPolicy} instance */ public BackoffPolicy build() { - return new BackoffPolicy(initialInterval, maxInterval, multiplier); + return new BackoffPolicy(initialInterval, maxInterval, multiplier, jitterPercent); } } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java index f53051e2c42c4..9d8b59db91008 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java @@ -111,7 +111,7 @@ public void testCorrectBackoffConfiguration() { ClientConfigurationData clientConfigurationData = new ClientConfigurationData(); Assert.assertEquals(backoff.getMax().toMillis(), TimeUnit.NANOSECONDS.toMillis(clientConfigurationData.getMaxBackoffIntervalNanos())); - Assert.assertEquals(backoff.next().toMillis(), + Assert.assertEquals(backoff.getInitial().toMillis(), TimeUnit.NANOSECONDS.toMillis(clientConfigurationData.getInitialBackoffIntervalNanos())); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java index 5957ce86796ac..8bcc165dd6125 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java @@ -28,8 +28,8 @@ * Exponential backoff with mandatory stop. * *
Delays start at {@code initialDelay} and double on every call to {@link #next()}, up to - * {@code maxBackoff}. A random jitter of up to 10% is subtracted from each value to avoid - * thundering-herd retries. + * {@code maxBackoff}. A symmetric random jitter of {@code ±jitterPercent/2} is applied to every + * returned value (including the first one) to avoid thundering-herd retries. * *
If a {@code mandatoryStop} duration is configured, the backoff tracks wall-clock time from the * first {@link #next()} call. Once the elapsed time plus the next delay would exceed the mandatory @@ -43,6 +43,7 @@ * .initialDelay(Duration.ofMillis(100)) * .maxBackoff(Duration.ofMinutes(1)) * .mandatoryStop(Duration.ofSeconds(30)) + * .jitterPercent(10.0) * .build(); * * Duration delay = backoff.next(); @@ -51,6 +52,7 @@ public class Backoff { private static final Duration DEFAULT_INITIAL_DELAY = Duration.ofMillis(100); private static final Duration DEFAULT_MAX_BACKOFF_INTERVAL = Duration.ofMinutes(1); + private static final double DEFAULT_JITTER_PERCENT = 10.0; private static final Random random = new Random(); @Getter @@ -59,6 +61,8 @@ public class Backoff { private final Duration max; @Getter private final Duration mandatoryStop; + @Getter + private final double jitterPercent; private final Clock clock; private Duration next; @@ -67,10 +71,11 @@ public class Backoff { @Getter private boolean mandatoryStopMade; - private Backoff(Duration initial, Duration max, Duration mandatoryStop, Clock clock) { + private Backoff(Duration initial, Duration max, Duration mandatoryStop, double jitterPercent, Clock clock) { this.initial = initial; this.max = max; this.mandatoryStop = mandatoryStop; + this.jitterPercent = jitterPercent; this.next = initial; this.clock = clock; this.firstBackoffTime = Instant.EPOCH; @@ -101,8 +106,10 @@ public static Builder builder() { /** * Returns the next backoff delay, advancing the internal state. * - *
The returned duration is never less than the initial delay and never more than the max - * backoff. A random jitter of up to 10% is subtracted to spread out concurrent retries. + *
The underlying delay starts at the initial delay and doubles on each call up to the max + * backoff. A symmetric jitter of {@code ±jitterPercent/2} is applied on every call (including + * the first one) to spread out concurrent retries; the returned value may therefore be slightly + * below the initial delay or slightly above the max backoff. * * @return the delay to wait before the next retry attempt */ @@ -130,13 +137,13 @@ public Duration next() { } } - // Randomly decrease the timeout up to 10% to avoid simultaneous retries long currentMillis = current.toMillis(); - if (currentMillis > 10) { - currentMillis -= random.nextInt((int) currentMillis / 10); + if (jitterPercent > 0 && currentMillis > 0) { + // Apply a symmetric jitter of ±jitterPercent/2 around the current delay. + double factor = 1.0 + (random.nextDouble() - 0.5) * (jitterPercent / 100.0); + currentMillis = Math.max(0L, Math.round(currentMillis * factor)); } - long initialMillis = initial.toMillis(); - return Duration.ofMillis(Math.max(initialMillis, currentMillis)); + return Duration.ofMillis(currentMillis); } /** @@ -162,12 +169,13 @@ public void reset() { /** * Builder for {@link Backoff}. * - *
Defaults: initial delay 100 ms, max backoff 1 min, no mandatory stop. + *
Defaults: initial delay 100 ms, max backoff 1 min, no mandatory stop, 10% jitter. */ public static class Builder { private Duration initialDelay = DEFAULT_INITIAL_DELAY; private Duration maxBackoff = DEFAULT_MAX_BACKOFF_INTERVAL; private Duration mandatoryStop = Duration.ZERO; + private double jitterPercent = DEFAULT_JITTER_PERCENT; private Clock clock = Clock.systemDefaultZone(); /** @@ -205,6 +213,24 @@ public Builder mandatoryStop(Duration mandatoryStop) { return this; } + /** + * Sets the jitter percentage applied to each returned delay. The actual jitter is symmetric: + * the returned value is multiplied by a uniform random factor in + * {@code [1 - jitterPercent/200, 1 + jitterPercent/200)}. Defaults to 10. Set to 0 to disable + * jitter. + * + * @param jitterPercent the jitter percentage, must be in {@code [0, 100]} + * @return this builder + * @throws IllegalArgumentException if {@code jitterPercent} is outside {@code [0, 100]} + */ + public Builder jitterPercent(double jitterPercent) { + if (jitterPercent < 0 || jitterPercent > 100) { + throw new IllegalArgumentException("jitterPercent must be in [0, 100]"); + } + this.jitterPercent = jitterPercent; + return this; + } + Builder clock(Clock clock) { this.clock = clock; return this; @@ -216,7 +242,7 @@ Builder clock(Clock clock) { * @return a new Backoff */ public Backoff build() { - return new Backoff(initialDelay, maxBackoff, mandatoryStop, clock); + return new Backoff(initialDelay, maxBackoff, mandatoryStop, jitterPercent, clock); } } } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java index 152ddee5156d0..41c2de63c1049 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java @@ -29,11 +29,6 @@ import org.testng.annotations.Test; public class BackoffTest { - boolean withinTenPercentAndDecrementTimer(Backoff backoff, long t2) { - long t1 = backoff.next().toMillis(); - return (t1 >= t2 * 0.9 && t1 <= t2); - } - boolean checkExactAndDecrementTimer(Backoff backoff, long t2) { long t1 = backoff.next().toMillis(); return t1 == t2; @@ -45,12 +40,13 @@ public void mandatoryStopTestNegativeTest() { .initialDelay(Duration.ofMillis(100)) .maxBackoff(Duration.ofSeconds(60)) .mandatoryStop(Duration.ofMillis(1900)) + .jitterPercent(0) .build(); assertEquals(backoff.next().toMillis(), 100); backoff.next(); // 200 backoff.next(); // 400 backoff.next(); // 800 - assertFalse(withinTenPercentAndDecrementTimer(backoff, 400)); + assertFalse(checkExactAndDecrementTimer(backoff, 400)); } @Test @@ -64,6 +60,7 @@ public void firstBackoffTimerTest() { .initialDelay(Duration.ofMillis(100)) .maxBackoff(Duration.ofSeconds(60)) .mandatoryStop(Duration.ofMillis(1900)) + .jitterPercent(0) .clock(mockClock) .build(); @@ -83,10 +80,11 @@ public void basicTest() { .initialDelay(Duration.ofMillis(5)) .maxBackoff(Duration.ofSeconds(60)) .mandatoryStop(Duration.ofSeconds(60)) + .jitterPercent(0) .clock(mockClock) .build(); assertTrue(checkExactAndDecrementTimer(backoff, 5)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 10)); + assertTrue(checkExactAndDecrementTimer(backoff, 10)); backoff.reset(); assertTrue(checkExactAndDecrementTimer(backoff, 5)); } @@ -104,13 +102,14 @@ public void maxTest() { .initialDelay(Duration.ofMillis(5)) .maxBackoff(Duration.ofMillis(20)) .mandatoryStop(Duration.ofMillis(20)) + .jitterPercent(0) .clock(mockClock) .build(); assertTrue(checkExactAndDecrementTimer(backoff, 5)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 10)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 5)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 20)); + assertTrue(checkExactAndDecrementTimer(backoff, 10)); + assertTrue(checkExactAndDecrementTimer(backoff, 5)); + assertTrue(checkExactAndDecrementTimer(backoff, 20)); } @Test @@ -121,70 +120,126 @@ public void mandatoryStopTest() { .initialDelay(Duration.ofMillis(100)) .maxBackoff(Duration.ofSeconds(60)) .mandatoryStop(Duration.ofMillis(1900)) + .jitterPercent(0) .clock(mockClock) .build(); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(0)); assertTrue(checkExactAndDecrementTimer(backoff, 100)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(100)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 200)); + assertTrue(checkExactAndDecrementTimer(backoff, 200)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(300)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 400)); + assertTrue(checkExactAndDecrementTimer(backoff, 400)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(700)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 800)); + assertTrue(checkExactAndDecrementTimer(backoff, 800)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(1500)); // would have been 1600 w/o the mandatory stop - assertTrue(withinTenPercentAndDecrementTimer(backoff, 400)); + assertTrue(checkExactAndDecrementTimer(backoff, 400)); assertTrue(backoff.isMandatoryStopMade()); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(1900)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 3200)); + assertTrue(checkExactAndDecrementTimer(backoff, 3200)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(3200)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 6400)); + assertTrue(checkExactAndDecrementTimer(backoff, 6400)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(3200)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 12800)); + assertTrue(checkExactAndDecrementTimer(backoff, 12800)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(6400)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 25600)); + assertTrue(checkExactAndDecrementTimer(backoff, 25600)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(12800)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 51200)); + assertTrue(checkExactAndDecrementTimer(backoff, 51200)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(25600)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 60000)); + assertTrue(checkExactAndDecrementTimer(backoff, 60000)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(51200)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 60000)); + assertTrue(checkExactAndDecrementTimer(backoff, 60000)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(60000)); backoff.reset(); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(0)); assertTrue(checkExactAndDecrementTimer(backoff, 100)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(100)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 200)); + assertTrue(checkExactAndDecrementTimer(backoff, 200)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(300)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 400)); + assertTrue(checkExactAndDecrementTimer(backoff, 400)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(700)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 800)); + assertTrue(checkExactAndDecrementTimer(backoff, 800)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(1500)); // would have been 1600 w/o the mandatory stop - assertTrue(withinTenPercentAndDecrementTimer(backoff, 400)); + assertTrue(checkExactAndDecrementTimer(backoff, 400)); backoff.reset(); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(0)); assertTrue(checkExactAndDecrementTimer(backoff, 100)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(100)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 200)); + assertTrue(checkExactAndDecrementTimer(backoff, 200)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(300)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 400)); + assertTrue(checkExactAndDecrementTimer(backoff, 400)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(700)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 800)); + assertTrue(checkExactAndDecrementTimer(backoff, 800)); backoff.reset(); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(0)); assertTrue(checkExactAndDecrementTimer(backoff, 100)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(100)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 200)); + assertTrue(checkExactAndDecrementTimer(backoff, 200)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(300)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 400)); + assertTrue(checkExactAndDecrementTimer(backoff, 400)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(700)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 800)); + assertTrue(checkExactAndDecrementTimer(backoff, 800)); + } + + @Test + public void jitterIsAppliedSymmetricallyOnFirstCall() { + // With jitterPercent=20, the returned delay is in [base*0.9, base*1.1). + // Verify that across many calls we observe values both below and above the base, including + // on the very first call after a reset. + Backoff backoff = Backoff.builder() + .initialDelay(Duration.ofMillis(1000)) + .maxBackoff(Duration.ofMillis(1000)) + .jitterPercent(20) + .build(); + + boolean sawBelow = false; + boolean sawAbove = false; + long min = Long.MAX_VALUE; + long max = Long.MIN_VALUE; + for (int i = 0; i < 500; i++) { + backoff.reset(); + long t = backoff.next().toMillis(); + assertTrue(t >= 900 && t <= 1100, "value out of range: " + t); + if (t < 1000) { + sawBelow = true; + } + if (t > 1000) { + sawAbove = true; + } + min = Math.min(min, t); + max = Math.max(max, t); + } + assertTrue(sawBelow, "expected at least one jittered value below base, min=" + min); + assertTrue(sawAbove, "expected at least one jittered value above base, max=" + max); + } + + @Test + public void jitterPercentZeroDisablesJitter() { + Backoff backoff = Backoff.builder() + .initialDelay(Duration.ofMillis(100)) + .maxBackoff(Duration.ofMillis(100)) + .jitterPercent(0) + .build(); + for (int i = 0; i < 100; i++) { + backoff.reset(); + assertEquals(backoff.next().toMillis(), 100); + } + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void negativeJitterIsRejected() { + Backoff.builder().jitterPercent(-1); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void jitterAboveHundredIsRejected() { + Backoff.builder().jitterPercent(101); } }