Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion crates/core/src/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use postgres_native_tls::MakeTlsConnector;
use tokio::{task, time::timeout};
pub use tokio_postgres::types::{ToSql, Type as PgType};
use tokio_postgres::{
config::SslMode, Config, CopyInSink, Error as PgError, Row, Statement, ToStatement,
config::SslMode, error::SqlState, Config, CopyInSink, Error as PgError, Row, Statement,
ToStatement,
};

pub fn connection_string() -> Result<String, env::VarError> {
Expand Down Expand Up @@ -49,6 +50,18 @@ pub enum PostgresError {
ConnectionPoolError(#[from] RunError<tokio_postgres::Error>),
}

impl PostgresError {
pub fn is_unique_violation_on(&self, constraint: &str) -> bool {
match self {
PostgresError::PgError(error) => error.as_db_error().is_some_and(|db_error| {
db_error.code() == &SqlState::UNIQUE_VIOLATION
&& db_error.constraint() == Some(constraint)
}),
PostgresError::ConnectionPoolError(_) => false,
}
}
}

pub struct PostgresClient {
pub pool: Pool<PostgresConnectionManager<MakeTlsConnector>>,
}
Expand Down
5 changes: 5 additions & 0 deletions crates/core/src/schema/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::schema::v1_0_1::apply_v1_0_1_schema;
use crate::schema::v1_0_2::apply_v1_0_2_schema;
use crate::schema::v1_0_3::apply_v1_0_3_schema;
use crate::{
postgres::{PostgresClient, PostgresError},
schema::v1_0_0::apply_v1_0_0_schema,
Expand All @@ -8,12 +9,16 @@ use crate::{
mod v1_0_0;
mod v1_0_1;
mod v1_0_2;
mod v1_0_3;

pub use v1_0_3::TRANSACTION_EXTERNAL_ID_UNIQUE_INDEX;

/// Applies the database schema to the database.
pub async fn apply_schema(client: &PostgresClient) -> Result<(), PostgresError> {
apply_v1_0_0_schema(client).await?;
apply_v1_0_1_schema(client).await?;
apply_v1_0_2_schema(client).await?;
apply_v1_0_3_schema(client).await?;

Ok(())
}
18 changes: 18 additions & 0 deletions crates/core/src/schema/v1_0_3.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use crate::postgres::{PostgresClient, PostgresError};

pub const TRANSACTION_EXTERNAL_ID_UNIQUE_INDEX: &str = "idx_transaction_relayer_external_id_unique";

/// Applies the RRelayer database schema version 1.0.3.
/// Adds per-relayer idempotency for non-null transaction external IDs.
pub async fn apply_v1_0_3_schema(client: &PostgresClient) -> Result<(), PostgresError> {
let schema_sql = format!(
r#"
CREATE UNIQUE INDEX IF NOT EXISTS {TRANSACTION_EXTERNAL_ID_UNIQUE_INDEX}
ON relayer.transaction(relayer_id, external_id)
WHERE external_id IS NOT NULL;
"#
);

client.batch_execute(&schema_sql).await?;
Ok(())
}
4 changes: 1 addition & 3 deletions crates/core/src/startup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,8 @@ async fn start_api(
// Determine which signing provider to use (network-level or global)
let signing_provider = if let Some(ref signing_key) = network_config.signing_provider {
signing_key
} else if let Some(ref signing_key) = config.signing_provider {
signing_key
} else {
return None;
config.signing_provider.as_ref()?
};

// Check if only private keys are configured
Expand Down
23 changes: 23 additions & 0 deletions crates/core/src/transaction/db/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,27 @@ impl PostgresClient {
Some(row) => Ok(Some(build_transaction_from_transaction_view(&row))),
}
}

pub async fn get_transaction_by_relayer_and_external_id(
&self,
relayer_id: &RelayerId,
external_id: &str,
) -> Result<Option<Transaction>, PostgresError> {
let row = self
.query_one_or_none(
"
SELECT *
FROM relayer.transaction
WHERE relayer_id = $1
AND external_id = $2;
",
&[relayer_id, &external_id],
)
.await?;

match row {
None => Ok(None),
Some(row) => Ok(Some(build_transaction_from_transaction_view(&row))),
}
}
}
2 changes: 1 addition & 1 deletion crates/core/src/transaction/db/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl PostgresClient {
&transaction.value,
&transaction.blobs,
&transaction.speed,
&transaction.status,
&TransactionStatus::FAILED,
&transaction.expires_at,
&transaction.queued_at,
&failed_reason.chars().take(2000).collect::<String>(),
Expand Down
75 changes: 69 additions & 6 deletions crates/core/src/transaction/nonce_manager.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,32 @@
use tokio::sync::Mutex;
use std::sync::Arc;
use tokio::sync::{Mutex, OwnedMutexGuard};

use crate::transaction::types::TransactionNonce;

pub struct NonceManager {
nonce: Mutex<TransactionNonce>,
nonce: Arc<Mutex<TransactionNonce>>,
}

pub struct NonceReservation {
nonce: TransactionNonce,
committed: bool,
guard: Option<OwnedMutexGuard<TransactionNonce>>,
}

impl NonceManager {
pub fn new(current_nonce: TransactionNonce) -> Self {
NonceManager { nonce: Mutex::new(current_nonce) }
NonceManager { nonce: Arc::new(Mutex::new(current_nonce)) }
}

pub async fn get_and_increment(&self) -> TransactionNonce {
let mut nonce_guard = self.nonce.lock().await;
/// Reserves and increments the next nonce while holding the internal mutex until commit or drop.
///
/// Callers should keep the work between reservation and `NonceReservation::commit` short; if
/// the reservation is dropped before commit, the nonce cursor is rolled back automatically.
pub async fn reserve_next(&self) -> NonceReservation {
let mut nonce_guard = self.nonce.clone().lock_owned().await;
let current_nonce = *nonce_guard;
*nonce_guard = current_nonce + 1;
current_nonce
NonceReservation { nonce: current_nonce, committed: false, guard: Some(nonce_guard) }
}

pub async fn sync_with_onchain_nonce(&self, onchain_nonce: TransactionNonce) {
Expand All @@ -30,3 +41,55 @@
*nonce_guard
}
}

impl NonceReservation {
pub fn nonce(&self) -> TransactionNonce {
self.nonce
}

pub fn commit(mut self) {
self.committed = true;
self.guard.take();
}
}

impl Drop for NonceReservation {
fn drop(&mut self) {
if self.committed {
return;
}

if let Some(mut nonce_guard) = self.guard.take() {
*nonce_guard = self.nonce;
}
}
}

#[cfg(test)]
mod tests {
use super::NonceManager;
use crate::transaction::types::TransactionNonce;

#[tokio::test]
async fn reservation_rolls_back_when_dropped() {
let manager = NonceManager::new(TransactionNonce::new(7));

Check failure

Code scanning / CodeQL

Hard-coded cryptographic value Critical

This hard-coded value is used as
a nonce
.

{
let reservation = manager.reserve_next().await;
assert_eq!(reservation.nonce(), TransactionNonce::new(7));

Check failure

Code scanning / CodeQL

Hard-coded cryptographic value Critical

This hard-coded value is used as
a nonce
.
}

assert_eq!(manager.get_current_nonce().await, TransactionNonce::new(7));

Check failure

Code scanning / CodeQL

Hard-coded cryptographic value Critical

This hard-coded value is used as
a nonce
.
}

#[tokio::test]
async fn reservation_commit_keeps_increment() {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a few more test case to do here to help me know how this works:

  1. What happens if transatciont none reserves mulitple (7, 8, 9, 10), then drops 8.
  2. What happens if transatciont none reserves mulitple (7, 8, 9, 10), then commits 10, then drops 8.

We should see expected behaviour for both, which is either to re-use oldest nonce and next reserve gets 8, then next to jump up to 11 after that, or to then ignore the dropped 8 entirely if we can.

let manager = NonceManager::new(TransactionNonce::new(7));

Check failure

Code scanning / CodeQL

Hard-coded cryptographic value Critical

This hard-coded value is used as
a nonce
.

let reservation = manager.reserve_next().await;
assert_eq!(reservation.nonce(), TransactionNonce::new(7));

Check failure

Code scanning / CodeQL

Hard-coded cryptographic value Critical

This hard-coded value is used as
a nonce
.
reservation.commit();

assert_eq!(manager.get_current_nonce().await, TransactionNonce::new(8));

Check failure

Code scanning / CodeQL

Hard-coded cryptographic value Critical

This hard-coded value is used as
a nonce
.
}
}
Loading
Loading