Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/scheduled-snyk-docker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ jobs:
permissions:
contents: read
packages: write
security-events: write

steps:
- uses: actions/checkout@v5
Expand Down
4 changes: 3 additions & 1 deletion .github/workflows/scheduled-snyk.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ jobs:
security:
needs: prepare-matrix
runs-on: ubuntu-latest
permissions: {}
permissions:
contents: read
security-events: write
strategy:
matrix:
module: ${{ fromJson(needs.prepare-matrix.outputs.modules ) }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,15 @@ public class FitbitRestSourceConnectorConfig extends RestSourceConnectorConfig {
private static final String FITBIT_FORBIDDEN_BACKOFF_DISPLAY = "Forbidden backoff time (s)";
private static final int FITBIT_FORBIDDEN_BACKOFF_DEFAULT = 86400; // 24 hours

public static final String FITBIT_TOO_MANY_REQUESTS_COOLDOWN_CONFIG = "fitbit.too.many.requests.cooldown.s";
private static final String FITBIT_TOO_MANY_REQUESTS_COOLDOWN_DOC = "Cooldown time in seconds after receiving too many requests (429) response.";
private static final String FITBIT_TOO_MANY_REQUESTS_COOLDOWN_DISPLAY = "Too many requests cooldown (s)";
private static final int FITBIT_TOO_MANY_REQUESTS_COOLDOWN_DEFAULT = 3600; // 1 hour

public static final String FITBIT_COOLDOWN_STRATEGY_CONFIG = "fitbit.cooldown.strategy";
private static final String FITBIT_COOLDOWN_STRATEGY_DOC = "Strategy for handling too many requests cooldown. Options: ROLLING_WINDOW, TOP_OF_HOUR";
private static final String FITBIT_COOLDOWN_STRATEGY_DISPLAY = "Cooldown strategy";
private static final String FITBIT_COOLDOWN_STRATEGY_DEFAULT = "ROLLING_WINDOW";

private UserRepository userRepository;
private final Headers clientCredentials;
Expand Down Expand Up @@ -689,6 +698,26 @@ public String toString() {
++orderInGroup,
Width.SHORT,
FITBIT_FORBIDDEN_BACKOFF_DISPLAY)

.define(FITBIT_TOO_MANY_REQUESTS_COOLDOWN_CONFIG,
Type.INT,
FITBIT_TOO_MANY_REQUESTS_COOLDOWN_DEFAULT,
Importance.MEDIUM,
FITBIT_TOO_MANY_REQUESTS_COOLDOWN_DOC,
group,
++orderInGroup,
Width.SHORT,
FITBIT_TOO_MANY_REQUESTS_COOLDOWN_DISPLAY)

.define(FITBIT_COOLDOWN_STRATEGY_CONFIG,
Type.STRING,
FITBIT_COOLDOWN_STRATEGY_DEFAULT,
Importance.MEDIUM,
FITBIT_COOLDOWN_STRATEGY_DOC,
group,
++orderInGroup,
Width.SHORT,
FITBIT_COOLDOWN_STRATEGY_DISPLAY)
;
}

Expand Down Expand Up @@ -849,7 +878,7 @@ public Duration getPollIntervalPerUser() {
}

public Duration getTooManyRequestsCooldownInterval() {
return Duration.ofHours(1);
return Duration.ofSeconds(getInt(FITBIT_TOO_MANY_REQUESTS_COOLDOWN_CONFIG));
}

public String getFitbitIntradayCaloriesTopic() {
Expand Down Expand Up @@ -896,4 +925,12 @@ public int getMaxForbidden() {
public int getForbiddenBackoff() {
return getInt(FITBIT_FORBIDDEN_BACKOFF_CONFIG);
}

public String getCooldownStrategy() {
String value = getString(FITBIT_COOLDOWN_STRATEGY_CONFIG);
if (!value.equalsIgnoreCase("ROLLING_WINDOW") && !value.equalsIgnoreCase("TOP_OF_HOUR")) {
throw new ConfigException(FITBIT_COOLDOWN_STRATEGY_CONFIG, value, "Invalid cooldown strategy. Must be either ROLLING_WINDOW or TOP_OF_HOUR.");
}
return value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static java.time.ZoneOffset.UTC;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import static java.time.temporal.ChronoUnit.DAYS;
import static java.time.temporal.ChronoUnit.MINUTES;
import static java.time.temporal.ChronoUnit.NANOS;
Expand Down Expand Up @@ -120,6 +121,7 @@ public abstract class FitbitPollingRoute implements PollingRequestRoute {
private final Map<String, Integer> forbidden403Counter;
private int maxForbiddenResponses;
private Duration forbidden403Cooldown;
private String cooldownStrategy;

public FitbitPollingRoute(
FitbitRequestGenerator generator,
Expand Down Expand Up @@ -147,6 +149,7 @@ public void initialize(RestSourceConnectorConfig config) {
this.maxForbiddenResponses = fitbitConfig.getMaxForbidden();
this.converter().initialize(fitbitConfig);
this.forbidden403Cooldown = Duration.ofSeconds(fitbitConfig.getForbiddenBackoff());
this.cooldownStrategy = fitbitConfig.getCooldownStrategy();
}

@Override
Expand All @@ -164,6 +167,9 @@ public void requestEmpty(RestRequest request) {
lastPollPerUser.put(((FitbitRestRequest) request).getUser().getId(), lastPoll);
FitbitRestRequest fitbitRequest = (FitbitRestRequest) request;
Instant endOffset = fitbitRequest.getDateRange().end().toInstant();
// When having polled a date range for a route for HISTORICAL_TIME_DAYS days and
// the response has no data, consider this data not to exist by considering
// the end of the date range as the last successful data retrieval.
if (DAYS.between(endOffset, lastPoll) >= HISTORICAL_TIME_DAYS) {
String key = fitbitRequest.getUser().getVersionedId();
offsets.put(key, endOffset);
Expand All @@ -175,19 +181,19 @@ public void requestFailed(RestRequest request, Response response) {
if (response != null && response.code() == 429) {
User user = ((FitbitRestRequest) request).getUser();
tooManyRequestsForUser.add(user);
String cooldownString = response.header("Retry-After");
Duration cooldown = getTooManyRequestsCooldown();
if (cooldownString != null) {
try {
cooldown = Duration.ofSeconds(Long.parseLong(cooldownString));
} catch (NumberFormatException ex) {
cooldown = getTooManyRequestsCooldown();
}

Instant backOff;
if ("TOP_OF_HOUR".equalsIgnoreCase(cooldownStrategy)) {
backOff = calculateTopOfHourBackoff();
logger.info("Too many requests for user {}. Using TOP_OF_HOUR strategy, backing off until: {}",
user, backOff);
} else {
backOff = calculateRollingWindowBackoff(response);
logger.info("Too many requests for user {}. Using ROLLING_WINDOW strategy, backing off until: {}",
user, backOff.plus(getPollIntervalPerUser()));
}
Instant backOff = lastPoll.plus(cooldown);

lastPollPerUser.put(user.getId(), backOff);
logger.info("Too many requests for user {}. Backing off until {}",
user, backOff.plus(getPollIntervalPerUser()));
} else if (response != null && response.code() == 403) {
User user = ((FitbitRestRequest) request).getUser();
String userId = user.getId();
Expand All @@ -206,6 +212,46 @@ public void requestFailed(RestRequest request, Response response) {
}
}

/**
* Calculate backoff using top-of-hour strategy.
* Waits until the top of the next hour to align with Fitbit's rate limit reset.
*/
private Instant calculateTopOfHourBackoff() {
Instant now = Instant.now();
Instant topOfNextHour = calculateTopOfNextHour(now).plus(ONE_SECOND);
return topOfNextHour;
}

/**
* Calculate backoff using rolling window strategy.
* Uses configured cooldown duration from when the error occurred.
*/
private Instant calculateRollingWindowBackoff(Response response) {
String cooldownString = response.header("Retry-After");
Duration cooldown = getTooManyRequestsCooldown();

if (cooldownString != null) {
try {
cooldown = Duration.ofSeconds(Long.parseLong(cooldownString));
} catch (NumberFormatException ex) {
cooldown = getTooManyRequestsCooldown();
}
}

return lastPoll.plus(cooldown);
}

/**
* Calculate the top of the next hour from the given instant.
*/
private Instant calculateTopOfNextHour(Instant instant) {
ZonedDateTime zonedDateTime = instant.atZone(UTC);
ZonedDateTime topOfNextHour = zonedDateTime
.plusHours(1)
.truncatedTo(ChronoUnit.HOURS);
return topOfNextHour.toInstant();
}

/**
* Actually construct requests, based on the current offset
* @param user Fitbit user
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
package org.radarbase.connect.rest.fitbit.route;

import static java.time.ZoneOffset.UTC;
import static java.time.temporal.ChronoUnit.SECONDS;
import static java.time.temporal.ChronoUnit.DAYS;

import io.confluent.connect.avro.AvroData;

import java.time.Duration;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.stream.Stream;

import org.radarbase.connect.rest.fitbit.converter.FitbitRestingHeartRateAvroConverter;
Expand All @@ -47,9 +46,14 @@ public FitbitRestingHeartRateRoute(

@Override
protected Stream<FitbitRestRequest> createRequests(User user) {
ZonedDateTime startDate = this.getOffset(user).plus(ONE_SECOND)
// Important: resting heart rate is queried at the resolution of a single
// day, so the offset for the next request will be set to the next day.
ZonedDateTime startDate = this.getOffset(user).plus(ONE_DAY)
.atZone(UTC)
.truncatedTo(SECONDS);
.truncatedTo(DAYS);
// Note: the date range of startDate to now() is not correct, but will ensure that in case of empty
// results, the HISTORICAL_TIME_DAYS retry inactivation in requestEmpty() of FitbitPollingRoute.java
// will never be used.
return Stream.of(newRequest(user, new DateRange(startDate, ZonedDateTime.now(UTC)),
user.getExternalUserId(), DATE_FORMAT.format(startDate)));
}
Expand Down
Loading