Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
f5c2ad4
update everything to m2.5 contracts (tests not yet run)
juliangruber Aug 20, 2025
db7d1af
use new `PiecesAdded` event
juliangruber Aug 21, 2025
dabf118
`prettier --write`
juliangruber Aug 21, 2025
13f9978
fix sql
juliangruber Aug 21, 2025
f0c9678
progress
juliangruber Aug 21, 2025
54aecb5
integrate more service registry stuff (untested)
juliangruber Aug 22, 2025
08200d9
fix migration
juliangruber Aug 22, 2025
f8a8a41
more works
juliangruber Aug 22, 2025
03241a8
fix provider pkey, fix missing awaits
juliangruber Aug 23, 2025
0168274
fix remove upper case address support
juliangruber Aug 23, 2025
1384a3a
storage tests pass
juliangruber Aug 23, 2025
4450ab6
fix product removed handler
juliangruber Aug 23, 2025
c8dcf47
fix more input validation rules
juliangruber Aug 23, 2025
beea997
fix rpc mock
juliangruber Aug 23, 2025
b79d36b
fix test assertions
juliangruber Aug 23, 2025
424a0ad
fix provider pkey (continued)
juliangruber Aug 23, 2025
c851be8
remove outdated tests
juliangruber Aug 23, 2025
f7bb3b7
fix property names
juliangruber Aug 23, 2025
e5f6409
fix mixed case address tests
juliangruber Aug 23, 2025
16b609c
fix rpc mock
juliangruber Aug 23, 2025
f25eaa8
fix header name
juliangruber Aug 23, 2025
410be5a
fix retrieval test
juliangruber Aug 23, 2025
e1c031a
clean up
juliangruber Aug 23, 2025
18fe8be
remove more mixed case handling
juliangruber Aug 23, 2025
1ded92c
add `ProviderRemoved` event handler
juliangruber Aug 23, 2025
610d2dd
clean up
juliangruber Aug 23, 2025
e8bf57e
magic number
juliangruber Aug 23, 2025
e13e227
clean up
juliangruber Aug 23, 2025
04e7697
clean up
juliangruber Aug 23, 2025
d6d97cf
ensure all goldsky addresses are lower case
juliangruber Aug 26, 2025
b818563
`beneficiary` -> `beneficiary_address`
juliangruber Aug 28, 2025
fe23d99
`storage_provider` -> `storage_provider_address`
juliangruber Aug 28, 2025
5051b8b
`payer` -> `payer_address`
juliangruber Aug 28, 2025
61df23d
`payee` -> `payee_address`
juliangruber Aug 28, 2025
4edb623
`FilecoinWarmStorageService` -> `FWSS`
juliangruber Aug 28, 2025
5750b14
`DataSetSats` -> `DataSetStats`
juliangruber Aug 28, 2025
aa9b749
restore mixed case address handling
juliangruber Aug 29, 2025
2f8b592
add index
juliangruber Aug 29, 2025
36a4fc4
fix lower case on write not read
juliangruber Aug 29, 2025
e0dd92b
change query to update
juliangruber Aug 29, 2025
826ad3d
Update retriever/lib/store.js
juliangruber Aug 29, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions db/migrations/0013_m2_5_upgrades.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
DROP TABLE provider_urls;
DROP TABLE indexer_proof_set_rails;
DROP TABLE indexer_proof_sets;
DROP TABLE indexer_roots;
DROP TABLE proof_set_stats;
DROP TABLE retrieval_logs;

CREATE TABLE providers (
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest adding a active column to the providers table as providers can have isActive field inside the struct.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see any events for this state change, do you know when we would update this field?

id TEXT NOT NULL,
beneficiary_address TEXT NOT NULL,
service_url TEXT,
PRIMARY KEY (id)
);
Comment thread
juliangruber marked this conversation as resolved.
CREATE INDEX providers_beneficiary_address ON providers(beneficiary_address);

CREATE TABLE data_sets (
id TEXT NOT NULL,
storage_provider_address TEXT,
payer_address TEXT,
payee_address TEXT,
with_cdn BOOLEAN,
total_egress_bytes_used INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (id)
);
Comment thread
juliangruber marked this conversation as resolved.

CREATE TABLE pieces (
id TEXT NOT NULL,
data_set_id TEXT NOT NULL,
cid TEXT NOT NULL,
PRIMARY KEY (id, data_set_id)
);
CREATE INDEX pieces_cid ON pieces(cid);

