Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ STELLAR_RPC_URL=https://soroban-testnet.stellar.org
STELLAR_NETWORK_PASSPHRASE=Test SDF Network ; September 2015
LOAN_MANAGER_CONTRACT_ID=
LENDING_POOL_CONTRACT_ID=
REMITTANCE_NFT_CONTRACT_ID=
POOL_TOKEN_ADDRESS=
# Secret key for the on-chain LoanManager admin account (G... / S...)
LOAN_MANAGER_ADMIN_SECRET=
Expand Down
30 changes: 30 additions & 0 deletions backend/migrations/1779000000010_webhook-retry-mechanism.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* @type {import('node-pg-migrate').ColumnDefinitions | undefined}
*/
export const shorthands = undefined;

/**
* @param pgm {import('node-pg-migrate').MigrationBuilder}
* @returns {Promise<void> | void}
*/
export const up = (pgm) => {
pgm.addColumns("webhook_subscriptions", {
max_attempts: { type: "integer", notNull: true, default: 5 },
});

pgm.addColumns("webhook_deliveries", {
payload: { type: "jsonb" },
next_retry_at: { type: "timestamp" },
});

pgm.createIndex("webhook_deliveries", ["delivered_at", "next_retry_at", "attempt_count"]);
};

/**
* @param pgm {import('node-pg-migrate').MigrationBuilder}
* @returns {Promise<void> | void}
*/
export const down = (pgm) => {
pgm.dropColumns("webhook_deliveries", ["payload", "next_retry_at"]);
pgm.dropColumns("webhook_subscriptions", ["max_attempts"]);
};
32 changes: 32 additions & 0 deletions backend/migrations/1779000000011_multi-contract-indexer-state.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* @type {import('node-pg-migrate').ColumnDefinitions | undefined}
*/
export const shorthands = undefined;

/**
* @param pgm {import('node-pg-migrate').MigrationBuilder}
* @returns {Promise<void> | void}
*/
export const up = (pgm) => {
// Add indexer_name column to track state per contract/indexer instance
pgm.addColumns("indexer_state", {
indexer_name: { type: "varchar(255)", notNull: true, default: "default" },
});

// Migrate existing record to 'loan_manager'
pgm.sql("UPDATE indexer_state SET indexer_name = 'loan_manager' WHERE id = 1");

// Ensure indexer_name is unique
pgm.addConstraint("indexer_state", "indexer_state_indexer_name_unique", {
unique: "indexer_name",
});
};

