Skip to content

Commit 18dfa9c

Browse files
committed
feat: implement background Soroban event listener and XDR indexer (#82, #158)
1 parent 62d3a5f commit 18dfa9c

2 files changed

Lines changed: 187 additions & 1 deletion

File tree

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@
2626
"prisma:generate": "prisma generate",
2727
"prisma:push": "prisma db push",
2828
"prisma:migrate": "prisma migrate dev",
29-
"prisma:studio": "prisma studio"
29+
"prisma:studio": "prisma studio",
30+
"start:indexer": "node services/eventIndexer.js"
3031
},
3132
"dependencies": {
3233
"@nestjs/common": "^10.4.22",

services/eventIndexer.js

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
const { rpc, xdr, scValToNative } = require('@stellar/stellar-sdk');
2+
const { PrismaClient } = require('@prisma/client');
3+
4+
// Initialize Prisma Client
5+
const prisma = new PrismaClient();
6+
7+
// Configuration
8+
// In a real environment, these would be in a .env file
9+
const SOROBAN_RPC_URL = process.env.SOROBAN_RPC_URL || 'https://soroban-testnet.stellar.org';
10+
const POOL_ADDRESS = process.env.POOL_ADDRESS || 'CC7J6PQTYY7W25Z6M4S6S6S6S6S6S6S6S6S6S6S6S6S6S6S6S6S6'; // Placeholder pool address
11+
12+
/**
13+
* Parsing utility to decode Soroban XDR payloads into readable JSON
14+
*/
15+
const SorobanParser = {
16+
/**
17+
* Decodes a Base64 encoded ScVal XDR string into a native JS type
18+
*/
19+
decodeScVal(xdrString) {
20+
if (!xdrString) return null;
21+
try {
22+
const val = xdr.ScVal.fromXDR(xdrString, 'base64');
23+
return scValToNative(val);
24+
} catch (error) {
25+
console.error('Error decoding ScVal:', error);
26+
return null;
27+
}
28+
},
29+
30+
/**
31+
* Parses a full SwapEvent from its raw XDR components
32+
*/
33+
parseSwapEvent(event) {
34+
try {
35+
// Soroban events have topics (array of ScVal) and a value (ScVal)
36+
const topics = event.topics.map(t => this.decodeScVal(t));
37+
const payload = this.decodeScVal(event.value);
38+
39+
// Standard SwapEvent structure:
40+
// topics[0]: 'Swap' (symbol/string)
41+
// topics[1]: User address (AccountID or ContractID)
42+
// payload: { amount_in: i128, amount_out: i128, token_in: Address, token_out: Address }
43+
44+
const eventName = topics[0];
45+
const userAddress = topics[1];
46+
47+
if (eventName !== 'Swap') {
48+
return null;
49+
}
50+
51+
return {
52+
eventName,
53+
userAddress: userAddress.toString(),
54+
amountIn: payload.amount_in ? payload.amount_in.toString() : payload.amountIn?.toString() || '0',
55+
amountOut: payload.amount_out ? payload.amount_out.toString() : payload.amountOut?.toString() || '0',
56+
timestamp: new Date().toISOString()
57+
};
58+
} catch (error) {
59+
console.error('Failed to parse swap event:', error);
60+
return null;
61+
}
62+
}
63+
};
64+
65+
const server = new rpc.Server(SOROBAN_RPC_URL);
66+
67+
/**
68+
* Main daemon loop to listen for Soroban events
69+
*/
70+
async function runIndexer() {
71+
console.log('--- TradeFlow Event Indexer Running ---');
72+
console.log(`RPC Node: ${SOROBAN_RPC_URL}`);
73+
console.log(`Watching Pool: ${POOL_ADDRESS}`);
74+
75+
// We'd ideally store this in the database to resume from failure
76+
let lastLedger = 0;
77+
78+
// Try to start from current ledger if we don't know where we are
79+
try {
80+
const info = await server.getLatestLedger();
81+
lastLedger = info.sequence;
82+
console.log(`Starting from current ledger: ${lastLedger}`);
83+
} catch (err) {
84+
console.warn('Could not fetch latest ledger, starting from 0');
85+
}
86+
87+
while (true) {
88+
try {
89+
// we use getEvents to poll for contract events
90+
const response = await server.getEvents({
91+
startLedger: lastLedger,
92+
filters: [
93+
{
94+
type: 'contract',
95+
contractIds: [POOL_ADDRESS],
96+
}
97+
],
98+
limit: 10
99+
});
100+
101+
if (response.results && response.results.length > 0) {
102+
console.log(`Found ${response.results.length} new events...`);
103+
104+
for (const event of response.results) {
105+
const parsed = SorobanParser.parseSwapEvent(event);
106+
107+
if (parsed) {
108+
console.log(`Event Parsed: Swap by ${parsed.userAddress} (${parsed.amountIn} -> ${parsed.amountOut})`);
109+
110+
// Sync with Database
111+
await saveTradeToDb(parsed);
112+
}
113+
114+
// Update ledger seq to avoid double counting
115+
if (event.ledgerSeq >= lastLedger) {
116+
lastLedger = event.ledgerSeq + 1;
117+
}
118+
}
119+
}
120+
121+
// Checkpoint every 5 seconds
122+
await new Promise(resolve => setTimeout(resolve, 5000));
123+
124+
} catch (error) {
125+
console.error('Indexer error pulse:', error.message);
126+
// Wait longer on error to prevent spamming
127+
await new Promise(resolve => setTimeout(resolve, 15000));
128+
}
129+
}
130+
}
131+
132+
/**
133+
* Persists the parsed trade into PostgreSQL via Prisma
134+
*/
135+
async function saveTradeToDb(tradeData) {
136+
try {
137+
// Ensure the pool exists in our database first
138+
// In a real app, pools should already exist, but for this daemon we'll upsert
139+
let pool = await prisma.pool.findUnique({
140+
where: { address: POOL_ADDRESS }
141+
});
142+
143+
if (!pool) {
144+
console.log(`Creating record for pool ${POOL_ADDRESS} in DB`);
145+
pool = await prisma.pool.create({
146+
data: {
147+
address: POOL_ADDRESS,
148+
tokenA: 'XLM', // Defaulting for indexer bootstrap
149+
tokenB: 'USDC',
150+
fee: '0.3%'
151+
}
152+
});
153+
}
154+
155+
// Insert the actual trade
156+
await prisma.trade.create({
157+
data: {
158+
poolId: pool.id,
159+
userAddress: tradeData.userAddress,
160+
amountIn: tradeData.amountIn,
161+
amountOut: tradeData.amountOut,
162+
timestamp: new Date()
163+
}
164+
});
165+
166+
console.log(`✅ Trade indexed successfully in DB: ${tradeData.userAddress.substring(0, 8)}...`);
167+
} catch (error) {
168+
console.error('Database insertion error:', error);
169+
}
170+
}
171+
172+
// Global error handling for the daemon
173+
process.on('uncaughtException', (err) => {
174+
console.error('Uncaught Exception:', err);
175+
});
176+
177+
process.on('unhandledRejection', (reason, promise) => {
178+
console.error('Unhandled Rejection at:', promise, 'reason:', reason);
179+
});
180+
181+
// Start the daemon
182+
runIndexer().catch(err => {
183+
console.error('Fatal Indexer Crash:', err);
184+
process.exit(1);
185+
});

0 commit comments

Comments
 (0)