Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,12 @@ The following types can be passed over RPC (in arguments or return values), and
* `Uint8Array`
* `Error` and its well-known subclasses
* `ReadableStream` and `WritableStream`, with automatic flow control.
* `Headers`, `Request`, and `Response` from the Fetch API.

The following types are not supported as of this writing, but may be added in the future:
* `Map` and `Set`
* `ArrayBuffer` and typed arrays other than `Uint8Array`
* `RegExp`
* `Headers`, `Request`, and `Response`

The following are intentionally NOT supported:
* Application-defined classes that do not extend `RpcTarget`.
Expand Down
176 changes: 175 additions & 1 deletion __tests__/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,26 @@ let SERIALIZE_TEST_CASES: Record<string, unknown> = {
'["inf"]': Infinity,
'["-inf"]': -Infinity,
'["nan"]': NaN,

'["headers",[]]': new Headers(),
'["headers",[["content-type","text/plain"],["x-custom","hello"]]]':
new Headers({"Content-Type": "text/plain", "X-Custom": "hello"}),

'["request","http://example.com/",{"method":"HEAD"}]':
new Request("http://example.com/", {method: "HEAD"}),
'["request","http://example.com/",{"method":"DELETE","headers":[["x-foo","bar"]]}]':
new Request("http://example.com/", {method: "DELETE", headers: {"X-Foo": "bar"}}),
'["request","http://example.com/",{"redirect":"manual"}]':
new Request("http://example.com/", {redirect: "manual"}),

// Note: Cloudflare Workers atutomatically fills in `statusText` based on `status` while other
// platforms leave it as an empty string. So we can't actually test a totalyl empty init
// struct here, annoyingly.
'["response",null,{"statusText":"OK"}]': new Response(null, {statusText: "OK"}),
'["response",null,{"status":404,"statusText":"Not Found"}]':
new Response(null, {status: 404, statusText: "Not Found"}),
'["response",null,{"status":201,"statusText":"Hello","headers":[["x-custom","value"]]}]':
new Response(null, {status: 201, statusText: "Hello", headers: {"X-Custom": "value"}}),
};

