1010 */
1111
1212import { supabase } from "./supabase.js" ;
13- import { findMatchingPayment } from "./stellar.js" ;
13+ import { findMatchingPayment , findAnyRecentPayment } from "./stellar.js" ;
1414import { sendWebhook , isEventSubscribed } from "./webhooks.js" ;
1515import { sendReceiptEmail } from "./email.js" ;
1616import { renderReceiptEmail } from "./email-templates.js" ;
@@ -68,7 +68,24 @@ async function pollPendingPayments() {
6868
6969 logger . info ( { count : pending . length } , "Horizon poller: checking pending payments" ) ;
7070
71- await Promise . allSettled ( pending . map ( p => checkPayment ( p ) ) ) ;
71+ // Group by recipient+asset to process same-address payments sequentially
72+ // This prevents two payments with identical recipient+amount from both
73+ // claiming the same on-chain transaction in the same cycle.
74+ const groups = new Map ( ) ;
75+ for ( const p of pending ) {
76+ const key = `${ p . recipient } :${ p . asset } ` ;
77+ if ( ! groups . has ( key ) ) groups . set ( key , [ ] ) ;
78+ groups . get ( key ) . push ( p ) ;
79+ }
80+
81+ // Process each group sequentially, different groups in parallel
82+ await Promise . allSettled (
83+ Array . from ( groups . values ( ) ) . map ( async ( group ) => {
84+ for ( const p of group ) {
85+ await checkPayment ( p ) ;
86+ }
87+ } )
88+ ) ;
7289
7390 } catch ( err ) {
7491 logger . warn ( { err } , "Horizon poller: unexpected error" ) ;
@@ -97,7 +114,65 @@ async function checkPayment(payment) {
97114
98115 if ( ! match ) {
99116 logger . info ( { paymentId : payment . id } , "Horizon poller: no match yet" ) ;
100- return ; // not confirmed yet
117+
118+ // Check for wrong-amount payment
119+ const anyPayment = await findAnyRecentPayment ( {
120+ recipient : payment . recipient ,
121+ assetCode : payment . asset ,
122+ assetIssuer : payment . asset_issuer ,
123+ createdAt : payment . created_at ,
124+ } ) ;
125+
126+ if ( anyPayment ) {
127+ const received = Number ( anyPayment . received_amount ) ;
128+ const expected = Number ( payment . amount ) ;
129+ const diff = received - expected ;
130+
131+ if ( diff < - 0.0000001 ) {
132+ // Underpayment — mark failed
133+ await supabase . from ( "payments" ) . update ( {
134+ status : "failed" ,
135+ tx_id : anyPayment . transaction_hash ,
136+ metadata : {
137+ ...( payment . metadata || { } ) ,
138+ failure_reason : "underpayment" ,
139+ expected_amount : expected ,
140+ received_amount : received ,
141+ shortfall : Number ( ( expected - received ) . toFixed ( 7 ) ) ,
142+ } ,
143+ } ) . eq ( "id" , payment . id ) . eq ( "status" , "pending" ) ;
144+
145+ const redis = await connectRedisClient ( ) ;
146+ await invalidatePaymentCache ( redis , payment . id ) ;
147+ logger . info ( { paymentId : payment . id , expected, received } , "Horizon poller: underpayment detected — marked failed" ) ;
148+
149+ // Notify via SSE and Socket.io
150+ streamManager . notify ( payment . id , "payment.failed" , { status : "failed" , reason : "underpayment" , expected_amount : expected , received_amount : received } ) ;
151+ if ( _io && payment . merchant_id ) {
152+ _io . to ( `merchant:${ payment . merchant_id } ` ) . emit ( "payment:failed" , { id : payment . id , reason : "underpayment" , expected_amount : expected , received_amount : received } ) ;
153+ }
154+ } else if ( diff > 0.0000001 ) {
155+ // Overpayment — confirm but flag
156+ const latencySeconds = ( Date . now ( ) - new Date ( payment . created_at ) . getTime ( ) ) / 1000 ;
157+ const { data : updated } = await supabase . from ( "payments" ) . update ( {
158+ status : "confirmed" ,
159+ tx_id : anyPayment . transaction_hash ,
160+ completion_duration_seconds : Math . floor ( latencySeconds ) ,
161+ metadata : { ...( payment . metadata || { } ) , overpayment : true , expected_amount : expected , received_amount : received , excess : Number ( ( received - expected ) . toFixed ( 7 ) ) } ,
162+ } ) . eq ( "id" , payment . id ) . eq ( "status" , "pending" ) . is ( "tx_id" , null ) . select ( "id" ) . maybeSingle ( ) ;
163+
164+ if ( ! updated ) return ; // already claimed
165+
166+ const redis = await connectRedisClient ( ) ;
167+ await invalidatePaymentCache ( redis , payment . id ) ;
168+ logger . info ( { paymentId : payment . id , expected, received } , "Horizon poller: overpayment — confirmed with flag" ) ;
169+ streamManager . notify ( payment . id , "payment.confirmed" , { status : "confirmed" , tx_id : anyPayment . transaction_hash , overpayment : true } ) ;
170+ if ( _io && payment . merchant_id ) {
171+ _io . to ( `merchant:${ payment . merchant_id } ` ) . emit ( "payment:confirmed" , { id : payment . id , tx_id : anyPayment . transaction_hash , overpayment : true } ) ;
172+ }
173+ }
174+ }
175+ return ;
101176 }
102177
103178 // Guard: ensure this tx_hash hasn't already confirmed a different payment
@@ -116,22 +191,37 @@ async function checkPayment(payment) {
116191 const createdAt = new Date ( payment . created_at ) ;
117192 const latencySeconds = ( Date . now ( ) - createdAt . getTime ( ) ) / 1000 ;
118193
119- // Update DB
120- const { error : updateError } = await supabase
194+ // Atomic update: only succeeds if tx_id is still NULL (not claimed by another payment).
195+ // The unique index on tx_id provides the final database-level guarantee.
196+ const { data : updated , error : updateError } = await supabase
121197 . from ( "payments" )
122198 . update ( {
123199 status : "confirmed" ,
124200 tx_id : match . transaction_hash ,
125201 completion_duration_seconds : Math . floor ( latencySeconds ) ,
126202 } )
127203 . eq ( "id" , payment . id )
128- . eq ( "status" , "pending" ) ; // guard against double-confirm
204+ . eq ( "status" , "pending" )
205+ . is ( "tx_id" , null ) // ← only claim if not already taken
206+ . select ( "id" )
207+ . maybeSingle ( ) ;
129208
130209 if ( updateError ) {
210+ // Unique constraint violation — another payment already claimed this tx
211+ if ( updateError . code === "23505" ) {
212+ logger . warn ( { paymentId : payment . id , txHash : match . transaction_hash } , "Horizon poller: tx_hash already claimed by another payment (unique constraint)" ) ;
213+ return ;
214+ }
131215 logger . warn ( { err : updateError , paymentId : payment . id } , "Horizon poller: DB update failed" ) ;
132216 return ;
133217 }
134218
219+ // If updated is null, the row was already confirmed or claimed — skip
220+ if ( ! updated ) {
221+ logger . info ( { paymentId : payment . id } , "Horizon poller: payment already processed, skipping" ) ;
222+ return ;
223+ }
224+
135225 // Invalidate Redis cache
136226 const redis = await connectRedisClient ( ) ;
137227 await invalidatePaymentCache ( redis , payment . id ) ;
0 commit comments