diff --git a/Cargo.lock b/Cargo.lock index 26c0d8ed..ecc0f852 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -706,6 +706,22 @@ dependencies = [ "tracing", ] +[[package]] +name = "blockfrost-platform-error-decoder" +version = "1.0.0-rc.2" +dependencies = [ + "blockfrost-platform-build-utils", + "blockfrost-platform-common", + "blockfrost-platform-testgen", + "hex", + "pallas-codec", + "pallas-hardano", + "pallas-network", + "serde_json", + "sysinfo", + "tokio", +] + [[package]] name = "blockfrost-platform-integration-tests" version = "1.0.0-rc.2" @@ -748,11 +764,9 @@ dependencies = [ "blockfrost-platform-build-utils", "blockfrost-platform-common", "chrono", - "crossbeam", "deadpool 0.13.0", "hex", "metrics", - "num_cpus", "pallas-codec", "pallas-crypto", "pallas-hardano", @@ -760,8 +774,20 @@ dependencies = [ "pallas-primitives", "pallas-traverse", "serde", + "tokio", + "tracing", +] + +[[package]] +name = "blockfrost-platform-testgen" +version = "1.0.0-rc.2" +dependencies = [ + "blockfrost-platform-common", + "crossbeam", + "hex", + "num_cpus", + "serde", "serde_json", - "sysinfo", "tokio", "tracing", ] diff --git a/Cargo.toml b/Cargo.toml index 7ae37401..47c02578 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,8 @@ members = [ "crates/api_provider", "crates/node", "crates/data_node", + "crates/testgen", + "crates/error_decoder", "crates/gateway", "crates/sdk_bridge", "crates/integration_tests", @@ -32,6 +34,7 @@ bf-build-utils = { path = "crates/build_utils", package = "blockfrost-platform-b bf-common = { path = "crates/common", package = "blockfrost-platform-common" } bf-data-node = { path = "crates/data_node", package = "blockfrost-platform-data-node" } bf-node = { path = "crates/node", package = "blockfrost-platform-node" } +bf-testgen = { path = "crates/testgen", package = "blockfrost-platform-testgen" } bip39 = "2.2.2" blockfrost = { version = "1.2.3", default-features = false, features = [ "native-tls", diff --git a/crates/build_utils/src/lib.rs b/crates/build_utils/src/lib.rs index 8438c445..c0dfb6a2 100644 --- a/crates/build_utils/src/lib.rs +++ b/crates/build_utils/src/lib.rs @@ -1,2 +1,3 @@ pub mod git; +pub mod target; pub mod testgen_hs; diff --git a/crates/build_utils/src/target.rs b/crates/build_utils/src/target.rs new file mode 100644 index 00000000..d05eeda8 --- /dev/null +++ b/crates/build_utils/src/target.rs @@ -0,0 +1,21 @@ +pub const fn os() -> &'static str { + if cfg!(target_os = "macos") { + "darwin" + } else if cfg!(target_os = "linux") { + "linux" + } else if cfg!(target_os = "windows") { + "windows" + } else { + panic!("Unsupported OS") + } +} + +pub const fn arch() -> &'static str { + if cfg!(target_arch = "x86_64") { + "x86_64" + } else if cfg!(target_arch = "aarch64") { + "aarch64" + } else { + panic!("Unsupported architecture") + } +} diff --git a/crates/build_utils/src/testgen_hs.rs b/crates/build_utils/src/testgen_hs.rs index 714aafac..6c240753 100644 --- a/crates/build_utils/src/testgen_hs.rs +++ b/crates/build_utils/src/testgen_hs.rs @@ -16,25 +16,10 @@ pub fn ensure() { return; } - let testgen_lib_version = "10.6.3.0"; - - let target_os = if cfg!(target_os = "macos") { - "darwin" - } else if cfg!(target_os = "linux") { - "linux" - } else if cfg!(target_os = "windows") { - "windows" - } else { - panic!("Unsupported OS"); - }; + let testgen_lib_version = "10.6.3.1"; - let arch = if cfg!(target_arch = "x86_64") { - "x86_64" - } else if cfg!(target_arch = "aarch64") { - "aarch64" - } else { - panic!("Unsupported architecture"); - }; + let target_os = super::target::os(); + let arch = super::target::arch(); let suffix = if target_os == "windows" { ".zip" @@ -44,7 +29,7 @@ pub fn ensure() { let file_name = format!("testgen-hs-{testgen_lib_version}-{arch}-{target_os}"); let download_url = format!( - "https://github.com/input-output-hk/testgen-hs/releases/download/{testgen_lib_version}/{file_name}{suffix}" + "https://github.com/blockfrost/testgen-hs/releases/download/{testgen_lib_version}/{file_name}{suffix}" ); println!("Looking for {file_name}"); diff --git a/crates/error_decoder/Cargo.toml b/crates/error_decoder/Cargo.toml new file mode 100644 index 00000000..20e6cf1c --- /dev/null +++ b/crates/error_decoder/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "blockfrost-platform-error-decoder" +version.workspace = true +license.workspace = true +edition.workspace = true +build = "build.rs" + +[dependencies] +bf-common.workspace = true +bf-testgen.workspace = true +hex.workspace = true +serde_json.workspace = true +tokio.workspace = true + +[features] +tarpaulin = [] + +[build-dependencies] +bf-build-utils.workspace = true + +[dev-dependencies] +bf-testgen = { workspace = true, features = ["test-utils"] } +sysinfo.workspace = true +pallas-network.workspace = true +pallas-hardano.workspace = true +pallas-codec.workspace = true diff --git a/crates/error_decoder/README.md b/crates/error_decoder/README.md new file mode 100644 index 00000000..e1359ace --- /dev/null +++ b/crates/error_decoder/README.md @@ -0,0 +1,5 @@ +# error decoder + +This package contains external error decoder for tx submit errors. Historically used to decode and generate error messages that are returned from our API. +Now it's only used to test `pallas-hardano` based implementation and generate random test cases. `pallas-hardano` vs ledger error generating tests are also in this crate. +Related `pallas-hardano` part initially implemented in this repository and later merged into pallas. diff --git a/crates/error_decoder/build.rs b/crates/error_decoder/build.rs new file mode 100644 index 00000000..aeb1456e --- /dev/null +++ b/crates/error_decoder/build.rs @@ -0,0 +1,3 @@ +fn main() { + bf_build_utils::testgen_hs::ensure(); +} diff --git a/crates/error_decoder/src/external.rs b/crates/error_decoder/src/external.rs new file mode 100644 index 00000000..08662458 --- /dev/null +++ b/crates/error_decoder/src/external.rs @@ -0,0 +1,113 @@ +use bf_common::errors::AppError; +use bf_testgen::testgen::{Testgen, TestgenResponse, Variant}; + +#[derive(Clone)] +pub struct ExternalDecoder { + testgen: Testgen, +} +impl ExternalDecoder { + pub fn spawn() -> Result { + let testgen = Testgen::spawn(Variant::DeserializeStream) + .map_err(|err| AppError::Server(format!("Failed to spawn ExternalDecoder: {err}")))?; + + Ok(Self { testgen }) + } + + pub async fn decode(&self, input: &[u8]) -> Result { + match self.testgen.decode(input).await { + Ok(resp) => match resp { + TestgenResponse::Ok(value) => Ok(value), + TestgenResponse::Err(err) => Err(err.to_string()), + }, + Err(err) => Err(err), + } + } + + /// This function is called at startup, so that we make sure that the worker is reasonable. + pub async fn startup_sanity_test(&self) -> Result<(), String> { + let input = hex::decode("8182068182028200a0").map_err(|err| err.to_string())?; + let result = self.decode(&input).await; + let expected = serde_json::json!({ + "contents": { + "contents": { + "contents": { + "era": "ShelleyBasedEraConway", + "error": [ + "ConwayCertsFailure (WithdrawalsNotInRewardsCERTS (Withdrawals {unWithdrawals = fromList []}))" + ], + "kind": "ShelleyTxValidationError" + }, + "tag": "TxValidationErrorInCardanoMode" + }, + "tag": "TxCmdTxSubmitValidationError" + }, + "tag": "TxSubmitFail" + }); + + if result == Ok(expected) { + Ok(()) + } else { + Err(format!( + "ExternalDecoder: startup_sanity_test failed: {result:?}" + )) + } + } + + /// A single global [`ExternalDecoder`] that you can cheaply use in tests. + #[cfg(all(test, not(feature = "tarpaulin")))] + pub fn instance() -> Self { + GLOBAL_INSTANCE.clone() + } +} + +#[cfg(all(test, not(feature = "tarpaulin")))] +static GLOBAL_INSTANCE: std::sync::LazyLock = + std::sync::LazyLock::new(|| ExternalDecoder::spawn().expect("Failed to spawn ExternalDecoder")); + +#[cfg(test)] +mod tests { + // The CBOR test cases are covered by `crates/error_decoder/src/tests/specific.rs`, + // which already cross-validates the Rust (pallas-hardano) implementation against the + // Haskell (testgen-hs) external decoder. This module only contains tests that are + // unique to the external decoder itself (e.g. crash-recovery behaviour). + #[cfg(not(feature = "tarpaulin"))] + use super::*; + + #[tokio::test] + //#[tracing_test::traced_test] + #[cfg(not(feature = "tarpaulin"))] + async fn test_sanity() { + let decoder = ExternalDecoder::spawn().unwrap(); + + // Wait for it to come up: + decoder.startup_sanity_test().await.unwrap(); + + // Now, kill our child to test the restart logic: + sysinfo::System::new_all() + .process(sysinfo::Pid::from_u32(decoder.testgen.child_pid().unwrap())) + .unwrap() + .kill(); + + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + let input = hex::decode("8182068183051a000c275b1a000b35ec").unwrap(); + let result = decoder.decode(&input).await; + + assert_eq!( + result, + Ok(serde_json::json!({"contents": + {"contents": + {"contents": + {"era": "ShelleyBasedEraConway", "error": + ["ConwayTreasuryValueMismatch (Mismatch {mismatchSupplied = Coin 734700, mismatchExpected = Coin 796507})"], + "kind": "ShelleyTxValidationError" + }, + "tag": "TxValidationErrorInCardanoMode" + }, "tag": "TxCmdTxSubmitValidationError" + }, + "tag": "TxSubmitFail" + } + )) + ); + } +} diff --git a/crates/error_decoder/src/lib.rs b/crates/error_decoder/src/lib.rs new file mode 100644 index 00000000..6d9999bd --- /dev/null +++ b/crates/error_decoder/src/lib.rs @@ -0,0 +1,3 @@ +pub mod external; +#[cfg(test)] +mod tests; diff --git a/crates/error_decoder/src/tests.rs b/crates/error_decoder/src/tests.rs new file mode 100644 index 00000000..6e0943e0 --- /dev/null +++ b/crates/error_decoder/src/tests.rs @@ -0,0 +1,4 @@ +#[cfg(test)] +mod random; +#[cfg(test)] +pub(crate) mod specific; diff --git a/crates/node/src/cbor/tests/random.rs b/crates/error_decoder/src/tests/random.rs similarity index 95% rename from crates/node/src/cbor/tests/random.rs rename to crates/error_decoder/src/tests/random.rs index 79d36779..c2b60926 100644 --- a/crates/node/src/cbor/tests/random.rs +++ b/crates/error_decoder/src/tests/random.rs @@ -1,4 +1,8 @@ -use super::*; +#![cfg(not(feature = "tarpaulin"))] + +use bf_testgen::tests::{CaseType, check_generated_cases}; +use pallas_hardano::display::haskell_error::serialize_error; +use pallas_network::miniprotocols::localtxsubmission::TxValidationError; #[test] #[allow(non_snake_case)] diff --git a/crates/node/src/cbor/tests/specific.rs b/crates/error_decoder/src/tests/specific.rs similarity index 99% rename from crates/node/src/cbor/tests/specific.rs rename to crates/error_decoder/src/tests/specific.rs index f9c312c2..952cf993 100644 --- a/crates/node/src/cbor/tests/specific.rs +++ b/crates/error_decoder/src/tests/specific.rs @@ -18,7 +18,80 @@ done ``` */ -use super::verify_one; +#[cfg(test)] +use pallas_network::miniprotocols::localtxsubmission::TxValidationError; + +/// This function takes a CBOR-encoded `ApplyTxErr`, and verifies our +/// deserializer against the Haskell one. Use it for specific cases. +/// +/// Under `tarpaulin`, the external Haskell decoder (`testgen-hs`) is not +/// available, so we only exercise the Rust decode + serialize path (which is +/// what we want coverage for anyway). +#[cfg(test)] +pub(crate) async fn verify_one(cbor: &str) { + use pallas_hardano::display::haskell_error::serialize_error; + + let cbor = hex::decode(cbor).unwrap(); + + let our_decoding = decode_error(&cbor); + let our_json = serialize_error(our_decoding).expect("Failed to serialize error"); + + #[cfg(not(feature = "tarpaulin"))] + { + use crate::external::ExternalDecoder; + + let reference_json = match ExternalDecoder::instance().decode(&cbor).await { + Ok(value) => value, + Err(shared_decoder_err) => { + // Recover from a poisoned shared decoder process by retrying with a fresh one. + let fresh_decoder = ExternalDecoder::spawn() + .expect("Failed to spawn a fresh ExternalDecoder for retry"); + fresh_decoder.decode(&cbor).await.unwrap_or_else(|fresh_decoder_err| { + panic!( + "Failed to decode reference JSON with both shared and fresh ExternalDecoder instances. shared_error={shared_decoder_err}, fresh_error={fresh_decoder_err}, cbor={}", + hex::encode(&cbor) + ) + }) + }, + }; + + assert_json_eq!(reference_json, our_json); + } + + // Under tarpaulin: just assert the Rust decoder didn't panic and produced valid JSON. + #[cfg(feature = "tarpaulin")] + { + let _ = our_json; + } +} +#[cfg(test)] +fn decode_error(bytes: &[u8]) -> TxValidationError { + use pallas_codec::minicbor; + + let mut decoder = minicbor::Decoder::new(bytes); + decoder.decode().unwrap() +} + +#[cfg(all(test, not(feature = "tarpaulin")))] +macro_rules! assert_json_eq { + ($left:expr, $right:expr) => { + if $left != $right { + let left_pretty = serde_json::to_string_pretty(&$left).unwrap(); + let right_pretty = serde_json::to_string_pretty(&$right).unwrap(); + panic!( + concat!( + "assertion `left == right` failed\n", + " left:\n {}\n right:\n {}", + ), + left_pretty.replace("\n", "\n "), + right_pretty.replace("\n", "\n "), + ); + } + }; +} + +#[cfg(all(test, not(feature = "tarpaulin")))] +pub(crate) use assert_json_eq; // export it #[tokio::test] #[allow(non_snake_case)] diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 0c2d3476..b28e4919 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -9,14 +9,10 @@ build = "build.rs" bf-common.workspace = true serde.workspace = true tracing.workspace = true -crossbeam.workspace = true hex.workspace = true -serde_json.workspace = true tokio.workspace = true pallas-network.workspace = true deadpool.workspace = true -sysinfo.workspace = true -num_cpus.workspace = true metrics.workspace = true pallas-hardano.workspace = true pallas-crypto.workspace = true diff --git a/crates/node/src/cbor.rs b/crates/node/src/cbor.rs index 9cfdfb72..08103424 100644 --- a/crates/node/src/cbor.rs +++ b/crates/node/src/cbor.rs @@ -1,5 +1 @@ -pub mod fallback_decoder; - -#[cfg(test)] -pub mod tests; pub(crate) mod validation; diff --git a/crates/node/src/cbor/fallback_decoder.rs b/crates/node/src/cbor/fallback_decoder.rs deleted file mode 100644 index 24e2f488..00000000 --- a/crates/node/src/cbor/fallback_decoder.rs +++ /dev/null @@ -1,298 +0,0 @@ -use bf_common::errors::AppError; -use std::io::{BufRead, BufReader, Write}; -use std::process::{self as proc}; -use std::sync::{ - Arc, - atomic::{self, AtomicU32}, -}; -use std::thread; -use tokio::sync::{mpsc, oneshot}; -use tracing::{error, info}; - -#[derive(Clone)] -pub struct FallbackDecoder { - sender: mpsc::Sender, - current_child_pid: Arc, -} - -struct FDRequest { - cbor: Vec, - response_tx: oneshot::Sender>, -} - -impl FallbackDecoder { - /// Starts a new child process. - pub fn spawn() -> Result { - let testgen_hs_path = Self::find_testgen_hs().map_err(AppError::Server)?; - - info!( - "Using {} as a fallback CBOR error decoder", - &testgen_hs_path - ); - - let current_child_pid = Arc::new(AtomicU32::new(u32::MAX)); - let current_child_pid_clone = current_child_pid.clone(); - let (sender, mut receiver) = mpsc::channel::(128); - - // Clone `testgen_hs_path` for the thread. - let testgen_hs_path_for_thread = testgen_hs_path.clone(); - - thread::spawn(move || { - // For retries: - let mut last_unfulfilled_request: Option = None; - - loop { - let single_run = Self::spawn_child( - &testgen_hs_path_for_thread, - &mut receiver, - &mut last_unfulfilled_request, - ¤t_child_pid_clone, - ); - let restart_delay = std::time::Duration::from_secs(1); - error!( - "will restart in {:?} because of a subprocess error: {:?}", - restart_delay, single_run - ); - std::thread::sleep(restart_delay); - } - }); - - Ok(Self { - sender, - current_child_pid, - }) - } - - /// Decodes a CBOR error using the child process. - pub async fn decode(&self, cbor: &[u8]) -> Result { - let (response_tx, response_rx) = oneshot::channel(); - self.sender - .send(FDRequest { - cbor: cbor.to_vec(), - response_tx, - }) - .await - .map_err(|err| format!("failed to send request: {err:?}"))?; - - response_rx - .await - .unwrap_or_else(|err| unreachable!("worker thread dropped (can’t happen): {:?}", err)) - } - - /// Searches for `testgen-hs` in multiple directories. - pub fn find_testgen_hs() -> Result { - bf_common::find_libexec::find_libexec("testgen-hs", "TESTGEN_HS_PATH", &["--version"]) - } - - /// This function is called at startup, so that we make sure that the worker is reasonable. - pub async fn startup_sanity_test(&self) -> Result<(), String> { - let input = hex::decode("8182068182028200a0").map_err(|err| err.to_string())?; - let result = self.decode(&input).await; - let expected = serde_json::json!({ - "contents": { - "contents": { - "contents": { - "era": "ShelleyBasedEraConway", - "error": [ - "ConwayCertsFailure (WithdrawalsNotInRewardsCERTS (Withdrawals {unWithdrawals = fromList []}))" - ], - "kind": "ShelleyTxValidationError" - }, - "tag": "TxValidationErrorInCardanoMode" - }, - "tag": "TxCmdTxSubmitValidationError" - }, - "tag": "TxSubmitFail" - }); - - if result == Ok(expected) { - Ok(()) - } else { - Err(format!("startup_sanity_test failed: {result:?}")) - } - } - - /// Returns the current child PID: - pub fn child_pid(&self) -> Option { - match self.current_child_pid.load(atomic::Ordering::Relaxed) { - u32::MAX => None, - pid => Some(pid), - } - } - - #[cfg(test)] - /// A single global [`FallbackDecoder`] that you can cheaply use in tests. - pub fn instance() -> Self { - GLOBAL_INSTANCE.clone() - } - - fn spawn_child( - testgen_hs_path: &str, - receiver: &mut mpsc::Receiver, - last_unfulfilled_request: &mut Option, - current_child_pid: &Arc, - ) -> Result<(), String> { - let mut child = proc::Command::new(testgen_hs_path) - .arg("deserialize-stream") - .stdin(proc::Stdio::piped()) - .stdout(proc::Stdio::piped()) - .spawn() - .map_err(|err| format!("couldn’t start the child: {err:?}"))?; - - current_child_pid.store(child.id(), atomic::Ordering::Relaxed); - - let result = Self::process_requests(&mut child, receiver, last_unfulfilled_request); - - // Let’s make sure it’s dead in case a different error landed us here. - // Will return Ok(()) if already dead. - child - .kill() - .map_err(|err| format!("couldn’t kill the child: {err:?}"))?; - child - .wait() - .map_err(|err| format!("couldn’t reap the child: {err:?}"))?; - - result - } - - fn process_requests( - child: &mut proc::Child, - receiver: &mut mpsc::Receiver, - last_unfulfilled_request: &mut Option, - ) -> Result<(), String> { - let stdin = child - .stdin - .as_mut() - .ok_or("couldn’t grab stdin".to_string())?; - let stdout = child - .stdout - .as_mut() - .ok_or("couldn’t grab stdout".to_string())?; - let stdout_reader = BufReader::new(stdout); - let mut stdout_lines = stdout_reader.lines(); - - while let Some((request, is_a_retry)) = last_unfulfilled_request - .take() - .map(|a| (a, true)) - .or_else(|| receiver.blocking_recv().map(|a| (a, false))) - { - let cbor_hex = hex::encode(&request.cbor); - *last_unfulfilled_request = Some(request); - - let mut ask_and_receive = || -> Result, String> { - writeln!(stdin, "{cbor_hex}") - .map_err(|err| format!("couldn’t write to stdin: {err:?}"))?; - - match stdout_lines.next() { - Some(Ok(line)) => Ok(Self::parse_json(&line)), - Some(Err(e)) => Err(format!("failed to read from subprocess: {e}")), - None => Err("no output from subprocess".to_string()), - } - }; - - // Split the result to satisfy the borrow checker: - let (result_for_response, result_for_logs) = partition_result(ask_and_receive()); - - // We want to respond to the user with a failure in case this was a retry. - // Otherwise, it’s an infinite loop and wait time for the response. - if is_a_retry || result_for_response.is_ok() { - // unwrap is safe, we wrote there right before the writeln!() - let request = last_unfulfilled_request.take().unwrap(); - - let response = match result_for_response { - Ok(ok) => ok, - Err(_) => Err("repeated internal failure".to_string()), - }; - - // unwrap is safe, the other side would have to drop for a - // panic – can’t happen, because we control it: - request - .response_tx - .send(response) - .unwrap_or_else(|_| unreachable!()); - } - - // Now break the loop, and restart everything if we failed: - result_for_logs? - } - - Err("request channel closed, won’t happen".to_string()) - } - - fn parse_json(input: &str) -> Result { - let mut parsed: serde_json::Value = - serde_json::from_str(input).map_err(|e| e.to_string())?; - - parsed - .as_object() - .and_then(|obj| { - if obj.len() == 1 { - obj.get("error") - .and_then(|v| v.as_str()) - .map(|s| Err(s.to_string())) - } else { - None - } - }) - .unwrap_or_else(|| { - parsed - .get_mut("json") - .map(serde_json::Value::take) - .ok_or_else(|| "Missing 'json' field".to_string()) - }) - } -} - -fn partition_result(ae: Result) -> (Result, Result<(), E>) { - match ae { - Err(err) => (Err(()), Err(err)), - Ok(ok) => (Ok(ok), Ok(())), - } -} - -#[cfg(test)] -static GLOBAL_INSTANCE: std::sync::LazyLock = - std::sync::LazyLock::new(|| FallbackDecoder::spawn().expect("Failed to spawn FallbackDecoder")); - -#[cfg(test)] -mod tests { - #[cfg(not(feature = "tarpaulin"))] - use super::*; - #[tokio::test] - //#[tracing_test::traced_test] - #[cfg(not(feature = "tarpaulin"))] - async fn test_fallback_decoder() { - let decoder = FallbackDecoder::spawn().unwrap(); - - // Wait for it to come up: - decoder.startup_sanity_test().await.unwrap(); - - // Now, kill our child to test the restart logic: - sysinfo::System::new_all() - .process(sysinfo::Pid::from_u32(decoder.child_pid().unwrap())) - .unwrap() - .kill(); - - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - - let input = hex::decode("8182068183051a000c275b1a000b35ec").unwrap(); - let result = decoder.decode(&input).await; - - assert_eq!( - result, - Ok(serde_json::json!({"contents": - {"contents": - {"contents": - {"era": "ShelleyBasedEraConway", "error": - ["ConwayTreasuryValueMismatch (Mismatch {mismatchSupplied = Coin 734700, mismatchExpected = Coin 796507})"], - "kind": "ShelleyTxValidationError" - }, - "tag": "TxValidationErrorInCardanoMode" - }, "tag": "TxCmdTxSubmitValidationError" - }, - "tag": "TxSubmitFail" - } - )) - ); - } -} diff --git a/crates/testgen/Cargo.toml b/crates/testgen/Cargo.toml new file mode 100644 index 00000000..7d7d4a16 --- /dev/null +++ b/crates/testgen/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "blockfrost-platform-testgen" +version.workspace = true +license.workspace = true +edition.workspace = true + +[dependencies] +bf-common.workspace = true +serde.workspace = true +tracing.workspace = true +hex.workspace = true +serde_json.workspace = true +tokio.workspace = true +num_cpus.workspace = true +crossbeam.workspace = true + +[features] +tarpaulin = [] +test-utils = [] diff --git a/crates/testgen/README.md b/crates/testgen/README.md new file mode 100644 index 00000000..1dbba396 --- /dev/null +++ b/crates/testgen/README.md @@ -0,0 +1,3 @@ +# testgen + +Base implementation for communicating with the Haskell binary, testgen. diff --git a/crates/testgen/src/lib.rs b/crates/testgen/src/lib.rs new file mode 100644 index 00000000..1dbf4645 --- /dev/null +++ b/crates/testgen/src/lib.rs @@ -0,0 +1,3 @@ +pub mod testgen; +#[cfg(feature = "test-utils")] +pub mod tests; diff --git a/crates/testgen/src/testgen.rs b/crates/testgen/src/testgen.rs new file mode 100644 index 00000000..abf8ac73 --- /dev/null +++ b/crates/testgen/src/testgen.rs @@ -0,0 +1,445 @@ +use bf_common::errors::AppError; +use serde::Deserialize; +use serde::de; +use std::io::{BufRead, BufReader, Write}; +use std::path::{Path, PathBuf}; +use std::process::{self as proc, Command}; +use std::sync::{ + Arc, + atomic::{self, AtomicU32}, +}; +use std::{env, thread}; +use tokio::sync::{mpsc, oneshot}; +use tracing::{debug, error, info, warn}; + +/// Handle to a long-running `testgen-hs` subprocess. Cheap to clone; +/// clones share one worker thread. +#[derive(Clone)] +pub struct Testgen { + sender: mpsc::Sender, + current_child_pid: Arc, +} + +struct TestgenRequest { + payload: String, + response: oneshot::Sender>, +} + +/// Which `testgen-hs` subcommand to run. +#[derive(Debug, Clone, Copy)] +#[non_exhaustive] +pub enum Variant { + DeserializeStream, +} + +impl Variant { + fn as_arg(self) -> &'static str { + match self { + Self::DeserializeStream => "deserialize-stream", + } + } +} + +impl std::fmt::Display for Variant { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.as_arg()) + } +} + +#[derive(Debug, PartialEq)] +pub enum TestgenResponse { + Ok(serde_json::Value), + Err(serde_json::Value), +} + +const MISSING_BOTH_FIELDS_MSG: &str = + "invalid testgen-hs response: missing both `json` and `error`"; + +const REQUEST_CHANNEL_CAPACITY: usize = 128; +const RESTART_DELAY: std::time::Duration = std::time::Duration::from_secs(1); +const RESPONSE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30); + +#[derive(Deserialize)] +struct TestgenResponseWire { + #[serde(default)] + json: Option, + #[serde(default)] + error: Option, +} + +impl<'de> Deserialize<'de> for TestgenResponse { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let wire = TestgenResponseWire::deserialize(deserializer)?; + match (wire.json, wire.error) { + (Some(json), Some(err)) => { + warn!( + "testgen-hs response has both `json` and `error` fields, discarding error: {}", + err + ); + Ok(Self::Ok(json)) + }, + (Some(json), None) => Ok(Self::Ok(json)), + (None, Some(error)) => Ok(Self::Err(error)), + (None, None) => Err(de::Error::custom(MISSING_BOTH_FIELDS_MSG)), + } + } +} + +impl Testgen { + /// Starts a new child process. + pub fn spawn(variant: Variant) -> Result { + Self::spawn_inner(variant, None) + } + + /// Starts a new child process with an init payload that is sent as the first + /// message on every (re)start. The response to the init payload is validated + /// but not returned — it is only used to confirm the subprocess is ready. + pub fn spawn_with_init(variant: Variant, init_payload: String) -> Result { + Self::spawn_inner(variant, Some(init_payload)) + } + + fn spawn_inner(variant: Variant, init_payload: Option) -> Result { + let testgen_hs_path = Self::find_testgen_hs().map_err(AppError::Server)?; + + info!( + "Spawning testgen-hs ({}) from {}", + variant, &testgen_hs_path + ); + + let current_child_pid = Arc::new(AtomicU32::new(u32::MAX)); + let current_child_pid_clone = current_child_pid.clone(); + let (sender, mut receiver) = mpsc::channel::(REQUEST_CHANNEL_CAPACITY); + let testgen_hs_path_for_thread = testgen_hs_path.clone(); + + thread::spawn(move || { + let mut last_unfulfilled_request: Option = None; + + loop { + let single_run = Self::spawn_child( + &testgen_hs_path_for_thread, + &mut receiver, + &mut last_unfulfilled_request, + ¤t_child_pid_clone, + variant, + init_payload.as_deref(), + ); + + // Exit if no work is pending and all senders are gone. + if last_unfulfilled_request.is_none() { + match receiver.try_recv() { + Err(mpsc::error::TryRecvError::Disconnected) => { + info!("Testgen: all senders dropped, shutting down background worker"); + break; + }, + Ok(req) => { + last_unfulfilled_request = Some(req); + }, + Err(mpsc::error::TryRecvError::Empty) => {}, + } + } + + error!( + "Testgen: will restart in {RESTART_DELAY:?} because of a subprocess error: {}", + single_run.err().unwrap_or_else(|| "unknown".to_string()) + ); + std::thread::sleep(RESTART_DELAY); + + // Re-check after the delay: senders may have dropped while we slept. + if last_unfulfilled_request.is_none() && receiver.is_closed() { + info!("Testgen: all senders dropped during restart delay, shutting down"); + break; + } + } + }); + + Ok(Self { + sender, + current_child_pid, + }) + } + + /// Sends the payload to the child process. + pub async fn decode(&self, cbor: &[u8]) -> Result { + self.send(hex::encode(cbor)).await + } + + /// Sends the payload to the child process. + pub async fn send(&self, payload: String) -> Result { + let (response, response_rx) = oneshot::channel(); + + self.sender + .send(TestgenRequest { payload, response }) + .await + .map_err(|err| format!("Testgen: failed to send request: {err:?}"))?; + + tokio::time::timeout(RESPONSE_TIMEOUT, response_rx) + .await + .map_err(|_| format!("Testgen: request timed out after {RESPONSE_TIMEOUT:?}"))? + .map_err(|err| format!("Testgen: worker thread dropped: {err:?}"))? + } + + /// Searches for `testgen-hs` in multiple directories. + pub fn find_testgen_hs() -> Result { + let exe_name = if cfg!(target_os = "windows") { + "testgen-hs.exe" + } else { + "testgen-hs" + }; + + let mut search_paths: Vec = Vec::new(); + + if let Ok(path) = env::var("TESTGEN_HS_PATH") { + search_paths.push(PathBuf::from(path)); + } + + if let Some(path) = option_env!("TESTGEN_HS_PATH") { + search_paths.push(PathBuf::from(path)); + } + + // This is the most important one for relocatable directories (that keep the initial + // structure) on Windows, Linux, macOS. + if let Ok(current_exe) = env::current_exe() + && let Ok(current_exe) = std::fs::canonicalize(current_exe) + && let Some(exe_dir) = current_exe.parent() + { + search_paths.push(exe_dir.join(exe_name)); + + // build_utils::testgen_hs::ensure extracts to target/{debug|release}/testgen-hs/. + search_paths.push(exe_dir.join("testgen-hs").join(exe_name)); + + if let Some(profile_dir) = exe_dir.parent() { + search_paths.push(profile_dir.join("testgen-hs").join(exe_name)); + } + } + + // Docker image fallback. + search_paths.push(PathBuf::from("/app/testgen-hs")); + + // System PATH lookup. + search_paths.extend( + env::var("PATH") + .map(|p| { + env::split_paths(&p) + .map(|dir| dir.join(exe_name)) + .collect::>() + }) + .unwrap_or_default(), + ); + + debug!("{} search paths = {:?}", exe_name, search_paths); + + // Checks if the path is runnable. Adjust for platform specifics if needed. + // TODO: check that the --version matches what we expect. + fn is_our_executable(path: &Path) -> bool { + Command::new(path).arg("--version").output().is_ok() + } + + // Look in each candidate path to find a matching executable. + for candidate in &search_paths { + let path = if candidate.file_name().is_some_and(|name| name == exe_name) { + candidate.clone() + } else { + candidate.join(exe_name) + }; + + if path.is_file() && is_our_executable(path.as_path()) { + return Ok(path.to_string_lossy().to_string()); + } + } + + Err(format!( + "No valid `{}` binary found in {:?}.", + exe_name, &search_paths + )) + } + + /// Returns the current child PID: + pub fn child_pid(&self) -> Option { + match self.current_child_pid.load(atomic::Ordering::Relaxed) { + u32::MAX => None, + pid => Some(pid), + } + } + + fn spawn_child( + testgen_hs_path: &str, + receiver: &mut mpsc::Receiver, + last_unfulfilled_request: &mut Option, + current_child_pid: &Arc, + variant: Variant, + init_payload: Option<&str>, + ) -> Result<(), String> { + let mut child = proc::Command::new(testgen_hs_path) + .arg(variant.as_arg()) + .stdin(proc::Stdio::piped()) + .stdout(proc::Stdio::piped()) + .spawn() + .map_err(|err| format!("couldn’t start the child: {err:?}"))?; + + current_child_pid.store(child.id(), atomic::Ordering::Relaxed); + + let result = Self::process_requests( + &mut child, + receiver, + last_unfulfilled_request, + init_payload, + variant, + ); + + let _ = child + .kill() + .inspect_err(|err| warn!(err = %err, "Testgen: child pid kill failed")); + let _ = child + .wait() + .inspect_err(|err| warn!(err = %err, "Testgen: child pid wait failed")); + current_child_pid.store(u32::MAX, atomic::Ordering::Relaxed); + + result + } + + fn process_requests( + child: &mut proc::Child, + receiver: &mut mpsc::Receiver, + last_unfulfilled_request: &mut Option, + init_payload: Option<&str>, + variant: Variant, + ) -> Result<(), String> { + let stdin = child + .stdin + .as_mut() + .ok_or("couldn’t grab stdin".to_string())?; + let stdout = child + .stdout + .as_mut() + .ok_or("couldn’t grab stdout".to_string())?; + let stdout_reader = BufReader::new(stdout); + let mut stdout_lines = stdout_reader.lines(); + + // Send init payload before processing any requests. + if let Some(payload) = init_payload { + writeln!(stdin, "{payload}") + .map_err(|err| format!("couldn’t write init payload: {err:?}"))?; + match stdout_lines.next() { + Some(Ok(line)) => { + let resp: TestgenResponse = serde_json::from_str(&line) + .map_err(|e| format!("init response parse error: {e}"))?; + match resp { + TestgenResponse::Ok(_) => { + info!("subprocess ({variant}) initialized successfully") + }, + TestgenResponse::Err(e) => return Err(format!("init failed: {e}")), + } + }, + Some(Err(e)) => return Err(format!("init read error: {e}")), + None => return Err("no init response from subprocess".to_string()), + } + } + + while let Some((request, is_a_retry)) = last_unfulfilled_request + .take() + .map(|a| (a, true)) + .or_else(|| receiver.blocking_recv().map(|a| (a, false))) + { + let payload = request.payload.clone(); + *last_unfulfilled_request = Some(request); + + let mut ask_and_receive = || -> Result, String> { + writeln!(stdin, "{payload}") + .map_err(|err| format!("couldn’t write to stdin: {err:?}"))?; + + match stdout_lines.next() { + Some(Ok(line)) => Ok(Ok(serde_json::from_str::(&line) + .map_err(|e| e.to_string())?)), + + Some(Err(e)) => Err(format!("failed to read from subprocess: {e}")), + None => Err("no output from subprocess".to_string()), + } + }; + + // Split the result to satisfy the borrow checker: + let (result_for_response, result_for_logs) = partition_result(ask_and_receive()); + + // We want to respond to the user with a failure in case this was a retry. + // Otherwise, it’s an infinite loop and wait time for the response. + if is_a_retry || result_for_response.is_ok() { + // unwrap is safe, we wrote there right before the writeln!() + let request = last_unfulfilled_request.take().unwrap(); + + let response = match result_for_response { + Ok(ok) => ok, + Err(_) => Err("repeated internal failure".to_string()), + }; + + let _ = request.response.send(response); + } + + // Now break the loop, and restart everything if we failed: + result_for_logs? + } + + Err("request channel closed".to_string()) + } +} + +fn partition_result(ae: Result) -> (Result, Result<(), E>) { + match ae { + Err(err) => (Err(()), Err(err)), + Ok(ok) => (Ok(ok), Ok(())), + } +} + +#[cfg(test)] +mod tests { + use super::{MISSING_BOTH_FIELDS_MSG, TestgenResponse}; + use serde_json::json; + + fn parse(s: &str) -> Result { + serde_json::from_str(s) + } + + #[test] + fn only_json_field_is_ok() { + assert_eq!( + parse(r#"{"json": {"value": 42}}"#).unwrap(), + TestgenResponse::Ok(json!({"value": 42})), + ); + } + + #[test] + fn only_error_field_is_err() { + assert_eq!( + parse(r#"{"error": "boom"}"#).unwrap(), + TestgenResponse::Err(json!("boom")), + ); + } + + #[test] + fn both_fields_prefer_json() { + assert_eq!( + parse(r#"{"json": {"ok": true}, "error": "ignored"}"#).unwrap(), + TestgenResponse::Ok(json!({"ok": true})), + ); + } + + #[test] + fn neither_field_fails_deserialization() { + let err = parse(r#"{}"#).unwrap_err(); + assert!( + err.to_string().contains(MISSING_BOTH_FIELDS_MSG), + "unexpected error: {err}", + ); + } + + /// Locks in the wire contract: unknown fields are silently dropped, so + /// `testgen-hs` can add new response fields without breaking us. + #[test] + fn unknown_fields_are_ignored() { + assert_eq!( + parse(r#"{"json": 1, "extra": "meta"}"#).unwrap(), + TestgenResponse::Ok(json!(1)), + ); + } +} diff --git a/crates/node/src/cbor/tests.rs b/crates/testgen/src/tests.rs similarity index 73% rename from crates/node/src/cbor/tests.rs rename to crates/testgen/src/tests.rs index 1001e682..24fd4ee1 100644 --- a/crates/node/src/cbor/tests.rs +++ b/crates/testgen/src/tests.rs @@ -1,18 +1,6 @@ -#[cfg(not(feature = "tarpaulin"))] -use super::fallback_decoder::FallbackDecoder; -#[cfg(not(feature = "tarpaulin"))] -use num_cpus; - -#[cfg(not(feature = "tarpaulin"))] -use pallas_hardano::display::haskell_error::serialize_error; use serde::Deserialize; use std::process::Command; -#[cfg(not(feature = "tarpaulin"))] -mod random; -#[cfg(not(feature = "tarpaulin"))] -mod specific; - #[derive(Deserialize, Debug)] #[serde(rename_all(deserialize = "camelCase"))] pub struct CborTestCase { @@ -30,6 +18,7 @@ pub struct CborTestSeed { #[derive(Debug, Clone, Copy)] #[allow(non_camel_case_types, dead_code)] pub enum CaseType { + Tx_Conway, ApplyTxErr_Byron, ApplyTxErr_Shelley, ApplyTxErr_Allegra, @@ -58,7 +47,7 @@ pub fn check_generated_cases( { use std::io::{BufRead, BufReader}; - let child_exe = super::fallback_decoder::FallbackDecoder::find_testgen_hs().unwrap(); + let child_exe = crate::testgen::Testgen::find_testgen_hs().unwrap(); let mut child = Command::new(&child_exe) .arg("generate") @@ -164,44 +153,3 @@ pub fn check_generated_cases( } }); } -#[cfg(not(feature = "tarpaulin"))] -macro_rules! assert_json_eq { - ($left:expr, $right:expr) => { - if $left != $right { - let left_pretty = serde_json::to_string_pretty(&$left).unwrap(); - let right_pretty = serde_json::to_string_pretty(&$right).unwrap(); - panic!( - concat!( - "assertion `left == right` failed\n", - " left:\n {}\n right:\n {}", - ), - left_pretty.replace("\n", "\n "), - right_pretty.replace("\n", "\n "), - ); - } - }; -} - -/// This function takes a CBOR-encoded `ApplyTxErr`, and verifies our -/// deserializer against the Haskell one. Use it for specific cases. -#[cfg(not(feature = "tarpaulin"))] -async fn verify_one(cbor: &str) { - let cbor = hex::decode(cbor).unwrap(); - let reference_json = FallbackDecoder::instance().decode(&cbor).await.unwrap(); - - let our_decoding = decode_error(&cbor); - - let our_json = serialize_error(our_decoding).unwrap(); - assert_json_eq!(reference_json, our_json) -} -#[cfg(not(feature = "tarpaulin"))] -use pallas_network::miniprotocols::localtxsubmission::TxValidationError; - -#[cfg(test)] -#[cfg(not(feature = "tarpaulin"))] -fn decode_error(cbor: &[u8]) -> TxValidationError { - use pallas_codec::minicbor; - - let mut decoder = minicbor::Decoder::new(cbor); - decoder.decode().unwrap() -} diff --git a/flake.lock b/flake.lock index 7cb16da3..f7f235d8 100644 --- a/flake.lock +++ b/flake.lock @@ -363,16 +363,16 @@ "testgen-hs": { "flake": false, "locked": { - "lastModified": 1775724489, - "narHash": "sha256-UiTRFhvLifxg/Ve9RJsOKRPLGQSOzAX6skd1Eq6QAms=", - "owner": "input-output-hk", + "lastModified": 1776089820, + "narHash": "sha256-Io/LO7dTDmRvVWLSNpxhH8fQcHcJeBrqviYgYnhgRiM=", + "owner": "blockfrost", "repo": "testgen-hs", - "rev": "df1ac503b267beb7bcbfc6b953cf214b04c03d30", + "rev": "358cd9124b62a9dc647a96687c272f540eb2f134", "type": "github" }, "original": { - "owner": "input-output-hk", - "ref": "10.6.3.0", + "owner": "blockfrost", + "ref": "10.6.3.1", "repo": "testgen-hs", "type": "github" } diff --git a/flake.nix b/flake.nix index 9332a796..f7eb63ca 100644 --- a/flake.nix +++ b/flake.nix @@ -29,7 +29,7 @@ }; mithril.url = "github:input-output-hk/mithril/2524.0"; testgen-hs = { - url = "github:input-output-hk/testgen-hs/10.6.3.0"; # make sure it follows cardano-node + url = "github:blockfrost/testgen-hs/10.6.3.1"; # make sure it follows cardano-node flake = false; # otherwise, +2k dependencies we don’t really use }; hydra = { diff --git a/nix/internal/windows.nix b/nix/internal/windows.nix index 1a3f541f..68484da3 100644 --- a/nix/internal/windows.nix +++ b/nix/internal/windows.nix @@ -115,7 +115,7 @@ in rec { in pkgs.fetchzip { name = "testgen-hs-${version}"; - url = "https://github.com/input-output-hk/testgen-hs/releases/download/${version}/testgen-hs-${version}-${targetSystem}.zip"; + url = "https://github.com/blockfrost/testgen-hs/releases/download/${version}/testgen-hs-${version}-${targetSystem}.zip"; hash = "sha256-LXE1RBKgal1Twh7j2hpCfNLsBMEcqSwGHb4bj/Imd9Q="; };