Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions backend/migrations/1779000000010_webhook-retry-mechanism.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* @type {import('node-pg-migrate').ColumnDefinitions | undefined}
*/
export const shorthands = undefined;

/**
* @param pgm {import('node-pg-migrate').MigrationBuilder}
* @returns {Promise<void> | void}
*/
export const up = (pgm) => {
pgm.addColumns("webhook_subscriptions", {
max_attempts: { type: "integer", notNull: true, default: 5 },
});

pgm.addColumns("webhook_deliveries", {
payload: { type: "jsonb" },
next_retry_at: { type: "timestamp" },
});

pgm.createIndex("webhook_deliveries", ["delivered_at", "next_retry_at", "attempt_count"]);
};

/**
* @param pgm {import('node-pg-migrate').MigrationBuilder}
* @returns {Promise<void> | void}
*/
export const down = (pgm) => {
pgm.dropColumns("webhook_deliveries", ["payload", "next_retry_at"]);
pgm.dropColumns("webhook_subscriptions", ["max_attempts"]);
};
45 changes: 32 additions & 13 deletions backend/src/controllers/indexerController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -355,10 +355,11 @@ export const createWebhookSubscription = async (
res: Response,
) => {
try {
const { callbackUrl, eventTypes, secret } = req.body as {
const { callbackUrl, eventTypes, secret, maxAttempts } = req.body as {
callbackUrl?: string;
eventTypes?: string[];
secret?: string;
maxAttempts?: number;
};

if (!callbackUrl) {
Expand Down Expand Up @@ -398,18 +399,12 @@ export const createWebhookSubscription = async (
});
}

const subscription = await webhookService.registerSubscription(
secret
? {
callbackUrl,
eventTypes: normalizedEventTypes,
secret,
}
: {
callbackUrl,
eventTypes: normalizedEventTypes,
},
);
const subscription = await webhookService.registerSubscription({
callbackUrl,
eventTypes: normalizedEventTypes,
secret: secret || undefined,
maxAttempts: maxAttempts ? Number(maxAttempts) : undefined,
});

res.status(201).json({
success: true,
Expand Down Expand Up @@ -497,6 +492,30 @@ export const getWebhookDeliveries = async (req: Request, res: Response) => {
}
};

export const getWebhookRetryQueue = async (req: Request, res: Response) => {
try {
const limit = Number(req.query.limit ?? 50);
const boundedLimit =
Number.isFinite(limit) && limit > 0 ? Math.min(limit, 100) : 50;

const pendingRetries = await webhookService.getPendingRetries(boundedLimit);

res.json({
success: true,
data: {
queueSize: pendingRetries.length,
retries: pendingRetries,
},
});
} catch (error) {
logger.error("Failed to fetch webhook retry queue", { error });
res.status(500).json({
success: false,
message: "Failed to fetch webhook retry queue",
});
}
};

export const reindexLedgerRange = async (req: Request, res: Response) => {
try {
const fromLedger = Number(req.query.fromLedger);
Expand Down
5 changes: 5 additions & 0 deletions backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
} from "./services/defaultChecker.js";
import { eventStreamService } from "./services/eventStreamService.js";
import { sorobanService } from "./services/sorobanService.js";
import { webhookRetryScheduler } from "./services/webhookRetryScheduler.js";

const port = process.env.PORT || 3001;

Expand All @@ -34,6 +35,9 @@ const server = app.listen(port, () => {

// Start periodic on-chain default checks (if configured)
startDefaultCheckerScheduler();

// Start webhook retry scheduler
webhookRetryScheduler.start();
});

const shutdown = async (signal: "SIGTERM" | "SIGINT") => {
Expand All @@ -48,6 +52,7 @@ const shutdown = async (signal: "SIGTERM" | "SIGINT") => {

stopIndexer();
stopDefaultCheckerScheduler();
webhookRetryScheduler.stop();

if (typeof eventStreamService.closeAll === 'function') {
eventStreamService.closeAll("Server shutting down");
Expand Down
22 changes: 22 additions & 0 deletions backend/src/routes/adminRoutes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
createWebhookSubscription,
deleteWebhookSubscription,
getWebhookDeliveries,
getWebhookRetryQueue,
listWebhookSubscriptions,
reindexLedgerRange,
} from "../controllers/indexerController.js";
Expand Down Expand Up @@ -206,4 +207,25 @@ router.delete(
*/
router.get("/webhooks/:id/deliveries", requireApiKey, getWebhookDeliveries);

/**
* @swagger
* /admin/webhooks/retry-queue:
* get:
* summary: View pending webhook retries
* tags: [Admin]
* security:
* - ApiKeyAuth: []
* parameters:
* - in: query
* name: limit
* required: false
* schema:
* type: integer
* default: 50
* responses:
* 200:
* description: Retry queue returned
*/
router.get("/webhooks/retry-queue", requireApiKey, getWebhookRetryQueue);

export default router;
87 changes: 87 additions & 0 deletions backend/src/services/webhookRetryScheduler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import { webhookService } from "./webhookService.js";
import logger from "../utils/logger.js";

export class WebhookRetryScheduler {
private interval: ReturnType<typeof setInterval> | undefined;
private isRunning = false;
private intervalMs: number;

constructor(intervalMs: number = 60 * 1000) {
this.intervalMs = intervalMs;
}

start(): void {
if (this.interval) {
logger.warn("Webhook retry scheduler is already running");
return;
}

this.interval = setInterval(async () => {
await this.processRetries();
}, this.intervalMs);

logger.info("Webhook retry scheduler started", { intervalMs: this.intervalMs });
}

stop(): void {
if (this.interval) {
clearInterval(this.interval);
this.interval = undefined;
logger.info("Webhook retry scheduler stopped");
}
}

private async processRetries(): Promise<void> {
if (this.isRunning) return;

this.isRunning = true;
try {
const pendingRetries = await webhookService.getPendingRetries(50);

if (pendingRetries.length === 0) {
return;
}

logger.info(`Processing ${pendingRetries.length} pending webhook retries`);

await Promise.all(
pendingRetries.map(async (delivery) => {
try {
// We need to fetch the subscription details to get the callback URL and secret
// getPendingRetries already does a JOIN, so we have them in the row,
// but mapDeliveryRow doesn't include them in the interface.
// I've added mapRetryToContext in webhookService to handle this.

// Re-fetch with join to get all needed info safely if not in delivery object
// Actually, I'll update getPendingRetries to return the context directly.

// Since I've implemented mapRetryToContext in WebhookService,
// I'll use it to get the context for each delivery.

// NOTE: In a real production system, we might want to limit concurrency here.
await webhookService.sendToWebhook(
delivery.subscriptionId,
(delivery as any).callbackUrl, // These are injected by the JOIN in getPendingRetries
(delivery as any).secret,
(delivery as any).maxAttempts,
delivery.payload,
delivery.id,
delivery.attemptCount + 1
);
} catch (error) {
logger.error("Failed to process webhook retry", {
deliveryId: delivery.id,
error
});
}
})
);
} catch (error) {
logger.error("Error in webhook retry scheduler loop", { error });
} finally {
this.isRunning = false;
}
}
}

export const webhookRetryScheduler = new WebhookRetryScheduler();
Loading
Loading