diff --git a/Cargo.lock b/Cargo.lock index f1290b7f..f8a41c56 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -703,6 +703,20 @@ dependencies = [ "parking_lot_core", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "der" version = "0.7.10" @@ -2847,7 +2861,7 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e56dd856803e253c8f298af3f4d7eb0ae5e23a737252cd90bb4f3b435033b2d" dependencies = [ - "dashmap", + "dashmap 5.5.3", "futures", "lazy_static", "log", @@ -3462,6 +3476,7 @@ dependencies = [ "futures-core", "futures-sink", "pin-project-lite", + "slab", "tokio", ] @@ -4147,6 +4162,7 @@ name = "waymark-core-backend" version = "0.1.0" dependencies = [ "chrono", + "either", "nonempty-collections", "serde", "waymark-backends-core", @@ -4663,6 +4679,7 @@ dependencies = [ "waymark-scheduler-loop", "waymark-scheduler-loop-core", "waymark-tokio-metrics-bringup", + "waymark-vcr-bringup", "waymark-webapp-bringup", "waymark-worker-remote", "waymark-worker-status-reporter", @@ -4758,6 +4775,131 @@ dependencies = [ "waymark-utils-futures", ] +[[package]] +name = "waymark-vcr-bringup" +version = "0.1.0" +dependencies = [ + "either", + "tokio", + "waymark-backends-core", + "waymark-core-backend", + "waymark-jsonlines", + "waymark-vcr-file", + "waymark-vcr-playback", + "waymark-vcr-playback-worker-pool", + "waymark-vcr-recorder", + "waymark-vcr-recorder-backend", + "waymark-vcr-recorder-worker-pool", + "waymark-workflow-registry-backend", +] + +[[package]] +name = "waymark-vcr-core" +version = "0.1.0" +dependencies = [ + "serde", + "waymark-ids", + "waymark-worker-core", +] + +[[package]] +name = "waymark-vcr-file" +version = "0.1.0" +dependencies = [ + "serde", + "serde_json", + "tokio", + "uuid", + "waymark-dag", + "waymark-ids", + "waymark-jsonlines", + "waymark-runner-executor-core", + "waymark-vcr-core", + "waymark-worker-core", +] + +[[package]] +name = "waymark-vcr-playback" +version = "0.1.0" +dependencies = [ + "dashmap 6.1.0", + "thiserror", + "tokio", + "tracing", + "uuid", + "waymark-backends-core", + "waymark-core-backend", + "waymark-ids", + "waymark-jsonlines", + "waymark-runner-executor-core", + "waymark-runner-state", + "waymark-vcr-core", + "waymark-vcr-file", + "waymark-vcr-playback-worker-pool-core", + "waymark-worker-core", + "waymark-workflow-registry-backend", +] + +[[package]] +name = "waymark-vcr-playback-worker-pool" +version = "0.1.0" +dependencies = [ + "futures-util", + "nonempty-collections", + "tokio", + "tokio-util", + "waymark-vcr-playback-worker-pool-core", + "waymark-worker-core", +] + +[[package]] +name = "waymark-vcr-playback-worker-pool-core" +version = "0.1.0" +dependencies = [ + "waymark-worker-core", +] + +[[package]] +name = "waymark-vcr-recorder" +version = "0.1.0" +dependencies = [ + "thiserror", + "tokio", + "tracing", + "uuid", + "waymark-core-backend", + "waymark-ids", + "waymark-jsonlines", + "waymark-vcr-core", + "waymark-vcr-file", + "waymark-worker-core", + "waymark-workflow-registry-backend", +] + +[[package]] +name = "waymark-vcr-recorder-backend" +version = "0.1.0" +dependencies = [ + "nonempty-collections", + "tracing", + "uuid", + "waymark-backends-core", + "waymark-core-backend", + "waymark-ids", + "waymark-vcr-recorder", + "waymark-workflow-registry-backend", +] + +[[package]] +name = "waymark-vcr-recorder-worker-pool" +version = "0.1.0" +dependencies = [ + "nonempty-collections", + "tracing", + "waymark-vcr-recorder", + "waymark-worker-core", +] + [[package]] name = "waymark-webapp-backend" version = "0.1.0" @@ -4915,6 +5057,7 @@ dependencies = [ name = "waymark-workflow-registry-backend" version = "0.1.0" dependencies = [ + "either", "waymark-backends-core", "waymark-ids", ] diff --git a/Cargo.toml b/Cargo.toml index c0b84afa..6715f9b3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ waymark-ids = { path = "crates/lib/ids" } waymark-ir-conversions = { path = "crates/lib/ir-conversions" } waymark-ir-format = { path = "crates/lib/ir-format" } waymark-ir-parser = { path = "crates/lib/ir-parser" } +waymark-jsonlines = { path = "crates/lib/jsonlines" } waymark-message-conversions = { path = "crates/lib/message-conversions" } waymark-metrics-util = { path = "crates/lib/metrics-util" } waymark-nonzero-duration = { path = "crates/lib/nonzero-duration" } @@ -57,6 +58,15 @@ waymark-timed-future = { path = "crates/lib/timed-future" } waymark-tokio-metrics-bringup = { path = "crates/lib/tokio-metrics-bringup" } waymark-utils-futures = { path = "crates/lib/utils-futures" } waymark-utils-tokio-channel = { path = "crates/lib/utils-tokio-channel" } +waymark-vcr-bringup = { path = "crates/lib/vcr-bringup" } +waymark-vcr-core = { path = "crates/lib/vcr-core" } +waymark-vcr-file = { path = "crates/lib/vcr-file" } +waymark-vcr-playback = { path = "crates/lib/vcr-playback" } +waymark-vcr-playback-worker-pool = { path = "crates/lib/vcr-playback-worker-pool" } +waymark-vcr-playback-worker-pool-core = { path = "crates/lib/vcr-playback-worker-pool-core" } +waymark-vcr-recorder = { path = "crates/lib/vcr-recorder" } +waymark-vcr-recorder-backend = { path = "crates/lib/vcr-recorder-backend" } +waymark-vcr-recorder-worker-pool = { path = "crates/lib/vcr-recorder-worker-pool" } waymark-webapp-backend = { path = "crates/lib/webapp-backend" } waymark-webapp-bringup = { path = "crates/lib/webapp-bringup" } waymark-webapp-config = { path = "crates/lib/webapp-config" } @@ -71,6 +81,7 @@ waymark-worker-status-core = { path = "crates/lib/worker-status-core" } waymark-worker-status-reporter = { path = "crates/lib/worker-status-reporter" } waymark-workflow-registry-backend = { path = "crates/lib/workflow-registry-backend" } + anyhow = "1" axum = "0.8" chrono = { version = "0.4", default-features = false } @@ -78,6 +89,7 @@ clap = "4.5" color-eyre = "0.6" console-subscriber = "0.5" cron = "0.12" +dashmap = "6" either = "1" envfury = "0.2" function_name = "0.3" diff --git a/crates/bin/start-workers/Cargo.toml b/crates/bin/start-workers/Cargo.toml index 525e1cea..748771dc 100644 --- a/crates/bin/start-workers/Cargo.toml +++ b/crates/bin/start-workers/Cargo.toml @@ -19,6 +19,7 @@ waymark-runloop = { workspace = true } waymark-scheduler-loop = { workspace = true } waymark-scheduler-loop-core = { workspace = true } waymark-tokio-metrics-bringup = { workspace = true } +waymark-vcr-bringup = { workspace = true } waymark-webapp-bringup = { workspace = true } waymark-worker-remote = { workspace = true } waymark-worker-status-reporter = { workspace = true } diff --git a/crates/bin/start-workers/src/main.rs b/crates/bin/start-workers/src/main.rs index 81d658f1..d1c962eb 100644 --- a/crates/bin/start-workers/src/main.rs +++ b/crates/bin/start-workers/src/main.rs @@ -224,10 +224,28 @@ async fn main() -> Result<()> { } }); + let vcr = waymark_vcr_bringup::setup(waymark_vcr_bringup::Mode::Record { + log_file_path: format!("vcr-{lock_uuid}.jsonl").into(), + command_buffer_size: 1024.try_into().unwrap(), + }) + .await?; + + let mut vcr_player_task = None; + match vcr.more { + waymark_vcr_bringup::BringupMore::None => {} + waymark_vcr_bringup::BringupMore::Playback { player_params } => { + let waymark_vcr_bringup::playback::PlayerBringup { player_task } = + waymark_vcr_bringup::playback::player(player_params, backend.clone()); + vcr_player_task = Some(player_task); + } + } + // Run the runloop. + let runloop_remote_pool = waymark_vcr_bringup::pool(vcr.pool, remote_pool.clone()); + let runloop_backend = waymark_vcr_bringup::backend(vcr.backend, backend.clone()); let runloop = waymark_runloop::RunLoop::new_with_shutdown( - remote_pool.clone(), - backend.clone(), + runloop_remote_pool, + runloop_backend, RunLoopConfig { max_concurrent_instances: config.max_concurrent_instances, executor_shards: config.executor_shards, @@ -262,6 +280,20 @@ async fn main() -> Result<()> { let _ = tokio::time::timeout(Duration::from_secs(2), status_reporter_handle).await; let _ = tokio::time::timeout(Duration::from_secs(2), expired_lock_reclaimer_handle).await; + match vcr.tasks { + waymark_vcr_bringup::BringupTasks::Recorder { recorder } => { + let _ = tokio::time::timeout(Duration::from_secs(2), recorder).await; + } + waymark_vcr_bringup::BringupTasks::Playback { loader } => { + let _ = tokio::time::timeout(Duration::from_secs(2), loader).await; + } + waymark_vcr_bringup::BringupTasks::None => {} + } + + if let Some(player_task) = vcr_player_task { + let _ = tokio::time::timeout(Duration::from_secs(2), player_task).await; + } + if let Err(err) = remote_pool.shutdown().await { warn!(error = %err, "worker pool shutdown failed"); } diff --git a/crates/lib/core-backend/Cargo.toml b/crates/lib/core-backend/Cargo.toml index a242f964..a20594e6 100644 --- a/crates/lib/core-backend/Cargo.toml +++ b/crates/lib/core-backend/Cargo.toml @@ -4,6 +4,9 @@ version.workspace = true publish.workspace = true edition = "2024" +[features] +default = ["either"] + [dependencies] waymark-backends-core = { workspace = true } waymark-ids = { workspace = true } @@ -11,6 +14,7 @@ waymark-runner-executor-core = { workspace = true } waymark-runner-state = { workspace = true } chrono = { workspace = true } +either = { workspace = true, optional = true } nonempty-collections = { workspace = true } serde = { workspace = true, features = ["derive"] } diff --git a/crates/lib/core-backend/src/either.rs b/crates/lib/core-backend/src/either.rs new file mode 100644 index 00000000..0e65f28b --- /dev/null +++ b/crates/lib/core-backend/src/either.rs @@ -0,0 +1,101 @@ +use either::Either; + +use crate::CoreBackend; + +impl CoreBackend for Either +where + Left: CoreBackend, + Right: CoreBackend, +{ + fn save_graphs<'a>( + &'a self, + claim: crate::LockClaim, + graphs: &'a [crate::GraphUpdate], + ) -> impl Future>> + + Send + + 'a { + match self { + Either::Left(inner) => Either::Left(inner.save_graphs(claim, graphs)), + Either::Right(inner) => Either::Right(inner.save_graphs(claim, graphs)), + } + } + + fn save_actions_done<'a>( + &'a self, + actions: &'a [crate::ActionDone], + ) -> impl Future> + Send + 'a { + match self { + Either::Left(inner) => Either::Left(inner.save_actions_done(actions)), + Either::Right(inner) => Either::Right(inner.save_actions_done(actions)), + } + } + + type PollQueuedInstancesError = Left::PollQueuedInstancesError; + + fn poll_queued_instances( + &self, + size: std::num::NonZeroUsize, + claim: crate::LockClaim, + ) -> impl Future< + Output = Result< + nonempty_collections::NEVec, + Self::PollQueuedInstancesError, + >, + > + Send + + '_ { + match self { + Either::Left(inner) => Either::Left(inner.poll_queued_instances(size, claim)), + Either::Right(inner) => Either::Right(inner.poll_queued_instances(size, claim)), + } + } + + fn refresh_instance_locks<'a>( + &'a self, + claim: crate::LockClaim, + instance_ids: &'a [waymark_ids::InstanceId], + ) -> impl Future>> + + Send + + 'a { + match self { + Either::Left(inner) => Either::Left(inner.refresh_instance_locks(claim, instance_ids)), + Either::Right(inner) => { + Either::Right(inner.refresh_instance_locks(claim, instance_ids)) + } + } + } + + fn release_instance_locks<'a>( + &'a self, + lock_uuid: waymark_ids::LockId, + instance_ids: &'a [waymark_ids::InstanceId], + ) -> impl Future> + Send + 'a { + match self { + Either::Left(inner) => { + Either::Left(inner.release_instance_locks(lock_uuid, instance_ids)) + } + Either::Right(inner) => { + Either::Right(inner.release_instance_locks(lock_uuid, instance_ids)) + } + } + } + + fn save_instances_done<'a>( + &'a self, + instances: &'a [crate::InstanceDone], + ) -> impl Future> + Send + 'a { + match self { + Either::Left(inner) => Either::Left(inner.save_instances_done(instances)), + Either::Right(inner) => Either::Right(inner.save_instances_done(instances)), + } + } + + fn queue_instances<'a>( + &'a self, + instances: &'a [crate::QueuedInstance], + ) -> impl Future> + Send + 'a { + match self { + Either::Left(inner) => Either::Left(inner.queue_instances(instances)), + Either::Right(inner) => Either::Right(inner.queue_instances(instances)), + } + } +} diff --git a/crates/lib/core-backend/src/lib.rs b/crates/lib/core-backend/src/lib.rs index a7528ece..95102e4e 100644 --- a/crates/lib/core-backend/src/lib.rs +++ b/crates/lib/core-backend/src/lib.rs @@ -1,6 +1,10 @@ //! Core backend traits for waymark. +#[cfg(feature = "either")] +mod either; + mod data; + pub mod poll_queued_instances; use nonempty_collections::NEVec; diff --git a/crates/lib/vcr-bringup/Cargo.toml b/crates/lib/vcr-bringup/Cargo.toml new file mode 100644 index 00000000..440c5aad --- /dev/null +++ b/crates/lib/vcr-bringup/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "waymark-vcr-bringup" +edition = "2024" +version.workspace = true +publish.workspace = true + +[dependencies] +waymark-backends-core = { workspace = true } +waymark-core-backend = { workspace = true } +waymark-jsonlines = { workspace = true } +waymark-vcr-file = { workspace = true } +waymark-vcr-playback = { workspace = true } +waymark-vcr-playback-worker-pool = { workspace = true } +waymark-vcr-recorder = { workspace = true } +waymark-vcr-recorder-backend = { workspace = true } +waymark-vcr-recorder-worker-pool = { workspace = true } +waymark-workflow-registry-backend = { workspace = true } + +either = { workspace = true } +tokio = { workspace = true, features = ["rt"] } diff --git a/crates/lib/vcr-bringup/src/lib.rs b/crates/lib/vcr-bringup/src/lib.rs new file mode 100644 index 00000000..8c029b83 --- /dev/null +++ b/crates/lib/vcr-bringup/src/lib.rs @@ -0,0 +1,175 @@ +//! VCR bringup provides the VCR setup procedures optimized for use in +//! the configurable application initialization. +//! +//! The types are tailored to be easy to use in the sequential initialization +//! logic and take care of type regularization without forcing the use of +//! dyn-traits. + +use std::{num::NonZeroUsize, sync::Arc}; + +use either::Either; + +pub mod playback; +pub mod recorder; + +pub enum Mode { + Record { + log_file_path: std::path::PathBuf, + command_buffer_size: NonZeroUsize, + }, + Playback { + log_file_path: std::path::PathBuf, + loaded_log_items_buffer_size: NonZeroUsize, + pool_ingest_buffer_size: NonZeroUsize, + }, + Off, +} + +#[must_use] +pub struct Bringup { + pub pool: BringupPool, + pub backend: BringupBackend, + pub tasks: BringupTasks, + pub more: BringupMore, +} + +#[must_use] +pub enum BringupPool { + Recorder(waymark_vcr_recorder::pool::Handle), + Playback(playback::PoolParams), + None, +} + +#[must_use] +pub enum BringupBackend { + Recorder(waymark_vcr_recorder::backend::Handle), + None, +} + +#[must_use] +pub enum BringupTasks { + Recorder { + recorder: tokio::task::JoinHandle>, + }, + Playback { + loader: tokio::task::JoinHandle>, + }, + None, +} + +#[must_use] +pub enum BringupMore { + None, + Playback { + player_params: playback::PlayerParams, + }, +} + +pub async fn setup(mode: Mode) -> Result { + Ok(match mode { + Mode::Record { + log_file_path, + command_buffer_size, + } => { + let recorder::Bringup { + backend_handle, + pool_handle, + recorder_task, + } = recorder::setup(log_file_path, command_buffer_size).await?; + + let pool = BringupPool::Recorder(pool_handle); + let backend = BringupBackend::Recorder(backend_handle); + let tasks = BringupTasks::Recorder { + recorder: recorder_task, + }; + let more = BringupMore::None; + + Bringup { + pool, + backend, + tasks, + more, + } + } + Mode::Playback { + log_file_path, + pool_ingest_buffer_size, + loaded_log_items_buffer_size, + } => { + let playback::Bringup { + player_params, + pool_params, + loader_task, + } = playback::setup( + log_file_path, + loaded_log_items_buffer_size, + pool_ingest_buffer_size, + ) + .await?; + + let pool = BringupPool::Playback(pool_params); + let backend = BringupBackend::None; + let tasks = BringupTasks::Playback { + loader: loader_task, + }; + let more = BringupMore::Playback { player_params }; + + Bringup { + pool, + backend, + tasks, + more, + } + } + Mode::Off => Bringup { + pool: BringupPool::None, + backend: BringupBackend::None, + tasks: BringupTasks::None, + more: BringupMore::None, + }, + }) +} + +pub type VcrPool = Either< + waymark_vcr_recorder_worker_pool::Pool, + waymark_vcr_playback_worker_pool::Pool>, +>; +pub type VcrBackend = waymark_vcr_recorder_backend::Backend; + +pub type MaybeVcrPool = Either>; +pub type MaybeVcrBackend = Either>; + +pub fn pool(bringup: BringupPool, pool: Pool) -> MaybeVcrPool { + match bringup { + BringupPool::Recorder(recorder) => { + Either::Right(Either::Left(waymark_vcr_recorder_worker_pool::Pool { + inner: pool, + recorder, + })) + } + BringupPool::Playback(pool_params) => { + let playback::PoolParams { + execution_correlator, + pool_ingest_buffer_size, + } = pool_params; + + Either::Right(Either::Right(waymark_vcr_playback_worker_pool::Pool::new( + execution_correlator, + pool_ingest_buffer_size, + ))) + } + BringupPool::None => Either::Left(pool), + } +} + +pub fn backend(bringup: BringupBackend, backend: Backend) -> MaybeVcrBackend { + match bringup { + BringupBackend::Recorder(recorder) => { + Either::Right(waymark_vcr_recorder_backend::Backend { + inner: backend, + recorder, + }) + } + BringupBackend::None => Either::Left(backend), + } +} diff --git a/crates/lib/vcr-bringup/src/playback.rs b/crates/lib/vcr-bringup/src/playback.rs new file mode 100644 index 00000000..02f251c1 --- /dev/null +++ b/crates/lib/vcr-bringup/src/playback.rs @@ -0,0 +1,79 @@ +use std::{num::NonZeroUsize, sync::Arc}; + +#[must_use] +pub struct Bringup { + pub player_params: PlayerParams, + pub pool_params: PoolParams, + pub loader_task: tokio::task::JoinHandle>, +} + +#[must_use] +pub struct PlayerParams { + pub execution_correlator_prep: waymark_vcr_playback::execution_correlator::PrepHandle, + pub player_rx: waymark_vcr_playback::player::Receiver, +} + +#[must_use] +pub struct PoolParams { + pub execution_correlator: Arc, + pub pool_ingest_buffer_size: NonZeroUsize, +} + +pub async fn setup( + log_file_path: impl AsRef, + loaded_log_items_buffer_size: NonZeroUsize, + pool_ingest_buffer_size: NonZeroUsize, +) -> Result { + let file = tokio::fs::File::open(log_file_path).await?; + let reader = waymark_vcr_file::Reader::from(tokio::io::BufReader::new(file)); + + let (player_tx, player_rx) = tokio::sync::mpsc::channel(loaded_log_items_buffer_size.get()); + + let params = waymark_vcr_playback::loader::Params { reader, player_tx }; + let loader_task = tokio::spawn(waymark_vcr_playback::loader::run(params)); + + let execution_correlator = Arc::new(waymark_vcr_playback::ExecutionCorrelator::default()); + let execution_correlator_prep = execution_correlator.prep_handle(); + + let player_params = PlayerParams { + execution_correlator_prep, + player_rx, + }; + + let pool_params = PoolParams { + execution_correlator, + pool_ingest_buffer_size, + }; + + Ok(Bringup { + player_params, + loader_task, + pool_params, + }) +} + +#[must_use] +pub struct PlayerBringup { + pub player_task: + tokio::task::JoinHandle>, +} + +pub fn player(player_params: PlayerParams, backend: Backend) -> PlayerBringup +where + Backend: waymark_core_backend::CoreBackend, + Backend: waymark_workflow_registry_backend::WorkflowRegistryBackend, + Backend: Send + 'static, +{ + let PlayerParams { + execution_correlator_prep, + player_rx, + } = player_params; + + let params = waymark_vcr_playback::player::Params { + execution_correlator_prep, + backend, + player_rx, + }; + let player_task = tokio::spawn(waymark_vcr_playback::player::run(params)); + PlayerBringup { player_task } +} diff --git a/crates/lib/vcr-bringup/src/recorder.rs b/crates/lib/vcr-bringup/src/recorder.rs new file mode 100644 index 00000000..ac0383cf --- /dev/null +++ b/crates/lib/vcr-bringup/src/recorder.rs @@ -0,0 +1,34 @@ +use std::num::NonZeroUsize; + +#[must_use] +pub struct Bringup { + pub recorder_task: tokio::task::JoinHandle>, + pub pool_handle: waymark_vcr_recorder::pool::Handle, + pub backend_handle: waymark_vcr_recorder::backend::Handle, +} + +pub async fn setup( + log_file_path: impl AsRef, + command_buffer_size: NonZeroUsize, +) -> Result { + let recorder_handle = waymark_vcr_recorder::Handle::new(command_buffer_size); + + let file = tokio::fs::File::create_new(log_file_path).await?; + let writer = waymark_vcr_file::Writer::from(file); + + let pool_handle = recorder_handle.pool_handle(); + let backend_handle = recorder_handle.backend_handle(); + + let params = waymark_vcr_recorder::Params { + writer, + handle: recorder_handle, + }; + + let recorder_task = tokio::spawn(waymark_vcr_recorder::r#loop(params)); + + Ok(Bringup { + recorder_task, + pool_handle, + backend_handle, + }) +} diff --git a/crates/lib/vcr-core/Cargo.toml b/crates/lib/vcr-core/Cargo.toml new file mode 100644 index 00000000..eb3e1af8 --- /dev/null +++ b/crates/lib/vcr-core/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "waymark-vcr-core" +edition = "2024" +version.workspace = true +publish.workspace = true + +[dependencies] +waymark-ids = { workspace = true } +waymark-worker-core = { workspace = true } + +serde = { workspace = true, features = ["derive"] } diff --git a/crates/lib/vcr-core/src/lib.rs b/crates/lib/vcr-core/src/lib.rs new file mode 100644 index 00000000..e4855842 --- /dev/null +++ b/crates/lib/vcr-core/src/lib.rs @@ -0,0 +1,26 @@ +use waymark_ids::{ExecutionId, InstanceId}; + +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] +pub struct CorrelationId { + pub executor_id: InstanceId, + pub execution_id: ExecutionId, + pub attempt_number: u32, +} + +impl CorrelationId { + pub fn from_request(value: &waymark_worker_core::ActionRequest) -> Self { + Self { + executor_id: value.executor_id, + execution_id: value.execution_id, + attempt_number: value.attempt_number, + } + } + + pub fn from_completion(value: &waymark_worker_core::ActionCompletion) -> Self { + Self { + executor_id: value.executor_id, + execution_id: value.execution_id, + attempt_number: value.attempt_number, + } + } +} diff --git a/crates/lib/vcr-file/Cargo.toml b/crates/lib/vcr-file/Cargo.toml new file mode 100644 index 00000000..fc7da120 --- /dev/null +++ b/crates/lib/vcr-file/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "waymark-vcr-file" +edition = "2024" +version.workspace = true +publish.workspace = true + +[dependencies] +waymark-dag = { workspace = true } +waymark-ids = { workspace = true } +waymark-jsonlines = { workspace = true } +waymark-runner-executor-core = { workspace = true } +waymark-vcr-core = { workspace = true } +waymark-worker-core = { workspace = true } + +serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } +tokio = { workspace = true, features = ["fs"] } +uuid = { workspace = true } diff --git a/crates/lib/vcr-file/src/action.rs b/crates/lib/vcr-file/src/action.rs new file mode 100644 index 00000000..ff866463 --- /dev/null +++ b/crates/lib/vcr-file/src/action.rs @@ -0,0 +1,50 @@ +use std::collections::HashMap; + +use uuid::Uuid; +use waymark_runner_executor_core::UncheckedExecutionResult; +use waymark_vcr_core::CorrelationId; + +#[derive(Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub struct Params { + pub action_name: String, + pub module_name: Option, + pub kwargs: HashMap, + pub timeout_seconds: u32, +} + +pub fn deconstruct_request( + value: waymark_worker_core::ActionRequest, +) -> (CorrelationId, Params, Uuid) { + let waymark_worker_core::ActionRequest { + action_name, + module_name, + kwargs, + timeout_seconds, + dispatch_token, + attempt_number, + executor_id, + execution_id, + } = value; + + let correlation_id = CorrelationId { + executor_id, + execution_id, + attempt_number, + }; + + let params = Params { + action_name, + module_name, + kwargs, + timeout_seconds, + }; + + (correlation_id, params, dispatch_token) +} + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub struct LogItem { + pub execution_time: std::time::Duration, + pub params: Params, + pub result: UncheckedExecutionResult, +} diff --git a/crates/lib/vcr-file/src/instance.rs b/crates/lib/vcr-file/src/instance.rs new file mode 100644 index 00000000..beecd5f7 --- /dev/null +++ b/crates/lib/vcr-file/src/instance.rs @@ -0,0 +1,7 @@ +use waymark_ids::WorkflowVersionId; + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub struct LogItem { + pub workflow_version_id: WorkflowVersionId, + pub actions: Vec, +} diff --git a/crates/lib/vcr-file/src/lib.rs b/crates/lib/vcr-file/src/lib.rs new file mode 100644 index 00000000..51492d1b --- /dev/null +++ b/crates/lib/vcr-file/src/lib.rs @@ -0,0 +1,12 @@ +pub mod action; +pub mod instance; +pub mod workflow_version; + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub enum LogItem { + Instance(instance::LogItem), + WorkflowVersion(workflow_version::LogItem), +} + +pub type Reader = waymark_jsonlines::Reader, LogItem>; +pub type Writer = waymark_jsonlines::Writer; diff --git a/crates/lib/vcr-file/src/workflow_version.rs b/crates/lib/vcr-file/src/workflow_version.rs new file mode 100644 index 00000000..15472f24 --- /dev/null +++ b/crates/lib/vcr-file/src/workflow_version.rs @@ -0,0 +1,11 @@ +use waymark_ids::WorkflowVersionId; + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub struct LogItem { + pub id: WorkflowVersionId, + pub workflow_name: String, + pub workflow_version: String, + pub ir_hash: String, + pub program_proto: Vec, + pub concurrent: bool, +} diff --git a/crates/lib/vcr-playback-worker-pool-core/Cargo.toml b/crates/lib/vcr-playback-worker-pool-core/Cargo.toml new file mode 100644 index 00000000..15df6abe --- /dev/null +++ b/crates/lib/vcr-playback-worker-pool-core/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "waymark-vcr-playback-worker-pool-core" +edition = "2024" +version.workspace = true +publish.workspace = true + +[dependencies] +waymark-worker-core = { workspace = true } diff --git a/crates/lib/vcr-playback-worker-pool-core/src/execution_correlator.rs b/crates/lib/vcr-playback-worker-pool-core/src/execution_correlator.rs new file mode 100644 index 00000000..010ae71b --- /dev/null +++ b/crates/lib/vcr-playback-worker-pool-core/src/execution_correlator.rs @@ -0,0 +1,26 @@ +use std::ops::Deref; + +use waymark_worker_core::{ActionCompletion, ActionRequest}; + +pub struct CorrelatedActionCompletion { + pub completion: ActionCompletion, + pub delay: std::time::Duration, +} + +pub trait ExecutionCorrelator { + type Error; + + fn correlate(&self, request: ActionRequest) -> Result; +} + +impl ExecutionCorrelator for T +where + T: Deref, + ::Target: self::ExecutionCorrelator, +{ + type Error = <::Target as self::ExecutionCorrelator>::Error; + + fn correlate(&self, request: ActionRequest) -> Result { + ::deref(self).correlate(request) + } +} diff --git a/crates/lib/vcr-playback-worker-pool-core/src/lib.rs b/crates/lib/vcr-playback-worker-pool-core/src/lib.rs new file mode 100644 index 00000000..524e9b9f --- /dev/null +++ b/crates/lib/vcr-playback-worker-pool-core/src/lib.rs @@ -0,0 +1,3 @@ +pub mod execution_correlator; + +pub use self::execution_correlator::ExecutionCorrelator; diff --git a/crates/lib/vcr-playback-worker-pool/Cargo.toml b/crates/lib/vcr-playback-worker-pool/Cargo.toml new file mode 100644 index 00000000..82850fe8 --- /dev/null +++ b/crates/lib/vcr-playback-worker-pool/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "waymark-vcr-playback-worker-pool" +edition = "2024" +version.workspace = true +publish.workspace = true + +[dependencies] +waymark-vcr-playback-worker-pool-core = { workspace = true } +waymark-worker-core = { workspace = true } + +futures-util = { workspace = true } +nonempty-collections = { workspace = true } +tokio = { workspace = true, features = ["macros"] } +tokio-util = { workspace = true, features = ["time"] } diff --git a/crates/lib/vcr-playback-worker-pool/src/buffered_ingest_completion_queue.rs b/crates/lib/vcr-playback-worker-pool/src/buffered_ingest_completion_queue.rs new file mode 100644 index 00000000..56b81184 --- /dev/null +++ b/crates/lib/vcr-playback-worker-pool/src/buffered_ingest_completion_queue.rs @@ -0,0 +1,70 @@ +use std::num::NonZeroUsize; + +use nonempty_collections::NEVec; +use waymark_worker_core::ActionCompletion; + +#[derive(Debug)] +pub struct IngestHandle { + tx: tokio::sync::mpsc::Sender<(ActionCompletion, tokio::time::Instant)>, +} + +#[derive(Debug)] +pub struct PollHandle { + rx: tokio::sync::mpsc::Receiver<(ActionCompletion, tokio::time::Instant)>, + queue: super::completion_queue::CompletionQueue, +} + +impl IngestHandle { + pub fn queue(&self, completion: ActionCompletion, delay: std::time::Duration) { + let at = tokio::time::Instant::now() + .checked_add(delay) + .expect("instant overflow"); + + self.tx + .try_send((completion, at)) + .expect("the ingest buffer capacity exceeded"); + } +} + +impl PollHandle { + pub async fn poll_completions(&mut self) -> Option> { + let mut poll_ingest_items = true; + + loop { + let ingest_item = { + let inner_poll_fut = self.queue.poll_completions(); + let inner_poll_fut = std::pin::pin!(inner_poll_fut); + + let ingest_fut = self.rx.recv(); + let ingest_fut = std::pin::pin!(ingest_fut); + + tokio::select! { + polled_completions = inner_poll_fut => { + return polled_completions; + } + maybe_ingest_item = ingest_fut, if poll_ingest_items => { + let Some(ingest_item) = maybe_ingest_item else { + poll_ingest_items = false; + continue; + }; + ingest_item + } + } + }; + + let (completion_to_queue, at) = ingest_item; + + let delay = at.saturating_duration_since(tokio::time::Instant::now()); + + self.queue.queue(completion_to_queue, delay); + } + } +} + +pub fn new( + ingest_buffer: NonZeroUsize, + queue: super::completion_queue::CompletionQueue, +) -> (IngestHandle, PollHandle) { + let (tx, rx) = tokio::sync::mpsc::channel(ingest_buffer.get()); + (IngestHandle { tx }, PollHandle { rx, queue }) +} diff --git a/crates/lib/vcr-playback-worker-pool/src/completion_queue.rs b/crates/lib/vcr-playback-worker-pool/src/completion_queue.rs new file mode 100644 index 00000000..2d7137aa --- /dev/null +++ b/crates/lib/vcr-playback-worker-pool/src/completion_queue.rs @@ -0,0 +1,39 @@ +use futures_util::{FutureExt as _, StreamExt}; +use nonempty_collections::NEVec; +use waymark_worker_core::ActionCompletion; + +#[derive(Debug, Default)] +pub struct CompletionQueue { + queue: tokio_util::time::DelayQueue, +} + +impl CompletionQueue { + pub fn queue(&mut self, completion: ActionCompletion, delay: std::time::Duration) { + self.queue.insert(completion, delay); + } + + pub async fn poll_completions(&mut self) -> Option> { + let first = self.queue.next().await?; + let first = first.into_inner(); + + let mut vec = NEVec::new(first); + + loop { + let maybe_now = self.queue.next().now_or_never(); + + let Some(maybe_next) = maybe_now else { + break; + }; + + let Some(next) = maybe_next else { + break; + }; + + let next = next.into_inner(); + + vec.push(next); + } + + Some(vec) + } +} diff --git a/crates/lib/vcr-playback-worker-pool/src/lib.rs b/crates/lib/vcr-playback-worker-pool/src/lib.rs new file mode 100644 index 00000000..0ac9dcf8 --- /dev/null +++ b/crates/lib/vcr-playback-worker-pool/src/lib.rs @@ -0,0 +1,61 @@ +mod buffered_ingest_completion_queue; +mod completion_queue; + +use std::num::NonZeroUsize; + +use nonempty_collections::NEVec; + +use waymark_worker_core::{ActionCompletion, ActionRequest, BaseWorkerPool, WorkerPoolError}; + +#[derive(Debug)] +pub struct Pool { + execution_correlator: ExecutionCorrelator, + completion_queue_ingester: buffered_ingest_completion_queue::IngestHandle, + completion_queue_poller: tokio::sync::Mutex, +} + +impl Pool { + pub fn new(execution_correlator: ExecutionCorrelator, ingest_buffer: NonZeroUsize) -> Self { + let queue = self::completion_queue::CompletionQueue::default(); + let (completion_queue_ingester, completion_queue_poller) = + self::buffered_ingest_completion_queue::new(ingest_buffer, queue); + let completion_queue_poller = tokio::sync::Mutex::new(completion_queue_poller); + Self { + execution_correlator, + completion_queue_ingester, + completion_queue_poller, + } + } +} + +impl BaseWorkerPool for Pool +where + ExecutionCorrelator: waymark_vcr_playback_worker_pool_core::ExecutionCorrelator, + ExecutionCorrelator: Sync, + ::Error: + core::fmt::Display, +{ + fn queue(&self, request: ActionRequest) -> Result<(), WorkerPoolError> { + let correlated_action_completion = + self.execution_correlator + .correlate(request) + .map_err(|error| { + WorkerPoolError::new( + "VcrPlaybackQueueError::ExecutionCorrelation", + format!("vcr playback: unable to find a correlated execution: {error}"), + ) + })?; + + let waymark_vcr_playback_worker_pool_core::execution_correlator::CorrelatedActionCompletion { completion, delay } = + correlated_action_completion; + + self.completion_queue_ingester.queue(completion, delay); + + Ok(()) + } + + async fn poll_complete(&self) -> Option> { + let mut completion_queue_poller = self.completion_queue_poller.lock().await; + completion_queue_poller.poll_completions().await + } +} diff --git a/crates/lib/vcr-playback/Cargo.toml b/crates/lib/vcr-playback/Cargo.toml new file mode 100644 index 00000000..d0de283e --- /dev/null +++ b/crates/lib/vcr-playback/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "waymark-vcr-playback" +edition = "2024" +version.workspace = true +publish.workspace = true + +[dependencies] +waymark-backends-core = { workspace = true } +waymark-core-backend = { workspace = true } +waymark-ids = { workspace = true } +waymark-jsonlines = { workspace = true } +waymark-runner-executor-core = { workspace = true } +waymark-runner-state = { workspace = true } +waymark-vcr-core = { workspace = true } +waymark-vcr-file = { workspace = true } +waymark-vcr-playback-worker-pool-core = { workspace = true } +waymark-worker-core = { workspace = true } +waymark-workflow-registry-backend = { workspace = true } + +dashmap = { workspace = true } +thiserror = { workspace = true } +tokio = { workspace = true, features = ["sync"] } +tracing = { workspace = true } +uuid = { workspace = true } diff --git a/crates/lib/vcr-playback/src/execution_correlator.rs b/crates/lib/vcr-playback/src/execution_correlator.rs new file mode 100644 index 00000000..70573f93 --- /dev/null +++ b/crates/lib/vcr-playback/src/execution_correlator.rs @@ -0,0 +1,97 @@ +use std::sync::Arc; + +use dashmap::DashMap; +use waymark_runner_executor_core::UncheckedExecutionResult; +use waymark_vcr_core::CorrelationId; +use waymark_vcr_playback_worker_pool_core::execution_correlator::CorrelatedActionCompletion; +use waymark_worker_core::ActionRequest; + +#[derive(Debug)] +struct PreparedCorrelation { + pub delay: std::time::Duration, + pub result: UncheckedExecutionResult, + pub params: waymark_vcr_file::action::Params, +} + +#[derive(Debug, Default)] +pub struct ExecutionCorrelator { + prepared_correlations: DashMap, +} + +#[derive(Debug)] +pub struct PrepHandle(Arc); + +impl ExecutionCorrelator { + pub fn prep_handle(self: &Arc) -> PrepHandle { + PrepHandle(Arc::clone(self)) + } +} + +impl PrepHandle { + pub fn prepare_correlation( + &self, + id: CorrelationId, + log_item: waymark_vcr_file::action::LogItem, + ) { + let waymark_vcr_file::action::LogItem { + execution_time, + params, + result, + } = log_item; + + let prepared_correlation = PreparedCorrelation { + delay: execution_time, + params, + result, + }; + + self.0 + .prepared_correlations + .insert(id, prepared_correlation); + } +} + +#[derive(Debug, thiserror::Error)] +#[error("execution to correlate not found")] +pub struct NotFoundError; + +impl waymark_vcr_playback_worker_pool_core::ExecutionCorrelator for ExecutionCorrelator { + type Error = NotFoundError; + + fn correlate(&self, request: ActionRequest) -> Result { + let (id, params, dispatch_token) = waymark_vcr_file::action::deconstruct_request(request); + let (_, prepared_correlation) = self + .prepared_correlations + .remove(&id) + .ok_or(NotFoundError)?; + + let PreparedCorrelation { + delay, + params: expected_params, + result, + } = prepared_correlation; + + if expected_params != params { + // We want to pay attention to this, so this is a panic for now. + panic!( + "correlation assumption violated;\nexpected:\n{params:?}\nactual:\n{expected_params:?}" + ); + } + + let CorrelationId { + executor_id, + execution_id, + attempt_number, + } = id; + + let completion = waymark_worker_core::ActionCompletion { + executor_id, + execution_id, + attempt_number, + dispatch_token, + result, + }; + + Ok(CorrelatedActionCompletion { completion, delay }) + } +} diff --git a/crates/lib/vcr-playback/src/lib.rs b/crates/lib/vcr-playback/src/lib.rs new file mode 100644 index 00000000..4d37d0d4 --- /dev/null +++ b/crates/lib/vcr-playback/src/lib.rs @@ -0,0 +1,5 @@ +pub mod execution_correlator; +pub mod loader; +pub mod player; + +pub use self::execution_correlator::ExecutionCorrelator; diff --git a/crates/lib/vcr-playback/src/loader.rs b/crates/lib/vcr-playback/src/loader.rs new file mode 100644 index 00000000..f908f18e --- /dev/null +++ b/crates/lib/vcr-playback/src/loader.rs @@ -0,0 +1,25 @@ +pub struct Params { + pub reader: waymark_vcr_file::Reader, + pub player_tx: crate::player::Sender, +} + +pub async fn run(params: Params) -> Result<(), waymark_jsonlines::ReadError> { + let Params { + mut reader, + player_tx, + } = params; + + loop { + let Ok(permit) = player_tx.reserve().await else { + break; + }; + + let Some(item) = reader.next_value().await? else { + break; + }; + + permit.send(item); + } + + Ok(()) +} diff --git a/crates/lib/vcr-playback/src/player.rs b/crates/lib/vcr-playback/src/player.rs new file mode 100644 index 00000000..4b9f98da --- /dev/null +++ b/crates/lib/vcr-playback/src/player.rs @@ -0,0 +1,143 @@ +use std::collections::HashMap; + +use waymark_ids::WorkflowVersionId; +use waymark_vcr_core::CorrelationId; +pub use waymark_vcr_file::LogItem; + +pub type Sender = tokio::sync::mpsc::Sender; +pub type Receiver = tokio::sync::mpsc::Receiver; + +pub struct Params { + pub execution_correlator_prep: crate::execution_correlator::PrepHandle, + pub backend: Backend, + pub player_rx: Receiver, +} + +type RemappedWorkflowVersionIdsMap = HashMap; + +pub async fn run( + params: Params, +) -> Result<(), waymark_backends_core::BackendError> +where + Backend: waymark_core_backend::CoreBackend, + Backend: waymark_workflow_registry_backend::WorkflowRegistryBackend, +{ + let Params { + mut execution_correlator_prep, + mut backend, + mut player_rx, + } = params; + + let mut remapped_workflow_version_ids = Default::default(); + + loop { + let Some(log_item) = player_rx.recv().await else { + break; + }; + + match log_item { + waymark_vcr_file::LogItem::Instance(log_item) => { + handle_instance( + &mut backend, + &remapped_workflow_version_ids, + &mut execution_correlator_prep, + log_item, + ) + .await?; + } + waymark_vcr_file::LogItem::WorkflowVersion(log_item) => { + handle_workflow_version(&mut backend, &mut remapped_workflow_version_ids, log_item) + .await?; + } + }; + } + + Ok(()) +} + +async fn handle_instance( + backend: &mut Backend, + remapped_workflow_version_ids: &RemappedWorkflowVersionIdsMap, + execution_correlator_prep: &mut crate::execution_correlator::PrepHandle, + log_item: waymark_vcr_file::instance::LogItem, +) -> Result<(), waymark_backends_core::BackendError> +where + Backend: waymark_core_backend::CoreBackend, +{ + let waymark_vcr_file::instance::LogItem { + workflow_version_id, + actions, + } = log_item; + + let Some(workflow_version_id) = remapped_workflow_version_ids.get(&workflow_version_id) else { + tracing::warn!( + %workflow_version_id, + "unable to locate remapped workflow version id for the queued instance" + ); + return Ok(()); + }; + + let executor_id = waymark_ids::InstanceId::new_uuid_v4(); + + let first_action_execution_id = waymark_ids::ExecutionId::new_uuid_v4(); + + let queued_instance = waymark_core_backend::QueuedInstance { + workflow_version_id: *workflow_version_id, + schedule_id: None, + entry_node: first_action_execution_id, + state: waymark_runner_state::RunnerState::dummy(), + action_results: Default::default(), + instance_id: executor_id, + scheduled_at: None, + }; + + backend.queue_instances(&[queued_instance]).await?; + + for log_item in actions { + let correlation_id = CorrelationId { + executor_id, + execution_id: first_action_execution_id, // TODO: figure out how to predict the subsequent execution IDs. + attempt_number: 0, // TODO: keep as part of log item + }; + execution_correlator_prep.prepare_correlation(correlation_id, log_item); + } + + Ok(()) +} + +async fn handle_workflow_version( + backend: &mut Backend, + remapped_workflow_version_ids: &mut RemappedWorkflowVersionIdsMap, + log_item: waymark_vcr_file::workflow_version::LogItem, +) -> Result<(), waymark_backends_core::BackendError> +where + Backend: waymark_workflow_registry_backend::WorkflowRegistryBackend, +{ + let waymark_vcr_file::workflow_version::LogItem { + id, + workflow_name, + workflow_version, + ir_hash, + program_proto, + concurrent, + } = log_item; + + let std::collections::hash_map::Entry::Vacant(vacant) = remapped_workflow_version_ids.entry(id) + else { + return Ok(()); + }; + + let remapped_id = backend + .upsert_workflow_version(&waymark_workflow_registry_backend::WorkflowRegistration { + workflow_name, + workflow_version, + ir_hash, + program_proto, + concurrent, + }) + .await?; + + vacant.insert(remapped_id); + + Ok(()) +} diff --git a/crates/lib/vcr-recorder-backend/Cargo.toml b/crates/lib/vcr-recorder-backend/Cargo.toml new file mode 100644 index 00000000..c7a2b144 --- /dev/null +++ b/crates/lib/vcr-recorder-backend/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "waymark-vcr-recorder-backend" +edition = "2024" +version.workspace = true +publish.workspace = true + +[dependencies] +waymark-backends-core = { workspace = true } +waymark-core-backend = { workspace = true } +waymark-ids = { workspace = true } +waymark-vcr-recorder = { workspace = true } +waymark-workflow-registry-backend = { workspace = true } + +nonempty-collections = { workspace = true } +tracing = { workspace = true } +uuid = { workspace = true } diff --git a/crates/lib/vcr-recorder-backend/src/lib.rs b/crates/lib/vcr-recorder-backend/src/lib.rs new file mode 100644 index 00000000..a396d9bd --- /dev/null +++ b/crates/lib/vcr-recorder-backend/src/lib.rs @@ -0,0 +1,129 @@ +use nonempty_collections::NEVec; +use waymark_ids::WorkflowVersionId; + +pub struct Backend { + pub inner: InnerBackend, + pub recorder: waymark_vcr_recorder::backend::Handle, +} + +impl waymark_core_backend::CoreBackend for Backend +where + InnerBackend: waymark_core_backend::CoreBackend, + InnerBackend: Sync, +{ + fn save_graphs<'a>( + &'a self, + claim: waymark_core_backend::LockClaim, + graphs: &'a [waymark_core_backend::GraphUpdate], + ) -> impl Future< + Output = waymark_backends_core::BackendResult< + Vec, + >, + > { + self.inner.save_graphs(claim, graphs) + } + + fn save_actions_done<'a>( + &'a self, + actions: &'a [waymark_core_backend::ActionDone], + ) -> impl Future> { + self.inner.save_actions_done(actions) + } + + type PollQueuedInstancesError = InnerBackend::PollQueuedInstancesError; + + async fn poll_queued_instances( + &self, + size: std::num::NonZeroUsize, + claim: waymark_core_backend::LockClaim, + ) -> Result, Self::PollQueuedInstancesError> { + let polled_instances = self.inner.poll_queued_instances(size, claim).await?; + + for queued_instance in &polled_instances { + if let Err(error) = self.recorder.handle_seen_instance(queued_instance.clone()) { + tracing::warn!(message = "unable to handle seen instance", ?error); + break; + } + } + + Ok(polled_instances) + } + + fn refresh_instance_locks<'a>( + &'a self, + claim: waymark_core_backend::LockClaim, + instance_ids: &'a [waymark_ids::InstanceId], + ) -> impl Future< + Output = waymark_backends_core::BackendResult< + Vec, + >, + > { + self.inner.refresh_instance_locks(claim, instance_ids) + } + + fn release_instance_locks<'a>( + &'a self, + lock_uuid: waymark_ids::LockId, + instance_ids: &'a [waymark_ids::InstanceId], + ) -> impl Future> { + self.inner.release_instance_locks(lock_uuid, instance_ids) + } + + async fn save_instances_done<'a>( + &'a self, + instances: &'a [waymark_core_backend::InstanceDone], + ) -> waymark_backends_core::BackendResult<()> { + for instance in instances { + if let Err(error) = self + .recorder + .handle_seen_instance_done(instance.executor_id) + { + tracing::warn!(message = "unable to handle seen instance done", ?error); + break; + } + } + + self.inner.save_instances_done(instances).await + } + + fn queue_instances<'a>( + &'a self, + instances: &'a [waymark_core_backend::QueuedInstance], + ) -> impl Future> { + self.inner.queue_instances(instances) + } +} +impl waymark_workflow_registry_backend::WorkflowRegistryBackend + for Backend +where + InnerBackend: waymark_workflow_registry_backend::WorkflowRegistryBackend, + InnerBackend: Sync, +{ + fn upsert_workflow_version<'a>( + &'a self, + registration: &'a waymark_workflow_registry_backend::WorkflowRegistration, + ) -> impl Future> + Send + 'a + { + self.inner.upsert_workflow_version(registration) + } + + async fn get_workflow_versions<'a>( + &'a self, + ids: &'a [WorkflowVersionId], + ) -> waymark_backends_core::BackendResult> + { + let workflow_versions = self.inner.get_workflow_versions(ids).await?; + + for workflow_version in &workflow_versions { + if let Err(error) = self + .recorder + .handle_seen_workflow_version(workflow_version.clone()) + { + tracing::warn!(message = "unable to handle seen workflow version", ?error); + break; + } + } + + Ok(workflow_versions) + } +} diff --git a/crates/lib/vcr-recorder-worker-pool/Cargo.toml b/crates/lib/vcr-recorder-worker-pool/Cargo.toml new file mode 100644 index 00000000..f0627d32 --- /dev/null +++ b/crates/lib/vcr-recorder-worker-pool/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "waymark-vcr-recorder-worker-pool" +edition = "2024" +version.workspace = true +publish.workspace = true + +[dependencies] +waymark-vcr-recorder = { workspace = true } +waymark-worker-core = { workspace = true } + +nonempty-collections = { workspace = true } +tracing = { workspace = true } diff --git a/crates/lib/vcr-recorder-worker-pool/src/lib.rs b/crates/lib/vcr-recorder-worker-pool/src/lib.rs new file mode 100644 index 00000000..1d7858d0 --- /dev/null +++ b/crates/lib/vcr-recorder-worker-pool/src/lib.rs @@ -0,0 +1,40 @@ +use nonempty_collections::NEVec; + +use waymark_worker_core::{ActionCompletion, ActionRequest, BaseWorkerPool, WorkerPoolError}; + +#[derive(Debug)] +pub struct Pool { + pub inner: InnerPool, + pub recorder: waymark_vcr_recorder::pool::Handle, +} + +impl BaseWorkerPool for Pool +where + InnerPool: BaseWorkerPool, + InnerPool: Sync, +{ + fn launch(&self) -> impl Future> + '_ { + self.inner.launch() + } + + fn queue(&self, request: ActionRequest) -> Result<(), WorkerPoolError> { + tracing::info!(?request, message = "enqueue"); + if let Err(error) = self.recorder.request(request.clone()) { + tracing::warn!(message = "unable to record action request", ?error); + } + self.inner.queue(request) + } + + async fn poll_complete(&self) -> Option> { + let completions = self.inner.poll_complete().await?; + + for completion in &completions { + if let Err(error) = self.recorder.completion(completion.clone()) { + tracing::warn!(message = "unable to record completion", ?error); + break; + } + } + + Some(completions) + } +} diff --git a/crates/lib/vcr-recorder/Cargo.toml b/crates/lib/vcr-recorder/Cargo.toml new file mode 100644 index 00000000..95963e2b --- /dev/null +++ b/crates/lib/vcr-recorder/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "waymark-vcr-recorder" +edition = "2024" +version.workspace = true +publish.workspace = true + +[dependencies] +waymark-core-backend = { workspace = true } +waymark-ids = { workspace = true } +waymark-jsonlines = { workspace = true } +waymark-vcr-core = { workspace = true } +waymark-vcr-file = { workspace = true } +waymark-worker-core = { workspace = true } +waymark-workflow-registry-backend = { workspace = true } + +thiserror = { workspace = true } +tokio = { workspace = true, features = ["sync", "io-util"] } +tracing = { workspace = true } +uuid = { workspace = true } diff --git a/crates/lib/vcr-recorder/src/action.rs b/crates/lib/vcr-recorder/src/action.rs new file mode 100644 index 00000000..afe6466a --- /dev/null +++ b/crates/lib/vcr-recorder/src/action.rs @@ -0,0 +1,73 @@ +use std::collections::HashMap; + +use uuid::Uuid; + +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] +struct Key { + pub correlation_id: waymark_vcr_core::CorrelationId, + pub dispatch_token: Uuid, +} + +#[derive(Debug)] +struct IncompleteActionContext { + pub queued_at: std::time::Instant, + pub params: waymark_vcr_file::action::Params, +} + +#[derive(Debug, Default)] +pub struct Correlator { + incomplete_action_contexts: HashMap, +} + +impl Correlator { + pub fn insert_request(&mut self, request: waymark_worker_core::ActionRequest) { + let (correlation_id, params, dispatch_token) = + waymark_vcr_file::action::deconstruct_request(request); + + let key = Key { + correlation_id, + dispatch_token, + }; + + let context = IncompleteActionContext { + queued_at: std::time::Instant::now(), + params, + }; + + self.incomplete_action_contexts.insert(key, context); + } + + pub fn correlate_completion( + &mut self, + completion: waymark_worker_core::ActionCompletion, + ) -> Option { + let waymark_worker_core::ActionCompletion { + executor_id, + execution_id, + attempt_number, + dispatch_token, + result, + } = completion; + + let correlation_id = waymark_vcr_core::CorrelationId { + executor_id, + execution_id, + attempt_number, + }; + + let key = Key { + correlation_id, + dispatch_token, + }; + + let context = self.incomplete_action_contexts.remove(&key)?; + + let IncompleteActionContext { queued_at, params } = context; + + Some(waymark_vcr_file::action::LogItem { + execution_time: queued_at.elapsed(), + params, + result, + }) + } +} diff --git a/crates/lib/vcr-recorder/src/backend.rs b/crates/lib/vcr-recorder/src/backend.rs new file mode 100644 index 00000000..3dc862e6 --- /dev/null +++ b/crates/lib/vcr-recorder/src/backend.rs @@ -0,0 +1,54 @@ +use waymark_workflow_registry_backend::WorkflowVersion; + +use crate::{Command, HandleError}; + +#[derive(Debug, Clone)] +pub struct Handle { + tx: tokio::sync::mpsc::Sender, +} + +impl Handle { + pub fn handle_seen_instance( + &self, + queued_instance: waymark_core_backend::QueuedInstance, + ) -> Result<(), HandleError> { + self.tx + .try_send(Command::OpenInstanceLog(queued_instance)) + .map_err(|error| match error { + tokio::sync::mpsc::error::TrySendError::Closed(_) => HandleError::RecorderDropped, + tokio::sync::mpsc::error::TrySendError::Full(_) => HandleError::NoBufferCapacity, + }) + } + + pub fn handle_seen_instance_done( + &self, + instance_id: waymark_ids::InstanceId, + ) -> Result<(), HandleError> { + self.tx + .try_send(Command::CompleteInstanceLog(instance_id)) + .map_err(|error| match error { + tokio::sync::mpsc::error::TrySendError::Closed(_) => HandleError::RecorderDropped, + tokio::sync::mpsc::error::TrySendError::Full(_) => HandleError::NoBufferCapacity, + }) + } + + pub fn handle_seen_workflow_version( + &self, + workflow_version: WorkflowVersion, + ) -> Result<(), HandleError> { + self.tx + .try_send(Command::RecordWorkflowVersion(workflow_version)) + .map_err(|error| match error { + tokio::sync::mpsc::error::TrySendError::Closed(_) => HandleError::RecorderDropped, + tokio::sync::mpsc::error::TrySendError::Full(_) => HandleError::NoBufferCapacity, + }) + } +} + +impl crate::Handle { + pub fn backend_handle(&self) -> Handle { + Handle { + tx: self.tx.clone(), + } + } +} diff --git a/crates/lib/vcr-recorder/src/instance.rs b/crates/lib/vcr-recorder/src/instance.rs new file mode 100644 index 00000000..a74d381e --- /dev/null +++ b/crates/lib/vcr-recorder/src/instance.rs @@ -0,0 +1,111 @@ +use std::collections::HashMap; + +use waymark_ids::{InstanceId, WorkflowVersionId}; + +#[derive(Debug, Default)] +pub struct Bufferrer { + open_instances: HashMap, +} + +#[derive(Debug)] +struct OpenInstance { + pub actions_correlator: crate::action::Correlator, + pub action_log_items: Vec, + pub workflow_version_id: WorkflowVersionId, +} + +#[derive(Debug, thiserror::Error)] +pub enum OpenInstanceLogError { + /// Returned when the instance log is already open currently; + /// this is usually not an issue, and can happen by design since + /// we don't expect the ingest instances to be filtered. + /// Users of this API can simply ignore this error as essentially + /// registers the queued instance for logging in an idempotent fashion. + #[error("instance log already opened")] + AlreadyOpened, +} + +#[derive(Debug, thiserror::Error)] +#[error("instance not found")] +pub struct InstanceNotFound; + +#[derive(Debug, thiserror::Error)] +pub enum RecordActionCompletionError { + #[error("instance not found")] + InstanceNotFound, + + #[error("action not found")] + ActionNotFound, +} + +impl Bufferrer { + pub fn open_instance_log(&mut self, queued_instance: waymark_core_backend::QueuedInstance) { + let open_instance = OpenInstance { + actions_correlator: Default::default(), + action_log_items: Default::default(), + workflow_version_id: queued_instance.workflow_version_id, + }; + + let entry = self.open_instances.entry(queued_instance.instance_id); + + let std::collections::hash_map::Entry::Vacant(entry) = entry else { + // Do not replace already opened instances. + return; + }; + + entry.insert(open_instance); + } + + pub fn record_action_request( + &mut self, + action_request: waymark_worker_core::ActionRequest, + ) -> Result<(), InstanceNotFound> { + let open_instance = self + .open_instances + .get_mut(&action_request.executor_id) + .ok_or(InstanceNotFound)?; + + open_instance + .actions_correlator + .insert_request(action_request); + + Ok(()) + } + + pub fn record_action_completion( + &mut self, + action_completion: waymark_worker_core::ActionCompletion, + ) -> Result<(), RecordActionCompletionError> { + let open_instance = self + .open_instances + .get_mut(&action_completion.executor_id) + .ok_or(RecordActionCompletionError::InstanceNotFound)?; + + let log_item = open_instance + .actions_correlator + .correlate_completion(action_completion) + .ok_or(RecordActionCompletionError::ActionNotFound)?; + + open_instance.action_log_items.push(log_item); + + Ok(()) + } + + pub fn complete_instance_log( + &mut self, + id: waymark_ids::InstanceId, + ) -> Result { + let open_instance = self.open_instances.remove(&id).ok_or(InstanceNotFound)?; + + let OpenInstance { + actions_correlator: _, + action_log_items: actions, + workflow_version_id, + } = open_instance; + + Ok(waymark_vcr_file::instance::LogItem { + workflow_version_id, + actions, + }) + } +} diff --git a/crates/lib/vcr-recorder/src/lib.rs b/crates/lib/vcr-recorder/src/lib.rs new file mode 100644 index 00000000..e487a88b --- /dev/null +++ b/crates/lib/vcr-recorder/src/lib.rs @@ -0,0 +1,136 @@ +use std::num::NonZeroUsize; + +use tokio::io::AsyncWriteExt as _; + +pub mod action; +pub mod backend; +pub mod instance; +pub mod pool; + +enum Command { + OpenInstanceLog(waymark_core_backend::QueuedInstance), + RecordActionRequest(waymark_worker_core::ActionRequest), + RecordActionCompletion(waymark_worker_core::ActionCompletion), + CompleteInstanceLog(waymark_ids::InstanceId), + RecordWorkflowVersion(waymark_workflow_registry_backend::WorkflowVersion), +} + +#[derive(Debug)] +pub struct Handle { + tx: tokio::sync::mpsc::Sender, + rx: tokio::sync::mpsc::Receiver, +} + +pub struct Params { + pub writer: waymark_vcr_file::Writer, + pub handle: Handle, +} + +#[derive(Debug)] +pub enum Error { + Write(waymark_jsonlines::WriteError), + Flush(std::io::Error), +} + +pub async fn r#loop(params: Params) -> Result<(), Error> { + let Params { + mut writer, + handle: Handle { mut rx, tx: _ }, + } = params; + + let mut instance_buferrer = instance::Bufferrer::default(); + + let mut commands = Vec::with_capacity(1024); + let mut prepared_log_items = Vec::with_capacity(commands.capacity()); + + loop { + commands.clear(); + let limit = commands.capacity(); + let read = rx.recv_many(&mut commands, limit).await; + if read == 0 { + break; + } + + for command in commands.drain(..) { + let log_item = match command { + Command::OpenInstanceLog(queued_instance) => { + instance_buferrer.open_instance_log(queued_instance); + continue; + } + Command::RecordActionRequest(action_request) => { + let result = instance_buferrer.record_action_request(action_request); + if let Err(error) = result { + tracing::warn!(?error, "unable to record action request"); + }; + continue; + } + Command::RecordActionCompletion(action_completion) => { + let result = instance_buferrer.record_action_completion(action_completion); + if let Err(error) = result { + tracing::warn!(?error, "unable to record action completion"); + }; + continue; + } + Command::CompleteInstanceLog(instance_id) => { + let instance_log_item = + match instance_buferrer.complete_instance_log(instance_id) { + Ok(val) => val, + Err(error) => { + tracing::warn!(?error, "unable to complete instance log"); + continue; + } + }; + waymark_vcr_file::LogItem::Instance(instance_log_item) + } + Command::RecordWorkflowVersion(workflow_version) => { + let waymark_workflow_registry_backend::WorkflowVersion { + id, + workflow_name, + workflow_version, + ir_hash, + program_proto, + concurrent, + } = workflow_version; + + let workflow_version_log_item = waymark_vcr_file::workflow_version::LogItem { + id, + workflow_name, + workflow_version, + ir_hash, + program_proto, + concurrent, + }; + + waymark_vcr_file::LogItem::WorkflowVersion(workflow_version_log_item) + } + }; + prepared_log_items.push(log_item); + } + + for item in prepared_log_items.drain(..) { + writer.write_value(&item).await.map_err(Error::Write)?; + } + + writer.writer.flush().await.map_err(Error::Flush)?; + } + + writer.writer.flush().await.map_err(Error::Flush)?; + + Ok(()) +} + +impl Handle { + pub fn new(command_buffer: NonZeroUsize) -> Self { + let (tx, rx) = tokio::sync::mpsc::channel(command_buffer.get()); + Self { tx, rx } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum HandleError { + #[error("recorder is dropped")] + RecorderDropped, + + #[error("no buffer capacity")] + NoBufferCapacity, +} diff --git a/crates/lib/vcr-recorder/src/pool.rs b/crates/lib/vcr-recorder/src/pool.rs new file mode 100644 index 00000000..613a7cd2 --- /dev/null +++ b/crates/lib/vcr-recorder/src/pool.rs @@ -0,0 +1,37 @@ +use crate::{Command, HandleError}; + +#[derive(Debug, Clone)] +pub struct Handle { + tx: tokio::sync::mpsc::Sender, +} + +impl Handle { + pub fn request(&self, request: waymark_worker_core::ActionRequest) -> Result<(), HandleError> { + self.tx + .try_send(Command::RecordActionRequest(request)) + .map_err(|error| match error { + tokio::sync::mpsc::error::TrySendError::Closed(_) => HandleError::RecorderDropped, + tokio::sync::mpsc::error::TrySendError::Full(_) => HandleError::NoBufferCapacity, + }) + } + + pub fn completion( + &self, + completion: waymark_worker_core::ActionCompletion, + ) -> Result<(), HandleError> { + self.tx + .try_send(Command::RecordActionCompletion(completion)) + .map_err(|error| match error { + tokio::sync::mpsc::error::TrySendError::Closed(_) => HandleError::RecorderDropped, + tokio::sync::mpsc::error::TrySendError::Full(_) => HandleError::NoBufferCapacity, + }) + } +} + +impl crate::Handle { + pub fn pool_handle(&self) -> Handle { + Handle { + tx: self.tx.clone(), + } + } +} diff --git a/crates/lib/workflow-registry-backend/Cargo.toml b/crates/lib/workflow-registry-backend/Cargo.toml index e9287a54..56e95f03 100644 --- a/crates/lib/workflow-registry-backend/Cargo.toml +++ b/crates/lib/workflow-registry-backend/Cargo.toml @@ -4,10 +4,15 @@ version.workspace = true publish.workspace = true edition = "2024" +[features] +default = ["either"] + [dependencies] waymark-backends-core = { workspace = true } waymark-ids = { workspace = true } +either = { workspace = true, optional = true } + [lib] test = false doctest = false diff --git a/crates/lib/workflow-registry-backend/src/either.rs b/crates/lib/workflow-registry-backend/src/either.rs new file mode 100644 index 00000000..869f1f0a --- /dev/null +++ b/crates/lib/workflow-registry-backend/src/either.rs @@ -0,0 +1,33 @@ +use either::Either; +use waymark_ids::WorkflowVersionId; + +use crate::WorkflowRegistryBackend; + +impl WorkflowRegistryBackend for Either +where + Left: WorkflowRegistryBackend, + Right: WorkflowRegistryBackend, +{ + fn upsert_workflow_version<'a>( + &'a self, + registration: &'a crate::WorkflowRegistration, + ) -> impl Future> + Send + 'a + { + match self { + Either::Left(inner) => Either::Left(inner.upsert_workflow_version(registration)), + Either::Right(inner) => Either::Right(inner.upsert_workflow_version(registration)), + } + } + + fn get_workflow_versions<'a>( + &'a self, + ids: &'a [WorkflowVersionId], + ) -> impl Future>> + + Send + + 'a { + match self { + Either::Left(inner) => Either::Left(inner.get_workflow_versions(ids)), + Either::Right(inner) => Either::Right(inner.get_workflow_versions(ids)), + } + } +} diff --git a/crates/lib/workflow-registry-backend/src/lib.rs b/crates/lib/workflow-registry-backend/src/lib.rs index 7ded2477..166c41d3 100644 --- a/crates/lib/workflow-registry-backend/src/lib.rs +++ b/crates/lib/workflow-registry-backend/src/lib.rs @@ -1,3 +1,6 @@ +#[cfg(feature = "either")] +mod either; + pub use waymark_backends_core::{BackendError, BackendResult}; use waymark_ids::WorkflowVersionId;