Skip to content

Commit eedcbbb

Browse files
committed
enhancement(agent-data-plane): move internal observability/control plane to supervisor scheme
1 parent 871f057 commit eedcbbb

6 files changed

Lines changed: 530 additions & 275 deletions

File tree

bin/agent-data-plane/src/cli/run.rs

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,15 @@ use saluki_components::{
2929
},
3030
};
3131
use saluki_config::{ConfigurationLoader, GenericConfiguration};
32+
use saluki_core::runtime::SupervisorError;
3233
use saluki_core::topology::TopologyBlueprint;
3334
use saluki_env::{configstream::create_config_stream, EnvironmentProvider as _};
3435
use saluki_error::{generic_error, ErrorContext as _, GenericError};
3536
use saluki_health::HealthRegistry;
3637
use tokio::{select, time::interval};
3738
use tracing::{error, info, warn};
3839

39-
use crate::{
40-
components::apm_onboarding::ApmOnboardingConfiguration,
41-
internal::{spawn_control_plane, spawn_internal_observability_topology},
42-
};
40+
use crate::{components::apm_onboarding::ApmOnboardingConfiguration, internal::create_internal_supervisor};
4341
use crate::{config::DataPlaneConfiguration, env_provider::ADPEnvironmentProvider};
4442

4543
/// Runs the data plane.
@@ -129,18 +127,21 @@ pub async fn handle_run_command(
129127
)
130128
.await?;
131129

132-
spawn_internal_observability_topology(&dp_config, &component_registry, health_registry.clone())
133-
.error_context("Failed to spawn internal observability topology.")?;
134-
spawn_control_plane(
130+
// Create the internal supervisor (control plane + observability)
131+
let mut internal_supervisor = create_internal_supervisor(
135132
&config,
136133
&dp_config,
137134
&component_registry,
138135
health_registry.clone(),
139136
env_provider,
140137
dsd_stats_config,
141138
)
142-
.await
143-
.error_context("Failed to spawn control plane.")?;
139+
.error_context("Failed to create internal supervisor.")?;
140+
141+
// Create shutdown channel for the internal supervisor - we'll drive it in the main select loop
142+
let (internal_shutdown_tx, internal_shutdown_rx) = tokio::sync::oneshot::channel();
143+
let internal_supervisor_fut = internal_supervisor.run_with_shutdown(internal_shutdown_rx);
144+
tokio::pin!(internal_supervisor_fut);
144145

145146
// Run memory bounds validation to ensure that we can launch the topology with our configured memory limit, if any.
146147
let bounds_config = MemoryBoundsConfiguration::try_from_config(&config)?;
@@ -195,6 +196,27 @@ pub async fn handle_run_command(
195196

196197
let mut finished_with_error = false;
197198
select! {
199+
result = &mut internal_supervisor_fut => {
200+
match result {
201+
Err(SupervisorError::FailedToInitialize { source }) => {
202+
error!("Internal supervisor failed to initialize: {}. Shutting down...", source);
203+
finished_with_error = true;
204+
}
205+
// If we haven't hit an initialization error -- which implies an error we can't really recover from --
206+
// then just log for now, until we fully migrate everything over to the supervisor-based approach and
207+
// can dial in our supervisor configuration.
208+
//
209+
// For right now, this matches the previous behavior where the process would exit if we couldn't
210+
// configure/spawn the control plane or internal observability pipeline, but the process is unaffected
211+
// if either of those components fail at _runtime_.
212+
Err(e) => {
213+
warn!("Internal supervisor exited: {}", e);
214+
}
215+
Ok(()) => {
216+
warn!("Internal supervisor exited unexpectedly.");
217+
}
218+
}
219+
}
198220
_ = running_topology.wait_for_unexpected_finish() => {
199221
error!("Component unexpectedly finished. Shutting down...");
200222
finished_with_error = true;
@@ -204,7 +226,14 @@ pub async fn handle_run_command(
204226
}
205227
}
206228

207-
match running_topology.shutdown_with_timeout(Duration::from_secs(30)).await {
229+
// Shutdown the primary topology
230+
let topology_result = running_topology.shutdown_with_timeout(Duration::from_secs(30)).await;
231+
232+
// Signal the internal supervisor to shutdown (if still running) and drive it to completion
233+
let _ = internal_shutdown_tx.send(());
234+
let _ = internal_supervisor_fut.await;
235+
236+
match topology_result {
208237
Ok(()) => {
209238
if finished_with_error {
210239
warn!("Topology shutdown complete despite error(s).")

0 commit comments

Comments
 (0)