Skip to content
7 changes: 5 additions & 2 deletions apps/backend/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ JOB_SCHEDULER_POLL_SECONDS=300
JOB_WORKER_POLL_SECONDS=60
JOB_CATCHUP_MAX_ENQUEUE=10
JOB_SCHEDULE_PHASE_SECONDS=0
JOB_ENQUEUE_JITTER_SECONDS=0
DEAL_JOB_TIMEOUT_SECONDS=360 # 6m: Max runtime for deal jobs (TODO: reduce default to 3m)
RETRIEVAL_JOB_TIMEOUT_SECONDS=60 # 1m: Max runtime for retrieval jobs (TODO: reduce default to 30s)
DEALBOT_PGBOSS_POOL_MAX=1
DEALBOT_PGBOSS_SCHEDULER_ENABLED=true

Expand All @@ -67,6 +70,6 @@ PROXY_LOCATIONS=l1,l2

# Timeout Configuration (in milliseconds)
CONNECT_TIMEOUT_MS=10000 # 10s: Initial connection timeout
HTTP_REQUEST_TIMEOUT_MS=600000 # 10m: Total transfer timeout for HTTP/1.1
HTTP2_REQUEST_TIMEOUT_MS=600000 # 10m: Total transfer timeout for HTTP/2
HTTP_REQUEST_TIMEOUT_MS=240000 # 4m: Total transfer timeout for HTTP/1.1 (10MiB @ 170KB/s + overhead)
HTTP2_REQUEST_TIMEOUT_MS=240000 # 4m: Total transfer timeout for HTTP/2 (10MiB @ 170KB/s + overhead)
RETRIEVAL_TIMEOUT_BUFFER_MS=60000 # 1m: Stop batch before next scheduled run
96 changes: 96 additions & 0 deletions apps/backend/src/common/abort-utils.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import { awaitWithAbort, createAbortError, delay } from "./abort-utils.js";

// Exercises the three reason shapes createAbortError distinguishes:
// no reason at all, an Error reason, and a non-Error reason.
describe("createAbortError", () => {
  it("returns a generic AbortError when no signal is provided", () => {
    const { name, message } = createAbortError();

    expect(name).toBe("AbortError");
    expect(message).toBe("The operation was aborted");
  });

  it("returns the signal reason when it is an Error", () => {
    const abortController = new AbortController();
    const originalReason = new Error("custom reason");
    abortController.abort(originalReason);

    // Identity check: the very same Error instance must come back, not a copy.
    expect(createAbortError(abortController.signal)).toBe(originalReason);
  });

  it("returns an AbortError that preserves non-Error reasons", () => {
    const abortController = new AbortController();
    abortController.abort("string reason");

    const wrapped: Error & { cause?: unknown } = createAbortError(abortController.signal);

    expect(wrapped.name).toBe("AbortError");
    expect(wrapped.message).toBe("The operation was aborted: string reason");
    // The raw reason stays reachable through `cause`.
    expect(wrapped.cause).toBe("string reason");
  });
});

// Covers awaitWithAbort's pass-through, pre-aborted, aborted-while-pending,
// and normal settle (resolve / reject) paths.
describe("awaitWithAbort", () => {
  it("passes through the promise when no signal is provided", async () => {
    const value = await awaitWithAbort(Promise.resolve("hello"));

    expect(value).toBe("hello");
  });

  it("throws immediately when signal is already aborted", async () => {
    const abortController = new AbortController();
    abortController.abort();

    const attempt = awaitWithAbort(Promise.resolve("hello"), abortController.signal);

    await expect(attempt).rejects.toThrow();
  });

  it("rejects when signal aborts during pending promise", async () => {
    const abortController = new AbortController();
    // A promise that never settles, so only the abort can end the wait.
    const pendingForever = new Promise<string>(() => {});

    const attempt = awaitWithAbort(pendingForever, abortController.signal);
    abortController.abort(new Error("test abort"));

    await expect(attempt).rejects.toThrow("test abort");
  });

  it("resolves normally when promise resolves before abort", async () => {
    const { signal } = new AbortController();

    const value = await awaitWithAbort(Promise.resolve(42), signal);

    expect(value).toBe(42);
  });

  it("rejects with the original error when promise rejects", async () => {
    const { signal } = new AbortController();
    const failure = new Error("original error");

    const attempt = awaitWithAbort(Promise.reject(failure), signal);

    await expect(attempt).rejects.toThrow("original error");
  });
});

