Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions packages/aws/src/overrides/converters/aws-apigw-v1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ import {
} from "@opennextjs/core/overrides/converters/utils.js";
import type { InternalEvent, InternalResult } from "@opennextjs/core/types/open-next.js";
import type { Converter } from "@opennextjs/core/types/overrides.js";
import { fromReadableStream } from "@opennextjs/core/utils/stream.js";
import { fromReadableStream, toReadableStream } from "@opennextjs/core/utils/stream.js";
import type { APIGatewayProxyEvent, APIGatewayProxyResult } from "aws-lambda";

function normalizeAPIGatewayProxyEventHeaders(event: APIGatewayProxyEvent): Record<string, string> {
event.multiValueHeaders;
const headers: Record<string, string> = {};

for (const [key, values] of Object.entries(event.multiValueHeaders || {})) {
Expand Down Expand Up @@ -73,7 +72,7 @@ async function convertFromAPIGatewayProxyEvent(event: APIGatewayProxyEvent): Pro
method: httpMethod,
rawPath: path,
url: `https://${extractHostFromHeaders(headers)}${path}${normalizeAPIGatewayProxyEventQueryParams(event)}`,
body: Buffer.from(body ?? "", isBase64Encoded ? "base64" : "utf8"),
body: body ? toReadableStream(body, isBase64Encoded) : undefined,
headers,
remoteAddress: requestContext.identity.sourceIp,
query: removeUndefinedFromQuery(normalizeAPIGatewayProxyEventMultiValueQueryStringParameters(event)),
Expand Down
14 changes: 8 additions & 6 deletions packages/aws/src/overrides/converters/aws-apigw-v2.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import type { ReadableStream } from "node:stream/web";

import { debug } from "@opennextjs/core/adapters/logger.js";
import { convertToQuery } from "@opennextjs/core/core/routing/util.js";
import { parseSetCookieHeader } from "@opennextjs/core/http/util.js";
Expand All @@ -7,7 +9,7 @@ import {
} from "@opennextjs/core/overrides/converters/utils.js";
import type { InternalEvent, InternalResult } from "@opennextjs/core/types/open-next.js";
import type { Converter } from "@opennextjs/core/types/overrides.js";
import { fromReadableStream } from "@opennextjs/core/utils/stream.js";
import { fromReadableStream, toReadableStream } from "@opennextjs/core/utils/stream.js";
import type { APIGatewayProxyEventV2, APIGatewayProxyResultV2 } from "aws-lambda";

// Not sure which one is really needed as this is not documented anywhere but server actions redirect are not working without this,
Expand Down Expand Up @@ -36,18 +38,18 @@ const CloudFrontBlacklistedHeaders = [
"via",
];

function normalizeAPIGatewayProxyEventV2Body(event: APIGatewayProxyEventV2): Buffer {
function normalizeAPIGatewayProxyEventV2Body(event: APIGatewayProxyEventV2): ReadableStream | undefined {
const { body, isBase64Encoded } = event;
if (Buffer.isBuffer(body)) {
return body;
return toReadableStream(body);
}
if (typeof body === "string") {
return Buffer.from(body, isBase64Encoded ? "base64" : "utf8");
return toReadableStream(body, isBase64Encoded);
}
if (typeof body === "object") {
return Buffer.from(JSON.stringify(body));
return toReadableStream(JSON.stringify(body));
}
return Buffer.from("", "utf8");
return undefined;
}

function normalizeAPIGatewayProxyEventV2Headers(event: APIGatewayProxyEventV2): Record<string, string> {
Expand Down
5 changes: 2 additions & 3 deletions packages/aws/src/overrides/converters/aws-cloudfront.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ import { parseSetCookieHeader } from "@opennextjs/core/http/util.js";
import { extractHostFromHeaders } from "@opennextjs/core/overrides/converters/utils.js";
import type { InternalEvent, InternalResult, MiddlewareResult } from "@opennextjs/core/types/open-next.js";
import type { Converter } from "@opennextjs/core/types/overrides.js";
import { fromReadableStream } from "@opennextjs/core/utils/stream.js";
import { fromReadableStream, toReadableStream } from "@opennextjs/core/utils/stream.js";
import type {
CloudFrontCustomOrigin,
CloudFrontHeaders,
CloudFrontRequest,
CloudFrontRequestEvent,
CloudFrontRequestResult,
} from "aws-lambda";

const cloudfrontBlacklistedHeaders = [
// Disallowed headers, see: https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/edge-function-restrictions-all.html#function-restrictions-disallowed-headers
"connection",
Expand Down Expand Up @@ -82,7 +81,7 @@ async function convertFromCloudFrontRequestEvent(event: CloudFrontRequestEvent):
method,
rawPath: uri,
url: `https://${extractHostFromHeaders(headers)}${uri}${querystring ? `?${querystring}` : ""}`,
body: Buffer.from(body?.data ?? "", body?.encoding === "base64" ? "base64" : "utf8"),
body: body?.data ? toReadableStream(body.data, body.encoding === "base64") : undefined,
headers,
remoteAddress: clientIp,
query: convertToQuery(querystring),
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/core/routing/cacheInterceptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ function createPprPartialResult(
"next-resume": "1",
},
rawPath: localizedPath,
body: Buffer.from(cachedValue.meta?.postponed || "", "utf-8"),
body: toReadableStream(cachedValue.meta?.postponed || ""),
},
result: {
type: "core",
Expand Down
6 changes: 4 additions & 2 deletions packages/core/src/core/routing/middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ export async function handleMiddleware(

const middleware = await middlewareLoader();

const [bodyForMiddleware, bodyForForward] = internalEvent.body?.tee() ?? [undefined, undefined];

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i wonder if this is going to be unnecessary for the majority of apps. as in, if someone isn't going to read the body in the middleware, we're probably going ot tee the stream but never make use of that. i wonder if there's some like lazy tee or something we could do potentially.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't know anything about a lazy tee, and to be fair in most case there will be no body.
I guess we could add an option (or an env variable) to not forward the body to the middleware if not needed


const result: Response = await middleware.default({
// `geo` is pre Next 15.
geo: {
Expand All @@ -84,7 +86,7 @@ export async function handleMiddleware(
trailingSlash: NextConfig.trailingSlash,
},
url,
body: convertBodyToReadableStream(internalEvent.method, internalEvent.body),
body: convertBodyToReadableStream(internalEvent.method, bodyForMiddleware),
} as unknown as Request);
const statusCode = result.status;

Expand Down Expand Up @@ -175,7 +177,7 @@ export async function handleMiddleware(
rawPath: new URL(newUrl).pathname,
type: internalEvent.type,
headers: { ...internalEvent.headers, ...reqHeaders },
body: internalEvent.body,
body: bodyForForward,
method: internalEvent.method,
query: middlewareQuery,
cookies: internalEvent.cookies,
Expand Down
9 changes: 2 additions & 7 deletions packages/core/src/core/routing/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,10 @@ export function unescapeRegex(str: string) {
/**
* @__PURE__
*/
export function convertBodyToReadableStream(method: string, body?: string | Buffer) {
export function convertBodyToReadableStream(method: string, body?: ReadableStream) {
if (method === "GET" || method === "HEAD") return undefined;
if (!body) return undefined;
return new ReadableStream({
start(controller) {
controller.enqueue(body);
controller.close();
},
});
return body;
}

enum CommonHeaders {
Expand Down
61 changes: 50 additions & 11 deletions packages/core/src/http/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

// @ts-nocheck
import http from "node:http";
import type { ReadableStream } from "node:stream/web";

export class IncomingMessage extends http.IncomingMessage {
constructor({
Expand All @@ -16,7 +17,7 @@ export class IncomingMessage extends http.IncomingMessage {
method: string;
url: string;
headers: Record<string, string | string[]>;
body?: Buffer;
body?: ReadableStream;
remoteAddress?: string;
}) {
super({
Expand All @@ -28,12 +29,6 @@ export class IncomingMessage extends http.IncomingMessage {
destroy: Function.prototype,
});

// Set the content length when there is a body.
// See https://httpwg.org/specs/rfc9110.html#field.content-length
if (body) {
headers["content-length"] ??= String(Buffer.byteLength(body));
}

Object.assign(this, {
ip: remoteAddress,
complete: true,
Expand All @@ -46,9 +41,53 @@ export class IncomingMessage extends http.IncomingMessage {
url,
});

this._read = () => {
this.push(body);
this.push(null);
};
this._read = (() => {
if (!body) {
return () => {
this.push(null);
};
}
const reader = body.getReader();
let reading = false;
let streamDone = false;

this.once("close", () => {
if (!streamDone) {
streamDone = true;
reader.cancel().catch(() => {});
}
Comment on lines +55 to +58

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it worth waiting for the current read to finish before cancelling, if there's on in-progress? something like toggle streamdone, wait for reading to be false, and in the pump below dont do it if streamdone is true, then cancel here after reading is false? WDYT?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we could do something like that. I don't see any downsides to this at the moment

});

const pump = () => {
reading = true;
reader
.read()
.then(({ done, value }) => {
if (done) {
streamDone = true;
reader.releaseLock();

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm shouldnt the release also happen in the catch as well?

this.push(null);
} else {
const canContinue = this.push(value);
if (canContinue) {
pump();
} else {
reading = false;
}
}
})
.catch((err) => {
streamDone = true;
reader.cancel().catch(() => {});
this.destroy(err);
});
};

return () => {
if (!reading) {
pump();
}
};
})();
}
}
8 changes: 5 additions & 3 deletions packages/core/src/overrides/converters/edge.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Buffer } from "node:buffer";
import type { ReadableStream } from "node:stream/web";

import cookieParser from "cookie";

Expand Down Expand Up @@ -32,7 +32,7 @@ const converter: Converter<InternalEvent, InternalResult | MiddlewareResult> = {
const shouldHaveBody = method !== "GET" && method !== "HEAD";

// Only read body for methods that should have one
const body = shouldHaveBody ? Buffer.from(await request.arrayBuffer()) : undefined;
const body = shouldHaveBody ? ((request.body as ReadableStream | undefined) ?? undefined) : undefined;

const cookieHeader = request.headers.get("cookie");
const cookies = cookieHeader ? (cookieParser.parse(cookieHeader) as Record<string, string>) : {};
Expand Down Expand Up @@ -106,7 +106,9 @@ const converter: Converter<InternalEvent, InternalResult | MiddlewareResult> = {
}

// We should not return a body for statusCode's that doesn't allow bodies
const body = NULL_BODY_STATUSES.has(result.statusCode) ? null : (result.body as ReadableStream);
const body = NULL_BODY_STATUSES.has(result.statusCode)
? null
: (result.body as unknown as globalThis.ReadableStream);

return new Response(body, {
status: result.statusCode,
Expand Down
13 changes: 4 additions & 9 deletions packages/core/src/overrides/converters/node.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import type { IncomingMessage } from "node:http";
import { Readable } from "node:stream";
import type { ReadableStream } from "node:stream/web";

import cookieParser from "cookie";

Expand All @@ -10,15 +12,8 @@ import { extractHostFromHeaders, getQueryFromSearchParams } from "./utils.js";
const converter: Converter = {
convertFrom: async (event: unknown) => {
const req = event as IncomingMessage & { protocol?: string };
const body = await new Promise<Buffer>((resolve) => {
const chunks: Uint8Array[] = [];
req.on("data", (chunk) => {
chunks.push(chunk);
});
req.on("end", () => {
resolve(Buffer.concat(chunks));
});
});
const shouldHaveBody = req.method !== "GET" && req.method !== "HEAD";
const body: ReadableStream | undefined = shouldHaveBody ? Readable.toWeb(req) : undefined;

const headers = Object.fromEntries(
Object.entries(req.headers ?? {})
Expand Down
23 changes: 20 additions & 3 deletions packages/core/src/overrides/proxyExternalRequest/node.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { request } from "node:https";
import { Readable } from "node:stream";
import type { ReadableStream } from "node:stream/web";

import type { InternalEvent, InternalResult } from "@/types/open-next";
import type { ProxyExternalRequest } from "@/types/overrides";
Expand Down Expand Up @@ -35,6 +36,15 @@ const nodeProxy: ProxyExternalRequest = {
const { url, headers, method, body } = internalEvent;
debug("proxyRequest", url);
return new Promise<InternalResult>((resolve, reject) => {
let hasRejected = false;
const rejectOnce = (e: Error) => {
if (hasRejected) {
return;
}

hasRejected = true;
reject(e);
};
const filteredHeaders = filterHeadersForProxy(headers);
debug("filteredHeaders", filteredHeaders);
const req = request(
Expand Down Expand Up @@ -67,15 +77,22 @@ const nodeProxy: ProxyExternalRequest = {

_res.on("error", (e) => {
error("proxyRequest error", e);
reject(e);
rejectOnce(e);
});
}
);
req.on("error", (e) => {
error("proxyRequest error", e);
rejectOnce(e);
});

if (body && method !== "GET" && method !== "HEAD") {
req.write(body);
Readable.fromWeb(body as ReadableStream<Uint8Array>)
.on("error", rejectOnce)
.pipe(req);
} else {
req.end();
}
req.end();
});
Comment thread
conico974 marked this conversation as resolved.
},
};
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/types/open-next.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export type InternalEvent = {
readonly rawPath: string;
// Full URL - starts with "https://on/" when the host is not available
readonly url: string;
readonly body?: Buffer;
readonly body?: ReadableStream;
//TODO: change the type of headers to Record<string, string | string[]>
readonly headers: Record<string, string>;
readonly query: Record<string, string | string[]>;
Expand Down
Loading
Loading