Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
53d8060
Add logging at runloop ticks
MOZGIII Mar 27, 2026
ac770bd
Add a metric of a total number of the runloop ticks
MOZGIII Mar 27, 2026
287177d
Add waymark_runloop_ticks_by_cause_total metrics
MOZGIII Mar 27, 2026
598ecfc
Add waymark-metrics-util crate
MOZGIII Apr 2, 2026
70b8848
Add waymark_runloop_channel_len
MOZGIII Mar 27, 2026
0e5e671
Add waymark_runloop_polled_instances_total and waymark_runloop_polled…
MOZGIII Mar 27, 2026
0a08a2c
Add waymark_runloop_shard_steps_total metric
MOZGIII Mar 27, 2026
5001f97
Add waymark_runloop_shard_instance_failed_total metric
MOZGIII Mar 27, 2026
4c16110
Add waymark_runloop_polled_completions_total metric
MOZGIII Mar 27, 2026
b701c6a
Add waymark_runloop_flush_instances_done_total metric
MOZGIII Mar 27, 2026
6f2dec7
Add waymark_runloop_ticks_stats_* metrics
MOZGIII Mar 30, 2026
76f2884
Add the waymark_runloop_ticks_stats_inflight_actions_total metric
MOZGIII Mar 30, 2026
104582d
Added waymark_runloop_ticks_stats_commit_barrier_pending_batch_len me…
MOZGIII Mar 30, 2026
eec376f
Add waymark_runloop_ticks_stats_blocked_idle_instances_len metric
MOZGIII Mar 30, 2026
43f990b
Integrate the metrics with the postgres count_query facility
MOZGIII Mar 30, 2026
dd32b87
Add waymark_runloop_timeout_completions_total metrics and log event
MOZGIII Mar 31, 2026
6ac1c2b
Add waymark_start_workers_up metric
MOZGIII Mar 31, 2026
e529b49
Log and measure the lock uuid
MOZGIII Mar 31, 2026
7df33d6
Add waymark_backend_postgres_query_poll_instances_rows_total metric
MOZGIII Mar 31, 2026
3698ff9
Add waymark_worker_remote_send_action_seconds and waymark_worker_remo…
MOZGIII Mar 31, 2026
4c06cf6
Add waymark_worker_remote_execute_remote_request_worker_wait_seconds …
MOZGIII Mar 31, 2026
d58da5b
Add waymark_worker_status_reporter_loop_upsert_worker_status_seconds …
MOZGIII Apr 1, 2026
941d4f8
Add waymark_runloop_ticks_stats_available_instance_slots_peek metric
MOZGIII Apr 1, 2026
e725809
Integrate postgres backend count_batch_size with metrics
MOZGIII Apr 2, 2026
268b066
Add waymark_postgres_save_graphs_runner_instances_payload_size metric
MOZGIII Apr 7, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ waymark-ir-conversions = { path = "crates/lib/ir-conversions" }
waymark-ir-format = { path = "crates/lib/ir-format" }
waymark-ir-parser = { path = "crates/lib/ir-parser" }
waymark-message-conversions = { path = "crates/lib/message-conversions" }
waymark-metrics-util = { path = "crates/lib/metrics-util" }
waymark-nonzero-duration = { path = "crates/lib/nonzero-duration" }
waymark-observability = { path = "crates/lib/observability" }
waymark-observability-macros = { path = "crates/lib/observability-macros" }
Expand Down
1 change: 1 addition & 0 deletions crates/bin/start-workers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ waymark-worker-status-reporter = { workspace = true }

