From 11f39fb6b7f3ae2cca68de68191d62d40730bb1c Mon Sep 17 00:00:00 2001 From: Pim van Nierop Date: Tue, 9 Dec 2025 09:23:02 +0100 Subject: [PATCH 1/5] fix: set offset for resting heart rate to one day later --- .../fitbit/route/FitbitRestingHeartRateRoute.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitRestingHeartRateRoute.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitRestingHeartRateRoute.java index 6e433d07..3c12f208 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitRestingHeartRateRoute.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitRestingHeartRateRoute.java @@ -18,13 +18,12 @@ package org.radarbase.connect.rest.fitbit.route; import static java.time.ZoneOffset.UTC; -import static java.time.temporal.ChronoUnit.SECONDS; +import static java.time.temporal.ChronoUnit.DAYS; import io.confluent.connect.avro.AvroData; import java.time.Duration; import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; import java.util.stream.Stream; import org.radarbase.connect.rest.fitbit.converter.FitbitRestingHeartRateAvroConverter; @@ -47,9 +46,14 @@ public FitbitRestingHeartRateRoute( @Override protected Stream createRequests(User user) { - ZonedDateTime startDate = this.getOffset(user).plus(ONE_SECOND) + // Important: resting heart rate is queried at the resolution of a single + // day, so the offset for the next request will be set to the next day. + ZonedDateTime startDate = this.getOffset(user).plus(ONE_DAY) .atZone(UTC) - .truncatedTo(SECONDS); + .truncatedTo(DAYS); + // Note: the date range of startDate to now() is not correct, but will ensure that in case of empty + // results, the HISTORICAL_TIME_DAYS retry inactivation in requestEmpty() of FitbitPollingRoute.java + // will never be used. return Stream.of(newRequest(user, new DateRange(startDate, ZonedDateTime.now(UTC)), user.getExternalUserId(), DATE_FORMAT.format(startDate))); } From 26369b7fe06a3af366894702880a0c71c00e9094 Mon Sep 17 00:00:00 2001 From: Pim van Nierop Date: Tue, 9 Dec 2025 11:44:13 +0100 Subject: [PATCH 2/5] docs: add comment explaining offset progress in case of empty responses --- .../connect/rest/fitbit/route/FitbitPollingRoute.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java index 05ba9a09..c7453e2c 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java @@ -164,6 +164,9 @@ public void requestEmpty(RestRequest request) { lastPollPerUser.put(((FitbitRestRequest) request).getUser().getId(), lastPoll); FitbitRestRequest fitbitRequest = (FitbitRestRequest) request; Instant endOffset = fitbitRequest.getDateRange().end().toInstant(); + // When having polled a date range for a route for HISTORICAL_TIME_DAYS days and + // the response has no data, consider this data not to exist by considering + // the end of the date range as the last successful data retrieval. if (DAYS.between(endOffset, lastPoll) >= HISTORICAL_TIME_DAYS) { String key = fitbitRequest.getUser().getVersionedId(); offsets.put(key, endOffset); From 4dd7c5db280c2d38338f043884c1342c87068b87 Mon Sep 17 00:00:00 2001 From: Pim van Nierop Date: Wed, 15 Oct 2025 11:29:13 +0200 Subject: [PATCH 3/5] Fix: snyk scanning actions --- .github/workflows/scheduled-snyk-docker.yaml | 1 + .github/workflows/scheduled-snyk.yaml | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/workflows/scheduled-snyk-docker.yaml b/.github/workflows/scheduled-snyk-docker.yaml index e2a80181..0794c27c 100644 --- a/.github/workflows/scheduled-snyk-docker.yaml +++ b/.github/workflows/scheduled-snyk-docker.yaml @@ -42,6 +42,7 @@ jobs: permissions: contents: read packages: write + security-events: write steps: - uses: actions/checkout@v5 diff --git a/.github/workflows/scheduled-snyk.yaml b/.github/workflows/scheduled-snyk.yaml index 152344db..85407149 100644 --- a/.github/workflows/scheduled-snyk.yaml +++ b/.github/workflows/scheduled-snyk.yaml @@ -36,7 +36,9 @@ jobs: security: needs: prepare-matrix runs-on: ubuntu-latest - permissions: {} + permissions: + contents: read + security-events: write strategy: matrix: module: ${{ fromJson(needs.prepare-matrix.outputs.modules ) }} From 54f08bd48d7c4ecb674d964b1fe868662cc2eb9e Mon Sep 17 00:00:00 2001 From: yatharthranjan Date: Mon, 28 Jul 2025 14:00:25 +0100 Subject: [PATCH 4/5] make too many requests backoff configurable --- .../fitbit/FitbitRestSourceConnectorConfig.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java index 61a55a3e..d6b62e02 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java @@ -254,6 +254,10 @@ public class FitbitRestSourceConnectorConfig extends RestSourceConnectorConfig { private static final String FITBIT_FORBIDDEN_BACKOFF_DISPLAY = "Forbidden backoff time (s)"; private static final int FITBIT_FORBIDDEN_BACKOFF_DEFAULT = 86400; // 24 hours + public static final String FITBIT_TOO_MANY_REQUESTS_COOLDOWN_CONFIG = "fitbit.too.many.requests.cooldown.s"; + private static final String FITBIT_TOO_MANY_REQUESTS_COOLDOWN_DOC = "Cooldown time in seconds after receiving too many requests (429) response."; + private static final String FITBIT_TOO_MANY_REQUESTS_COOLDOWN_DISPLAY = "Too many requests cooldown (s)"; + private static final int FITBIT_TOO_MANY_REQUESTS_COOLDOWN_DEFAULT = 3600; // 1 hour private UserRepository userRepository; private final Headers clientCredentials; @@ -689,6 +693,16 @@ public String toString() { ++orderInGroup, Width.SHORT, FITBIT_FORBIDDEN_BACKOFF_DISPLAY) + + .define(FITBIT_TOO_MANY_REQUESTS_COOLDOWN_CONFIG, + Type.INT, + FITBIT_TOO_MANY_REQUESTS_COOLDOWN_DEFAULT, + Importance.MEDIUM, + FITBIT_TOO_MANY_REQUESTS_COOLDOWN_DOC, + group, + ++orderInGroup, + Width.SHORT, + FITBIT_TOO_MANY_REQUESTS_COOLDOWN_DISPLAY) ; } @@ -849,7 +863,7 @@ public Duration getPollIntervalPerUser() { } public Duration getTooManyRequestsCooldownInterval() { - return Duration.ofHours(1); + return Duration.ofSeconds(getInt(FITBIT_TOO_MANY_REQUESTS_COOLDOWN_CONFIG)); } public String getFitbitIntradayCaloriesTopic() { From de7aeb657cbbbdf5e5d897498008464603166c37 Mon Sep 17 00:00:00 2001 From: yatharthranjan Date: Mon, 28 Jul 2025 15:52:42 +0100 Subject: [PATCH 5/5] top of the hour cooldown strategy for 429 --- .../FitbitRestSourceConnectorConfig.java | 23 +++++++ .../rest/fitbit/route/FitbitPollingRoute.java | 65 +++++++++++++++---- 2 files changed, 77 insertions(+), 11 deletions(-) diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java index d6b62e02..b3b02767 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java @@ -259,6 +259,11 @@ public class FitbitRestSourceConnectorConfig extends RestSourceConnectorConfig { private static final String FITBIT_TOO_MANY_REQUESTS_COOLDOWN_DISPLAY = "Too many requests cooldown (s)"; private static final int FITBIT_TOO_MANY_REQUESTS_COOLDOWN_DEFAULT = 3600; // 1 hour + public static final String FITBIT_COOLDOWN_STRATEGY_CONFIG = "fitbit.cooldown.strategy"; + private static final String FITBIT_COOLDOWN_STRATEGY_DOC = "Strategy for handling too many requests cooldown. Options: ROLLING_WINDOW, TOP_OF_HOUR"; + private static final String FITBIT_COOLDOWN_STRATEGY_DISPLAY = "Cooldown strategy"; + private static final String FITBIT_COOLDOWN_STRATEGY_DEFAULT = "ROLLING_WINDOW"; + private UserRepository userRepository; private final Headers clientCredentials; @@ -703,6 +708,16 @@ public String toString() { ++orderInGroup, Width.SHORT, FITBIT_TOO_MANY_REQUESTS_COOLDOWN_DISPLAY) + + .define(FITBIT_COOLDOWN_STRATEGY_CONFIG, + Type.STRING, + FITBIT_COOLDOWN_STRATEGY_DEFAULT, + Importance.MEDIUM, + FITBIT_COOLDOWN_STRATEGY_DOC, + group, + ++orderInGroup, + Width.SHORT, + FITBIT_COOLDOWN_STRATEGY_DISPLAY) ; } @@ -910,4 +925,12 @@ public int getMaxForbidden() { public int getForbiddenBackoff() { return getInt(FITBIT_FORBIDDEN_BACKOFF_CONFIG); } + + public String getCooldownStrategy() { + String value = getString(FITBIT_COOLDOWN_STRATEGY_CONFIG); + if (!value.equalsIgnoreCase("ROLLING_WINDOW") && !value.equalsIgnoreCase("TOP_OF_HOUR")) { + throw new ConfigException(FITBIT_COOLDOWN_STRATEGY_CONFIG, value, "Invalid cooldown strategy. Must be either ROLLING_WINDOW or TOP_OF_HOUR."); + } + return value; + } } diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java index c7453e2c..3f6517ba 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java @@ -23,6 +23,7 @@ import static java.time.ZoneOffset.UTC; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; import static java.time.temporal.ChronoUnit.DAYS; import static java.time.temporal.ChronoUnit.MINUTES; import static java.time.temporal.ChronoUnit.NANOS; @@ -120,6 +121,7 @@ public abstract class FitbitPollingRoute implements PollingRequestRoute { private final Map forbidden403Counter; private int maxForbiddenResponses; private Duration forbidden403Cooldown; + private String cooldownStrategy; public FitbitPollingRoute( FitbitRequestGenerator generator, @@ -147,6 +149,7 @@ public void initialize(RestSourceConnectorConfig config) { this.maxForbiddenResponses = fitbitConfig.getMaxForbidden(); this.converter().initialize(fitbitConfig); this.forbidden403Cooldown = Duration.ofSeconds(fitbitConfig.getForbiddenBackoff()); + this.cooldownStrategy = fitbitConfig.getCooldownStrategy(); } @Override @@ -178,19 +181,19 @@ public void requestFailed(RestRequest request, Response response) { if (response != null && response.code() == 429) { User user = ((FitbitRestRequest) request).getUser(); tooManyRequestsForUser.add(user); - String cooldownString = response.header("Retry-After"); - Duration cooldown = getTooManyRequestsCooldown(); - if (cooldownString != null) { - try { - cooldown = Duration.ofSeconds(Long.parseLong(cooldownString)); - } catch (NumberFormatException ex) { - cooldown = getTooManyRequestsCooldown(); - } + + Instant backOff; + if ("TOP_OF_HOUR".equalsIgnoreCase(cooldownStrategy)) { + backOff = calculateTopOfHourBackoff(); + logger.info("Too many requests for user {}. Using TOP_OF_HOUR strategy, backing off until: {}", + user, backOff); + } else { + backOff = calculateRollingWindowBackoff(response); + logger.info("Too many requests for user {}. Using ROLLING_WINDOW strategy, backing off until: {}", + user, backOff.plus(getPollIntervalPerUser())); } - Instant backOff = lastPoll.plus(cooldown); + lastPollPerUser.put(user.getId(), backOff); - logger.info("Too many requests for user {}. Backing off until {}", - user, backOff.plus(getPollIntervalPerUser())); } else if (response != null && response.code() == 403) { User user = ((FitbitRestRequest) request).getUser(); String userId = user.getId(); @@ -209,6 +212,46 @@ public void requestFailed(RestRequest request, Response response) { } } + /** + * Calculate backoff using top-of-hour strategy. + * Waits until the top of the next hour to align with Fitbit's rate limit reset. + */ + private Instant calculateTopOfHourBackoff() { + Instant now = Instant.now(); + Instant topOfNextHour = calculateTopOfNextHour(now).plus(ONE_SECOND); + return topOfNextHour; + } + + /** + * Calculate backoff using rolling window strategy. + * Uses configured cooldown duration from when the error occurred. + */ + private Instant calculateRollingWindowBackoff(Response response) { + String cooldownString = response.header("Retry-After"); + Duration cooldown = getTooManyRequestsCooldown(); + + if (cooldownString != null) { + try { + cooldown = Duration.ofSeconds(Long.parseLong(cooldownString)); + } catch (NumberFormatException ex) { + cooldown = getTooManyRequestsCooldown(); + } + } + + return lastPoll.plus(cooldown); + } + + /** + * Calculate the top of the next hour from the given instant. + */ + private Instant calculateTopOfNextHour(Instant instant) { + ZonedDateTime zonedDateTime = instant.atZone(UTC); + ZonedDateTime topOfNextHour = zonedDateTime + .plusHours(1) + .truncatedTo(ChronoUnit.HOURS); + return topOfNextHour.toInstant(); + } + /** * Actually construct requests, based on the current offset * @param user Fitbit user