Skip to content

Commit 7e80970

Browse files
authored
Merge pull request #114 from JerryIdoko/feature/websocket-live-streaming-159
Pull Request: Refactor API for Live Ledger Streaming (#159, #83)
2 parents 17dd993 + 8579f18 commit 7e80970

5 files changed

Lines changed: 102 additions & 4 deletions

File tree

.env.example

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ DB_USERNAME=postgres
88
DB_PASSWORD=password
99
DB_DATABASE=tradeflow
1010

11-
# Soroban Event Indexer Configuration
11+
# Soroban Event Indexer & WebSocket Configuration
1212
SOROBAN_RPC_URL="https://soroban-testnet.stellar.org"
1313
POOL_ADDRESS="CC..." # Replace with your Pool Contract ID
14-
INDEXER_POLL_INTERVAL=5000 # Polling interval in ms
14+
INDEXER_POLL_INTERVAL=5000 # Polling interval in ms
15+
WS_PORT=3001 # WebSocket server port

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@
2727
"prisma:push": "prisma db push",
2828
"prisma:migrate": "prisma migrate dev",
2929
"prisma:studio": "prisma studio",
30-
"start:indexer": "node services/eventIndexer.js"
30+
"start:indexer": "node services/eventIndexer.js",
31+
"start:all": "node server.js"
3132
},
3233
"dependencies": {
3334
"@nestjs/common": "^10.4.22",

server.js

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/**
2+
* server.js
3+
*
4+
* Main Entry Point for combined Soroban Indexer and WebSocket streaming server.
5+
* This file orchestrates the real-time data flow from the blockchain to the frontend.
6+
*/
7+
8+
const WebSocket = require('ws');
9+
const wsEvents = require('./services/wsEvents');
10+
const { startIndexer } = require('./services/eventIndexer');
11+
12+
// Config and Port Setup
13+
const WS_PORT = process.env.WS_PORT || 3001;
14+
const wss = new WebSocket.Server({ port: WS_PORT });
15+
16+
console.log('--- 🌐 TradeFlow Real-Time Stream Server ---');
17+
console.log(`📡 WebSocket server running on ws://localhost:${WS_PORT}`);
18+
19+
// Connection tracking
20+
let activeConnections = 0;
21+
22+
/**
23+
* Listen for incoming WebSocket connections.
24+
*/
25+
wss.on('connection', (ws) => {
26+
activeConnections++;
27+
console.log(`✅ New Web3 client connected. Active: ${activeConnections}`);
28+
29+
// Initial Connection ACK
30+
ws.send(JSON.stringify({
31+
event: 'INDEXER_CONNECTED',
32+
status: 'ONLINE',
33+
timestamp: new Date().toISOString()
34+
}));
35+
36+
ws.on('close', () => {
37+
activeConnections--;
38+
console.log(`❌ Web3 client disconnected. Active: ${activeConnections}`);
39+
});
40+
41+
ws.on('error', (err) => {
42+
console.error('⚠️ WS Socket Error:', err.message);
43+
});
44+
});
45+
46+
/**
47+
* BROADCASTER: Listens to the internal 'newTrade' event emitter.
48+
* Broadcasts every new blockchain event caught by the Indexer daemon
49+
* to all connected browser clients.
50+
*/
51+
wsEvents.on('newTrade', (tradeData) => {
52+
console.log(`📣 BROADCASTING: New trade found in pool ${tradeData.poolId.slice(0, 8)}...`);
53+
54+
const payload = JSON.stringify({
55+
event: 'NEW_TRADE_EVENT',
56+
data: tradeData,
57+
receivedAt: new Date().toISOString()
58+
});
59+
60+
// Iterative broadcast to all active subscribers
61+
wss.clients.forEach((client) => {
62+
if (client.readyState === WebSocket.OPEN) {
63+
client.send(payload);
64+
}
65+
});
66+
});
67+
68+
/**
69+
* 🚀 START DAEMON ORCHESTRATION
70+
* We start the Soroban Indexer in the same Node.js process to bridge
71+
* the blockchain data with the WebSocket emitter via internal memory.
72+
*/
73+
startIndexer().catch((err) => {
74+
console.error('❌ CRITICAL ERROR: Event Indexer failed to start:', err.message);
75+
process.exit(1);
76+
});
77+
78+
// Process Management
79+
process.on('SIGTERM', () => {
80+
console.log('🛑 Closing WebSocket connections and shutting down.');
81+
wss.close();
82+
process.exit(0);
83+
});

services/eventIndexer.js

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
const { rpc } = require('@stellar/stellar-sdk');
1010
const { PrismaClient } = require('@prisma/client');
1111
const { parseScVal } = require('./scValParser');
12+
const wsEvents = require('./wsEvents');
1213

1314
// In case dotenv is not installed as a top-level dependency,
1415
// we try to load it safely. Most Node.js environments for this project should have it.
@@ -125,6 +126,9 @@ async function handleContractEvent(event) {
125126
});
126127

127128
console.log(`💾 Indexed Trade saved. DB ID: ${savedTrade.id}`);
129+
130+
// Trigger WebSocket broadcast
131+
wsEvents.emit('newTrade', savedTrade);
128132
}
129133
} catch (error) {
130134
console.error('❌ Failed to process event:', error.message);
@@ -138,4 +142,9 @@ process.on('SIGINT', async () => {
138142
process.exit(0);
139143
});
140144

141-
startIndexer();
145+
exports.startIndexer = startIndexer;
146+
147+
// In standalone mode, starting the indexer automatically
148+
if (require.main === module) {
149+
startIndexer();
150+
}

services/wsEvents.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
const EventEmitter = require('events');
2+
const wsEvents = new EventEmitter();
3+
4+
module.exports = wsEvents;

0 commit comments

Comments
 (0)