anyhow = { workspace = true }
envfury = { workspace = true }
metrics = { workspace = true }
prost = { workspace = true }
sqlx = { workspace = true }
thiserror = { workspace = true }
Expand Down
41 changes: 39 additions & 2 deletions crates/bin/start-workers/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ async fn main() -> Result<()> {
// Load configuration and announce startup.
let config = WorkerConfig::from_env()?;

tracing::debug!(target: "raw-config", ?config, "raw config");
// Prepare a new Lock ID to use in the runloop.
let lock_uuid = LockId::new_uuid_v4();

tracing::debug!(target: "raw-config", ?config, %lock_uuid, "raw config");

info!(
worker_count = config.worker_count,
Expand All @@ -81,9 +84,44 @@ async fn main() -> Result<()> {
garbage_collector_batch_size = config.garbage_collector.batch_size,
garbage_collector_retention_secs = config.garbage_collector.retention.as_secs(),
max_action_lifecycle = ?config.max_action_lifecycle,
%lock_uuid,
"starting worker infrastructure"
);

// Publish the effective startup configuration as labels on a gauge that is
// set to 1. Each label key must appear only once: the
// `expired_lock_reclaimer_interval_seconds` label was previously listed twice.
metrics::gauge!(
    "waymark_start_workers_up",

    "worker_count" => config.worker_count.to_string(),
    "concurrent_per_worker" => config.concurrent_per_worker.to_string(),
    "user_modules" => format!("{:?}", config.user_modules),

    "lock_ttl_seconds" => config.lock_ttl.as_secs_f64().to_string(),
    "lock_heartbeat_seconds" => config.lock_heartbeat.as_secs_f64().to_string(),

    "evict_sleep_threshold_seconds" => config.evict_sleep_threshold.as_secs_f64().to_string(),

    "expired_lock_reclaimer_interval_seconds" => config.expired_lock_reclaimer_interval.as_secs_f64().to_string(),
    "expired_lock_reclaimer_batch_size" => config.expired_lock_reclaimer_batch_size.to_string(),

    "garbage_collector_interval_seconds" => config.garbage_collector.interval.as_secs_f64().to_string(),
    "garbage_collector_batch_size" => config.garbage_collector.batch_size.to_string(),
    "garbage_collector_retention_seconds" => config.garbage_collector.retention.as_secs_f64().to_string(),

    // Optional settings are reported as the literal "no" when unset.
    "max_action_lifecycle" => config.max_action_lifecycle.map(|val| val.to_string()).unwrap_or("no".into()),

    "executor_shards" => config.executor_shards.to_string(),
    "poll_interval_seconds" => config.poll_interval.map(|val| val.as_secs_f64().to_string()).unwrap_or("no".into()),
    "max_concurrent_instances" => config.max_concurrent_instances.to_string(),
    "instance_done_batch_size" => config.instance_done_batch_size.map(|val| val.to_string()).unwrap_or("no".into()),
    "persistence_interval_seconds" => config.persistence_interval.map(|val| val.as_secs_f64().to_string()).unwrap_or("no".into()),

    "profile_interval_seconds" => config.profile_interval.as_secs_f64().to_string(),

    "lock_uuid" => lock_uuid.to_string(),
)
.set(1);

// Wire shutdown coordination.
let shutdown_token = tokio_util::sync::CancellationToken::new();

Expand Down Expand Up @@ -187,7 +225,6 @@ async fn main() -> Result<()> {
});

// Run the runloop.
let lock_uuid = LockId::new_uuid_v4();
let runloop = waymark_runloop::RunLoop::new_with_shutdown(
remote_pool.clone(),
backend.clone(),
Expand Down
1 change: 1 addition & 0 deletions crates/lib/backend-postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ waymark-dag-builder = { workspace = true }
waymark-garbage-collector-backend = { workspace = true }
waymark-ids = { workspace = true }
waymark-ir-conversions = { workspace = true }
waymark-metrics-util = { workspace = true }
waymark-observability = { workspace = true }
waymark-proto = { workspace = true }
waymark-runner = { workspace = true }
Expand Down
5 changes: 5 additions & 0 deletions crates/lib/backend-postgres/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,8 @@ impl PostgresBackend {
runner_builder.push_values(
payloads.iter(),
|mut b, (instance_id, _scheduled_at, _lock_expires_at, payload)| {
metrics::histogram!("waymark_postgres_save_graphs_runner_instances_payload_size")
.record(payload.len() as f64);
b.push_bind(*instance_id).push_bind(payload.as_slice());
},
);
Expand Down Expand Up @@ -779,6 +781,9 @@ impl PostgresBackend {
return Err(PollQueuedInstancesError::EmptyRows);
};

metrics::counter!("waymark_backend_postgres_query_poll_instances_rows_total")
.increment(rows.len().get() as _);

let claimed_instance_ids: Vec<Uuid> =
rows.iter().map(|row| row.get("instance_id")).collect();
sqlx::query("UPDATE runner_instances SET current_status = $2 WHERE instance_id = ANY($1)")
Expand Down
8 changes: 6 additions & 2 deletions crates/lib/backend-postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::sync::{Arc, Mutex};

use sqlx::PgPool;
use waymark_backends_core::{BackendError, BackendResult};
use waymark_metrics_util::Val as MetricsVal;
use waymark_observability::obs;
use waymark_secret_string::SecretStr;
use waymark_timed_future::TimedFutureExt as _;
Expand Down Expand Up @@ -95,19 +96,22 @@ impl PostgresBackend {
.clone()
}

pub(crate) fn count_query(counts: &Arc<Mutex<HashMap<String, usize>>>, label: &str) {
/// Record one executed query under `label`.
///
/// Increments the `waymark_postgres_queries_total` counter tagged with
/// `label`, then bumps the per-label tally stored in `counts`.
///
/// # Panics
///
/// Panics if the `counts` mutex is poisoned.
pub(crate) fn count_query(counts: &Arc<Mutex<HashMap<String, usize>>>, label: &'static str) {
    metrics::counter!("waymark_postgres_queries_total", "label" => label).increment(1);
    let mut tallies = counts.lock().expect("query counts poisoned");
    let slot = tallies.entry(label.to_string()).or_insert(0);
    *slot += 1;
}

pub(crate) fn count_batch_size(
counts: &Arc<Mutex<HashMap<String, HashMap<usize, usize>>>>,
label: &str,
label: &'static str,
size: usize,
) {
if size == 0 {
return;
}
metrics::histogram!("waymark_postgres_queries_batch_size", "label" => label)
.record(MetricsVal(size));
let mut guard = counts.lock().expect("batch size counts poisoned");
let entry = guard.entry(label.to_string()).or_default();
*entry.entry(size).or_insert(0) += 1;
Expand Down
8 changes: 8 additions & 0 deletions crates/lib/metrics-util/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[package]
name = "waymark-metrics-util"
edition = "2024"
version.workspace = true
publish.workspace = true

[dependencies]
metrics = { workspace = true }
9 changes: 9 additions & 0 deletions crates/lib/metrics-util/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
//! Utilities for the [`metrics`] crate.

/// Transparent newtype wrapper around a value that is to be recorded as a metric.
///
/// The wrapped value is public and can be read back through `.0`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Val<T>(pub T);

impl metrics::IntoF64 for Val<usize> {
fn into_f64(self) -> f64 {
self.0 as _
}
}
2 changes: 2 additions & 0 deletions crates/lib/runloop/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ waymark-core-backend = { workspace = true }
waymark-dag = { workspace = true }
waymark-dag-builder = { workspace = true }
waymark-ids = { workspace = true }
waymark-metrics-util = { workspace = true }
waymark-nonzero-duration = { workspace = true }
waymark-observability = { workspace = true }
waymark-proto = { workspace = true }
Expand All @@ -23,6 +24,7 @@ waymark-worker-core = { workspace = true }
waymark-workflow-registry-backend = { workspace = true }

chrono = { workspace = true, default-features = false }
metrics = { workspace = true }
nonempty-collections = { workspace = true }
prost = { workspace = true }
serde_json = { workspace = true }
Expand Down
5 changes: 5 additions & 0 deletions crates/lib/runloop/src/available_instance_slots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ impl Tracker {
.send(available_slots)
.map_err(|_| UpdateError::NoReaders)
}

/// Return the available-slots value currently stored in `self.tx`,
/// without sending an update.
pub fn peek_available(&self) -> usize {
    let current = self.tx.borrow();
    *current
}
}

impl Reader {
Expand Down
Loading
Loading