Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/adapter/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ pub enum Command {
instance_id: ComputeInstanceId,
tx: oneshot::Sender<
Result<
mz_compute_client::controller::instance::Client<mz_repr::Timestamp>,
mz_compute_client::controller::instance_client::InstanceClient<mz_repr::Timestamp>,
mz_compute_client::controller::error::InstanceMissing,
>,
>,
Expand Down
4 changes: 2 additions & 2 deletions src/adapter/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -749,10 +749,10 @@ impl AdapterError {
}

pub fn concurrent_dependency_drop_from_instance_peek_error(
e: mz_compute_client::controller::instance::PeekError,
e: mz_compute_client::controller::instance_client::PeekError,
compute_instance: ComputeInstanceId,
) -> AdapterError {
use mz_compute_client::controller::instance::PeekError::*;
use mz_compute_client::controller::instance_client::PeekError::*;
match e {
ReplicaMissing(id) => AdapterError::ConcurrentDependencyDrop {
dependency_kind: "replica",
Expand Down
9 changes: 5 additions & 4 deletions src/adapter/src/peek_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ use std::collections::BTreeMap;
use std::sync::Arc;

use differential_dataflow::consolidation::consolidate;
use mz_compute_client::controller::error::{CollectionLookupError, InstanceMissing};
use mz_compute_client::controller::error::CollectionLookupError;
use mz_compute_client::controller::error::InstanceMissing;
use mz_compute_client::controller::instance_client::InstanceClient;
use mz_compute_client::protocol::command::PeekTarget;
use mz_compute_types::ComputeInstanceId;
use mz_expr::row::RowCollection;
Expand Down Expand Up @@ -54,8 +56,7 @@ pub struct PeekClient {
/// Note that these are never cleaned up. In theory, this could lead to a very slow memory leak
/// if a long-running user session keeps peeking on clusters that are being created and dropped
/// in a hot loop. Hopefully this won't occur any time soon.
compute_instances:
BTreeMap<ComputeInstanceId, mz_compute_client::controller::instance::Client<Timestamp>>,
compute_instances: BTreeMap<ComputeInstanceId, InstanceClient<Timestamp>>,
/// Handle to storage collections for reading frontiers and policies.
pub storage_collections: StorageCollectionsHandle,
/// A generator for transient `GlobalId`s, shared with Coordinator.
Expand Down Expand Up @@ -93,7 +94,7 @@ impl PeekClient {
pub async fn ensure_compute_instance_client(
&mut self,
compute_instance: ComputeInstanceId,
) -> Result<mz_compute_client::controller::instance::Client<Timestamp>, InstanceMissing> {
) -> Result<InstanceClient<Timestamp>, InstanceMissing> {
if !self.compute_instances.contains_key(&compute_instance) {
let client = self
.call_coordinator(|tx| Command::GetComputeInstanceClient {
Expand Down
14 changes: 8 additions & 6 deletions src/compute-client/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,21 @@ use crate::controller::error::{
ReplicaCreationError, ReplicaDropError,
};
use crate::controller::instance::{Instance, SharedCollectionState};
use crate::controller::instance_client::InstanceClient;
use crate::controller::introspection::{IntrospectionUpdates, spawn_introspection_sink};
use crate::controller::replica::ReplicaConfig;
use crate::logging::{LogVariant, LoggingConfig};
use crate::metrics::ComputeControllerMetrics;
use crate::protocol::command::{ComputeParameters, PeekTarget};
use crate::protocol::response::{PeekResponse, SubscribeBatch};

mod instance;
mod introspection;
mod replica;
mod sequential_hydration;

pub mod error;
pub mod instance;
pub mod instance_client;

pub(crate) type StorageCollections<T> = Arc<
dyn mz_storage_client::storage_collections::StorageCollections<Timestamp = T> + Send + Sync,
Expand Down Expand Up @@ -337,11 +339,11 @@ impl<T: ComputeControllerTimestamp> ComputeController<T> {
self.instances.get(&id).ok_or(InstanceMissing(id))
}

/// Return an `instance::Client` for the indicated compute instance.
/// Return an `instance_client::InstanceClient` for the indicated compute instance.
pub fn instance_client(
&self,
id: ComputeInstanceId,
) -> Result<instance::Client<T>, InstanceMissing> {
) -> Result<InstanceClient<T>, InstanceMissing> {
self.instance(id).map(|instance| instance.client.clone())
}

Expand Down Expand Up @@ -527,7 +529,7 @@ where
logs.push((log, id, shared));
}

let client = instance::Client::spawn(
let client = InstanceClient::spawn(
id,
self.build_info,
Arc::clone(&self.storage_collections),
Expand Down Expand Up @@ -1067,13 +1069,13 @@ where

#[derive(Debug)]
struct InstanceState<T: ComputeControllerTimestamp> {
client: instance::Client<T>,
client: InstanceClient<T>,
replicas: BTreeSet<ReplicaId>,
collections: BTreeMap<GlobalId, Collection<T>>,
}

impl<T: ComputeControllerTimestamp> InstanceState<T> {
fn new(client: instance::Client<T>, collections: BTreeMap<GlobalId, Collection<T>>) -> Self {
fn new(client: InstanceClient<T>, collections: BTreeMap<GlobalId, Collection<T>>) -> Self {
Self {
client,
replicas: Default::default(),
Expand Down
2 changes: 1 addition & 1 deletion src/compute-client/src/controller/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use thiserror::Error;

pub use mz_storage_types::errors::CollectionMissing;

use crate::controller::instance::InstanceShutDown;
use crate::controller::instance_client::InstanceShutDown;
use crate::controller::{ComputeInstanceId, ReplicaId};

/// The error returned by replica-targeted peeks and subscribes when the target replica
Expand Down
211 changes: 10 additions & 201 deletions src/compute-client/src/controller/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::time::{Duration, Instant};
use chrono::{DateTime, DurationRound, TimeDelta, Utc};
use mz_build_info::BuildInfo;
use mz_cluster_client::WallclockLagFn;
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
use mz_compute_types::plan::render_plan::RenderPlan;
use mz_compute_types::sinks::{
Expand Down Expand Up @@ -45,9 +44,7 @@ use thiserror::Error;
use timely::PartialOrder;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, ChangeBatch, Timestamp};
use tokio::sync::mpsc::error::SendError;
use tokio::sync::{mpsc, oneshot};
use tracing::debug_span;
use uuid::Uuid;

use crate::controller::error::{
Expand Down Expand Up @@ -102,32 +99,7 @@ impl From<CollectionMissing> for DataflowCreationError {
}
}

#[derive(Error, Debug)]
#[error("the instance has shut down")]
pub(super) struct InstanceShutDown;

/// Errors arising during peek processing.
#[derive(Error, Debug)]
pub enum PeekError {
/// The replica that the peek was issued against does not exist.
#[error("replica does not exist: {0}")]
ReplicaMissing(ReplicaId),
/// The read hold that was passed in is against the wrong collection.
#[error("read hold ID does not match peeked collection: {0}")]
ReadHoldIdMismatch(GlobalId),
/// The read hold that was passed in is for a later time than the peek's timestamp.
#[error("insufficient read hold provided: {0}")]
ReadHoldInsufficient(GlobalId),
/// The peek's target instance has shut down.
#[error("the instance has shut down")]
InstanceShutDown,
}

impl From<InstanceShutDown> for PeekError {
fn from(_error: InstanceShutDown) -> Self {
Self::InstanceShutDown
}
}
use crate::controller::instance_client::PeekError;

#[derive(Error, Debug)]
pub(super) enum ReadPolicyError {
Expand All @@ -144,173 +116,7 @@ impl From<CollectionMissing> for ReadPolicyError {
}

/// A command sent to an [`Instance`] task.
type Command<T> = Box<dyn FnOnce(&mut Instance<T>) + Send>;

/// A client for an `Instance` task.
#[derive(Clone, derivative::Derivative)]
#[derivative(Debug)]
pub struct Client<T: ComputeControllerTimestamp> {
/// A sender for commands for the instance.
command_tx: mpsc::UnboundedSender<Command<T>>,
/// A sender for read hold changes for collections installed on the instance.
#[derivative(Debug = "ignore")]
read_hold_tx: read_holds::ChangeTx<T>,
}

impl<T: ComputeControllerTimestamp> Client<T> {
pub(super) fn read_hold_tx(&self) -> read_holds::ChangeTx<T> {
Arc::clone(&self.read_hold_tx)
}

/// Call a method to be run on the instance task, by sending a message to the instance.
/// Does not wait for a response message.
pub(super) fn call<F>(&self, f: F) -> Result<(), InstanceShutDown>
where
F: FnOnce(&mut Instance<T>) + Send + 'static,
{
let otel_ctx = OpenTelemetryContext::obtain();
self.command_tx
.send(Box::new(move |instance| {
let _span = debug_span!("instance::call").entered();
otel_ctx.attach_as_parent();

f(instance)
}))
.map_err(|_send_error| InstanceShutDown)
}

/// Call a method to be run on the instance task, by sending a message to the instance and
/// waiting for a response message.
pub(super) async fn call_sync<F, R>(&self, f: F) -> Result<R, InstanceShutDown>
where
F: FnOnce(&mut Instance<T>) -> R + Send + 'static,
R: Send + 'static,
{
let (tx, rx) = oneshot::channel();
let otel_ctx = OpenTelemetryContext::obtain();
self.command_tx
.send(Box::new(move |instance| {
let _span = debug_span!("instance::call_sync").entered();
otel_ctx.attach_as_parent();
let result = f(instance);
let _ = tx.send(result);
}))
.map_err(|_send_error| InstanceShutDown)?;

rx.await.map_err(|_| InstanceShutDown)
}
}

impl<T> Client<T>
where
T: ComputeControllerTimestamp,
{
pub(super) fn spawn(
id: ComputeInstanceId,
build_info: &'static BuildInfo,
storage: StorageCollections<T>,
peek_stash_persist_location: PersistLocation,
arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState<T>)>,
metrics: InstanceMetrics,
now: NowFn,
wallclock_lag: WallclockLagFn<T>,
dyncfg: Arc<ConfigSet>,
response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
read_only: bool,
) -> Self {
let (command_tx, command_rx) = mpsc::unbounded_channel();

let read_hold_tx: read_holds::ChangeTx<_> = {
let command_tx = command_tx.clone();
Arc::new(move |id, change: ChangeBatch<_>| {
let cmd: Command<_> = {
let change = change.clone();
Box::new(move |i| i.apply_read_hold_change(id, change.clone()))
};
command_tx.send(cmd).map_err(|_| SendError((id, change)))
})
};

mz_ore::task::spawn(
|| format!("compute-instance-{id}"),
Instance::new(
build_info,
storage,
peek_stash_persist_location,
arranged_logs,
metrics,
now,
wallclock_lag,
dyncfg,
command_rx,
response_tx,
Arc::clone(&read_hold_tx),
introspection_tx,
read_only,
)
.run(),
);

Self {
command_tx,
read_hold_tx,
}
}

/// Acquires a `ReadHold` and collection write frontier for each of the identified compute
/// collections.
pub async fn acquire_read_holds_and_collection_write_frontiers(
&self,
ids: Vec<GlobalId>,
) -> Result<Vec<(GlobalId, ReadHold<T>, Antichain<T>)>, CollectionLookupError> {
self.call_sync(move |i| {
let mut result = Vec::new();
for id in ids.into_iter() {
result.push((
id,
i.acquire_read_hold(id)?,
i.collection_write_frontier(id)?,
));
}
Ok(result)
})
.await?
}

/// Issue a peek by calling into the instance task.
///
/// If this returns an error, then it didn't modify any `Instance` state.
pub async fn peek(
&self,
peek_target: PeekTarget,
literal_constraints: Option<Vec<Row>>,
uuid: Uuid,
timestamp: T,
result_desc: RelationDesc,
finishing: RowSetFinishing,
map_filter_project: mz_expr::SafeMfpPlan,
target_read_hold: ReadHold<T>,
target_replica: Option<ReplicaId>,
peek_response_tx: oneshot::Sender<PeekResponse>,
) -> Result<(), PeekError> {
self.call_sync(move |i| {
i.peek(
peek_target,
literal_constraints,
uuid,
timestamp,
result_desc,
finishing,
map_filter_project,
target_read_hold,
target_replica,
peek_response_tx,
)
})
.await?
}
}
pub(super) type Command<T> = Box<dyn FnOnce(&mut Instance<T>) + Send>;

/// A response from a replica, composed of a replica ID, the replica's current epoch, and the
/// compute response itself.
Expand Down Expand Up @@ -1053,7 +859,10 @@ impl<T: ComputeControllerTimestamp> Instance<T> {
}

/// Reports the current write frontier for the identified compute collection.
fn collection_write_frontier(&self, id: GlobalId) -> Result<Antichain<T>, CollectionMissing> {
pub(super) fn collection_write_frontier(
&self,
id: GlobalId,
) -> Result<Antichain<T>, CollectionMissing> {
Ok(self.collection(id)?.write_frontier())
}
}
Expand All @@ -1062,7 +871,7 @@ impl<T> Instance<T>
where
T: ComputeControllerTimestamp,
{
fn new(
pub(super) fn new(
build_info: &'static BuildInfo,
storage: StorageCollections<T>,
peek_stash_persist_location: PersistLocation,
Expand Down Expand Up @@ -1126,7 +935,7 @@ where
}
}

async fn run(mut self) {
pub(super) async fn run(mut self) {
self.send(ComputeCommand::Hello {
// The nonce is protocol iteration-specific and will be set in
// `ReplicaTask::specialize_command`.
Expand Down Expand Up @@ -1930,7 +1739,7 @@ where
}

/// Apply a collection read hold change.
fn apply_read_hold_change(&mut self, id: GlobalId, mut update: ChangeBatch<T>) {
pub(super) fn apply_read_hold_change(&mut self, id: GlobalId, mut update: ChangeBatch<T>) {
let Some(collection) = self.collections.get_mut(&id) else {
soft_panic_or_log!(
"read hold change for absent collection (id={id}, changes={update:?})"
Expand Down Expand Up @@ -2443,7 +2252,7 @@ where
///
/// This mirrors the logic used by the controller-side `InstanceState::acquire_read_hold`,
/// but executes on the instance task itself.
fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold<T>, CollectionMissing> {
pub(super) fn acquire_read_hold(&self, id: GlobalId) -> Result<ReadHold<T>, CollectionMissing> {
// Similarly to InstanceState::acquire_read_hold and StorageCollections::acquire_read_holds,
// we acquire read holds at the earliest possible time rather than returning a copy
// of the implied read hold. This is so that dependents can acquire read holds on
Expand Down
Loading