diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitIntradayHeartRateAvroConverter.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitIntradayHeartRateAvroConverter.java index 89bc4cf1..92fb75e5 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitIntradayHeartRateAvroConverter.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitIntradayHeartRateAvroConverter.java @@ -64,12 +64,18 @@ protected Stream processRecords( // Used as the date to convert the local times in the dataset to absolute times. ZonedDateTime startDate = request.getDateRange().start(); + Instant rangeStart = startDate.toInstant(); + Instant rangeEnd = request.getDateRange().end().toInstant(); return iterableToStream(dataset) .map(tryOrNull(activity -> { Instant time = startDate.with(LocalTime.parse(activity.get("time").asText())) .toInstant(); + if (time.isBefore(rangeStart) || !time.isBefore(rangeEnd)) { + return null; + } + FitbitIntradayHeartRate heartRate = new FitbitIntradayHeartRate( time.toEpochMilli() / 1000d, timeReceived, diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradayHeartRateRoute.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradayHeartRateRoute.java index f7f06644..471056f8 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradayHeartRateRoute.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradayHeartRateRoute.java @@ -17,16 +17,20 @@ package org.radarbase.connect.rest.fitbit.route; -import static java.time.format.DateTimeFormatter.ISO_LOCAL_TIME; -import static java.time.temporal.ChronoUnit.SECONDS; +import static java.time.ZoneOffset.UTC; import io.confluent.connect.avro.AvroData; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.List; import java.util.stream.Stream; import org.radarbase.connect.rest.fitbit.converter.FitbitIntradayHeartRateAvroConverter; import org.radarbase.connect.rest.fitbit.request.FitbitRequestGenerator; import org.radarbase.connect.rest.fitbit.request.FitbitRestRequest; import org.radarbase.connect.rest.fitbit.user.User; import org.radarbase.connect.rest.fitbit.user.UserRepository; +import org.radarbase.connect.rest.fitbit.util.DateRange; public class FitbitIntradayHeartRateRoute extends FitbitPollingRoute { private final FitbitIntradayHeartRateAvroConverter converter; @@ -39,15 +43,22 @@ public FitbitIntradayHeartRateRoute(FitbitRequestGenerator generator, @Override protected String getUrlFormat(String baseUrl) { - return baseUrl + "/1/user/%s/activities/heart/date/%s/1d/1sec/time/%s/%s.json?timezone=UTC"; + // URL format args: user-id, date + return baseUrl + "/1/user/%s/activities/heart/date/%s/1d/1sec.json?timezone=UTC"; } protected Stream createRequests(User user) { - return startDateGenerator(getOffset(user).plus(ONE_SECOND).truncatedTo(SECONDS)) + // Important: heart rate is queried at the resolution of a single + // day, so the offset for the next request will be set to the next day. + Instant startDate = this.getOffset(user).plus(ONE_DAY) + .atZone(UTC) + .truncatedTo(ChronoUnit.DAYS).toInstant(); + List dateRangeStream = startDateGenerator(startDate).toList(); + return dateRangeStream.stream() .map(dateRange -> newRequest(user, dateRange, - user.getExternalUserId(), DATE_FORMAT.format(dateRange.start()), - ISO_LOCAL_TIME.format(dateRange.start()), - ISO_LOCAL_TIME.format(dateRange.end().truncatedTo(SECONDS)))); + user.getExternalUserId(), + DATE_FORMAT.format(dateRange.start()) + )); } @Override