Conversation
There was a problem hiding this comment.
Pull request overview
This PR aims to prevent database row locking during fee estimation by introducing non-locking code paths for selecting UTXOs and listing unbatched payouts, while also improving observability of fee estimation API failures.
Changes:
- Add an explicit UTXO selection mode (payout vs. estimation) and a non-locking UTXO query for estimation flows.
- Add a non-locking
list_unbatched_for_estimationpayout query and use it in the fee estimation endpoint. - Improve logging around fee estimation response decoding and mempool.space → blockstream fallback.
Reviewed changes
Copilot reviewed 8 out of 10 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
src/utxo/repo.rs |
Adds a non-locking query for reservable UTXOs used during estimation. |
src/utxo/mod.rs |
Introduces UtxoSelectionMode and threads it through UTXO exclusion selection. |
src/payout/repo.rs |
Adds a non-locking “list unbatched payouts” query for estimation. |
src/job/process_payout_queue.rs |
Passes selection mode into UTXO exclusion selection based on for_estimation. |
src/app/mod.rs |
Switches fee estimation flow to use non-locking payout listing. |
src/fees/mempool_space.rs |
Adds warning logs with HTTP status when response decoding fails. |
src/fees/blockstream.rs |
Adds warning logs with HTTP status when response decoding fails. |
src/fees/client.rs |
Logs warning when mempool.space fails and falls back to blockstream. |
.sqlx/query-e8e7….json |
Adds sqlx offline metadata for the new payout estimation query. |
.sqlx/query-82f1….json |
Adds sqlx offline metadata for the new UTXO estimation query. |
Files not reviewed (2)
- .sqlx/query-82f16641005dce2f06696a56032b230c50cc83654abddc6092cc674de473d98e.json: Language not supported
- .sqlx/query-e8e72925aaecd1c9b639793f78f860918cdf77c6e92267d9e36c60ca54e643c6.json: Language not supported
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| pub async fn find_reservable_utxos_for_estimation( | ||
| &self, | ||
| tx: &mut Transaction<'_, Postgres>, | ||
| ids: impl Iterator<Item = KeychainId>, | ||
| ) -> Result<Vec<ReservableUtxo>, UtxoError> { | ||
| let uuids = ids.into_iter().map(Uuid::from).collect::<Vec<_>>(); | ||
| let rows = sqlx::query!( | ||
| r#"SELECT keychain_id, | ||
| CASE WHEN kind = 'external' THEN true ELSE false END as income_address, | ||
| tx_id, vout, spending_batch_id, income_settled_ledger_tx_id | ||
| FROM bria_utxos | ||
| WHERE keychain_id = ANY($1) AND bdk_spent = false"#, | ||
| &uuids[..] | ||
| ) | ||
| .fetch_all(&mut **tx) | ||
| .await?; |
There was a problem hiding this comment.
find_reservable_utxos_for_estimation is a near copy of find_reservable_utxos with FOR UPDATE removed. Keeping two copies of the same query and row-mapping logic increases the risk of subtle divergence over time. Consider factoring this into a single internal method (e.g., parameterize whether the query should lock) and call it from both public entry points.
| // Here we list all Utxos that bdk might want to use and lock them (FOR UPDATE) | ||
| // This ensures that we don't have 2 concurrent psbt constructions get in the way | ||
| // of each other | ||
| let reservable_utxos = self.utxos.find_reservable_utxos(tx, ids).await?; | ||
| let reservable_utxos = match mode { | ||
| UtxoSelectionMode::Payout => self.utxos.find_reservable_utxos(tx, ids).await?, | ||
| UtxoSelectionMode::Estimation => { | ||
| self.utxos | ||
| .find_reservable_utxos_for_estimation(tx, ids) | ||
| .await? | ||
| } | ||
| }; |
There was a problem hiding this comment.
The comment above implies UTXOs are always locked with FOR UPDATE, but with the new UtxoSelectionMode::Estimation path they are fetched without locking. Please update the comment to reflect the conditional behavior so readers don’t assume estimation mode still takes row locks.
| let mut entity_events = HashMap::new(); | ||
| for row in rows { | ||
| let wallet_id = WalletId::from(row.wallet_id); | ||
| let id = WalletId::from(row.id); |
There was a problem hiding this comment.
row.id is a payout ID, but this code converts it into WalletId and uses it as the key for grouping events. This defeats the type-safety provided by the PayoutId newtype and is inconsistent with list_for_batch (which uses PayoutId::from(row.id)). Use PayoutId here (and update the associated collection/map types) so payout IDs and wallet IDs can’t be mixed accidentally.
| let id = WalletId::from(row.id); | |
| let id = PayoutId::from(row.id); |
| #[instrument(name = "payouts.list_unbatched_for_estimation", skip(self))] | ||
| pub async fn list_unbatched_for_estimation( | ||
| &self, | ||
| tx: &mut Transaction<'_, Postgres>, | ||
| account_id: AccountId, | ||
| payout_queue_id: PayoutQueueId, | ||
| ) -> Result<UnbatchedPayouts, PayoutError> { | ||
| let rows = sqlx::query!( | ||
| r#" | ||
| SELECT b.*, e.sequence, e.event | ||
| FROM bria_payouts b | ||
| JOIN bria_payout_events e ON b.id = e.id | ||
| WHERE b.batch_id IS NULL AND b.account_id = $1 AND b.payout_queue_id = $2 | ||
| ORDER BY b.created_at, b.id, e.sequence"#, | ||
| account_id as AccountId, | ||
| payout_queue_id as PayoutQueueId, | ||
| ) | ||
| .fetch_all(&mut **tx) | ||
| .await?; | ||
| let mut wallet_payouts = Vec::new(); | ||
| let mut entity_events = HashMap::new(); | ||
| for row in rows { | ||
| let wallet_id = WalletId::from(row.wallet_id); | ||
| let id = WalletId::from(row.id); | ||
| wallet_payouts.push((id, wallet_id)); | ||
| let events = entity_events.entry(id).or_insert_with(EntityEvents::new); | ||
| events.load_event(row.sequence as usize, row.event)?; | ||
| } | ||
| let mut payouts: HashMap<WalletId, Vec<UnbatchedPayout>> = HashMap::new(); | ||
| for (id, wallet_id) in wallet_payouts { | ||
| if let Some(events) = entity_events.remove(&id) { | ||
| payouts | ||
| .entry(wallet_id) | ||
| .or_default() | ||
| .push(UnbatchedPayout::try_from(events)?); | ||
| } | ||
| } | ||
| let filtered_payouts: HashMap<WalletId, Vec<UnbatchedPayout>> = payouts | ||
| .into_iter() | ||
| .map(|(wallet_id, unbatched_payouts)| { | ||
| let filtered_unbatched_payouts = unbatched_payouts | ||
| .into_iter() | ||
| .filter(|payout| { | ||
| !payout | ||
| .events | ||
| .iter() | ||
| .any(|event| matches!(event, PayoutEvent::Cancelled { .. })) | ||
| }) | ||
| .collect(); | ||
| (wallet_id, filtered_unbatched_payouts) | ||
| }) | ||
| .collect(); | ||
| Ok(UnbatchedPayouts::new(filtered_payouts)) | ||
| } |
There was a problem hiding this comment.
list_unbatched_for_estimation largely duplicates list_unbatched with the only difference being the absence of FOR UPDATE. This duplication makes it easy for the two code paths to drift (e.g., filtering logic, ordering, joins). Consider extracting a shared private helper that takes a for_update: bool (or similar) and reuses the same row-to-entity reconstruction code.
No description provided.