From fc12f11473a7816cf5690a7914f61793bcca94c5 Mon Sep 17 00:00:00 2001 From: Pim van Nierop Date: Wed, 25 Mar 2026 11:18:13 +0100 Subject: [PATCH 1/3] conservative: filter observation outside of date range --- .../converter/FitbitIntradayHeartRateAvroConverter.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitIntradayHeartRateAvroConverter.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitIntradayHeartRateAvroConverter.java index 89bc4cf1..92fb75e5 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitIntradayHeartRateAvroConverter.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitIntradayHeartRateAvroConverter.java @@ -64,12 +64,18 @@ protected Stream processRecords( // Used as the date to convert the local times in the dataset to absolute times. ZonedDateTime startDate = request.getDateRange().start(); + Instant rangeStart = startDate.toInstant(); + Instant rangeEnd = request.getDateRange().end().toInstant(); return iterableToStream(dataset) .map(tryOrNull(activity -> { Instant time = startDate.with(LocalTime.parse(activity.get("time").asText())) .toInstant(); + if (time.isBefore(rangeStart) || !time.isBefore(rangeEnd)) { + return null; + } + FitbitIntradayHeartRate heartRate = new FitbitIntradayHeartRate( time.toEpochMilli() / 1000d, timeReceived, From 468e915e54ee681d24ee1bae3955e04c74b26811 Mon Sep 17 00:00:00 2001 From: Pauline Date: Wed, 25 Mar 2026 12:27:21 +0000 Subject: [PATCH 2/3] Fix intraday heart rate date range --- .../rest/fitbit/route/FitbitIntradayHeartRateRoute.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradayHeartRateRoute.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradayHeartRateRoute.java index f7f06644..78cfb29f 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradayHeartRateRoute.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradayHeartRateRoute.java @@ -18,6 +18,7 @@ package org.radarbase.connect.rest.fitbit.route; import static java.time.format.DateTimeFormatter.ISO_LOCAL_TIME; +import static java.time.temporal.ChronoUnit.DAYS; import static java.time.temporal.ChronoUnit.SECONDS; import io.confluent.connect.avro.AvroData; @@ -46,8 +47,10 @@ protected Stream createRequests(User user) { return startDateGenerator(getOffset(user).plus(ONE_SECOND).truncatedTo(SECONDS)) .map(dateRange -> newRequest(user, dateRange, user.getExternalUserId(), DATE_FORMAT.format(dateRange.start()), - ISO_LOCAL_TIME.format(dateRange.start()), - ISO_LOCAL_TIME.format(dateRange.end().truncatedTo(SECONDS)))); + // Always request full day boundaries + ISO_LOCAL_TIME.format(dateRange.start().truncatedTo(DAYS).truncatedTo(SECONDS)), + ISO_LOCAL_TIME.format(dateRange.start().truncatedTo(DAYS).plusDays(1).minusNanos(1) + .truncatedTo(SECONDS)))); } @Override From c412767249f6c28b3380ad3a2dd93580a7daa548 Mon Sep 17 00:00:00 2001 From: Pauline Date: Wed, 25 Mar 2026 13:17:00 +0000 Subject: [PATCH 3/3] Fix time format --- .../rest/fitbit/route/FitbitIntradayHeartRateRoute.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradayHeartRateRoute.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradayHeartRateRoute.java index 78cfb29f..f7e80044 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradayHeartRateRoute.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradayHeartRateRoute.java @@ -17,7 +17,6 @@ package org.radarbase.connect.rest.fitbit.route; -import static java.time.format.DateTimeFormatter.ISO_LOCAL_TIME; import static java.time.temporal.ChronoUnit.DAYS; import static java.time.temporal.ChronoUnit.SECONDS; @@ -48,8 +47,8 @@ protected Stream createRequests(User user) { .map(dateRange -> newRequest(user, dateRange, user.getExternalUserId(), DATE_FORMAT.format(dateRange.start()), // Always request full day boundaries - ISO_LOCAL_TIME.format(dateRange.start().truncatedTo(DAYS).truncatedTo(SECONDS)), - ISO_LOCAL_TIME.format(dateRange.start().truncatedTo(DAYS).plusDays(1).minusNanos(1) + TIME_FORMAT.format(dateRange.start().truncatedTo(DAYS).truncatedTo(SECONDS)), + TIME_FORMAT.format(dateRange.start().truncatedTo(DAYS).plusDays(1).minusNanos(1) .truncatedTo(SECONDS)))); }