diff --git a/.changeset/rpc-parse-options-config.md b/.changeset/rpc-parse-options-config.md new file mode 100644 index 0000000000..13ddff261e --- /dev/null +++ b/.changeset/rpc-parse-options-config.md @@ -0,0 +1,5 @@ +--- +"effect": patch +--- + +Add configurable schema parse concurrency to RPC client and server so schema parsing concurrency can be customized per RPC instance. diff --git a/packages/effect/src/unstable/rpc/RpcClient.ts b/packages/effect/src/unstable/rpc/RpcClient.ts index b2eb7ae9e4..72600127d2 100644 --- a/packages/effect/src/unstable/rpc/RpcClient.ts +++ b/packages/effect/src/unstable/rpc/RpcClient.ts @@ -43,6 +43,7 @@ import * as Queue from "../../Queue.ts" import * as Result from "../../Result.ts" import * as Schedule from "../../Schedule.ts" import * as Schema from "../../Schema.ts" +import type * as AST from "../../SchemaAST.ts" import * as Scope from "../../Scope.ts" import * as Stream from "../../Stream.ts" import type * as Struct from "../../Struct.ts" @@ -67,6 +68,8 @@ import * as RpcSerialization from "./RpcSerialization.ts" import * as RpcWorker from "./RpcWorker.ts" import { withRunClient } from "./Utils.ts" +type RpcParseOptions = Pick + /** * The object-shaped client generated from a union of RPC definitions, with one * method per RPC tag. @@ -649,6 +652,7 @@ export const make: readonly spanAttributes?: Record | undefined readonly generateRequestId?: (() => RequestId) | undefined readonly disableTracing?: boolean | undefined + readonly parseOptions?: RpcParseOptions | undefined readonly flatten?: Flatten | undefined } | undefined ) => Effect.Effect< @@ -662,11 +666,17 @@ export const make: readonly spanAttributes?: Record | undefined readonly generateRequestId?: (() => RequestId) | undefined readonly disableTracing?: boolean | undefined + readonly parseOptions?: RpcParseOptions | undefined readonly flatten?: Flatten | undefined } | undefined ) { const clientId = clientIdCounter++ const { run, send, supportsAck, supportsTransferables } = yield* Protocol + const parseConcurrency = options?.parseOptions?.concurrency + const parseOptions: RpcParseOptions | undefined = parseConcurrency !== undefined + ? { concurrency: parseConcurrency } + : undefined + const getRpcSchemas = makeRpcSchemas(parseOptions) type ClientEntry = { readonly rpc: Rpc.AnyWithProps @@ -689,7 +699,7 @@ export const make: const entry: ClientEntry = { rpc, context: collector ? Context.add(fiber.context, Transferable.Collector, collector) : fiber.context, - schemas: rpcSchemas(rpc) + schemas: getRpcSchemas(rpc) } entries.set(message.id, entry) @@ -796,23 +806,26 @@ interface RpcSchemas { readonly encodePayload: (payload: any) => Effect.Effect readonly decodeExit: (encoded: unknown) => Effect.Effect, Schema.SchemaError, unknown> } -const rpcSchemasCache = new WeakMap() -const rpcSchemas = (rpc: Rpc.AnyWithProps) => { - let entry = rpcSchemasCache.get(rpc) - if (entry !== undefined) { +const makeRpcSchemas = (parseOptions: RpcParseOptions | undefined) => { + const rpcSchemasCache = new WeakMap() + return (rpc: Rpc.AnyWithProps) => { + let entry = rpcSchemasCache.get(rpc) + if (entry !== undefined) { + return entry + } + const streamSchemas = RpcSchema.getStreamSchemas(rpc.successSchema) + entry = { + decodeChunk: Option.map( + streamSchemas, + (streamSchemas) => + Schema.decodeUnknownEffect(Schema.toCodecJson(Schema.NonEmptyArray(streamSchemas.success)), parseOptions) + ), + encodePayload: Schema.encodeEffect(Schema.toCodecJson(rpc.payloadSchema), parseOptions), + decodeExit: Schema.decodeUnknownEffect(Schema.toCodecJson(Rpc.exitSchema(rpc as any)), parseOptions) + } + rpcSchemasCache.set(rpc, entry) return entry } - const streamSchemas = RpcSchema.getStreamSchemas(rpc.successSchema) - entry = { - decodeChunk: Option.map( - streamSchemas, - (streamSchemas) => Schema.decodeUnknownEffect(Schema.toCodecJson(Schema.NonEmptyArray(streamSchemas.success))) - ), - encodePayload: Schema.encodeEffect(Schema.toCodecJson(rpc.payloadSchema)), - decodeExit: Schema.decodeUnknownEffect(Schema.toCodecJson(Rpc.exitSchema(rpc as any))) - } - rpcSchemasCache.set(rpc, entry) - return entry } /** diff --git a/packages/effect/src/unstable/rpc/RpcServer.ts b/packages/effect/src/unstable/rpc/RpcServer.ts index 145190763e..b728e999c3 100644 --- a/packages/effect/src/unstable/rpc/RpcServer.ts +++ b/packages/effect/src/unstable/rpc/RpcServer.ts @@ -52,6 +52,7 @@ import * as Pull from "../../Pull.ts" import * as Queue from "../../Queue.ts" import * as Schedule from "../../Schedule.ts" import * as Schema from "../../Schema.ts" +import type * as AST from "../../SchemaAST.ts" import * as Scope from "../../Scope.ts" import * as Semaphore from "../../Semaphore.ts" import { Stdio } from "../../Stdio.ts" @@ -84,6 +85,8 @@ import * as RpcSerialization from "./RpcSerialization.ts" import type { InitialMessage } from "./RpcWorker.ts" import { withRun } from "./Utils.ts" +type RpcParseOptions = Pick + /** * The decoded RPC server boundary, accepting client messages for a client id * and allowing that client to be disconnected. @@ -514,6 +517,7 @@ export const make: ( readonly spanAttributes?: Record | undefined readonly concurrency?: number | "unbounded" | undefined readonly disableFatalDefects?: boolean | undefined + readonly parseOptions?: RpcParseOptions | undefined } | undefined ) => Effect.Effect< @@ -531,6 +535,7 @@ export const make: ( readonly spanAttributes?: Record | undefined readonly concurrency?: number | "unbounded" | undefined readonly disableFatalDefects?: boolean | undefined + readonly parseOptions?: RpcParseOptions | undefined } ) { const { @@ -544,6 +549,10 @@ export const make: ( } = yield* Protocol const services = yield* Effect.context | Rpc.Middleware>() const scope = yield* Scope.make() + const parseConcurrency = options?.parseOptions?.concurrency + const parseOptions: RpcParseOptions | undefined = parseConcurrency !== undefined + ? { concurrency: parseConcurrency } + : undefined const server = yield* makeNoSerialization(group, { ...options, @@ -621,14 +630,15 @@ export const make: ( const entry = services.mapUnsafe.get(rpc.key) as Rpc.Handler const streamSchemas = RpcSchema.getStreamSchemas(rpc.successSchema) schemas = { - decode: Schema.decodeUnknownEffect(Schema.toCodecJson(rpc.payloadSchema)) as any, + decode: Schema.decodeUnknownEffect(Schema.toCodecJson(rpc.payloadSchema), parseOptions) as any, encodeChunk: Schema.encodeUnknownEffect( Schema.toCodecJson( Schema.Array(Option.isSome(streamSchemas) ? streamSchemas.value.success : Schema.Any) - ) + ), + parseOptions ) as any, - encodeExit: Schema.encodeUnknownEffect(Schema.toCodecJson(Rpc.exitSchema(rpc as any))) as any, - encodeDefect: Schema.encodeUnknownEffect(Schema.toCodecJson(rpc.defectSchema)) as any, + encodeExit: Schema.encodeUnknownEffect(Schema.toCodecJson(Rpc.exitSchema(rpc as any)), parseOptions) as any, + encodeDefect: Schema.encodeUnknownEffect(Schema.toCodecJson(rpc.defectSchema), parseOptions) as any, context: entry.context } schemasCache.set(rpc, schemas) @@ -792,6 +802,7 @@ export const layer = ( readonly spanAttributes?: Record | undefined readonly concurrency?: number | "unbounded" | undefined readonly disableFatalDefects?: boolean | undefined + readonly parseOptions?: RpcParseOptions | undefined } ): Layer.Layer< never, @@ -820,6 +831,7 @@ export const layerHttp = (options: { readonly spanAttributes?: Record | undefined readonly concurrency?: number | "unbounded" | undefined readonly disableFatalDefects?: boolean | undefined + readonly parseOptions?: RpcParseOptions | undefined }): Layer.Layer< never, never, @@ -1187,6 +1199,7 @@ export const toHttpEffect: ( readonly spanPrefix?: string | undefined readonly spanAttributes?: Record | undefined readonly disableFatalDefects?: boolean | undefined + readonly parseOptions?: RpcParseOptions | undefined } | undefined ) => Effect.Effect< Effect.Effect, @@ -1203,6 +1216,7 @@ export const toHttpEffect: ( readonly spanPrefix?: string | undefined readonly spanAttributes?: Record | undefined readonly disableFatalDefects?: boolean | undefined + readonly parseOptions?: RpcParseOptions | undefined } ) { const { httpEffect, protocol } = yield* makeProtocolWithHttpEffect @@ -1228,6 +1242,7 @@ export const toHttpEffectWebsocket: ( readonly spanPrefix?: string | undefined readonly spanAttributes?: Record | undefined readonly disableFatalDefects?: boolean | undefined + readonly parseOptions?: RpcParseOptions | undefined } | undefined ) => Effect.Effect< Effect.Effect, @@ -1244,6 +1259,7 @@ export const toHttpEffectWebsocket: ( readonly spanPrefix?: string | undefined readonly spanAttributes?: Record | undefined readonly disableFatalDefects?: boolean | undefined + readonly parseOptions?: RpcParseOptions | undefined } ) { const { httpEffect, protocol } = yield* makeProtocolWithHttpEffectWebsocket diff --git a/packages/effect/test/rpc/RpcParseOptions.test.ts b/packages/effect/test/rpc/RpcParseOptions.test.ts new file mode 100644 index 0000000000..b914aea6b2 --- /dev/null +++ b/packages/effect/test/rpc/RpcParseOptions.test.ts @@ -0,0 +1,93 @@ +import { assert, describe, it } from "@effect/vitest" +import { Deferred, Effect, Option, Queue, Schema } from "effect" +import { Rpc, RpcClient, RpcGroup, RpcServer } from "effect/unstable/rpc" +import type { FromClientEncoded, FromServerEncoded } from "effect/unstable/rpc/RpcMessage" + +const ParseOptionsGroup = RpcGroup.make( + Rpc.make("Ping", { + payload: Schema.Struct({ + value: Schema.String + }), + success: Schema.String, + defect: Schema.String + }) +) + +describe("Rpc parseOptions", () => { + it.effect("RpcClient.make forwards only parseOptions concurrency", () => + Effect.gen(function*() { + const sent = yield* Deferred.make() + const parseOptionsWithExcessProperty = { + concurrency: "unbounded" as const, + onExcessProperty: "error" as const + } + const client = yield* RpcClient.make(ParseOptionsGroup, { + parseOptions: parseOptionsWithExcessProperty + }).pipe( + Effect.provideService( + RpcClient.Protocol, + RpcClient.Protocol.of({ + run: () => Effect.never, + send: (_clientId, request) => Deferred.succeed(sent, request), + supportsAck: true, + supportsTransferables: false + }) + ) + ) + const payloadWithExcessProperty = { value: "ok", extra: "x" } + const exit = yield* Effect.exit(client.Ping(payloadWithExcessProperty, { discard: true })) + assert.strictEqual(exit._tag, "Success") + const request = yield* Deferred.await(sent) + assert.strictEqual(request._tag, "Request") + assert.strictEqual(Object.hasOwn(request.payload, "extra"), false) + }).pipe(Effect.scoped)) + + it.effect("RpcServer.make forwards only parseOptions concurrency", () => + Effect.gen(function*() { + const sent = yield* Deferred.make() + const disconnects = yield* Queue.unbounded() + const parseOptionsWithExcessProperty = { + concurrency: "unbounded" as const, + onExcessProperty: "error" as const + } + + const request = { + _tag: "Request", + id: "1", + tag: "Ping", + payload: { value: "ok", extra: "x" }, + headers: [] + } as const + + const server = RpcServer.make(ParseOptionsGroup, { + parseOptions: parseOptionsWithExcessProperty + }).pipe( + Effect.provideService( + RpcServer.Protocol, + RpcServer.Protocol.of({ + run: (f) => Effect.andThen(f(0, request), Effect.never), + disconnects, + send: (_clientId, response) => Deferred.succeed(sent, response), + end: () => Effect.void, + clientIds: Effect.succeed(new Set()), + initialMessage: Effect.succeed(Option.none()), + supportsAck: true, + supportsTransferables: false, + supportsSpanPropagation: true + }) + ), + Effect.provide(ParseOptionsGroup.toLayerHandler("Ping", () => Effect.succeed("ok"))), + Effect.forkScoped + ) + yield* server + + const response = yield* Effect.raceFirst( + Deferred.await(sent), + Effect.fail("Timed out waiting for RPC response").pipe(Effect.delay("1 second")) + ) + if (response._tag !== "Exit") { + assert.fail(`Expected Exit response, got ${response._tag}`) + } + assert.deepStrictEqual(response.exit, { _tag: "Success", value: "ok" }) + }).pipe(Effect.scoped)) +})