diff --git a/.changeset/calm-seas-smile.md b/.changeset/calm-seas-smile.md new file mode 100644 index 0000000000..bde18ad7fb --- /dev/null +++ b/.changeset/calm-seas-smile.md @@ -0,0 +1,5 @@ +--- +"effect": patch +--- + +Align workflow tags with RPCs by changing `Workflow.make` to accept the tag as its first argument, exposing workflow tags as `_tag`, and supporting `class MyWorkflow extends Workflow.make(...) {}`. diff --git a/packages/effect/src/unstable/cluster/ClusterWorkflowEngine.ts b/packages/effect/src/unstable/cluster/ClusterWorkflowEngine.ts index 10f8ba703a..c7fc4b23a4 100644 --- a/packages/effect/src/unstable/cluster/ClusterWorkflowEngine.ts +++ b/packages/effect/src/unstable/cluster/ClusterWorkflowEngine.ts @@ -120,11 +120,11 @@ export const make = Effect.gen(function*() { > >() const ensureEntity = (workflow: Workflow.Any) => { - let entity = entities.get(workflow.name) + let entity = entities.get(workflow._tag) if (!entity) { entity = makeWorkflowEntity(workflow) as any - workflows.set(workflow.name, workflow) - entities.set(workflow.name, entity as any) + workflows.set(workflow._tag, workflow) + entities.set(workflow._tag, entity as any) } return entity! } @@ -248,7 +248,7 @@ export const make = Effect.gen(function*() { }) { const requestId = yield* requestIdFor({ workflow: options.workflow, - entityType: `Workflow/${options.workflow.name}`, + entityType: `Workflow/${options.workflow._tag}`, executionId: options.executionId, tag: "activity", id: activityPrimaryKey(options.activity.name, options.attempt) @@ -283,7 +283,7 @@ export const make = Effect.gen(function*() { const resume = Effect.fnUntraced(function*(workflow: Workflow.Any, executionId: string) { const maybeReply = yield* requestReply({ workflow, - entityType: `Workflow/${workflow.name}`, + entityType: `Workflow/${workflow._tag}`, executionId, tag: "run", id: "" @@ -324,7 +324,7 @@ export const make = Effect.gen(function*() { ensureEntity(workflow) const requestId = yield* requestIdFor({ workflow, - entityType: `Workflow/${workflow.name}`, + entityType: `Workflow/${workflow._tag}`, executionId, tag: "run", id: "" @@ -343,7 +343,7 @@ export const make = Effect.gen(function*() { } yield* engine.deferredDone(InterruptSignal, { - workflowName: workflow.name, + workflowName: workflow._tag, executionId, deferredName: InterruptSignal.name, exit: Exit.void @@ -457,13 +457,13 @@ export const make = Effect.gen(function*() { execute: (workflow, { discard, executionId, parent, payload }) => { ensureEntity(workflow) - return RcMap.get(clients, workflow.name).pipe( + return RcMap.get(clients, workflow._tag).pipe( Effect.flatMap((make) => make(executionId).run( parent ? { ...payload, - [payloadParentKey]: { workflowName: parent.workflow.name, executionId: parent.executionId } + [payloadParentKey]: { workflowName: parent.workflow._tag, executionId: parent.executionId } } : payload, { discard } @@ -479,7 +479,7 @@ export const make = Effect.gen(function*() { const exitSchema = Schema.toCodecJson(Rpc.exitSchema(entity.protocol.requests.get("run")!)) const reply = yield* requestReply({ workflow, - entityType: `Workflow/${workflow.name}`, + entityType: `Workflow/${workflow._tag}`, executionId, tag: "run", id: "" @@ -505,7 +505,7 @@ export const make = Effect.gen(function*() { id: yield* sharding.getSnowflake, address: entityAddressFor({ workflow, - entityType: `Workflow/${workflow.name}`, + entityType: `Workflow/${workflow._tag}`, executionId }), requestId: requestId.value @@ -523,7 +523,7 @@ export const make = Effect.gen(function*() { const instance = Context.get(services, WorkflowEngine.WorkflowInstance) yield* Effect.annotateCurrentSpan("executionId", instance.executionId) const activityId = `${instance.executionId}/${activity.name}` - const client = (yield* RcMap.get(clientsPartial, instance.workflow.name))(instance.executionId) + const client = (yield* RcMap.get(clientsPartial, instance.workflow._tag))(instance.executionId) while (true) { if (!activities.has(activityId)) { activities.set(activityId, { activity, context: services }) @@ -563,7 +563,7 @@ export const make = Effect.gen(function*() { Effect.flatMap((instance) => requestReply({ workflow: instance.workflow, - entityType: `Workflow/${instance.workflow.name}`, + entityType: `Workflow/${instance.workflow._tag}`, executionId: instance.executionId, tag: "deferred", id: deferred.name @@ -628,7 +628,7 @@ export const make = Effect.gen(function*() { }), payload: { name: options.clock.name, - workflowName: workflow.name, + workflowName: workflow._tag, wakeUp: DateTime.addDuration(now, options.clock.duration) } }) @@ -701,7 +701,7 @@ const ResumeRpc = Rpc.make("resume", { const payloadParentKey = "~effect/cluster/ClusterWorkflowEngine/payloadParentKey" const makeWorkflowEntity = (workflow: Workflow.Any) => - Entity.make(`Workflow/${workflow.name}`, [ + Entity.make(`Workflow/${workflow._tag}`, [ Rpc.make("run", { payload: { ...workflow.payloadSchema.fields, diff --git a/packages/effect/src/unstable/workflow/DurableDeferred.ts b/packages/effect/src/unstable/workflow/DurableDeferred.ts index 7cc767a1fd..e4522935d8 100644 --- a/packages/effect/src/unstable/workflow/DurableDeferred.ts +++ b/packages/effect/src/unstable/workflow/DurableDeferred.ts @@ -246,7 +246,7 @@ export const into: { } } yield* engine.deferredDone(self, { - workflowName: instance.workflow.name, + workflowName: instance.workflow._tag, executionId: instance.executionId, deferredName: self.name, exit @@ -455,7 +455,7 @@ export const tokenFromExecutionId: { } ): Token => new TokenParsed({ - workflowName: options.workflow.name, + workflowName: options.workflow._tag, executionId: options.executionId, deferredName: self.name }).asToken diff --git a/packages/effect/src/unstable/workflow/DurableQueue.ts b/packages/effect/src/unstable/workflow/DurableQueue.ts index 83ca2590b0..811cc37ac4 100644 --- a/packages/effect/src/unstable/workflow/DurableQueue.ts +++ b/packages/effect/src/unstable/workflow/DurableQueue.ts @@ -104,8 +104,7 @@ export interface DurableQueue< * } * }) * - * const MyWorkflow = Workflow.make({ - * name: "MyWorkflow", + * const MyWorkflow = Workflow.make("MyWorkflow", { * payload: { * id: Schema.String * }, diff --git a/packages/effect/src/unstable/workflow/Workflow.ts b/packages/effect/src/unstable/workflow/Workflow.ts index a152095eae..9650f24f67 100644 --- a/packages/effect/src/unstable/workflow/Workflow.ts +++ b/packages/effect/src/unstable/workflow/Workflow.ts @@ -15,7 +15,7 @@ * effects, not nested activities. * * When exposing workflows through `WorkflowProxy`, remember that proxy APIs are - * derived from the workflow name and schemas. Discard execution returns the + * derived from the workflow tag and schemas. Discard execution returns the * `executionId` instead of the workflow result, resume requires the persisted * `executionId`, and idempotency keys must remain stable for the same logical * request. @@ -55,17 +55,21 @@ const TypeId = "~effect/workflow/Workflow" * @since 4.0.0 */ export interface Workflow< - Name extends string, + Tag extends string, Payload extends AnyStructSchema, Success extends Schema.Top, Error extends Schema.Top > { + new(_: never): {} + readonly [TypeId]: typeof TypeId - readonly name: Name + readonly _tag: Tag readonly payloadSchema: Payload readonly successSchema: Success readonly errorSchema: Error readonly annotations: Context.Context + readonly idempotencyKey: (payload: Payload["Type"]) => string + readonly suspendedRetrySchedule?: Schedule.Schedule | undefined /** * Add an annotation to the workflow. @@ -73,14 +77,14 @@ export interface Workflow< annotate( key: Context.Key, value: S - ): Workflow + ): Workflow /** * Merge multiple annotations into the workflow. */ annotateMerge( annotations: Context.Context - ): Workflow + ): Workflow /** * Execute the workflow with the given payload. @@ -139,7 +143,7 @@ export interface Workflow< | WorkflowEngine | Exclude< R, - WorkflowEngine | WorkflowInstance | Execution | Scope.Scope + WorkflowEngine | WorkflowInstance | Execution | Scope.Scope > | Payload["DecodingServices"] | Payload["EncodingServices"] @@ -178,7 +182,7 @@ export interface Workflow< ) => Effect.Effect< A, E, - R | R2 | WorkflowInstance | Execution | Scope.Scope + R | R2 | WorkflowInstance | Execution | Scope.Scope > ( effect: Effect.Effect, @@ -189,7 +193,7 @@ export interface Workflow< ): Effect.Effect< A, E, - R | R2 | WorkflowInstance | Execution | Scope.Scope + R | R2 | WorkflowInstance | Execution | Scope.Scope > } } @@ -206,14 +210,14 @@ export interface AnyStructSchema extends Schema.Top { /** * Type-level marker for services associated with a specific workflow - * execution name. + * execution tag. * * @category models * @since 4.0.0 */ -export interface Execution { +export interface Execution { readonly _: unique symbol - readonly name: Name + readonly _tag: Tag } /** @@ -224,13 +228,17 @@ export interface Execution { * @since 4.0.0 */ export interface Any { + new(_: never): {} + readonly [TypeId]: typeof TypeId - readonly name: string + readonly _tag: string readonly executionId: (payload: any) => Effect.Effect readonly payloadSchema: AnyStructSchema readonly successSchema: Schema.Top readonly errorSchema: Schema.Top readonly annotations: Context.Context + readonly idempotencyKey: (payload: any) => string + readonly suspendedRetrySchedule?: Schedule.Schedule | undefined } /** @@ -317,21 +325,125 @@ const InstanceTag = Context.Service< "effect/workflow/WorkflowEngine/WorkflowInstance" satisfies typeof WorkflowInstance.key ) +const makeExecutionIdFromPayload = (self: AnyWithProps, payload: unknown) => + makeHashDigest(`${self._tag}-${self.idempotencyKey(payload)}`) + +const Proto = { + [TypeId]: TypeId, + annotate(this: AnyWithProps, tag: Context.Key, value: any) { + return makeProto({ + _tag: this._tag, + payloadSchema: this.payloadSchema, + successSchema: this.successSchema, + errorSchema: this.errorSchema, + annotations: Context.add(this.annotations, tag, value), + idempotencyKey: this.idempotencyKey, + suspendedRetrySchedule: this.suspendedRetrySchedule + }) + }, + annotateMerge(this: AnyWithProps, context: Context.Context) { + return makeProto({ + _tag: this._tag, + payloadSchema: this.payloadSchema, + successSchema: this.successSchema, + errorSchema: this.errorSchema, + annotations: Context.merge(this.annotations, context), + idempotencyKey: this.idempotencyKey, + suspendedRetrySchedule: this.suspendedRetrySchedule + }) + }, + execute( + this: AnyWithProps, + fields: any, + opts?: { readonly discard?: Discard } | undefined + ) { + return Effect.suspend(() => { + const payload = this.payloadSchema.make(fields) + return Effect.flatMap( + EngineTag, + (engine) => + Effect.flatMap(makeExecutionIdFromPayload(this, payload), (executionId) => + Effect.andThen( + Effect.annotateCurrentSpan({ executionId }), + engine.execute(this as any, { + executionId, + payload, + discard: opts?.discard, + suspendedRetrySchedule: this.suspendedRetrySchedule + }) + )) + ) + }).pipe( + Effect.withSpan( + `${this._tag}.execute`, + {}, + { captureStackTrace: false } + ) + ) as any + }, + poll(this: Workflow, executionId: string) { + return Effect.flatMap(EngineTag, (engine) => engine.poll(this, executionId)).pipe( + Effect.withSpan(`${this._tag}.poll`, { attributes: { executionId } }, { captureStackTrace: false }) + ) + }, + interrupt(this: AnyWithProps, executionId: string) { + return Effect.flatMap(EngineTag, (engine) => engine.interrupt(this, executionId)).pipe( + Effect.withSpan(`${this._tag}.interrupt`, { attributes: { executionId } }, { captureStackTrace: false }) + ) + }, + resume(this: Workflow, executionId: string) { + return Effect.flatMap(EngineTag, (engine) => engine.resume(this, executionId)).pipe( + Effect.withSpan(`${this._tag}.resume`, { attributes: { executionId } }, { captureStackTrace: false }) + ) + }, + toLayer(this: Workflow, execute: any) { + return Layer.effectDiscard( + Effect.flatMap(EngineTag, (engine) => engine.register(this, execute)) + ) + }, + executionId(this: AnyWithProps, payload: any) { + return Effect.flatMap( + Effect.orDie(this.payloadSchema.makeEffect(payload)), + (payload) => makeExecutionIdFromPayload(this, payload) + ) + }, + withCompensation: ((...args: ReadonlyArray) => (withCompensation as any)(...args)) +} + +const makeProto = < + const Tag extends string, + Payload extends AnyStructSchema, + Success extends Schema.Top, + Error extends Schema.Top +>(options: { + readonly _tag: Tag + readonly payloadSchema: Payload + readonly successSchema: Success + readonly errorSchema: Error + readonly annotations: Context.Context + readonly idempotencyKey: (payload: Payload["Type"]) => string + readonly suspendedRetrySchedule?: Schedule.Schedule | undefined +}): Workflow => { + function Workflow() {} + Object.setPrototypeOf(Workflow, Proto) + Object.assign(Workflow, options) + return Workflow as any +} + /** * Creates a durable workflow definition with schemas, annotations, and - * deterministic execution IDs derived from the workflow name and idempotency + * deterministic execution IDs derived from the workflow tag and idempotency * key. * * @category constructors * @since 4.0.0 */ export const make = < - const Name extends string, + const Tag extends string, Payload extends Schema.Struct.Fields | AnyStructSchema, Success extends Schema.Top = Schema.Void, Error extends Schema.Top = Schema.Never ->(options: { - readonly name: Name +>(tag: Tag, options: { readonly payload: Payload readonly idempotencyKey: ( payload: Payload extends Schema.Struct.Fields ? Schema.Struct.Type @@ -342,105 +454,23 @@ export const make = < readonly suspendedRetrySchedule?: Schedule.Schedule | undefined readonly annotations?: Context.Context }): Workflow< - Name, + Tag, Payload extends Schema.Struct.Fields ? Schema.Struct : Payload, Success, Error -> => { - const makeExecutionId = (payload: any) => makeHashDigest(`${options.name}-${options.idempotencyKey(payload)}`) - const self: Workflow = { - [TypeId]: TypeId, - name: options.name, - payloadSchema: Schema.isSchema(options.payload) +> => + makeProto : Payload, Success, Error>({ + _tag: tag, + payloadSchema: (Schema.isSchema(options.payload) ? options.payload - : Schema.Struct(options.payload as any), + : Schema.Struct(options.payload as any)) as Payload extends Schema.Struct.Fields ? Schema.Struct + : Payload, successSchema: options.success ?? (Schema.Void as any), errorSchema: options.error ?? (Schema.Never as any), annotations: options.annotations ?? Context.empty(), - annotate(tag, value) { - return make({ - ...options, - annotations: Context.add(self.annotations, tag, value) - }) - }, - annotateMerge(context) { - return make({ - ...options, - annotations: Context.merge(self.annotations, context) - }) - }, - execute: Effect.fnUntraced( - function*( - fields: any, - opts?: { readonly discard?: Discard } | undefined - ) { - const payload = self.payloadSchema.make(fields) - const engine = yield* EngineTag - const executionId = yield* makeExecutionId(payload) - yield* Effect.annotateCurrentSpan({ executionId }) - return yield* engine.execute(self, { - executionId, - payload, - discard: opts?.discard, - suspendedRetrySchedule: options.suspendedRetrySchedule - }) - }, - Effect.withSpan( - `${options.name}.execute`, - {}, - { captureStackTrace: false } - ) - ) as any, - poll: Effect.fnUntraced( - function*(executionId: string) { - const engine = yield* EngineTag - return yield* engine.poll(self, executionId) - }, - (effect, executionId) => - Effect.withSpan(effect, `${options.name}.poll`, { - captureStackTrace: false, - attributes: { executionId } - }) - ), - interrupt: Effect.fnUntraced( - function*(executionId: string) { - const engine = yield* EngineTag - yield* engine.interrupt(self, executionId) - }, - (effect, executionId) => - Effect.withSpan(effect, `${options.name}.interrupt`, { - captureStackTrace: false, - attributes: { executionId } - }) - ), - resume: Effect.fnUntraced( - function*(executionId: string) { - const engine = yield* EngineTag - yield* engine.resume(self, executionId) - }, - (effect, executionId) => - Effect.withSpan(effect, `${options.name}.resume`, { - captureStackTrace: false, - attributes: { executionId } - }) - ), - toLayer: (execute) => - Layer.effectDiscard( - Effect.gen(function*() { - const engine = yield* EngineTag - return yield* engine.register(self, execute) - }) - ), - executionId: (payload) => - Effect.flatMap( - Effect.orDie(self.payloadSchema.makeEffect(payload)), - makeExecutionId - ), - withCompensation - } - - return self -} + idempotencyKey: options.idempotencyKey as any, + suspendedRetrySchedule: options.suspendedRetrySchedule + }) const ResultTypeId = "~effect/workflow/Workflow/Result" diff --git a/packages/effect/src/unstable/workflow/WorkflowEngine.ts b/packages/effect/src/unstable/workflow/WorkflowEngine.ts index 52b98ba717..05fa413213 100644 --- a/packages/effect/src/unstable/workflow/WorkflowEngine.ts +++ b/packages/effect/src/unstable/workflow/WorkflowEngine.ts @@ -461,7 +461,7 @@ export const makeUnsafe = (options: Encoded): WorkflowEngine["Service"] => ).pipe( Effect.catch(() => Effect.die( - `${self.name}.execute: suspendedRetrySchedule exhausted` + `${self._tag}.execute: suspendedRetrySchedule exhausted` ) ) ) @@ -616,7 +616,7 @@ export const layerMemory: Layer.Layer = Layer.effect(WorkflowEng return } - const entry = workflows.get(state.instance.workflow.name)! + const entry = workflows.get(state.instance.workflow._tag)! const instance = WorkflowInstance.initial(state.instance.workflow, state.instance.executionId) instance.interrupted = state.instance.interrupted state.instance = instance @@ -647,16 +647,16 @@ export const layerMemory: Layer.Layer = Layer.effect(WorkflowEng const engine = makeUnsafe({ register: Effect.fnUntraced(function*(workflow, execute) { - workflows.set(workflow.name, { + workflows.set(workflow._tag, { workflow, execute, scope: yield* Effect.scope }) }), execute: Effect.fnUntraced(function*(workflow, options) { - const entry = workflows.get(workflow.name) + const entry = workflows.get(workflow._tag) if (!entry) { - return yield* Effect.orDie(Effect.fail(`Workflow ${workflow.name} is not registered`)) + return yield* Effect.orDie(Effect.fail(`Workflow ${workflow._tag} is not registered`)) } let state = executions.get(options.executionId) @@ -745,7 +745,7 @@ export const layerMemory: Layer.Layer = Layer.effect(WorkflowEng }), scheduleClock: (workflow, options) => engine.deferredDone(options.clock.deferred, { - workflowName: workflow.name, + workflowName: workflow._tag, executionId: options.executionId, deferredName: options.clock.deferred.name, exit: Exit.void diff --git a/packages/effect/src/unstable/workflow/WorkflowProxy.ts b/packages/effect/src/unstable/workflow/WorkflowProxy.ts index 641d91478f..786f129225 100644 --- a/packages/effect/src/unstable/workflow/WorkflowProxy.ts +++ b/packages/effect/src/unstable/workflow/WorkflowProxy.ts @@ -43,8 +43,7 @@ import type * as Workflow from "./Workflow.ts" * import { RpcServer } from "effect/unstable/rpc" * import { Workflow, WorkflowProxy, WorkflowProxyServer } from "effect/unstable/workflow" * - * const EmailWorkflow = Workflow.make({ - * name: "EmailWorkflow", + * const EmailWorkflow = Workflow.make("EmailWorkflow", { * payload: { * id: Schema.String, * to: Schema.String @@ -82,15 +81,15 @@ export const toRpcGroup = < for (const workflow_ of workflows) { const workflow = workflow_ as Workflow.AnyWithProps rpcs.push( - Rpc.make(`${prefix}${workflow.name}`, { + Rpc.make(`${prefix}${workflow._tag}`, { payload: workflow.payloadSchema, error: workflow.errorSchema, success: workflow.successSchema }).annotateMerge(workflow.annotations), - Rpc.make(`${prefix}${workflow.name}Discard`, { + Rpc.make(`${prefix}${workflow._tag}Discard`, { payload: workflow.payloadSchema }).annotateMerge(workflow.annotations), - Rpc.make(`${prefix}${workflow.name}Resume`, { payload: ResumePayload }) + Rpc.make(`${prefix}${workflow._tag}Resume`, { payload: ResumePayload }) .annotateMerge(workflow.annotations) ) } @@ -125,8 +124,7 @@ export type ConvertRpcs = * import { HttpApi, HttpApiBuilder } from "effect/unstable/httpapi" * import { Workflow, WorkflowProxy, WorkflowProxyServer } from "effect/unstable/workflow" * - * const EmailWorkflow = Workflow.make({ - * name: "EmailWorkflow", + * const EmailWorkflow = Workflow.make("EmailWorkflow", { * payload: { * id: Schema.String, * to: Schema.String @@ -161,17 +159,17 @@ export const toHttpApiGroup = workflow.execute(payload).pipe( Effect.tapDefect(Effect.logError), Effect.annotateLogs({ module: "WorkflowProxyServer", - method: workflow.name + method: workflow._tag }) ) ) .handle( - workflow.name + "Discard" as any, + workflow._tag + "Discard" as any, ({ payload }: { payload: any }) => workflow.execute(payload, { discard: true } as any).pipe( Effect.tapDefect(Effect.logError), Effect.annotateLogs({ module: "WorkflowProxyServer", - method: workflow.name + "Discard" + method: workflow._tag + "Discard" }) ) ) .handle( - workflow.name + "Resume" as any, + workflow._tag + "Resume" as any, ({ payload }: { payload: any }) => workflow.resume(payload.executionId).pipe( Effect.tapDefect(Effect.logError), Effect.annotateLogs({ module: "WorkflowProxyServer", - method: workflow.name + "Resume" + method: workflow._tag + "Resume" }) ) ) @@ -127,7 +127,7 @@ export const layerRpcHandlers = < const handlers = new Map>() for (const workflow_ of workflows) { const workflow = workflow_ as Workflow.AnyWithProps - const tag = `${prefix}${workflow.name}` + const tag = `${prefix}${workflow._tag}` const tagDiscard = `${tag}Discard` const tagResume = `${tag}Resume` const key = `effect/rpc/Rpc/${tag}` diff --git a/packages/effect/test/cluster/ClusterWorkflowEngine.test.ts b/packages/effect/test/cluster/ClusterWorkflowEngine.test.ts index 38ae91ecec..7b86ea8b44 100644 --- a/packages/effect/test/cluster/ClusterWorkflowEngine.test.ts +++ b/packages/effect/test/cluster/ClusterWorkflowEngine.test.ts @@ -333,8 +333,7 @@ class SendEmailError extends Schema.ErrorClass("SendEmailError") message: Schema.String }) {} -const EmailWorkflow = Workflow.make({ - name: "EmailWorkflow", +const EmailWorkflow = Workflow.make("EmailWorkflow", { payload: { to: Schema.String, id: Schema.String @@ -418,8 +417,7 @@ const EmailTrigger = DurableDeferred.make("EmailTrigger", { success: Schema.String }) -const RaceWorkflow = Workflow.make({ - name: "RaceWorkflow", +const RaceWorkflow = Workflow.make("RaceWorkflow", { payload: { id: Schema.String }, @@ -467,8 +465,7 @@ const RaceWorkflowLayer = RaceWorkflow.toLayer(Effect.fnUntraced(function*() { ]) })) -const DurableRaceWorkflow = Workflow.make({ - name: "DurableRaceWorkflow", +const DurableRaceWorkflow = Workflow.make("DurableRaceWorkflow", { payload: { id: Schema.String }, @@ -525,8 +522,7 @@ const DurableRaceWorkflowLayer = DurableRaceWorkflow.toLayer(Effect.fnUntraced(f ]) })) -const ParentWorkflow = Workflow.make({ - name: "ParentWorkflow", +const ParentWorkflow = Workflow.make("ParentWorkflow", { payload: { id: Schema.String }, @@ -535,8 +531,7 @@ const ParentWorkflow = Workflow.make({ } }) -const ChildWorkflow = Workflow.make({ - name: "ChildWorkflow", +const ChildWorkflow = Workflow.make("ChildWorkflow", { payload: { id: Schema.String }, @@ -565,8 +560,7 @@ const ChildWorkflowLayer = ChildWorkflow.toLayer(Effect.fnUntraced(function*() { flags.set("child-end", true) })) -const ShardedClockWorkflow = Workflow.make({ - name: "ShardedClockWorkflow", +const ShardedClockWorkflow = Workflow.make("ShardedClockWorkflow", { payload: { id: Schema.String }, @@ -585,8 +579,7 @@ const ShardedClockWorkflowLayer = ShardedClockWorkflow.toLayer(Effect.fnUntraced const ShardedDeferred = DurableDeferred.make("ShardedDeferred") -const ShardedDeferredWorkflow = Workflow.make({ - name: "ShardedDeferredWorkflow", +const ShardedDeferredWorkflow = Workflow.make("ShardedDeferredWorkflow", { payload: { id: Schema.String }, @@ -595,8 +588,7 @@ const ShardedDeferredWorkflow = Workflow.make({ } }).annotate(ClusterSchema.ShardGroup, () => "workflow") -const SuspendOnFailureWorkflow = Workflow.make({ - name: "SuspendOnFailureWorkflow", +const SuspendOnFailureWorkflow = Workflow.make("SuspendOnFailureWorkflow", { payload: { id: Schema.String }, @@ -620,8 +612,7 @@ const SuspendOnFailureWorkflowLayer = SuspendOnFailureWorkflow.toLayer(Effect.fn }) })) -const CatchWorkflow = Workflow.make({ - name: "CatchWorkflow", +const CatchWorkflow = Workflow.make("CatchWorkflow", { payload: { id: Schema.String }, diff --git a/packages/effect/test/unstable/workflow/DurableQueue.test.ts b/packages/effect/test/unstable/workflow/DurableQueue.test.ts index 76e80c4bb4..2a0b4603f9 100644 --- a/packages/effect/test/unstable/workflow/DurableQueue.test.ts +++ b/packages/effect/test/unstable/workflow/DurableQueue.test.ts @@ -33,8 +33,7 @@ describe("DurableQueue", () => { idempotencyKey: ({ id }) => id }) - const SuccessWorkflow = Workflow.make({ - name: "DurableQueueTest/SuccessWorkflow", + const SuccessWorkflow = Workflow.make("DurableQueueTest/SuccessWorkflow", { payload: { id: Schema.String, value: Schema.Number @@ -74,8 +73,7 @@ describe("DurableQueue", () => { idempotencyKey: ({ id }) => id }) - const FailureWorkflow = Workflow.make({ - name: "DurableQueueTest/FailureWorkflow", + const FailureWorkflow = Workflow.make("DurableQueueTest/FailureWorkflow", { payload: { id: Schema.String }, diff --git a/packages/effect/test/unstable/workflow/WorkflowEngine.test.ts b/packages/effect/test/unstable/workflow/WorkflowEngine.test.ts index 1eb209d482..fe14b492cb 100644 --- a/packages/effect/test/unstable/workflow/WorkflowEngine.test.ts +++ b/packages/effect/test/unstable/workflow/WorkflowEngine.test.ts @@ -3,8 +3,7 @@ import { Effect, Exit, Layer, Option, Schema } from "effect" import { Workflow, WorkflowEngine } from "effect/unstable/workflow" describe("WorkflowEngine", () => { - const IncrementWorkflow = Workflow.make({ - name: "WorkflowEngine/IncrementWorkflow", + const IncrementWorkflow = Workflow.make("WorkflowEngine/IncrementWorkflow", { payload: { value: Schema.Number }, success: Schema.Number, idempotencyKey: ({ value }) => String(value) @@ -12,6 +11,14 @@ describe("WorkflowEngine", () => { const IncrementWorkflowLayer = IncrementWorkflow.toLayer(({ value }) => Effect.succeed(value + 1)) + class ClassWorkflow extends Workflow.make("WorkflowEngine/ClassWorkflow", { + payload: { value: Schema.Number }, + success: Schema.Number, + idempotencyKey: ({ value }) => String(value) + }) {} + + const ClassWorkflowLayer = ClassWorkflow.toLayer(({ value }) => Effect.succeed(value + 1)) + it.effect("layer executes and polls workflows", () => Effect.gen(function*() { const executionId = yield* IncrementWorkflow.execute({ value: 1 }, { discard: true }) @@ -38,4 +45,16 @@ describe("WorkflowEngine", () => { Layer.provideMerge(WorkflowEngine.layerMemory) )) )) + + it.effect("supports class extension", () => + Effect.gen(function*() { + const result = yield* ClassWorkflow.execute({ value: 1 }) + + assert.strictEqual(ClassWorkflow._tag, "WorkflowEngine/ClassWorkflow") + assert.strictEqual(result, 2) + }).pipe( + Effect.provide(ClassWorkflowLayer.pipe( + Layer.provideMerge(WorkflowEngine.layerMemory) + )) + )) }) diff --git a/packages/effect/typetest/Effect.tst.ts b/packages/effect/typetest/Effect.tst.ts index 64bda0bc50..915bf151ae 100644 --- a/packages/effect/typetest/Effect.tst.ts +++ b/packages/effect/typetest/Effect.tst.ts @@ -1,4 +1,4 @@ -/** @effect-diagnostics floatingEffect:skip-file */ +/** @effect-diagnostics floatingEffect:skip-file missingEffectError:skip-file */ import { type Cause, type Channel, diff --git a/packages/effect/typetest/unstable/workflow/Workflow.tst.ts b/packages/effect/typetest/unstable/workflow/Workflow.tst.ts new file mode 100644 index 0000000000..dbfafea728 --- /dev/null +++ b/packages/effect/typetest/unstable/workflow/Workflow.tst.ts @@ -0,0 +1,16 @@ +import { Schema } from "effect" +import * as Workflow from "effect/unstable/workflow/Workflow" +import { describe, expect, it } from "tstyche" + +describe("Workflow", () => { + it("supports class extension", () => { + class ClassWorkflow extends Workflow.make("ClassWorkflow", { + payload: { value: Schema.Number }, + success: Schema.Number, + idempotencyKey: ({ value }) => String(value) + }) {} + + expect().type.toBeAssignableTo() + expect(ClassWorkflow.payloadSchema).type.toBe>() + }) +})