From c2ddc27b2857542c5b0536d79e4d806067395f0d Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Mon, 4 May 2026 18:23:44 -0700 Subject: [PATCH 1/3] [improve][client] PIP-468: Make Backoff jitter configurable Adds a jitterPercent parameter to Backoff (default 10%) and exposes it on BackoffPolicy in the v5 client API. Jitter is now applied symmetrically as +/- jitterPercent/2 around the base delay and applied on the first call as well, instead of always shaving up to 10% off and skipping the initial delay. --- .../client/api/v5/config/BackoffPolicy.java | 52 +++++--- .../apache/pulsar/common/util/Backoff.java | 50 ++++++-- .../pulsar/common/util/BackoffTest.java | 112 +++++++++++++----- 3 files changed, 157 insertions(+), 57 deletions(-) diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/BackoffPolicy.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/BackoffPolicy.java index eb7ab9a4ecde8..c63abd9651cf0 100644 --- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/BackoffPolicy.java +++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/BackoffPolicy.java @@ -24,56 +24,80 @@ /** * Backoff configuration for broker reconnection attempts. * - *

The delay for attempt {@code n} is {@code min(initialInterval * multiplier^(n-1), maxInterval)}. + *

The base delay for attempt {@code n} is {@code min(initialInterval * multiplier^(n-1), maxInterval)}. + * A symmetric random jitter of {@code ±jitterPercent/2} is applied to each delay (including the + * first one) to spread out concurrent retries. * * @param initialInterval the delay before the first reconnection attempt * @param maxInterval the maximum delay between reconnection attempts * @param multiplier the multiplier applied after each attempt + * @param jitterPercent the symmetric jitter percentage applied to each delay; {@code 0} disables jitter */ public record BackoffPolicy( Duration initialInterval, Duration maxInterval, - double multiplier + double multiplier, + double jitterPercent ) { + /** Default jitter percentage applied when not explicitly specified. */ + public static final double DEFAULT_JITTER_PERCENT = 10.0; + public BackoffPolicy { Objects.requireNonNull(initialInterval, "initialInterval must not be null"); Objects.requireNonNull(maxInterval, "maxInterval must not be null"); if (multiplier < 1.0) { throw new IllegalArgumentException("multiplier must be >= 1.0"); } + if (jitterPercent < 0) { + throw new IllegalArgumentException("jitterPercent must be >= 0"); + } } /** - * Create a fixed backoff (no increase between retries). + * Create a fixed backoff (no increase between retries) with the default jitter. * - * @param initialInterval the constant delay between reconnection attempts + * @param initialInterval the constant base delay between reconnection attempts * @param maxInterval the maximum delay between reconnection attempts - * @return a {@link BackoffPolicy} with a multiplier of 1.0 + * @return a {@link BackoffPolicy} with a multiplier of 1.0 and the default jitter */ public static BackoffPolicy fixed(Duration initialInterval, Duration maxInterval) { - return new BackoffPolicy(initialInterval, maxInterval, 1.0); + return new BackoffPolicy(initialInterval, maxInterval, 1.0, DEFAULT_JITTER_PERCENT); } /** - * Create an exponential backoff with the given bounds and a default multiplier of 2. + * Create an exponential backoff with the given bounds, a default multiplier of 2 and the + * default jitter. * - * @param initialInterval the delay before the first reconnection attempt + * @param initialInterval the base delay before the first reconnection attempt * @param maxInterval the maximum delay between reconnection attempts - * @return a {@link BackoffPolicy} with a multiplier of 2.0 + * @return a {@link BackoffPolicy} with a multiplier of 2.0 and the default jitter */ public static BackoffPolicy exponential(Duration initialInterval, Duration maxInterval) { - return new BackoffPolicy(initialInterval, maxInterval, 2.0); + return new BackoffPolicy(initialInterval, maxInterval, 2.0, DEFAULT_JITTER_PERCENT); } /** - * Create an exponential backoff with a custom multiplier. + * Create an exponential backoff with a custom multiplier and the default jitter. * - * @param initialInterval the delay before the first reconnection attempt + * @param initialInterval the base delay before the first reconnection attempt * @param maxInterval the maximum delay between reconnection attempts * @param multiplier the multiplier applied after each attempt, must be >= 1.0 - * @return a {@link BackoffPolicy} with the specified parameters + * @return a {@link BackoffPolicy} with the specified parameters and the default jitter */ public static BackoffPolicy exponential(Duration initialInterval, Duration maxInterval, double multiplier) { - return new BackoffPolicy(initialInterval, maxInterval, multiplier); + return new BackoffPolicy(initialInterval, maxInterval, multiplier, DEFAULT_JITTER_PERCENT); + } + + /** + * Returns a copy of this policy with the given jitter percentage. The actual jitter applied to + * each returned delay is symmetric around the base value: a uniform random factor in + * {@code [1 - jitterPercent/200, 1 + jitterPercent/200)}. + * + * @param jitterPercent the jitter percentage to apply, must be {@code >= 0}; {@code 0} disables jitter + * @return a new {@link BackoffPolicy} with the configured jitter + * @throws IllegalArgumentException if {@code jitterPercent} is negative + */ + public BackoffPolicy withJitter(double jitterPercent) { + return new BackoffPolicy(initialInterval, maxInterval, multiplier, jitterPercent); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java index 5957ce86796ac..03d00de1d2be3 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java @@ -28,8 +28,8 @@ * Exponential backoff with mandatory stop. * *

Delays start at {@code initialDelay} and double on every call to {@link #next()}, up to - * {@code maxBackoff}. A random jitter of up to 10% is subtracted from each value to avoid - * thundering-herd retries. + * {@code maxBackoff}. A symmetric random jitter of {@code ±jitterPercent/2} is applied to every + * returned value (including the first one) to avoid thundering-herd retries. * *

If a {@code mandatoryStop} duration is configured, the backoff tracks wall-clock time from the * first {@link #next()} call. Once the elapsed time plus the next delay would exceed the mandatory @@ -43,6 +43,7 @@ * .initialDelay(Duration.ofMillis(100)) * .maxBackoff(Duration.ofMinutes(1)) * .mandatoryStop(Duration.ofSeconds(30)) + * .jitterPercent(10.0) * .build(); * * Duration delay = backoff.next(); @@ -51,6 +52,7 @@ public class Backoff { private static final Duration DEFAULT_INITIAL_DELAY = Duration.ofMillis(100); private static final Duration DEFAULT_MAX_BACKOFF_INTERVAL = Duration.ofMinutes(1); + private static final double DEFAULT_JITTER_PERCENT = 10.0; private static final Random random = new Random(); @Getter @@ -59,6 +61,8 @@ public class Backoff { private final Duration max; @Getter private final Duration mandatoryStop; + @Getter + private final double jitterPercent; private final Clock clock; private Duration next; @@ -67,10 +71,11 @@ public class Backoff { @Getter private boolean mandatoryStopMade; - private Backoff(Duration initial, Duration max, Duration mandatoryStop, Clock clock) { + private Backoff(Duration initial, Duration max, Duration mandatoryStop, double jitterPercent, Clock clock) { this.initial = initial; this.max = max; this.mandatoryStop = mandatoryStop; + this.jitterPercent = jitterPercent; this.next = initial; this.clock = clock; this.firstBackoffTime = Instant.EPOCH; @@ -101,8 +106,10 @@ public static Builder builder() { /** * Returns the next backoff delay, advancing the internal state. * - *

The returned duration is never less than the initial delay and never more than the max - * backoff. A random jitter of up to 10% is subtracted to spread out concurrent retries. + *

The underlying delay starts at the initial delay and doubles on each call up to the max + * backoff. A symmetric jitter of {@code ±jitterPercent/2} is applied on every call (including + * the first one) to spread out concurrent retries; the returned value may therefore be slightly + * below the initial delay or slightly above the max backoff. * * @return the delay to wait before the next retry attempt */ @@ -130,13 +137,13 @@ public Duration next() { } } - // Randomly decrease the timeout up to 10% to avoid simultaneous retries long currentMillis = current.toMillis(); - if (currentMillis > 10) { - currentMillis -= random.nextInt((int) currentMillis / 10); + if (jitterPercent > 0 && currentMillis > 0) { + // Apply a symmetric jitter of ±jitterPercent/2 around the current delay. + double factor = 1.0 + (random.nextDouble() - 0.5) * (jitterPercent / 100.0); + currentMillis = Math.max(0L, Math.round(currentMillis * factor)); } - long initialMillis = initial.toMillis(); - return Duration.ofMillis(Math.max(initialMillis, currentMillis)); + return Duration.ofMillis(currentMillis); } /** @@ -162,12 +169,13 @@ public void reset() { /** * Builder for {@link Backoff}. * - *

Defaults: initial delay 100 ms, max backoff 1 min, no mandatory stop. + *

Defaults: initial delay 100 ms, max backoff 1 min, no mandatory stop, 10% jitter. */ public static class Builder { private Duration initialDelay = DEFAULT_INITIAL_DELAY; private Duration maxBackoff = DEFAULT_MAX_BACKOFF_INTERVAL; private Duration mandatoryStop = Duration.ZERO; + private double jitterPercent = DEFAULT_JITTER_PERCENT; private Clock clock = Clock.systemDefaultZone(); /** @@ -205,6 +213,24 @@ public Builder mandatoryStop(Duration mandatoryStop) { return this; } + /** + * Sets the jitter percentage applied to each returned delay. The actual jitter is symmetric: + * the returned value is multiplied by a uniform random factor in + * {@code [1 - jitterPercent/200, 1 + jitterPercent/200)}. Defaults to 10. Set to 0 to disable + * jitter. + * + * @param jitterPercent the jitter percentage, must be {@code >= 0} + * @return this builder + * @throws IllegalArgumentException if {@code jitterPercent} is negative + */ + public Builder jitterPercent(double jitterPercent) { + if (jitterPercent < 0) { + throw new IllegalArgumentException("jitterPercent must be >= 0"); + } + this.jitterPercent = jitterPercent; + return this; + } + Builder clock(Clock clock) { this.clock = clock; return this; @@ -216,7 +242,7 @@ Builder clock(Clock clock) { * @return a new Backoff */ public Backoff build() { - return new Backoff(initialDelay, maxBackoff, mandatoryStop, clock); + return new Backoff(initialDelay, maxBackoff, mandatoryStop, jitterPercent, clock); } } } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java index 152ddee5156d0..ac56e3f5338fe 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java @@ -29,11 +29,6 @@ import org.testng.annotations.Test; public class BackoffTest { - boolean withinTenPercentAndDecrementTimer(Backoff backoff, long t2) { - long t1 = backoff.next().toMillis(); - return (t1 >= t2 * 0.9 && t1 <= t2); - } - boolean checkExactAndDecrementTimer(Backoff backoff, long t2) { long t1 = backoff.next().toMillis(); return t1 == t2; @@ -45,12 +40,13 @@ public void mandatoryStopTestNegativeTest() { .initialDelay(Duration.ofMillis(100)) .maxBackoff(Duration.ofSeconds(60)) .mandatoryStop(Duration.ofMillis(1900)) + .jitterPercent(0) .build(); assertEquals(backoff.next().toMillis(), 100); backoff.next(); // 200 backoff.next(); // 400 backoff.next(); // 800 - assertFalse(withinTenPercentAndDecrementTimer(backoff, 400)); + assertFalse(checkExactAndDecrementTimer(backoff, 400)); } @Test @@ -64,6 +60,7 @@ public void firstBackoffTimerTest() { .initialDelay(Duration.ofMillis(100)) .maxBackoff(Duration.ofSeconds(60)) .mandatoryStop(Duration.ofMillis(1900)) + .jitterPercent(0) .clock(mockClock) .build(); @@ -83,10 +80,11 @@ public void basicTest() { .initialDelay(Duration.ofMillis(5)) .maxBackoff(Duration.ofSeconds(60)) .mandatoryStop(Duration.ofSeconds(60)) + .jitterPercent(0) .clock(mockClock) .build(); assertTrue(checkExactAndDecrementTimer(backoff, 5)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 10)); + assertTrue(checkExactAndDecrementTimer(backoff, 10)); backoff.reset(); assertTrue(checkExactAndDecrementTimer(backoff, 5)); } @@ -104,13 +102,14 @@ public void maxTest() { .initialDelay(Duration.ofMillis(5)) .maxBackoff(Duration.ofMillis(20)) .mandatoryStop(Duration.ofMillis(20)) + .jitterPercent(0) .clock(mockClock) .build(); assertTrue(checkExactAndDecrementTimer(backoff, 5)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 10)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 5)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 20)); + assertTrue(checkExactAndDecrementTimer(backoff, 10)); + assertTrue(checkExactAndDecrementTimer(backoff, 5)); + assertTrue(checkExactAndDecrementTimer(backoff, 20)); } @Test @@ -121,70 +120,121 @@ public void mandatoryStopTest() { .initialDelay(Duration.ofMillis(100)) .maxBackoff(Duration.ofSeconds(60)) .mandatoryStop(Duration.ofMillis(1900)) + .jitterPercent(0) .clock(mockClock) .build(); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(0)); assertTrue(checkExactAndDecrementTimer(backoff, 100)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(100)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 200)); + assertTrue(checkExactAndDecrementTimer(backoff, 200)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(300)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 400)); + assertTrue(checkExactAndDecrementTimer(backoff, 400)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(700)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 800)); + assertTrue(checkExactAndDecrementTimer(backoff, 800)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(1500)); // would have been 1600 w/o the mandatory stop - assertTrue(withinTenPercentAndDecrementTimer(backoff, 400)); + assertTrue(checkExactAndDecrementTimer(backoff, 400)); assertTrue(backoff.isMandatoryStopMade()); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(1900)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 3200)); + assertTrue(checkExactAndDecrementTimer(backoff, 3200)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(3200)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 6400)); + assertTrue(checkExactAndDecrementTimer(backoff, 6400)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(3200)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 12800)); + assertTrue(checkExactAndDecrementTimer(backoff, 12800)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(6400)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 25600)); + assertTrue(checkExactAndDecrementTimer(backoff, 25600)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(12800)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 51200)); + assertTrue(checkExactAndDecrementTimer(backoff, 51200)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(25600)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 60000)); + assertTrue(checkExactAndDecrementTimer(backoff, 60000)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(51200)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 60000)); + assertTrue(checkExactAndDecrementTimer(backoff, 60000)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(60000)); backoff.reset(); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(0)); assertTrue(checkExactAndDecrementTimer(backoff, 100)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(100)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 200)); + assertTrue(checkExactAndDecrementTimer(backoff, 200)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(300)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 400)); + assertTrue(checkExactAndDecrementTimer(backoff, 400)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(700)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 800)); + assertTrue(checkExactAndDecrementTimer(backoff, 800)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(1500)); // would have been 1600 w/o the mandatory stop - assertTrue(withinTenPercentAndDecrementTimer(backoff, 400)); + assertTrue(checkExactAndDecrementTimer(backoff, 400)); backoff.reset(); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(0)); assertTrue(checkExactAndDecrementTimer(backoff, 100)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(100)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 200)); + assertTrue(checkExactAndDecrementTimer(backoff, 200)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(300)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 400)); + assertTrue(checkExactAndDecrementTimer(backoff, 400)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(700)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 800)); + assertTrue(checkExactAndDecrementTimer(backoff, 800)); backoff.reset(); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(0)); assertTrue(checkExactAndDecrementTimer(backoff, 100)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(100)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 200)); + assertTrue(checkExactAndDecrementTimer(backoff, 200)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(300)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 400)); + assertTrue(checkExactAndDecrementTimer(backoff, 400)); Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(700)); - assertTrue(withinTenPercentAndDecrementTimer(backoff, 800)); + assertTrue(checkExactAndDecrementTimer(backoff, 800)); + } + + @Test + public void jitterIsAppliedSymmetricallyOnFirstCall() { + // With jitterPercent=20, the returned delay is in [base*0.9, base*1.1). + // Verify that across many calls we observe values both below and above the base, including + // on the very first call after a reset. + Backoff backoff = Backoff.builder() + .initialDelay(Duration.ofMillis(1000)) + .maxBackoff(Duration.ofMillis(1000)) + .jitterPercent(20) + .build(); + + boolean sawBelow = false; + boolean sawAbove = false; + long min = Long.MAX_VALUE; + long max = Long.MIN_VALUE; + for (int i = 0; i < 500; i++) { + backoff.reset(); + long t = backoff.next().toMillis(); + assertTrue(t >= 900 && t <= 1100, "value out of range: " + t); + if (t < 1000) { + sawBelow = true; + } + if (t > 1000) { + sawAbove = true; + } + min = Math.min(min, t); + max = Math.max(max, t); + } + assertTrue(sawBelow, "expected at least one jittered value below base, min=" + min); + assertTrue(sawAbove, "expected at least one jittered value above base, max=" + max); + } + + @Test + public void jitterPercentZeroDisablesJitter() { + Backoff backoff = Backoff.builder() + .initialDelay(Duration.ofMillis(100)) + .maxBackoff(Duration.ofMillis(100)) + .jitterPercent(0) + .build(); + for (int i = 0; i < 100; i++) { + backoff.reset(); + assertEquals(backoff.next().toMillis(), 100); + } + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void negativeJitterIsRejected() { + Backoff.builder().jitterPercent(-1); } } From c0b08718da6150631f8df771688fbd040dde8e89 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 5 May 2026 14:37:21 -0700 Subject: [PATCH 2/3] Cap jitterPercent at 100 Reject values above 100 in both Backoff.Builder and BackoffPolicy so the jitter factor stays in a sensible range. Add a test for the upper bound. --- .../apache/pulsar/client/api/v5/config/BackoffPolicy.java | 8 ++++---- .../main/java/org/apache/pulsar/common/util/Backoff.java | 8 ++++---- .../java/org/apache/pulsar/common/util/BackoffTest.java | 5 +++++ 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/BackoffPolicy.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/BackoffPolicy.java index c63abd9651cf0..d1847ff90262c 100644 --- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/BackoffPolicy.java +++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/config/BackoffPolicy.java @@ -48,8 +48,8 @@ public record BackoffPolicy( if (multiplier < 1.0) { throw new IllegalArgumentException("multiplier must be >= 1.0"); } - if (jitterPercent < 0) { - throw new IllegalArgumentException("jitterPercent must be >= 0"); + if (jitterPercent < 0 || jitterPercent > 100) { + throw new IllegalArgumentException("jitterPercent must be in [0, 100]"); } } @@ -93,9 +93,9 @@ public static BackoffPolicy exponential(Duration initialInterval, Duration maxIn * each returned delay is symmetric around the base value: a uniform random factor in * {@code [1 - jitterPercent/200, 1 + jitterPercent/200)}. * - * @param jitterPercent the jitter percentage to apply, must be {@code >= 0}; {@code 0} disables jitter + * @param jitterPercent the jitter percentage to apply, must be in {@code [0, 100]}; {@code 0} disables jitter * @return a new {@link BackoffPolicy} with the configured jitter - * @throws IllegalArgumentException if {@code jitterPercent} is negative + * @throws IllegalArgumentException if {@code jitterPercent} is outside {@code [0, 100]} */ public BackoffPolicy withJitter(double jitterPercent) { return new BackoffPolicy(initialInterval, maxInterval, multiplier, jitterPercent); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java index 03d00de1d2be3..8bcc165dd6125 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java @@ -219,13 +219,13 @@ public Builder mandatoryStop(Duration mandatoryStop) { * {@code [1 - jitterPercent/200, 1 + jitterPercent/200)}. Defaults to 10. Set to 0 to disable * jitter. * - * @param jitterPercent the jitter percentage, must be {@code >= 0} + * @param jitterPercent the jitter percentage, must be in {@code [0, 100]} * @return this builder - * @throws IllegalArgumentException if {@code jitterPercent} is negative + * @throws IllegalArgumentException if {@code jitterPercent} is outside {@code [0, 100]} */ public Builder jitterPercent(double jitterPercent) { - if (jitterPercent < 0) { - throw new IllegalArgumentException("jitterPercent must be >= 0"); + if (jitterPercent < 0 || jitterPercent > 100) { + throw new IllegalArgumentException("jitterPercent must be in [0, 100]"); } this.jitterPercent = jitterPercent; return this; diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java index ac56e3f5338fe..41c2de63c1049 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java @@ -237,4 +237,9 @@ public void negativeJitterIsRejected() { Backoff.builder().jitterPercent(-1); } + @Test(expectedExceptions = IllegalArgumentException.class) + public void jitterAboveHundredIsRejected() { + Backoff.builder().jitterPercent(101); + } + } From 19a503e92aad6252e0ad32b0a3fe618c76e69115 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 5 May 2026 15:16:43 -0700 Subject: [PATCH 3/3] [fix][test] Use Backoff.getInitial() instead of next() in ConsumerImplTest next() now applies symmetric jitter on the first call, so it no longer returns the configured initial value exactly. The test only cares about configuration, so query the configured initial directly. --- .../java/org/apache/pulsar/client/impl/ConsumerImplTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java index f53051e2c42c4..9d8b59db91008 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java @@ -111,7 +111,7 @@ public void testCorrectBackoffConfiguration() { ClientConfigurationData clientConfigurationData = new ClientConfigurationData(); Assert.assertEquals(backoff.getMax().toMillis(), TimeUnit.NANOSECONDS.toMillis(clientConfigurationData.getMaxBackoffIntervalNanos())); - Assert.assertEquals(backoff.next().toMillis(), + Assert.assertEquals(backoff.getInitial().toMillis(), TimeUnit.NANOSECONDS.toMillis(clientConfigurationData.getInitialBackoffIntervalNanos())); }