diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/throttling/AdaptiveThrottler.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/throttling/AdaptiveThrottler.java new file mode 100644 index 000000000000..58cda496ff6f --- /dev/null +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/throttling/AdaptiveThrottler.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.throttling; + +import java.util.Random; +import org.apache.beam.sdk.io.components.util.MovingSum; + +/** + * Implements adaptive throttling. + * + *

See + * https://landing.google.com/sre/book/chapters/handling-overload.html#client-side-throttling-a7sYUg + * for a full discussion of the use case and algorithm applied. + */ +public class AdaptiveThrottler { + + // The target minimum number of requests per samplePeriodMs, even if no + // requests succeed. Must be greater than 0, else we could throttle to zero. + // Because every decision is probabilistic, there is no guarantee that the + // request rate in any given interval will not be zero. (This is the +1 from + // the formula in + // https://landing.google.com/sre/book/chapters/handling-overload.html ) + public static final int MIN_REQUESTS = 1; + + private final MovingSum allRequests; + private final MovingSum successfulRequests; + private final double overloadRatio; + private final Random random; + + /** + * Initializes AdaptiveThrottler. + * + * @param windowMs length of history to consider, in ms, to set throttling. + * @param bucketMs granularity of time buckets that we store data in, in ms. + * @param overloadRatio the target ratio between requests sent and successful requests. 
+ */ + public AdaptiveThrottler(long windowMs, long bucketMs, double overloadRatio) { + this(windowMs, bucketMs, overloadRatio, new Random()); + } + + // visible for testing + AdaptiveThrottler(long windowMs, long bucketMs, double overloadRatio, Random random) { + if (overloadRatio <= 1.0) { + throw new IllegalArgumentException("overloadRatio must be greater than 1.0"); + } + this.allRequests = new MovingSum(windowMs, bucketMs); + this.successfulRequests = new MovingSum(windowMs, bucketMs); + this.overloadRatio = overloadRatio; + this.random = random; + } + + protected double throttlingProbability(long now) { + if (!allRequests.hasData(now)) { + return 0.0; + } + long allReqs = allRequests.sum(now); + long successfulReqs = successfulRequests.sum(now); + double prob = (allReqs - overloadRatio * successfulReqs) / (allReqs + MIN_REQUESTS); + return Math.max(0.0, prob); + } + + /** + * Determines whether one RPC attempt should be throttled. + * + *

This should be called once each time the caller intends to send an RPC; if it returns true, + * drop or delay that request (calling this function again after the delay). + * + * @param now time in ms since the epoch + * @return true if the caller should throttle or delay the request. + */ + public synchronized boolean throttleRequest(long now) { + double prob = throttlingProbability(now); + allRequests.add(now, 1); + return random.nextDouble() < prob; + } + + /** + * Notifies the throttler of a successful request. + * + *

Must be called once for each request (for which throttleRequest was previously called) that + * succeeded. + * + * @param now time in ms since the epoch + */ + public synchronized void successfulRequest(long now) { + successfulRequests.add(now, 1); + } +} diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/throttling/ReactiveThrottler.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/throttling/ReactiveThrottler.java new file mode 100644 index 000000000000..ae0dea81a094 --- /dev/null +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/throttling/ReactiveThrottler.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.throttling; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A wrapper around the AdaptiveThrottler that also handles logging and signaling throttling to the + * SDK harness using the provided namespace. + * + *

