Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/backend/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ JOB_SCHEDULE_PHASE_SECONDS=0
JOB_ENQUEUE_JITTER_SECONDS=0
DEAL_JOB_TIMEOUT_SECONDS=360 # 6m: Max runtime for deal jobs (TODO: reduce default to 3m)
RETRIEVAL_JOB_TIMEOUT_SECONDS=60 # 1m: Max runtime for retrieval jobs (TODO: reduce default to 30s)
IPFS_BLOCK_FETCH_CONCURRENCY=6 # Parallel block fetches when validating IPFS DAGs
DEALBOT_PGBOSS_POOL_MAX=1
DEALBOT_PGBOSS_SCHEDULER_ENABLED=true

Expand Down
1 change: 1 addition & 0 deletions apps/backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"dependencies": {
"@filoz/synapse-sdk": "0.36.1",
"@ipld/car": "^5.4.2",
"@ipld/dag-pb": "^4.1.4",
"@nestjs/axios": "^4.0.1",
"@nestjs/common": "^11.1.13",
"@nestjs/config": "^4.0.3",
Expand Down
37 changes: 37 additions & 0 deletions apps/backend/src/common/car-utils.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { randomBytes } from "node:crypto";
import { mkdir, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { buildUnixfsCar } from "./car-utils.js";

describe("car-utils", () => {
  // Scratch directory that is created fresh for every test case and removed afterwards.
  let tempDir: string;

  beforeEach(async () => {
    const suffix = randomBytes(6).toString("hex");
    tempDir = join(tmpdir(), `car-utils-test-${suffix}`);
    await mkdir(tempDir, { recursive: true });
  });

  afterEach(async () => {
    // `force` keeps cleanup from failing when the directory is already gone.
    await rm(tempDir, { recursive: true, force: true });
  });

  describe("buildUnixfsCar", () => {
    /**
     * Smoke test: packing a random 4 KiB payload must yield a non-empty CAR,
     * a root CID, at least one block CID, and size/count fields that agree
     * with the returned data.
     */
    it("produces CAR data, root CID, and block CIDs from file data", async () => {
      const payload = randomBytes(4096);

      const { carData, rootCID, blockCIDs, blockCount, totalBlockSize, carSize } = await buildUnixfsCar({
        data: Buffer.from(payload),
        size: payload.length,
        name: "test.bin",
      });

      expect(carData.length).toBeGreaterThan(0);
      expect(rootCID).toBeDefined();
      expect(blockCIDs.length).toBeGreaterThanOrEqual(1);
      expect(blockCount).toBe(blockCIDs.length);
      expect(totalBlockSize).toBeGreaterThan(0);
      expect(carSize).toBe(carData.length);
    });
  });
});
2 changes: 1 addition & 1 deletion apps/backend/src/common/car-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { tmpdir } from "node:os";
import { join } from "node:path";
import { CarReader } from "@ipld/car";
import { cleanupTempCar, createCarFromPath } from "filecoin-pin/core/unixfs";
import { CID } from "multiformats/cid";
import type { CID } from "multiformats/cid";

export type UnixfsCarResult = {
carData: Uint8Array;
Expand Down
9 changes: 9 additions & 0 deletions apps/backend/src/config/app.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ export const configValidationSchema = Joi.object({
JOB_ENQUEUE_JITTER_SECONDS: Joi.number().min(0).default(0),
DEAL_JOB_TIMEOUT_SECONDS: Joi.number().min(120).default(360), // 6 minutes max runtime for data storage jobs (TODO: reduce default to 3 minutes)
RETRIEVAL_JOB_TIMEOUT_SECONDS: Joi.number().min(60).default(60), // 1 minute max runtime for retrieval jobs (TODO: reduce default to 30 seconds)
IPFS_BLOCK_FETCH_CONCURRENCY: Joi.number().integer().min(1).max(32).default(6),

// Dataset
DEALBOT_LOCAL_DATASETS_PATH: Joi.string().default(DEFAULT_LOCAL_DATASETS_PATH),
Expand Down Expand Up @@ -232,6 +233,10 @@ export interface ITimeoutConfig {
http2RequestTimeoutMs: number;
}

/**
 * Retrieval-related settings exposed on the application config as `retrieval`.
 */
export interface IRetrievalConfig {
/**
 * Number of parallel block fetches used when validating IPFS DAGs.
 * Loaded from the IPFS_BLOCK_FETCH_CONCURRENCY environment variable, which the
 * validation schema restricts to an integer between 1 and 32 (default 6).
 */
ipfsBlockFetchConcurrency: number;
}

export interface IConfig {
app: IAppConfig;
database: IDatabaseConfig;
Expand All @@ -240,6 +245,7 @@ export interface IConfig {
jobs: IJobsConfig;
dataset: IDatasetConfig;
timeouts: ITimeoutConfig;
retrieval: IRetrievalConfig;
}

const parseIpniTestingMode = (value: string | undefined): IpniTestingMode => {
Expand Down Expand Up @@ -349,5 +355,8 @@ export function loadConfig(): IConfig {
httpRequestTimeoutMs: Number.parseInt(process.env.HTTP_REQUEST_TIMEOUT_MS || "240000", 10),
http2RequestTimeoutMs: Number.parseInt(process.env.HTTP2_REQUEST_TIMEOUT_MS || "240000", 10),
},
retrieval: {
ipfsBlockFetchConcurrency: Number.parseInt(process.env.IPFS_BLOCK_FETCH_CONCURRENCY || "6", 10),
},
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ export interface IRetrievalAddon {
*/
validateData?(retrievedData: Buffer, config: RetrievalConfiguration): Promise<ValidationResult>;

/**
 * Optional: Validate by fetching each expected block from the SP (e.g. GET /ipfs/<cid> with Accept: application/vnd.ipld.raw).
 * Used when the strategy does not use a single CAR stream.
 *
 * The retrieval service invokes this for IPFS_PIN retrievals when the strategy implements it.
 * It derives response size and TTFB metrics from the `bytesRead` and `ttfb` fields of the
 * returned result (each treated as 0 when absent), and converts a thrown error into an
 * invalid "validation-error" result instead of propagating it.
 *
 * @param config - Retrieval configuration (must include expected CIDs in metadata)
 * @param signal - Optional abort signal, passed through from the caller
 * @returns Validation result
 */
validateByBlockFetch?(config: RetrievalConfiguration, signal?: AbortSignal): Promise<ValidationResult>;

/**
* Optional: Get expected performance metrics for this retrieval method
* Useful for monitoring and alerting on performance degradation
Expand Down
4 changes: 2 additions & 2 deletions apps/backend/src/retrieval-addons/retrieval-addons.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ import { HttpClientModule } from "../http-client/http-client.module.js";
import { WalletSdkModule } from "../wallet-sdk/wallet-sdk.module.js";
import { RetrievalAddonsService } from "./retrieval-addons.service.js";
import { DirectRetrievalStrategy } from "./strategies/direct.strategy.js";
import { IpniRetrievalStrategy } from "./strategies/ipni.strategy.js";
import { IpfsBlockRetrievalStrategy } from "./strategies/ipfs-block.strategy.js";

@Module({
imports: [WalletSdkModule, HttpClientModule],
providers: [RetrievalAddonsService, DirectRetrievalStrategy, IpniRetrievalStrategy],
providers: [RetrievalAddonsService, DirectRetrievalStrategy, IpfsBlockRetrievalStrategy],
exports: [RetrievalAddonsService],
})
export class RetrievalAddonsModule {}
48 changes: 43 additions & 5 deletions apps/backend/src/retrieval-addons/retrieval-addons.service.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { Injectable, Logger } from "@nestjs/common";
import { delay } from "../common/abort-utils.js";
import { RetrievalError, type RetrievalErrorResponseInfo } from "../common/errors.js";
import { ServiceType } from "../database/types.js";
import { HttpClientService } from "../http-client/http-client.service.js";
import type { RequestWithMetrics } from "../http-client/types.js";
import type { IRetrievalAddon } from "./interfaces/retrieval-addon.interface.js";
import { DirectRetrievalStrategy } from "./strategies/direct.strategy.js";
import { IpniRetrievalStrategy } from "./strategies/ipni.strategy.js";
import { IpfsBlockRetrievalStrategy } from "./strategies/ipfs-block.strategy.js";
import type {
RetrievalConfiguration,
RetrievalExecutionResult,
Expand All @@ -26,7 +27,7 @@ export class RetrievalAddonsService {

constructor(
private readonly directRetrieval: DirectRetrievalStrategy,
private readonly ipniRetrieval: IpniRetrievalStrategy,
private readonly ipfsBlockRetrieval: IpfsBlockRetrievalStrategy,
private readonly httpClientService: HttpClientService,
) {
this.registerAddons();
Expand All @@ -38,7 +39,7 @@ export class RetrievalAddonsService {
*/
private registerAddons(): void {
this.registerAddon(this.directRetrieval);
this.registerAddon(this.ipniRetrieval);
this.registerAddon(this.ipfsBlockRetrieval);

this.logger.log(`Registered ${this.addons.size} retrieval add-ons: ${Array.from(this.addons.keys()).join(", ")}`);
}
Expand Down Expand Up @@ -194,7 +195,6 @@ export class RetrievalAddonsService {
return {
url: urlResults[index].url,
method: urlResults[index].method,
data: Buffer.alloc(0),
metrics: {
latency: 0,
ttfb: 0,
Expand Down Expand Up @@ -343,6 +343,45 @@ export class RetrievalAddonsService {

try {
signal?.throwIfAborted();
if (urlResult.method === ServiceType.IPFS_PIN && strategy.validateByBlockFetch) {
let validation: ValidationResult;
const startTime = performance.now();
try {
validation = await strategy.validateByBlockFetch(config, signal);
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
this.logger.warn(
`Block-fetch validation error for ${urlResult.method} retrieval of deal ${config.deal.id}: ${errorMessage}`,
);
validation = {
isValid: false,
method: "validation-error",
details: errorMessage,
};
}
// TODO: totalTime includes per-block hash verification (validateBlock) which
// inflates latency and deflates throughput relative to pure network performance.
// Splitting fetch and hash into separate queues would give accurate download metrics.
const totalTime = performance.now() - startTime;
const responseSize = validation.bytesRead ?? 0;
const throughput = totalTime > 0 && responseSize > 0 ? responseSize / (totalTime / 1000) : 0;

return {
url: urlResult.url,
method: urlResult.method,
metrics: {
latency: Math.round(totalTime),
ttfb: validation.ttfb ?? 0,
throughput,
statusCode: validation.isValid ? 200 : 0,
timestamp: new Date(),
responseSize,
},
validation,
success: true,
Comment thread
SgtPooki marked this conversation as resolved.
};
}

let result: RequestWithMetrics<Buffer>;
result = await this.httpClientService.requestWithMetrics<Buffer>(urlResult.url, {
headers: urlResult.headers,
Expand Down Expand Up @@ -426,7 +465,6 @@ export class RetrievalAddonsService {
return {
url: urlResult.url,
method: urlResult.method,
data: Buffer.alloc(0),
metrics: {
latency: 0,
ttfb: 0,
Expand Down
Loading