|
| 1 | +/** |
| 2 | + * services/eventIndexer.js |
| 3 | + * |
| 4 | + * Background Soroban Event Listener daemon for TradeFlow-API. |
| 5 | + * This service polls the Soroban RPC for events emitted by the specified Pool contract. |
| 6 | + * When a 'Swap' event is detected, it parses the data and saves it to the database with Prisma. |
| 7 | + */ |
| 8 | + |
| 9 | +const { rpc } = require('@stellar/stellar-sdk'); |
| 10 | +const { PrismaClient } = require('@prisma/client'); |
| 11 | +const { parseScVal } = require('./scValParser'); |
| 12 | + |
| 13 | +// In case dotenv is not installed as a top-level dependency, |
| 14 | +// we try to load it safely. Most Node.js environments for this project should have it. |
| 15 | +try { |
| 16 | + require('dotenv').config(); |
| 17 | +} catch (e) { |
| 18 | + console.warn('⚠️ dotenv not loaded. Ensure environment variables are set manually.'); |
| 19 | +} |
| 20 | + |
| 21 | +const prisma = new PrismaClient(); |
| 22 | + |
| 23 | +// Configuration |
| 24 | +const RPC_URL = process.env.SOROBAN_RPC_URL || 'https://soroban-testnet.stellar.org'; |
| 25 | +const POOL_ADDRESS = process.env.POOL_ADDRESS; |
| 26 | +const POLL_INTERVAL = parseInt(process.env.INDEXER_POLL_INTERVAL || '5000'); |
| 27 | + |
| 28 | +if (!POOL_ADDRESS) { |
| 29 | + console.error('❌ POOL_ADDRESS is not defined in environment variables.'); |
| 30 | + console.error('Please add POOL_ADDRESS="YOUR_CONTRACT_ID" to your .env file.'); |
| 31 | + process.exit(1); |
| 32 | +} |
| 33 | + |
| 34 | +const server = new rpc.Server(RPC_URL); |
| 35 | + |
| 36 | +/** |
| 37 | + * Main daemon loop to poll for Soroban events. |
| 38 | + */ |
| 39 | +async function startIndexer() { |
| 40 | + console.log('--- 🚀 TradeFlow Soroban Event Indexer ---'); |
| 41 | + console.log(`📡 RPC Node: ${RPC_URL}`); |
| 42 | + console.log(`🎯 Pool Contract: ${POOL_ADDRESS}`); |
| 43 | + console.log('-------------------------------------------'); |
| 44 | + |
| 45 | + // Start from the latest ledger initially |
| 46 | + let currentLedgerSequence; |
| 47 | + try { |
| 48 | + const latestLedger = await server.getLatestLedger(); |
| 49 | + currentLedgerSequence = latestLedger.sequence; |
| 50 | + console.log(`Initial Start Ledger: ${currentLedgerSequence}`); |
| 51 | + } catch (err) { |
| 52 | + console.error('❌ Failed to connect to Soroban RPC. Verify your SOROBAN_RPC_URL.'); |
| 53 | + process.exit(1); |
| 54 | + } |
| 55 | + |
| 56 | + // Periodic polling |
| 57 | + setInterval(async () => { |
| 58 | + try { |
| 59 | + const response = await server.getEvents({ |
| 60 | + startLedger: currentLedgerSequence, |
| 61 | + filters: [ |
| 62 | + { |
| 63 | + type: 'contract', |
| 64 | + contractIds: [POOL_ADDRESS], |
| 65 | + }, |
| 66 | + ], |
| 67 | + limit: 10, |
| 68 | + }); |
| 69 | + |
| 70 | + if (response.events && response.events.length > 0) { |
| 71 | + console.log(`Found ${response.events.length} new event(s). Processing...`); |
| 72 | + |
| 73 | + for (const event of response.events) { |
| 74 | + // Process event |
| 75 | + await handleContractEvent(event); |
| 76 | + } |
| 77 | + |
| 78 | + // Advance ledger checkpoint |
| 79 | + const latestProcessed = Math.max(...response.events.map(e => parseInt(e.ledger))); |
| 80 | + currentLedgerSequence = latestProcessed + 1; |
| 81 | + } |
| 82 | + } catch (error) { |
| 83 | + console.error('⚠️ Indexer Polling Error:', error.message); |
| 84 | + } |
| 85 | + }, POLL_INTERVAL); |
| 86 | +} |
| 87 | + |
| 88 | +/** |
| 89 | + * Handles an individual contract event. |
| 90 | + * Filters for 'Swap' events and indexes them. |
| 91 | + * |
| 92 | + * @param {rpc.Api.GetEventsResponse.Event} event - The Soroban event from RPC. |
| 93 | + */ |
| 94 | +async function handleContractEvent(event) { |
| 95 | + try { |
| 96 | + // Decode topics to identify the event |
| 97 | + const topics = event.topic.map(t => parseScVal(t)); |
| 98 | + |
| 99 | + // Check if topics contain "Swap" (case-insensitive) |
| 100 | + const isSwapEvent = topics.some(topic => |
| 101 | + typeof topic === 'string' && topic.toLowerCase() === 'swap' |
| 102 | + ); |
| 103 | + |
| 104 | + if (isSwapEvent) { |
| 105 | + console.log(`✅ Detected SwapEvent in ledger ${event.ledger}`); |
| 106 | + |
| 107 | + const payload = parseScVal(event.value); |
| 108 | + if (!payload) return; |
| 109 | + |
| 110 | + console.log('Decoded Payload:', JSON.stringify(payload, null, 2)); |
| 111 | + |
| 112 | + // Map Soroban event data to our Prisma Trade model |
| 113 | + // Expected structure from SwapEvent: { user, amount_in, amount_out } |
| 114 | + const tradeData = { |
| 115 | + poolId: event.contractId, |
| 116 | + userAddress: payload.user || payload.address || 'Unknown', |
| 117 | + amountIn: (payload.amount_in || payload.amountIn || '0').toString(), |
| 118 | + amountOut: (payload.amount_out || payload.amountOut || '0').toString(), |
| 119 | + timestamp: new Date(), |
| 120 | + }; |
| 121 | + |
| 122 | + // Save to Database via Prisma |
| 123 | + const savedTrade = await prisma.trade.create({ |
| 124 | + data: tradeData |
| 125 | + }); |
| 126 | + |
| 127 | + console.log(`💾 Indexed Trade saved. DB ID: ${savedTrade.id}`); |
| 128 | + } |
| 129 | + } catch (error) { |
| 130 | + console.error('❌ Failed to process event:', error.message); |
| 131 | + } |
| 132 | +} |
| 133 | + |
| 134 | +// Graceful Shut-off |
| 135 | +process.on('SIGINT', async () => { |
| 136 | + console.log('\n--- Indexer Shutting Down ---'); |
| 137 | + await prisma.$disconnect(); |
| 138 | + process.exit(0); |
| 139 | +}); |
| 140 | + |
| 141 | +startIndexer(); |
0 commit comments