diff --git a/crates/indexer/src/indexer/cache.rs b/crates/indexer/src/indexer/cache.rs new file mode 100644 index 0000000..2b272d3 --- /dev/null +++ b/crates/indexer/src/indexer/cache.rs @@ -0,0 +1,274 @@ +use crate::models::ActiveUtxo; +use simplicityhl::elements::OutPoint; +use std::collections::HashMap; + +#[derive(Debug)] +enum PendingOp { + Upsert(ActiveUtxo), + Delete, +} + +#[derive(Debug)] +pub struct UtxoCache { + inner: HashMap, + block_pending: Option>, +} + +impl UtxoCache { + pub fn new() -> Self { + Self { + inner: HashMap::new(), + block_pending: None, + } + } + + pub fn with_capacity(capacity: usize) -> Self { + Self { + inner: HashMap::with_capacity(capacity), + block_pending: None, + } + } + + pub fn begin_block(&mut self) { + if self.block_pending.is_none() { + self.block_pending = Some(HashMap::new()); + } + } + + pub fn commit_block(&mut self) { + let Some(pending) = self.block_pending.take() else { + return; + }; + + for (outpoint, op) in pending { + match op { + PendingOp::Upsert(active_utxo) => { + self.inner.insert(outpoint, active_utxo); + } + PendingOp::Delete => { + self.inner.remove(&outpoint); + } + } + } + } + + pub fn abort_block(&mut self) { + self.block_pending = None; + } + + pub fn insert(&mut self, outpoint: OutPoint, active_utxo: ActiveUtxo) { + if let Some(pending) = self.block_pending.as_mut() { + pending.insert(outpoint, PendingOp::Upsert(active_utxo)); + } else { + self.inner.insert(outpoint, active_utxo); + } + } + + pub fn get(&self, outpoint: &OutPoint) -> Option<&ActiveUtxo> { + if let Some(pending) = self.block_pending.as_ref() + && let Some(op) = pending.get(outpoint) + { + return match op { + PendingOp::Upsert(active_utxo) => Some(active_utxo), + PendingOp::Delete => None, + }; + } + + self.inner.get(outpoint) + } + + pub fn remove(&mut self, outpoint: &OutPoint) { + if let Some(pending) = self.block_pending.as_mut() { + pending.insert(*outpoint, PendingOp::Delete); + } else { + self.inner.remove(outpoint); + } + } +} + +impl Default for UtxoCache { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::UtxoCache; + use crate::models::{ActiveUtxo, UtxoData, UtxoType}; + use simplicityhl::elements::{OutPoint, Txid, hashes::Hash}; + use uuid::Uuid; + + fn outpoint(txid_byte: u8, vout: u32) -> OutPoint { + OutPoint { + txid: Txid::from_slice(&[txid_byte; 32]).expect("valid txid bytes"), + vout, + } + } + + fn active_utxo(offer_byte: u8) -> ActiveUtxo { + ActiveUtxo { + offer_id: Uuid::from_bytes([offer_byte; 16]), + data: UtxoData::Offer(UtxoType::PreLock), + } + } + + #[test] + fn insert_and_remove_without_active_block_apply_immediately() { + let mut cache = UtxoCache::new(); + let op = outpoint(1, 0); + + cache.insert(op, active_utxo(10)); + assert_eq!( + cache.get(&op).map(|u| u.offer_id), + Some(Uuid::from_bytes([10; 16])) + ); + + cache.remove(&op); + assert!(cache.get(&op).is_none()); + } + + #[test] + fn begin_block_is_idempotent_and_preserves_pending_delta() { + let mut cache = UtxoCache::new(); + let op = outpoint(2, 0); + + cache.begin_block(); + cache.insert(op, active_utxo(20)); + cache.begin_block(); + cache.commit_block(); + + assert_eq!( + cache.get(&op).map(|u| u.offer_id), + Some(Uuid::from_bytes([20; 16])) + ); + } + + #[test] + fn pending_changes_are_visible_before_commit() { + let mut cache = UtxoCache::new(); + let existing = outpoint(3, 0); + let pending = outpoint(4, 0); + + cache.insert(existing, active_utxo(30)); + cache.begin_block(); + cache.remove(&existing); + cache.insert(pending, active_utxo(40)); + + assert!(cache.get(&existing).is_none()); + assert_eq!( + cache.get(&pending).map(|u| u.offer_id), + Some(Uuid::from_bytes([40; 16])) + ); + } + + #[test] + fn abort_block_discards_all_pending_changes() { + let mut cache = UtxoCache::new(); + let existing = outpoint(5, 0); + let pending = outpoint(6, 0); + + cache.insert(existing, active_utxo(50)); + cache.begin_block(); + cache.remove(&existing); + cache.insert(pending, active_utxo(60)); + cache.abort_block(); + + assert_eq!( + cache.get(&existing).map(|u| u.offer_id), + Some(Uuid::from_bytes([50; 16])) + ); + assert!(cache.get(&pending).is_none()); + } + + #[test] + fn commit_block_applies_pending_changes() { + let mut cache = UtxoCache::new(); + let existing = outpoint(7, 0); + let pending = outpoint(8, 0); + + cache.insert(existing, active_utxo(70)); + cache.begin_block(); + cache.remove(&existing); + cache.insert(pending, active_utxo(80)); + cache.commit_block(); + + assert!(cache.get(&existing).is_none()); + assert_eq!( + cache.get(&pending).map(|u| u.offer_id), + Some(Uuid::from_bytes([80; 16])) + ); + } + + #[test] + fn latest_pending_operation_wins_for_same_outpoint() { + let mut cache = UtxoCache::new(); + let op = outpoint(9, 0); + + cache.begin_block(); + cache.insert(op, active_utxo(90)); + cache.remove(&op); + cache.insert(op, active_utxo(91)); + cache.commit_block(); + + assert_eq!( + cache.get(&op).map(|u| u.offer_id), + Some(Uuid::from_bytes([91; 16])) + ); + } + + #[test] + fn commit_without_active_block_is_noop() { + let mut cache = UtxoCache::new(); + let op = outpoint(10, 0); + cache.insert(op, active_utxo(100)); + + cache.commit_block(); + + assert_eq!( + cache.get(&op).map(|u| u.offer_id), + Some(Uuid::from_bytes([100; 16])) + ); + } + + #[test] + fn abort_without_active_block_is_noop() { + let mut cache = UtxoCache::new(); + let op = outpoint(11, 0); + cache.insert(op, active_utxo(110)); + + cache.abort_block(); + + assert_eq!( + cache.get(&op).map(|u| u.offer_id), + Some(Uuid::from_bytes([110; 16])) + ); + } + + #[test] + fn abort_then_retry_produces_correct_state() { + let mut cache = UtxoCache::new(); + let existing = outpoint(12, 0); + let aborted_new = outpoint(13, 0); + let committed_new = outpoint(14, 0); + + cache.insert(existing, active_utxo(120)); + + cache.begin_block(); + cache.remove(&existing); + cache.insert(aborted_new, active_utxo(130)); + cache.abort_block(); + + cache.begin_block(); + cache.remove(&existing); + cache.insert(committed_new, active_utxo(140)); + cache.commit_block(); + + assert!(cache.get(&existing).is_none()); + assert!(cache.get(&aborted_new).is_none()); + assert_eq!( + cache.get(&committed_new).map(|u| u.offer_id), + Some(Uuid::from_bytes([140; 16])) + ); + } +} diff --git a/crates/indexer/src/indexer/db.rs b/crates/indexer/src/indexer/db.rs index 63dfd00..091f2f1 100644 --- a/crates/indexer/src/indexer/db.rs +++ b/crates/indexer/src/indexer/db.rs @@ -1,13 +1,12 @@ -use std::collections::HashMap; - use simplicityhl::elements::{OutPoint, Txid, hashes::Hash, hex::ToHex}; use sqlx::PgPool; use uuid::Uuid; use crate::db::DbTx; +use crate::indexer::cache::UtxoCache; use crate::models::{ ActiveUtxo, OfferModel, OfferParticipantModel, OfferStatus, OfferUtxoModel, ParticipantType, - UtxoCache, UtxoData, UtxoType, + UtxoData, UtxoType, }; #[tracing::instrument( @@ -342,7 +341,7 @@ pub async fn load_utxo_cache(db: &PgPool) -> anyhow::Result { let offers_count = offer_rows.len(); let offer_participants_count = participant_rows.len(); - let mut cache: UtxoCache = HashMap::with_capacity(offers_count + offer_participants_count); + let mut cache = UtxoCache::with_capacity(offers_count + offer_participants_count); for rec in offer_rows { let outpoint = OutPoint { diff --git a/crates/indexer/src/indexer/handlers/lending_creation.rs b/crates/indexer/src/indexer/handlers/lending_creation.rs index 2c66271..6c818f3 100644 --- a/crates/indexer/src/indexer/handlers/lending_creation.rs +++ b/crates/indexer/src/indexer/handlers/lending_creation.rs @@ -1,11 +1,11 @@ use simplicityhl::elements::{OutPoint, Transaction, Txid, hashes::Hash}; use uuid::Uuid; -use crate::indexer::db; +use crate::indexer::{cache::UtxoCache, db}; use crate::models::{OfferUtxoModel, UtxoData, UtxoType}; use crate::{ db::DbTx, - models::{ActiveUtxo, OfferStatus, UtxoCache}, + models::{ActiveUtxo, OfferStatus}, }; #[tracing::instrument( diff --git a/crates/indexer/src/indexer/handlers/loan_liquidation.rs b/crates/indexer/src/indexer/handlers/loan_liquidation.rs index 4b1b7b3..c84f4fb 100644 --- a/crates/indexer/src/indexer/handlers/loan_liquidation.rs +++ b/crates/indexer/src/indexer/handlers/loan_liquidation.rs @@ -1,12 +1,9 @@ use simplicityhl::elements::{OutPoint, Transaction, Txid, hashes::Hash}; use uuid::Uuid; -use crate::indexer::db; +use crate::indexer::{cache::UtxoCache, db}; use crate::models::{OfferUtxoModel, UtxoType}; -use crate::{ - db::DbTx, - models::{OfferStatus, UtxoCache}, -}; +use crate::{db::DbTx, models::OfferStatus}; #[tracing::instrument( name = "Handling offer liquidation", diff --git a/crates/indexer/src/indexer/handlers/loan_repayment.rs b/crates/indexer/src/indexer/handlers/loan_repayment.rs index 0a001de..1a06a1d 100644 --- a/crates/indexer/src/indexer/handlers/loan_repayment.rs +++ b/crates/indexer/src/indexer/handlers/loan_repayment.rs @@ -1,11 +1,11 @@ use simplicityhl::elements::{OutPoint, Transaction, Txid, hashes::Hash}; use uuid::Uuid; -use crate::indexer::db; +use crate::indexer::{cache::UtxoCache, db}; use crate::models::{OfferUtxoModel, UtxoData, UtxoType}; use crate::{ db::DbTx, - models::{ActiveUtxo, OfferStatus, UtxoCache}, + models::{ActiveUtxo, OfferStatus}, }; #[tracing::instrument( diff --git a/crates/indexer/src/indexer/handlers/offer_cancellation.rs b/crates/indexer/src/indexer/handlers/offer_cancellation.rs index 8e255d0..be42033 100644 --- a/crates/indexer/src/indexer/handlers/offer_cancellation.rs +++ b/crates/indexer/src/indexer/handlers/offer_cancellation.rs @@ -1,12 +1,9 @@ use simplicityhl::elements::{OutPoint, Transaction, Txid, hashes::Hash}; use uuid::Uuid; -use crate::indexer::db; +use crate::indexer::{cache::UtxoCache, db}; use crate::models::{OfferUtxoModel, UtxoType}; -use crate::{ - db::DbTx, - models::{OfferStatus, UtxoCache}, -}; +use crate::{db::DbTx, models::OfferStatus}; #[tracing::instrument( name = "Handling offer cancellation", diff --git a/crates/indexer/src/indexer/handlers/offers.rs b/crates/indexer/src/indexer/handlers/offers.rs index 79a5bf8..b4d513b 100644 --- a/crates/indexer/src/indexer/handlers/offers.rs +++ b/crates/indexer/src/indexer/handlers/offers.rs @@ -3,10 +3,11 @@ use uuid::Uuid; use crate::indexer::handlers::{handle_lending_creation, handle_offer_cancellation}; use crate::indexer::{ - handle_loan_liquidation, handle_loan_repayment, handle_repayment_claim, is_loan_repayment_tx, + cache::UtxoCache, handle_loan_liquidation, handle_loan_repayment, handle_repayment_claim, + is_loan_repayment_tx, }; use crate::models::UtxoType; -use crate::{db::DbTx, indexer::is_offer_cancellation_tx, models::UtxoCache}; +use crate::{db::DbTx, indexer::is_offer_cancellation_tx}; #[tracing::instrument( name = "Handling offer status transition", diff --git a/crates/indexer/src/indexer/handlers/participants.rs b/crates/indexer/src/indexer/handlers/participants.rs index 5f1a61e..7c8dcc9 100644 --- a/crates/indexer/src/indexer/handlers/participants.rs +++ b/crates/indexer/src/indexer/handlers/participants.rs @@ -3,12 +3,9 @@ use uuid::Uuid; use simplicityhl::elements::hashes::Hash; use simplicityhl::elements::{OutPoint, Transaction}; -use crate::indexer::db; +use crate::indexer::{cache::UtxoCache, db}; use crate::models::{OfferParticipantModel, ParticipantType, UtxoData}; -use crate::{ - db::DbTx, - models::{ActiveUtxo, UtxoCache}, -}; +use crate::{db::DbTx, models::ActiveUtxo}; #[tracing::instrument( name = "Handling offer participant movement", diff --git a/crates/indexer/src/indexer/handlers/pre_lock.rs b/crates/indexer/src/indexer/handlers/pre_lock.rs index 2f554a9..9c8d3e4 100644 --- a/crates/indexer/src/indexer/handlers/pre_lock.rs +++ b/crates/indexer/src/indexer/handlers/pre_lock.rs @@ -5,11 +5,11 @@ use lending_contracts::{ sdk::{extract_arguments_from_tx, taproot_unspendable_internal_key}, }; -use crate::indexer::db; +use crate::indexer::{cache::UtxoCache, db}; use crate::models::{OfferModel, OfferUtxoModel, UtxoType}; use crate::{ db::DbTx, - models::{ActiveUtxo, OfferParticipantModel, ParticipantType, UtxoCache, UtxoData}, + models::{ActiveUtxo, OfferParticipantModel, ParticipantType, UtxoData}, }; #[tracing::instrument( diff --git a/crates/indexer/src/indexer/handlers/repayment_claim.rs b/crates/indexer/src/indexer/handlers/repayment_claim.rs index e6d47f6..b8cda41 100644 --- a/crates/indexer/src/indexer/handlers/repayment_claim.rs +++ b/crates/indexer/src/indexer/handlers/repayment_claim.rs @@ -1,12 +1,9 @@ use simplicityhl::elements::{OutPoint, Txid, hashes::Hash}; use uuid::Uuid; -use crate::indexer::db; +use crate::indexer::{cache::UtxoCache, db}; use crate::models::{OfferUtxoModel, UtxoType}; -use crate::{ - db::DbTx, - models::{OfferStatus, UtxoCache}, -}; +use crate::{db::DbTx, models::OfferStatus}; #[tracing::instrument( name = "Handling repayment tokens claim", diff --git a/crates/indexer/src/indexer/mod.rs b/crates/indexer/src/indexer/mod.rs index 1cb9fb4..227bc22 100644 --- a/crates/indexer/src/indexer/mod.rs +++ b/crates/indexer/src/indexer/mod.rs @@ -1,3 +1,4 @@ +mod cache; mod db; mod handlers; mod processors; diff --git a/crates/indexer/src/indexer/processors.rs b/crates/indexer/src/indexer/processors.rs index 9ca5659..1124b1f 100644 --- a/crates/indexer/src/indexer/processors.rs +++ b/crates/indexer/src/indexer/processors.rs @@ -7,8 +7,8 @@ use uuid::Uuid; use crate::{ db::DbTx, esplora_client::EsploraClient, - indexer::{db, handlers, is_pre_lock_creation_tx}, - models::{UtxoCache, UtxoData}, + indexer::{cache::UtxoCache, db, handlers, is_pre_lock_creation_tx}, + models::UtxoData, }; #[tracing::instrument( @@ -33,21 +33,35 @@ pub async fn process_block( } let mut sql_tx = db.begin().await?; + cache.begin_block(); - for tx in txs { - process_tx(&mut sql_tx, &tx, cache, block_height).await?; - } + let process_result = async { + for tx in txs { + process_tx(&mut sql_tx, &tx, cache, block_height).await?; + } - db::upsert_sync_state(&mut sql_tx, block_height, block_hash).await?; - sql_tx.commit().await?; + db::upsert_sync_state(&mut sql_tx, block_height, block_hash).await?; + sql_tx.commit().await?; - tracing::info!( - "Successfully indexed block #{} ({} txs)", - block_height, - tx_count - ); + Ok(()) + } + .await; - Ok(()) + match process_result { + Ok(()) => { + cache.commit_block(); + tracing::info!( + "Successfully indexed block #{} ({} txs)", + block_height, + tx_count + ); + Ok(()) + } + Err(error) => { + cache.abort_block(); + Err(error) + } + } } #[tracing::instrument( diff --git a/crates/indexer/src/models/offer.rs b/crates/indexer/src/models/offer.rs index 6b01003..655dcb9 100644 --- a/crates/indexer/src/models/offer.rs +++ b/crates/indexer/src/models/offer.rs @@ -1,9 +1,7 @@ -use std::collections::HashMap; - use serde::{Deserialize, Serialize}; use uuid::Uuid; -use simplicityhl::elements::{OutPoint, Txid, hashes::Hash}; +use simplicityhl::elements::{Txid, hashes::Hash}; use lending_contracts::pre_lock::build_arguments::PreLockArguments; @@ -21,8 +19,6 @@ pub struct ActiveUtxo { pub data: UtxoData, } -pub type UtxoCache = HashMap; - #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type, Serialize, Deserialize)] #[sqlx(type_name = "offer_status", rename_all = "lowercase")] #[serde(rename_all = "lowercase")]