Skip to content
Merged
99 changes: 66 additions & 33 deletions piece-retriever/bin/piece-retriever.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
measureStreamedEgress,
} from '../lib/retrieval.js'
import {
getStorageProviderAndValidatePayer,
getRetrievalCandidatesAndValidatePayer,
logRetrievalResult,
updateDataSetStats,
} from '../lib/store.js'
Expand Down Expand Up @@ -71,18 +71,17 @@ export default {
// Timestamp to measure file retrieval performance (from cache and from SP)
const fetchStartedAt = performance.now()

const [{ serviceProviderId, serviceUrl, dataSetId }, isBadBit] =
await Promise.all([
getStorageProviderAndValidatePayer(
env,
payerWalletAddress,
pieceCid,
env.ENFORCE_EGRESS_QUOTA,
),
env.BAD_BITS_KV.get(`bad-bits:${await getBadBitsEntry(pieceCid)}`, {
type: 'json',
}),
])
const [retrievalCandidates, isBadBit] = await Promise.all([
getRetrievalCandidatesAndValidatePayer(
env,
payerWalletAddress,
pieceCid,
env.ENFORCE_EGRESS_QUOTA,
),
env.BAD_BITS_KV.get(`bad-bits:${await getBadBitsEntry(pieceCid)}`, {
type: 'json',
}),
])

httpAssert(
!isBadBit,
Expand All @@ -91,23 +90,55 @@ export default {
)

httpAssert(
serviceProviderId,
404,
`Unsupported Service Provider: ${serviceProviderId}`,
retrievalCandidates.length > 0,
500,
'Service provider lookup failed',
)

let retrievalCandidate
let retrievalResult
const retrievalAttempts = []

try {
retrievalResult = await retrieveFile(
ctx,
serviceUrl,
pieceCid,
request,
env.ORIGIN_CACHE_TTL,
{ signal: request.signal },
while (retrievalCandidates.length > 0) {
const retrievalCandidateIndex = Math.floor(
Math.random() * retrievalCandidates.length,
)
} catch {}
retrievalCandidate = retrievalCandidates[retrievalCandidateIndex]
retrievalAttempts.push(retrievalCandidate)
retrievalCandidates.splice(retrievalCandidateIndex, 1)
console.log('Attempting retrieval', retrievalCandidate)
try {
retrievalResult = await retrieveFile(
ctx,
retrievalCandidate.serviceUrl,
pieceCid,
request,
env.ORIGIN_CACHE_TTL,
{ signal: request.signal },
)
if (retrievalResult.response.ok) {
break
}
console.log(
`Retrieval attempt failed: HTTP ${retrievalResult.response.status}`,
{
retrievalCandidate,
willRetry: retrievalCandidates.length > 0,
},
)
} catch (err) {
const msg =
typeof err === 'object' && err !== null && 'message' in err
? err.message
: String(err)
console.log(`Retrieval attempt failed: ${msg}`, {
retrievalCandidate,
willRetry: retrievalCandidates.length > 0,
})
}
Comment thread
juliangruber marked this conversation as resolved.
}

httpAssert(retrievalCandidate, 500, 'should never happen')

if (!retrievalResult || retrievalResult.response.status >= 500) {
ctx.waitUntil(
Expand All @@ -117,16 +148,18 @@ export default {
egressBytes: 0,
requestCountryCode,
timestamp: requestTimestamp,
dataSetId,
dataSetId: retrievalCandidate.dataSetId,
botName,
}),
)
const response = new Response(
`Service provider ${serviceProviderId} is unavailable${retrievalResult ? ` at ${retrievalResult.url}` : ''}`,
`No available service provider found. Attempted: ${retrievalAttempts.map((a) => `ID=${a.serviceProviderId} (Service URL=${a.serviceUrl})`).join(', ')}`,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will eventually need to update filbeam-bot to support an array of IDs in this header:

https://github.com/filbeam/bot/blob/c4479005a7834e0e708d1dc910bc9fd4baf5794a/index.js#L197-L203

Maybe after we land my PR filbeam/bot#40 first, to avoid merge conflicts.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

{
status: 502,
headers: new Headers({
'X-Data-Set-ID': dataSetId,
'X-Data-Set-ID': retrievalAttempts
.map((a) => a.dataSetId)
.join(','),
}),
},
)
Expand All @@ -145,7 +178,7 @@ export default {
egressBytes: 0,
requestCountryCode,
timestamp: requestTimestamp,
dataSetId,
dataSetId: retrievalCandidate.dataSetId,
botName,
}),
)
Expand All @@ -154,7 +187,7 @@ export default {
retrievalResult.response,
)
setContentSecurityPolicy(response)
response.headers.set('X-Data-Set-ID', dataSetId)
response.headers.set('X-Data-Set-ID', retrievalCandidate.dataSetId)
response.headers.set(
'Cache-Control',
`public, max-age=${env.CLIENT_CACHE_TTL}`,
Expand Down Expand Up @@ -185,12 +218,12 @@ export default {
fetchTtlb: lastByteFetchedAt - fetchStartedAt,
workerTtfb: firstByteAt - workerStartedAt,
},
dataSetId,
dataSetId: retrievalCandidate.dataSetId,
botName,
})

