-
Notifications
You must be signed in to change notification settings - Fork 4.6k
[Gemini] Port client-side throttling to the Java SDK #39021
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
jrmccluskey
wants to merge
5
commits into
apache:master
Choose a base branch
from
jrmccluskey:javaThrottling
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
103 changes: 103 additions & 0 deletions
103
...ponents/src/main/java/org/apache/beam/sdk/io/components/throttling/AdaptiveThrottler.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,103 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.beam.sdk.io.components.throttling; | ||
|
|
||
| import java.util.Random; | ||
| import org.apache.beam.sdk.io.components.util.MovingSum; | ||
|
|
||
| /** | ||
| * Implements adaptive throttling. | ||
| * | ||
| * <p>See | ||
| * https://landing.google.com/sre/book/chapters/handling-overload.html#client-side-throttling-a7sYUg | ||
| * for a full discussion of the use case and algorithm applied. | ||
| */ | ||
| public class AdaptiveThrottler { | ||
|
|
||
| // The target minimum number of requests per samplePeriodMs, even if no | ||
| // requests succeed. Must be greater than 0, else we could throttle to zero. | ||
| // Because every decision is probabilistic, there is no guarantee that the | ||
| // request rate in any given interval will not be zero. (This is the +1 from | ||
| // the formula in | ||
| // https://landing.google.com/sre/book/chapters/handling-overload.html ) | ||
| public static final int MIN_REQUESTS = 1; | ||
|
|
||
| private final MovingSum allRequests; | ||
| private final MovingSum successfulRequests; | ||
| private final double overloadRatio; | ||
| private final Random random; | ||
|
|
||
| /** | ||
| * Initializes AdaptiveThrottler. | ||
| * | ||
| * @param windowMs length of history to consider, in ms, to set throttling. | ||
| * @param bucketMs granularity of time buckets that we store data in, in ms. | ||
| * @param overloadRatio the target ratio between requests sent and successful requests. | ||
| */ | ||
| public AdaptiveThrottler(long windowMs, long bucketMs, double overloadRatio) { | ||
| this(windowMs, bucketMs, overloadRatio, new Random()); | ||
| } | ||
|
|
||
| // visible for testing | ||
| AdaptiveThrottler(long windowMs, long bucketMs, double overloadRatio, Random random) { | ||
| if (overloadRatio <= 1.0) { | ||
| throw new IllegalArgumentException("overloadRatio must be greater than 1.0"); | ||
| } | ||
| this.allRequests = new MovingSum(windowMs, bucketMs); | ||
| this.successfulRequests = new MovingSum(windowMs, bucketMs); | ||
| this.overloadRatio = overloadRatio; | ||
| this.random = random; | ||
| } | ||
|
|
||
| protected double throttlingProbability(long now) { | ||
| if (!allRequests.hasData(now)) { | ||
| return 0.0; | ||
| } | ||
| long allReqs = allRequests.sum(now); | ||
| long successfulReqs = successfulRequests.sum(now); | ||
| double prob = (allReqs - overloadRatio * successfulReqs) / (allReqs + MIN_REQUESTS); | ||
| return Math.max(0.0, prob); | ||
| } | ||
|
|
||
| /** | ||
| * Determines whether one RPC attempt should be throttled. | ||
| * | ||
| * <p>This should be called once each time the caller intends to send an RPC; if it returns true, | ||
| * drop or delay that request (calling this function again after the delay). | ||
| * | ||
| * @param now time in ms since the epoch | ||
| * @return true if the caller should throttle or delay the request. | ||
| */ | ||
| public synchronized boolean throttleRequest(long now) { | ||
| double prob = throttlingProbability(now); | ||
| allRequests.add(now, 1); | ||
| return random.nextDouble() < prob; | ||
| } | ||
|
|
||
| /** | ||
| * Notifies the throttler of a successful request. | ||
| * | ||
| * <p>Must be called once for each request (for which throttleRequest was previously called) that | ||
| * succeeded. | ||
| * | ||
| * @param now time in ms since the epoch | ||
| */ | ||
| public synchronized void successfulRequest(long now) { | ||
| successfulRequests.add(now, 1); | ||
| } | ||
| } | ||
74 changes: 74 additions & 0 deletions
74
...ponents/src/main/java/org/apache/beam/sdk/io/components/throttling/ReactiveThrottler.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,74 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.beam.sdk.io.components.throttling; | ||
|
|
||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| /** | ||
| * A wrapper around the AdaptiveThrottler that also handles logging and signaling throttling to the | ||
| * SDK harness using the provided namespace. | ||
| * | ||
| * <p>For usage, instantiate one instance of a ReactiveThrottler class for a PTransform. When making | ||
| * remote calls to a service, preface that call with the throttle() method to potentially | ||
| * pre-emptively throttle the request. This will throttle future calls based on the failure rate of | ||
| * preceding calls, with higher failure rates leading to longer periods of throttling to allow | ||
| * system recovery. capture the timestamp of the attempted request, then execute the request code. | ||
| * On a success, call successfulRequest(timestamp) to report the success to the throttler. | ||
| */ | ||
| public class ReactiveThrottler extends AdaptiveThrottler { | ||
| private static final Logger LOG = LoggerFactory.getLogger(ReactiveThrottler.class); | ||
| private static final long SECONDS_TO_MILLISECONDS = 1000L; | ||
|
|
||
| private final ThrottlingSignaler throttlingSignaler; | ||
| private final int throttleDelaySecs; | ||
|
|
||
| /** | ||
| * Initializes the ReactiveThrottler. | ||
| * | ||
| * @param windowMs length of history to consider, in ms, to set throttling. | ||
| * @param bucketMs granularity of time buckets that we store data in, in ms. | ||
| * @param overloadRatio the target ratio between requests sent and successful requests. | ||
| * @param namespace the namespace to use for logging and signaling throttling is occurring. | ||
| * @param throttleDelaySecs the amount of time in seconds to wait after preemptively throttled | ||
| * requests. | ||
| */ | ||
| public ReactiveThrottler( | ||
| long windowMs, long bucketMs, double overloadRatio, String namespace, int throttleDelaySecs) { | ||
| super(windowMs, bucketMs, overloadRatio); | ||
| if (throttleDelaySecs <= 0) { | ||
| throw new IllegalArgumentException("throttleDelaySecs must be greater than 0"); | ||
| } | ||
| this.throttlingSignaler = new ThrottlingSignaler(namespace); | ||
| this.throttleDelaySecs = throttleDelaySecs; | ||
| } | ||
|
jrmccluskey marked this conversation as resolved.
|
||
|
|
||
| /** | ||
| * Stops request code from advancing while the underlying AdaptiveThrottler is signaling to | ||
| * preemptively throttle the request. Automatically handles logging the throttling and signaling | ||
| * to the SDK harness that the request is being throttled. This should be called in any context | ||
| * where a call to a remote service is being contacted prior to the call being performed. | ||
| */ | ||
| public void throttle() throws InterruptedException { | ||
| if (throttleRequest(System.currentTimeMillis())) { | ||
| LOG.debug("Delaying request for {} seconds due to previous failures", throttleDelaySecs); | ||
| Thread.sleep(throttleDelaySecs * SECONDS_TO_MILLISECONDS); | ||
| throttlingSignaler.signalThrottling(throttleDelaySecs * SECONDS_TO_MILLISECONDS); | ||
| } | ||
| } | ||
|
jrmccluskey marked this conversation as resolved.
|
||
| } | ||
104 changes: 104 additions & 0 deletions
104
sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/util/MovingSum.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,104 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.beam.sdk.io.components.util; | ||
|
|
||
| import java.util.Arrays; | ||
|
|
||
| /** | ||
| * Class that keeps track of a rolling window sum. | ||
| * | ||
| * <p>For use in tracking recent performance of the connector. | ||
| * | ||
| * <p>Intended to be similar to {@link org.apache.beam.sdk.util.MovingFunction}, but for convenience | ||
| * we expose the count of entries as well so this doubles as a moving average tracker. | ||
| */ | ||
| public class MovingSum { | ||
| private final int numBuckets; | ||
| private final long bucketMs; | ||
|
|
||
| private int currentIndex; | ||
| private long currentMsSinceEpoch; | ||
| private final long[] sums; | ||
| private final long[] counts; | ||
|
|
||
| public MovingSum(long windowMs, long bucketMs) { | ||
| if (windowMs < bucketMs || bucketMs <= 0) { | ||
| throw new IllegalArgumentException("windowMs >= bucketMs > 0 please"); | ||
| } | ||
| this.numBuckets = (int) Math.ceil((double) windowMs / bucketMs); | ||
| this.bucketMs = bucketMs; | ||
| this.sums = new long[this.numBuckets]; | ||
| this.counts = new long[this.numBuckets]; | ||
| this.currentIndex = 0; | ||
| this.currentMsSinceEpoch = 0; | ||
| Arrays.fill(this.sums, 0L); | ||
| Arrays.fill(this.counts, 0L); | ||
| } | ||
|
|
||
| private void reset(long now) { | ||
| this.currentIndex = 0; | ||
| this.currentMsSinceEpoch = (now / bucketMs) * bucketMs; | ||
| Arrays.fill(sums, 0L); | ||
| Arrays.fill(counts, 0L); | ||
| } | ||
|
|
||
| private void flush(long now) { | ||
| if (now >= (currentMsSinceEpoch + bucketMs * numBuckets)) { | ||
| // Time moved forward so far that all currently held data is outside of | ||
| // the window. It is faster to simply reset our data. | ||
| reset(now); | ||
| return; | ||
| } | ||
|
|
||
| while (now >= currentMsSinceEpoch + bucketMs) { | ||
| // Advance time by one bucketMs, setting the new bucket's counts to 0. | ||
| currentMsSinceEpoch += bucketMs; | ||
| currentIndex = (currentIndex + 1) % numBuckets; | ||
| sums[currentIndex] = 0; | ||
| counts[currentIndex] = 0; | ||
| } | ||
| } | ||
|
|
||
| public long sum(long now) { | ||
| flush(now); | ||
| long total = 0; | ||
| for (long s : sums) { | ||
| total += s; | ||
| } | ||
| return total; | ||
| } | ||
|
|
||
| public void add(long now, long inc) { | ||
| flush(now); | ||
| sums[currentIndex] += inc; | ||
| counts[currentIndex] += 1; | ||
| } | ||
|
|
||
| public long count(long now) { | ||
| flush(now); | ||
| long total = 0; | ||
| for (long c : counts) { | ||
| total += c; | ||
| } | ||
| return total; | ||
| } | ||
|
|
||
| public boolean hasData(long now) { | ||
| return count(now) > 0; | ||
| } | ||
| } |
20 changes: 20 additions & 0 deletions
20
...java/io/components/src/main/java/org/apache/beam/sdk/io/components/util/package-info.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| /** Defines utilities for Beam IO components. */ | ||
| package org.apache.beam.sdk.io.components.util; |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.