diff --git a/Cargo.lock b/Cargo.lock index ec145120..7bed137d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4014,6 +4014,7 @@ dependencies = [ "waymark-ids", "waymark-ir-conversions", "waymark-ir-parser", + "waymark-metrics-util", "waymark-observability", "waymark-proto", "waymark-runner", @@ -4355,6 +4356,13 @@ dependencies = [ "waymark-proto", ] +[[package]] +name = "waymark-metrics-util" +version = "0.1.0" +dependencies = [ + "metrics", +] + [[package]] name = "waymark-nonzero-duration" version = "0.1.0" @@ -4417,6 +4425,7 @@ name = "waymark-runloop" version = "0.1.0" dependencies = [ "chrono", + "metrics", "mockall", "nonempty-collections", "prost 0.12.6", @@ -4435,6 +4444,7 @@ dependencies = [ "waymark-dag-builder", "waymark-ids", "waymark-ir-parser", + "waymark-metrics-util", "waymark-nonzero-duration", "waymark-observability", "waymark-proto", @@ -4641,6 +4651,7 @@ version = "0.1.0" dependencies = [ "anyhow", "envfury", + "metrics", "prost 0.12.6", "sqlx", "thiserror", @@ -4860,6 +4871,7 @@ version = "0.1.0" dependencies = [ "anyhow", "futures-core", + "metrics", "nonempty-collections", "prost 0.12.6", "serde_json", @@ -4899,6 +4911,7 @@ name = "waymark-worker-status-reporter" version = "0.1.0" dependencies = [ "chrono", + "metrics", "tokio", "tokio-util", "tracing", diff --git a/Cargo.toml b/Cargo.toml index c91d0f29..c0b84afa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ waymark-ir-conversions = { path = "crates/lib/ir-conversions" } waymark-ir-format = { path = "crates/lib/ir-format" } waymark-ir-parser = { path = "crates/lib/ir-parser" } waymark-message-conversions = { path = "crates/lib/message-conversions" } +waymark-metrics-util = { path = "crates/lib/metrics-util" } waymark-nonzero-duration = { path = "crates/lib/nonzero-duration" } waymark-observability = { path = "crates/lib/observability" } waymark-observability-macros = { path = "crates/lib/observability-macros" } diff --git a/crates/bin/start-workers/Cargo.toml 
b/crates/bin/start-workers/Cargo.toml index b854bb89..525e1cea 100644 --- a/crates/bin/start-workers/Cargo.toml +++ b/crates/bin/start-workers/Cargo.toml @@ -25,6 +25,7 @@ waymark-worker-status-reporter = { workspace = true } anyhow = { workspace = true } envfury = { workspace = true } +metrics = { workspace = true } prost = { workspace = true } sqlx = { workspace = true } thiserror = { workspace = true } diff --git a/crates/bin/start-workers/src/main.rs b/crates/bin/start-workers/src/main.rs index 9e672afe..8d93a48a 100644 --- a/crates/bin/start-workers/src/main.rs +++ b/crates/bin/start-workers/src/main.rs @@ -65,7 +65,10 @@ async fn main() -> Result<()> { // Load configuration and announce startup. let config = WorkerConfig::from_env()?; - tracing::debug!(target: "raw-config", ?config, "raw config"); + // Prepare a new Lock ID to use in the runloop. + let lock_uuid = LockId::new_uuid_v4(); + + tracing::debug!(target: "raw-config", ?config, %lock_uuid, "raw config"); info!( worker_count = config.worker_count, @@ -81,9 +84,45 @@ async fn main() -> Result<()> { garbage_collector_batch_size = config.garbage_collector.batch_size, garbage_collector_retention_secs = config.garbage_collector.retention.as_secs(), max_action_lifecycle = ?config.max_action_lifecycle, + %lock_uuid, "starting worker infrastructure" ); + // Info-style metric: the value is always 1 and the effective configuration + // is carried in the labels, so every label key must appear exactly once. + metrics::gauge!( + "waymark_start_workers_up", + + "worker_count" => config.worker_count.to_string(), + "concurrent_per_worker" => config.concurrent_per_worker.to_string(), + "user_modules" => format!("{:?}", config.user_modules), + + "lock_ttl_seconds" => config.lock_ttl.as_secs_f64().to_string(), + "lock_heartbeat_seconds" => config.lock_heartbeat.as_secs_f64().to_string(), + + "evict_sleep_threshold_seconds" => config.evict_sleep_threshold.as_secs_f64().to_string(), + + "expired_lock_reclaimer_interval_seconds" => 
config.expired_lock_reclaimer_interval.as_secs_f64().to_string(), + "expired_lock_reclaimer_batch_size" => config.expired_lock_reclaimer_batch_size.to_string(), + + "garbage_collector_interval_seconds" => config.garbage_collector.interval.as_secs_f64().to_string(), + "garbage_collector_batch_size" => config.garbage_collector.batch_size.to_string(), + "garbage_collector_retention_seconds" => config.garbage_collector.retention.as_secs_f64().to_string(), + + "max_action_lifecycle" => config.max_action_lifecycle.map(|val| val.to_string()).unwrap_or("no".into()), + + "executor_shards" => config.executor_shards.to_string(), + "poll_interval_seconds" => config.poll_interval.map(|val| val.as_secs_f64().to_string()).unwrap_or("no".into()), + "max_concurrent_instances" => config.max_concurrent_instances.to_string(), + "instance_done_batch_size" => config.instance_done_batch_size.map(|val| val.to_string()).unwrap_or("no".into()), + "persistence_interval_seconds" => config.persistence_interval.map(|val| val.as_secs_f64().to_string()).unwrap_or("no".into()), + + "profile_interval_seconds" => config.profile_interval.as_secs_f64().to_string(), + + "lock_uuid" => lock_uuid.to_string(), + ) + .set(1); + // Wire shutdown coordination. let shutdown_token = tokio_util::sync::CancellationToken::new(); @@ -187,7 +226,6 @@ async fn main() -> Result<()> { }); // Run the runloop. 
- let lock_uuid = LockId::new_uuid_v4(); let runloop = waymark_runloop::RunLoop::new_with_shutdown( remote_pool.clone(), backend.clone(), diff --git a/crates/lib/backend-postgres/Cargo.toml b/crates/lib/backend-postgres/Cargo.toml index 83051aea..c7048c00 100644 --- a/crates/lib/backend-postgres/Cargo.toml +++ b/crates/lib/backend-postgres/Cargo.toml @@ -13,6 +13,7 @@ waymark-dag-builder = { workspace = true } waymark-garbage-collector-backend = { workspace = true } waymark-ids = { workspace = true } waymark-ir-conversions = { workspace = true } +waymark-metrics-util = { workspace = true } waymark-observability = { workspace = true } waymark-proto = { workspace = true } waymark-runner = { workspace = true } diff --git a/crates/lib/backend-postgres/src/core.rs b/crates/lib/backend-postgres/src/core.rs index a86b9926..8f67a5f0 100644 --- a/crates/lib/backend-postgres/src/core.rs +++ b/crates/lib/backend-postgres/src/core.rs @@ -586,6 +586,9 @@ impl PostgresBackend { + // Resolve the histogram handle once rather than once per payload row. + let payload_size_histogram = metrics::histogram!("waymark_postgres_save_graphs_runner_instances_payload_size"); runner_builder.push_values( payloads.iter(), |mut b, (instance_id, _scheduled_at, _lock_expires_at, payload)| { + payload_size_histogram.record(payload.len() as f64); b.push_bind(*instance_id).push_bind(payload.as_slice()); }, ); @@ -779,6 +782,9 @@ impl PostgresBackend { return Err(PollQueuedInstancesError::EmptyRows); }; + metrics::counter!("waymark_backend_postgres_query_poll_instances_rows_total") + .increment(rows.len().get() as _); + let claimed_instance_ids: Vec = rows.iter().map(|row| row.get("instance_id")).collect(); sqlx::query("UPDATE runner_instances SET current_status = $2 WHERE instance_id = ANY($1)") diff --git a/crates/lib/backend-postgres/src/lib.rs b/crates/lib/backend-postgres/src/lib.rs index cdb8eb8d..93813949 100644 --- a/crates/lib/backend-postgres/src/lib.rs +++ b/crates/lib/backend-postgres/src/lib.rs @@ -14,6 +14,7 @@ use std::sync::{Arc, Mutex}; use sqlx::PgPool; use waymark_backends_core::{BackendError, 
BackendResult}; +use waymark_metrics_util::Val as MetricsVal; use waymark_observability::obs; use waymark_secret_string::SecretStr; use waymark_timed_future::TimedFutureExt as _; @@ -95,19 +96,22 @@ impl PostgresBackend { .clone() } - pub(crate) fn count_query(counts: &Arc>>, label: &str) { + pub(crate) fn count_query(counts: &Arc>>, label: &'static str) { + metrics::counter!("waymark_postgres_queries_total", "label" => label).increment(1); let mut guard = counts.lock().expect("query counts poisoned"); *guard.entry(label.to_string()).or_insert(0) += 1; } pub(crate) fn count_batch_size( counts: &Arc>>>, - label: &str, + label: &'static str, size: usize, ) { if size == 0 { return; } + metrics::histogram!("waymark_postgres_queries_batch_size", "label" => label) + .record(MetricsVal(size)); let mut guard = counts.lock().expect("batch size counts poisoned"); let entry = guard.entry(label.to_string()).or_default(); *entry.entry(size).or_insert(0) += 1; diff --git a/crates/lib/metrics-util/Cargo.toml b/crates/lib/metrics-util/Cargo.toml new file mode 100644 index 00000000..33e6e16d --- /dev/null +++ b/crates/lib/metrics-util/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "waymark-metrics-util" +edition = "2024" +version.workspace = true +publish.workspace = true + +[dependencies] +metrics = { workspace = true } diff --git a/crates/lib/metrics-util/src/lib.rs b/crates/lib/metrics-util/src/lib.rs new file mode 100644 index 00000000..c4e0139c --- /dev/null +++ b/crates/lib/metrics-util/src/lib.rs @@ -0,0 +1,12 @@ +//! Utilities for the [`metrics`] crate. 
+ +/// Newtype wrapper that adapts a value to [`metrics::IntoF64`] so it can be recorded by the `metrics` macros. +#[derive(Debug, Clone, Copy)] +pub struct Val<T>(pub T); + +impl metrics::IntoF64 for Val<usize> { + /// Converts the wrapped `usize` to `f64`; values above 2^53 lose precision. + fn into_f64(self) -> f64 { + self.0 as f64 + } +} diff --git a/crates/lib/runloop/Cargo.toml b/crates/lib/runloop/Cargo.toml index fed1581a..2a139b85 100644 --- a/crates/lib/runloop/Cargo.toml +++ b/crates/lib/runloop/Cargo.toml @@ -10,6 +10,7 @@ waymark-core-backend = { workspace = true } waymark-dag = { workspace = true } waymark-dag-builder = { workspace = true } waymark-ids = { workspace = true } +waymark-metrics-util = { workspace = true } waymark-nonzero-duration = { workspace = true } waymark-observability = { workspace = true } waymark-proto = { workspace = true } @@ -23,6 +24,7 @@ waymark-worker-core = { workspace = true } waymark-workflow-registry-backend = { workspace = true } chrono = { workspace = true, default-features = false } +metrics = { workspace = true } nonempty-collections = { workspace = true } prost = { workspace = true } serde_json = { workspace = true } diff --git a/crates/lib/runloop/src/available_instance_slots.rs b/crates/lib/runloop/src/available_instance_slots.rs index 4d2cf816..6e692a21 100644 --- a/crates/lib/runloop/src/available_instance_slots.rs +++ b/crates/lib/runloop/src/available_instance_slots.rs @@ -98,6 +98,11 @@ impl Tracker { .send(available_slots) .map_err(|_| UpdateError::NoReaders) } + + /// Peek at the current value of the available slots. 
+ pub fn peek_available(&self) -> usize { + *self.tx.borrow() + } } impl Reader { diff --git a/crates/lib/runloop/src/runloop.rs b/crates/lib/runloop/src/runloop.rs index 7c582ee3..780b1167 100644 --- a/crates/lib/runloop/src/runloop.rs +++ b/crates/lib/runloop/src/runloop.rs @@ -23,6 +23,7 @@ use crate::commit_barrier::CommitBarrier; use crate::instance_lock_heartbeat; use crate::{error_value, persist, queued_instances_polling, shard}; use waymark_ids::{ExecutionId, InstanceId, LockId}; +use waymark_metrics_util::Val as MetricsVal; use waymark_dag::DAG; use waymark_observability::obs; @@ -438,6 +439,17 @@ where }, }; + metrics::histogram!("waymark_runloop_channel_len", "channel" => "completion_rx") + .record(MetricsVal(completion_rx.len())); + metrics::histogram!("waymark_runloop_channel_len", "channel" => "instance_rx") + .record(MetricsVal(instance_rx.len())); + metrics::histogram!("waymark_runloop_channel_len", "channel" => "sleep_rx") + .record(MetricsVal(sleep_rx.len())); + metrics::histogram!("waymark_runloop_channel_len", "channel" => "event_rx") + .record(MetricsVal(event_rx.len())); + metrics::histogram!("waymark_runloop_channel_len", "channel" => "persist_ack_rx") + .record(MetricsVal(persist_ack_rx.len())); + let mut all_completions: Vec = Vec::new(); let mut all_instances: Vec = Vec::new(); let mut all_steps: Vec = Vec::new(); @@ -448,14 +460,23 @@ where match first_event { CoordinatorEvent::Completions(completions) => { + metrics::counter!("waymark_runloop_ticks_by_cause_total", "cause" => "completions").increment(1); + metrics::counter!("waymark_runloop_polled_completions_total", "where" => "first") + .increment(completions.len() as _); all_completions.extend(completions); } CoordinatorEvent::Instance(queued_instances_polling::Message::Batch { instances, }) => { + metrics::counter!("waymark_runloop_ticks_by_cause_total", "cause" => "instances").increment(1); + metrics::counter!("waymark_runloop_polled_instances_total", "where" => "first") + 
.increment(instances.len().get() as _); all_instances.extend(instances); } CoordinatorEvent::Instance(queued_instances_polling::Message::Pending) => { + metrics::counter!("waymark_runloop_ticks_by_cause_total", "cause" => "instances_pending").increment(1); + metrics::counter!("waymark_runloop_polled_instance_pending_total", "where" => "first") + .increment(1); queued_instances_poller_is_pending = true; } CoordinatorEvent::Instance(queued_instances_polling::Message::Error(err)) => { @@ -463,12 +484,20 @@ where break 'runloop Err(Error::CoreBackendPoll(err)); } CoordinatorEvent::Shard(event) => match event { - shard::Event::Step(step) => all_steps.push(step), + shard::Event::Step(step) => { + metrics::counter!("waymark_runloop_ticks_by_cause_total", "cause" => "shard_step").increment(1); + metrics::counter!("waymark_runloop_shard_steps_total", "where" => "first") + .increment(1); + all_steps.push(step) + } shard::Event::InstanceFailed { executor_id, entry_node, error, } => { + metrics::counter!("waymark_runloop_ticks_by_cause_total", "cause" => "shard_instance_failed").increment(1); + metrics::counter!("waymark_runloop_shard_instance_failed_total", "where" => "first") + .increment(1); all_failed_instances.push(InstanceDone { executor_id, entry_node, @@ -478,25 +507,33 @@ where } }, CoordinatorEvent::SleepWake(wake) => { + metrics::counter!("waymark_runloop_ticks_by_cause_total", "cause" => "sleep_wake").increment(1); all_wakes.push(wake); } CoordinatorEvent::PersistAck(ack) => { + metrics::counter!("waymark_runloop_ticks_by_cause_total", "cause" => "persist_ack").increment(1); all_persist_acks.push(ack); } - CoordinatorEvent::ActionTimeoutTick => {} + CoordinatorEvent::ActionTimeoutTick => { + metrics::counter!("waymark_runloop_ticks_by_cause_total", "cause" => "action_timeout_tick").increment(1); + } } while let Ok(completions) = completion_rx.try_recv() { let completions = completions.into_inner_measured_multi(&["completions_try", "completions_both"]); + 
metrics::counter!("waymark_runloop_polled_completions_total", "where" => "batch") + .increment(completions.len() as _); all_completions.extend(completions); } while let Ok(message) = instance_rx.try_recv() { match message { queued_instances_polling::Message::Batch { instances } => { + metrics::counter!("waymark_runloop_polled_instances_total", "where" => "batch").increment(instances.len().get() as _); all_instances.extend(instances); } queued_instances_polling::Message::Pending => { + metrics::counter!("waymark_runloop_polled_instance_pending_total", "where" => "batch").increment(1); queued_instances_poller_is_pending = true; } queued_instances_polling::Message::Error(err) => { @@ -510,12 +547,18 @@ where let event = event.into_inner_measured_multi(&["shard_event_try", "shard_event_both"]); match event { - shard::Event::Step(step) => all_steps.push(step), + shard::Event::Step(step) => { + metrics::counter!("waymark_runloop_shard_steps_total", "where" => "batch") + .increment(1); + all_steps.push(step); + } shard::Event::InstanceFailed { executor_id, entry_node, error, } => { + metrics::counter!("waymark_runloop_shard_instance_failed_total", "where" => "batch") + .increment(1); all_failed_instances.push(InstanceDone { executor_id, entry_node, @@ -738,6 +781,67 @@ where { break 'runloop Err(err.into()); } + + metrics::counter!("waymark_runloop_ticks_total").increment(1); + + metrics::histogram!("waymark_runloop_ticks_stats_executor_shards_len") + .record(MetricsVal(executor_shards.len())); + metrics::histogram!("waymark_runloop_ticks_stats_inflight_actions_len") + .record(MetricsVal(inflight_actions.len())); + metrics::histogram!("waymark_runloop_ticks_stats_inflight_dispatches_len") + .record(MetricsVal(inflight_dispatches.len())); + metrics::histogram!("waymark_runloop_ticks_stats_sleeping_nodes_len") + .record(MetricsVal(sleeping_nodes.len())); + metrics::histogram!("waymark_runloop_ticks_stats_sleeping_by_instance_len") + 
.record(MetricsVal(sleeping_by_instance.len())); + metrics::histogram!("waymark_runloop_ticks_stats_blocked_until_by_instance_len") + .record(MetricsVal(blocked_until_by_instance.len())); + metrics::histogram!("waymark_runloop_ticks_stats_instances_done_pending_len") + .record(MetricsVal(instances_done_pending.len())); + + let total_in_flight_actions: usize = inflight_actions.values().sum(); + metrics::histogram!("waymark_runloop_ticks_stats_inflight_actions_total") + .record(MetricsVal(total_in_flight_actions)); + + let commit_barrier_pending_batch_count = commit_barrier.pending_batch_count(); + metrics::histogram!("waymark_runloop_ticks_stats_commit_barrier_pending_batch_len") + .record(MetricsVal(commit_barrier_pending_batch_count)); + + let blocked_idle_instances_count = blocked_until_by_instance + .keys() + .filter(|instance_id| inflight_actions.get(instance_id).copied().unwrap_or(0) == 0) + .count(); + metrics::histogram!("waymark_runloop_ticks_stats_blocked_idle_instances_len") + .record(MetricsVal(blocked_idle_instances_count)); + + metrics::histogram!("waymark_runloop_ticks_stats_available_instance_slots_peek") + .record(MetricsVal( + self.available_instances_updater + .available_instance_slots_tracker + .peek_available(), + )); + + tracing::trace!( + target: "runloop-ticks", + + instances_idle, + next_shard, + + shard_senders_len = shard_senders.len(), + + executor_shards_len = executor_shards.len(), + inflight_actions_len = inflight_actions.len(), + inflight_dispatches_len = inflight_dispatches.len(), + sleeping_nodes_len = sleeping_nodes.len(), + sleeping_by_instance_len = sleeping_by_instance.len(), + blocked_until_by_instance_len = blocked_until_by_instance.len(), + instances_done_pending_len = instances_done_pending.len(), + total_in_flight_actions, + commit_barrier_pending_batch_count, + blocked_idle_instances_count, + + "runloop tick" + ); }; info!( diff --git a/crates/lib/runloop/src/runloop/ops/flush_instances_done.rs 
b/crates/lib/runloop/src/runloop/ops/flush_instances_done.rs index 3c8b5e80..0bceb3d5 100644 --- a/crates/lib/runloop/src/runloop/ops/flush_instances_done.rs +++ b/crates/lib/runloop/src/runloop/ops/flush_instances_done.rs @@ -33,6 +33,7 @@ where return Ok(()); } let batch = std::mem::take(pending); + metrics::counter!("waymark_runloop_flush_instances_done_total").increment(batch.len() as _); core_backend.save_instances_done(&batch).await?; Ok(()) } diff --git a/crates/lib/runloop/src/runloop/parts/inflight_dispatches.rs b/crates/lib/runloop/src/runloop/parts/inflight_dispatches.rs index 3e6bc307..51b54874 100644 --- a/crates/lib/runloop/src/runloop/parts/inflight_dispatches.rs +++ b/crates/lib/runloop/src/runloop/parts/inflight_dispatches.rs @@ -58,6 +58,13 @@ pub fn handle(params: Params<'_>) { let Some(dispatch) = inflight_dispatches.get(&execution_id) else { continue; }; + tracing::debug!( + execution_id = %execution_id, + executor_id = %dispatch.executor_id, + "action timed out" + ); + metrics::counter!("waymark_runloop_timeout_completions_total").increment(1); + timeout_completions.push(ActionCompletion { executor_id: dispatch.executor_id, execution_id, diff --git a/crates/lib/worker-remote/Cargo.toml b/crates/lib/worker-remote/Cargo.toml index 8a9f852c..3e38bdb9 100644 --- a/crates/lib/worker-remote/Cargo.toml +++ b/crates/lib/worker-remote/Cargo.toml @@ -15,6 +15,7 @@ waymark-worker-status-core = { workspace = true } anyhow = { workspace = true } # TODO: drop futures-core = { workspace = true } +metrics = { workspace = true } nonempty-collections = { workspace = true } prost = { workspace = true } serde_json = { workspace = true } diff --git a/crates/lib/worker-remote/src/lib.rs b/crates/lib/worker-remote/src/lib.rs index 9f20fb4a..31b87628 100644 --- a/crates/lib/worker-remote/src/lib.rs +++ b/crates/lib/worker-remote/src/lib.rs @@ -1168,12 +1168,15 @@ async fn execute_remote_request( }; }; + let before = std::time::Instant::now(); let worker_idx = loop { 
if let Some(idx) = pool.try_acquire_slot() { break idx; } tokio::time::sleep(Duration::from_millis(5)).await; }; + metrics::histogram!("waymark_worker_remote_execute_remote_request_worker_wait_seconds") + .record(before.elapsed()); let worker = pool.get_worker(worker_idx).await; let dispatch = ActionDispatchPayload { @@ -1189,7 +1192,11 @@ async fn execute_remote_request( dispatch_token, }; - match worker.send_action(dispatch).await { + let before = std::time::Instant::now(); + let result = worker.send_action(dispatch).await; + metrics::histogram!("waymark_worker_remote_send_action_seconds").record(before.elapsed()); + + match result { Ok(metrics) => { pool.record_latency(metrics.ack_latency, metrics.worker_duration); pool.record_completion(worker_idx, Arc::clone(&pool)); @@ -1336,7 +1343,13 @@ impl BaseWorkerPool for RemoteWorkerPool { let completion_tx = completion_tx.clone(); let pool = Arc::clone(&pool); tokio::spawn(async move { + let before = std::time::Instant::now(); + let completion = execute_remote_request(pool, request).await; + + metrics::histogram!("waymark_worker_remote_execute_remote_request_seconds") + .record(before.elapsed()); + let _ = completion_tx.send(completion).await; }); } diff --git a/crates/lib/worker-status-reporter/Cargo.toml b/crates/lib/worker-status-reporter/Cargo.toml index aa9b03c9..f94dd6b1 100644 --- a/crates/lib/worker-status-reporter/Cargo.toml +++ b/crates/lib/worker-status-reporter/Cargo.toml @@ -11,6 +11,7 @@ waymark-worker-status-backend = { workspace = true } waymark-worker-status-core = { workspace = true } chrono = { workspace = true, default-features = false, features = ["clock"] } +metrics = { workspace = true } tokio = { workspace = true, features = ["macros", "time"] } tokio-util = { workspace = true } tracing = { workspace = true } diff --git a/crates/lib/worker-status-reporter/src/lib.rs b/crates/lib/worker-status-reporter/src/lib.rs index ec1dbb6f..1deb3d39 100644 --- a/crates/lib/worker-status-reporter/src/lib.rs 
+++ b/crates/lib/worker-status-reporter/src/lib.rs @@ -73,7 +73,13 @@ pub async fn run( time_series: Some(time_series.encode()), }; - if let Err(err) = backend.upsert_worker_status(&status).await { + let before = std::time::Instant::now(); + let result = backend.upsert_worker_status(&status).await; + metrics::histogram!("waymark_worker_status_reporter_loop_upsert_worker_status_seconds").record(before.elapsed()); + // Static label values avoid allocating a `String` on every report. + let success = if result.is_ok() { "true" } else { "false" }; + metrics::counter!("waymark_worker_status_reporter_loop_upsert_worker_status_total", "success" => success).increment(1); + if let Err(err) = result { warn!(error = %err, "failed to update worker status"); } }