Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions aggregator/src/aggregator/aggregation_job_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -952,7 +952,7 @@ mod tests {
};
use janus_messages::{
AggregationJobStep, Interval, Query, ReportError, ReportId, ReportIdChecksum,
ReportMetadata, Role, TaskId, Time,
ReportMetadata, Role, TaskId,
batch_mode::{LeaderSelected, TimeInterval},
taskprov::TimePrecision,
};
Expand Down Expand Up @@ -2847,13 +2847,8 @@ mod tests {
let report_time_1 = report_time_2.sub_duration(&batch_time_window_size).unwrap();

// Compute the bucketed time bucket starts for querying outstanding batches
let batch_window_units = batch_time_window_size.as_time_precision_units();
let bucket_time_1 = Time::from_time_precision_units(
(report_time_1.as_time_precision_units() / batch_window_units) * batch_window_units,
);
let bucket_time_2 = Time::from_time_precision_units(
(report_time_2.as_time_precision_units() / batch_window_units) * batch_window_units,
);
let bucket_time_1 = report_time_1.to_batch_interval_start(batch_time_window_size);
let bucket_time_2 = report_time_2.to_batch_interval_start(batch_time_window_size);
let vdaf = Arc::new(Prio3::new_count(2).unwrap());
let helper_hpke_keypair = HpkeKeypair::test();

Expand Down
10 changes: 4 additions & 6 deletions aggregator/src/aggregator/batch_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use janus_aggregator_core::{
},
},
};
use janus_core::time::{Clock, IntervalExt};
use janus_core::time::{Clock, IntervalExt, TimeExt};
use janus_messages::{
AggregationJobStep, BatchId, Duration, Interval, ReportId, TaskId, Time,
batch_mode::LeaderSelected,
Expand Down Expand Up @@ -117,11 +117,9 @@ where
.map(|batch_time_window_size| {
// While everything is in units of the time precision, we still have to
// bucket things by the batch_time_window_size.
let batch_window_units = batch_time_window_size.as_time_precision_units();
Time::from_time_precision_units(
(report.client_timestamp().as_time_precision_units() / batch_window_units)
* batch_window_units,
)
report
.client_timestamp()
.to_batch_interval_start(batch_time_window_size)
});
let mut map_entry = self.buckets.entry(time_bucket_start_opt);
let bucket = match &mut map_entry {
Expand Down
12 changes: 3 additions & 9 deletions aggregator_core/src/datastore/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7448,16 +7448,10 @@ async fn delete_expired_collection_artifacts(ephemeral_datastore: EphemeralDatas
} => {
// Compute the batch time bucket start by rounding down to
// batch_time_window_size
let batch_window_units = batch_time_window_size.as_time_precision_units();
let time_bucket_start = Time::from_time_precision_units(
(client_timestamps[0].as_time_precision_units() / batch_window_units)
* batch_window_units,
);
let time_bucket_start =
client_timestamps[0].to_batch_interval_start(*batch_time_window_size);
let same_bucket = client_timestamps.iter().all(|ts| {
Time::from_time_precision_units(
(ts.as_time_precision_units() / batch_window_units)
* batch_window_units,
) == time_bucket_start
ts.to_batch_interval_start(*batch_time_window_size) == time_bucket_start
});
assert!(
same_bucket,
Expand Down
2 changes: 1 addition & 1 deletion aggregator_core/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub enum BatchMode {
/// If present, reports will be separated into different batches by timestamp, such that
/// the client timestamp interval duration will not exceed this value. The minimum and
/// maximum allowed report timestamps for each batch will be multiples of this value as
/// well. This must be a multiple of the task's time precision.
/// well.
///
/// This is an implementation-specific configuration parameter, and not part of the query
/// type as defined in DAP.
Expand Down
65 changes: 39 additions & 26 deletions core/src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,12 +261,6 @@ pub trait DateTimeExt {
/// Returns true if and only if this [`DateTime<Utc>`] occurs before the given [`Time`].
fn is_before(&self, time: &Time, time_precision: &TimePrecision) -> bool;

/// Compute the start of the batch interval containing this DateTime, given the task time
/// precision.
fn to_batch_interval_start(&self, time_precision: &TimePrecision) -> Result<Self, Error>
where
Self: Sized;

/// Get the difference between this [`DateTime<Utc>`] and the provided `other` [`Time`].
/// Returns `self - other`. `self` must be after `other`.
fn difference_as_time_delta(
Expand Down Expand Up @@ -331,25 +325,6 @@ impl DateTimeExt for DateTime<Utc> {
self.as_seconds_since_epoch() < time.as_seconds_since_epoch(time_precision)
}

fn to_batch_interval_start(&self, time_precision: &TimePrecision) -> Result<Self, Error> {
let seconds = self.timestamp() as u64;
let rem = seconds.checked_rem(time_precision.as_seconds()).ok_or(
Error::IllegalTimeArithmetic("remainder would overflow/underflow"),
)?;
let aligned_seconds = seconds
.checked_sub(rem)
.ok_or(Error::IllegalTimeArithmetic("operation would underflow"))?;
DateTime::<Utc>::from_timestamp(
aligned_seconds
.try_into()
.map_err(|_| Error::IllegalTimeArithmetic("number of seconds too big for i64"))?,
0,
)
.ok_or(Error::IllegalTimeArithmetic(
"number of seconds is out of range",
))
}

fn difference_as_time_delta(
&self,
other: &Time,
Expand Down Expand Up @@ -422,6 +397,21 @@ pub trait TimeExt: Sized {

/// Get the number of time precision units as a signed integer, if possible.
fn as_signed_time_precision_units(&self) -> Result<i64, Error>;

/// Compute the start of the batch interval containing this `Time`, given the duration of the
/// batch intervals in the task. For example:
///
/// ```ignore
/// assert_eq!(
/// Time::from_time_precision_units(17)
/// .to_batch_interval_start(Duration::from_time_precision_units(4)),
/// Time::from_time_precision_units(16),
/// );
/// ```
///
/// This is irrespective of whatever time precision the two values are in. But for the
/// computation to be meaningful, they should use the same time precision.
fn to_batch_interval_start(&self, batch_interval_duration: Duration) -> Self;
}

impl TimeExt for Time {
Expand Down Expand Up @@ -540,6 +530,13 @@ impl TimeExt for Time {
.try_into()
.map_err(|_| Error::IllegalTimeArithmetic("time too large for signed integer"))
}

/// Round this time down to the start of the batch interval that contains it,
/// where batch intervals are consecutive windows of `batch_interval_duration`
/// starting at time zero. Both values are interpreted in time-precision units.
///
/// NOTE(review): panics if `batch_interval_duration` is zero units (remainder
/// by zero) — confirm callers always pass a nonzero batch window.
fn to_batch_interval_start(&self, batch_interval_duration: Duration) -> Self {
    let units = self.as_time_precision_units();
    let window = batch_interval_duration.as_time_precision_units();
    // For unsigned integers, `units - units % window` is the largest multiple
    // of `window` that is <= `units`, i.e. the bucket start.
    Self::from_time_precision_units(units - units % window)
}
}

/// Extension methods on [`Interval`].
Expand Down Expand Up @@ -588,7 +585,7 @@ mod tests {
use chrono::{DateTime, TimeDelta, Utc};
use janus_messages::{Duration, Interval, Time, taskprov::TimePrecision};

use crate::time::{Clock, DateTimeExt, IntervalExt, MockClock, TimeDeltaExt};
use crate::time::{Clock, DateTimeExt, IntervalExt, MockClock, TimeDeltaExt, TimeExt};

const TEST_TIME_PRECISION: TimePrecision = TimePrecision::from_seconds(1);

Expand Down Expand Up @@ -765,6 +762,22 @@ mod tests {
}
}

#[test]
fn time_to_batch_interval_start() {
    // Each case: (label, input time, expected bucket start), all expressed in
    // time-precision units, using a batch interval duration of 16 units.
    let cases = [
        ("aligned", 16, 16),
        ("not aligned bigger than batch interval", 17, 16),
        ("not aligned smaller than batch interval", 15, 0),
    ];
    for (label, input, want) in cases {
        let got = Time::from_time_precision_units(input)
            .to_batch_interval_start(Duration::from_time_precision_units(16));
        assert_eq!(got, Time::from_time_precision_units(want), "{label}: failure")
    }
}

#[test]
fn add_duration_success() {
let dt = DateTime::<Utc>::from_timestamp(1000000000, 0).unwrap();
Expand Down
5 changes: 2 additions & 3 deletions interop_binaries/tests/end_to_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,9 +414,8 @@ async fn run(
let query_json = match query_kind {
QueryKind::TimeInterval => {
let batch_interval_start = start_timestamp
.to_batch_interval_start(&TimePrecision::from_seconds(TIME_PRECISION))
.unwrap()
.as_seconds_since_epoch();
.to_time(&TimePrecision::from_seconds(TIME_PRECISION))
.as_seconds_since_epoch(&TimePrecision::from_seconds(TIME_PRECISION));
// Span the aggregation over two time precisions, just in case our measurements
// spilled over a batch boundary.
let batch_interval_duration = TIME_PRECISION * 2;
Expand Down
Loading