From 53d8060907af3e84ae15283e1b77a4a9634d099c Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Fri, 27 Mar 2026 11:59:10 +0400 Subject: [PATCH 01/25] Add logging at runloop ticks --- crates/lib/runloop/src/runloop.rs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/crates/lib/runloop/src/runloop.rs b/crates/lib/runloop/src/runloop.rs index 7c582ee3..99f39d4f 100644 --- a/crates/lib/runloop/src/runloop.rs +++ b/crates/lib/runloop/src/runloop.rs @@ -738,6 +738,25 @@ where { break 'runloop Err(err.into()); } + + tracing::trace!( + target: "runloop-ticks", + + instances_idle, + next_shard, + + shard_senders_len = shard_senders.len(), + + executor_shards_len = executor_shards.len(), + inflight_actions_len = inflight_actions.len(), + inflight_dispatches_len = inflight_dispatches.len(), + sleeping_nodes_len = sleeping_nodes.len(), + sleeping_by_instance_len = sleeping_by_instance.len(), + blocked_until_by_instance_len = blocked_until_by_instance.len(), + instances_done_pending_len = instances_done_pending.len(), + + "runloop tick" + ); }; info!( From ac770bd3bc22ef2795abfbb77948e6570a3ba3a9 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Fri, 27 Mar 2026 13:01:35 +0400 Subject: [PATCH 02/25] Add a metric of a total number of the runloop ticks --- Cargo.lock | 1 + crates/lib/runloop/Cargo.toml | 1 + crates/lib/runloop/src/runloop.rs | 2 ++ 3 files changed, 4 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index ec145120..6181d904 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4417,6 +4417,7 @@ name = "waymark-runloop" version = "0.1.0" dependencies = [ "chrono", + "metrics", "mockall", "nonempty-collections", "prost 0.12.6", diff --git a/crates/lib/runloop/Cargo.toml b/crates/lib/runloop/Cargo.toml index fed1581a..8e8afe67 100644 --- a/crates/lib/runloop/Cargo.toml +++ b/crates/lib/runloop/Cargo.toml @@ -23,6 +23,7 @@ waymark-worker-core = { workspace = true } waymark-workflow-registry-backend = { workspace = true } chrono = { workspace = true, default-features = false } +metrics = { workspace = true } nonempty-collections = { workspace = true } prost = { workspace = true } serde_json = { workspace = true } diff --git a/crates/lib/runloop/src/runloop.rs b/crates/lib/runloop/src/runloop.rs index 99f39d4f..bfcc0985 100644 --- a/crates/lib/runloop/src/runloop.rs +++ b/crates/lib/runloop/src/runloop.rs @@ -739,6 +739,8 @@ where break 'runloop Err(err.into()); } + metrics::counter!("waymark_runloop_ticks_total").increment(1); + tracing::trace!( target: "runloop-ticks", From 287177ddfe52120ba2128fe4231b1ecb942d2520 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Fri, 27 Mar 2026 13:15:49 +0400 Subject: [PATCH 03/25] Add waymark_runloop_ticks_by_cause_total metrics --- crates/lib/runloop/src/runloop.rs | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/crates/lib/runloop/src/runloop.rs b/crates/lib/runloop/src/runloop.rs index bfcc0985..814ef97f 100644 --- a/crates/lib/runloop/src/runloop.rs +++ b/crates/lib/runloop/src/runloop.rs @@ -448,14 +448,17 @@ where match first_event { CoordinatorEvent::Completions(completions) => { + metrics::counter!("waymark_runloop_ticks_by_cause_total", "cause" => "completions").increment(1); all_completions.extend(completions); } CoordinatorEvent::Instance(queued_instances_polling::Message::Batch { instances, }) => { + metrics::counter!("waymark_runloop_ticks_by_cause_total", "cause" => "instances").increment(1); all_instances.extend(instances); } CoordinatorEvent::Instance(queued_instances_polling::Message::Pending) => { + metrics::counter!("waymark_runloop_ticks_by_cause_total", "cause" => "instances_pending").increment(1); queued_instances_poller_is_pending = true; } CoordinatorEvent::Instance(queued_instances_polling::Message::Error(err)) => { @@ -463,12 +466,16 @@ where break 'runloop Err(Error::CoreBackendPoll(err)); } CoordinatorEvent::Shard(event) => match event { - shard::Event::Step(step) => all_steps.push(step), + shard::Event::Step(step) => { + metrics::counter!("waymark_runloop_ticks_by_cause_total", "cause" => "shard_step").increment(1); + all_steps.push(step) + } shard::Event::InstanceFailed { executor_id, entry_node, error, } => { + metrics::counter!("waymark_runloop_ticks_by_cause_total", "cause" => "shard_instance_failed").increment(1); all_failed_instances.push(InstanceDone { executor_id, entry_node, @@ -478,12 +485,16 @@ where } }, CoordinatorEvent::SleepWake(wake) => { + metrics::counter!("waymark_runloop_ticks_by_cause_total", "cause" => "sleep_wake").increment(1); all_wakes.push(wake); } CoordinatorEvent::PersistAck(ack) => { + metrics::counter!("waymark_runloop_ticks_by_cause_total", "cause" => "persist_ack").increment(1); all_persist_acks.push(ack); } - CoordinatorEvent::ActionTimeoutTick => {} + CoordinatorEvent::ActionTimeoutTick => { + metrics::counter!("waymark_runloop_ticks_by_cause_total", "cause" => "action_timeout_tick").increment(1); + } } while let Ok(completions) = completion_rx.try_recv() { From 598ecfcdaba6d4a36f2085e174751f9d27ba7c50 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Thu, 2 Apr 2026 09:40:06 +0400 Subject: [PATCH 04/25] Add waymark-metrics-util crate --- Cargo.lock | 7 +++++++ crates/lib/metrics-util/Cargo.toml | 8 ++++++++ crates/lib/metrics-util/src/lib.rs | 9 +++++++++ 3 files changed, 24 insertions(+) create mode 100644 crates/lib/metrics-util/Cargo.toml create mode 100644 crates/lib/metrics-util/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 6181d904..46f4cb33 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4355,6 +4355,13 @@ dependencies = [ "waymark-proto", ] +[[package]] +name = "waymark-metrics-util" +version = "0.1.0" +dependencies = [ + "metrics", +] + [[package]] name = "waymark-nonzero-duration" version = "0.1.0" diff --git a/crates/lib/metrics-util/Cargo.toml b/crates/lib/metrics-util/Cargo.toml new file mode 100644 index 00000000..33e6e16d --- /dev/null +++ b/crates/lib/metrics-util/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "waymark-metrics-util" +edition = "2024" +version.workspace = true +publish.workspace = true + +[dependencies] +metrics = { workspace = true } diff --git a/crates/lib/metrics-util/src/lib.rs b/crates/lib/metrics-util/src/lib.rs new file mode 100644 index 00000000..c4e0139c --- /dev/null +++ b/crates/lib/metrics-util/src/lib.rs @@ -0,0 +1,9 @@ +//! Utilities for [`metrics`] crate. + +pub struct Val(pub T); + +impl metrics::IntoF64 for Val { + fn into_f64(self) -> f64 { + self.0 as _ + } +} From 70b8848dcad2945ca987ccdd05f6aef7e559f6ac Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Fri, 27 Mar 2026 13:44:59 +0400 Subject: [PATCH 05/25] Add waymark_runloop_channel_len --- Cargo.lock | 1 + Cargo.toml | 1 + crates/lib/runloop/Cargo.toml | 1 + crates/lib/runloop/src/runloop.rs | 12 ++++++++++++ 4 files changed, 15 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 46f4cb33..8af1eeed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4443,6 +4443,7 @@ dependencies = [ "waymark-dag-builder", "waymark-ids", "waymark-ir-parser", + "waymark-metrics-util", "waymark-nonzero-duration", "waymark-observability", "waymark-proto", diff --git a/Cargo.toml b/Cargo.toml index c91d0f29..c0b84afa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ waymark-ir-conversions = { path = "crates/lib/ir-conversions" } waymark-ir-format = { path = "crates/lib/ir-format" } waymark-ir-parser = { path = "crates/lib/ir-parser" } waymark-message-conversions = { path = "crates/lib/message-conversions" } +waymark-metrics-util = { path = "crates/lib/metrics-util" } waymark-nonzero-duration = { path = "crates/lib/nonzero-duration" } waymark-observability = { path = "crates/lib/observability" } waymark-observability-macros = { path = "crates/lib/observability-macros" } diff --git a/crates/lib/runloop/Cargo.toml b/crates/lib/runloop/Cargo.toml index 8e8afe67..2a139b85 100644 --- a/crates/lib/runloop/Cargo.toml +++ b/crates/lib/runloop/Cargo.toml @@ -10,6 +10,7 @@ waymark-core-backend = { workspace = true } waymark-dag = { workspace = true } waymark-dag-builder = { workspace = true } waymark-ids = { workspace = true } +waymark-metrics-util = { workspace = true } waymark-nonzero-duration = { workspace = true } waymark-observability = { workspace = true } waymark-proto = { workspace = true } diff --git a/crates/lib/runloop/src/runloop.rs b/crates/lib/runloop/src/runloop.rs index 814ef97f..de274c9d 100644 --- a/crates/lib/runloop/src/runloop.rs +++ b/crates/lib/runloop/src/runloop.rs @@ -23,6 +23,7 @@ use crate::commit_barrier::CommitBarrier; use crate::instance_lock_heartbeat; use crate::{error_value, persist, queued_instances_polling, shard}; use waymark_ids::{ExecutionId, InstanceId, LockId}; +use waymark_metrics_util::Val as MetricsVal; use waymark_dag::DAG; use waymark_observability::obs; @@ -438,6 +439,17 @@ where }, }; + metrics::histogram!("waymark_runloop_channel_len", "channel" => "completion_rx") + .record(MetricsVal(completion_rx.len())); + metrics::histogram!("waymark_runloop_channel_len", "channel" => "instance_rx") + .record(MetricsVal(instance_rx.len())); + metrics::histogram!("waymark_runloop_channel_len", "channel" => "sleep_rx") + .record(MetricsVal(sleep_rx.len())); + metrics::histogram!("waymark_runloop_channel_len", "channel" => "event_rx") + .record(MetricsVal(event_rx.len())); + metrics::histogram!("waymark_runloop_channel_len", "channel" => "persist_ack_rx") + .record(MetricsVal(persist_ack_rx.len())); + let mut all_completions: Vec = Vec::new(); let mut all_instances: Vec = Vec::new(); let mut all_steps: Vec = Vec::new(); From 0e5e67144a0bd665658132c141e44a593ec568b9 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Fri, 27 Mar 2026 20:09:05 +0400 Subject: [PATCH 06/25] Add waymark_runloop_polled_instances_total and waymark_runloop_polled_instance_pending_total metrics --- crates/lib/runloop/src/runloop.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/crates/lib/runloop/src/runloop.rs b/crates/lib/runloop/src/runloop.rs index de274c9d..a0915b5c 100644 --- a/crates/lib/runloop/src/runloop.rs +++ b/crates/lib/runloop/src/runloop.rs @@ -467,10 +467,14 @@ where instances, }) => { metrics::counter!("waymark_runloop_ticks_by_cause_total", "cause" => "instances").increment(1); + metrics::counter!("waymark_runloop_polled_instances_total", "where" => "first") + .increment(instances.len().get() as _); all_instances.extend(instances); } CoordinatorEvent::Instance(queued_instances_polling::Message::Pending) => { metrics::counter!("waymark_runloop_ticks_by_cause_total", "cause" => "instances_pending").increment(1); + metrics::counter!("waymark_runloop_polled_instance_pending_total", "where" => "first") + .increment(1); queued_instances_poller_is_pending = true; } CoordinatorEvent::Instance(queued_instances_polling::Message::Error(err)) => { @@ -517,9 +521,11 @@ where while let Ok(message) = instance_rx.try_recv() { match message { queued_instances_polling::Message::Batch { instances } => { + metrics::counter!("waymark_runloop_polled_instances_total", "where" => "batch").increment(instances.len().get() as _); all_instances.extend(instances); } queued_instances_polling::Message::Pending => { + metrics::counter!("waymark_runloop_polled_instance_pending_total", "where" => "batch").increment(1); queued_instances_poller_is_pending = true; } queued_instances_polling::Message::Error(err) => { From 0a08a2cd9e510416ea9607b848dc97927183073c Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Fri, 27 Mar 2026 20:27:57 +0400 Subject: [PATCH 07/25] Add waymark_runloop_shard_steps_total metric --- crates/lib/runloop/src/runloop.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/crates/lib/runloop/src/runloop.rs b/crates/lib/runloop/src/runloop.rs index a0915b5c..f096e386 100644 --- a/crates/lib/runloop/src/runloop.rs +++ b/crates/lib/runloop/src/runloop.rs @@ -484,6 +484,8 @@ where CoordinatorEvent::Shard(event) => match event { shard::Event::Step(step) => { metrics::counter!("waymark_runloop_ticks_by_cause_total", "cause" => "shard_step").increment(1); + metrics::counter!("waymark_runloop_shard_steps_total", "where" => "first") + .increment(1); all_steps.push(step) } shard::Event::InstanceFailed { @@ -539,7 +541,11 @@ where let event = event.into_inner_measured_multi(&["shard_event_try", "shard_event_both"]); match event { - shard::Event::Step(step) => all_steps.push(step), + shard::Event::Step(step) => { + metrics::counter!("waymark_runloop_shard_steps_total", "where" => "batch") + .increment(1); + all_steps.push(step); + } shard::Event::InstanceFailed { executor_id, entry_node, From 5001f97ebfc83a2797bd52796cd9a998a9493e69 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Fri, 27 Mar 2026 20:28:43 +0400 Subject: [PATCH 08/25] Add waymark_runloop_shard_instance_failed_total metric --- crates/lib/runloop/src/runloop.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/lib/runloop/src/runloop.rs b/crates/lib/runloop/src/runloop.rs index f096e386..1840b46e 100644 --- a/crates/lib/runloop/src/runloop.rs +++ b/crates/lib/runloop/src/runloop.rs @@ -494,6 +494,8 @@ where error, } => { metrics::counter!("waymark_runloop_ticks_by_cause_total", "cause" => "shard_instance_failed").increment(1); + metrics::counter!("waymark_runloop_shard_instance_failed_total", "where" => "first") + .increment(1); all_failed_instances.push(InstanceDone { executor_id, entry_node, @@ -551,6 +553,8 @@ where entry_node, error, } => { + metrics::counter!("waymark_runloop_shard_instance_failed_total", "where" => "batch") + .increment(1); all_failed_instances.push(InstanceDone { executor_id, entry_node, From 4c16110a7300e90f4b4d2135419ec5ed2ccd3b5b Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Fri, 27 Mar 2026 21:21:11 +0400 Subject: [PATCH 09/25] Add waymark_runloop_polled_completions_total metric --- crates/lib/runloop/src/runloop.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/lib/runloop/src/runloop.rs b/crates/lib/runloop/src/runloop.rs index 1840b46e..3515b186 100644 --- a/crates/lib/runloop/src/runloop.rs +++ b/crates/lib/runloop/src/runloop.rs @@ -461,6 +461,8 @@ where match first_event { CoordinatorEvent::Completions(completions) => { metrics::counter!("waymark_runloop_ticks_by_cause_total", "cause" => "completions").increment(1); + metrics::counter!("waymark_runloop_polled_completions_total", "where" => "first") + .increment(completions.len() as _); all_completions.extend(completions); } CoordinatorEvent::Instance(queued_instances_polling::Message::Batch { @@ -520,6 +522,8 @@ where while let Ok(completions) = completion_rx.try_recv() { let completions = completions.into_inner_measured_multi(&["completions_try", "completions_both"]); + metrics::counter!("waymark_runloop_polled_completions_total", "where" => "batch") + .increment(completions.len() as _); all_completions.extend(completions); } while let Ok(message) = instance_rx.try_recv() { From b701c6a2dffd87bc22b5ae0b600b3d4a8791f0e7 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Fri, 27 Mar 2026 21:46:36 +0400 Subject: [PATCH 10/25] Add waymark_runloop_flush_instances_done_total metric --- crates/lib/runloop/src/runloop/ops/flush_instances_done.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/lib/runloop/src/runloop/ops/flush_instances_done.rs b/crates/lib/runloop/src/runloop/ops/flush_instances_done.rs index 3c8b5e80..0bceb3d5 100644 --- a/crates/lib/runloop/src/runloop/ops/flush_instances_done.rs +++ b/crates/lib/runloop/src/runloop/ops/flush_instances_done.rs @@ -33,6 +33,7 @@ where return Ok(()); } let batch = std::mem::take(pending); + metrics::counter!("waymark_runloop_flush_instances_done_total").increment(batch.len() as _); core_backend.save_instances_done(&batch).await?; Ok(()) } From 6f2dec7b1d89cb6419e26a4d9bd9231da5f1d01d Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Mon, 30 Mar 2026 16:22:33 +0400 Subject: [PATCH 11/25] Add waymark_runloop_ticks_stats_* metrics --- crates/lib/runloop/src/runloop.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/crates/lib/runloop/src/runloop.rs b/crates/lib/runloop/src/runloop.rs index 3515b186..036524ec 100644 --- a/crates/lib/runloop/src/runloop.rs +++ b/crates/lib/runloop/src/runloop.rs @@ -784,6 +784,21 @@ where metrics::counter!("waymark_runloop_ticks_total").increment(1); + metrics::histogram!("waymark_runloop_ticks_stats_executor_shards_len") + .record(MetricsVal(executor_shards.len())); + metrics::histogram!("waymark_runloop_ticks_stats_inflight_actions_len") + .record(MetricsVal(inflight_actions.len())); + metrics::histogram!("waymark_runloop_ticks_stats_inflight_dispatches_len") + .record(MetricsVal(inflight_dispatches.len())); + metrics::histogram!("waymark_runloop_ticks_stats_sleeping_nodes_len") + .record(MetricsVal(sleeping_nodes.len())); + metrics::histogram!("waymark_runloop_ticks_stats_sleeping_by_instance_len") + .record(MetricsVal(sleeping_by_instance.len())); + metrics::histogram!("waymark_runloop_ticks_stats_blocked_until_by_instance_len") + .record(MetricsVal(blocked_until_by_instance.len())); + metrics::histogram!("waymark_runloop_ticks_stats_instances_done_pending_len") + .record(MetricsVal(instances_done_pending.len())); + tracing::trace!( target: "runloop-ticks", From 76f2884c1cb7eff820b1c35a48bfb9aeb5e20129 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Mon, 30 Mar 2026 16:39:54 +0400 Subject: [PATCH 12/25] Add the waymark_runloop_ticks_stats_inflight_actions_total metric --- crates/lib/runloop/src/runloop.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/lib/runloop/src/runloop.rs b/crates/lib/runloop/src/runloop.rs index 036524ec..b01bb419 100644 --- a/crates/lib/runloop/src/runloop.rs +++ b/crates/lib/runloop/src/runloop.rs @@ -799,6 +799,10 @@ where metrics::histogram!("waymark_runloop_ticks_stats_instances_done_pending_len") .record(MetricsVal(instances_done_pending.len())); + let total_in_flight_actions: usize = inflight_actions.values().sum(); + metrics::histogram!("waymark_runloop_ticks_stats_inflight_actions_total") + .record(MetricsVal(total_in_flight_actions)); + tracing::trace!( target: "runloop-ticks", From 104582d33947437479922ba5faadb7f39ab84df6 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Mon, 30 Mar 2026 16:42:46 +0400 Subject: [PATCH 13/25] Added waymark_runloop_ticks_stats_commit_barrier_pending_batch_len metric --- crates/lib/runloop/src/runloop.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/lib/runloop/src/runloop.rs b/crates/lib/runloop/src/runloop.rs index b01bb419..2a3a21e4 100644 --- a/crates/lib/runloop/src/runloop.rs +++ b/crates/lib/runloop/src/runloop.rs @@ -803,6 +803,10 @@ where metrics::histogram!("waymark_runloop_ticks_stats_inflight_actions_total") .record(MetricsVal(total_in_flight_actions)); + let commit_barrier_pending_batch_count = commit_barrier.pending_batch_count(); + metrics::histogram!("waymark_runloop_ticks_stats_commit_barrier_pending_batch_len") + .record(MetricsVal(commit_barrier_pending_batch_count)); + tracing::trace!( target: "runloop-ticks", From eec376f4ac459dacac3cc5c26911eee54ae4f915 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Mon, 30 Mar 2026 16:45:06 +0400 Subject: [PATCH 14/25] Add waymark_runloop_ticks_stats_blocked_idle_instances_len metric --- crates/lib/runloop/src/runloop.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/crates/lib/runloop/src/runloop.rs b/crates/lib/runloop/src/runloop.rs index 2a3a21e4..a083016f 100644 --- a/crates/lib/runloop/src/runloop.rs +++ b/crates/lib/runloop/src/runloop.rs @@ -807,6 +807,13 @@ where metrics::histogram!("waymark_runloop_ticks_stats_commit_barrier_pending_batch_len") .record(MetricsVal(commit_barrier_pending_batch_count)); + let blocked_idle_instances_count = blocked_until_by_instance + .keys() + .filter(|instance_id| inflight_actions.get(instance_id).copied().unwrap_or(0) == 0) + .count(); + metrics::histogram!("waymark_runloop_ticks_stats_blocked_idle_instances_len") + .record(MetricsVal(blocked_idle_instances_count)); + tracing::trace!( target: "runloop-ticks", @@ -822,6 +829,9 @@ where sleeping_by_instance_len = sleeping_by_instance.len(), blocked_until_by_instance_len = blocked_until_by_instance.len(), instances_done_pending_len = instances_done_pending.len(), + total_in_flight_actions, + commit_barrier_pending_batch_count, + blocked_idle_instances_count, "runloop tick" ); From 43f990b1b375a86cc503ccf102cc03e8c72e35a0 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Mon, 30 Mar 2026 22:02:52 +0400 Subject: [PATCH 15/25] Integrate the metrics with the postgres count_query facility --- crates/lib/backend-postgres/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/lib/backend-postgres/src/lib.rs b/crates/lib/backend-postgres/src/lib.rs index cdb8eb8d..41b4d8fc 100644 --- a/crates/lib/backend-postgres/src/lib.rs +++ b/crates/lib/backend-postgres/src/lib.rs @@ -95,7 +95,8 @@ impl PostgresBackend { .clone() } - pub(crate) fn count_query(counts: &Arc>>, label: &str) { + pub(crate) fn count_query(counts: &Arc>>, label: &'static str) { + metrics::counter!("waymark_postgres_queries_total", "label" => label).increment(1); let mut guard = counts.lock().expect("query counts poisoned"); *guard.entry(label.to_string()).or_insert(0) += 1; } From dd32b8754fb4462eb9e6912c68994cd149e03c94 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Tue, 31 Mar 2026 11:12:43 +0400 Subject: [PATCH 16/25] Add waymark_runloop_timeout_completions_total metrics and log event --- .../lib/runloop/src/runloop/parts/inflight_dispatches.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/crates/lib/runloop/src/runloop/parts/inflight_dispatches.rs b/crates/lib/runloop/src/runloop/parts/inflight_dispatches.rs index 3e6bc307..51b54874 100644 --- a/crates/lib/runloop/src/runloop/parts/inflight_dispatches.rs +++ b/crates/lib/runloop/src/runloop/parts/inflight_dispatches.rs @@ -58,6 +58,13 @@ pub fn handle(params: Params<'_>) { let Some(dispatch) = inflight_dispatches.get(&execution_id) else { continue; }; + tracing::debug!( + instance_id = %execution_id, + executor_id = %dispatch.executor_id, + "action timed out" + ); + metrics::counter!("waymark_runloop_timeout_completions_total").increment(1); + timeout_completions.push(ActionCompletion { executor_id: dispatch.executor_id, execution_id, From 6ac1c2b44cedae4963027100af8c4632d09333c6 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Tue, 31 Mar 2026 12:03:26 +0400 Subject: [PATCH 17/25] Add waymark_start_workers_up metric --- Cargo.lock | 1 + crates/bin/start-workers/Cargo.toml | 1 + crates/bin/start-workers/src/main.rs | 32 ++++++++++++++++++++++++++++ 3 files changed, 34 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 8af1eeed..3b9c9a4e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4650,6 +4650,7 @@ version = "0.1.0" dependencies = [ "anyhow", "envfury", + "metrics", "prost 0.12.6", "sqlx", "thiserror", diff --git a/crates/bin/start-workers/Cargo.toml b/crates/bin/start-workers/Cargo.toml index b854bb89..525e1cea 100644 --- a/crates/bin/start-workers/Cargo.toml +++ b/crates/bin/start-workers/Cargo.toml @@ -25,6 +25,7 @@ waymark-worker-status-reporter = { workspace = true } anyhow = { workspace = true } envfury = { workspace = true } +metrics = { workspace = true } prost = { workspace = true } sqlx = { workspace = true } thiserror = { workspace = true } diff --git a/crates/bin/start-workers/src/main.rs b/crates/bin/start-workers/src/main.rs index 9e672afe..8d012846 100644 --- a/crates/bin/start-workers/src/main.rs +++ b/crates/bin/start-workers/src/main.rs @@ -84,6 +84,38 @@ async fn main() -> Result<()> { "starting worker infrastructure" ); + metrics::gauge!( + "waymark_start_workers_up", + + "worker_count" => config.worker_count.to_string(), + "concurrent_per_worker" => config.concurrent_per_worker.to_string(), + "user_modules" => format!("{:?}", config.user_modules), + + "lock_ttl_seconds" => config.lock_ttl.as_secs_f64().to_string(), + "lock_heartbeat_seconds" => config.lock_heartbeat.as_secs_f64().to_string(), + + "evict_sleep_threshold_seconds" => config.evict_sleep_threshold.as_secs_f64().to_string(), + + "expired_lock_reclaimer_interval_seconds" => config.expired_lock_reclaimer_interval.as_secs_f64().to_string(), + "expired_lock_reclaimer_interval_seconds" => config.expired_lock_reclaimer_interval.as_secs_f64().to_string(), + "expired_lock_reclaimer_batch_size" => config.expired_lock_reclaimer_batch_size.to_string(), + + "garbage_collector_interval_seconds" => config.garbage_collector.interval.as_secs_f64().to_string(), + "garbage_collector_batch_size" => config.garbage_collector.batch_size.to_string(), + "garbage_collector_retention_seconds" => config.garbage_collector.retention.as_secs_f64().to_string(), + + "max_action_lifecycle" => config.max_action_lifecycle.map(|val| val.to_string()).unwrap_or("no".into()), + + "executor_shards" => config.executor_shards.to_string(), + "poll_interval_seconds" => config.poll_interval.map(|val| val.as_secs_f64().to_string()).unwrap_or("no".into()), + "max_concurrent_instances" => config.max_concurrent_instances.to_string(), + "instance_done_batch_size" => config.instance_done_batch_size.map(|val| val.to_string()).unwrap_or("no".into()), + "persistence_interval_seconds" => config.persistence_interval.map(|val| val.as_secs_f64().to_string()).unwrap_or("no".into()), + + "profile_interval_seconds" => config.profile_interval.as_secs_f64().to_string(), + ) + .set(1); + // Wire shutdown coordination. let shutdown_token = tokio_util::sync::CancellationToken::new(); From e529b4995f5589e9a9fd2b5da8283d5ef150bf4b Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Tue, 31 Mar 2026 15:17:38 +0400 Subject: [PATCH 18/25] Log and measure the lock uuid --- crates/bin/start-workers/src/main.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/crates/bin/start-workers/src/main.rs b/crates/bin/start-workers/src/main.rs index 8d012846..8d93a48a 100644 --- a/crates/bin/start-workers/src/main.rs +++ b/crates/bin/start-workers/src/main.rs @@ -65,7 +65,10 @@ async fn main() -> Result<()> { // Load configuration and announce startup. let config = WorkerConfig::from_env()?; - tracing::debug!(target: "raw-config", ?config, "raw config"); + // Prepare a new Lock ID to use in the runloop. + let lock_uuid = LockId::new_uuid_v4(); + + tracing::debug!(target: "raw-config", ?config, %lock_uuid, "raw config"); info!( worker_count = config.worker_count, @@ -81,6 +84,7 @@ async fn main() -> Result<()> { garbage_collector_batch_size = config.garbage_collector.batch_size, garbage_collector_retention_secs = config.garbage_collector.retention.as_secs(), max_action_lifecycle = ?config.max_action_lifecycle, + %lock_uuid, "starting worker infrastructure" ); @@ -113,6 +117,8 @@ async fn main() -> Result<()> { "persistence_interval_seconds" => config.persistence_interval.map(|val| val.as_secs_f64().to_string()).unwrap_or("no".into()), "profile_interval_seconds" => config.profile_interval.as_secs_f64().to_string(), + + "lock_uuid" => lock_uuid.to_string(), ) .set(1); @@ -219,7 +225,6 @@ async fn main() -> Result<()> { }); // Run the runloop. - let lock_uuid = LockId::new_uuid_v4(); let runloop = waymark_runloop::RunLoop::new_with_shutdown( remote_pool.clone(), backend.clone(), From 7df33d60d525bd85ddfd6a5eb20fadb70fc7389c Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Tue, 31 Mar 2026 16:10:32 +0400 Subject: [PATCH 19/25] Add waymark_backend_postgres_query_poll_instances_rows_total metric --- crates/lib/backend-postgres/src/core.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/lib/backend-postgres/src/core.rs b/crates/lib/backend-postgres/src/core.rs index a86b9926..51f4fd1e 100644 --- a/crates/lib/backend-postgres/src/core.rs +++ b/crates/lib/backend-postgres/src/core.rs @@ -779,6 +779,9 @@ impl PostgresBackend { return Err(PollQueuedInstancesError::EmptyRows); }; + metrics::counter!("waymark_backend_postgres_query_poll_instances_rows_total") + .increment(rows.len().get() as _); + let claimed_instance_ids: Vec = rows.iter().map(|row| row.get("instance_id")).collect(); sqlx::query("UPDATE runner_instances SET current_status = $2 WHERE instance_id = ANY($1)") From 3698ff919d5b4696ac84f7a1c1cd9e85d5f4c46a Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Tue, 31 Mar 2026 16:45:25 +0400 Subject: [PATCH 20/25] Add waymark_worker_remote_send_action_seconds and waymark_worker_remote_execute_remote_request_seconds metrics --- Cargo.lock | 1 + crates/lib/worker-remote/Cargo.toml | 1 + crates/lib/worker-remote/src/lib.rs | 12 +++++++++++- 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 3b9c9a4e..4987f3b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4870,6 +4870,7 @@ version = "0.1.0" dependencies = [ "anyhow", "futures-core", + "metrics", "nonempty-collections", "prost 0.12.6", "serde_json", diff --git a/crates/lib/worker-remote/Cargo.toml b/crates/lib/worker-remote/Cargo.toml index 8a9f852c..3e38bdb9 100644 --- a/crates/lib/worker-remote/Cargo.toml +++ b/crates/lib/worker-remote/Cargo.toml @@ -15,6 +15,7 @@ waymark-worker-status-core = { workspace = true } anyhow = { workspace = true } # TODO: drop futures-core = { workspace = true } +metrics = { workspace = true } nonempty-collections = { workspace = true } prost = { workspace = true } serde_json = { workspace = true } diff --git a/crates/lib/worker-remote/src/lib.rs b/crates/lib/worker-remote/src/lib.rs index 9f20fb4a..50277a70 100644 --- a/crates/lib/worker-remote/src/lib.rs +++ b/crates/lib/worker-remote/src/lib.rs @@ -1189,7 +1189,11 @@ async fn execute_remote_request( dispatch_token, }; - match worker.send_action(dispatch).await { + let before = std::time::Instant::now(); + let result = worker.send_action(dispatch).await; + metrics::histogram!("waymark_worker_remote_send_action_seconds").record(before.elapsed()); + + match result { Ok(metrics) => { pool.record_latency(metrics.ack_latency, metrics.worker_duration); pool.record_completion(worker_idx, Arc::clone(&pool)); @@ -1336,7 +1340,13 @@ impl BaseWorkerPool for RemoteWorkerPool { let completion_tx = completion_tx.clone(); let pool = Arc::clone(&pool); tokio::spawn(async move { + let before = std::time::Instant::now(); + let completion = execute_remote_request(pool, request).await; + + metrics::histogram!("waymark_worker_remote_execute_remote_request_seconds") + .record(before.elapsed()); + let _ = completion_tx.send(completion).await; }); } From 4c06cf6861fb5fb7f11af314a0e448fac7b58d06 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Tue, 31 Mar 2026 17:38:02 +0400 Subject: [PATCH 21/25] Add waymark_worker_remote_execute_remote_request_worker_wait_seconds metric --- crates/lib/worker-remote/src/lib.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/lib/worker-remote/src/lib.rs b/crates/lib/worker-remote/src/lib.rs index 50277a70..31b87628 100644 --- a/crates/lib/worker-remote/src/lib.rs +++ b/crates/lib/worker-remote/src/lib.rs @@ -1168,12 +1168,15 @@ async fn execute_remote_request( }; }; + let before = std::time::Instant::now(); let worker_idx = loop { if let Some(idx) = pool.try_acquire_slot() { break idx; } tokio::time::sleep(Duration::from_millis(5)).await; }; + metrics::histogram!("waymark_worker_remote_execute_remote_request_worker_wait_seconds") + .record(before.elapsed()); let worker = pool.get_worker(worker_idx).await; let dispatch = ActionDispatchPayload { From d58da5bdcafdb362eae9d7cc582f8f2e8fd60114 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Wed, 1 Apr 2026 13:10:45 +0400 Subject: [PATCH 22/25] Add waymark_worker_status_reporter_loop_upsert_worker_status_seconds and waymark_worker_status_reporter_loop_upsert_worker_status_total metrics --- Cargo.lock | 1 + crates/lib/worker-status-reporter/Cargo.toml | 1 + crates/lib/worker-status-reporter/src/lib.rs | 6 +++++- 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 4987f3b7..76f111c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4910,6 +4910,7 @@ name = "waymark-worker-status-reporter" version = "0.1.0" dependencies = [ "chrono", + "metrics", "tokio", "tokio-util", "tracing", diff --git a/crates/lib/worker-status-reporter/Cargo.toml b/crates/lib/worker-status-reporter/Cargo.toml index aa9b03c9..f94dd6b1 100644 --- a/crates/lib/worker-status-reporter/Cargo.toml +++ b/crates/lib/worker-status-reporter/Cargo.toml @@ -11,6 +11,7 @@ waymark-worker-status-backend = { workspace = true } waymark-worker-status-core = { workspace = true } chrono = { workspace = true, default-features = false, features = ["clock"] } +metrics = { workspace = true } tokio = { workspace = true, features = ["macros", "time"] } tokio-util = { workspace = true } tracing = { workspace = true } diff --git a/crates/lib/worker-status-reporter/src/lib.rs b/crates/lib/worker-status-reporter/src/lib.rs index ec1dbb6f..1deb3d39 100644 --- a/crates/lib/worker-status-reporter/src/lib.rs +++ b/crates/lib/worker-status-reporter/src/lib.rs @@ -73,7 +73,11 @@ pub async fn run( time_series: Some(time_series.encode()), }; - if let Err(err) = backend.upsert_worker_status(&status).await { + let before = std::time::Instant::now(); + let result = backend.upsert_worker_status(&status).await; + metrics::histogram!("waymark_worker_status_reporter_loop_upsert_worker_status_seconds").record(before.elapsed()); + metrics::counter!("waymark_worker_status_reporter_loop_upsert_worker_status_total", "success" => result.is_ok().to_string()).increment(1); + if let Err(err) = result { warn!(error = %err, "failed to update worker status"); } } From 941d4f87942e7667d3d4abf262d6e257ea89195e Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Wed, 1 Apr 2026 13:19:39 +0400 Subject: [PATCH 23/25] Add waymark_runloop_ticks_stats_available_instance_slots_peek metric --- crates/lib/runloop/src/available_instance_slots.rs | 5 +++++ crates/lib/runloop/src/runloop.rs | 7 +++++++ 2 files changed, 12 insertions(+) diff --git a/crates/lib/runloop/src/available_instance_slots.rs b/crates/lib/runloop/src/available_instance_slots.rs index 4d2cf816..6e692a21 100644 --- a/crates/lib/runloop/src/available_instance_slots.rs +++ b/crates/lib/runloop/src/available_instance_slots.rs @@ -98,6 +98,11 @@ impl Tracker { .send(available_slots) .map_err(|_| UpdateError::NoReaders) } + + /// Peek at the current value of the available slots. + pub fn peek_available(&self) -> usize { + *self.tx.borrow() + } } impl Reader { diff --git a/crates/lib/runloop/src/runloop.rs b/crates/lib/runloop/src/runloop.rs index a083016f..780b1167 100644 --- a/crates/lib/runloop/src/runloop.rs +++ b/crates/lib/runloop/src/runloop.rs @@ -814,6 +814,13 @@ where metrics::histogram!("waymark_runloop_ticks_stats_blocked_idle_instances_len") .record(MetricsVal(blocked_idle_instances_count)); + metrics::histogram!("waymark_runloop_ticks_stats_available_instance_slots_peek") + .record(MetricsVal( + self.available_instances_updater + .available_instance_slots_tracker + .peek_available(), + )); + tracing::trace!( target: "runloop-ticks", From e725809dbc2a92c2b66ac132d99417c01f2f6d45 Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Thu, 2 Apr 2026 09:52:31 +0400 Subject: [PATCH 24/25] Integrate postgres backend count_batch_size with metrics --- Cargo.lock | 1 + crates/lib/backend-postgres/Cargo.toml | 1 + crates/lib/backend-postgres/src/lib.rs | 5 ++++- 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 76f111c9..7bed137d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4014,6 +4014,7 @@ dependencies = [ "waymark-ids", "waymark-ir-conversions", "waymark-ir-parser", + "waymark-metrics-util", "waymark-observability", "waymark-proto", "waymark-runner", diff --git a/crates/lib/backend-postgres/Cargo.toml b/crates/lib/backend-postgres/Cargo.toml index 83051aea..c7048c00 100644 --- a/crates/lib/backend-postgres/Cargo.toml +++ b/crates/lib/backend-postgres/Cargo.toml @@ -13,6 +13,7 @@ waymark-dag-builder = { workspace = true } waymark-garbage-collector-backend = { workspace = true } waymark-ids = { workspace = true } waymark-ir-conversions = { workspace = true } +waymark-metrics-util = { workspace = true } waymark-observability = { workspace = true } waymark-proto = { workspace = true } waymark-runner = { workspace = true } diff --git a/crates/lib/backend-postgres/src/lib.rs b/crates/lib/backend-postgres/src/lib.rs index 41b4d8fc..93813949 100644 --- a/crates/lib/backend-postgres/src/lib.rs +++ b/crates/lib/backend-postgres/src/lib.rs @@ -14,6 +14,7 @@ use std::sync::{Arc, Mutex}; use sqlx::PgPool; use waymark_backends_core::{BackendError, BackendResult}; +use waymark_metrics_util::Val as MetricsVal; use waymark_observability::obs; use waymark_secret_string::SecretStr; use waymark_timed_future::TimedFutureExt as _; @@ -103,12 +104,14 @@ impl PostgresBackend { pub(crate) fn count_batch_size( counts: &Arc>>>, - label: &str, + label: &'static str, size: usize, ) { if size == 0 { return; } + metrics::histogram!("waymark_postgres_queries_batch_size", "label" => label) + .record(MetricsVal(size)); let mut guard = counts.lock().expect("batch size counts poisoned"); let entry = guard.entry(label.to_string()).or_default(); *entry.entry(size).or_insert(0) += 1; From 268b066133729382221996175273aada2f6d82df Mon Sep 17 00:00:00 2001 From: MOZGIII Date: Wed, 8 Apr 2026 02:49:06 +0400 Subject: [PATCH 25/25] Add waymark_postgres_save_graphs_runner_instances_payload_size metric --- crates/lib/backend-postgres/src/core.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/lib/backend-postgres/src/core.rs b/crates/lib/backend-postgres/src/core.rs index 51f4fd1e..8f67a5f0 100644 --- a/crates/lib/backend-postgres/src/core.rs +++ b/crates/lib/backend-postgres/src/core.rs @@ -586,6 +586,8 @@ impl PostgresBackend { runner_builder.push_values( payloads.iter(), |mut b, (instance_id, _scheduled_at, _lock_expires_at, payload)| { + metrics::histogram!("waymark_postgres_save_graphs_runner_instances_payload_size") + .record(payload.len() as f64); b.push_bind(*instance_id).push_bind(payload.as_slice()); }, );