-
Notifications
You must be signed in to change notification settings - Fork 6
Serve IPFS retrievals #312
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
cf025ce
f73c566
dd025cc
c26393b
b2e4c81
56b4cee
d7e4692
4b7b1c6
f8de5b0
6faf291
290b38d
4425cc3
aed1018
c47071f
78d175c
4d1541e
9baaea0
eee955b
4723465
6b28eb1
0e0451d
992e284
2105982
07c4dec
9c0c2a4
8d9fdec
2e0f385
7ccfd61
78bb34d
784006d
044a105
1690c01
fa1011f
4900845
d40e592
89e8fad
1f3c471
ff768d2
9f2272c
e223831
0aac84d
8e04420
78a7d32
def2c30
ef6cecf
4320137
ceed20c
78778c3
5be53ac
9ba5aa5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,331 @@ | ||
| import { | ||
| isValidEthereumAddress, | ||
| httpAssert, | ||
| setContentSecurityPolicy, | ||
| getBadBitsEntry, | ||
| updateDataSetStats, | ||
| logRetrievalResult, | ||
| } from '@filbeam/retrieval' | ||
|
|
||
| import { parseRequest } from '../lib/request.js' | ||
| import { | ||
| retrieveIpfsContent as defaultRetrieveIpfsContent, | ||
| measureStreamedEgress, | ||
| processIpfsResponse, | ||
| } from '../lib/retrieval.js' | ||
| import { | ||
| getStorageProviderAndValidatePayerByDataSetAndPiece, | ||
| getSlugForWalletAndCid, | ||
| } from '../lib/store.js' | ||
|
|
||
| export default { | ||
| /** | ||
| * @param {Request} request | ||
| * @param {Env} env | ||
| * @param {ExecutionContext} ctx | ||
| * @param {object} options | ||
| * @param {typeof defaultRetrieveIpfsContent} [options.retrieveIpfsContent] | ||
| * @returns | ||
| */ | ||
| async fetch( | ||
| request, | ||
| env, | ||
| ctx, | ||
| { retrieveIpfsContent = defaultRetrieveIpfsContent } = {}, | ||
| ) { | ||
| try { | ||
| return await this._fetch(request, env, ctx, { | ||
| retrieveIpfsContent, | ||
| }) | ||
| } catch (error) { | ||
| return this._handleError(error) | ||
| } | ||
| }, | ||
|
|
||
| /** | ||
| * @param {Request} request | ||
| * @param {Env} env | ||
| * @param {ExecutionContext} ctx | ||
| * @param {object} options | ||
| * @param {typeof defaultRetrieveIpfsContent} [options.retrieveIpfsContent] | ||
| * @returns | ||
| */ | ||
| async _fetch( | ||
| request, | ||
| env, | ||
| ctx, | ||
| { retrieveIpfsContent = defaultRetrieveIpfsContent } = {}, | ||
| ) { | ||
| httpAssert( | ||
| ['GET', 'HEAD'].includes(request.method), | ||
| 405, | ||
| 'Method Not Allowed', | ||
| ) | ||
|
|
||
| if ( | ||
| URL.parse(request.url)?.hostname === env.DNS_ROOT.slice(1) || | ||
| URL.parse(request.url)?.hostname === `link${env.DNS_ROOT}` | ||
| ) { | ||
| return handleDnsRootRequest(request, env) | ||
| } | ||
|
|
||
| if (URL.parse(request.url)?.hostname.endsWith('filcdn.io')) { | ||
| return Response.redirect( | ||
| request.url.replace('filcdn.io', 'filbeam.io'), | ||
| 301, | ||
| ) | ||
| } | ||
|
|
||
| const requestTimestamp = new Date().toISOString() | ||
| const workerStartedAt = performance.now() | ||
| const requestCountryCode = request.headers.get('CF-IPCountry') | ||
|
|
||
| const { dataSetId, pieceId, ipfsSubpath, ipfsFormat, botName } = parseRequest( | ||
| request, | ||
| env, | ||
| ) | ||
|
|
||
| try { | ||
| // Timestamp to measure file retrieval performance (from cache and from SP) | ||
| const fetchStartedAt = performance.now() | ||
|
|
||
| const { serviceProviderId, serviceUrl, ipfsRootCid } = | ||
| await getStorageProviderAndValidatePayerByDataSetAndPiece( | ||
| env, | ||
| dataSetId, | ||
| pieceId, | ||
| ) | ||
|
|
||
| // Now check Bad Bits with the ipfsRootCid we got from the database | ||
| const isBadBit = await env.BAD_BITS_KV.get( | ||
| `bad-bits:${await getBadBitsEntry(ipfsRootCid)}`, | ||
| { | ||
| type: 'json', | ||
| }, | ||
| ) | ||
|
|
||
| httpAssert( | ||
| !isBadBit, | ||
| 404, | ||
| 'The requested CID was flagged by the Bad Bits Denylist at https://badbits.dwebops.pub', | ||
| ) | ||
|
|
||
| httpAssert( | ||
| serviceProviderId, | ||
| 404, | ||
| `Unsupported Service Provider: ${serviceProviderId}`, | ||
| ) | ||
|
|
||
| const { response: originResponse, cacheMiss } = await retrieveIpfsContent( | ||
| serviceUrl, | ||
| ipfsRootCid, | ||
| ipfsSubpath, | ||
| env.ORIGIN_CACHE_TTL, | ||
| { signal: request.signal }, | ||
| ) | ||
|
|
||
| const responseBody = await processIpfsResponse(originResponse, { | ||
| ipfsRootCid, | ||
| ipfsSubpath, | ||
| ipfsFormat, | ||
| signal: request.signal, | ||
| }) | ||
|
|
||
| if (!responseBody) { | ||
| // The upstream response does not have any readable body | ||
| // There is no need to measure response body size, we can | ||
| // return the original response object. | ||
| ctx.waitUntil( | ||
| logRetrievalResult(env, { | ||
| cacheMiss, | ||
| responseStatus: originResponse.status, | ||
| egressBytes: 0, | ||
| requestCountryCode, | ||
| timestamp: requestTimestamp, | ||
| dataSetId, | ||
| botName, | ||
| }), | ||
| ) | ||
| const response = new Response(originResponse.body, originResponse) | ||
| setContentSecurityPolicy(response) | ||
| response.headers.set('X-Data-Set-ID', dataSetId) | ||
| response.headers.set( | ||
| 'Cache-Control', | ||
| `public, max-age=${env.CLIENT_CACHE_TTL}`, | ||
| ) | ||
| return response | ||
| } | ||
|
|
||
| // Stream and count bytes | ||
| // We create two identical streams, one for the egress measurement and the other for returning the response as soon as possible | ||
| const [returnedStream, egressMeasurementStream] = responseBody.tee() | ||
| const reader = egressMeasurementStream.getReader() | ||
| const firstByteAt = performance.now() | ||
|
|
||
| ctx.waitUntil( | ||
| (async () => { | ||
| const egressBytes = await measureStreamedEgress(reader) | ||
| const lastByteFetchedAt = performance.now() | ||
|
|
||
| await logRetrievalResult(env, { | ||
| cacheMiss, | ||
| responseStatus: originResponse.status, | ||
| egressBytes, | ||
| requestCountryCode, | ||
| timestamp: requestTimestamp, | ||
| performanceStats: { | ||
| fetchTtfb: firstByteAt - fetchStartedAt, | ||
| fetchTtlb: lastByteFetchedAt - fetchStartedAt, | ||
| workerTtfb: firstByteAt - workerStartedAt, | ||
| }, | ||
| dataSetId, | ||
| botName | ||
| }) | ||
|
|
||
| await updateDataSetStats(env, { | ||
| dataSetId, | ||
| egressBytes, | ||
| cacheMiss, | ||
| enforceEgressQuota: env.ENFORCE_EGRESS_QUOTA, | ||
| }) | ||
| })(), | ||
| ) | ||
|
|
||
| // Return immediately, proxying the transformed response | ||
| const response = new Response(returnedStream, { | ||
| status: originResponse.status, | ||
| statusText: originResponse.statusText, | ||
| headers: originResponse.headers, | ||
| }) | ||
| setContentSecurityPolicy(response) | ||
| response.headers.set('X-Data-Set-ID', dataSetId) | ||
| response.headers.set( | ||
| 'Cache-Control', | ||
| `public, max-age=${env.CLIENT_CACHE_TTL}`, | ||
| ) | ||
|
|
||
| // FIXME: move this logic into processIpfsResponse function | ||
| // When converting from CAR to RAW, set content-disposition to inline | ||
| // so browsers display the content instead of downloading it. | ||
| if (ipfsFormat !== 'car') { | ||
| response.headers.set('content-disposition', 'inline') | ||
| // Also remove the content-type header, remove x-content-type-options, | ||
| // and let the browser to sniff the content type. | ||
| response.headers.delete('content-type') | ||
| response.headers.delete('x-content-type-options') | ||
| } | ||
|
Comment on lines
+207
to
+216
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After we move this block to |
||
|
|
||
| return response | ||
| } catch (error) { | ||
| const { status } = getErrorHttpStatusMessage(error) | ||
|
|
||
| ctx.waitUntil( | ||
| logRetrievalResult(env, { | ||
| cacheMiss: null, | ||
| responseStatus: status, | ||
| egressBytes: null, | ||
| requestCountryCode, | ||
| timestamp: requestTimestamp, | ||
| dataSetId: null, | ||
| botName, | ||
| }), | ||
| ) | ||
|
|
||
| throw error | ||
| } | ||
| }, | ||
|
|
||
| /** | ||
| * @param {unknown} error | ||
| * @returns | ||
| */ | ||
| _handleError(error) { | ||
| const { status, message } = getErrorHttpStatusMessage(error) | ||
|
|
||
| if (status >= 500) { | ||
| console.error(error) | ||
| } | ||
| return new Response(message, { status }) | ||
| }, | ||
| } | ||
|
|
||
| /** | ||
| * Extracts status and message from an error object. | ||
| * | ||
| * - If the error has a numeric `status`, it is used; otherwise, defaults to 500. | ||
| * - If the status is < 500 and a string `message` exists, it's used; otherwise, a | ||
| * generic message is returned. | ||
| * | ||
| * @param {unknown} error - The error object to extract from. | ||
| * @returns {{ status: number; message: string }} | ||
| */ | ||
| function getErrorHttpStatusMessage(error) { | ||
| const isObject = typeof error === 'object' && error !== null | ||
| const status = | ||
| isObject && 'status' in error && typeof error.status === 'number' | ||
| ? error.status | ||
| : 500 | ||
|
|
||
| const message = | ||
| isObject && | ||
| status < 500 && | ||
| 'message' in error && | ||
| typeof error.message === 'string' | ||
| ? error.message | ||
| : 'Internal Server Error' | ||
|
|
||
| return { status, message } | ||
| } | ||
|
Comment on lines
+252
to
+278
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems to be a good candidate to extract into a helper shared by both piece-retriever and ipfs-retriever. |
||
|
|
||
| /** | ||
| * Handles requests to the bare DNS_ROOT domain (e.g., ipfs.filbeam.io). | ||
| * | ||
| * - If no path is provided, redirects to https://filbeam.com | ||
| * - If path is /wallet/cid or /wallet/cid/pathname, generates a slug and | ||
| * redirects to the subdomain-based URL | ||
| * | ||
| * @param {Request} request - The incoming request | ||
| * @param {Env} env - Worker environment | ||
| * @returns {Promise<Response>} Redirect response | ||
| */ | ||
| async function handleDnsRootRequest(request, env) { | ||
| // Parse the URL path to extract wallet, cid, and optional pathname | ||
| const parsedUrl = URL.parse(request.url) | ||
| const pathname = parsedUrl?.pathname || '/' | ||
|
|
||
| // If no path, redirect to filbeam.com | ||
| if (pathname === '/' || pathname === '') { | ||
| return Response.redirect('https://filbeam.com/', 302) | ||
| } | ||
|
|
||
| // Parse path as /wallet/cid/pathname | ||
| const pathParts = pathname.slice(1).split('/') // Remove leading slash and split | ||
|
|
||
| if (pathParts.length < 2) { | ||
| httpAssert( | ||
| false, | ||
| 404, | ||
| 'Invalid path format. Expected: /wallet/cid or /wallet/cid/pathname', | ||
| ) | ||
| } | ||
|
|
||
| const wallet = pathParts[0].toLowerCase() | ||
| const cid = pathParts[1] | ||
| const subpath = pathParts.slice(2).join('/') | ||
|
|
||
| // Validate wallet address | ||
| httpAssert( | ||
| isValidEthereumAddress(wallet), | ||
| 404, | ||
| `Invalid wallet address: ${wallet}. Address must be a valid ethereum address.`, | ||
| ) | ||
|
|
||
| // Get slug for the wallet and CID | ||
| const slug = await getSlugForWalletAndCid(env, wallet, cid) | ||
|
|
||
| // Build redirect URL | ||
| const redirectPath = subpath ? `/${subpath}` : '' | ||
| const redirectUrl = `https://${slug}${env.DNS_ROOT}${redirectPath}` | ||
|
|
||
| return Response.redirect(redirectUrl, 302) | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I want to create a discussion thread that we can mark as resolved later, and the only way to do so is by attaching the first comment to some code.
Let's review the TODO check-list in the issue description:
?format=raw?format=carand other query-string params when redirecting from/wallet/cid to 1-{dataset}-{piece}.I feel many of these items can be implemented later and don't block the landing of this PR.
The only hard requirement is IMO reporting egress consumption, as it is tied to billing. I can imagine the first iteration can report CAR size instead of the real response size. We will charge clients a bit more (CAR is larger than the raw data), but I think that's okay for a few days until we fix the problem in a follow-up pull request.
It would be nice to preserve the query string when redirecting from
link.*to1-{dataset}-{piece}.*, since it should be a quick fix.I propose to convert the rest of TODOs into new GH issues and add them as children of the theme MVP of FilBeam for IPFS (filbeam/roadmap#85).
@juliangruber thoughts?