From 465cad75a5f7b333c7b31a6ecf6de32ffb203d96 Mon Sep 17 00:00:00 2001 From: Pim van Nierop Date: Wed, 25 Mar 2026 11:18:13 +0100 Subject: [PATCH 1/2] conservative: filter observation outside of date range --- .../converter/FitbitIntradayHeartRateAvroConverter.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitIntradayHeartRateAvroConverter.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitIntradayHeartRateAvroConverter.java index 89bc4cf1..92fb75e5 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitIntradayHeartRateAvroConverter.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitIntradayHeartRateAvroConverter.java @@ -64,12 +64,18 @@ protected Stream processRecords( // Used as the date to convert the local times in the dataset to absolute times. ZonedDateTime startDate = request.getDateRange().start(); + Instant rangeStart = startDate.toInstant(); + Instant rangeEnd = request.getDateRange().end().toInstant(); return iterableToStream(dataset) .map(tryOrNull(activity -> { Instant time = startDate.with(LocalTime.parse(activity.get("time").asText())) .toInstant(); + if (time.isBefore(rangeStart) || !time.isBefore(rangeEnd)) { + return null; + } + FitbitIntradayHeartRate heartRate = new FitbitIntradayHeartRate( time.toEpochMilli() / 1000d, timeReceived, From 90022397fa57babf518eebb5c44bfbffcf552c61 Mon Sep 17 00:00:00 2001 From: Pim van Nierop Date: Wed, 25 Mar 2026 11:49:42 +0100 Subject: [PATCH 2/2] conservative: download intraday heart rate data by day --- .../route/FitbitIntradayHeartRateRoute.java | 25 +++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradayHeartRateRoute.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradayHeartRateRoute.java index f7f06644..471056f8 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradayHeartRateRoute.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradayHeartRateRoute.java @@ -17,16 +17,20 @@ package org.radarbase.connect.rest.fitbit.route; -import static java.time.format.DateTimeFormatter.ISO_LOCAL_TIME; -import static java.time.temporal.ChronoUnit.SECONDS; +import static java.time.ZoneOffset.UTC; import io.confluent.connect.avro.AvroData; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.List; import java.util.stream.Stream; import org.radarbase.connect.rest.fitbit.converter.FitbitIntradayHeartRateAvroConverter; import org.radarbase.connect.rest.fitbit.request.FitbitRequestGenerator; import org.radarbase.connect.rest.fitbit.request.FitbitRestRequest; import org.radarbase.connect.rest.fitbit.user.User; import org.radarbase.connect.rest.fitbit.user.UserRepository; +import org.radarbase.connect.rest.fitbit.util.DateRange; public class FitbitIntradayHeartRateRoute extends FitbitPollingRoute { private final FitbitIntradayHeartRateAvroConverter converter; @@ -39,15 +43,22 @@ public FitbitIntradayHeartRateRoute(FitbitRequestGenerator generator, @Override protected String getUrlFormat(String baseUrl) { - return baseUrl + "/1/user/%s/activities/heart/date/%s/1d/1sec/time/%s/%s.json?timezone=UTC"; + // URL format args: user-id, date + return baseUrl + "/1/user/%s/activities/heart/date/%s/1d/1sec.json?timezone=UTC"; } protected Stream createRequests(User user) { - return startDateGenerator(getOffset(user).plus(ONE_SECOND).truncatedTo(SECONDS)) + // Important: heart rate is queried at the resolution of a single + // day, so the offset for the next request will be set to the next day. + Instant startDate = this.getOffset(user).plus(ONE_DAY) + .atZone(UTC) + .truncatedTo(ChronoUnit.DAYS).toInstant(); + List dateRangeStream = startDateGenerator(startDate).toList(); + return dateRangeStream.stream() .map(dateRange -> newRequest(user, dateRange, - user.getExternalUserId(), DATE_FORMAT.format(dateRange.start()), - ISO_LOCAL_TIME.format(dateRange.start()), - ISO_LOCAL_TIME.format(dateRange.end().truncatedTo(SECONDS)))); + user.getExternalUserId(), + DATE_FORMAT.format(dateRange.start()) + )); } @Override