For usage, instantiate one instance of a ReactiveThrottler class for a PTransform. When making + * remote calls to a service, preface that call with the throttle() method to potentially + * pre-emptively throttle the request. This will throttle future calls based on the failure rate of + * preceding calls, with higher failure rates leading to longer periods of throttling to allow + * system recovery. capture the timestamp of the attempted request, then execute the request code. + * On a success, call successfulRequest(timestamp) to report the success to the throttler. + */ +public class ReactiveThrottler extends AdaptiveThrottler { + private static final Logger LOG = LoggerFactory.getLogger(ReactiveThrottler.class); + private static final long SECONDS_TO_MILLISECONDS = 1000L; + + private final ThrottlingSignaler throttlingSignaler; + private final int throttleDelaySecs; + + /** + * Initializes the ReactiveThrottler. + * + * @param windowMs length of history to consider, in ms, to set throttling. + * @param bucketMs granularity of time buckets that we store data in, in ms. + * @param overloadRatio the target ratio between requests sent and successful requests. + * @param namespace the namespace to use for logging and signaling throttling is occurring. + * @param throttleDelaySecs the amount of time in seconds to wait after preemptively throttled + * requests. + */ + public ReactiveThrottler( + long windowMs, long bucketMs, double overloadRatio, String namespace, int throttleDelaySecs) { + super(windowMs, bucketMs, overloadRatio); + if (throttleDelaySecs <= 0) { + throw new IllegalArgumentException("throttleDelaySecs must be greater than 0"); + } + this.throttlingSignaler = new ThrottlingSignaler(namespace); + this.throttleDelaySecs = throttleDelaySecs; + } + + /** + * Stops request code from advancing while the underlying AdaptiveThrottler is signaling to + * preemptively throttle the request. 
Automatically handles logging the throttling and signaling + * to the SDK harness that the request is being throttled. This should be called in any context + * where a call to a remote service is being contacted prior to the call being performed. + */ + public void throttle() throws InterruptedException { + if (throttleRequest(System.currentTimeMillis())) { + LOG.debug("Delaying request for {} seconds due to previous failures", throttleDelaySecs); + Thread.sleep(throttleDelaySecs * SECONDS_TO_MILLISECONDS); + throttlingSignaler.signalThrottling(throttleDelaySecs * SECONDS_TO_MILLISECONDS); + } + } +} diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/util/MovingSum.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/util/MovingSum.java new file mode 100644 index 000000000000..92c741d7d668 --- /dev/null +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/util/MovingSum.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.util; + +import java.util.Arrays; + +/** + * Class that keeps track of a rolling window sum. + * + *

For use in tracking recent performance of the connector. + * + *

/**
 * Rolling-window accumulator that tracks both the sum of the added values and the number of
 * additions over the most recent window of time.
 *
 * <p>For use in tracking recent performance of the connector.
 *
 * <p>Intended to be similar to {@link org.apache.beam.sdk.util.MovingFunction}, but for convenience
 * we expose the count of entries as well so this doubles as a moving average tracker.
 *
 * <p>The window is split into fixed-width buckets stored in a ring. Every public method first
 * advances the ring to the supplied time, so even the read methods mutate internal state; the
 * class performs no synchronization of its own.
 */
public class MovingSum {
  private final int numBuckets;
  private final long bucketMs;

  private int currentIndex;
  private long currentMsSinceEpoch;
  private final long[] sums;
  private final long[] counts;

  /**
   * Creates an empty moving sum.
   *
   * @param windowMs total length of the window, in ms; must be at least {@code bucketMs}.
   * @param bucketMs width of one bucket, in ms; must be positive.
   * @throws IllegalArgumentException if the arguments violate the constraints above.
   */
  public MovingSum(long windowMs, long bucketMs) {
    boolean valid = bucketMs > 0 && windowMs >= bucketMs;
    if (!valid) {
      throw new IllegalArgumentException("windowMs >= bucketMs > 0 please");
    }
    this.bucketMs = bucketMs;
    this.numBuckets = (int) Math.ceil((double) windowMs / bucketMs);
    // Freshly allocated long arrays are already zero-filled.
    this.sums = new long[numBuckets];
    this.counts = new long[numBuckets];
    this.currentIndex = 0;
    this.currentMsSinceEpoch = 0;
  }

  /** Drops all data and restarts the ring at the bucket boundary at or below {@code now}. */
  private void restartAt(long now) {
    Arrays.fill(sums, 0L);
    Arrays.fill(counts, 0L);
    currentIndex = 0;
    currentMsSinceEpoch = (now / bucketMs) * bucketMs;
  }

  /** Moves the ring forward to {@code now}, zeroing every bucket that gets reused. */
  private void advanceTo(long now) {
    long windowEnd = currentMsSinceEpoch + bucketMs * numBuckets;
    if (now >= windowEnd) {
      // Everything currently held has aged out; wiping the arrays is cheaper than stepping.
      restartAt(now);
      return;
    }

    for (long nextStart = currentMsSinceEpoch + bucketMs;
        now >= nextStart;
        nextStart += bucketMs) {
      // Step one bucket forward and clear the slot we are about to reuse.
      currentMsSinceEpoch = nextStart;
      currentIndex = (currentIndex + 1) % numBuckets;
      sums[currentIndex] = 0;
      counts[currentIndex] = 0;
    }
  }

  /** Adds up every slot of the given ring. */
  private static long totalOf(long[] buckets) {
    return Arrays.stream(buckets).sum();
  }

  /**
   * Returns the sum of all values added within the window ending at {@code now}.
   *
   * @param now time in ms since the epoch
   */
  public long sum(long now) {
    advanceTo(now);
    return totalOf(sums);
  }

  /**
   * Records {@code inc} in the bucket containing {@code now} and counts it as one entry.
   *
   * @param now time in ms since the epoch
   * @param inc amount to add to the running sum
   */
  public void add(long now, long inc) {
    advanceTo(now);
    sums[currentIndex] += inc;
    counts[currentIndex] += 1;
  }

  /**
   * Returns how many entries were added within the window ending at {@code now}.
   *
   * @param now time in ms since the epoch
   */
  public long count(long now) {
    advanceTo(now);
    return totalOf(counts);
  }