CREATE TABLE retrieval_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp DATETIME NOT NULL,
data_set_id TEXT,
storage_provider_address TEXT,
client_address TEXT NOT NULL,
response_status INTEGER,
egress_bytes INTEGER,
cache_miss BOOLEAN,
fetch_ttfb INTEGER,
worker_ttfb INTEGER,
request_country_code TEXT,
fetch_ttlb INTEGER
);
Comment thread
juliangruber marked this conversation as resolved.
174 changes: 94 additions & 80 deletions indexer/bin/indexer.js
Original file line number Diff line number Diff line change
@@ -1,29 +1,34 @@
import {
handleProviderRegistered,
handleProductAdded,
handleProductUpdated,
handleProductRemoved,
handleProviderRemoved,
} from '../lib/provider-events-handler.js'
import { createPdpVerifierClient as defaultCreatePdpVerifierClient } from '../lib/pdp-verifier.js'
rpcRequest as defaultRpcRequest,
} from '../lib/service-provider-registry-handlers.js'
import { checkIfAddressIsSanctioned as defaultCheckIfAddressIsSanctioned } from '../lib/chainalysis.js'
import { handleProofSetRailCreated } from '../lib/proof-set-handler.js'
import { removeProofSetRoots, insertProofSetRoots } from '../lib/store.js'
import { handleFWSSDataSetCreated } from '../lib/filecoin-warm-storage-service-handlers.js'
import {
removeDataSetPieces,
insertDataSetPieces,
} from '../lib/pdp-verifier-handlers.js'

