Comprehensive reference for @effect/rpc (v0.73.0) — type-safe, transport-agnostic RPC framework built on Effect. Define procedures with schemas, group them, implement handlers, then serve over WebSocket, HTTP, Socket, Worker, or Stdio with automatic serialization, streaming, middleware, and distributed tracing.
- Package Overview
- Module Map
- Rpc (Procedure Definition)
- RpcGroup
- RpcServer
- RpcClient
- RpcMiddleware
- RpcMessage
- RpcSerialization
- RpcSchema
- RpcTest
- RpcWorker
- RpcClientError
- Wrapper (fork / uninterruptible)
- End-to-End Example
- Architecture Flow
- Key Patterns
- Quick Reference
@effect/rpc v0.73.0
"Type-safe, transport-agnostic RPC for Effect"
Purpose: Define RPC procedures with Schema-validated payloads, success types, and errors. Group them, attach middleware, then serve over any transport — WebSocket, HTTP, raw sockets, workers, or stdio. Clients are auto-generated with full type safety.
| Dependency | Purpose |
|---|---|
| @effect/platform (peer) | HTTP server/client, Socket, Worker abstractions |
| effect (peer) | Core Effect library |
| msgpackr | MessagePack binary serialization (via platform) |
{
".": "./src/index.ts",
"./*": "./src/*.ts",
"./internal/*": null
}

- Barrel export: `import { Rpc, RpcClient, RpcServer } from "@effect/rpc"`
- Direct module: `import * as Rpc from "@effect/rpc/Rpc"` (tree-shakeable)
- Internal blocked: `./internal/*` mapped to `null`
| Module | Purpose |
|---|---|
| Rpc | Procedure definition, type extractors, Wrapper |
| RpcGroup | Group procedures, add middleware/prefix, implement handlers |
| RpcServer | Server engine + Protocol tag + 6 transport implementations |
| RpcClient | Client engine + Protocol tag + 4 transport implementations |
| RpcMiddleware | Middleware Tag factory, server & client middleware |
| RpcMessage | Wire protocol types (FromClient, FromServer) |
| RpcSerialization | Serialization service + 5 built-in formats |
| RpcSchema | Stream schema wrapper for streaming RPCs |
| RpcTest | In-memory test client (no serialization) |
| RpcWorker | Worker-specific InitialMessage support |
| RpcClientError | Client error type (TaggedError) |
| Total | 11 public modules |
Module: Rpc.ts — Define individual RPC procedures with Schema-validated payload, success, and error types.
import { Rpc } from "@effect/rpc";
import { Schema } from "effect";
const GetUser = Rpc.make("GetUser", {
payload: { id: Schema.Number }, // Schema.Struct.Fields or Schema.Schema
success: Schema.Struct({ name: Schema.String }),
error: Schema.String,
});
// Streaming RPC
const WatchUsers = Rpc.make("WatchUsers", {
success: Schema.Struct({ name: Schema.String }),
error: Schema.String,
stream: true, // success becomes Stream, error moves to stream errors
});
// With primaryKey (for deduplication)
const GetItem = Rpc.make("GetItem", {
payload: { id: Schema.String },
primaryKey: (payload) => payload.id,
});

| Option | Type | Default | Description |
|---|---|---|---|
| `payload` | `Schema.Struct.Fields \| Schema.Schema` | `Schema.Void` | Request payload schema |
| `success` | `Schema.Schema` | `Schema.Void` | Success response schema |
| `error` | `Schema.Schema` | `Schema.Never` | Error schema |
| `stream` | `boolean` | `false` | Wrap success as `RpcSchema.Stream` |
| `primaryKey` | `(payload) => string` | none | PrimaryKey for dedup (payload must be `Struct.Fields`) |
const MyRpc = Rpc.make("MyRpc")
.setPayload({ id: Schema.Number })
.setSuccess(Schema.String)
.setError(MyError)
.middleware(AuthMiddleware)
.prefix("admin.") // tag becomes "admin.MyRpc"
.annotate(tag, value)
.annotateContext(context);

| Type | Extracts |
|---|---|
| `Rpc.Tag<R>` | The string tag |
| `Rpc.Payload<R>` | Decoded payload type |
| `Rpc.PayloadConstructor<R>` | Constructor input (uses `Schema.Struct.Constructor` for struct payloads) |
| `Rpc.Success<R>` | Decoded success type |
| `Rpc.SuccessSchema<R>` | Success schema (may be `RpcSchema.Stream`) |
| `Rpc.Error<R>` | Decoded error type (includes middleware errors) |
| `Rpc.ErrorSchema<R>` | Union of error schema + middleware failure schemas |
| `Rpc.Context<R>` | Schema context requirements |
| `Rpc.Middleware<R>` | Required middleware service identifiers |
| `Rpc.MiddlewareClient<R>` | Client-side middleware requirements (when `requiredForClient: true`) |
| `Rpc.Exit<R>` | `Exit<SuccessExit, ErrorExit>` |
| `Rpc.IsStream<R, Tag>` | `true` if the Rpc is a streaming procedure |
| `Rpc.ResultFrom<R, Ctx>` | Handler return type (Effect or Stream depending on `stream`) |
// Interop with legacy Schema.TaggedRequest
const rpc = Rpc.fromTaggedRequest(myTaggedRequestSchema);

// Get the Exit schema for encoding/decoding exits on the wire
const schema = Rpc.exitSchema(myRpc);
// Schema.Schema<Exit<Success, Error>, ExitEncoded<...>>

Module: RpcGroup.ts - Group procedures together, apply shared middleware/prefix, implement handlers.
import { RpcGroup } from "@effect/rpc";
const group = RpcGroup.make(GetUser, ListUsers, WatchUsers);

group
  .add(DeleteUser, UpdateUser) // add more procedures
  .merge(otherGroup) // merge another group
  .middleware(AuthMiddleware) // apply middleware to all above
  .prefix("users.") // prefix all tags: "users.GetUser"
  .annotate(tag, value) // annotate the group
  .annotateRpcs(tag, value); // annotate all rpcs in group

// All handlers at once: returns Layer<Rpc.ToHandler<Rpcs>>
const HandlersLive = group.toLayer({
GetUser: (payload, { headers, clientId }) =>
Effect.succeed({ name: "Alice" }),
ListUsers: () => Effect.succeed([{ name: "Alice" }]),
WatchUsers: () => Stream.fromIterable([{ name: "Alice" }]),
});
// Single handler
const GetUserLive = group.toLayerHandler("GetUser", (payload, { headers }) =>
Effect.succeed({ name: "Alice" })
);
// Effectful build (access services during construction)
const HandlersLive = group.toLayer(
Effect.gen(function* () {
const db = yield* Database;
return {
GetUser: (payload) => db.findUser(payload.id),
// ...
};
})
);

type HandlerFn = (
  payload: Rpc.Payload<Current>,
  options: { readonly clientId: number; readonly headers: Headers }
) => Effect<Success, Error, R> | Stream<A, E, R> | Wrapper<...>

For streaming RPCs, handlers can return:

- `Stream<A, E, R>`: standard stream
- `Effect<ReadonlyMailbox<A, E>, E, R>`: mailbox-based streaming
- `Wrapper<Stream<...>>`: with fork/uninterruptible wrapping

// Retrieve a handler from context for direct invocation
const handler = yield* group.accessHandler("GetUser");
const result = handler(payload, headers);

type MyRpcs = RpcGroup.Rpcs<typeof group>;
// Union of all Rpc types in the group

Module: RpcServer.ts - Server engine that dispatches incoming RPC messages to handlers. The Protocol tag abstracts the transport layer.
class Protocol extends Context.Tag("@effect/rpc/RpcServer/Protocol")<Protocol, {
readonly run: (f: (clientId: number, data: FromClientEncoded) => Effect<void>) => Effect<never>
readonly disconnects: ReadonlyMailbox<number>
readonly send: (clientId: number, response: FromServerEncoded, transferables?: Transferable[]) => Effect<void>
readonly end: (clientId: number) => Effect<void>
readonly clientIds: Effect<ReadonlySet<number>>
readonly initialMessage: Effect<Option<unknown>>
readonly supportsAck: boolean
readonly supportsTransferables: boolean
readonly supportsSpanPropagation: boolean
}>()

// Low-level: no serialization (works with decoded types)
RpcServer.makeNoSerialization(group, {
onFromServer: (response) => ...,
concurrency: 10, // default: "unbounded"
disableTracing: false,
spanPrefix: "RpcServer",
disableClientAcks: false,
disableFatalDefects: false,
})
// Full server: serialization + protocol
RpcServer.make(group, options?)
// Returns Effect<never> — runs until interrupted
// Layer variant
RpcServer.layer(group, options?)
// Returns Layer<never, never, Protocol | Handlers | Middleware>

| Function | Transport | Requires |
|---|---|---|
| `layerProtocolWebsocket({ path })` | WebSocket via HttpRouter | RpcSerialization |
| `layerProtocolWebsocketRouter({ path })` | WebSocket via HttpLayerRouter | RpcSerialization, HttpLayerRouter |
| `layerProtocolHttp({ path })` | Streaming HTTP via HttpRouter | RpcSerialization |
| `layerProtocolHttpRouter({ path })` | Streaming HTTP via HttpLayerRouter | RpcSerialization, HttpLayerRouter |
| `layerProtocolSocketServer` | Raw Socket | RpcSerialization, SocketServer |
| `layerProtocolWorkerRunner` | Worker thread | WorkerRunner.PlatformRunner |
| `layerProtocolStdio({ stdin, stdout })` | Stdio streams | RpcSerialization |

// All-in-one: group + path + protocol choice
RpcServer.layerHttpRouter({
group: myGroup,
path: "/rpc",
protocol: "websocket", // or "http" (default: "websocket")
concurrency: 10,
})
// HttpApp constructors (for custom routing)
RpcServer.toHttpApp(group, options?) // HTTP streaming app
RpcServer.toHttpAppWebsocket(group, options?) // WebSocket app
// Web standard handler (edge deployments)
const { handler, dispose } = RpcServer.toWebHandler(group, {
layer: HandlersLive.pipe(Layer.provide(RpcSerialization.layerJson)),
middleware: HttpMiddleware.logger,
})
// handler: (request: Request) => Promise<Response>

- `concurrency: "unbounded"` (default): all requests run concurrently
- `concurrency: N`: semaphore-based, limits handling to N concurrent requests
- `Rpc.fork(result)`: bypasses concurrency control for that request
- `Rpc.uninterruptible(result)`: runs in an uninterruptible region

RpcServer.fiberIdClientInterrupt; // FiberId(-499): client-initiated interrupt
RpcServer.fiberIdTransientInterrupt; // FiberId(-503): transient (shutdown, reconnect)

Module: RpcClient.ts - Auto-generated typed client stubs with prefix-based namespacing.
type RpcClient<Rpcs, E> = {
readonly GetUser: (payload, options?) => Effect<User, MyError | E>;
readonly WatchUsers: (payload, options?) => Stream<User, MyError | E>;
// Prefixed RPCs create nested namespaces
readonly admin: {
readonly DeleteUser: (payload, options?) => Effect<void, AdminError | E>;
};
};

// Full client with serialization
const client = yield* RpcClient.make(group, {
spanPrefix: "RpcClient",
disableTracing: false,
flatten: false, // true → flat function client(tag, payload)
})
// Low-level: no serialization
const { client, write } = yield* RpcClient.makeNoSerialization(group, {
onFromClient: ({ message, context, discard }) => ...,
supportsAck: true,
flatten: false,
})

// Effect-based (non-streaming)
const user = yield* client.GetUser({ id: 1 });

// With options
const user = yield* client.GetUser(
  { id: 1 },
  {
    headers: { authorization: "Bearer ..." },
    context: Context.empty(),
    discard: true, // fire-and-forget (returns void)
  }
);

// Stream-based
const users: Stream<User, Error> = client.WatchUsers({});

// Stream as Mailbox
const mailbox = yield* client.WatchUsers(
  {},
  {
    asMailbox: true,
    streamBufferSize: 16,
  }
);

const client = yield* RpcClient.make(group, { flatten: true });
// client: (tag: "GetUser" | ..., payload, options?) => Effect | Stream
const user = yield* client("GetUser", { id: 1 });

class Protocol extends Context.Tag("@effect/rpc/RpcClient/Protocol")<Protocol, {
readonly run: (f: (data: FromServerEncoded) => Effect<void>) => Effect<never>
readonly send: (request: FromClientEncoded, transferables?: Transferable[]) => Effect<void, RpcClientError>
readonly supportsAck: boolean
readonly supportsTransferables: boolean
}>()

| Function | Transport | Requires |
|---|---|---|
| `layerProtocolHttp({ url })` | HTTP POST | RpcSerialization, HttpClient |
| `layerProtocolSocket(options?)` | WebSocket/Socket | RpcSerialization, Socket |
| `layerProtocolWorker({ size })` | Worker pool | Worker.PlatformWorker, Worker.Spawner |

// Set headers for all RPCs in scope
RpcClient.withHeaders({ authorization: "Bearer ..." })(effect);
// Effectful headers
RpcClient.withHeadersEffect(getTokenEffect)(effect);
// FiberRef for current headers
RpcClient.currentHeaders; // FiberRef<Headers>

RpcClient.layerProtocolSocket({
  retryTransientErrors: true,
  retrySchedule: Schedule.exponential(500, 1.5).pipe(
    Schedule.union(Schedule.spaced(5000))
  ),
});

RpcClient.layerProtocolWorker({
  size: 4, // fixed pool
  // or elastic pool:
  minSize: 1,
  maxSize: 8,
  concurrency: 10,
  targetUtilization: 0.8,
  timeToLive: "5 minutes",
});

Module: RpcMiddleware.ts - Middleware system with Tag-based registration, provides/wrap/optional modes.
import { RpcMiddleware } from "@effect/rpc";
// Basic middleware (runs before handler, can fail with typed error)
class AuthMiddleware extends RpcMiddleware.Tag<AuthMiddleware>()(
"AuthMiddleware",
{ failure: AuthError }
) {}
// Middleware that provides a service to the handler
class UserContext extends RpcMiddleware.Tag<UserContext>()("UserContext", {
provides: UserService, // Context.Tag to provide
failure: AuthError,
}) {}
// Optional middleware (handler runs even if middleware fails)
class Analytics extends RpcMiddleware.Tag<Analytics>()("Analytics", {
optional: true,
}) {}
// Wrap middleware (gets access to `next` — the handler effect)
class Caching extends RpcMiddleware.Tag<Caching>()("Caching", {
wrap: true,
failure: CacheError,
}) {}
// Client-required middleware (also runs on client side)
class RequestSigning extends RpcMiddleware.Tag<RequestSigning>()(
"RequestSigning",
{ requiredForClient: true }
) {}

| Option | Type | Default | Description |
|---|---|---|---|
| `failure` | `Schema.Schema.All` | `Schema.Never` | Error schema (added to each Rpc's error union) |
| `provides` | `Context.Tag<I, S>` | none | Service to provide to handler after middleware |
| `optional` | `boolean` | `false` | If true, handler runs even when middleware fails |
| `wrap` | `boolean` | `false` | If true, middleware receives the `next` effect |
| `requiredForClient` | `boolean` | `false` | If true, client also needs this middleware |

// Standard: (options) => Effect<Provides, Error>
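const AuthLive = Layer.succeed(AuthMiddleware, (options) =>
  Effect.gen(function* () {
    // Headers.get returns an Option (from "effect"), so test it explicitly;
    // a plain truthiness check would never detect a missing header
    const token = Headers.get(options.headers, "authorization");
    if (Option.isNone(token)) return yield* Effect.fail(new AuthError());
    return yield* verifyToken(token.value);
  })
);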
// Wrap: (options) => Effect<SuccessValue, Error>
const CachingLive = Layer.succeed(Caching, (options) =>
Effect.gen(function* () {
const cached = yield* checkCache(options.payload);
if (cached) return cached as any;
return yield* options.next; // call the actual handler
})
);

// Runs on the client side, transforms the request before sending
const SigningLive = RpcMiddleware.layerClient(RequestSigning, (options) =>
Effect.succeed({
...options.request,
headers: Headers.set(
options.request.headers,
"x-sig",
sign(options.request)
),
})
);

For each middleware attached to an Rpc:

- wrap middleware: wraps the handler, receives `next`
- provides middleware: runs before handler, provides service via `Effect.provideServiceEffect`
- standard middleware: runs before handler via `Effect.zipRight`
- optional middleware: runs before handler, failures are silently ignored

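
The four middleware modes listed above can be modelled without Effect at all. The following is a minimal, dependency-free sketch of how they differ, not the library's implementation: `Handler`, `Middleware`, and `applyMiddleware` are hypothetical names, handlers are plain synchronous functions, and failures are modelled as thrown exceptions.

```typescript
// Hypothetical, simplified model: a handler reads services from a context object.
type Context = Readonly<Record<string, unknown>>;
type Handler = (context: Context) => string;

type Middleware =
  | { readonly kind: "wrap"; readonly run: (next: Handler) => Handler }
  | { readonly kind: "provides"; readonly key: string; readonly run: () => unknown }
  | { readonly kind: "standard"; readonly optional?: boolean; readonly run: () => void };

function applyMiddleware(handler: Handler, middleware: Middleware): Handler {
  switch (middleware.kind) {
    case "wrap":
      // Wrap middleware decides if and when the handler (next) runs.
      return middleware.run(handler);
    case "provides": {
      // Provides middleware runs first and adds a service to the context.
      const { key, run } = middleware;
      return (context) => handler({ ...context, [key]: run() });
    }
    case "standard": {
      // Standard middleware runs first; its failure aborts unless it is optional.
      const { optional, run } = middleware;
      return (context) => {
        try {
          run();
        } catch (error) {
          if (!optional) throw error;
        }
        return handler(context);
      };
    }
  }
}
```

For example, applying a `provides` middleware with key `"user"` makes that value visible to the handler, while an `optional` standard middleware that throws still lets the handler run.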
Module: RpcMessage.ts — Wire protocol message types.
| Type | Fields | Purpose |
|---|---|---|
| `Request<A>` | id: RequestId, tag, payload, headers, traceId?, spanId?, sampled? | New RPC request |
| `Ack` | requestId | Stream backpressure acknowledgment |
| `Interrupt` | requestId, interruptors | Cancel a running request |
| `Eof` | none | Client is done sending |

The encoded form (FromClientEncoded) has the same structure, but with `id: string` and `headers: [string, string][]`, plus `Ping`.

| Type | Fields | Purpose |
|---|---|---|
| `ResponseChunk<A>` | clientId, requestId, values: NonEmptyReadonlyArray | Stream data chunk |
| `ResponseExit<A>` | clientId, requestId, exit: Exit | Request completed |
| `ResponseDefect` | clientId, defect | Fatal server defect |
| `ClientEnd` | clientId | Server finished with client |

The encoded form (FromServerEncoded) is the same, but with `requestId: string`, plus `Pong` and `ClientProtocolError`.

type RequestId = Branded<bigint, RequestIdTypeId>;
const id = RequestId(42n); // from bigint
const idFromString = RequestId("42"); // from string

RpcMessage.constEof; // { _tag: "Eof" }
RpcMessage.constPing; // { _tag: "Ping" }
RpcMessage.constPong; // { _tag: "Pong" }

Module: RpcSerialization.ts - Serialization service with pluggable formats.
class RpcSerialization extends Context.Tag("@effect/rpc/RpcSerialization")<RpcSerialization, {
unsafeMake(): Parser
readonly contentType: string
readonly includesFraming: boolean
}>()

interface Parser {
readonly decode: (data: Uint8Array | string) => ReadonlyArray<unknown>;
readonly encode: (response: unknown) => Uint8Array | string | undefined;
}

| Format | Content-Type | Framing | Binary | Layer |
|---|---|---|---|---|
| `json` | application/json | No | No | `layerJson` |
| `ndjson` | application/ndjson | Yes | No | `layerNdjson` |
| `jsonRpc()` | application/json | No | No | `layerJsonRpc()` |
| `ndJsonRpc()` | application/json-rpc | Yes | No | `layerNdJsonRpc()` |
| `msgPack` | application/msgpack | Yes | Yes | `layerMsgPack` |

- No framing (`includesFraming: false`): the protocol handles message boundaries and `decode` returns a single-element array. Used with HTTP (one request = one message).
- With framing (`includesFraming: true`): the parser handles message boundaries itself via delimiters (newlines for NDJSON) or binary framing (MsgPack). Used with WebSocket/Socket streams.

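
To make the framing distinction concrete, here is a minimal sketch of a self-framing newline-delimited JSON parser. It is an illustration under assumptions, not the library's `ndjson` implementation: `makeNdjsonParser` and `FramedParser` are hypothetical names, and the sketch handles string input only.

```typescript
// Hypothetical sketch of a parser that owns its own framing (newline-delimited JSON).
interface FramedParser {
  decode(data: string): ReadonlyArray<unknown>;
  encode(message: unknown): string;
}

function makeNdjsonParser(): FramedParser {
  let buffer = ""; // trailing partial line carried over between decode calls
  return {
    decode(data) {
      buffer += data;
      const lines = buffer.split("\n");
      // The last element is either an incomplete line or "" (input ended on a newline).
      buffer = lines.pop() ?? "";
      return lines
        .filter((line) => line.length > 0)
        .map((line) => JSON.parse(line) as unknown);
    },
    encode(message) {
      // Each message becomes exactly one line, so the receiver can split on "\n".
      return JSON.stringify(message) + "\n";
    },
  };
}
```

Because the parser keeps the unfinished tail in `buffer`, a transport may deliver a message split across several reads and `decode` returns it only once the terminating newline arrives. That is why `decode` returns an array rather than a single value.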
Maps Effect RPC messages to/from JSON-RPC 2.0:

- `Request` → `{ jsonrpc: "2.0", method: tag, params: payload, id }`
- `Exit(Success)` → `{ jsonrpc: "2.0", id, result }`
- `Exit(Failure)` → `{ jsonrpc: "2.0", id, error: { code, message, data: cause } }`
- `Defect` → `{ jsonrpc: "2.0", id: -32603, error: { _tag: "Defect", ... } }`
- Control messages (`Ack`, `Ping`, etc.) → `{ method: "@effect/rpc/Ack", ... }`

Supports batch requests (JSON arrays) with response correlation.
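
The mapping can be sketched in plain TypeScript as follows. This is only an illustration of the envelope shapes listed above: `toJsonRpcRequest`, `toJsonRpcResponse`, and the simplified `RpcExit` type are hypothetical, and the error `code` and `message` values are placeholders because the text does not specify them.

```typescript
type JsonRpcId = number | string;

// Simplified stand-ins for the RPC request and exit (assumptions for this sketch).
interface RpcRequest {
  readonly id: JsonRpcId;
  readonly tag: string;
  readonly payload: unknown;
}
type RpcExit =
  | { readonly _tag: "Success"; readonly value: unknown }
  | { readonly _tag: "Failure"; readonly cause: unknown };

function toJsonRpcRequest(request: RpcRequest) {
  // The Rpc tag becomes the JSON-RPC method, the payload becomes params.
  return { jsonrpc: "2.0" as const, method: request.tag, params: request.payload, id: request.id };
}

function toJsonRpcResponse(id: JsonRpcId, exit: RpcExit) {
  return exit._tag === "Success"
    ? { jsonrpc: "2.0" as const, id, result: exit.value }
    : {
        jsonrpc: "2.0" as const,
        id,
        // Placeholder code and message; the failure cause travels in `data`.
        error: { code: 1, message: "Request failed", data: exit.cause },
      };
}

// A batch is a JSON array of request envelopes; responses are matched by `id`.
function toJsonRpcBatch(requests: ReadonlyArray<RpcRequest>) {
  return requests.map(toJsonRpcRequest);
}
```

Keeping `id` on both sides is what allows a client to correlate each response in a batch with the request that produced it.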
Module: RpcSchema.ts — Stream schema wrapper for streaming RPCs.
interface Stream<
A extends Schema.Any,
E extends Schema.All,
> extends Schema.Schema<
Stream<A["Type"], E["Type"]>,
Stream<A["Encoded"], E["Encoded"]>,
A["Context"] | E["Context"]
> {
readonly success: A;
readonly failure: E;
}
// Constructor
const streamSchema = RpcSchema.Stream({
success: Schema.String,
failure: Schema.String,
});

RpcSchema.isStreamSchema(schema); // type guard
RpcSchema.isStreamSerializable(schema); // check WithResult schema
RpcSchema.getStreamSchemas(ast); // Option<{ success, failure }>

The Stream schema uses Schema.declare internally and annotates the AST with StreamSchemaId so the server/client can detect streaming RPCs and handle them differently.
Module: RpcTest.ts — In-memory test client that bypasses serialization.
import { RpcTest } from "@effect/rpc";
const client = yield* RpcTest.makeClient(group);
// or
const client = yield* RpcTest.makeClient(group, { flatten: true });

Internally wires RpcClient.makeNoSerialization directly to RpcServer.makeNoSerialization, so messages pass in-memory without encoding/decoding. Requirements include Scope, handler layers, and middleware layers.
Module: RpcWorker.ts — Worker-specific InitialMessage support.
class InitialMessage extends Context.Tag("@effect/rpc/RpcWorker/InitialMessage")<
InitialMessage,
Effect<readonly [data: unknown, transfers: Transferable[]]>
>()

// Create initial message with schema encoding + transferable collection
RpcWorker.makeInitialMessage(schema, effect);
// Layer for initial message
RpcWorker.layerInitialMessage(schema, build);
// Read initial message on server side
const msg = yield* RpcWorker.initialMessage(schema);
// Effect<A, NoSuchElementException | ParseError, Protocol | R>

Module: RpcClientError.ts - Client-side error type.
class RpcClientError extends Schema.TaggedError<RpcClientError>(
"@effect/rpc/RpcClientError"
)("RpcClientError", {
reason: Schema.Literal("Protocol", "Unknown"),
message: Schema.String,
cause: Schema.optional(Schema.Defect),
})

All client protocol implementations emit RpcClientError for transport failures (connection errors, decode failures, etc.).
Module: Rpc.ts (Wrapper section) — Control execution behavior of handler responses.
// Fork: bypass server concurrency control, run concurrently regardless
const handler = (payload) => Rpc.fork(Effect.succeed(result));
// Uninterruptible: run in uninterruptible region
const handler = (payload) => Rpc.uninterruptible(Effect.succeed(result));
// Both: composable
const handler = (payload) => Rpc.fork(Rpc.uninterruptible(myEffect));
// General form
Rpc.wrap({ fork: true, uninterruptible: true })(myEffect);

- `Rpc.fork(value)`: ensures the handler runs concurrently even when `RpcServer` has `concurrency: N`
- `Rpc.uninterruptible(value)`: wraps in an uninterruptible region
- Works with both `Effect` and `Stream` return values
- `Rpc.isWrapper(obj)`: type guard

import { Rpc, RpcGroup, RpcSchema, RpcTest } from "@effect/rpc";
import { Effect, Layer, Stream } from "effect";
import * as Schema from "effect/Schema";
const GetUser = Rpc.make("GetUser", {
payload: { id: Schema.Number },
success: Schema.Struct({ name: Schema.String, age: Schema.Number }),
error: Schema.String,
});
const WatchUsers = Rpc.make("WatchUsers", {
success: Schema.Struct({ name: Schema.String }),
error: Schema.String,
stream: true,
});
const group = RpcGroup.make(GetUser, WatchUsers);

const HandlersLive = group.toLayer({
  GetUser: ({ id }) => Effect.succeed({ name: "Alice", age: 30 }),
  WatchUsers: () => Stream.fromIterable([{ name: "Alice" }, { name: "Bob" }]),
});

import { RpcServer, RpcSerialization } from "@effect/rpc";
import { HttpLayerRouter } from "@effect/platform";
import { NodeHttpServer } from "@effect/platform-node";
import { createServer } from "node:http";

// Register the RPC route on the router, then serve the router on a Node HTTP server
const RpcRoute = RpcServer.layerHttpRouter({
  group,
  path: "/rpc",
  protocol: "websocket",
}).pipe(
  Layer.provide(HandlersLive),
  Layer.provide(RpcSerialization.layerJson)
);
const ServerLive = HttpLayerRouter.serve(RpcRoute).pipe(
  // NodeHttpServer.layer takes a server factory plus listen options
  Layer.provide(NodeHttpServer.layer(createServer, { port: 3000 }))
);

import { RpcClient } from "@effect/rpc";
import { Socket } from "@effect/platform";

// MyClient is assumed to be a Context.Tag holding the generated client (not defined here)
const ClientLive = Layer.scoped(MyClient, RpcClient.make(group)).pipe(
  Layer.provide(RpcClient.layerProtocolSocket()),
  Layer.provide(RpcSerialization.layerJson),
  Layer.provide(Socket.layerWebSocket("ws://localhost:3000/rpc")),
  Layer.provide(Socket.layerWebSocketConstructorGlobal)
);

// Usage (inside Effect.gen, with ClientLive provided)
const client = yield* MyClient;
const user = yield* client.GetUser({ id: 1 });
const users = yield* Stream.runCollect(client.WatchUsers({}));

// In-memory test client: no transport or serialization needed
const testClient = yield* RpcTest.makeClient(group).pipe(
  Effect.provide(HandlersLive)
);
const user = yield* testClient.GetUser({ id: 1 });

Request flow between client and server:

- Shared definition: `Rpc.make("GetUser", { ... })` is used by both sides.
- Client (request): `RpcClient.make(group)` → `onRequest(rpc)` → encode payload via Schema → attach headers, traceId, spanId → `Protocol.send({ _tag: "Request", id, tag, payload, headers, traceId })`.
- Server (request): `Protocol.run(callback)` → `callback(clientId, encoded)` → `RpcSerialization.decode` → `Schema.decode(payload)` → dispatch to Handler → `handler(payload, { clientId, headers })` → `applyMiddleware(rpc, ...)` → Effect/Stream result.
- Server (Effect result): Effect → Exit → `Schema.encode(exit)` → `Protocol.send({ _tag: "Exit", requestId, exit })`.
- Client (Effect result): `Protocol.run(callback)` → `callback(encoded)` → `Schema.decode(exit)` → resume caller.
- Server (Stream result): Stream → Chunks + Exit; for each chunk, `Protocol.send({ _tag: "Chunk", requestId, values })` → await Ack (backpressure) → final Exit when done.
- Client (Stream result): `Protocol.run` → decode chunk → `mailbox.offerAll` → `Protocol.send({ _tag: "Ack" })`.

Server sends Chunk → Client receives → Client processes → Client sends Ack → Server sends next Chunk
Acks are only active when both the client and the server protocol report supportsAck: true (WebSocket, Socket, Worker). The HTTP protocol disables acks.
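
The ack loop can be modelled as a small state machine. The sketch below is a hypothetical, synchronous model of the sender side, not the server's internals: `AckGatedSender` is an invented name, and it assumes one outstanding chunk at a time.

```typescript
// Hypothetical model of ack-gated chunk delivery on the sending side.
class AckGatedSender<A> {
  private awaitingAck = false;
  private readonly supportsAck: boolean;
  private readonly pending: A[][];
  readonly sent: A[][] = [];

  constructor(chunks: ReadonlyArray<A[]>, supportsAck: boolean) {
    this.pending = [...chunks];
    this.supportsAck = supportsAck;
  }

  /** Sends as many chunks as currently allowed and returns how many went out. */
  flush(): number {
    let count = 0;
    while (this.pending.length > 0 && !(this.supportsAck && this.awaitingAck)) {
      this.sent.push(this.pending.shift() as A[]);
      this.awaitingAck = true; // with acks enabled, this blocks the next iteration
      count++;
    }
    return count;
  }

  /** Called when the receiver acknowledges the last chunk. */
  ack(): void {
    this.awaitingAck = false;
  }
}
```

With `supportsAck` set to `true`, each `flush()` releases one chunk and the next one waits for `ack()`. With `false` (the HTTP case in the text), a single `flush()` sends everything and the sender never waits.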
Client sends Ping every 10s → Server responds with Pong
If no Pong received → Client reconnects with retry schedule
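
A heartbeat of this kind reduces to one flag. The following sketch shows one possible model: `Heartbeat` and `HeartbeatAction` are hypothetical names, and it assumes that a missing Pong is detected at the next interval tick, which the text does not specify.

```typescript
type HeartbeatAction = "send-ping" | "reconnect";

// Hypothetical model: the caller invokes onInterval() on every ping interval
// (10s in the description above) and onPong() whenever a Pong arrives.
class Heartbeat {
  private awaitingPong = false;

  onInterval(): HeartbeatAction {
    if (this.awaitingPong) {
      // The previous Ping was never answered: give up on this connection.
      this.awaitingPong = false;
      return "reconnect";
    }
    this.awaitingPong = true;
    return "send-ping";
  }

  onPong(): void {
    this.awaitingPong = false;
  }
}
```

The retry schedule itself is outside this sketch; `"reconnect"` only signals that the caller should start it.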
Both RpcServer and RpcClient define a Protocol service tag. The protocol implementations are thin adapters — the core RPC logic is transport-agnostic:
// Server protocols: WebSocket, HTTP, Socket, Worker, Stdio
// Client protocols: HTTP, Socket, Worker
// Test: Direct in-memory wiring

The internal withRun utility solves a chicken-and-egg problem: the protocol needs a write function, but write depends on the protocol being ready. withRun buffers messages during initialization, then replays them when run takes over.
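
The buffer-then-replay idea can be shown in a few lines. This is a generic sketch of the technique, not the internal `withRun` code: `makeBufferedWriter` is a hypothetical helper and everything is synchronous for brevity.

```typescript
// Hypothetical sketch: `write` is usable immediately, even before `run` installs a handler.
function makeBufferedWriter<A>() {
  let sink: ((message: A) => void) | undefined;
  const buffer: A[] = [];
  return {
    write(message: A): void {
      if (sink) sink(message);
      else buffer.push(message); // not ready yet: keep the message for later
    },
    run(handler: (message: A) => void): void {
      sink = handler;
      // Replay everything that arrived during initialization, in order.
      for (const message of buffer.splice(0)) handler(message);
    },
  };
}
```

Messages written before `run` are delivered first and in their original order, so callers do not need to know whether the protocol has finished starting up.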
Every RPC's payload, success, and error types flow through Schema for:
- Compile-time type checking (client stubs are fully typed)
- Runtime validation (payload decoded on server, exit decoded on client)
- Serialization (encoding/decoding for wire transport)
group.prefix("users.");
// "GetUser" → "users.GetUser"
// Client type: { readonly users: { readonly GetUser: ... } }

The dot separator creates nested client objects automatically.
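
One way to picture the nesting is a function that turns dotted tags into an object tree. The sketch below is an assumption-laden illustration, not the client's actual construction: `buildClient`, `ClientTree`, and `Call` are hypothetical, and it assumes every dot introduces one nesting level, which the text does not state.

```typescript
type Call = (payload: unknown) => string;
interface ClientTree {
  [key: string]: Call | ClientTree;
}

// Hypothetical sketch: build nested client objects from dotted Rpc tags.
function buildClient(
  tags: ReadonlyArray<string>,
  call: (tag: string, payload: unknown) => string
): ClientTree {
  const root: ClientTree = {};
  for (const tag of tags) {
    const parts = tag.split(".");
    const name = parts.pop() as string; // last segment is the method name
    let node = root;
    for (const part of parts) {
      const existing = node[part];
      if (typeof existing === "object") {
        node = existing;
      } else {
        const created: ClientTree = {};
        node[part] = created;
        node = created;
      }
    }
    // The full tag (including the prefix) is what goes on the wire.
    node[name] = (payload) => call(tag, payload);
  }
  return root;
}
```

With tags `"users.GetUser"` and `"Ping"`, the result exposes `client.users.GetUser` and `client.Ping`, while each call still sends the full prefixed tag.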
stream: true in Rpc.make changes the entire pipeline:
- Success schema is wrapped in `RpcSchema.Stream`
- Server sends `Chunk` messages instead of a single `Exit`
- Client returns `Stream` (or `Mailbox` with `asMailbox: true`)
- Backpressure via Ack protocol

Middleware failure schemas are automatically unioned into each Rpc's error type:
class Auth extends RpcMiddleware.Tag<Auth>()("Auth", {
failure: AuthError
}) {}
const MyRpc = Rpc.make("MyRpc", { ... }).middleware(Auth)
// Rpc.Error<typeof MyRpc> includes AuthError

When requiredForClient: true, middleware runs on both sides:

- Server: standard middleware execution
- Client: `RpcMiddleware.layerClient(tag, fn)` transforms the request before sending

Server concurrency limits parallel request handling. Rpc.fork() lets specific handlers opt out of the limit for high-priority or long-running requests.
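
The interaction between a concurrency limit and forked handlers can be modelled with a simple permit counter. The following is a sketch under stated assumptions, not the server's scheduler: `RequestLimiter` and `Task` are hypothetical, tasks signal completion through a `done` callback, and the `fork` flag stands in for a handler result wrapped with `Rpc.fork`.

```typescript
// A task starts when called and invokes `done` when it has finished.
type Task = (done: () => void) => void;

// Hypothetical model of a semaphore-style limit with a fork escape hatch.
class RequestLimiter {
  private running = 0;
  private readonly limit: number;
  private readonly waiting: Task[] = [];

  constructor(limit: number) {
    this.limit = limit;
  }

  submit(task: Task, options: { fork?: boolean } = {}): void {
    if (options.fork) {
      task(() => {}); // forked work never takes a permit
      return;
    }
    if (this.running < this.limit) this.start(task);
    else this.waiting.push(task);
  }

  private start(task: Task): void {
    this.running++;
    task(() => {
      this.running--;
      const next = this.waiting.shift();
      if (next) this.start(next); // hand the freed permit to the next waiter
    });
  }
}
```

With a limit of 1, a second regular request waits for the first to finish, while a forked request starts immediately because it does not consume a permit.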
// Barrel
import {
Rpc,
RpcGroup,
RpcServer,
RpcClient,
RpcMiddleware,
RpcSerialization,
} from "@effect/rpc";
// Direct (tree-shakeable)
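import * as RpcServer from "@effect/rpc/RpcServer";

// Server: the RPC route depends on handlers and middleware, not the other way around.
// createServer comes from "node:http".
HttpLayerRouter.serve(
  RpcServer.layerHttpRouter({
    group,
    path: "/rpc",
  }).pipe(
    Layer.provide(
      Layer.mergeAll(
        HandlersLive, // group.toLayer(...)
        MiddlewareLive // Layer.succeed(AuthMiddleware, ...)
      )
    ),
    Layer.provide(RpcSerialization.layerJson)
  )
).pipe(Layer.provide(NodeHttpServer.layer(createServer, { port: 3000 })));

// Client: RpcClient.make returns a scoped Effect, so wrap it in a Layer first.
// MyClient is assumed to be a Context.Tag for the client.
Layer.scoped(MyClient, RpcClient.make(group)).pipe(
  Layer.provide(RpcClient.layerProtocolSocket()),
  Layer.provide(RpcSerialization.layerJson),
  Layer.provide(socketLayer)
);

| Scenario | Format |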
|---|---|
| WebSocket / Socket | layerJson (framing handled by transport) |
| HTTP streaming | layerNdjson (self-framing) |
| Binary data / performance | layerMsgPack (compact, binary-native) |
| JSON-RPC compatibility | layerJsonRpc() or layerNdJsonRpc() |
| Feature | WebSocket | HTTP | Socket | Worker | Stdio |
|---|---|---|---|---|---|
| Ack (backpressure) | Yes | No | Yes | Yes | Yes |
| Span propagation | Yes | No | Yes | Yes | Yes |
| Transferables | No | No | No | Yes | No |
| Ping/Pong | Yes (client) | No | Yes (client) | No | No |
| Multi-client | Yes | Yes | Yes | Yes | No |
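
The feature matrix above can also be written as a typed lookup, which is convenient when code needs to branch on transport capabilities. This is a direct transcription of the table into a hypothetical `transportFeatures` constant. The names are illustrative and not exported by the library.

```typescript
type Transport = "websocket" | "http" | "socket" | "worker" | "stdio";

interface TransportFeatures {
  readonly ack: boolean;
  readonly spanPropagation: boolean;
  readonly transferables: boolean;
  readonly clientPing: boolean;
  readonly multiClient: boolean;
}

// Values copied from the feature table above.
const transportFeatures: Record<Transport, TransportFeatures> = {
  websocket: { ack: true, spanPropagation: true, transferables: false, clientPing: true, multiClient: true },
  http: { ack: false, spanPropagation: false, transferables: false, clientPing: false, multiClient: true },
  socket: { ack: true, spanPropagation: true, transferables: false, clientPing: true, multiClient: true },
  worker: { ack: true, spanPropagation: true, transferables: true, clientPing: false, multiClient: true },
  stdio: { ack: true, spanPropagation: true, transferables: false, clientPing: false, multiClient: false },
};

// List the transports that support a given feature, in table order.
function transportsWith(feature: keyof TransportFeatures): Transport[] {
  return (Object.keys(transportFeatures) as Transport[]).filter(
    (transport) => transportFeatures[transport][feature]
  );
}
```

For instance, `transportsWith("transferables")` yields only `"worker"`, matching the Transferables row.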