diff --git a/logging/src/lib.rs b/logging/src/lib.rs index a51ec3b4e..b02c164b3 100644 --- a/logging/src/lib.rs +++ b/logging/src/lib.rs @@ -66,6 +66,11 @@ impl Registry { .map(|x| (*x).clone()) } + /// Iterates over the names of currently bound loggers. + pub fn names(&self) -> impl Iterator { + self.map.keys().map(String::as_str) + } + /// Creates a new logger registry. pub fn new(time: Instant) -> Self { Registry { diff --git a/mdbook/src/chapter_5/chapter_5_4.md b/mdbook/src/chapter_5/chapter_5_4.md index 0ad588d15..17656487d 100644 --- a/mdbook/src/chapter_5/chapter_5_4.md +++ b/mdbook/src/chapter_5/chapter_5_4.md @@ -222,6 +222,22 @@ Beyond the main `"timely"` stream, there are typed log streams for deeper intros The `` in the stream names is the Rust type name of the dataflow's timestamp, obtained from `std::any::type_name::()` (e.g., `"timely/progress/usize"` for a dataflow using `usize` timestamps). Note that `type_name` is best-effort and not guaranteed to be stable across compiler versions, so these stream names should be treated accordingly. +Because the typed-stream names embed `type_name::()`, the exact key for a given dataflow can be awkward to predict (especially for nested or composite timestamps). `Registry::names()` returns an iterator over the names currently bound, which you can call after constructing a dataflow to see what is available: + +```rust,no_run +timely::execute_from_args(std::env::args(), |worker| { + worker.dataflow::(|scope| { + // ... build your dataflow ... + }); + + if let Some(registry) = worker.log_register() { + for name in registry.names() { + println!("{name}"); + } + } +}).unwrap(); +``` + **`TimelyProgressEvent`** captures the exchange of progress information between operators. Each event records whether it is a send or receive (`is_send`), the `source` worker, the `channel` and `seq_no`, the `identifier` of the operator, and two lists of updates: `messages` (updates to message counts at targets) and `internal` (updates to capabilities at sources). Each update is a tuple `(node, port, timestamp, delta)`. These are primarily useful for debugging the progress tracking protocol. **`TrackerEvent`** records updates to the reachability tracker, which maintains the set of timestamps that could still arrive at each operator port. Each scope (subgraph) has its own tracker, identified by `tracker_id` — this is the worker-unique `id` of the scope operator (the same `id` from `OperatesEvent`). diff --git a/timely/src/progress/frontier.rs b/timely/src/progress/frontier.rs index 25c82ed56..15e3dfaf0 100644 --- a/timely/src/progress/frontier.rs +++ b/timely/src/progress/frontier.rs @@ -532,18 +532,13 @@ impl MutableAntichain { T: Clone + PartialOrder + Ord, I: IntoIterator, { - let updates = updates.into_iter(); - // track whether a rebuild is needed. let mut rebuild_required = false; for (time, delta) in updates { - // If we do not yet require a rebuild, test whether we might require one // and set the flag in that case. if !rebuild_required { - let beyond_frontier = self.frontier.iter().any(|f| f.less_than(&time)); - let before_frontier = !self.frontier.iter().any(|f| f.less_equal(&time)); - rebuild_required = !(beyond_frontier || (delta < 0 && before_frontier)); + rebuild_required = self.requires_rebuild(&time, delta); } self.updates.update(time, delta); @@ -555,6 +550,34 @@ impl MutableAntichain { self.changes.drain() } + /// Tests whether applying `(time, delta)` will require a frontier rebuild. + /// + /// Factored out of [`Self::update_iter`] so it is generic only over `T` and not + /// the iterator type, deduplicating the inlined `frontier.iter().any(...)` bodies + /// across `update_iter` monomorphizations. + fn requires_rebuild(&self, time: &T, delta: i64) -> bool + where + T: PartialOrder, + { + // Single-pass `for` loop (instead of two `Iterator::any` calls) avoids + // monomorphizing `slice::Iter::any` over per-call-site closure types and + // traverses `self.frontier` at most once. + let mut beyond_frontier = false; + let mut before_frontier = true; + for f in &self.frontier { + if !beyond_frontier && f.less_than(time) { + beyond_frontier = true; + } + if before_frontier && f.less_equal(time) { + before_frontier = false; + } + if beyond_frontier && !before_frontier { + break; + } + } + !(beyond_frontier || (delta < 0 && before_frontier)) + } + /// Rebuilds `self.frontier` from `self.updates`. /// /// This method is meant to be used for bulk updates to the frontier, and does more work than one might do