Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/calm-seas-smile.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": patch
---

Align workflow tags with RPCs by changing `Workflow.make` to accept the tag as its first argument, exposing workflow tags as `_tag`, and supporting `class MyWorkflow extends Workflow.make(...) {}`.
30 changes: 15 additions & 15 deletions packages/effect/src/unstable/cluster/ClusterWorkflowEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,11 @@ export const make = Effect.gen(function*() {
>
>()
const ensureEntity = (workflow: Workflow.Any) => {
let entity = entities.get(workflow.name)
let entity = entities.get(workflow._tag)
if (!entity) {
entity = makeWorkflowEntity(workflow) as any
workflows.set(workflow.name, workflow)
entities.set(workflow.name, entity as any)
workflows.set(workflow._tag, workflow)
entities.set(workflow._tag, entity as any)
}
return entity!
}
Expand Down Expand Up @@ -248,7 +248,7 @@ export const make = Effect.gen(function*() {
}) {
const requestId = yield* requestIdFor({
workflow: options.workflow,
entityType: `Workflow/${options.workflow.name}`,
entityType: `Workflow/${options.workflow._tag}`,
executionId: options.executionId,
tag: "activity",
id: activityPrimaryKey(options.activity.name, options.attempt)
Expand Down Expand Up @@ -283,7 +283,7 @@ export const make = Effect.gen(function*() {
const resume = Effect.fnUntraced(function*(workflow: Workflow.Any, executionId: string) {
const maybeReply = yield* requestReply({
workflow,
entityType: `Workflow/${workflow.name}`,
entityType: `Workflow/${workflow._tag}`,
executionId,
tag: "run",
id: ""
Expand Down Expand Up @@ -324,7 +324,7 @@ export const make = Effect.gen(function*() {
ensureEntity(workflow)
const requestId = yield* requestIdFor({
workflow,
entityType: `Workflow/${workflow.name}`,
entityType: `Workflow/${workflow._tag}`,
executionId,
tag: "run",
id: ""
Expand All @@ -343,7 +343,7 @@ export const make = Effect.gen(function*() {
}

yield* engine.deferredDone(InterruptSignal, {
workflowName: workflow.name,
workflowName: workflow._tag,
executionId,
deferredName: InterruptSignal.name,
exit: Exit.void
Expand Down Expand Up @@ -457,13 +457,13 @@ export const make = Effect.gen(function*() {

execute: (workflow, { discard, executionId, parent, payload }) => {
ensureEntity(workflow)
return RcMap.get(clients, workflow.name).pipe(
return RcMap.get(clients, workflow._tag).pipe(
Effect.flatMap((make) =>
make(executionId).run(
parent ?
{
...payload,
[payloadParentKey]: { workflowName: parent.workflow.name, executionId: parent.executionId }
[payloadParentKey]: { workflowName: parent.workflow._tag, executionId: parent.executionId }
} :
payload,
{ discard }
Expand All @@ -479,7 +479,7 @@ export const make = Effect.gen(function*() {
const exitSchema = Schema.toCodecJson(Rpc.exitSchema(entity.protocol.requests.get("run")!))
const reply = yield* requestReply({
workflow,
entityType: `Workflow/${workflow.name}`,
entityType: `Workflow/${workflow._tag}`,
executionId,
tag: "run",
id: ""
Expand All @@ -505,7 +505,7 @@ export const make = Effect.gen(function*() {
id: yield* sharding.getSnowflake,
address: entityAddressFor({
workflow,
entityType: `Workflow/${workflow.name}`,
entityType: `Workflow/${workflow._tag}`,
executionId
}),
requestId: requestId.value
Expand All @@ -523,7 +523,7 @@ export const make = Effect.gen(function*() {
const instance = Context.get(services, WorkflowEngine.WorkflowInstance)
yield* Effect.annotateCurrentSpan("executionId", instance.executionId)
const activityId = `${instance.executionId}/${activity.name}`
const client = (yield* RcMap.get(clientsPartial, instance.workflow.name))(instance.executionId)
const client = (yield* RcMap.get(clientsPartial, instance.workflow._tag))(instance.executionId)
while (true) {
if (!activities.has(activityId)) {
activities.set(activityId, { activity, context: services })
Expand Down Expand Up @@ -563,7 +563,7 @@ export const make = Effect.gen(function*() {
Effect.flatMap((instance) =>
requestReply({
workflow: instance.workflow,
entityType: `Workflow/${instance.workflow.name}`,
entityType: `Workflow/${instance.workflow._tag}`,
executionId: instance.executionId,
tag: "deferred",
id: deferred.name
Expand Down Expand Up @@ -628,7 +628,7 @@ export const make = Effect.gen(function*() {
}),
payload: {
name: options.clock.name,
workflowName: workflow.name,
workflowName: workflow._tag,
wakeUp: DateTime.addDuration(now, options.clock.duration)
}
})
Expand Down Expand Up @@ -701,7 +701,7 @@ const ResumeRpc = Rpc.make("resume", {
const payloadParentKey = "~effect/cluster/ClusterWorkflowEngine/payloadParentKey"

const makeWorkflowEntity = (workflow: Workflow.Any) =>
Entity.make(`Workflow/${workflow.name}`, [
Entity.make(`Workflow/${workflow._tag}`, [
Rpc.make("run", {
payload: {
...workflow.payloadSchema.fields,
Expand Down
4 changes: 2 additions & 2 deletions packages/effect/src/unstable/workflow/DurableDeferred.ts
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ export const into: {
}
}
yield* engine.deferredDone(self, {
workflowName: instance.workflow.name,
workflowName: instance.workflow._tag,
executionId: instance.executionId,
deferredName: self.name,
exit
Expand Down Expand Up @@ -455,7 +455,7 @@ export const tokenFromExecutionId: {
}
): Token =>
new TokenParsed({
workflowName: options.workflow.name,
workflowName: options.workflow._tag,
executionId: options.executionId,
deferredName: self.name
}).asToken
Expand Down
3 changes: 1 addition & 2 deletions packages/effect/src/unstable/workflow/DurableQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ export interface DurableQueue<
* }
* })
*
* const MyWorkflow = Workflow.make({
* name: "MyWorkflow",
* const MyWorkflow = Workflow.make("MyWorkflow", {
* payload: {
* id: Schema.String
* },
Expand Down
Loading
Loading