From a1d425ce2716a95b8b0ba00555515c72fcc1dad5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C4=B1z=C4=B1r=20Sefa=20=C4=B0rken?= Date: Fri, 17 Apr 2026 13:24:04 +0300 Subject: [PATCH 01/10] chore: bump testgen-hs flake input to 10.6.3.1 and switch windows URL to blockfrost --- flake.lock | 8 ++++---- flake.nix | 2 +- nix/internal/windows.nix | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/flake.lock b/flake.lock index 63304b04..549eb101 100644 --- a/flake.lock +++ b/flake.lock @@ -363,16 +363,16 @@ "testgen-hs": { "flake": false, "locked": { - "lastModified": 1775724489, - "narHash": "sha256-UiTRFhvLifxg/Ve9RJsOKRPLGQSOzAX6skd1Eq6QAms=", + "lastModified": 1776089820, + "narHash": "sha256-Io/LO7dTDmRvVWLSNpxhH8fQcHcJeBrqviYgYnhgRiM=", "owner": "input-output-hk", "repo": "testgen-hs", - "rev": "df1ac503b267beb7bcbfc6b953cf214b04c03d30", + "rev": "358cd9124b62a9dc647a96687c272f540eb2f134", "type": "github" }, "original": { "owner": "input-output-hk", - "ref": "10.6.3.0", + "ref": "10.6.3.1", "repo": "testgen-hs", "type": "github" } diff --git a/flake.nix b/flake.nix index 4d063488..2027764c 100644 --- a/flake.nix +++ b/flake.nix @@ -29,7 +29,7 @@ }; mithril.url = "github:input-output-hk/mithril/2524.0"; testgen-hs = { - url = "github:input-output-hk/testgen-hs/10.6.3.0"; # make sure it follows cardano-node + url = "github:input-output-hk/testgen-hs/10.6.3.1"; # make sure it follows cardano-node flake = false; # otherwise, +2k dependencies we don’t really use }; hydra = { diff --git a/nix/internal/windows.nix b/nix/internal/windows.nix index 1a3f541f..68484da3 100644 --- a/nix/internal/windows.nix +++ b/nix/internal/windows.nix @@ -115,7 +115,7 @@ in rec { in pkgs.fetchzip { name = "testgen-hs-${version}"; - url = "https://github.com/input-output-hk/testgen-hs/releases/download/${version}/testgen-hs-${version}-${targetSystem}.zip"; + url = "https://github.com/blockfrost/testgen-hs/releases/download/${version}/testgen-hs-${version}-${targetSystem}.zip"; hash = "sha256-LXE1RBKgal1Twh7j2hpCfNLsBMEcqSwGHb4bj/Imd9Q="; }; From 72daec91401c3d63067d8d087fe28f9570fed4c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C4=B1z=C4=B1r=20Sefa=20=C4=B0rken?= Date: Fri, 17 Apr 2026 13:24:04 +0300 Subject: [PATCH 02/10] refactor: extract testgen-hs handling into bf-testgen and bf-error-decoder crates --- Cargo.lock | 32 +- Cargo.toml | 3 + crates/build_utils/src/lib.rs | 1 + crates/build_utils/src/target.rs | 21 + crates/build_utils/src/testgen_hs.rs | 23 +- crates/error_decoder/Cargo.toml | 26 ++ crates/error_decoder/README.md | 5 + crates/error_decoder/build.rs | 3 + crates/error_decoder/src/external.rs | 113 ++++++ crates/error_decoder/src/lib.rs | 3 + crates/error_decoder/src/tests.rs | 4 + .../src}/tests/random.rs | 6 +- .../src}/tests/specific.rs | 75 +++- crates/node/Cargo.toml | 4 - crates/node/src/cbor.rs | 4 - crates/node/src/cbor/fallback_decoder.rs | 298 -------------- crates/testgen/Cargo.toml | 19 + crates/testgen/README.md | 3 + crates/testgen/src/lib.rs | 3 + crates/testgen/src/testgen.rs | 369 ++++++++++++++++++ .../{node/src/cbor => testgen/src}/tests.rs | 53 +-- 21 files changed, 687 insertions(+), 381 deletions(-) create mode 100644 crates/build_utils/src/target.rs create mode 100644 crates/error_decoder/Cargo.toml create mode 100644 crates/error_decoder/README.md create mode 100644 crates/error_decoder/build.rs create mode 100644 crates/error_decoder/src/external.rs create mode 100644 crates/error_decoder/src/lib.rs create mode 100644 crates/error_decoder/src/tests.rs rename crates/{node/src/cbor => error_decoder/src}/tests/random.rs (95%) rename crates/{node/src/cbor => error_decoder/src}/tests/specific.rs (99%) delete mode 100644 crates/node/src/cbor/fallback_decoder.rs create mode 100644 crates/testgen/Cargo.toml create mode 100644 crates/testgen/README.md create mode 100644 crates/testgen/src/lib.rs create mode 100644 crates/testgen/src/testgen.rs rename crates/{node/src/cbor => testgen/src}/tests.rs (73%) diff --git a/Cargo.lock b/Cargo.lock index 26c0d8ed..e4677341 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -706,6 +706,22 @@ dependencies = [ "tracing", ] +[[package]] +name = "blockfrost-platform-error-decoder" +version = "0.0.3-rc.3" +dependencies = [ + "blockfrost-platform-build-utils", + "blockfrost-platform-common", + "blockfrost-platform-testgen", + "hex", + "pallas-codec", + "pallas-hardano", + "pallas-network", + "serde_json", + "sysinfo", + "tokio", +] + [[package]] name = "blockfrost-platform-integration-tests" version = "1.0.0-rc.2" @@ -748,11 +764,9 @@ dependencies = [ "blockfrost-platform-build-utils", "blockfrost-platform-common", "chrono", - "crossbeam", "deadpool 0.13.0", "hex", "metrics", - "num_cpus", "pallas-codec", "pallas-crypto", "pallas-hardano", @@ -760,8 +774,20 @@ dependencies = [ "pallas-primitives", "pallas-traverse", "serde", + "tokio", + "tracing", +] + +[[package]] +name = "blockfrost-platform-testgen" +version = "0.0.3-rc.3" +dependencies = [ + "blockfrost-platform-common", + "crossbeam", + "hex", + "num_cpus", + "serde", "serde_json", - "sysinfo", "tokio", "tracing", ] diff --git a/Cargo.toml b/Cargo.toml index 7ae37401..47c02578 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,8 @@ members = [ "crates/api_provider", "crates/node", "crates/data_node", + "crates/testgen", + "crates/error_decoder", "crates/gateway", "crates/sdk_bridge", "crates/integration_tests", @@ -32,6 +34,7 @@ bf-build-utils = { path = "crates/build_utils", package = "blockfrost-platform-b bf-common = { path = "crates/common", package = "blockfrost-platform-common" } bf-data-node = { path = "crates/data_node", package = "blockfrost-platform-data-node" } bf-node = { path = "crates/node", package = "blockfrost-platform-node" } +bf-testgen = { path = "crates/testgen", package = "blockfrost-platform-testgen" } bip39 = "2.2.2" blockfrost = { version = "1.2.3", default-features = false, features = [ "native-tls", diff --git a/crates/build_utils/src/lib.rs b/crates/build_utils/src/lib.rs index 8438c445..c0dfb6a2 100644 --- a/crates/build_utils/src/lib.rs +++ b/crates/build_utils/src/lib.rs @@ -1,2 +1,3 @@ pub mod git; +pub mod target; pub mod testgen_hs; diff --git a/crates/build_utils/src/target.rs b/crates/build_utils/src/target.rs new file mode 100644 index 00000000..ebb04563 --- /dev/null +++ b/crates/build_utils/src/target.rs @@ -0,0 +1,21 @@ +pub fn os() -> &'static str { + (if cfg!(target_os = "macos") { + "darwin" + } else if cfg!(target_os = "linux") { + "linux" + } else if cfg!(target_os = "windows") { + "windows" + } else { + panic!("Unsupported OS"); + }) as _ +} + +pub fn arch() -> &'static str { + (if cfg!(target_arch = "x86_64") { + "x86_64" + } else if cfg!(target_arch = "aarch64") { + "aarch64" + } else { + panic!("Unsupported architecture"); + }) as _ +} diff --git a/crates/build_utils/src/testgen_hs.rs b/crates/build_utils/src/testgen_hs.rs index 714aafac..6c240753 100644 --- a/crates/build_utils/src/testgen_hs.rs +++ b/crates/build_utils/src/testgen_hs.rs @@ -16,25 +16,10 @@ pub fn ensure() { return; } - let testgen_lib_version = "10.6.3.0"; - - let target_os = if cfg!(target_os = "macos") { - "darwin" - } else if cfg!(target_os = "linux") { - "linux" - } else if cfg!(target_os = "windows") { - "windows" - } else { - panic!("Unsupported OS"); - }; + let testgen_lib_version = "10.6.3.1"; - let arch = if cfg!(target_arch = "x86_64") { - "x86_64" - } else if cfg!(target_arch = "aarch64") { - "aarch64" - } else { - panic!("Unsupported architecture"); - }; + let target_os = super::target::os(); + let arch = super::target::arch(); let suffix = if target_os == "windows" { ".zip" @@ -44,7 +29,7 @@ pub fn ensure() { let file_name = format!("testgen-hs-{testgen_lib_version}-{arch}-{target_os}"); let download_url = format!( - "https://github.com/input-output-hk/testgen-hs/releases/download/{testgen_lib_version}/{file_name}{suffix}" + "https://github.com/blockfrost/testgen-hs/releases/download/{testgen_lib_version}/{file_name}{suffix}" ); println!("Looking for {file_name}"); diff --git a/crates/error_decoder/Cargo.toml b/crates/error_decoder/Cargo.toml new file mode 100644 index 00000000..b6c6733d --- /dev/null +++ b/crates/error_decoder/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "blockfrost-platform-error-decoder" +version.workspace = true +license.workspace = true +edition.workspace = true +build = "build.rs" + +[dependencies] +bf-common.workspace = true +bf-testgen.workspace = true +hex.workspace = true +sysinfo.workspace = true +serde_json.workspace = true +tokio.workspace = true +pallas-network.workspace = true +pallas-hardano.workspace = true +pallas-codec.workspace = true + +[features] +tarpaulin = [] + +[build-dependencies] +bf-build-utils.workspace = true + +[dev-dependencies] +bf-testgen = { workspace = true, features = ["test-utils"] } diff --git a/crates/error_decoder/README.md b/crates/error_decoder/README.md new file mode 100644 index 00000000..e1359ace --- /dev/null +++ b/crates/error_decoder/README.md @@ -0,0 +1,5 @@ +# error decoder + +This package contains external error decoder for tx submit errors. Historically used to decode and generate error messages that are returned from our API. +Now it's only used to test `pallas-hardano` based implementation and generate random test cases. `pallas-hardano` vs ledger error generating tests are also in this crate. +Related `pallas-hardano` part initially implemented in this repository and later merged into pallas. diff --git a/crates/error_decoder/build.rs b/crates/error_decoder/build.rs new file mode 100644 index 00000000..aeb1456e --- /dev/null +++ b/crates/error_decoder/build.rs @@ -0,0 +1,3 @@ +fn main() { + bf_build_utils::testgen_hs::ensure(); +} diff --git a/crates/error_decoder/src/external.rs b/crates/error_decoder/src/external.rs new file mode 100644 index 00000000..558cb21a --- /dev/null +++ b/crates/error_decoder/src/external.rs @@ -0,0 +1,113 @@ +use bf_common::errors::AppError; +use bf_testgen::testgen::{Testgen, TestgenResponse}; + +#[derive(Clone)] +pub struct ExternalDecoder { + testgen: Testgen, +} +impl ExternalDecoder { + pub fn spawn() -> Result { + let testgen = Testgen::spawn("deserialize-stream") + .map_err(|err| AppError::Server(format!("Failed to spawn ExternalDecoder: {err}")))?; + + Ok(Self { testgen }) + } + + pub async fn decode(&self, input: &[u8]) -> Result { + match self.testgen.decode(input).await { + Ok(resp) => match resp { + TestgenResponse::Ok(value) => Ok(value), + TestgenResponse::Err(err) => Err(err.to_string()), + }, + Err(err) => Err(err), + } + } + + /// This function is called at startup, so that we make sure that the worker is reasonable. + pub async fn startup_sanity_test(&self) -> Result<(), String> { + let input = hex::decode("8182068182028200a0").map_err(|err| err.to_string())?; + let result = self.decode(&input).await; + let expected = serde_json::json!({ + "contents": { + "contents": { + "contents": { + "era": "ShelleyBasedEraConway", + "error": [ + "ConwayCertsFailure (WithdrawalsNotInRewardsCERTS (Withdrawals {unWithdrawals = fromList []}))" + ], + "kind": "ShelleyTxValidationError" + }, + "tag": "TxValidationErrorInCardanoMode" + }, + "tag": "TxCmdTxSubmitValidationError" + }, + "tag": "TxSubmitFail" + }); + + if result == Ok(expected) { + Ok(()) + } else { + Err(format!( + "ExternalDecoder: startup_sanity_test failed: {result:?}" + )) + } + } + + /// A single global [`ExternalDecoder`] that you can cheaply use in tests. + #[cfg(all(test, not(feature = "tarpaulin")))] + pub fn instance() -> Self { + GLOBAL_INSTANCE.clone() + } +} + +#[cfg(all(test, not(feature = "tarpaulin")))] +static GLOBAL_INSTANCE: std::sync::LazyLock = + std::sync::LazyLock::new(|| ExternalDecoder::spawn().expect("Failed to spawn ExternalDecoder")); + +#[cfg(test)] +mod tests { + // The CBOR test cases are covered by `crates/error_decoder/src/tests/specific.rs`, + // which already cross-validates the Rust (pallas-hardano) implementation against the + // Haskell (testgen-hs) external decoder. This module only contains tests that are + // unique to the external decoder itself (e.g. crash-recovery behaviour). + #[cfg(not(feature = "tarpaulin"))] + use super::*; + + #[tokio::test] + //#[tracing_test::traced_test] + #[cfg(not(feature = "tarpaulin"))] + async fn test_sanity() { + let decoder = ExternalDecoder::spawn().unwrap(); + + // Wait for it to come up: + decoder.startup_sanity_test().await.unwrap(); + + // Now, kill our child to test the restart logic: + sysinfo::System::new_all() + .process(sysinfo::Pid::from_u32(decoder.testgen.child_pid().unwrap())) + .unwrap() + .kill(); + + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + let input = hex::decode("8182068183051a000c275b1a000b35ec").unwrap(); + let result = decoder.decode(&input).await; + + assert_eq!( + result, + Ok(serde_json::json!({"contents": + {"contents": + {"contents": + {"era": "ShelleyBasedEraConway", "error": + ["ConwayTreasuryValueMismatch (Mismatch {mismatchSupplied = Coin 734700, mismatchExpected = Coin 796507})"], + "kind": "ShelleyTxValidationError" + }, + "tag": "TxValidationErrorInCardanoMode" + }, "tag": "TxCmdTxSubmitValidationError" + }, + "tag": "TxSubmitFail" + } + )) + ); + } +} diff --git a/crates/error_decoder/src/lib.rs b/crates/error_decoder/src/lib.rs new file mode 100644 index 00000000..6d9999bd --- /dev/null +++ b/crates/error_decoder/src/lib.rs @@ -0,0 +1,3 @@ +pub mod external; +#[cfg(test)] +mod tests; diff --git a/crates/error_decoder/src/tests.rs b/crates/error_decoder/src/tests.rs new file mode 100644 index 00000000..6e0943e0 --- /dev/null +++ b/crates/error_decoder/src/tests.rs @@ -0,0 +1,4 @@ +#[cfg(test)] +mod random; +#[cfg(test)] +pub(crate) mod specific; diff --git a/crates/node/src/cbor/tests/random.rs b/crates/error_decoder/src/tests/random.rs similarity index 95% rename from crates/node/src/cbor/tests/random.rs rename to crates/error_decoder/src/tests/random.rs index 79d36779..c2b60926 100644 --- a/crates/node/src/cbor/tests/random.rs +++ b/crates/error_decoder/src/tests/random.rs @@ -1,4 +1,8 @@ -use super::*; +#![cfg(not(feature = "tarpaulin"))] + +use bf_testgen::tests::{CaseType, check_generated_cases}; +use pallas_hardano::display::haskell_error::serialize_error; +use pallas_network::miniprotocols::localtxsubmission::TxValidationError; #[test] #[allow(non_snake_case)] diff --git a/crates/node/src/cbor/tests/specific.rs b/crates/error_decoder/src/tests/specific.rs similarity index 99% rename from crates/node/src/cbor/tests/specific.rs rename to crates/error_decoder/src/tests/specific.rs index f9c312c2..952cf993 100644 --- a/crates/node/src/cbor/tests/specific.rs +++ b/crates/error_decoder/src/tests/specific.rs @@ -18,7 +18,80 @@ done ``` */ -use super::verify_one; +#[cfg(test)] +use pallas_network::miniprotocols::localtxsubmission::TxValidationError; + +/// This function takes a CBOR-encoded `ApplyTxErr`, and verifies our +/// deserializer against the Haskell one. Use it for specific cases. +/// +/// Under `tarpaulin`, the external Haskell decoder (`testgen-hs`) is not +/// available, so we only exercise the Rust decode + serialize path (which is +/// what we want coverage for anyway). +#[cfg(test)] +pub(crate) async fn verify_one(cbor: &str) { + use pallas_hardano::display::haskell_error::serialize_error; + + let cbor = hex::decode(cbor).unwrap(); + + let our_decoding = decode_error(&cbor); + let our_json = serialize_error(our_decoding).expect("Failed to serialize error"); + + #[cfg(not(feature = "tarpaulin"))] + { + use crate::external::ExternalDecoder; + + let reference_json = match ExternalDecoder::instance().decode(&cbor).await { + Ok(value) => value, + Err(shared_decoder_err) => { + // Recover from a poisoned shared decoder process by retrying with a fresh one. + let fresh_decoder = ExternalDecoder::spawn() + .expect("Failed to spawn a fresh ExternalDecoder for retry"); + fresh_decoder.decode(&cbor).await.unwrap_or_else(|fresh_decoder_err| { + panic!( + "Failed to decode reference JSON with both shared and fresh ExternalDecoder instances. shared_error={shared_decoder_err}, fresh_error={fresh_decoder_err}, cbor={}", + hex::encode(&cbor) + ) + }) + }, + }; + + assert_json_eq!(reference_json, our_json); + } + + // Under tarpaulin: just assert the Rust decoder didn't panic and produced valid JSON. + #[cfg(feature = "tarpaulin")] + { + let _ = our_json; + } +} +#[cfg(test)] +fn decode_error(bytes: &[u8]) -> TxValidationError { + use pallas_codec::minicbor; + + let mut decoder = minicbor::Decoder::new(bytes); + decoder.decode().unwrap() +} + +#[cfg(all(test, not(feature = "tarpaulin")))] +macro_rules! assert_json_eq { + ($left:expr, $right:expr) => { + if $left != $right { + let left_pretty = serde_json::to_string_pretty(&$left).unwrap(); + let right_pretty = serde_json::to_string_pretty(&$right).unwrap(); + panic!( + concat!( + "assertion `left == right` failed\n", + " left:\n {}\n right:\n {}", + ), + left_pretty.replace("\n", "\n "), + right_pretty.replace("\n", "\n "), + ); + } + }; +} + +#[cfg(all(test, not(feature = "tarpaulin")))] +pub(crate) use assert_json_eq; // export it #[tokio::test] #[allow(non_snake_case)] diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 0c2d3476..b28e4919 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -9,14 +9,10 @@ build = "build.rs" bf-common.workspace = true serde.workspace = true tracing.workspace = true -crossbeam.workspace = true hex.workspace = true -serde_json.workspace = true tokio.workspace = true pallas-network.workspace = true deadpool.workspace = true -sysinfo.workspace = true -num_cpus.workspace = true metrics.workspace = true pallas-hardano.workspace = true pallas-crypto.workspace = true diff --git a/crates/node/src/cbor.rs b/crates/node/src/cbor.rs index 9cfdfb72..08103424 100644 --- a/crates/node/src/cbor.rs +++ b/crates/node/src/cbor.rs @@ -1,5 +1 @@ -pub mod fallback_decoder; - -#[cfg(test)] -pub mod tests; pub(crate) mod validation; diff --git a/crates/node/src/cbor/fallback_decoder.rs b/crates/node/src/cbor/fallback_decoder.rs deleted file mode 100644 index 24e2f488..00000000 --- a/crates/node/src/cbor/fallback_decoder.rs +++ /dev/null @@ -1,298 +0,0 @@ -use bf_common::errors::AppError; -use std::io::{BufRead, BufReader, Write}; -use std::process::{self as proc}; -use std::sync::{ - Arc, - atomic::{self, AtomicU32}, -}; -use std::thread; -use tokio::sync::{mpsc, oneshot}; -use tracing::{error, info}; - -#[derive(Clone)] -pub struct FallbackDecoder { - sender: mpsc::Sender, - current_child_pid: Arc, -} - -struct FDRequest { - cbor: Vec, - response_tx: oneshot::Sender>, -} - -impl FallbackDecoder { - /// Starts a new child process. - pub fn spawn() -> Result { - let testgen_hs_path = Self::find_testgen_hs().map_err(AppError::Server)?; - - info!( - "Using {} as a fallback CBOR error decoder", - &testgen_hs_path - ); - - let current_child_pid = Arc::new(AtomicU32::new(u32::MAX)); - let current_child_pid_clone = current_child_pid.clone(); - let (sender, mut receiver) = mpsc::channel::(128); - - // Clone `testgen_hs_path` for the thread. - let testgen_hs_path_for_thread = testgen_hs_path.clone(); - - thread::spawn(move || { - // For retries: - let mut last_unfulfilled_request: Option = None; - - loop { - let single_run = Self::spawn_child( - &testgen_hs_path_for_thread, - &mut receiver, - &mut last_unfulfilled_request, - ¤t_child_pid_clone, - ); - let restart_delay = std::time::Duration::from_secs(1); - error!( - "will restart in {:?} because of a subprocess error: {:?}", - restart_delay, single_run - ); - std::thread::sleep(restart_delay); - } - }); - - Ok(Self { - sender, - current_child_pid, - }) - } - - /// Decodes a CBOR error using the child process. - pub async fn decode(&self, cbor: &[u8]) -> Result { - let (response_tx, response_rx) = oneshot::channel(); - self.sender - .send(FDRequest { - cbor: cbor.to_vec(), - response_tx, - }) - .await - .map_err(|err| format!("failed to send request: {err:?}"))?; - - response_rx - .await - .unwrap_or_else(|err| unreachable!("worker thread dropped (can’t happen): {:?}", err)) - } - - /// Searches for `testgen-hs` in multiple directories. - pub fn find_testgen_hs() -> Result { - bf_common::find_libexec::find_libexec("testgen-hs", "TESTGEN_HS_PATH", &["--version"]) - } - - /// This function is called at startup, so that we make sure that the worker is reasonable. - pub async fn startup_sanity_test(&self) -> Result<(), String> { - let input = hex::decode("8182068182028200a0").map_err(|err| err.to_string())?; - let result = self.decode(&input).await; - let expected = serde_json::json!({ - "contents": { - "contents": { - "contents": { - "era": "ShelleyBasedEraConway", - "error": [ - "ConwayCertsFailure (WithdrawalsNotInRewardsCERTS (Withdrawals {unWithdrawals = fromList []}))" - ], - "kind": "ShelleyTxValidationError" - }, - "tag": "TxValidationErrorInCardanoMode" - }, - "tag": "TxCmdTxSubmitValidationError" - }, - "tag": "TxSubmitFail" - }); - - if result == Ok(expected) { - Ok(()) - } else { - Err(format!("startup_sanity_test failed: {result:?}")) - } - } - - /// Returns the current child PID: - pub fn child_pid(&self) -> Option { - match self.current_child_pid.load(atomic::Ordering::Relaxed) { - u32::MAX => None, - pid => Some(pid), - } - } - - #[cfg(test)] - /// A single global [`FallbackDecoder`] that you can cheaply use in tests. - pub fn instance() -> Self { - GLOBAL_INSTANCE.clone() - } - - fn spawn_child( - testgen_hs_path: &str, - receiver: &mut mpsc::Receiver, - last_unfulfilled_request: &mut Option, - current_child_pid: &Arc, - ) -> Result<(), String> { - let mut child = proc::Command::new(testgen_hs_path) - .arg("deserialize-stream") - .stdin(proc::Stdio::piped()) - .stdout(proc::Stdio::piped()) - .spawn() - .map_err(|err| format!("couldn’t start the child: {err:?}"))?; - - current_child_pid.store(child.id(), atomic::Ordering::Relaxed); - - let result = Self::process_requests(&mut child, receiver, last_unfulfilled_request); - - // Let’s make sure it’s dead in case a different error landed us here. - // Will return Ok(()) if already dead. - child - .kill() - .map_err(|err| format!("couldn’t kill the child: {err:?}"))?; - child - .wait() - .map_err(|err| format!("couldn’t reap the child: {err:?}"))?; - - result - } - - fn process_requests( - child: &mut proc::Child, - receiver: &mut mpsc::Receiver, - last_unfulfilled_request: &mut Option, - ) -> Result<(), String> { - let stdin = child - .stdin - .as_mut() - .ok_or("couldn’t grab stdin".to_string())?; - let stdout = child - .stdout - .as_mut() - .ok_or("couldn’t grab stdout".to_string())?; - let stdout_reader = BufReader::new(stdout); - let mut stdout_lines = stdout_reader.lines(); - - while let Some((request, is_a_retry)) = last_unfulfilled_request - .take() - .map(|a| (a, true)) - .or_else(|| receiver.blocking_recv().map(|a| (a, false))) - { - let cbor_hex = hex::encode(&request.cbor); - *last_unfulfilled_request = Some(request); - - let mut ask_and_receive = || -> Result, String> { - writeln!(stdin, "{cbor_hex}") - .map_err(|err| format!("couldn’t write to stdin: {err:?}"))?; - - match stdout_lines.next() { - Some(Ok(line)) => Ok(Self::parse_json(&line)), - Some(Err(e)) => Err(format!("failed to read from subprocess: {e}")), - None => Err("no output from subprocess".to_string()), - } - }; - - // Split the result to satisfy the borrow checker: - let (result_for_response, result_for_logs) = partition_result(ask_and_receive()); - - // We want to respond to the user with a failure in case this was a retry. - // Otherwise, it’s an infinite loop and wait time for the response. - if is_a_retry || result_for_response.is_ok() { - // unwrap is safe, we wrote there right before the writeln!() - let request = last_unfulfilled_request.take().unwrap(); - - let response = match result_for_response { - Ok(ok) => ok, - Err(_) => Err("repeated internal failure".to_string()), - }; - - // unwrap is safe, the other side would have to drop for a - // panic – can’t happen, because we control it: - request - .response_tx - .send(response) - .unwrap_or_else(|_| unreachable!()); - } - - // Now break the loop, and restart everything if we failed: - result_for_logs? - } - - Err("request channel closed, won’t happen".to_string()) - } - - fn parse_json(input: &str) -> Result { - let mut parsed: serde_json::Value = - serde_json::from_str(input).map_err(|e| e.to_string())?; - - parsed - .as_object() - .and_then(|obj| { - if obj.len() == 1 { - obj.get("error") - .and_then(|v| v.as_str()) - .map(|s| Err(s.to_string())) - } else { - None - } - }) - .unwrap_or_else(|| { - parsed - .get_mut("json") - .map(serde_json::Value::take) - .ok_or_else(|| "Missing 'json' field".to_string()) - }) - } -} - -fn partition_result(ae: Result) -> (Result, Result<(), E>) { - match ae { - Err(err) => (Err(()), Err(err)), - Ok(ok) => (Ok(ok), Ok(())), - } -} - -#[cfg(test)] -static GLOBAL_INSTANCE: std::sync::LazyLock = - std::sync::LazyLock::new(|| FallbackDecoder::spawn().expect("Failed to spawn FallbackDecoder")); - -#[cfg(test)] -mod tests { - #[cfg(not(feature = "tarpaulin"))] - use super::*; - #[tokio::test] - //#[tracing_test::traced_test] - #[cfg(not(feature = "tarpaulin"))] - async fn test_fallback_decoder() { - let decoder = FallbackDecoder::spawn().unwrap(); - - // Wait for it to come up: - decoder.startup_sanity_test().await.unwrap(); - - // Now, kill our child to test the restart logic: - sysinfo::System::new_all() - .process(sysinfo::Pid::from_u32(decoder.child_pid().unwrap())) - .unwrap() - .kill(); - - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - - let input = hex::decode("8182068183051a000c275b1a000b35ec").unwrap(); - let result = decoder.decode(&input).await; - - assert_eq!( - result, - Ok(serde_json::json!({"contents": - {"contents": - {"contents": - {"era": "ShelleyBasedEraConway", "error": - ["ConwayTreasuryValueMismatch (Mismatch {mismatchSupplied = Coin 734700, mismatchExpected = Coin 796507})"], - "kind": "ShelleyTxValidationError" - }, - "tag": "TxValidationErrorInCardanoMode" - }, "tag": "TxCmdTxSubmitValidationError" - }, - "tag": "TxSubmitFail" - } - )) - ); - } -} diff --git a/crates/testgen/Cargo.toml b/crates/testgen/Cargo.toml new file mode 100644 index 00000000..7d7d4a16 --- /dev/null +++ b/crates/testgen/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "blockfrost-platform-testgen" +version.workspace = true +license.workspace = true +edition.workspace = true + +[dependencies] +bf-common.workspace = true +serde.workspace = true +tracing.workspace = true +hex.workspace = true +serde_json.workspace = true +tokio.workspace = true +num_cpus.workspace = true +crossbeam.workspace = true + +[features] +tarpaulin = [] +test-utils = [] diff --git a/crates/testgen/README.md b/crates/testgen/README.md new file mode 100644 index 00000000..1dbba396 --- /dev/null +++ b/crates/testgen/README.md @@ -0,0 +1,3 @@ +# testgen + +Base implementation for communicating with the Haskell binary, testgen. diff --git a/crates/testgen/src/lib.rs b/crates/testgen/src/lib.rs new file mode 100644 index 00000000..1dbf4645 --- /dev/null +++ b/crates/testgen/src/lib.rs @@ -0,0 +1,3 @@ +pub mod testgen; +#[cfg(feature = "test-utils")] +pub mod tests; diff --git a/crates/testgen/src/testgen.rs b/crates/testgen/src/testgen.rs new file mode 100644 index 00000000..78175dec --- /dev/null +++ b/crates/testgen/src/testgen.rs @@ -0,0 +1,369 @@ +use bf_common::errors::AppError; +use serde::Deserialize; +use serde::de; +use std::io::{BufRead, BufReader, Write}; +use std::path::{Path, PathBuf}; +use std::process::{self as proc, Command}; +use std::sync::{ + Arc, + atomic::{self, AtomicU32}, +}; +use std::{env, thread}; +use tokio::sync::{mpsc, oneshot}; +use tracing::{debug, error, info, warn}; + +#[derive(Clone)] +pub struct Testgen { + sender: mpsc::Sender, + current_child_pid: Arc, +} + +pub struct TestgenRequest { + payload: String, + response: oneshot::Sender>, +} + +#[derive(Debug)] +pub enum TestgenResponse { + Ok(serde_json::Value), + Err(serde_json::Value), +} + +#[derive(Deserialize)] +struct TestgenResponseWire { + #[serde(default)] + json: Option, + #[serde(default)] + error: Option, +} + +impl<'de> Deserialize<'de> for TestgenResponse { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let wire = TestgenResponseWire::deserialize(deserializer)?; + match (wire.json, wire.error) { + (Some(json), Some(err)) => { + warn!( + "testgen-hs response has both `json` and `error` fields, discarding error: {}", + err + ); + Ok(Self::Ok(json)) + }, + (Some(json), None) => Ok(Self::Ok(json)), + (None, Some(error)) => Ok(Self::Err(error)), + (None, None) => Err(de::Error::custom( + "invalid testgen-hs response: missing both `json` and `error`", + )), + } + } +} +/// Testgen is an executable that we use to run some functionality that are readily/easily available +/// in Haskell codebase like the ledger. +/// The name is 'testgen' since it was initially implemented to generate test cases. +impl Testgen { + /// Starts a new child process. + pub fn spawn(variant: &str) -> Result { + Self::spawn_inner(variant, None) + } + + /// Starts a new child process with an init payload that is sent as the first + /// message on every (re)start. The response to the init payload is validated + /// but not returned — it is only used to confirm the subprocess is ready. + pub fn spawn_with_init(variant: &str, init_payload: String) -> Result { + Self::spawn_inner(variant, Some(init_payload)) + } + + fn spawn_inner(variant: &str, init_payload: Option) -> Result { + let testgen_hs_path = Self::find_testgen_hs().map_err(AppError::Server)?; + + info!( + "Spawning testgen-hs ({}) from {}", + variant, &testgen_hs_path + ); + + let current_child_pid = Arc::new(AtomicU32::new(u32::MAX)); + let current_child_pid_clone = current_child_pid.clone(); + let variant_clone = variant.to_string(); + let (sender, mut receiver) = mpsc::channel::(128); + + // Clone `testgen_hs_path` for the thread. + let testgen_hs_path_for_thread = testgen_hs_path.clone(); + + thread::spawn(move || { + // For retries: + let mut last_unfulfilled_request: Option = None; + + loop { + let single_run = Self::spawn_child( + &testgen_hs_path_for_thread, + &mut receiver, + &mut last_unfulfilled_request, + ¤t_child_pid_clone, + &variant_clone, + init_payload.as_deref(), + ); + + // If all senders have been dropped (Testgen handle was dropped), + // no one will send requests — exit the worker thread cleanly + // instead of spinning in an infinite restart loop. + // Only check when there's no pending retry to avoid overwriting it + if last_unfulfilled_request.is_none() { + match receiver.try_recv() { + Err(mpsc::error::TryRecvError::Disconnected) => { + info!("Testgen: all senders dropped, shutting down background worker"); + break; + }, + Ok(req) => { + last_unfulfilled_request = Some(req); + }, + Err(mpsc::error::TryRecvError::Empty) => {}, + } + } + + let restart_delay = std::time::Duration::from_secs(1); + error!( + "Testgen: will restart in {restart_delay:?} because of a subprocess error: {}", + single_run.err().unwrap_or_else(|| "unknown".to_string()) + ); + std::thread::sleep(restart_delay); + } + }); + + Ok(Self { + sender, + current_child_pid, + }) + } + + /// Sends the payload to the child process. + pub async fn decode(&self, cbor: &[u8]) -> Result { + self.send(hex::encode(cbor)).await + } + + /// Sends the payload to the child process. + pub async fn send(&self, payload: String) -> Result { + let (response, response_rx) = oneshot::channel(); + + self.sender + .send(TestgenRequest { payload, response }) + .await + .map_err(|err| format!("Testgen: failed to send request: {err:?}"))?; + + tokio::time::timeout(std::time::Duration::from_secs(30), response_rx) + .await + .map_err(|_| "Testgen: request timed out after 30s".to_string())? + .map_err(|err| format!("Testgen: worker thread dropped: {err:?}"))? + } + + /// Searches for `testgen-hs` in multiple directories. + pub fn find_testgen_hs() -> Result { + let exe_name = if cfg!(target_os = "windows") { + "testgen-hs.exe" + } else { + "testgen-hs" + }; + + let mut search_paths: Vec = Vec::new(); + + if let Ok(path) = env::var("TESTGEN_HS_PATH") { + search_paths.push(PathBuf::from(path)); + } + + if let Some(path) = option_env!("TESTGEN_HS_PATH") { + search_paths.push(PathBuf::from(path)); + } + + // This is the most important one for relocatable directories (that keep the initial + // structure) on Windows, Linux, macOS. + if let Ok(current_exe) = env::current_exe() + && let Ok(current_exe) = std::fs::canonicalize(current_exe) + && let Some(exe_dir) = current_exe.parent() + { + search_paths.push(exe_dir.join(exe_name)); + + // build_utils::testgen_hs::ensure extracts to target/{debug|release}/testgen-hs/. + search_paths.push(exe_dir.join("testgen-hs").join(exe_name)); + + if let Some(profile_dir) = exe_dir.parent() { + search_paths.push(profile_dir.join("testgen-hs").join(exe_name)); + } + } + + // Docker image fallback. + search_paths.push(PathBuf::from("/app/testgen-hs")); + + // System PATH lookup. + search_paths.extend( + env::var("PATH") + .map(|p| { + env::split_paths(&p) + .map(|dir| dir.join(exe_name)) + .collect::>() + }) + .unwrap_or_default(), + ); + + debug!("{} search paths = {:?}", exe_name, search_paths); + + // Checks if the path is runnable. Adjust for platform specifics if needed. + // TODO: check that the --version matches what we expect. + fn is_our_executable(path: &Path) -> bool { + Command::new(path).arg("--version").output().is_ok() + } + + // Look in each candidate path to find a matching executable. + for candidate in &search_paths { + let path = if candidate.file_name().is_some_and(|name| name == exe_name) { + candidate.clone() + } else { + candidate.join(exe_name) + }; + + if path.is_file() && is_our_executable(path.as_path()) { + return Ok(path.to_string_lossy().to_string()); + } + } + + Err(format!( + "No valid `{}` binary found in {:?}.", + exe_name, &search_paths + )) + } + + /// Returns the current child PID: + pub fn child_pid(&self) -> Option { + match self.current_child_pid.load(atomic::Ordering::Relaxed) { + u32::MAX => None, + pid => Some(pid), + } + } + + fn spawn_child( + testgen_hs_path: &str, + receiver: &mut mpsc::Receiver, + last_unfulfilled_request: &mut Option, + current_child_pid: &Arc, + variant: &str, + init_payload: Option<&str>, + ) -> Result<(), String> { + let mut child = proc::Command::new(testgen_hs_path) + .arg(variant) + .stdin(proc::Stdio::piped()) + .stdout(proc::Stdio::piped()) + .spawn() + .map_err(|err| format!("couldn’t start the child: {err:?}"))?; + + current_child_pid.store(child.id(), atomic::Ordering::Relaxed); + + let result = Self::process_requests( + &mut child, + receiver, + last_unfulfilled_request, + init_payload, + variant, + ); + + // Let’s make sure it’s dead in case a different error landed us here. + // Will return Ok(()) if already dead. + child + .kill() + .map_err(|err| format!("couldn’t kill the child: {err:?}"))?; + child + .wait() + .map_err(|err| format!("couldn’t reap the child: {err:?}"))?; + + result + } + + fn process_requests( + child: &mut proc::Child, + receiver: &mut mpsc::Receiver, + last_unfulfilled_request: &mut Option, + init_payload: Option<&str>, + variant: &str, + ) -> Result<(), String> { + let stdin = child + .stdin + .as_mut() + .ok_or("couldn’t grab stdin".to_string())?; + let stdout = child + .stdout + .as_mut() + .ok_or("couldn’t grab stdout".to_string())?; + let stdout_reader = BufReader::new(stdout); + let mut stdout_lines = stdout_reader.lines(); + + // Send init payload before processing any requests. + if let Some(payload) = init_payload { + writeln!(stdin, "{payload}") + .map_err(|err| format!("couldn’t write init payload: {err:?}"))?; + match stdout_lines.next() { + Some(Ok(line)) => { + let resp: TestgenResponse = serde_json::from_str(&line) + .map_err(|e| format!("init response parse error: {e}"))?; + match resp { + TestgenResponse::Ok(_) => { + info!("subprocess ({variant}) initialized successfully") + }, + TestgenResponse::Err(e) => return Err(format!("init failed: {e}")), + } + }, + Some(Err(e)) => return Err(format!("init read error: {e}")), + None => return Err("no init response from subprocess".to_string()), + } + } + + while let Some((request, is_a_retry)) = last_unfulfilled_request + .take() + .map(|a| (a, true)) + .or_else(|| receiver.blocking_recv().map(|a| (a, false))) + { + let payload = request.payload.clone(); + *last_unfulfilled_request = Some(request); + + let mut ask_and_receive = || -> Result, String> { + writeln!(stdin, "{payload}") + .map_err(|err| format!("couldn’t write to stdin: {err:?}"))?; + + match stdout_lines.next() { + Some(Ok(line)) => Ok(Ok(serde_json::from_str::(&line) + .map_err(|e| e.to_string())?)), + + Some(Err(e)) => Err(format!("failed to read from subprocess: {e}")), + None => Err("no output from subprocess".to_string()), + } + }; + + // Split the result to satisfy the borrow checker: + let (result_for_response, result_for_logs) = partition_result(ask_and_receive()); + + // We want to respond to the user with a failure in case this was a retry. + // Otherwise, it’s an infinite loop and wait time for the response. + if is_a_retry || result_for_response.is_ok() { + // unwrap is safe, we wrote there right before the writeln!() + let request = last_unfulfilled_request.take().unwrap(); + + let response = match result_for_response { + Ok(ok) => ok, + Err(_) => Err("repeated internal failure".to_string()), + }; + + let _ = request.response.send(response); + } + + // Now break the loop, and restart everything if we failed: + result_for_logs? + } + + Err("request channel closed".to_string()) + } +} + +fn partition_result(ae: Result) -> (Result, Result<(), E>) { + match ae { + Err(err) => (Err(()), Err(err)), + Ok(ok) => (Ok(ok), Ok(())), + } +} diff --git a/crates/node/src/cbor/tests.rs b/crates/testgen/src/tests.rs similarity index 73% rename from crates/node/src/cbor/tests.rs rename to crates/testgen/src/tests.rs index 1001e682..3deadfa4 100644 --- a/crates/node/src/cbor/tests.rs +++ b/crates/testgen/src/tests.rs @@ -1,18 +1,9 @@ #[cfg(not(feature = "tarpaulin"))] -use super::fallback_decoder::FallbackDecoder; -#[cfg(not(feature = "tarpaulin"))] use num_cpus; -#[cfg(not(feature = "tarpaulin"))] -use pallas_hardano::display::haskell_error::serialize_error; use serde::Deserialize; use std::process::Command; -#[cfg(not(feature = "tarpaulin"))] -mod random; -#[cfg(not(feature = "tarpaulin"))] -mod specific; - #[derive(Deserialize, Debug)] #[serde(rename_all(deserialize = "camelCase"))] pub struct CborTestCase { @@ -30,6 +21,7 @@ pub struct CborTestSeed { #[derive(Debug, Clone, Copy)] #[allow(non_camel_case_types, dead_code)] pub enum CaseType { + Tx_Conway, ApplyTxErr_Byron, ApplyTxErr_Shelley, ApplyTxErr_Allegra, @@ -58,7 +50,7 @@ pub fn check_generated_cases( { use std::io::{BufRead, BufReader}; - let child_exe = super::fallback_decoder::FallbackDecoder::find_testgen_hs().unwrap(); + let child_exe = crate::testgen::Testgen::find_testgen_hs().unwrap(); let mut child = Command::new(&child_exe) .arg("generate") @@ -164,44 +156,3 @@ pub fn check_generated_cases( } }); } -#[cfg(not(feature = "tarpaulin"))] -macro_rules! assert_json_eq { - ($left:expr, $right:expr) => { - if $left != $right { - let left_pretty = serde_json::to_string_pretty(&$left).unwrap(); - let right_pretty = serde_json::to_string_pretty(&$right).unwrap(); - panic!( - concat!( - "assertion `left == right` failed\n", - " left:\n {}\n right:\n {}", - ), - left_pretty.replace("\n", "\n "), - right_pretty.replace("\n", "\n "), - ); - } - }; -} - -/// This function takes a CBOR-encoded `ApplyTxErr`, and verifies our -/// deserializer against the Haskell one. Use it for specific cases. -#[cfg(not(feature = "tarpaulin"))] -async fn verify_one(cbor: &str) { - let cbor = hex::decode(cbor).unwrap(); - let reference_json = FallbackDecoder::instance().decode(&cbor).await.unwrap(); - - let our_decoding = decode_error(&cbor); - - let our_json = serialize_error(our_decoding).unwrap(); - assert_json_eq!(reference_json, our_json) -} -#[cfg(not(feature = "tarpaulin"))] -use pallas_network::miniprotocols::localtxsubmission::TxValidationError; - -#[cfg(test)] -#[cfg(not(feature = "tarpaulin"))] -fn decode_error(cbor: &[u8]) -> TxValidationError { - use pallas_codec::minicbor; - - let mut decoder = minicbor::Decoder::new(cbor); - decoder.decode().unwrap() -} From 38662c6319d879796831c6a8a8bd96b1c59040fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C4=B1z=C4=B1r=20Sefa=20=C4=B0rken?= Date: Fri, 17 Apr 2026 16:38:11 +0300 Subject: [PATCH 03/10] chore: switch testgen-hs flake input owner to blockfrost --- flake.lock | 4 ++-- flake.nix | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/flake.lock b/flake.lock index 549eb101..4243866a 100644 --- a/flake.lock +++ b/flake.lock @@ -365,13 +365,13 @@ "locked": { "lastModified": 1776089820, "narHash": "sha256-Io/LO7dTDmRvVWLSNpxhH8fQcHcJeBrqviYgYnhgRiM=", - "owner": "input-output-hk", + "owner": "blockfrost", "repo": "testgen-hs", "rev": "358cd9124b62a9dc647a96687c272f540eb2f134", "type": "github" }, "original": { - "owner": "input-output-hk", + "owner": "blockfrost", "ref": "10.6.3.1", "repo": "testgen-hs", "type": "github" diff --git a/flake.nix b/flake.nix index 2027764c..eb231485 100644 --- a/flake.nix +++ b/flake.nix @@ -29,7 +29,7 @@ }; mithril.url = "github:input-output-hk/mithril/2524.0"; testgen-hs = { - url = "github:input-output-hk/testgen-hs/10.6.3.1"; # make sure it follows cardano-node + url = "github:blockfrost/testgen-hs/10.6.3.1"; # make sure it follows cardano-node flake = false; # otherwise, +2k dependencies we don’t really use }; hydra = { From a59da0b07966133e6fa32a297ea3438a0acdb34b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C4=B1z=C4=B1r=20Sefa=20=C4=B0rken?= Date: Fri, 17 Apr 2026 17:23:58 +0300 Subject: [PATCH 04/10] test: add coverage for TestgenResponse deserialization --- crates/testgen/src/testgen.rs | 48 +++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/crates/testgen/src/testgen.rs b/crates/testgen/src/testgen.rs index 78175dec..2aa6c420 100644 --- a/crates/testgen/src/testgen.rs +++ b/crates/testgen/src/testgen.rs @@ -367,3 +367,51 @@ fn partition_result(ae: Result) -> (Result, Result<(), E>) { Ok(ok) => (Ok(ok), Ok(())), } } + +#[cfg(test)] +mod tests { + use super::TestgenResponse; + use serde_json::json; + + fn parse(s: &str) -> Result { + serde_json::from_str(s) + } + + #[test] + fn only_json_field_is_ok() { + match parse(r#"{"json": {"value": 42}}"#).unwrap() { + TestgenResponse::Ok(v) => assert_eq!(v, json!({"value": 42})), + other => panic!("expected Ok, got {other:?}"), + } + } + + #[test] + fn only_error_field_is_err() { + match parse(r#"{"error": "boom"}"#).unwrap() { + TestgenResponse::Err(v) => assert_eq!(v, json!("boom")), + other => panic!("expected Err, got {other:?}"), + } + } + + #[test] + fn both_fields_prefer_json() { + match parse(r#"{"json": {"ok": true}, "error": "ignored"}"#).unwrap() { + TestgenResponse::Ok(v) => assert_eq!(v, json!({"ok": true})), + other => panic!("expected Ok, got {other:?}"), + } + } + + #[test] + fn neither_field_fails_deserialization() { + let err = parse(r#"{}"#).unwrap_err(); + assert!(err.to_string().contains("missing both")); + } + + #[test] + fn unknown_fields_are_ignored() { + match parse(r#"{"json": 1, "extra": "meta"}"#).unwrap() { + TestgenResponse::Ok(v) => assert_eq!(v, json!(1)), + other => panic!("expected Ok, got {other:?}"), + } + } +} From 75aee91f117aa1f9601f36d8923524df91193fdb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C4=B1z=C4=B1r=20Sefa=20=C4=B0rken?= Date: Fri, 17 Apr 2026 18:12:57 +0300 Subject: [PATCH 05/10] refactor(testgen): polish API types and extract constants --- Cargo.lock | 4 +- crates/build_utils/src/target.rs | 16 ++-- crates/error_decoder/src/external.rs | 4 +- crates/testgen/src/testgen.rs | 126 ++++++++++++++++----------- 4 files changed, 89 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e4677341..ecc0f852 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -708,7 +708,7 @@ dependencies = [ [[package]] name = "blockfrost-platform-error-decoder" -version = "0.0.3-rc.3" +version = "1.0.0-rc.2" dependencies = [ "blockfrost-platform-build-utils", "blockfrost-platform-common", @@ -780,7 +780,7 @@ dependencies = [ [[package]] name = "blockfrost-platform-testgen" -version = "0.0.3-rc.3" +version = "1.0.0-rc.2" dependencies = [ "blockfrost-platform-common", "crossbeam", diff --git a/crates/build_utils/src/target.rs b/crates/build_utils/src/target.rs index ebb04563..d05eeda8 100644 --- a/crates/build_utils/src/target.rs +++ b/crates/build_utils/src/target.rs @@ -1,21 +1,21 @@ -pub fn os() -> &'static str { - (if cfg!(target_os = "macos") { +pub const fn os() -> &'static str { + if cfg!(target_os = "macos") { "darwin" } else if cfg!(target_os = "linux") { "linux" } else if cfg!(target_os = "windows") { "windows" } else { - panic!("Unsupported OS"); - }) as _ + panic!("Unsupported OS") + } } -pub fn arch() -> &'static str { - (if cfg!(target_arch = "x86_64") { +pub const fn arch() -> &'static str { + if cfg!(target_arch = "x86_64") { "x86_64" } else if cfg!(target_arch = "aarch64") { "aarch64" } else { - panic!("Unsupported architecture"); - }) as _ + panic!("Unsupported architecture") + } } diff --git a/crates/error_decoder/src/external.rs b/crates/error_decoder/src/external.rs index 558cb21a..08662458 100644 --- a/crates/error_decoder/src/external.rs +++ b/crates/error_decoder/src/external.rs @@ -1,5 +1,5 @@ use bf_common::errors::AppError; -use bf_testgen::testgen::{Testgen, TestgenResponse}; +use bf_testgen::testgen::{Testgen, TestgenResponse, Variant}; #[derive(Clone)] pub struct ExternalDecoder { @@ -7,7 +7,7 @@ pub struct ExternalDecoder { } impl ExternalDecoder { pub fn spawn() -> Result { - let testgen = Testgen::spawn("deserialize-stream") + let testgen = Testgen::spawn(Variant::DeserializeStream) .map_err(|err| AppError::Server(format!("Failed to spawn ExternalDecoder: {err}")))?; Ok(Self { testgen }) diff --git a/crates/testgen/src/testgen.rs b/crates/testgen/src/testgen.rs index 2aa6c420..925766d2 100644 --- a/crates/testgen/src/testgen.rs +++ b/crates/testgen/src/testgen.rs @@ -12,23 +12,53 @@ use std::{env, thread}; use tokio::sync::{mpsc, oneshot}; use tracing::{debug, error, info, warn}; +/// Handle to a long-running `testgen-hs` subprocess. Cheap to clone; +/// clones share one worker thread. #[derive(Clone)] pub struct Testgen { sender: mpsc::Sender, current_child_pid: Arc, } -pub struct TestgenRequest { +struct TestgenRequest { payload: String, response: oneshot::Sender>, } -#[derive(Debug)] +/// Which `testgen-hs` subcommand to run. +#[derive(Debug, Clone, Copy)] +#[non_exhaustive] +pub enum Variant { + DeserializeStream, +} + +impl Variant { + fn as_arg(self) -> &'static str { + match self { + Self::DeserializeStream => "deserialize-stream", + } + } +} + +impl std::fmt::Display for Variant { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.as_arg()) + } +} + +#[derive(Debug, PartialEq)] pub enum TestgenResponse { Ok(serde_json::Value), Err(serde_json::Value), } +const MISSING_BOTH_FIELDS_MSG: &str = + "invalid testgen-hs response: missing both `json` and `error`"; + +const REQUEST_CHANNEL_CAPACITY: usize = 128; +const RESTART_DELAY: std::time::Duration = std::time::Duration::from_secs(1); +const RESPONSE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30); + #[derive(Deserialize)] struct TestgenResponseWire { #[serde(default)] @@ -53,29 +83,25 @@ impl<'de> Deserialize<'de> for TestgenResponse { }, (Some(json), None) => Ok(Self::Ok(json)), (None, Some(error)) => Ok(Self::Err(error)), - (None, None) => Err(de::Error::custom( - "invalid testgen-hs response: missing both `json` and `error`", - )), + (None, None) => Err(de::Error::custom(MISSING_BOTH_FIELDS_MSG)), } } } -/// Testgen is an executable that we use to run some functionality that are readily/easily available -/// in Haskell codebase like the ledger. -/// The name is 'testgen' since it was initially implemented to generate test cases. + impl Testgen { /// Starts a new child process. - pub fn spawn(variant: &str) -> Result { + pub fn spawn(variant: Variant) -> Result { Self::spawn_inner(variant, None) } /// Starts a new child process with an init payload that is sent as the first /// message on every (re)start. The response to the init payload is validated /// but not returned — it is only used to confirm the subprocess is ready. - pub fn spawn_with_init(variant: &str, init_payload: String) -> Result { + pub fn spawn_with_init(variant: Variant, init_payload: String) -> Result { Self::spawn_inner(variant, Some(init_payload)) } - fn spawn_inner(variant: &str, init_payload: Option) -> Result { + fn spawn_inner(variant: Variant, init_payload: Option) -> Result { let testgen_hs_path = Self::find_testgen_hs().map_err(AppError::Server)?; info!( @@ -85,14 +111,10 @@ impl Testgen { let current_child_pid = Arc::new(AtomicU32::new(u32::MAX)); let current_child_pid_clone = current_child_pid.clone(); - let variant_clone = variant.to_string(); - let (sender, mut receiver) = mpsc::channel::(128); - - // Clone `testgen_hs_path` for the thread. + let (sender, mut receiver) = mpsc::channel::(REQUEST_CHANNEL_CAPACITY); let testgen_hs_path_for_thread = testgen_hs_path.clone(); thread::spawn(move || { - // For retries: let mut last_unfulfilled_request: Option = None; loop { @@ -101,14 +123,11 @@ impl Testgen { &mut receiver, &mut last_unfulfilled_request, ¤t_child_pid_clone, - &variant_clone, + variant, init_payload.as_deref(), ); - // If all senders have been dropped (Testgen handle was dropped), - // no one will send requests — exit the worker thread cleanly - // instead of spinning in an infinite restart loop. - // Only check when there's no pending retry to avoid overwriting it + // Exit if no work is pending and all senders are gone. if last_unfulfilled_request.is_none() { match receiver.try_recv() { Err(mpsc::error::TryRecvError::Disconnected) => { @@ -122,12 +141,17 @@ impl Testgen { } } - let restart_delay = std::time::Duration::from_secs(1); error!( - "Testgen: will restart in {restart_delay:?} because of a subprocess error: {}", + "Testgen: will restart in {RESTART_DELAY:?} because of a subprocess error: {}", single_run.err().unwrap_or_else(|| "unknown".to_string()) ); - std::thread::sleep(restart_delay); + std::thread::sleep(RESTART_DELAY); + + // Re-check after the delay: senders may have dropped while we slept. + if last_unfulfilled_request.is_none() && receiver.is_closed() { + info!("Testgen: all senders dropped during restart delay, shutting down"); + break; + } } }); @@ -151,9 +175,9 @@ impl Testgen { .await .map_err(|err| format!("Testgen: failed to send request: {err:?}"))?; - tokio::time::timeout(std::time::Duration::from_secs(30), response_rx) + tokio::time::timeout(RESPONSE_TIMEOUT, response_rx) .await - .map_err(|_| "Testgen: request timed out after 30s".to_string())? + .map_err(|_| format!("Testgen: request timed out after {RESPONSE_TIMEOUT:?}"))? .map_err(|err| format!("Testgen: worker thread dropped: {err:?}"))? } @@ -245,11 +269,11 @@ impl Testgen { receiver: &mut mpsc::Receiver, last_unfulfilled_request: &mut Option, current_child_pid: &Arc, - variant: &str, + variant: Variant, init_payload: Option<&str>, ) -> Result<(), String> { let mut child = proc::Command::new(testgen_hs_path) - .arg(variant) + .arg(variant.as_arg()) .stdin(proc::Stdio::piped()) .stdout(proc::Stdio::piped()) .spawn() @@ -265,8 +289,7 @@ impl Testgen { variant, ); - // Let’s make sure it’s dead in case a different error landed us here. - // Will return Ok(()) if already dead. + // Make sure the child is dead before returning. child .kill() .map_err(|err| format!("couldn’t kill the child: {err:?}"))?; @@ -282,7 +305,7 @@ impl Testgen { receiver: &mut mpsc::Receiver, last_unfulfilled_request: &mut Option, init_payload: Option<&str>, - variant: &str, + variant: Variant, ) -> Result<(), String> { let stdin = child .stdin @@ -370,7 +393,7 @@ fn partition_result(ae: Result) -> (Result, Result<(), E>) { #[cfg(test)] mod tests { - use super::TestgenResponse; + use super::{MISSING_BOTH_FIELDS_MSG, TestgenResponse}; use serde_json::json; fn parse(s: &str) -> Result { @@ -379,39 +402,44 @@ mod tests { #[test] fn only_json_field_is_ok() { - match parse(r#"{"json": {"value": 42}}"#).unwrap() { - TestgenResponse::Ok(v) => assert_eq!(v, json!({"value": 42})), - other => panic!("expected Ok, got {other:?}"), - } + assert_eq!( + parse(r#"{"json": {"value": 42}}"#).unwrap(), + TestgenResponse::Ok(json!({"value": 42})), + ); } #[test] fn only_error_field_is_err() { - match parse(r#"{"error": "boom"}"#).unwrap() { - TestgenResponse::Err(v) => assert_eq!(v, json!("boom")), - other => panic!("expected Err, got {other:?}"), - } + assert_eq!( + parse(r#"{"error": "boom"}"#).unwrap(), + TestgenResponse::Err(json!("boom")), + ); } #[test] fn both_fields_prefer_json() { - match parse(r#"{"json": {"ok": true}, "error": "ignored"}"#).unwrap() { - TestgenResponse::Ok(v) => assert_eq!(v, json!({"ok": true})), - other => panic!("expected Ok, got {other:?}"), - } + assert_eq!( + parse(r#"{"json": {"ok": true}, "error": "ignored"}"#).unwrap(), + TestgenResponse::Ok(json!({"ok": true})), + ); } #[test] fn neither_field_fails_deserialization() { let err = parse(r#"{}"#).unwrap_err(); - assert!(err.to_string().contains("missing both")); + assert!( + err.to_string().contains(MISSING_BOTH_FIELDS_MSG), + "unexpected error: {err}", + ); } + /// Locks in the wire contract: unknown fields are silently dropped, so + /// `testgen-hs` can add new response fields without breaking us. #[test] fn unknown_fields_are_ignored() { - match parse(r#"{"json": 1, "extra": "meta"}"#).unwrap() { - TestgenResponse::Ok(v) => assert_eq!(v, json!(1)), - other => panic!("expected Ok, got {other:?}"), - } + assert_eq!( + parse(r#"{"json": 1, "extra": "meta"}"#).unwrap(), + TestgenResponse::Ok(json!(1)), + ); } } From bc68d97be495a03ac0d32781f8cb660388b05a71 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C4=B1z=C4=B1r=20Sefa=20=C4=B0rken?= Date: Mon, 4 May 2026 11:25:20 +0300 Subject: [PATCH 06/10] chore: bump blockfrost-tests flake input --- flake.lock | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flake.lock b/flake.lock index 4243866a..00ebfce1 100644 --- a/flake.lock +++ b/flake.lock @@ -19,11 +19,11 @@ "blockfrost-tests": { "flake": false, "locked": { - "lastModified": 1777033748, - "narHash": "sha256-KVpxn562fS4ioCXB4d4RGsD8PNlIEgi9iJUxG5n+wts=", + "lastModified": 1777465150, + "narHash": "sha256-6QUnCM0HQLq4b1lM++fwhMtVSEPC5bvhBjqL3I0NKsI=", "owner": "blockfrost", "repo": "blockfrost-tests", - "rev": "ee42a8afbb2c1ecbdeb08570aa62f37022eec27f", + "rev": "9626853a6b19b792ef5f7d5673001eb233d0f870", "type": "github" }, "original": { From 02c4a397fb0e503358b7aa613e2cb3a936b760f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C4=B1z=C4=B1r=20Sefa=20=C4=B0rken?= Date: Fri, 8 May 2026 12:10:37 +0300 Subject: [PATCH 07/10] fix: Applied PR feedback --- crates/error_decoder/Cargo.toml | 8 ++++---- crates/testgen/src/tests.rs | 3 --- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/crates/error_decoder/Cargo.toml b/crates/error_decoder/Cargo.toml index b6c6733d..20e6cf1c 100644 --- a/crates/error_decoder/Cargo.toml +++ b/crates/error_decoder/Cargo.toml @@ -9,12 +9,8 @@ build = "build.rs" bf-common.workspace = true bf-testgen.workspace = true hex.workspace = true -sysinfo.workspace = true serde_json.workspace = true tokio.workspace = true -pallas-network.workspace = true -pallas-hardano.workspace = true -pallas-codec.workspace = true [features] tarpaulin = [] @@ -24,3 +20,7 @@ bf-build-utils.workspace = true [dev-dependencies] bf-testgen = { workspace = true, features = ["test-utils"] } +sysinfo.workspace = true +pallas-network.workspace = true +pallas-hardano.workspace = true +pallas-codec.workspace = true diff --git a/crates/testgen/src/tests.rs b/crates/testgen/src/tests.rs index 3deadfa4..24fd4ee1 100644 --- a/crates/testgen/src/tests.rs +++ b/crates/testgen/src/tests.rs @@ -1,6 +1,3 @@ -#[cfg(not(feature = "tarpaulin"))] -use num_cpus; - use serde::Deserialize; use std::process::Command; From 7f9ed88d5c1644e75e74ff5176748f86af7330e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C4=B1z=C4=B1r=20Sefa=20=C4=B0rken?= Date: Mon, 11 May 2026 17:33:21 +0300 Subject: [PATCH 08/10] fix(testgen): reset child pid after subprocess exit --- crates/testgen/src/testgen.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/testgen/src/testgen.rs b/crates/testgen/src/testgen.rs index 925766d2..7ff98e72 100644 --- a/crates/testgen/src/testgen.rs +++ b/crates/testgen/src/testgen.rs @@ -289,13 +289,13 @@ impl Testgen { variant, ); - // Make sure the child is dead before returning. - child - .kill() - .map_err(|err| format!("couldn’t kill the child: {err:?}"))?; - child + let _ = child.kill().inspect_err( + |err| debug!(err = %err, "testgen-hs: child pid kill failed"), + ); + let _ = child .wait() - .map_err(|err| format!("couldn’t reap the child: {err:?}"))?; + .inspect_err(|err| debug!(err = %err, "testgen-hs: child pid wait failed")); + current_child_pid.store(u32::MAX, atomic::Ordering::Relaxed); result } From 576318392952e011e4d288f0b21e58f75f541c2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C4=B1z=C4=B1r=20Sefa=20=C4=B0rken?= Date: Mon, 11 May 2026 17:47:33 +0300 Subject: [PATCH 09/10] fix(testgen): formatting --- crates/testgen/src/testgen.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/testgen/src/testgen.rs b/crates/testgen/src/testgen.rs index 7ff98e72..1f009bcb 100644 --- a/crates/testgen/src/testgen.rs +++ b/crates/testgen/src/testgen.rs @@ -289,9 +289,9 @@ impl Testgen { variant, ); - let _ = child.kill().inspect_err( - |err| debug!(err = %err, "testgen-hs: child pid kill failed"), - ); + let _ = child + .kill() + .inspect_err(|err| debug!(err = %err, "testgen-hs: child pid kill failed")); let _ = child .wait() .inspect_err(|err| debug!(err = %err, "testgen-hs: child pid wait failed")); From 1544eb1110a0c984845a32593f0d6165cadf3050 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C4=B1z=C4=B1r=20Sefa=20=C4=B0rken?= Date: Tue, 12 May 2026 14:14:48 +0300 Subject: [PATCH 10/10] fix(testgen): some log levels changed to warning --- crates/testgen/src/testgen.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/testgen/src/testgen.rs b/crates/testgen/src/testgen.rs index 1f009bcb..abf8ac73 100644 --- a/crates/testgen/src/testgen.rs +++ b/crates/testgen/src/testgen.rs @@ -291,10 +291,10 @@ impl Testgen { let _ = child .kill() - .inspect_err(|err| debug!(err = %err, "testgen-hs: child pid kill failed")); + .inspect_err(|err| warn!(err = %err, "Testgen: child pid kill failed")); let _ = child .wait() - .inspect_err(|err| debug!(err = %err, "testgen-hs: child pid wait failed")); + .inspect_err(|err| warn!(err = %err, "Testgen: child pid wait failed")); current_child_pid.store(u32::MAX, atomic::Ordering::Relaxed); result