// Covers delay's timer path (with and without a signal) and both abort paths.
describe("delay", () => {
  afterEach(() => {
    // Some tests install fake timers; always restore the real ones afterwards.
    vi.useRealTimers();
  });

  it("resolves after the specified time", async () => {
    vi.useFakeTimers();
    const promise = delay(100);
    vi.advanceTimersByTime(100);
    await expect(promise).resolves.toBeUndefined();
  });

  // Exercises the signal-aware branch on its success path: the timer elapses
  // while the signal stays un-aborted.
  it("resolves after the specified time when the signal never aborts", async () => {
    vi.useFakeTimers();
    const controller = new AbortController();
    const promise = delay(100, controller.signal);
    vi.advanceTimersByTime(100);
    await expect(promise).resolves.toBeUndefined();
  });

  it("rejects immediately when signal is already aborted", async () => {
    const controller = new AbortController();
    controller.abort();
    await expect(delay(1000, controller.signal)).rejects.toThrow();
  });

  it("rejects when signal aborts during delay", async () => {
    const controller = new AbortController();
    const promise = delay(10_000, controller.signal);

    controller.abort(new Error("cancelled"));
    await expect(promise).rejects.toThrow("cancelled");
  });

  it("resolves normally when no signal is provided", async () => {
    vi.useFakeTimers();
    const promise = delay(50);
    vi.advanceTimersByTime(50);
    await expect(promise).resolves.toBeUndefined();
  });
});
61 changes: 61 additions & 0 deletions apps/backend/src/common/abort-utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
 * Produces the Error that represents an abort of `signal`.
 *
 * - If the signal's reason is already an `Error`, that exact instance is returned.
 * - Otherwise a new `Error` named "AbortError" is created. A defined non-Error
 *   reason is appended to the message and attached as `cause`.
 *
 * @param signal - Optional signal whose `reason` is inspected.
 * @returns The signal's own Error reason, or a newly built AbortError.
 */
export function createAbortError(signal?: AbortSignal): Error {
  const abortReason: unknown = signal?.reason;
  if (abortReason instanceof Error) {
    return abortReason;
  }

  const base = "The operation was aborted";
  const hasReason = abortReason !== undefined;
  const abortError: Error & { cause?: unknown } = new Error(hasReason ? `${base}: ${String(abortReason)}` : base);
  abortError.name = "AbortError";
  if (hasReason) {
    // Keep the raw (non-Error) reason reachable for callers that want it.
    abortError.cause = abortReason;
  }
  return abortError;
}

/**
 * Waits for `promise`, but stops waiting as soon as `signal` aborts.
 *
 * - No signal: the promise is returned as-is.
 * - Signal already aborted: rejects right away with `createAbortError(signal)`,
 *   the same normalisation `delay` applies, so the rejection is always an `Error`
 *   even when the abort reason is a string or other non-Error value.
 * - Signal aborts while pending: rejects with `createAbortError(signal)`.
 * - Otherwise: settles exactly like `promise`.
 *
 * This function only stops waiting; it does nothing to cancel the work behind
 * `promise`.
 *
 * @param promise - The operation to wait for.
 * @param signal - Optional signal that ends the wait early.
 * @returns The value `promise` resolves with.
 * @throws The abort error when the signal fires first, or the rejection of `promise`.
 */
export async function awaitWithAbort<T>(promise: Promise<T>, signal?: AbortSignal): Promise<T> {
  if (!signal) return promise;

  if (signal.aborted) {
    // We are about to stop observing `promise`. Attach a no-op handler so that a
    // later rejection of it is not reported as an unhandled rejection.
    promise.catch(() => {});
    throw createAbortError(signal);
  }

  return new Promise<T>((resolve, reject) => {
    const onAbort = () => reject(createAbortError(signal));
    signal.addEventListener("abort", onAbort, { once: true });
    promise.then(
      (value) => {
        // Settled first: drop the listener so it does not linger on a long-lived signal.
        signal.removeEventListener("abort", onAbort);
        resolve(value);
      },
      (error) => {
        signal.removeEventListener("abort", onAbort);
        reject(error);
      },
    );
  });
}