export default {
/**
* @param {Request} request
* @param {Env} env
* @param {ExecutionContext} ctx
* @param {object} options
* @param {typeof defaultCreatePdpVerifierClient} [options.createPdpVerifierClient]
* @param {typeof defaultCheckIfAddressIsSanctioned} [options.checkIfAddressIsSanctioned]
* @param {typeof defaultRpcRequest} [options.rpcRequest]
* @returns {Promise<Response>}
*/
async fetch(
request,
env,
ctx,
{
createPdpVerifierClient = defaultCreatePdpVerifierClient,
checkIfAddressIsSanctioned = defaultCheckIfAddressIsSanctioned,
rpcRequest = defaultRpcRequest,
Comment thread
juliangruber marked this conversation as resolved.
} = {},
) {
// TypeScript setup is broken in our monorepo
Expand All @@ -36,7 +41,7 @@ export default {
// @ts-ignore
RPC_URL,
// @ts-ignore
PDP_VERIFIER_ADDRESS,
SERVICE_PROVIDER_REGISTRY_ADDRESS,
// @ts-ignore
SECRET_HEADER_KEY,
// @ts-ignore
Expand All @@ -50,148 +55,157 @@ export default {
return new Response('Method Not Allowed', { status: 405 })
}
const payload = await request.json()

const pathname = new URL(request.url).pathname
if (pathname === '/proof-set-created') {
if (pathname === '/pdp-verifier/data-set-created') {
Comment thread
juliangruber marked this conversation as resolved.
if (
!(
typeof payload.set_id === 'number' ||
typeof payload.set_id === 'string'
) ||
!payload.owner
!payload.storage_provider
) {
console.error('ProofSetCreated: Invalid payload', payload)
console.error('PDPVerifier.DataSetCreated: Invalid payload', payload)
return new Response('Bad Request', { status: 400 })
}
console.log(
`New proof set (set_id=${payload.set_id}, owner=${payload.owner})`,
`New PDPVerifier data set (data_set_id=${payload.set_id}, storage_provider=${payload.storage_provider})`,
)
await env.DB.prepare(
`
INSERT INTO indexer_proof_sets (
set_id,
owner
INSERT INTO data_sets (
id,
storage_provider_address
)
VALUES (?, ?)
ON CONFLICT DO NOTHING
`,
)
.bind(String(payload.set_id), payload.owner?.toLowerCase())
.bind(String(payload.set_id), payload.storage_provider.toLowerCase())
.run()
return new Response('OK', { status: 200 })
} else if (pathname === '/roots-added') {
} else if (pathname === '/pdp-verifier/pieces-added') {
if (
!(
typeof payload.set_id === 'number' ||
typeof payload.set_id === 'string'
) ||
!payload.root_ids ||
typeof payload.root_ids !== 'string'
!payload.piece_ids ||
typeof payload.piece_ids !== 'string' ||
!payload.piece_cids ||
typeof payload.piece_cids !== 'string'
) {
console.error('RootsAdded: Invalid payload', payload)
console.error('PDPVerifier.PiecesAdded: Invalid payload', payload)
return new Response('Bad Request', { status: 400 })
}

/** @type {string[]} */
const rootIds = payload.root_ids.split(',')

const setId = BigInt(payload.set_id)

const pdpVerifier = createPdpVerifierClient({
rpcUrl: RPC_URL,
glifToken: GLIF_TOKEN,
pdpVerifierAddress: PDP_VERIFIER_ADDRESS,
})

const rootCids = payload.root_cids
? payload.root_cids.split(',')
: await Promise.all(
rootIds.map(async (rootId) => {
try {
return await pdpVerifier.getRootCid(
setId,
BigInt(rootId),
payload.block_number,
)
} catch (/** @type {any} */ err) {
console.error(
`RootsAdded: Cannot resolve root CID for setId=${setId} rootId=${rootId}: ${err?.stack ?? err}`,
)
throw err
}
}),
)
const pieceIds = payload.piece_ids.split(',')
/** @type {string[]} */
const pieceCids = payload.piece_cids.split(',')

console.log(
`New roots (root_ids=[${rootIds.join(', ')}], root_cids=[${rootCids.join(', ')}], set_id=${payload.set_id})`,
`New pieces (piece_ids=[${pieceIds.join(', ')}], piece_cids=[${pieceCids.join(', ')}], data_set_id=${payload.set_id})`,
)

await insertProofSetRoots(env, payload.set_id, rootIds, rootCids)
await insertDataSetPieces(env, payload.set_id, pieceIds, pieceCids)

return new Response('OK', { status: 200 })
} else if (pathname === '/roots-removed') {
} else if (pathname === '/pdp-verifier/pieces-removed') {
if (
!(
typeof payload.set_id === 'number' ||
typeof payload.set_id === 'string'
) ||
!payload.root_ids ||
typeof payload.root_ids !== 'string'
!payload.piece_ids ||
typeof payload.piece_ids !== 'string'
) {
console.error('RootsRemoved: Invalid payload', payload)
console.error('PDPVerifier.PiecesRemoved: Invalid payload', payload)
return new Response('Bad Request', { status: 400 })
}

/** @type {string[]} */
const rootIds = payload.root_ids.split(',')
const pieceIds = payload.piece_ids.split(',')

console.log(
`Removing roots (root_ids=[${rootIds.join(', ')}], set_id=${payload.set_id})`,
`Removing pieces (piece_ids=[${pieceIds.join(', ')}], data_set_id=${payload.set_id})`,
)

await removeProofSetRoots(env, payload.set_id, rootIds)
await removeDataSetPieces(env, payload.set_id, pieceIds)
return new Response('OK', { status: 200 })
} else if (pathname === '/proof-set-rail-created') {
} else if (pathname === '/filecoin-warm-storage-service/data-set-created') {
if (
!payload.proof_set_id ||
!(
typeof payload.proof_set_id === 'number' ||
typeof payload.proof_set_id === 'string'
) ||
!payload.rail_id ||
Comment thread
juliangruber marked this conversation as resolved.
!payload.data_set_id ||
!(
typeof payload.rail_id === 'number' ||
typeof payload.rail_id === 'string'
typeof payload.data_set_id === 'number' ||
typeof payload.data_set_id === 'string'
) ||
!payload.payer ||
!payload.payee
!payload.payee ||
typeof payload.with_cdn !== 'boolean'
Comment thread
juliangruber marked this conversation as resolved.
) {
console.error('ProofSetRailCreated: Invalid payload', payload)
console.error('FWSS.DataSetCreated: Invalid payload', payload)
return new Response('Bad Request', { status: 400 })
}

console.log(
`New proof set rail (proof_set_id=${payload.proof_set_id}, rail_id=${payload.rail_id}, payer=${payload.payer}, payee=${payload.payee}, with_cdn=${payload.with_cdn})`,
`New FWSS data set (data_set_id=${payload.data_set_id}, payer=${payload.payer}, payee=${payload.payee}, with_cdn=${payload.with_cdn})`,
)

try {
await handleProofSetRailCreated(env, payload, {
await handleFWSSDataSetCreated(env, payload, {
checkIfAddressIsSanctioned,
})
} catch (err) {
console.log(
`Error handling proof set rail creation: ${err}. Retrying...`,
`Error handling FWSS data set creation: ${err}. Retrying...`,
)
// @ts-ignore
env.RETRY_QUEUE.send({ type: 'proof-set-rail-created', payload })
env.RETRY_QUEUE.send({
type: 'filecoin-warm-storage-service-data-set-created',
payload,
})
}

return new Response('OK', { status: 200 })
} else if (pathname === '/provider-registered') {
const { provider, piece_retrieval_url: pieceRetrievalUrl } = payload
return await handleProviderRegistered(env, provider, pieceRetrievalUrl)
} else if (pathname === '/provider-removed') {
const { provider } = payload
return await handleProviderRemoved(env, provider)
} else if (pathname === '/service-provider-registry/product-added') {
Comment thread
juliangruber marked this conversation as resolved.
const {
provider_id: providerId,
product_type: productType,
block_number: blockNumber,
} = payload
return await handleProductAdded(
env,
rpcRequest,
providerId,
productType,
RPC_URL,
GLIF_TOKEN,
blockNumber,
SERVICE_PROVIDER_REGISTRY_ADDRESS,
)
} else if (pathname === '/service-provider-registry/product-updated') {
const {
provider_id: providerId,
product_type: productType,
block_number: blockNumber,
} = payload
return await handleProductUpdated(
env,
rpcRequest,
providerId,
productType,
RPC_URL,
GLIF_TOKEN,
blockNumber,
SERVICE_PROVIDER_REGISTRY_ADDRESS,
)
} else if (pathname === '/service-provider-registry/product-removed') {
const { provider_id: providerId, product_type: productType } = payload
return await handleProductRemoved(env, providerId, productType)
} else if (pathname === '/service-provider-registry/provider-removed') {
const { provider_id: providerId } = payload
return await handleProviderRemoved(env, providerId)
} else {
return new Response('Not Found', { status: 404 })
}
Expand All @@ -212,14 +226,14 @@ export default {
for (const message of batch.messages) {
if (message.body.type === 'proof-set-rail-created') {
try {
await handleProofSetRailCreated(env, message.body.payload, {
await handleFWSSDataSetCreated(env, message.body.payload, {
checkIfAddressIsSanctioned,
})

message.ack()
} catch (err) {
console.log(
`Error handling proof set rail creation: ${err}. Retrying...`,
`Error handling FWSS data set creation: ${err}. Retrying...`,
)
message.retry({ delaySeconds: 10 })
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { checkIfAddressIsSanctioned as defaultCheckIfAddressIsSanctioned } from '../lib/chainalysis.js'
import { checkIfAddressIsSanctioned as defaultCheckIfAddressIsSanctioned } from './chainalysis.js'

/**
* Handle proof set rail creation
Expand All @@ -10,7 +10,7 @@ import { checkIfAddressIsSanctioned as defaultCheckIfAddressIsSanctioned } from
* @throws {Error} If there is an error with fetching payer's address sanction
* status or during the database operation
*/
export async function handleProofSetRailCreated(
export async function handleFWSSDataSetCreated(
env,
payload,
{ checkIfAddressIsSanctioned = defaultCheckIfAddressIsSanctioned },
Expand All @@ -32,29 +32,27 @@ export async function handleProofSetRailCreated(
ON CONFLICT (address) DO UPDATE SET is_sanctioned = excluded.is_sanctioned
`,
)
.bind(payload.payer, isPayerSanctioned)
.bind(payload.payer.toLowerCase(), isPayerSanctioned)
.run()
}

await env.DB.prepare(
`
INSERT INTO indexer_proof_set_rails (
proof_set_id,
rail_id,
payer,
payee,
INSERT INTO data_sets (
id,
payer_address,
payee_address,
with_cdn
)
VALUES (?, ?, ?, ?, ?)
VALUES (?, ?, ?, ?)
Comment thread
juliangruber marked this conversation as resolved.
ON CONFLICT DO NOTHING
`,
)
.bind(
String(payload.proof_set_id),
String(payload.rail_id),
payload.payer,
payload.payee,
payload.with_cdn ?? null,
String(payload.data_set_id),
payload.payer.toLowerCase(),
payload.payee.toLowerCase(),
payload.with_cdn,
)
.run()
}
Loading