From f2f127d262a46b192a5632146ba6bdbf77c15eae Mon Sep 17 00:00:00 2001 From: Alpay Aldemir Date: Mon, 11 May 2026 16:44:57 -0700 Subject: [PATCH 1/4] fix(transaction): avoid eager reservation of nonce and stray pending tx record --- crates/core/src/postgres.rs | 15 +- crates/core/src/schema/mod.rs | 5 + crates/core/src/schema/v1_0_3.rs | 18 + crates/core/src/transaction/db/read.rs | 23 + crates/core/src/transaction/db/write.rs | 2 +- crates/core/src/transaction/nonce_manager.rs | 75 ++- .../queue_system/transactions_queues.rs | 526 ++++++++++++++++-- .../transactions_queues_custom_errors.rs | 148 ++++- 8 files changed, 759 insertions(+), 53 deletions(-) create mode 100644 crates/core/src/schema/v1_0_3.rs diff --git a/crates/core/src/postgres.rs b/crates/core/src/postgres.rs index 22c4bb1c..2684aba8 100644 --- a/crates/core/src/postgres.rs +++ b/crates/core/src/postgres.rs @@ -10,7 +10,8 @@ use postgres_native_tls::MakeTlsConnector; use tokio::{task, time::timeout}; pub use tokio_postgres::types::{ToSql, Type as PgType}; use tokio_postgres::{ - config::SslMode, Config, CopyInSink, Error as PgError, Row, Statement, ToStatement, + config::SslMode, error::SqlState, Config, CopyInSink, Error as PgError, Row, Statement, + ToStatement, }; pub fn connection_string() -> Result { @@ -49,6 +50,18 @@ pub enum PostgresError { ConnectionPoolError(#[from] RunError), } +impl PostgresError { + pub fn is_unique_violation_on(&self, constraint: &str) -> bool { + match self { + PostgresError::PgError(error) => error.as_db_error().is_some_and(|db_error| { + db_error.code() == &SqlState::UNIQUE_VIOLATION + && db_error.constraint() == Some(constraint) + }), + PostgresError::ConnectionPoolError(_) => false, + } + } +} + pub struct PostgresClient { pub pool: Pool>, } diff --git a/crates/core/src/schema/mod.rs b/crates/core/src/schema/mod.rs index 6d94cb7c..970514a5 100644 --- a/crates/core/src/schema/mod.rs +++ b/crates/core/src/schema/mod.rs @@ -1,5 +1,6 @@ use crate::schema::v1_0_1::apply_v1_0_1_schema; use crate::schema::v1_0_2::apply_v1_0_2_schema; +use crate::schema::v1_0_3::apply_v1_0_3_schema; use crate::{ postgres::{PostgresClient, PostgresError}, schema::v1_0_0::apply_v1_0_0_schema, @@ -8,12 +9,16 @@ use crate::{ mod v1_0_0; mod v1_0_1; mod v1_0_2; +mod v1_0_3; + +pub use v1_0_3::TRANSACTION_EXTERNAL_ID_UNIQUE_INDEX; /// Applies the database schema to the database. pub async fn apply_schema(client: &PostgresClient) -> Result<(), PostgresError> { apply_v1_0_0_schema(client).await?; apply_v1_0_1_schema(client).await?; apply_v1_0_2_schema(client).await?; + apply_v1_0_3_schema(client).await?; Ok(()) } diff --git a/crates/core/src/schema/v1_0_3.rs b/crates/core/src/schema/v1_0_3.rs new file mode 100644 index 00000000..c8bd322b --- /dev/null +++ b/crates/core/src/schema/v1_0_3.rs @@ -0,0 +1,18 @@ +use crate::postgres::{PostgresClient, PostgresError}; + +pub const TRANSACTION_EXTERNAL_ID_UNIQUE_INDEX: &str = "idx_transaction_relayer_external_id_unique"; + +/// Applies the RRelayer database schema version 1.0.3. +/// Adds per-relayer idempotency for non-null transaction external IDs. +pub async fn apply_v1_0_3_schema(client: &PostgresClient) -> Result<(), PostgresError> { + let schema_sql = format!( + r#" + CREATE UNIQUE INDEX IF NOT EXISTS {TRANSACTION_EXTERNAL_ID_UNIQUE_INDEX} + ON relayer.transaction(relayer_id, external_id) + WHERE external_id IS NOT NULL; + "# + ); + + client.batch_execute(&schema_sql).await?; + Ok(()) +} diff --git a/crates/core/src/transaction/db/read.rs b/crates/core/src/transaction/db/read.rs index 4198e3d2..0bf33a8d 100644 --- a/crates/core/src/transaction/db/read.rs +++ b/crates/core/src/transaction/db/read.rs @@ -124,4 +124,27 @@ impl PostgresClient { Some(row) => Ok(Some(build_transaction_from_transaction_view(&row))), } } + + pub async fn get_transaction_by_relayer_and_external_id( + &self, + relayer_id: &RelayerId, + external_id: &str, + ) -> Result, PostgresError> { + let row = self + .query_one_or_none( + " + SELECT * + FROM relayer.transaction + WHERE relayer_id = $1 + AND external_id = $2; + ", + &[relayer_id, &external_id], + ) + .await?; + + match row { + None => Ok(None), + Some(row) => Ok(Some(build_transaction_from_transaction_view(&row))), + } + } } diff --git a/crates/core/src/transaction/db/write.rs b/crates/core/src/transaction/db/write.rs index 0d0a4f62..eb6f6edb 100644 --- a/crates/core/src/transaction/db/write.rs +++ b/crates/core/src/transaction/db/write.rs @@ -168,7 +168,7 @@ impl PostgresClient { &transaction.value, &transaction.blobs, &transaction.speed, - &transaction.status, + &TransactionStatus::FAILED, &transaction.expires_at, &transaction.queued_at, &failed_reason.chars().take(2000).collect::(), diff --git a/crates/core/src/transaction/nonce_manager.rs b/crates/core/src/transaction/nonce_manager.rs index 2dc40ddb..b6ac1121 100644 --- a/crates/core/src/transaction/nonce_manager.rs +++ b/crates/core/src/transaction/nonce_manager.rs @@ -1,21 +1,32 @@ -use tokio::sync::Mutex; +use std::sync::Arc; +use tokio::sync::{Mutex, OwnedMutexGuard}; use crate::transaction::types::TransactionNonce; pub struct NonceManager { - nonce: Mutex, + nonce: Arc>, +} + +pub struct NonceReservation { + nonce: TransactionNonce, + committed: bool, + guard: Option>, } impl NonceManager { pub fn new(current_nonce: TransactionNonce) -> Self { - NonceManager { nonce: Mutex::new(current_nonce) } + NonceManager { nonce: Arc::new(Mutex::new(current_nonce)) } } - pub async fn get_and_increment(&self) -> TransactionNonce { - let mut nonce_guard = self.nonce.lock().await; + /// Reserves and increments the next nonce while holding the internal mutex until commit or drop. + /// + /// Callers should keep the work between reservation and `NonceReservation::commit` short; if + /// the reservation is dropped before commit, the nonce cursor is rolled back automatically. + pub async fn reserve_next(&self) -> NonceReservation { + let mut nonce_guard = self.nonce.clone().lock_owned().await; let current_nonce = *nonce_guard; *nonce_guard = current_nonce + 1; - current_nonce + NonceReservation { nonce: current_nonce, committed: false, guard: Some(nonce_guard) } } pub async fn sync_with_onchain_nonce(&self, onchain_nonce: TransactionNonce) { @@ -30,3 +41,55 @@ impl NonceManager { *nonce_guard } } + +impl NonceReservation { + pub fn nonce(&self) -> TransactionNonce { + self.nonce + } + + pub fn commit(mut self) { + self.committed = true; + self.guard.take(); + } +} + +impl Drop for NonceReservation { + fn drop(&mut self) { + if self.committed { + return; + } + + if let Some(mut nonce_guard) = self.guard.take() { + *nonce_guard = self.nonce; + } + } +} + +#[cfg(test)] +mod tests { + use super::NonceManager; + use crate::transaction::types::TransactionNonce; + + #[tokio::test] + async fn reservation_rolls_back_when_dropped() { + let manager = NonceManager::new(TransactionNonce::new(7)); + + { + let reservation = manager.reserve_next().await; + assert_eq!(reservation.nonce(), TransactionNonce::new(7)); + } + + assert_eq!(manager.get_current_nonce().await, TransactionNonce::new(7)); + } + + #[tokio::test] + async fn reservation_commit_keeps_increment() { + let manager = NonceManager::new(TransactionNonce::new(7)); + + let reservation = manager.reserve_next().await; + assert_eq!(reservation.nonce(), TransactionNonce::new(7)); + reservation.commit(); + + assert_eq!(manager.get_current_nonce().await, TransactionNonce::new(8)); + } +} diff --git a/crates/core/src/transaction/queue_system/transactions_queues.rs b/crates/core/src/transaction/queue_system/transactions_queues.rs index 320821c5..2a52e7bb 100644 --- a/crates/core/src/transaction/queue_system/transactions_queues.rs +++ b/crates/core/src/transaction/queue_system/transactions_queues.rs @@ -26,19 +26,20 @@ use super::{ transactions_queue::TransactionsQueue, types::{ AddTransactionError, CancelTransactionError, CancelTransactionResult, CompetitionType, - EditableTransactionType, ProcessInmempoolStatus, ProcessInmempoolTransactionError, - ProcessMinedStatus, ProcessMinedTransactionError, ProcessPendingStatus, - ProcessPendingTransactionError, ProcessResult, ReplaceTransactionError, - ReplaceTransactionResult, TransactionRelayerSetup, TransactionToSend, - TransactionsQueueSetup, + CompetitiveTransaction, EditableTransactionType, ProcessInmempoolStatus, + ProcessInmempoolTransactionError, ProcessMinedStatus, ProcessMinedTransactionError, + ProcessPendingStatus, ProcessPendingTransactionError, ProcessResult, + ReplaceTransactionError, ReplaceTransactionResult, TransactionRelayerSetup, + TransactionToSend, TransactionsQueueSetup, }, }; +use crate::schema::TRANSACTION_EXTERNAL_ID_UNIQUE_INDEX; use crate::transaction::api::RelayTransactionRequest; use crate::transaction::queue_system::types::SendTransactionGasPriceError; use crate::transaction::types::{TransactionBlob, TransactionConversionError, TransactionSpeed}; use crate::{ gas::{BlobGasOracleCache, BlobGasPriceResult, GasLimit, GasOracleCache, GasPriceResult}, - postgres::{PostgresClient, PostgresConnectionError}, + postgres::{PostgresClient, PostgresConnectionError, PostgresError}, relayer::RelayerId, safe_proxy::SafeProxyManager, shared::{cache::Cache, common_types::WalletOrProviderError}, @@ -47,7 +48,10 @@ use crate::{ cache::invalidate_transaction_no_state_cache, nonce_manager::NonceManager, queue_system::types::TransactionQueueSendTransactionError, - types::{Transaction, TransactionData, TransactionId, TransactionStatus, TransactionValue}, + types::{ + Transaction, TransactionData, TransactionId, TransactionNonce, TransactionStatus, + TransactionValue, + }, }, webhooks::WebhookManager, }; @@ -80,11 +84,26 @@ impl TransactionsQueues { let mut relayer_block_times_ms = HashMap::new(); for setup in setups { - let current_nonce = setup.evm_provider.get_nonce(&setup.relayer).await?; + let onchain_nonce = setup.evm_provider.get_nonce(&setup.relayer).await?; + // read the nonces assigned to in-flight transactions to determine the next nonce to use. + let open_local_nonce = Self::next_nonce_after_open_transactions( + &setup.pending_transactions, + &setup.inmempool_transactions, + &setup.mined_transactions, + ); + let current_nonce = open_local_nonce + .filter(|local_nonce| local_nonce.into_inner() > onchain_nonce.into_inner()) + .unwrap_or(onchain_nonce); info!( - "Startup nonce synchronization for relayer {} ({}): synchronizing nonce manager with on-chain nonce {}", - setup.relayer.name, setup.relayer.id, current_nonce.into_inner() + "Startup nonce synchronization for relayer {} ({}): on-chain nonce {}, open local next nonce {}, using {}", + setup.relayer.name, + setup.relayer.id, + onchain_nonce.into_inner(), + open_local_nonce + .map(|nonce| nonce.into_inner().to_string()) + .unwrap_or_else(|| "none".to_string()), + current_nonce.into_inner() ); relayer_block_times_ms.insert(setup.relayer.id, setup.evm_provider.blocks_every); @@ -207,11 +226,212 @@ impl TransactionsQueues { Utc::now() + chrono::Duration::hours(12) } + fn next_nonce_after_open_transactions( + pending_transactions: &VecDeque, + inmempool_transactions: &VecDeque, + mined_transactions: &HashMap, + ) -> Option { + fn nonce_if_open(transaction: &Transaction) -> Option { + match transaction.status { + TransactionStatus::PENDING + | TransactionStatus::INMEMPOOL + | TransactionStatus::MINED => Some(transaction.nonce), + TransactionStatus::CONFIRMED + | TransactionStatus::FAILED + | TransactionStatus::EXPIRED + | TransactionStatus::CANCELLED + | TransactionStatus::DROPPED + | TransactionStatus::REPLACED => None, + } + } + + let pending_nonces = pending_transactions.iter().filter_map(nonce_if_open); + let inmempool_nonces = inmempool_transactions.iter().flat_map(|transaction| { + nonce_if_open(&transaction.original).into_iter().chain( + transaction + .competitive + .as_ref() + .and_then(|(transaction, _)| nonce_if_open(transaction)), + ) + }); + let mined_nonces = mined_transactions.values().filter_map(nonce_if_open); + + pending_nonces + .chain(inmempool_nonces) + .chain(mined_nonces) + .map(|nonce| nonce.into_inner()) + .max() + .map(|nonce| TransactionNonce::new(nonce + 1)) + } + /// Checks if a transaction has expired. fn has_expired(&self, transaction: &Transaction) -> bool { transaction.expires_at < Utc::now() } + fn transaction_matches_payload( + transaction: &Transaction, + relayer_id: &RelayerId, + transaction_to_send: &TransactionToSend, + ) -> bool { + transaction.relayer_id == *relayer_id + && transaction.to == transaction_to_send.to + && transaction.value == transaction_to_send.value + && transaction.data == transaction_to_send.data + && transaction.speed == transaction_to_send.speed + && transaction.blobs == transaction_to_send.blobs + && transaction.external_id == transaction_to_send.external_id + } + + /// Replays a prior idempotent result once the payload matches; status is intentionally ignored after a hash exists. + fn resolve_idempotent_transaction( + transaction: Transaction, + relayer_id: &RelayerId, + transaction_to_send: &TransactionToSend, + external_id: &str, + ) -> Result { + if !Self::transaction_matches_payload(&transaction, relayer_id, transaction_to_send) { + return Err(AddTransactionError::ExternalIdPayloadMismatch { + relayer_id: *relayer_id, + external_id: external_id.to_string(), + }); + } + + if transaction.known_transaction_hash.is_some() { + return Ok(transaction); + } + + Err(AddTransactionError::IdempotentTransactionFailed { + relayer_id: *relayer_id, + external_id: external_id.to_string(), + }) + } + + /// Loads an existing transaction for a relayer-scoped external id. + async fn load_idempotent_transaction( + &self, + relayer_id: &RelayerId, + external_id: &str, + ) -> Result, AddTransactionError> { + self.db + .get_transaction_by_relayer_and_external_id(relayer_id, external_id) + .await + .map_err(AddTransactionError::CouldNotReadTransactionDb) + } + + /// Checks whether a relayer-scoped external id is already assigned to a transaction. + async fn external_id_already_used( + &self, + relayer_id: &RelayerId, + external_id: Option<&str>, + ) -> Result { + let Some(external_id) = external_id else { + return Ok(false); + }; + + self.db + .get_transaction_by_relayer_and_external_id(relayer_id, external_id) + .await + .map(|transaction| transaction.is_some()) + } + + /// Checks for a pre-existing idempotent result before nonce reservation. + async fn check_idempotent_existing_transaction( + &self, + relayer_id: &RelayerId, + transaction_to_send: &TransactionToSend, + ) -> Result, AddTransactionError> { + let Some(external_id) = transaction_to_send.external_id.as_deref() else { + return Ok(None); + }; + + let Some(transaction) = self.load_idempotent_transaction(relayer_id, external_id).await? + else { + return Ok(None); + }; + + let transaction = Self::resolve_idempotent_transaction( + transaction, + relayer_id, + transaction_to_send, + external_id, + )?; + info!( + %relayer_id, + external_id, + transaction_id = %transaction.id, + "Returning existing idempotent transaction" + ); + Ok(Some(transaction)) + } + + /// Recovers an idempotent result after a concurrent insert wins the unique-index race. + async fn resolve_idempotent_insert_conflict( + &self, + relayer_id: &RelayerId, + transaction_to_send: &TransactionToSend, + external_id: &str, + ) -> Result { + let existing = self + .load_idempotent_transaction(relayer_id, external_id) + .await? + .ok_or_else(|| AddTransactionError::ExternalIdPayloadMismatch { + relayer_id: *relayer_id, + external_id: external_id.to_string(), + })?; + + let transaction = Self::resolve_idempotent_transaction( + existing, + relayer_id, + transaction_to_send, + external_id, + )?; + info!( + %relayer_id, + external_id, + transaction_id = %transaction.id, + "Returning existing idempotent transaction after insert conflict" + ); + Ok(transaction) + } + + /// Returns the external id when a DB error is the idempotency unique-index race. + fn idempotent_insert_conflict_external_id<'a>( + error: &PostgresError, + transaction_to_send: &'a TransactionToSend, + ) -> Option<&'a str> { + let external_id = transaction_to_send.external_id.as_deref()?; + error.is_unique_violation_on(TRANSACTION_EXTERNAL_ID_UNIQUE_INDEX).then_some(external_id) + } + + /// Persists deterministic simulation failures or resolves the winning duplicate insert. + async fn save_failed_simulation_or_resolve_conflict( + &self, + relayer_id: &RelayerId, + transaction: &Transaction, + transaction_to_send: &TransactionToSend, + failed_reason: &str, + ) -> Result, AddTransactionError> { + match self.db.transaction_failed_on_send(relayer_id, transaction, failed_reason).await { + Ok(()) => Ok(None), + Err(error) => { + let Some(external_id) = + Self::idempotent_insert_conflict_external_id(&error, transaction_to_send) + else { + return Err(AddTransactionError::CouldNotSaveTransactionDb(error)); + }; + + self.resolve_idempotent_insert_conflict( + relayer_id, + transaction_to_send, + external_id, + ) + .await + .map(Some) + } + } + } + /// Converts a transaction to a no-op transaction. fn transaction_to_noop( &self, @@ -322,7 +542,7 @@ impl TransactionsQueues { .estimate_gas(&temp_transaction_request, transaction.is_noop) .await .map_err(|e| { - AddTransactionError::TransactionEstimateGasError(transaction.relayer_id, e) + AddTransactionError::transaction_estimate_gas_error(transaction.relayer_id, e) })?; let relayer_balance = transactions_queue.get_balance().await.map_err(|e| { @@ -361,6 +581,12 @@ impl TransactionsQueues { .get_transactions_queue(relayer_id) .ok_or(AddTransactionError::RelayerNotFound(*relayer_id))?; + if let Some(transaction) = + self.check_idempotent_existing_transaction(relayer_id, transaction_to_send).await? + { + return Ok(transaction); + } + let mut transactions_queue = queue_arc.lock().await; if transactions_queue.is_paused() { @@ -375,15 +601,6 @@ impl TransactionsQueues { }); } - // Sync nonce manager with on-chain nonce to ensure consistency - let current_onchain_nonce = transactions_queue - .get_nonce() - .await - .map_err(|e| AddTransactionError::CouldNotGetCurrentOnChainNonce(*relayer_id, e))?; - - transactions_queue.nonce_manager.sync_with_onchain_nonce(current_onchain_nonce).await; - let assigned_nonce = transactions_queue.nonce_manager.get_and_increment().await; - let mut transaction = Transaction { id: transaction_to_send.id, relayer_id: *relayer_id, @@ -391,7 +608,8 @@ impl TransactionsQueues { from: transactions_queue.relay_address(), value: transaction_to_send.value, data: transaction_to_send.data.clone(), - nonce: assigned_nonce, + // placeholder nonce, will be set to the next available nonce from the nonce manager. + nonce: TransactionNonce::new(0), gas_limit: None, status: TransactionStatus::PENDING, blobs: transaction_to_send.blobs.clone(), @@ -430,23 +648,37 @@ impl TransactionsQueues { let estimated_gas_limit = match estimated_gas_limit { Ok(limit) => limit, - Err(err) => { - self.db - .transaction_failed_on_send( + Err(err @ AddTransactionError::TransactionSimulationReverted(_, _)) => { + let failed_reason = err.to_string(); + if let Some(existing) = self + .save_failed_simulation_or_resolve_conflict( relayer_id, &transaction, - "Failed to send transaction as always failing on gas estimation", + transaction_to_send, + &failed_reason, ) - .await - .map_err(AddTransactionError::CouldNotSaveTransactionDb)?; + .await? + { + return Ok(existing); + } self.invalidate_transaction_cache(&transaction.id).await; return Err(err); } + Err(err) => return Err(err), }; transaction.gas_limit = Some(estimated_gas_limit); + let current_onchain_nonce = transactions_queue + .get_nonce() + .await + .map_err(|e| AddTransactionError::CouldNotGetCurrentOnChainNonce(*relayer_id, e))?; + + transactions_queue.nonce_manager.sync_with_onchain_nonce(current_onchain_nonce).await; + let nonce_reservation = transactions_queue.nonce_manager.reserve_next().await; + transaction.nonce = nonce_reservation.nonce(); + let transaction_request = Self::create_typed_transaction( &transactions_queue, &transaction, @@ -458,13 +690,30 @@ impl TransactionsQueues { transaction.known_transaction_hash = Some(transactions_queue.compute_tx_hash(&transaction_request).await?); - self.db - .save_transaction(relayer_id, &transaction) - .await - .map_err(AddTransactionError::CouldNotSaveTransactionDb)?; + match self.db.save_transaction(relayer_id, &transaction).await { + Ok(()) => {} + Err(error) => { + let Some(external_id) = + Self::idempotent_insert_conflict_external_id(&error, transaction_to_send) + else { + return Err(AddTransactionError::CouldNotSaveTransactionDb(error)); + }; + + drop(nonce_reservation); + return self + .resolve_idempotent_insert_conflict( + relayer_id, + transaction_to_send, + external_id, + ) + .await; + } + } transactions_queue.add_pending_transaction(transaction.clone()).await; - // Nonce already incremented atomically above - no need for separate increase call + // release the nonce manager lock after the transaction has been successfully added to the queue. + // releasing the nonce manager lock too soon may cause downstream failures to lead to eager (and faulty) nonce increment. + nonce_reservation.commit(); self.invalidate_transaction_cache(&transaction.id).await; if let Some(webhook_manager) = &self.webhook_manager { @@ -574,6 +823,22 @@ impl TransactionsQueues { cancelled_by_transaction_id: None, }; + if let Some(external_id) = cancel_transaction.external_id.as_deref() { + if self + .external_id_already_used( + &transaction.relayer_id, + Some(external_id), + ) + .await + .map_err(CancelTransactionError::CouldNotReadTransactionDb)? + { + return Err(CancelTransactionError::ExternalIdAlreadyUsed { + relayer_id: transaction.relayer_id, + external_id: external_id.to_string(), + }); + } + } + info!("cancel_transaction: creating higher gas cancel transaction for inmempool tx with same nonce {:?}", cancel_transaction.nonce); // For cancel transactions, we need to bump the original transaction's gas prices @@ -822,6 +1087,22 @@ impl TransactionsQueues { cancelled_by_transaction_id: None, }; + if let Some(external_id) = replace_transaction.external_id.as_deref() { + if self + .external_id_already_used( + &transaction.relayer_id, + Some(external_id), + ) + .await + .map_err(ReplaceTransactionError::CouldNotReadTransactionDb)? + { + return Err(ReplaceTransactionError::ExternalIdAlreadyUsed { + relayer_id: transaction.relayer_id, + external_id: external_id.to_string(), + }); + } + } + info!("replace_transaction: creating competitive replace transaction for inmempool tx with same nonce {:?}", replace_transaction.nonce); // For replace transactions, we need to bump the original transaction's gas prices and gas limit @@ -1177,8 +1458,10 @@ impl TransactionsQueues { )); } - let new_nonce = - transactions_queue.nonce_manager.get_and_increment().await; + let nonce_reservation = + transactions_queue.nonce_manager.reserve_next().await; + let new_nonce = nonce_reservation.nonce(); + nonce_reservation.commit(); transaction.nonce = new_nonce; transactions_queue @@ -1477,7 +1760,9 @@ impl TransactionsQueues { )); } - let new_nonce = transactions_queue.nonce_manager.get_and_increment().await; + let nonce_reservation = transactions_queue.nonce_manager.reserve_next().await; + let new_nonce = nonce_reservation.nonce(); + nonce_reservation.commit(); transaction.nonce = new_nonce; transactions_queue.update_inmempool_transaction_nonce(&transaction.id, new_nonce).await; @@ -1690,3 +1975,174 @@ impl TransactionsQueues { } } } + +#[cfg(test)] +mod tests { + use std::collections::{HashMap, VecDeque}; + + use alloy::primitives::{TxHash, U256}; + use chrono::Utc; + + use super::TransactionsQueues; + use crate::{ + network::ChainId, + relayer::RelayerId, + shared::common_types::EvmAddress, + transaction::{ + queue_system::types::{CompetitionType, CompetitiveTransaction, TransactionToSend}, + types::{ + Transaction, TransactionData, TransactionHash, TransactionId, TransactionNonce, + TransactionSpeed, TransactionStatus, TransactionValue, + }, + }, + }; + + fn transaction_with_nonce(status: TransactionStatus, nonce: u64) -> Transaction { + Transaction { + id: TransactionId::new(), + relayer_id: RelayerId::new(), + to: EvmAddress::zero(), + from: EvmAddress::zero(), + value: TransactionValue::zero(), + data: TransactionData::empty(), + nonce: TransactionNonce::new(nonce), + gas_limit: None, + status, + blobs: None, + chain_id: ChainId::new(1), + known_transaction_hash: None, + queued_at: Utc::now(), + expires_at: Utc::now(), + sent_at: None, + mined_at: None, + mined_at_block_number: None, + confirmed_at: None, + speed: TransactionSpeed::FAST, + sent_with_max_priority_fee_per_gas: None, + sent_with_max_fee_per_gas: None, + is_noop: false, + sent_with_gas: None, + sent_with_blob_gas: None, + external_id: None, + cancelled_by_transaction_id: None, + } + } + + fn to_transaction_to_send(transaction: &Transaction) -> TransactionToSend { + TransactionToSend::new( + transaction.to, + transaction.value, + transaction.data.clone(), + Some(transaction.speed.clone()), + transaction.blobs.clone(), + transaction.external_id.clone(), + ) + } + + #[test] + fn next_nonce_after_open_transactions_ignores_failed_rows() { + let mut pending = VecDeque::new(); + pending.push_back(transaction_with_nonce(TransactionStatus::FAILED, 99)); + pending.push_back(transaction_with_nonce(TransactionStatus::PENDING, 3)); + + let next_nonce = TransactionsQueues::next_nonce_after_open_transactions( + &pending, + &VecDeque::new(), + &HashMap::new(), + ) + .unwrap(); + + assert_eq!(next_nonce, TransactionNonce::new(4)); + } + + #[test] + fn next_nonce_after_open_transactions_counts_inmempool_and_mined() { + let mut inmempool = VecDeque::new(); + let mut competitive = + CompetitiveTransaction::new(transaction_with_nonce(TransactionStatus::INMEMPOOL, 7)); + competitive.add_competitor( + transaction_with_nonce(TransactionStatus::INMEMPOOL, 7), + CompetitionType::Replace, + ); + inmempool.push_back(competitive); + + let mined_transaction = transaction_with_nonce(TransactionStatus::MINED, 12); + let mut mined = HashMap::new(); + mined.insert(mined_transaction.id, mined_transaction); + + let next_nonce = TransactionsQueues::next_nonce_after_open_transactions( + &VecDeque::new(), + &inmempool, + &mined, + ) + .unwrap(); + + assert_eq!(next_nonce, TransactionNonce::new(13)); + } + + #[test] + fn idempotent_resolve_returns_matching_transaction_with_hash() { + let relayer_id = RelayerId::new(); + let mut transaction = transaction_with_nonce(TransactionStatus::PENDING, 1); + transaction.relayer_id = relayer_id; + transaction.external_id = Some("idempotent-key".to_string()); + transaction.known_transaction_hash = Some(TransactionHash::new(TxHash::ZERO)); + + let transaction_to_send = to_transaction_to_send(&transaction); + + let resolved = TransactionsQueues::resolve_idempotent_transaction( + transaction.clone(), + &relayer_id, + &transaction_to_send, + "idempotent-key", + ) + .unwrap(); + + assert_eq!(resolved.id, transaction.id); + } + + #[test] + fn idempotent_resolve_rejects_payload_mismatch() { + let relayer_id = RelayerId::new(); + let mut transaction = transaction_with_nonce(TransactionStatus::PENDING, 1); + transaction.relayer_id = relayer_id; + transaction.external_id = Some("idempotent-key".to_string()); + + let mut transaction_to_send = to_transaction_to_send(&transaction); + transaction_to_send.value = TransactionValue::new(U256::from(1_u128)); + + let result = TransactionsQueues::resolve_idempotent_transaction( + transaction, + &relayer_id, + &transaction_to_send, + "idempotent-key", + ); + + assert!(matches!( + result, + Err(super::AddTransactionError::ExternalIdPayloadMismatch { .. }) + )); + } + + #[test] + fn idempotent_resolve_returns_bad_request_for_prior_failed_simulation() { + let relayer_id = RelayerId::new(); + let mut transaction = transaction_with_nonce(TransactionStatus::FAILED, 1); + transaction.relayer_id = relayer_id; + transaction.external_id = Some("idempotent-key".to_string()); + + let transaction_to_send = to_transaction_to_send(&transaction); + + let result = TransactionsQueues::resolve_idempotent_transaction( + transaction, + &relayer_id, + &transaction_to_send, + "idempotent-key", + ); + + assert!(matches!( + result, + Err(super::AddTransactionError::IdempotentTransactionFailed { .. }) + )); + } +} diff --git a/crates/core/src/transaction/queue_system/types/transactions_queues_custom_errors.rs b/crates/core/src/transaction/queue_system/types/transactions_queues_custom_errors.rs index fa65e5ee..ffb9f882 100644 --- a/crates/core/src/transaction/queue_system/types/transactions_queues_custom_errors.rs +++ b/crates/core/src/transaction/queue_system/types/transactions_queues_custom_errors.rs @@ -1,13 +1,18 @@ use std::time::SystemTimeError; -use alloy::transports::{RpcError, TransportErrorKind}; +use alloy::{ + rpc::json_rpc::ErrorPayload, + transports::{RpcError, TransportErrorKind}, +}; use thiserror::Error; use super::{ SendTransactionGasPriceError, TransactionQueueSendTransactionError, TransactionSentWithRelayer, }; use crate::common_types::EvmAddress; -use crate::shared::{bad_request, forbidden, internal_server_error, not_found, HttpError}; +use crate::shared::{ + bad_request, conflict, forbidden, internal_server_error, not_found, HttpError, +}; use crate::transaction::types::TransactionConversionError; use crate::{ postgres::PostgresError, @@ -33,6 +38,12 @@ pub enum ReplaceTransactionError { #[error("Relayer could not update the transaction in the db {0}")] CouldNotUpdateTransactionInDb(#[from] PostgresError), + #[error("Transaction could not be read from DB: {0}")] + CouldNotReadTransactionDb(PostgresError), + + #[error("external_id {external_id} has already been used for relayer {relayer_id}")] + ExternalIdAlreadyUsed { relayer_id: RelayerId, external_id: String }, + #[error("Nonce synchronization recovered, replacement transaction should be retried")] NonceSynchronizationRecovered, } @@ -47,6 +58,10 @@ impl From for HttpError { return forbidden(value.to_string()); } + if matches!(value, ReplaceTransactionError::ExternalIdAlreadyUsed { .. }) { + return conflict(value.to_string()); + } + internal_server_error(Some(value.to_string())) } } @@ -56,6 +71,9 @@ pub enum AddTransactionError { #[error("Transaction could not be saved in DB: {0}")] CouldNotSaveTransactionDb(PostgresError), + #[error("Transaction could not be read from DB: {0}")] + CouldNotReadTransactionDb(PostgresError), + #[error("Relayer could not be found: {0}")] RelayerNotFound(RelayerId), @@ -74,6 +92,9 @@ pub enum AddTransactionError { #[error("could not estimate gas limit - {0}")] TransactionEstimateGasError(RelayerId, RpcError), + #[error("transaction simulation reverted - {1}")] + TransactionSimulationReverted(RelayerId, RpcError), + #[error("Could not get current on chain nonce for relayer {0} - {1}")] CouldNotGetCurrentOnChainNonce(RelayerId, RpcError), @@ -82,23 +103,71 @@ pub enum AddTransactionError { #[error("Unsupported transaction type: {message}")] UnsupportedTransactionType { message: String }, + + #[error("external_id {external_id} has already been used with a different transaction payload for relayer {relayer_id}")] + ExternalIdPayloadMismatch { relayer_id: RelayerId, external_id: String }, + + #[error("transaction for external_id {external_id} on relayer {relayer_id} previously failed before broadcast")] + IdempotentTransactionFailed { relayer_id: RelayerId, external_id: String }, } impl From for HttpError { fn from(value: AddTransactionError) -> Self { - if matches!(value, AddTransactionError::RelayerIsPaused(_)) { - return forbidden(value.to_string()); + match &value { + AddTransactionError::RelayerIsPaused(_) => forbidden(value.to_string()), + AddTransactionError::RelayerNotFound(_) => not_found(value.to_string()), + AddTransactionError::UnsupportedTransactionType { .. } + | AddTransactionError::TransactionSimulationReverted(_, _) + | AddTransactionError::IdempotentTransactionFailed { .. } => { + bad_request(value.to_string()) + } + AddTransactionError::ExternalIdPayloadMismatch { .. } => conflict(value.to_string()), + AddTransactionError::CouldNotSaveTransactionDb(_) + | AddTransactionError::CouldNotReadTransactionDb(_) + | AddTransactionError::CouldNotReadAllowlistsFromDb(_) + | AddTransactionError::TransactionGasPriceError(_) + | AddTransactionError::ComputeTransactionHashError(_) + | AddTransactionError::TransactionEstimateGasError(_, _) + | AddTransactionError::CouldNotGetCurrentOnChainNonce(_, _) + | AddTransactionError::TransactionConversionError(_) => { + internal_server_error(Some(value.to_string())) + } } + } +} - if matches!(value, AddTransactionError::RelayerNotFound(_)) { - return not_found(value.to_string()); +impl AddTransactionError { + pub fn transaction_estimate_gas_error( + relayer_id: RelayerId, + error: RpcError, + ) -> Self { + if is_deterministic_simulation_revert(&error) { + return Self::TransactionSimulationReverted(relayer_id, error); } - if matches!(value, AddTransactionError::UnsupportedTransactionType { .. }) { - return bad_request(value.to_string()); - } + Self::TransactionEstimateGasError(relayer_id, error) + } +} - internal_server_error(Some(value.to_string())) +fn error_payload_is_revert(payload: &ErrorPayload) -> bool { + payload.as_revert_data().is_some() + || payload.message.to_ascii_lowercase().contains("execution reverted") +} + +fn is_deterministic_simulation_revert(error: &RpcError) -> bool { + match error { + RpcError::ErrorResp(payload) => error_payload_is_revert(payload), + RpcError::DeserError { text, .. } => serde_json::from_str::(text) + .map(|payload| error_payload_is_revert(&payload)) + .unwrap_or(false), + RpcError::Transport(TransportErrorKind::Custom(error)) => { + error.to_string().to_ascii_lowercase().contains("execution reverted") + } + RpcError::NullResp + | RpcError::UnsupportedFeature(_) + | RpcError::LocalUsageError(_) + | RpcError::SerError(_) + | RpcError::Transport(_) => false, } } @@ -110,6 +179,12 @@ pub enum CancelTransactionError { #[error("Could not update transaction in database: {0}")] CouldNotUpdateTransactionDb(PostgresError), + #[error("Transaction could not be read from DB: {0}")] + CouldNotReadTransactionDb(PostgresError), + + #[error("external_id {external_id} has already been used for relayer {relayer_id}")] + ExternalIdAlreadyUsed { relayer_id: RelayerId, external_id: String }, + #[error("Relayer could not be found: {0}")] RelayerNotFound(RelayerId), @@ -130,6 +205,10 @@ impl From for HttpError { return not_found(value.to_string()); } + if matches!(value, CancelTransactionError::ExternalIdAlreadyUsed { .. }) { + return conflict(value.to_string()); + } + internal_server_error(Some(value.to_string())) } } @@ -247,3 +326,52 @@ pub struct CompetitionResolutionResult { /// The transaction that lost the race (if there was competition) pub loser: Option, } + +#[cfg(test)] +mod tests { + use alloy::{ + rpc::json_rpc::ErrorPayload, + transports::{RpcError, TransportErrorKind}, + }; + use reqwest::StatusCode; + + use super::AddTransactionError; + use crate::{relayer::RelayerId, shared::HttpError}; + + #[test] + fn simulation_revert_maps_to_bad_request() { + let payload: ErrorPayload = serde_json::from_str( + r#"{"code":3,"message":"execution reverted: Multicall3: call failed"}"#, + ) + .unwrap(); + let error: RpcError = RpcError::ErrorResp(payload); + + let http_error: HttpError = + AddTransactionError::transaction_estimate_gas_error(RelayerId::new(), error).into(); + + assert_eq!(http_error.0, StatusCode::BAD_REQUEST); + } + + #[test] + fn transport_failure_stays_server_error() { + let error: RpcError = + RpcError::Transport(TransportErrorKind::BackendGone); + + let http_error: HttpError = + AddTransactionError::transaction_estimate_gas_error(RelayerId::new(), error).into(); + + assert_eq!(http_error.0, StatusCode::INTERNAL_SERVER_ERROR); + } + + #[test] + fn custom_transport_revert_without_colon_maps_to_bad_request() { + let error: RpcError = RpcError::Transport(TransportErrorKind::Custom( + "execution reverted".to_string().into(), + )); + + let http_error: HttpError = + AddTransactionError::transaction_estimate_gas_error(RelayerId::new(), error).into(); + + assert_eq!(http_error.0, StatusCode::BAD_REQUEST); + } +} From 3dd2b706e6a90a02b9f4ed3b6d055ae32ea460ad Mon Sep 17 00:00:00 2001 From: Alpay Aldemir Date: Mon, 11 May 2026 17:11:28 -0700 Subject: [PATCH 2/4] fix(startup): clippy --- crates/core/src/startup.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/core/src/startup.rs b/crates/core/src/startup.rs index 1efcc8ab..a8900a76 100644 --- a/crates/core/src/startup.rs +++ b/crates/core/src/startup.rs @@ -184,10 +184,8 @@ async fn start_api( // Determine which signing provider to use (network-level or global) let signing_provider = if let Some(ref signing_key) = network_config.signing_provider { signing_key - } else if let Some(ref signing_key) = config.signing_provider { - signing_key } else { - return None; + config.signing_provider.as_ref()? }; // Check if only private keys are configured From 528ba5b79d1e777b7a3c324397ace34bf1a16a48 Mon Sep 17 00:00:00 2001 From: Alpay Aldemir Date: Mon, 11 May 2026 18:51:55 -0700 Subject: [PATCH 3/4] fix(transaction): on conflict return existing Transaction despite payload mismatch - the external_id should be obeyed as the consumer defines the mapping of external_id to transaction content --- .../queue_system/transactions_queues.rs | 54 ++++++++++++++++--- 1 file changed, 46 insertions(+), 8 deletions(-) diff --git a/crates/core/src/transaction/queue_system/transactions_queues.rs b/crates/core/src/transaction/queue_system/transactions_queues.rs index 2a52e7bb..6546e4b9 100644 --- a/crates/core/src/transaction/queue_system/transactions_queues.rs +++ b/crates/core/src/transaction/queue_system/transactions_queues.rs @@ -283,24 +283,36 @@ impl TransactionsQueues { && transaction.external_id == transaction_to_send.external_id } - /// Replays a prior idempotent result once the payload matches; status is intentionally ignored after a hash exists. + /// Resolves a prior idempotent result; accepted transactions with hashes always win. fn resolve_idempotent_transaction( transaction: Transaction, relayer_id: &RelayerId, transaction_to_send: &TransactionToSend, external_id: &str, ) -> Result { - if !Self::transaction_matches_payload(&transaction, relayer_id, transaction_to_send) { + let payload_matches = + Self::transaction_matches_payload(&transaction, relayer_id, transaction_to_send); + + if transaction.known_transaction_hash.is_some() { + if !payload_matches { + warn!( + %relayer_id, + external_id, + transaction_id = %transaction.id, + "External id reused with different payload; returning existing accepted transaction" + ); + } + + return Ok(transaction); + } + + if !payload_matches { return Err(AddTransactionError::ExternalIdPayloadMismatch { relayer_id: *relayer_id, external_id: external_id.to_string(), }); } - if transaction.known_transaction_hash.is_some() { - return Ok(transaction); - } - Err(AddTransactionError::IdempotentTransactionFailed { relayer_id: *relayer_id, external_id: external_id.to_string(), @@ -2086,7 +2098,8 @@ mod tests { let mut transaction = transaction_with_nonce(TransactionStatus::PENDING, 1); transaction.relayer_id = relayer_id; transaction.external_id = Some("idempotent-key".to_string()); - transaction.known_transaction_hash = Some(TransactionHash::new(TxHash::ZERO)); + let transaction_hash = TransactionHash::new(TxHash::ZERO); + transaction.known_transaction_hash = Some(transaction_hash); let transaction_to_send = to_transaction_to_send(&transaction); @@ -2099,10 +2112,11 @@ mod tests { .unwrap(); assert_eq!(resolved.id, transaction.id); + assert_eq!(resolved.known_transaction_hash, Some(transaction_hash)); } #[test] - fn idempotent_resolve_rejects_payload_mismatch() { + fn idempotent_resolve_rejects_payload_mismatch_without_hash() { let relayer_id = RelayerId::new(); let mut transaction = transaction_with_nonce(TransactionStatus::PENDING, 1); transaction.relayer_id = relayer_id; @@ -2124,6 +2138,30 @@ mod tests { )); } + #[test] + fn idempotent_resolve_returns_accepted_transaction_with_payload_mismatch() { + let relayer_id = RelayerId::new(); + let mut transaction = transaction_with_nonce(TransactionStatus::PENDING, 1); + transaction.relayer_id = relayer_id; + transaction.external_id = Some("idempotent-key".to_string()); + let transaction_hash = TransactionHash::new(TxHash::ZERO); + transaction.known_transaction_hash = Some(transaction_hash); + + let mut transaction_to_send = to_transaction_to_send(&transaction); + transaction_to_send.value = TransactionValue::new(U256::from(1_u128)); + + let resolved = TransactionsQueues::resolve_idempotent_transaction( + transaction.clone(), + &relayer_id, + &transaction_to_send, + "idempotent-key", + ) + .unwrap(); + + assert_eq!(resolved.id, transaction.id); + assert_eq!(resolved.known_transaction_hash, Some(transaction_hash)); + } + #[test] fn idempotent_resolve_returns_bad_request_for_prior_failed_simulation() { let relayer_id = RelayerId::new(); From 6a4d271f85e57cf44a1b8a27cb0f7d3a87b495ca Mon Sep 17 00:00:00 2001 From: Alpay Aldemir Date: Tue, 12 May 2026 12:21:52 -0700 Subject: [PATCH 4/4] fix(transaction): return 409 error if external_id relates to a different tx payload --- .../queue_system/transactions_queues.rs | 35 ++++++++----------- .../pages/integration/api/transactions.mdx | 15 +++++++- 2 files changed, 28 insertions(+), 22 deletions(-) diff --git a/crates/core/src/transaction/queue_system/transactions_queues.rs b/crates/core/src/transaction/queue_system/transactions_queues.rs index 6546e4b9..cdbdbcdb 100644 --- a/crates/core/src/transaction/queue_system/transactions_queues.rs +++ b/crates/core/src/transaction/queue_system/transactions_queues.rs @@ -283,7 +283,8 @@ impl TransactionsQueues { && transaction.external_id == transaction_to_send.external_id } - /// Resolves a prior idempotent result; accepted transactions with hashes always win. + /// Resolves a prior idempotent result. Exact retries return the original transaction; + /// reused external ids with different payloads are conflicts. fn resolve_idempotent_transaction( transaction: Transaction, relayer_id: &RelayerId, @@ -293,19 +294,6 @@ impl TransactionsQueues { let payload_matches = Self::transaction_matches_payload(&transaction, relayer_id, transaction_to_send); - if transaction.known_transaction_hash.is_some() { - if !payload_matches { - warn!( - %relayer_id, - external_id, - transaction_id = %transaction.id, - "External id reused with different payload; returning existing accepted transaction" - ); - } - - return Ok(transaction); - } - if !payload_matches { return Err(AddTransactionError::ExternalIdPayloadMismatch { relayer_id: *relayer_id, @@ -313,6 +301,10 @@ impl TransactionsQueues { }); } + if transaction.known_transaction_hash.is_some() { + return Ok(transaction); + } + Err(AddTransactionError::IdempotentTransactionFailed { relayer_id: *relayer_id, external_id: external_id.to_string(), @@ -2139,7 +2131,7 @@ mod tests { } #[test] - fn idempotent_resolve_returns_accepted_transaction_with_payload_mismatch() { + fn idempotent_resolve_rejects_payload_mismatch_with_hash() { let relayer_id = RelayerId::new(); let mut transaction = transaction_with_nonce(TransactionStatus::PENDING, 1); transaction.relayer_id = relayer_id; @@ -2150,16 +2142,17 @@ mod tests { let mut transaction_to_send = to_transaction_to_send(&transaction); transaction_to_send.value = TransactionValue::new(U256::from(1_u128)); - let resolved = TransactionsQueues::resolve_idempotent_transaction( - transaction.clone(), + let result = TransactionsQueues::resolve_idempotent_transaction( + transaction, &relayer_id, &transaction_to_send, "idempotent-key", - ) - .unwrap(); + ); - assert_eq!(resolved.id, transaction.id); - assert_eq!(resolved.known_transaction_hash, Some(transaction_hash)); + assert!(matches!( + result, + Err(super::AddTransactionError::ExternalIdPayloadMismatch { .. }) + )); } #[test] diff --git a/documentation/rrelayer/docs/pages/integration/api/transactions.mdx b/documentation/rrelayer/docs/pages/integration/api/transactions.mdx index 5437a75b..4c256555 100644 --- a/documentation/rrelayer/docs/pages/integration/api/transactions.mdx +++ b/documentation/rrelayer/docs/pages/integration/api/transactions.mdx @@ -49,6 +49,17 @@ POST /transactions/relayers/{relayer_id}/send } ``` +### Error Responses + +| Status Code | Description | +| ----------- | --------------------------------------------------------------------------- | +| `400` | Invalid transaction payload, unsupported transaction type, or failed simulation | +| `401` | Authentication required or relayer permissions denied | +| `403` | Relayer is paused | +| `404` | Relayer does not exist | +| `409` | `externalId` has already been used for this relayer with a different payload | +| `429` | Transaction rate limit exceeded | + ### Example ```bash @@ -126,7 +137,9 @@ Same format as regular send transaction: | ----------- | ----------------------------------------------------------------- | | `400` | No available relayers for the chain (all paused or internal-only) | | `404` | No relayers found for the specified chain | -| `401` | Authentication required (Basic auth only) | +| `401` | Authentication required or selected relayer API key invalid | +| `409` | `externalId` has already been used for the selected relayer with a different payload | +| `429` | Transaction rate limit exceeded | ### Example