
Commit 092d5cc

enhancement(agent-data-plane): add restartable worker for health registry

1 parent: 4d639de

2 files changed: 146 additions & 31 deletions
bin/agent-data-plane/src/internal/control_plane.rs

Lines changed: 9 additions & 20 deletions
```diff
@@ -61,33 +61,22 @@ impl Supervisable for HealthRegistryWorker {
         "health-registry"
     }
 
-    async fn initialize(&self, mut process_shutdown: ProcessShutdown) -> Result<SupervisorFuture, InitializationError> {
-        // Spawn the health registry runner - it runs forever until the process exits
-        //
-        // TODO: This pattern doesn't allow us to gracefully restart the health registry worker if the supervisor itself
-        // is restarted. We could do something simpler like `HealthRegistry` having a mutex to ensure only one instance is
-        // running at a time, and that way the worker can be restarted gracefully, picking up where it left off.
+    async fn initialize(&self, process_shutdown: ProcessShutdown) -> Result<SupervisorFuture, InitializationError> {
+        // Spawn the health registry runner with the shutdown signal. The runner will exit
+        // gracefully when the shutdown signal is received, and the response receiver will be
+        // returned to the registry state so that a subsequent spawn can succeed if the supervisor
+        // restarts this worker.
         let handle = self
             .health_registry
             .clone()
-            .spawn()
+            .spawn(process_shutdown)
             .await
             .map_err(|e| InitializationError::Failed { source: e })?;
 
         Ok(Box::pin(async move {
-            tokio::select! {
-                result = handle => {
-                    // The health registry task completed (shouldn't happen normally)
-                    if let Err(e) = result {
-                        return Err(generic_error!("Health registry task panicked: {}", e));
-                    }
-                    Ok(())
-                }
-                _ = process_shutdown.wait_for_shutdown() => {
-                    // On shutdown, we just let the spawned task continue until the runtime shuts down.
-                    // The health registry doesn't have a graceful shutdown mechanism.
-                    Ok(())
-                }
+            match handle.await {
+                Ok(()) => Ok(()),
+                Err(e) => Err(generic_error!("Health registry task panicked: {}", e)),
             }
         }))
     }
```
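The supervisor-side simplification is easiest to see in miniature. The sketch below is a self-contained approximation, not the repository's actual `Supervisable` trait or `ProcessShutdown` type (the names `run_until_shutdown` and the oneshot wiring are illustrative): because the spawned task now owns the shutdown future, the supervisor future reduces to awaiting the join handle, with no `select!` between the handle and the shutdown signal.

```rust
use std::future::Future;

// Stand-in for Runner::run: do work until the shutdown future resolves.
async fn run_until_shutdown(shutdown: impl Future<Output = ()> + Send + 'static) {
    shutdown.await;
}

#[tokio::main]
async fn main() {
    let (tx, rx) = tokio::sync::oneshot::channel::<()>();

    // "initialize": spawn the worker with the shutdown signal baked in.
    let handle = tokio::spawn(run_until_shutdown(async move {
        let _ = rx.await;
    }));

    // Trigger shutdown; the task exits on its own, so the supervisor
    // future only has to await the join handle.
    let _ = tx.send(());
    match handle.await {
        Ok(()) => println!("worker exited cleanly"),
        Err(e) => eprintln!("worker task failed: {e}"),
    }
}
```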

lib/saluki-health/src/lib.rs

Lines changed: 137 additions & 11 deletions
```diff
@@ -1,3 +1,4 @@
+use std::future::Future;
 #[cfg(test)]
 use std::sync::atomic::AtomicUsize;
 use std::{
@@ -357,12 +358,40 @@ impl HealthRegistry {
 
     /// Spawns the health registry runner, which manages the scheduling and collection of liveness probes.
     ///
+    /// The runner will run until the given `shutdown` future resolves, at which point it will
+    /// gracefully stop. After the runner stops, the health registry can be spawned again by
+    /// calling this method with a new shutdown future.
+    ///
     /// # Errors
     ///
-    /// If the health registry has already been spawned, an error will be returned.
-    pub async fn spawn(self) -> Result<JoinHandle<()>, GenericError> {
+    /// If the health registry runner is currently running (i.e., has been spawned but not yet
+    /// stopped), an error will be returned.
+    pub async fn spawn<F>(self, shutdown: F) -> Result<JoinHandle<()>, GenericError>
+    where
+        F: Future<Output = ()> + Send + 'static,
+    {
         let runner = self.into_runner()?;
-        Ok(tokio::spawn(runner.run()))
+        Ok(tokio::spawn(runner.run(shutdown)))
+    }
+}
+
+/// A guard that returns the response receiver back to the registry when dropped.
+///
+/// This allows the health registry runner to be restarted gracefully - when the runner stops
+/// (either due to shutdown or an error), the receiver is returned to the registry state so
+/// that a subsequent call to `spawn()` can succeed.
+struct RunnerGuard {
+    registry: Arc<Mutex<RegistryState>>,
+    responses_rx: Option<mpsc::Receiver<LivenessResponse>>,
+}
+
+impl Drop for RunnerGuard {
+    fn drop(&mut self) {
+        if let Some(rx) = self.responses_rx.take() {
+            let mut inner = self.registry.lock().unwrap();
+            inner.responses_rx = Some(rx);
+            debug!("Returned response receiver to registry state.");
+        }
     }
 }
@@ -410,7 +439,7 @@ struct Runner {
     registry: Arc<Mutex<RegistryState>>,
     pending_probes: DelayQueue<usize>,
     pending_timeouts: DelayQueue<usize>,
-    responses_rx: mpsc::Receiver<LivenessResponse>,
+    guard: RunnerGuard,
     pending_components_notify: Arc<Notify>,
     #[cfg(test)]
     state: Arc<RunnerState>,
@@ -424,11 +453,16 @@ impl Runner {
         #[cfg(test)]
         let state = Arc::new(RunnerState::new());
 
+        let guard = RunnerGuard {
+            registry: Arc::clone(&registry),
+            responses_rx: Some(responses_rx),
+        };
+
         Self {
             registry,
             pending_probes: DelayQueue::new(),
             pending_timeouts: DelayQueue::new(),
-            responses_rx,
+            guard,
             pending_components_notify,
             #[cfg(test)]
             state,
@@ -494,6 +528,26 @@ impl Runner {
         self.pending_probes.insert(component_id, duration);
     }
 
+    fn schedule_all_existing_components(&mut self) {
+        // First, drain any pending components to avoid scheduling them twice.
+        // This handles the case where components were registered before the runner started.
+        let _pending = self.drain_pending_components();
+
+        let component_count = {
+            let registry = self.registry.lock().unwrap();
+            registry.component_state.len()
+        };
+
+        for component_id in 0..component_count {
+            self.process_component_health_update(component_id, HealthUpdate::Unknown);
+            self.schedule_probe_for_component(component_id, Duration::ZERO);
+        }
+
+        if component_count > 0 {
+            debug!(component_count, "Scheduled probes for all existing components.");
+        }
+    }
+
     fn handle_component_probe_response(&mut self, response: LivenessResponse) {
         let component_id = response.request.component_id;
         let timeout_key = response.request.timeout_key;
@@ -551,11 +605,33 @@ impl Runner {
         }
     }
 
-    async fn run(mut self) {
+    async fn run<F: Future<Output = ()>>(mut self, shutdown: F) {
         info!("Health checker running.");
 
+        // Take the response receiver out of the guard so we can use it in the select loop.
+        // It will be put back when the guard is dropped.
+        let mut responses_rx = self
+            .guard
+            .responses_rx
+            .take()
+            .expect("responses_rx should always be Some when Runner is created");
+
+        // Schedule probes for all existing components. This allows the runner to "pick up where it
+        // left off" after a restart - any components that were registered before the runner was
+        // restarted will be immediately probed.
+        self.schedule_all_existing_components();
+
+        // Pin the shutdown future so we can poll it in the select loop.
+        let mut shutdown = std::pin::pin!(shutdown);
+
         loop {
             select! {
+                // Shutdown signal received - exit the run loop gracefully.
+                _ = &mut shutdown => {
+                    info!("Health checker shutting down.");
+                    break;
+                },
+
                 // A component has been scheduled to have a liveness probe sent to it.
                 Some(entry) = self.pending_probes.next() => {
                     #[cfg(test)]
@@ -579,7 +655,7 @@ impl Runner {
                 },
 
                 // A probe response has been received.
-                Some(response) = self.responses_rx.recv() => {
+                Some(response) = responses_rx.recv() => {
                     self.handle_component_probe_response(response);
                 },
 
@@ -594,13 +670,21 @@ impl Runner {
                 },
             }
         }
+
+        // Put the receiver back in the guard so it can be returned to the registry state when dropped.
+        self.guard.responses_rx = Some(responses_rx);
+
+        // When we exit the loop, the RunnerGuard will be dropped, returning the response receiver
+        // back to the registry state so that a subsequent spawn() can succeed.
     }
 }
 
 #[cfg(test)]
 mod tests {
     use std::future::Future;
 
+    use futures::FutureExt as _;
+    use tokio::sync::oneshot;
     use tokio_test::{
         assert_pending, assert_ready,
         task::{spawn, Spawn},
@@ -630,7 +714,10 @@ mod tests {
         // This ensures that the component is registered, and that it schedules/sends an initial probe request to the component:
         let runner = registry.into_runner().expect("should not fail to create runner");
         let runner_state = runner.state();
-        let registry_task = spawn(runner.run());
+
+        // Create a shutdown future that never resolves (for tests that don't need shutdown).
+        let shutdown = std::future::pending();
+        let registry_task = spawn(runner.run(shutdown));
 
         (handle, registry_task, registry_state, runner_state)
     }
@@ -669,12 +756,51 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn duplicate_registry_spawn_fails() {
+    async fn duplicate_registry_spawn_fails_while_running() {
         let registry = HealthRegistry::new();
         let registry2 = registry.clone();
 
-        assert!(registry.spawn().await.is_ok());
-        assert!(registry2.spawn().await.is_err());
+        // First spawn should succeed (using a future that never resolves).
+        let shutdown = std::future::pending();
+        assert!(registry.spawn(shutdown).await.is_ok());
+
+        // Second spawn should fail while the first is still running.
+        let shutdown2 = std::future::pending();
+        assert!(registry2.spawn(shutdown2).await.is_err());
+    }
+
+    #[tokio::test]
+    async fn registry_can_be_respawned_after_shutdown() {
+        let registry = HealthRegistry::new();
+        let registry2 = registry.clone();
+        let registry3 = registry.clone();
+
+        // First spawn should succeed.
+        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
+        let join_handle = registry
+            .spawn(shutdown_rx.map(|_| ()))
+            .await
+            .expect("first spawn should succeed");
+
+        // Trigger shutdown.
+        let _ = shutdown_tx.send(());
+
+        // Wait for the runner to stop.
+        join_handle.await.expect("runner should complete without panic");
+
+        // Now we should be able to spawn again.
+        let shutdown2 = std::future::pending();
+        assert!(
+            registry2.spawn(shutdown2).await.is_ok(),
+            "should be able to respawn after shutdown"
+        );
+
+        // But not a third time while the second is running.
+        let shutdown3 = std::future::pending();
+        assert!(
+            registry3.spawn(shutdown3).await.is_err(),
+            "should not be able to spawn while running"
+        );
     }
 
     #[test]
```
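The restartability hinges on the `RunnerGuard` drop pattern: the response receiver is *taken* out of the shared registry state when a runner is created and *put back* when the runner is dropped, so the "already running" check reduces to "is the receiver present?". Below is a minimal standalone sketch of that idea; the names `Shared`, `Guard`, and `try_spawn` are hypothetical stand-ins, not the crate's API.

```rust
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;

// Shared state: holds the receiver whenever no runner is active.
struct Shared {
    rx: Option<mpsc::Receiver<u64>>,
}

// Guard owned by the runner: returns the receiver to shared state on drop.
struct Guard {
    shared: Arc<Mutex<Shared>>,
    rx: Option<mpsc::Receiver<u64>>,
}

impl Drop for Guard {
    fn drop(&mut self) {
        // Runs on graceful exit, on error, and on panic-unwind alike,
        // which is what makes respawning reliable.
        if let Some(rx) = self.rx.take() {
            self.shared.lock().unwrap().rx = Some(rx);
        }
    }
}

// "Spawning" succeeds only if the receiver is available to take.
fn try_spawn(shared: &Arc<Mutex<Shared>>) -> Result<Guard, &'static str> {
    let rx = shared.lock().unwrap().rx.take().ok_or("already running")?;
    Ok(Guard { shared: Arc::clone(shared), rx: Some(rx) })
}

fn main() {
    let (_tx, rx) = mpsc::channel::<u64>(8);
    let shared = Arc::new(Mutex::new(Shared { rx: Some(rx) }));

    let guard = try_spawn(&shared).expect("first spawn succeeds");
    assert!(try_spawn(&shared).is_err()); // duplicate spawn fails while running
    drop(guard);                          // receiver returned to shared state
    assert!(try_spawn(&shared).is_ok());  // respawn succeeds after shutdown
}
```

The same reasoning explains why `Runner::run` moves the receiver back into the guard before returning: the guard's `Drop` impl, not the run loop, is the single place that hands the receiver back, so every exit path leaves the registry spawnable again.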