  /**
   * Reports whether at least one entry lies within the window ending at {@code now}.
   *
   * @param now time in ms since the epoch
   */
  public boolean hasData(long now) {
    return count(now) > 0;
  }
}
*/ +package org.apache.beam.sdk.io.components.util; diff --git a/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/throttling/AdaptiveThrottlerTest.java b/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/throttling/AdaptiveThrottlerTest.java new file mode 100644 index 000000000000..3e798c7a0d55 --- /dev/null +++ b/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/throttling/AdaptiveThrottlerTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.components.throttling; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.Random; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class AdaptiveThrottlerTest { + + private static final long START_TIME = 1500000000000L; + private static final long SAMPLE_PERIOD = 60000L; + private static final long BUCKET = 1000L; + private static final double OVERLOAD_RATIO = 2.0; + + private AdaptiveThrottler throttler; + + @Before + public void setUp() { + throttler = new AdaptiveThrottler(SAMPLE_PERIOD, BUCKET, OVERLOAD_RATIO); + } + + @Test + public void testNoInitialThrottling() { + assertEquals(0.0, throttler.throttlingProbability(START_TIME), 0.0); + } + + @Test + public void testNoThrottlingIfNoErrors() { + for (long t = START_TIME; t < START_TIME + 20; t++) { + assertFalse(throttler.throttleRequest(t)); + throttler.successfulRequest(t); + } + assertEquals(0.0, throttler.throttlingProbability(START_TIME + 20), 0.0); + } + + @Test + public void testNoThrottlingAfterErrorsExpire() { + for (long t = START_TIME; t < START_TIME + SAMPLE_PERIOD; t += 100) { + throttler.throttleRequest(t); + // No successful request + } + assertTrue(throttler.throttlingProbability(START_TIME + SAMPLE_PERIOD) > 0); + + for (long t = START_TIME + SAMPLE_PERIOD; t < START_TIME + SAMPLE_PERIOD * 2; t += 100) { + throttler.throttleRequest(t); + throttler.successfulRequest(t); + } + + assertEquals(0.0, throttler.throttlingProbability(START_TIME + SAMPLE_PERIOD * 2), 0.0); + } + + @Test + public void testThrottlingAfterErrors() { + // Inject a mocked Random + throttler = new AdaptiveThrottler(SAMPLE_PERIOD, BUCKET, OVERLOAD_RATIO, new MockRandom()); + + for (long t = START_TIME; t < START_TIME + 20; t++) { + boolean throttled = throttler.throttleRequest(t); + // 
1/3rd of requests succeeding. + if (t % 3 == 1) { + throttler.successfulRequest(t); + } + + if (t > START_TIME + 10) { + // Roughly 1/3rd succeeding, 1/3rd failing, 1/3rd throttled. + assertEquals(0.33, throttler.throttlingProbability(t), 0.1); + // Given mocked random, expects 10..13 throttled, 14+ unthrottled + assertEquals(t < START_TIME + 14, throttled); + } + } + } + + private static class MockRandom extends Random { + private int callCount = 0; + + @Override + public double nextDouble() { + // Return 0.0, 0.1, ..., 0.9, 0.0, 0.1 ... + double val = (callCount % 10) / 10.0; + callCount++; + return val; + } + } +} diff --git a/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/util/MovingSumTest.java b/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/util/MovingSumTest.java new file mode 100644 index 000000000000..fd70ba7bf241 --- /dev/null +++ b/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/util/MovingSumTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.components.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class MovingSumTest { + + @Test + public void testMovingSumBasic() { + MovingSum movingSum = new MovingSum(10000, 1000); + assertFalse(movingSum.hasData(1000)); + + movingSum.add(1000, 5); + assertTrue(movingSum.hasData(1000)); + assertEquals(5, movingSum.sum(1000)); + assertEquals(1, movingSum.count(1000)); + + movingSum.add(1500, 10); + assertEquals(15, movingSum.sum(1500)); + assertEquals(2, movingSum.count(1500)); + + // Advance by 2 buckets (from 1000 to 3000) + assertEquals(15, movingSum.sum(3000)); + movingSum.add(3500, 20); + assertEquals(35, movingSum.sum(3500)); + assertEquals(3, movingSum.count(3500)); + + // Wait 11 seconds (moving completely outside window) + assertEquals(0, movingSum.sum(12000)); + assertEquals(0, movingSum.count(12000)); + assertFalse(movingSum.hasData(12000)); + } + + @Test + public void testInvalidArguments() { + assertThrows(IllegalArgumentException.class, () -> new MovingSum(100, 1000)); + assertThrows(IllegalArgumentException.class, () -> new MovingSum(1000, 0)); + } +}