class NotSerializable {
Expand All @@ -54,7 +74,14 @@ describe("simple serialization", () => {

it("can deserialize", () => {
for (let key in SERIALIZE_TEST_CASES) {
expect(deserialize(key)).toStrictEqual(SERIALIZE_TEST_CASES[key]);
let value = deserialize(key);
if (value instanceof Headers || value instanceof Request || value instanceof Response) {
// toStrictEqual() won't work, so test these by serializing them again and making sure
// they at least round-trip.
expect(serialize(value)).toBe(key);
} else {
expect(value).toStrictEqual(SERIALIZE_TEST_CASES[key]);
}
}
})

Expand Down Expand Up @@ -2228,3 +2255,150 @@ describe("ReadableStream over RPC", () => {
expect(cancelCalled).toBe(true);
});
});

// =======================================================================================

describe("Fetch API types over RPC", () => {
it("can send Headers over RPC", async () => {
class HeaderServer extends RpcTarget {
getHeaders() {
return new Headers({"Content-Type": "text/html", "X-Server": "test"});
}
readHeader(headers: Headers, name: string) {
return headers.get(name);
}
}

await using harness = new TestHarness(new HeaderServer());
let stub = harness.stub as any;

// Server -> Client
let headers: Headers = await stub.getHeaders();
expect(headers).toBeInstanceOf(Headers);
expect(headers.get("content-type")).toBe("text/html");
expect(headers.get("x-server")).toBe("test");

// Client -> Server
let result = await stub.readHeader(new Headers({"Authorization": "Bearer abc"}), "authorization");
expect(result).toBe("Bearer abc");
});

it("can send Request with body over RPC", async () => {
class RequestServer extends RpcTarget {
async receiveRequest(req: Request) {
return {
url: req.url,
method: req.method,
body: await req.text(),
customHeader: req.headers.get("x-custom"),
};
}
getRequest() {
return new Request("http://example.com/api", {
method: "POST",
headers: {"X-Custom": "fromserver"},
body: "server body",
});
}
}

await using harness = new TestHarness(new RequestServer());
let stub = harness.stub as any;

// Client -> Server: send request with body and headers
let result = await stub.receiveRequest(new Request("http://test.com/path", {
method: "PUT",
headers: {"X-Custom": "hello"},
body: "request body",
}));
expect(result.url).toBe("http://test.com/path");
expect(result.method).toBe("PUT");
expect(result.body).toBe("request body");
expect(result.customHeader).toBe("hello");

// Server -> Client: receive request with body
let req: Request = await stub.getRequest();
expect(req).toBeInstanceOf(Request);
expect(req.url).toBe("http://example.com/api");
expect(req.method).toBe("POST");
expect(req.headers.get("x-custom")).toBe("fromserver");
expect(await req.text()).toBe("server body");
});

it("can send Response with body over RPC", async () => {
class ResponseServer extends RpcTarget {
async receiveResponse(resp: Response) {
return {
status: resp.status,
statusText: resp.statusText,
body: await resp.text(),
customHeader: resp.headers.get("x-custom"),
};
}
getResponse() {
return new Response("hello from server", {
status: 201,
statusText: "Created",
headers: {"X-Custom": "fromserver"},
});
}
}

await using harness = new TestHarness(new ResponseServer());
let stub = harness.stub as any;

// Client -> Server: send response with body and status
let result = await stub.receiveResponse(new Response("response body", {
status: 404,
statusText: "Not Found",
headers: {"X-Custom": "value"},
}));
expect(result.status).toBe(404);
expect(result.statusText).toBe("Not Found");
expect(result.body).toBe("response body");
expect(result.customHeader).toBe("value");

// Server -> Client: receive response with body
let resp: Response = await stub.getResponse();
expect(resp).toBeInstanceOf(Response);
expect(resp.status).toBe(201);
expect(resp.statusText).toBe("Created");
expect(resp.headers.get("x-custom")).toBe("fromserver");
expect(await resp.text()).toBe("hello from server");
});

it("can send Request without body over RPC", async () => {
class RequestServer extends RpcTarget {
async receiveRequest(req: Request) {
let hasBody = req.body !== null;
if (req.body === undefined) {
// Ugh, Firefox doesn't support `request.body`, try a different approach.
hasBody = (await req.arrayBuffer()).byteLength > 0;
}

return { url: req.url, method: req.method, hasBody };
}
}

await using harness = new TestHarness(new RequestServer());
let stub = harness.stub as any;
let result = await stub.receiveRequest(new Request("http://example.com"));
expect(result.url).toBe("http://example.com/");
expect(result.method).toBe("GET");
expect(result.hasBody).toBe(false);
});

it("can send Response without body over RPC", async () => {
class ResponseServer extends RpcTarget {
receiveResponse(resp: Response) {
return { status: resp.status, hasBody: resp.body !== null };
}
}

await using harness = new TestHarness(new ResponseServer());
let stub = harness.stub as any;
let result = await stub.receiveResponse(new Response(null, {status: 204}));
expect(result.status).toBe(204);
expect(result.hasBody).toBe(false);
});
});
16 changes: 16 additions & 0 deletions protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,22 @@ A JavaScript `Error` value. `type` is the name of the specific well-known `Error

_TODO: We should extend this to encode own properties that have been added to the error._

`["headers", pairs]`

A `Headers` object from the Fetch API. `pairs` is an array of `[name, value]` pairs, where both `name` and `value` are strings. For example: `["headers", [["content-type", "text/plain"], ["x-custom", "hello"]]]`.

`["request", url, init]`

A `Request` object from the Fetch API. `url` and `init` are the parameters to pass to `Request`'s constructor to create the desired `Request` instance. The sender should omit properties from `init` when their value would be the default value anyway. `init.headers`, if present, must contain an array of pairs, suitable to pass to the constructor of `Headers`. `init.body`, if present, is an expression for the response body, which must evaluate to `null`, a string, `Uint8Array`, or `ReadableStream`. Other properties of `init` must be plain values; they will not be evaluated as expressions before passing to the `Request` constructor.

At this time, `init.signal` is not supported and must not be sent, though that will change when `AbortSignal` gains support for serialization.

`["response", body, init]`

A `Response` object from the Fetch API. `body` and `init` are the parameters to pass to `Response`'s constructor to create the desired `Response` instance. `body` is an expression which must evaluate to `null`, a string, `UInt8Array`, or `ReadableStream`. `init.headers`, if present, must contain an array of pairs, suitable to pass to the constructor of `Headers`. Other properties of `init` must be plain values; they will not be evaluated as expressions before passing to the `Response` constructor.

At this time, `init.webSocket` (a Cloudflare Workers extension) is not supported and must not be sent, though that may change if `WebSocket` gains support for serialization.

`["import", importId, propertyPath, callArguments]`
`["pipeline", importId, propertyPath, callArguments]`

Expand Down
71 changes: 70 additions & 1 deletion src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export type PropertyPath = (string | number)[];

type TypeForRpc = "unsupported" | "primitive" | "object" | "function" | "array" | "date" |
"bigint" | "bytes" | "stub" | "rpc-promise" | "rpc-target" | "rpc-thenable" | "error" |
"undefined" | "writable" | "readable";
"undefined" | "writable" | "readable" | "headers" | "request" | "response";

const AsyncFunction = (async function () {}).constructor;

Expand Down Expand Up @@ -97,6 +97,15 @@ export function typeForRpc(value: unknown): TypeForRpc {
case ReadableStream.prototype:
return "readable";

case Headers.prototype:
return "headers";

case Request.prototype:
return "request";

case Response.prototype:
return "response";

// TODO: All other structured clone types.

case RpcStub.prototype:
Expand Down Expand Up @@ -1017,6 +1026,9 @@ export class RpcPayload {
}

case "readable": {
// Note that we don't use tee() here because we treat streams as reference types -- we
// actually want to share the same body. tee()ing the stream would force the runtime to
// buffer a copy of the whole body which would usually never be read.
let stream = <ReadableStream>value;
let hook: StubHook;
if (owner) {
Expand All @@ -1028,6 +1040,37 @@ export class RpcPayload {
return stream;
}

case "headers":
return new Headers(<Headers>value);

case "request": {
let req = <Request>value;
if (req.body) {
// Note "deep-copy" of a ReadableStream always returns the same stream, but we still
// need to run it in order to handle refcounting / disposal properly.
this.deepCopy(req.body, req, "body", req, dupStubs, owner);
}

// Make an actual copy of the object, e.g. so the headers are copied.
// Note that it would be incorrect to use clone() here since that would tee() the body
// stream.
return new Request(req);
}

case "response": {
let resp = <Response>value;
if (resp.body) {
// Note "deep-copy" of a ReadableStream always returns the same stream, but we still
// need to run it in order to handle refcounting / disposal properly.
this.deepCopy(resp.body, resp, "body", resp, dupStubs, owner);
}

// Make an actual copy of the object, e.g. so the headers are copied.
// Note that it would be incorrect to use clone() here since that would tee() the body
// stream.
return new Response(resp.body, resp);
}

default:
kind satisfies never;
throw new Error("unreachable");
Expand Down Expand Up @@ -1278,6 +1321,26 @@ export class RpcPayload {
// Since thenables are promises, we don't own them, so we don't dispose them.
return;

case "headers":
// Headers have no owned resources to dispose.
return;

case "request": {
// The body may be a ReadableStream that has an associated hook in rpcTargets.
let req = <Request>value;
if (req.body) this.disposeImpl(req.body, req);
// TODO: When we support AbortSignal, we may need to dispose request.signal here?
return;
}

case "response": {
// The body may be a ReadableStream that has an associated hook in rpcTargets.
let resp = <Response>value;
if (resp.body) this.disposeImpl(resp.body, resp);
// TODO: When we support WebSocket, we may need to dispose response.webSocket here?
return;
}

case "writable": {
let stream = <WritableStream>value;
let hook = this.rpcTargets?.get(stream);
Expand Down Expand Up @@ -1347,6 +1410,9 @@ export class RpcPayload {
case "rpc-target":
case "writable":
case "readable":
case "headers":
case "request":
case "response":
return;

case "array": {
Expand Down Expand Up @@ -1496,6 +1562,9 @@ function followPath(value: unknown, parent: object | undefined,
case "bytes":
case "date":
case "error":
case "headers":
case "request":
case "response":
// These have no properties that can be accessed remotely.
value = undefined;
break;
Expand Down
Loading
Loading