/**
 * Timer-based pause that can be cut short by an `AbortSignal`.
 *
 * @param ms - How long to wait, in milliseconds.
 * @param signal - Optional signal; when it aborts, the pending timer is cleared.
 * @returns A promise that resolves with `undefined` once `ms` has elapsed. It rejects
 *          with `createAbortError(signal)` if the signal is already aborted or aborts
 *          before the timer fires.
 */
export function delay(ms: number, signal?: AbortSignal): Promise<void> {
  if (!signal) {
    return new Promise((done) => setTimeout(done, ms));
  }
  if (signal.aborted) {
    return Promise.reject(createAbortError(signal));
  }

  // Bind the narrowed signal to a const so the closures below see a definite AbortSignal.
  const activeSignal = signal;
  return new Promise((done, fail) => {
    const cancel = () => {
      clearTimeout(timer);
      fail(createAbortError(activeSignal));
    };
    const timer = setTimeout(() => {
      // Timer won the race: detach the abort listener before resolving.
      activeSignal.removeEventListener("abort", cancel);
      done();
    }, ms);
    activeSignal.addEventListener("abort", cancel, { once: true });
  });
}
13 changes: 12 additions & 1 deletion apps/backend/src/common/car-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,36 @@ export type UnixfsCarResult = {
carSize: number;
};