await updateDataSetStats(env, {
dataSetId,
dataSetId: retrievalCandidate.dataSetId,
egressBytes,
cacheMiss: retrievalResult.cacheMiss,
enforceEgressQuota: env.ENFORCE_EGRESS_QUOTA,
Expand All @@ -205,7 +238,7 @@ export default {
headers: retrievalResult.response.headers,
})
setContentSecurityPolicy(response)
response.headers.set('X-Data-Set-ID', dataSetId)
response.headers.set('X-Data-Set-ID', retrievalCandidate.dataSetId)
response.headers.set(
'Cache-Control',
`public, max-age=${env.CLIENT_CACHE_TTL}`,
Expand Down
57 changes: 21 additions & 36 deletions piece-retriever/lib/store.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,17 @@ export async function logRetrievalResult(env, params) {
* @param {string} pieceCid - The piece CID to look up
* @param {boolean} [enforceEgressQuota=false] - Whether to enforce egress quota
* limits. Default is `false`
* @returns {Promise<{
* serviceProviderId: string
* serviceUrl: string
* dataSetId: string
* cdnEgressQuota: bigint
* cacheMissEgressQuota: bigint
* }>}
* @returns {Promise<
* {
* serviceProviderId: string
* serviceUrl: string
* dataSetId: string
* cdnEgressQuota: bigint
* cacheMissEgressQuota: bigint
* }[]
* >}
*/
export async function getStorageProviderAndValidatePayer(
export async function getRetrievalCandidatesAndValidatePayer(
env,
payerAddress,
pieceCid,
Expand Down Expand Up @@ -206,29 +208,21 @@ export async function getStorageProviderAndValidatePayer(
`Cache miss egress quota exhausted for payer '${payerAddress}' and data set '${withSufficientCDNQuota[0]?.data_set_id}'. Please top up your cache miss egress quota.`,
)

const {
data_set_id: dataSetId,
service_provider_id: serviceProviderId,
service_url: serviceUrl,
cdn_egress_quota: cdnEgressQuota,
cache_miss_egress_quota: cacheMissEgressQuota,
} = pickRandom(withSufficientCacheMissQuota)

// We need this assertion to supress TypeScript error. The compiler is not able to infer that
// `withCDN.filter()` above returns only rows with `service_url` defined.
httpAssert(serviceUrl, 500, 'should never happen')
const retrievalCandidates = withSufficientCacheMissQuota.map((row) => ({
dataSetId: row.data_set_id,
serviceProviderId: row.service_provider_id,
// We need this cast to supress a TypeScript error. The compiler is not able to infer that
// `withCDN.filter()` above returns only rows with `service_url` defined.
serviceUrl: /** @type {string} */ (row.service_url),
cdnEgressQuota: BigInt(row.cdn_egress_quota ?? '0'),
cacheMissEgressQuota: BigInt(row.cache_miss_egress_quota ?? '0'),
}))

console.log(
`Looked up Data set ID '${dataSetId}' and service provider id '${serviceProviderId}' for piece_cid '${pieceCid}' and payer '${payerAddress}'. Service URL: ${serviceUrl}`,
`Looked up ${retrievalCandidates.length} retrieval candidates for piece_cid '${pieceCid}' and payer '${payerAddress}'`,
Comment thread
bajtos marked this conversation as resolved.
)

return {
serviceProviderId,
serviceUrl,
dataSetId,
cdnEgressQuota: BigInt(cdnEgressQuota ?? '0'),
cacheMissEgressQuota: BigInt(cacheMissEgressQuota ?? '0'),
}
return retrievalCandidates
}

/**
Expand Down Expand Up @@ -266,12 +260,3 @@ export async function updateDataSetStats(
)
.run()
}

/**
* @template T
* @param {T[]} arr
* @returns {T}
*/
function pickRandom(arr) {
return arr[Math.floor(Math.random() * arr.length)]
}
8 changes: 6 additions & 2 deletions piece-retriever/test/retriever.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,9 @@ describe('piece-retriever.fetch', () => {
})
await waitOnExecutionContext(ctx)
expect(res.status).toBe(502)
expect(await res.text()).toMatch(/^Service provider \d+ is unavailable at /)
expect(await res.text()).toMatch(
/^No available service provider found. Attempted: ID=/,
)
expect(res.headers.get('X-Data-Set-ID')).toBe(String(dataSetId))

const result = await env.DB.prepare(
Expand Down Expand Up @@ -1063,7 +1065,9 @@ describe('piece-retriever.fetch', () => {
})
await waitOnExecutionContext(ctx)
expect(res.status).toBe(502)
expect(await res.text()).toMatch(/^Service provider \d+ is unavailable$/)
expect(await res.text()).toMatch(
/^No available service provider found. Attempted: ID=/,
)
expect(res.headers.get('X-Data-Set-ID')).toBe(String(dataSetId))
})
})
Loading