/**
* @param pgm {import('node-pg-migrate').MigrationBuilder}
* @returns {Promise<void> | void}
*/
export const down = (pgm) => {
pgm.dropConstraint("indexer_state", "indexer_state_indexer_name_unique");
pgm.dropColumns("indexer_state", ["indexer_name"]);
};
46 changes: 33 additions & 13 deletions backend/src/controllers/indexerController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -355,10 +355,11 @@ export const createWebhookSubscription = async (
res: Response,
) => {
try {
const { callbackUrl, eventTypes, secret } = req.body as {
const { callbackUrl, eventTypes, secret, maxAttempts } = req.body as {
callbackUrl?: string;
eventTypes?: string[];
secret?: string;
maxAttempts?: number;
};

if (!callbackUrl) {
Expand Down Expand Up @@ -398,18 +399,12 @@ export const createWebhookSubscription = async (
});
}

const subscription = await webhookService.registerSubscription(
secret
? {
callbackUrl,
eventTypes: normalizedEventTypes,
secret,
}
: {
callbackUrl,
eventTypes: normalizedEventTypes,
},
);
const subscription = await webhookService.registerSubscription({
callbackUrl,
eventTypes: normalizedEventTypes,
secret: secret || undefined,
maxAttempts: maxAttempts ? Number(maxAttempts) : undefined,
});

res.status(201).json({
success: true,
Expand Down Expand Up @@ -497,6 +492,30 @@ export const getWebhookDeliveries = async (req: Request, res: Response) => {
}
};

export const getWebhookRetryQueue = async (req: Request, res: Response) => {
try {
const limit = Number(req.query.limit ?? 50);
const boundedLimit =
Number.isFinite(limit) && limit > 0 ? Math.min(limit, 100) : 50;

const pendingRetries = await webhookService.getPendingRetries(boundedLimit);

res.json({
success: true,
data: {
queueSize: pendingRetries.length,
retries: pendingRetries,
},
});
} catch (error) {
logger.error("Failed to fetch webhook retry queue", { error });
res.status(500).json({
success: false,
message: "Failed to fetch webhook retry queue",
});
}
};

export const reindexLedgerRange = async (req: Request, res: Response) => {
try {
const fromLedger = Number(req.query.fromLedger);
Expand Down Expand Up @@ -538,6 +557,7 @@ export const reindexLedgerRange = async (req: Request, res: Response) => {

const batchSize = Number(process.env.INDEXER_BATCH_SIZE ?? 100);
const indexer = new EventIndexer({
name: "reindex",
rpcUrl,
contractId,
pollIntervalMs: 30_000,
Expand Down
5 changes: 5 additions & 0 deletions backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
} from "./services/defaultChecker.js";
import { eventStreamService } from "./services/eventStreamService.js";
import { sorobanService } from "./services/sorobanService.js";
import { webhookRetryScheduler } from "./services/webhookRetryScheduler.js";

const port = process.env.PORT || 3001;

Expand All @@ -34,6 +35,9 @@ const server = app.listen(port, () => {

// Start periodic on-chain default checks (if configured)
startDefaultCheckerScheduler();

// Start webhook retry scheduler
webhookRetryScheduler.start();
});

const shutdown = async (signal: "SIGTERM" | "SIGINT") => {
Expand All @@ -48,6 +52,7 @@ const shutdown = async (signal: "SIGTERM" | "SIGINT") => {

stopIndexer();
stopDefaultCheckerScheduler();
webhookRetryScheduler.stop();

if (typeof eventStreamService.closeAll === 'function') {
eventStreamService.closeAll("Server shutting down");
Expand Down
22 changes: 22 additions & 0 deletions backend/src/routes/adminRoutes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
createWebhookSubscription,
deleteWebhookSubscription,
getWebhookDeliveries,
getWebhookRetryQueue,
listWebhookSubscriptions,
reindexLedgerRange,
} from "../controllers/indexerController.js";
Expand Down Expand Up @@ -206,4 +207,25 @@ router.delete(
*/
router.get("/webhooks/:id/deliveries", requireApiKey, getWebhookDeliveries);

/**
* @swagger
* /admin/webhooks/retry-queue:
* get:
* summary: View pending webhook retries
* tags: [Admin]
* security:
* - ApiKeyAuth: []
* parameters:
* - in: query
* name: limit
* required: false
* schema:
* type: integer
* default: 50
* responses:
* 200:
* description: Retry queue returned
*/
router.get("/webhooks/retry-queue", requireApiKey, getWebhookRetryQueue);

export default router;
64 changes: 47 additions & 17 deletions backend/src/services/eventIndexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ interface LoanEvent extends IndexedLoanEvent {
}

interface EventIndexerConfig {
name: string;
rpcUrl: string;
contractId: string;
pollIntervalMs?: number;
Expand All @@ -53,6 +54,7 @@ interface ProcessChunkResult {
}

export class EventIndexer {
private readonly name: string;
private readonly rpc: SorobanRpc.Server;
private readonly contractId: string;
private readonly pollIntervalMs: number;
Expand All @@ -61,19 +63,21 @@ export class EventIndexer {
private pollTimeout: NodeJS.Timeout | null = null;

constructor(config: EventIndexerConfig);
constructor(rpcUrl: string, contractId: string);
constructor(configOrRpcUrl: EventIndexerConfig | string, contractId?: string) {
constructor(rpcUrl: string, contractId: string, name?: string);
constructor(configOrRpcUrl: EventIndexerConfig | string, contractId?: string, name?: string) {
if (typeof configOrRpcUrl === "string") {
if (!contractId) {
throw new Error("contractId is required when using rpcUrl constructor");
}
this.name = name ?? "default";
this.rpc = new SorobanRpc.Server(configOrRpcUrl);
this.contractId = contractId;
this.pollIntervalMs = 30_000;
this.batchSize = 100;
return;
}

this.name = configOrRpcUrl.name;
this.rpc = new SorobanRpc.Server(configOrRpcUrl.rpcUrl);
this.contractId = configOrRpcUrl.contractId;
this.pollIntervalMs = configOrRpcUrl.pollIntervalMs ?? 30_000;
Expand Down Expand Up @@ -187,16 +191,16 @@ export class EventIndexer {
const result = await query(
`SELECT last_indexed_ledger
FROM indexer_state
ORDER BY id DESC
WHERE indexer_name = $1
LIMIT 1`,
[],
[this.name],
);

if (!result.rows.length) {
await query(
`INSERT INTO indexer_state (last_indexed_ledger)
VALUES (0)`,
[],
`INSERT INTO indexer_state (last_indexed_ledger, indexer_name)
VALUES (0, $1)`,
[this.name],
);
return 0;
}
Expand All @@ -209,20 +213,15 @@ export class EventIndexer {
`UPDATE indexer_state
SET last_indexed_ledger = GREATEST(last_indexed_ledger, $1),
updated_at = CURRENT_TIMESTAMP
WHERE id = (
SELECT id
FROM indexer_state
ORDER BY id DESC
LIMIT 1
)`,
[ledger],
WHERE indexer_name = $2`,
[ledger, this.name],
);

if ((updateResult.rowCount ?? 0) === 0) {
await query(
`INSERT INTO indexer_state (last_indexed_ledger)
VALUES ($1)`,
[ledger],
`INSERT INTO indexer_state (last_indexed_ledger, indexer_name)
VALUES ($1, $2)`,
[ledger, this.name],
);
}
}
Expand Down Expand Up @@ -472,6 +471,24 @@ export class EventIndexer {
} else if (type === "Seized") {
if (!event.topic[1]) return null;
borrower = this.decodeAddress(event.topic[1]);
} else if (type === "Deposit" || type === "Withdraw") {
if (!event.topic[1]) return null;
borrower = this.decodeAddress(event.topic[1]);
// LendingPool events: value is a tuple (amount, shares)
const nativeValue = scValToNative(event.value);
if (Array.isArray(nativeValue)) {
amount = nativeValue[0].toString();
} else {
amount = nativeValue.toString();
}
} else if (type === "YieldDistributed") {
amount = this.decodeAmount(event.value);
} else if (type === "Mint" || type === "ScoreUpd" || type === "ScoreDecr") {
if (!event.topic[1]) return null;
borrower = this.decodeAddress(event.topic[1]);
} else if (type === "Transfer") {
if (!event.topic[1]) return null;
borrower = this.decodeAddress(event.topic[1]); // 'from' address
}

return {
Expand Down Expand Up @@ -649,8 +666,21 @@ export class EventIndexer {
"Paused",
"Unpaused",
"MinScoreUpdated",
"Deposit",
"Withdraw",
"YieldDistributed",
"Mint",
"ScoreUpd",
"ScoreDecr",
"HashUpd",
"Transfer",
"PoolPaused",
"PoolUnpaused",
];

if (eventType === "PoolPaused") return "Paused";
if (eventType === "PoolUnpaused") return "Unpaused";

return supported.includes(eventType)
? (eventType as WebhookEventType)
: null;
Expand Down
Loading