Skip to content

Commit 90f9cc0

Browse files
authored
refactor: update partition reporting in RoutineLoadJobLister to focus on non-zero lag values (#18)
1 parent 50648f4 commit 90f9cc0

1 file changed

Lines changed: 24 additions & 14 deletions

File tree

src/tools/fe/routine_load/job_lister.rs

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -196,14 +196,15 @@ impl RoutineLoadJobLister {
196196
}
197197

198198
if job.lag.is_some() {
199-
// Partitions Overview: show Top 40 (max lag) and Bottom 10 (min lag)
199+
// Partitions Overview: show Top 30 (largest non-zero lag) and Bottom 20 (smallest non-zero lag)
200200
let rows = self.build_partition_rows(job.progress.as_ref(), job.lag.as_ref());
201-
let delayed = rows.iter().filter(|(_, _, lag_v)| *lag_v > 0).count();
201+
let nonzero_count = rows.iter().filter(|(_, _, lag_v)| *lag_v > 0).count();
202+
let zero_count = rows.len().saturating_sub(nonzero_count);
202203
if !rows.is_empty() {
203-
report.push_str("\nPartitions Overview (by lag):\n");
204-
report.push_str(&self.format_partitions_overview_top_bottom(&rows, 40, 10));
204+
report.push_str("\nPartitions Overview (non-zero lags only):\n");
205+
report.push_str(&self.format_partitions_overview_nonzero_top_bottom(&rows, 30, 20));
205206
report.push_str(&format!(
206-
"Top 40 & Bottom 10 shown; delayed: {delayed}, total partitions: {}\n",
207+
"non-zero-lag: {nonzero_count}, zero-lag: {zero_count}, total: {}\n",
207208
rows.len()
208209
));
209210

@@ -256,22 +257,30 @@ impl RoutineLoadJobLister {
256257
rows
257258
}
258259

259-
fn format_partitions_overview_top_bottom(
260+
fn format_partitions_overview_nonzero_top_bottom(
260261
&self,
261262
rows: &[(String, Option<String>, i64)],
262263
top_n: usize,
263264
bottom_n: usize,
264265
) -> String {
265-
let total = rows.len();
266+
// filter non-zero lag rows
267+
let mut nonzero: Vec<(String, Option<String>, i64)> = rows
268+
.iter()
269+
.filter(|(_, _, lag_v)| *lag_v > 0)
270+
.cloned()
271+
.collect();
272+
let total = nonzero.len();
266273
let mut out = String::new();
267274

268275
// Top section (largest lag first)
269276
out.push_str("Top by lag:\n");
270277
out.push_str("┌─────────────┬─────────────┬─────────────┐\n");
271278
out.push_str("│ Partition │ Progress │ Lag │\n");
272279
out.push_str("├─────────────┼─────────────┼─────────────┤\n");
280+
// sort desc
281+
nonzero.sort_by(|a, b| b.2.cmp(&a.2));
273282
let mut printed = 0usize;
274-
for (part, prog, lag_v) in rows.iter().take(top_n) {
283+
for (part, prog, lag_v) in nonzero.iter().take(top_n) {
275284
let prog_s = prog.as_deref().unwrap_or("N/A");
276285
out.push_str(&format!("│ {part:>11} │ {prog_s:>11} │ {lag_v:>11} │\n"));
277286
printed += 1;
@@ -286,12 +295,13 @@ impl RoutineLoadJobLister {
286295
out.push_str("┌─────────────┬─────────────┬─────────────┐\n");
287296
out.push_str("│ Partition │ Progress │ Lag │\n");
288297
out.push_str("├─────────────┼─────────────┼─────────────┤\n");
289-
if total > top_n {
290-
let start = total.saturating_sub(bottom_n);
291-
for (part, prog, lag_v) in rows.iter().skip(start) {
292-
let prog_s = prog.as_deref().unwrap_or("N/A");
293-
out.push_str(&format!("│ {part:>11} │ {prog_s:>11} │ {lag_v:>11} │\n"));
294-
}
298+
// sort asc
299+
nonzero.sort_by(|a, b| a.2.cmp(&b.2));
300+
let start = 0usize; // beginning for smallest
301+
let end = bottom_n.min(total);
302+
for (part, prog, lag_v) in nonzero.iter().skip(start).take(end) {
303+
let prog_s = prog.as_deref().unwrap_or("N/A");
304+
out.push_str(&format!("│ {part:>11} │ {prog_s:>11} │ {lag_v:>11} │\n"));
295305
}
296306
out.push_str("└─────────────┴─────────────┴─────────────┘\n");
297307

0 commit comments

Comments
 (0)