Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/rpc-parse-options-config.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": patch
---

Add configurable schema parse concurrency to RPC client and server so schema parsing concurrency can be customized per RPC instance.
45 changes: 29 additions & 16 deletions packages/effect/src/unstable/rpc/RpcClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import * as Queue from "../../Queue.ts"
import * as Result from "../../Result.ts"
import * as Schedule from "../../Schedule.ts"
import * as Schema from "../../Schema.ts"
import type * as AST from "../../SchemaAST.ts"
import * as Scope from "../../Scope.ts"
import * as Stream from "../../Stream.ts"
import type * as Struct from "../../Struct.ts"
Expand All @@ -67,6 +68,8 @@ import * as RpcSerialization from "./RpcSerialization.ts"
import * as RpcWorker from "./RpcWorker.ts"
import { withRunClient } from "./Utils.ts"

type RpcParseOptions = Pick<AST.ParseOptions, "concurrency">

/**
* The object-shaped client generated from a union of RPC definitions, with one
* method per RPC tag.
Expand Down Expand Up @@ -649,6 +652,7 @@ export const make: <Rpcs extends Rpc.Any, const Flatten extends boolean = false>
readonly spanAttributes?: Record<string, unknown> | undefined
readonly generateRequestId?: (() => RequestId) | undefined
readonly disableTracing?: boolean | undefined
readonly parseOptions?: RpcParseOptions | undefined
readonly flatten?: Flatten | undefined
} | undefined
) => Effect.Effect<
Expand All @@ -662,11 +666,17 @@ export const make: <Rpcs extends Rpc.Any, const Flatten extends boolean = false>
readonly spanAttributes?: Record<string, unknown> | undefined
readonly generateRequestId?: (() => RequestId) | undefined
readonly disableTracing?: boolean | undefined
readonly parseOptions?: RpcParseOptions | undefined
readonly flatten?: Flatten | undefined
} | undefined
) {
const clientId = clientIdCounter++
const { run, send, supportsAck, supportsTransferables } = yield* Protocol
const parseConcurrency = options?.parseOptions?.concurrency
const parseOptions: RpcParseOptions | undefined = parseConcurrency !== undefined
? { concurrency: parseConcurrency }
: undefined
const getRpcSchemas = makeRpcSchemas(parseOptions)

type ClientEntry = {
readonly rpc: Rpc.AnyWithProps
Expand All @@ -689,7 +699,7 @@ export const make: <Rpcs extends Rpc.Any, const Flatten extends boolean = false>
const entry: ClientEntry = {
rpc,
context: collector ? Context.add(fiber.context, Transferable.Collector, collector) : fiber.context,
schemas: rpcSchemas(rpc)
schemas: getRpcSchemas(rpc)
}
entries.set(message.id, entry)

Expand Down Expand Up @@ -796,23 +806,26 @@ interface RpcSchemas {
readonly encodePayload: (payload: any) => Effect.Effect<any, Schema.SchemaError, unknown>
readonly decodeExit: (encoded: unknown) => Effect.Effect<Exit.Exit<any, any>, Schema.SchemaError, unknown>
}
const rpcSchemasCache = new WeakMap<Rpc.AnyWithProps, RpcSchemas>()
const rpcSchemas = (rpc: Rpc.AnyWithProps) => {
let entry = rpcSchemasCache.get(rpc)
if (entry !== undefined) {
const makeRpcSchemas = (parseOptions: RpcParseOptions | undefined) => {
const rpcSchemasCache = new WeakMap<Rpc.AnyWithProps, RpcSchemas>()
return (rpc: Rpc.AnyWithProps) => {
let entry = rpcSchemasCache.get(rpc)
if (entry !== undefined) {
return entry
}
const streamSchemas = RpcSchema.getStreamSchemas(rpc.successSchema)
entry = {
decodeChunk: Option.map(
streamSchemas,
(streamSchemas) =>
Schema.decodeUnknownEffect(Schema.toCodecJson(Schema.NonEmptyArray(streamSchemas.success)), parseOptions)
),
encodePayload: Schema.encodeEffect(Schema.toCodecJson(rpc.payloadSchema), parseOptions),
decodeExit: Schema.decodeUnknownEffect(Schema.toCodecJson(Rpc.exitSchema(rpc as any)), parseOptions)
}
rpcSchemasCache.set(rpc, entry)
return entry
}
const streamSchemas = RpcSchema.getStreamSchemas(rpc.successSchema)
entry = {
decodeChunk: Option.map(
streamSchemas,
(streamSchemas) => Schema.decodeUnknownEffect(Schema.toCodecJson(Schema.NonEmptyArray(streamSchemas.success)))
),
encodePayload: Schema.encodeEffect(Schema.toCodecJson(rpc.payloadSchema)),
decodeExit: Schema.decodeUnknownEffect(Schema.toCodecJson(Rpc.exitSchema(rpc as any)))
}
rpcSchemasCache.set(rpc, entry)
return entry
}

/**
Expand Down
24 changes: 20 additions & 4 deletions packages/effect/src/unstable/rpc/RpcServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import * as Pull from "../../Pull.ts"
import * as Queue from "../../Queue.ts"
import * as Schedule from "../../Schedule.ts"
import * as Schema from "../../Schema.ts"
import type * as AST from "../../SchemaAST.ts"
import * as Scope from "../../Scope.ts"
import * as Semaphore from "../../Semaphore.ts"
import { Stdio } from "../../Stdio.ts"
Expand Down Expand Up @@ -84,6 +85,8 @@ import * as RpcSerialization from "./RpcSerialization.ts"
import type { InitialMessage } from "./RpcWorker.ts"
import { withRun } from "./Utils.ts"

type RpcParseOptions = Pick<AST.ParseOptions, "concurrency">

/**
* The decoded RPC server boundary, accepting client messages for a client id
* and allowing that client to be disconnected.
Expand Down Expand Up @@ -514,6 +517,7 @@ export const make: <Rpcs extends Rpc.Any>(
readonly spanAttributes?: Record<string, unknown> | undefined
readonly concurrency?: number | "unbounded" | undefined
readonly disableFatalDefects?: boolean | undefined
readonly parseOptions?: RpcParseOptions | undefined
}
| undefined
) => Effect.Effect<
Expand All @@ -531,6 +535,7 @@ export const make: <Rpcs extends Rpc.Any>(
readonly spanAttributes?: Record<string, unknown> | undefined
readonly concurrency?: number | "unbounded" | undefined
readonly disableFatalDefects?: boolean | undefined
readonly parseOptions?: RpcParseOptions | undefined
}
) {
const {
Expand All @@ -544,6 +549,10 @@ export const make: <Rpcs extends Rpc.Any>(
} = yield* Protocol
const services = yield* Effect.context<Rpc.ToHandler<Rpcs> | Rpc.Middleware<Rpcs>>()
const scope = yield* Scope.make()
const parseConcurrency = options?.parseOptions?.concurrency
const parseOptions: RpcParseOptions | undefined = parseConcurrency !== undefined
? { concurrency: parseConcurrency }
: undefined

const server = yield* makeNoSerialization(group, {
...options,
Expand Down Expand Up @@ -621,14 +630,15 @@ export const make: <Rpcs extends Rpc.Any>(
const entry = services.mapUnsafe.get(rpc.key) as Rpc.Handler<Rpcs["_tag"]>
const streamSchemas = RpcSchema.getStreamSchemas(rpc.successSchema)
schemas = {
decode: Schema.decodeUnknownEffect(Schema.toCodecJson(rpc.payloadSchema)) as any,
decode: Schema.decodeUnknownEffect(Schema.toCodecJson(rpc.payloadSchema), parseOptions) as any,
encodeChunk: Schema.encodeUnknownEffect(
Schema.toCodecJson(
Schema.Array(Option.isSome(streamSchemas) ? streamSchemas.value.success : Schema.Any)
)
),
parseOptions
) as any,
encodeExit: Schema.encodeUnknownEffect(Schema.toCodecJson(Rpc.exitSchema(rpc as any))) as any,
encodeDefect: Schema.encodeUnknownEffect(Schema.toCodecJson(rpc.defectSchema)) as any,
encodeExit: Schema.encodeUnknownEffect(Schema.toCodecJson(Rpc.exitSchema(rpc as any)), parseOptions) as any,
encodeDefect: Schema.encodeUnknownEffect(Schema.toCodecJson(rpc.defectSchema), parseOptions) as any,
context: entry.context
}
schemasCache.set(rpc, schemas)
Expand Down Expand Up @@ -792,6 +802,7 @@ export const layer = <Rpcs extends Rpc.Any>(
readonly spanAttributes?: Record<string, unknown> | undefined
readonly concurrency?: number | "unbounded" | undefined
readonly disableFatalDefects?: boolean | undefined
readonly parseOptions?: RpcParseOptions | undefined
}
): Layer.Layer<
never,
Expand Down Expand Up @@ -820,6 +831,7 @@ export const layerHttp = <Rpcs extends Rpc.Any>(options: {
readonly spanAttributes?: Record<string, unknown> | undefined
readonly concurrency?: number | "unbounded" | undefined
readonly disableFatalDefects?: boolean | undefined
readonly parseOptions?: RpcParseOptions | undefined
}): Layer.Layer<
never,
never,
Expand Down Expand Up @@ -1187,6 +1199,7 @@ export const toHttpEffect: <Rpcs extends Rpc.Any>(
readonly spanPrefix?: string | undefined
readonly spanAttributes?: Record<string, unknown> | undefined
readonly disableFatalDefects?: boolean | undefined
readonly parseOptions?: RpcParseOptions | undefined
} | undefined
) => Effect.Effect<
Effect.Effect<HttpServerResponse.HttpServerResponse, never, Scope.Scope | HttpServerRequest.HttpServerRequest>,
Expand All @@ -1203,6 +1216,7 @@ export const toHttpEffect: <Rpcs extends Rpc.Any>(
readonly spanPrefix?: string | undefined
readonly spanAttributes?: Record<string, unknown> | undefined
readonly disableFatalDefects?: boolean | undefined
readonly parseOptions?: RpcParseOptions | undefined
}
) {
const { httpEffect, protocol } = yield* makeProtocolWithHttpEffect
Expand All @@ -1228,6 +1242,7 @@ export const toHttpEffectWebsocket: <Rpcs extends Rpc.Any>(
readonly spanPrefix?: string | undefined
readonly spanAttributes?: Record<string, unknown> | undefined
readonly disableFatalDefects?: boolean | undefined
readonly parseOptions?: RpcParseOptions | undefined
} | undefined
) => Effect.Effect<
Effect.Effect<HttpServerResponse.HttpServerResponse, never, Scope.Scope | HttpServerRequest.HttpServerRequest>,
Expand All @@ -1244,6 +1259,7 @@ export const toHttpEffectWebsocket: <Rpcs extends Rpc.Any>(
readonly spanPrefix?: string | undefined
readonly spanAttributes?: Record<string, unknown> | undefined
readonly disableFatalDefects?: boolean | undefined
readonly parseOptions?: RpcParseOptions | undefined
}
) {
const { httpEffect, protocol } = yield* makeProtocolWithHttpEffectWebsocket
Expand Down
93 changes: 93 additions & 0 deletions packages/effect/test/rpc/RpcParseOptions.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import { assert, describe, it } from "@effect/vitest"
import { Deferred, Effect, Option, Queue, Schema } from "effect"
import { Rpc, RpcClient, RpcGroup, RpcServer } from "effect/unstable/rpc"
import type { FromClientEncoded, FromServerEncoded } from "effect/unstable/rpc/RpcMessage"

const ParseOptionsGroup = RpcGroup.make(
Rpc.make("Ping", {
payload: Schema.Struct({
value: Schema.String
}),
success: Schema.String,
defect: Schema.String
})
)

describe("Rpc parseOptions", () => {
it.effect("RpcClient.make forwards only parseOptions concurrency", () =>
Effect.gen(function*() {
const sent = yield* Deferred.make<FromClientEncoded>()
const parseOptionsWithExcessProperty = {
concurrency: "unbounded" as const,
onExcessProperty: "error" as const
}
const client = yield* RpcClient.make(ParseOptionsGroup, {
parseOptions: parseOptionsWithExcessProperty
}).pipe(
Effect.provideService(
RpcClient.Protocol,
RpcClient.Protocol.of({
run: () => Effect.never,
send: (_clientId, request) => Deferred.succeed(sent, request),
supportsAck: true,
supportsTransferables: false
})
)
)
const payloadWithExcessProperty = { value: "ok", extra: "x" }
const exit = yield* Effect.exit(client.Ping(payloadWithExcessProperty, { discard: true }))
assert.strictEqual(exit._tag, "Success")
const request = yield* Deferred.await(sent)
assert.strictEqual(request._tag, "Request")
assert.strictEqual(Object.hasOwn(request.payload, "extra"), false)
}).pipe(Effect.scoped))

it.effect("RpcServer.make forwards only parseOptions concurrency", () =>
Effect.gen(function*() {
const sent = yield* Deferred.make<FromServerEncoded>()
const disconnects = yield* Queue.unbounded<number>()
const parseOptionsWithExcessProperty = {
concurrency: "unbounded" as const,
onExcessProperty: "error" as const
}

const request = {
_tag: "Request",
id: "1",
tag: "Ping",
payload: { value: "ok", extra: "x" },
headers: []
} as const

const server = RpcServer.make(ParseOptionsGroup, {
parseOptions: parseOptionsWithExcessProperty
}).pipe(
Effect.provideService(
RpcServer.Protocol,
RpcServer.Protocol.of({
run: (f) => Effect.andThen(f(0, request), Effect.never),
disconnects,
send: (_clientId, response) => Deferred.succeed(sent, response),
end: () => Effect.void,
clientIds: Effect.succeed(new Set()),
initialMessage: Effect.succeed(Option.none()),
supportsAck: true,
supportsTransferables: false,
supportsSpanPropagation: true
})
),
Effect.provide(ParseOptionsGroup.toLayerHandler("Ping", () => Effect.succeed("ok"))),
Effect.forkScoped
)
yield* server

const response = yield* Effect.raceFirst(
Deferred.await(sent),
Effect.fail("Timed out waiting for RPC response").pipe(Effect.delay("1 second"))
)
if (response._tag !== "Exit") {
assert.fail(`Expected Exit response, got ${response._tag}`)
}
assert.deepStrictEqual(response.exit, { _tag: "Success", value: "ok" })
}).pipe(Effect.scoped))
})