Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 41 additions & 7 deletions bin/agent-data-plane/src/cli/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use saluki_components::{
},
};
use saluki_config::{ConfigurationLoader, GenericConfiguration};
use saluki_core::runtime::SupervisorError;
use saluki_core::topology::TopologyBlueprint;
use saluki_env::EnvironmentProvider as _;
use saluki_error::{generic_error, ErrorContext as _, GenericError};
Expand All @@ -39,7 +40,7 @@ use crate::{
apm_onboarding::ApmOnboardingConfiguration, ottl_filter_processor::OttlFilterConfiguration,
ottl_transform_processor::OttlTransformConfiguration,
},
internal::{remote_agent::RemoteAgentBootstrap, spawn_control_plane, spawn_internal_observability_topology},
internal::{create_internal_supervisor, remote_agent::RemoteAgentBootstrap},
};
use crate::{config::DataPlaneConfiguration, env_provider::ADPEnvironmentProvider};

Expand Down Expand Up @@ -143,9 +144,8 @@ pub async fn handle_run_command(
)
.await?;

spawn_internal_observability_topology(&dp_config, &component_registry, health_registry.clone())
.error_context("Failed to spawn internal observability topology.")?;
spawn_control_plane(
// Create the internal supervisor (control plane + observability)
let mut internal_supervisor = create_internal_supervisor(
&config,
&dp_config,
&component_registry,
Expand All @@ -154,8 +154,12 @@ pub async fn handle_run_command(
dsd_stats_config,
ra_bootstrap,
)
.await
.error_context("Failed to spawn control plane.")?;
.error_context("Failed to create internal supervisor.")?;

// Create shutdown channel for the internal supervisor - we'll drive it in the main select loop
let (internal_shutdown_tx, internal_shutdown_rx) = tokio::sync::oneshot::channel();
let internal_supervisor_fut = internal_supervisor.run_with_shutdown(internal_shutdown_rx);
tokio::pin!(internal_supervisor_fut);

// Run memory bounds validation to ensure that we can launch the topology with our configured memory limit, if any.
let bounds_config = MemoryBoundsConfiguration::try_from_config(&config)?;
Expand Down Expand Up @@ -210,6 +214,27 @@ pub async fn handle_run_command(

let mut finished_with_error = false;
select! {
result = &mut internal_supervisor_fut => {
match result {
Err(SupervisorError::FailedToInitialize { child_name, source }) => {
error!(child_name, "Internal supervisor failed to initialize: {}. Shutting down...", source);
finished_with_error = true;
}
// If we haven't hit an initialization error -- which implies an error we can't really recover from --
// then just log for now, until we fully migrate everything over to the supervisor-based approach and
// can dial in our supervisor configuration.
//
// For right now, this matches the previous behavior where the process would exit if we couldn't
// configure/spawn the control plane or internal observability pipeline, but the process is unaffected
// if either of those components fail at _runtime_.
Err(e) => {
warn!("Internal supervisor exited: {}", e);
}
Ok(()) => {
warn!("Internal supervisor exited unexpectedly.");
}
}
}
_ = running_topology.wait_for_unexpected_finish() => {
error!("Component unexpectedly finished. Shutting down...");
finished_with_error = true;
Expand All @@ -219,7 +244,16 @@ pub async fn handle_run_command(
}
}

match running_topology.shutdown_with_timeout(Duration::from_secs(30)).await {
// Shutdown the primary topology
let topology_result = running_topology.shutdown_with_timeout(Duration::from_secs(30)).await;

// Signal the internal supervisor to shutdown (if still running) and drive it to completion.
// If the supervisor already exited (i.e., the select! above matched its branch), both the send
// and await resolve immediately — the send is a no-op and the future is already complete.
let _ = internal_shutdown_tx.send(());
let _ = internal_supervisor_fut.await;

match topology_result {
Ok(()) => {
if finished_with_error {
warn!("Topology shutdown complete despite error(s).")
Expand Down
Loading
Loading