diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitIntradayHeartRateAvroConverter.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitIntradayHeartRateAvroConverter.java index 89bc4cf1..92fb75e5 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitIntradayHeartRateAvroConverter.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/converter/FitbitIntradayHeartRateAvroConverter.java @@ -64,12 +64,18 @@ protected Stream processRecords( // Used as the date to convert the local times in the dataset to absolute times. ZonedDateTime startDate = request.getDateRange().start(); + Instant rangeStart = startDate.toInstant(); + Instant rangeEnd = request.getDateRange().end().toInstant(); return iterableToStream(dataset) .map(tryOrNull(activity -> { Instant time = startDate.with(LocalTime.parse(activity.get("time").asText())) .toInstant(); + if (time.isBefore(rangeStart) || !time.isBefore(rangeEnd)) { + return null; + } + FitbitIntradayHeartRate heartRate = new FitbitIntradayHeartRate( time.toEpochMilli() / 1000d, timeReceived, diff --git a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradayHeartRateRoute.java b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradayHeartRateRoute.java index f7f06644..f7e80044 100644 --- a/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradayHeartRateRoute.java +++ b/kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradayHeartRateRoute.java @@ -17,7 +17,7 @@ package org.radarbase.connect.rest.fitbit.route; -import static java.time.format.DateTimeFormatter.ISO_LOCAL_TIME; +import static java.time.temporal.ChronoUnit.DAYS; import static java.time.temporal.ChronoUnit.SECONDS; import io.confluent.connect.avro.AvroData; @@ -46,8 +46,10 @@ protected Stream createRequests(User user) { return startDateGenerator(getOffset(user).plus(ONE_SECOND).truncatedTo(SECONDS)) .map(dateRange -> newRequest(user, dateRange, user.getExternalUserId(), DATE_FORMAT.format(dateRange.start()), - ISO_LOCAL_TIME.format(dateRange.start()), - ISO_LOCAL_TIME.format(dateRange.end().truncatedTo(SECONDS)))); + // Always request full day boundaries + TIME_FORMAT.format(dateRange.start().truncatedTo(DAYS).truncatedTo(SECONDS)), + TIME_FORMAT.format(dateRange.start().truncatedTo(DAYS).plusDays(1).minusNanos(1) + .truncatedTo(SECONDS)))); } @Override