export async function buildUnixfsCar(dataFile: { data: Buffer; size: number; name: string }): Promise<UnixfsCarResult> {
export async function buildUnixfsCar(
dataFile: { data: Buffer; size: number; name: string },
{ signal }: { signal?: AbortSignal } = {},
): Promise<UnixfsCarResult> {
const safeName = dataFile.name?.trim() ? dataFile.name.replace(/[^\w.-]+/g, "_") : "dealbot-upload";
const tempDir = join(tmpdir(), `dealbot-car-${randomBytes(6).toString("hex")}`);
const tempFilePath = join(tempDir, safeName);
let carPath: string | undefined;
try {
await mkdir(tempDir, { recursive: true });
signal?.throwIfAborted();
await writeFile(tempFilePath, dataFile.data);
signal?.throwIfAborted();

const carResult = await createCarFromPath(tempFilePath);
carPath = carResult.carPath;
signal?.throwIfAborted();

const carBytes = await readFile(carPath);
signal?.throwIfAborted();

const reader = await CarReader.fromBytes(carBytes);
signal?.throwIfAborted();

const blockCIDs: CID[] = [];
let totalBlockSize = 0;
let blockCount = 0;

for await (const block of reader.blocks()) {
signal?.throwIfAborted();
blockCIDs.push(block.cid);
totalBlockSize += block.bytes.length;
blockCount += 1;
Expand Down
34 changes: 30 additions & 4 deletions apps/backend/src/config/app.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ export const configValidationSchema = Joi.object({
DEALBOT_PGBOSS_POOL_MAX: Joi.number().integer().min(1).default(1),
JOB_CATCHUP_MAX_ENQUEUE: Joi.number().min(1).default(10),
JOB_SCHEDULE_PHASE_SECONDS: Joi.number().min(0).default(0),
JOB_ENQUEUE_JITTER_SECONDS: Joi.number().min(0).default(0),
DEAL_JOB_TIMEOUT_SECONDS: Joi.number().min(120).default(360), // 6 minutes max runtime for data storage jobs (TODO: reduce default to 3 minutes)
RETRIEVAL_JOB_TIMEOUT_SECONDS: Joi.number().min(60).default(60), // 1 minute max runtime for retrieval jobs (TODO: reduce default to 30 seconds)

// Dataset
DEALBOT_LOCAL_DATASETS_PATH: Joi.string().default(DEFAULT_LOCAL_DATASETS_PATH),
Expand All @@ -100,8 +103,8 @@ export const configValidationSchema = Joi.object({

// Timeouts (in milliseconds)
CONNECT_TIMEOUT_MS: Joi.number().min(1000).default(10000), // 10 seconds to establish connection/receive headers
HTTP_REQUEST_TIMEOUT_MS: Joi.number().min(1000).default(600000), // 10 minutes total for HTTP requests (Body transfer)
HTTP2_REQUEST_TIMEOUT_MS: Joi.number().min(1000).default(600000), // 10 minutes total for HTTP/2 requests (Body transfer)
HTTP_REQUEST_TIMEOUT_MS: Joi.number().min(1000).default(240000), // 4 minutes total for HTTP requests (10MiB @ 170KB/s + overhead)
HTTP2_REQUEST_TIMEOUT_MS: Joi.number().min(1000).default(240000), // 4 minutes total for HTTP/2 requests (10MiB @ 170KB/s + overhead)
Comment thread
SgtPooki marked this conversation as resolved.
RETRIEVAL_TIMEOUT_BUFFER_MS: Joi.number()
.min(0)
.default(60000)
Expand Down Expand Up @@ -242,6 +245,26 @@ export interface IJobsConfig {
* Only used when `DEALBOT_JOBS_MODE=pgboss`.
*/
schedulePhaseSeconds: number;
/**
* Random delay (seconds) added when enqueuing jobs.
*
* Helps avoid synchronized bursts across instances. Only used with pg-boss.
*/
enqueueJitterSeconds: number;
/**
* Maximum runtime (seconds) for deal jobs before forced abort.
*
* Uses AbortController to actively cancel job execution.
* Only used when `DEALBOT_JOBS_MODE=pgboss`.
*/
dealJobTimeoutSeconds: number;
/**
* Maximum runtime (seconds) for retrieval jobs before forced abort.
*
* Uses AbortController to actively cancel job execution.
* Only used when `DEALBOT_JOBS_MODE=pgboss`.
*/
retrievalJobTimeoutSeconds: number;
}

export interface IDatasetConfig {
Expand Down Expand Up @@ -350,6 +373,9 @@ export function loadConfig(): IConfig {
pgbossPoolMax: Number.parseInt(process.env.DEALBOT_PGBOSS_POOL_MAX || "1", 10),
catchupMaxEnqueue: Number.parseInt(process.env.JOB_CATCHUP_MAX_ENQUEUE || "10", 10),
schedulePhaseSeconds: Number.parseInt(process.env.JOB_SCHEDULE_PHASE_SECONDS || "0", 10),
enqueueJitterSeconds: Number.parseInt(process.env.JOB_ENQUEUE_JITTER_SECONDS || "0", 10),
dealJobTimeoutSeconds: Number.parseInt(process.env.DEAL_JOB_TIMEOUT_SECONDS || "360", 10),
retrievalJobTimeoutSeconds: Number.parseInt(process.env.RETRIEVAL_JOB_TIMEOUT_SECONDS || "60", 10),
},
dataset: {
localDatasetsPath: process.env.DEALBOT_LOCAL_DATASETS_PATH || DEFAULT_LOCAL_DATASETS_PATH,
Expand Down Expand Up @@ -377,8 +403,8 @@ export function loadConfig(): IConfig {
},
timeouts: {
connectTimeoutMs: Number.parseInt(process.env.CONNECT_TIMEOUT_MS || "10000", 10),
httpRequestTimeoutMs: Number.parseInt(process.env.HTTP_REQUEST_TIMEOUT_MS || "600000", 10),
http2RequestTimeoutMs: Number.parseInt(process.env.HTTP2_REQUEST_TIMEOUT_MS || "600000", 10),
httpRequestTimeoutMs: Number.parseInt(process.env.HTTP_REQUEST_TIMEOUT_MS || "240000", 10),
http2RequestTimeoutMs: Number.parseInt(process.env.HTTP2_REQUEST_TIMEOUT_MS || "240000", 10),
retrievalTimeoutBufferMs: Number.parseInt(process.env.RETRIEVAL_TIMEOUT_BUFFER_MS || "60000", 10),
},
};
Expand Down
20 changes: 13 additions & 7 deletions apps/backend/src/deal-addons/deal-addons.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Injectable, Logger } from "@nestjs/common";
import { awaitWithAbort } from "../common/abort-utils.js";
import type { Deal } from "../database/entities/deal.entity.js";
import type { DealMetadata, ServiceType } from "../database/types.js";
import type { IDealAddon } from "./interfaces/deal-addon.interface.js";
Expand Down Expand Up @@ -58,7 +59,7 @@ export class DealAddonsService {
* @returns Complete preprocessing result with processed data and metadata
* @throws Error if preprocessing fails
*/
async preprocessDeal(config: DealConfiguration): Promise<DealPreprocessingResult> {
async preprocessDeal(config: DealConfiguration, signal?: AbortSignal): Promise<DealPreprocessingResult> {
const startTime = Date.now();
this.logger.log(`Starting deal preprocessing for file: ${config.dataFile.name}`);

Expand All @@ -79,7 +80,7 @@ export class DealAddonsService {
);

// Execute preprocessing pipeline
const pipelineResult = await this.executePreprocessingPipeline(sortedAddons, config);
const pipelineResult = await this.executePreprocessingPipeline(sortedAddons, config, signal);

// Merge Synapse configurations from all add-ons
const synapseConfig = this.mergeSynapseConfigs(sortedAddons, pipelineResult.aggregatedMetadata);
Expand Down Expand Up @@ -113,19 +114,23 @@ export class DealAddonsService {
* @param deal - Deal entity with upload information
* @param appliedAddons - Names of add-ons that were applied during preprocessing
*/
async handleUploadComplete(deal: Deal, appliedAddons: ServiceType[]): Promise<void> {
async handleUploadComplete(deal: Deal, appliedAddons: ServiceType[], signal?: AbortSignal): Promise<void> {
this.logger.debug(`Running onUploadComplete handlers for deal ${deal.id}`);

signal?.throwIfAborted();

const uploadCompletePromises = appliedAddons
.map((addonName) => this.addons.get(addonName))
.filter((addon) => addon?.onUploadComplete)
.map((addon) => addon!.onUploadComplete!(deal));
.map((addon) => addon!.onUploadComplete!(deal, signal));

try {
await Promise.all(uploadCompletePromises);
await awaitWithAbort(Promise.all(uploadCompletePromises), signal);
this.logger.debug(`onUploadComplete handlers completed for deal ${deal.id}`);
} catch (error) {
this.logger.warn(`onUploadComplete handler failed for deal ${deal.id}: ${error.message}`);
signal?.throwIfAborted();
const errorMessage = error instanceof Error ? error.message : String(error);
this.logger.warn(`onUploadComplete handler failed for deal ${deal.id}: ${errorMessage}`);
throw error;
}
}
Expand Down Expand Up @@ -196,6 +201,7 @@ export class DealAddonsService {
private async executePreprocessingPipeline(
addons: IDealAddon[],
config: DealConfiguration,
signal?: AbortSignal,
): Promise<{
finalData: Buffer | Uint8Array;
finalSize: number;
Expand All @@ -217,7 +223,7 @@ export class DealAddonsService {
this.logger.debug(`Executing add-on: ${addon.name}`);

// Execute preprocessing
const result = await addon.preprocessData(context);
const result = await addon.preprocessData(context, signal);

// Validate result if validation is implemented
if (addon.validate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export interface IDealAddon<T extends IpniMetadata | DirectMetadata = any> {
* @returns Preprocessing result with transformed data and metadata
* @throws Error if preprocessing fails
*/
preprocessData(context: AddonExecutionContext): Promise<PreprocessingResult<T>>;
preprocessData(context: AddonExecutionContext, signal?: AbortSignal): Promise<PreprocessingResult<T>>;

/**
* Get Synapse SDK configuration for this add-on
Expand All @@ -53,7 +53,7 @@ export interface IDealAddon<T extends IpniMetadata | DirectMetadata = any> {
* @param deal - Deal entity with upload information
* @returns Promise that resolves when handler is complete
*/
onUploadComplete?(deal: Deal): Promise<void>;
onUploadComplete?(deal: Deal, signal?: AbortSignal): Promise<void>;

/**
* Optional post-processing after deal creation
Expand Down
Loading