From b19c643cc0d78a06aec68078e631331e94705a71 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Mon, 17 Jul 2023 18:05:37 -0700 Subject: [PATCH] Show a way to implement cancellable updates --- go.mod | 3 ++ update-cancel/README.md | 14 ++++++ update-cancel/interceptor.go | 66 +++++++++++++++++++++++++++++ update-cancel/starter/main.go | 80 +++++++++++++++++++++++++++++++++++ update-cancel/update.go | 29 +++++++++++++ update-cancel/worker/main.go | 30 +++++++++++++ 6 files changed, 222 insertions(+) create mode 100644 update-cancel/README.md create mode 100644 update-cancel/interceptor.go create mode 100644 update-cancel/starter/main.go create mode 100644 update-cancel/update.go create mode 100644 update-cancel/worker/main.go diff --git a/go.mod b/go.mod index e1d9aa82..92a3d118 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,9 @@ go 1.16 replace github.com/cactus/go-statsd-client => github.com/cactus/go-statsd-client v3.2.1+incompatible +replace ( + go.temporal.io/sdk v1.23.1 => ../sdk-go +) require ( github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect github.com/golang/mock v1.6.0 diff --git a/update-cancel/README.md b/update-cancel/README.md new file mode 100644 index 00000000..7e1adf2d --- /dev/null +++ b/update-cancel/README.md @@ -0,0 +1,14 @@ +### Update Cancel Sample + +Here we show an example of a workflow with a long running update. Through the use of an interceptor we are able to cancel the update by sending another special "cancel" update. + +### Steps to run this sample: +1) Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use). +2) Run the following command to start the worker +``` +go run update-cancel/worker/main.go +``` +3) Run the following command to start the example +``` +go run update-cancel/starter/main.go +``` diff --git a/update-cancel/interceptor.go b/update-cancel/interceptor.go new file mode 100644 index 00000000..26b1dde1 --- /dev/null +++ b/update-cancel/interceptor.go @@ -0,0 +1,66 @@ +package update_cancel + +import ( + "errors" + + "go.temporal.io/sdk/interceptor" + "go.temporal.io/sdk/workflow" +) + +const ( + UpdateCancelHandle = "update-cancel" +) + +type workerInterceptor struct { + interceptor.WorkerInterceptorBase +} + +func NewWorkerInterceptor() interceptor.WorkerInterceptor { + return &workerInterceptor{} +} + +func (w *workerInterceptor) InterceptWorkflow( + ctx workflow.Context, + next interceptor.WorkflowInboundInterceptor, +) interceptor.WorkflowInboundInterceptor { + i := &workflowInboundInterceptor{root: w} + i.Next = next + return i +} + +type workflowInboundInterceptor struct { + ctxMap map[string]workflow.CancelFunc + interceptor.WorkflowInboundInterceptorBase + root *workerInterceptor +} + +func (w *workflowInboundInterceptor) Init(outbound interceptor.WorkflowOutboundInterceptor) error { + w.ctxMap = make(map[string]workflow.CancelFunc) + return w.Next.Init(outbound) +} + +func (w *workflowInboundInterceptor) ExecuteWorkflow(ctx workflow.Context, in *interceptor.ExecuteWorkflowInput) (interface{}, error) { + err := workflow.SetUpdateHandlerWithOptions(ctx, UpdateCancelHandle, func(ctx workflow.Context, updateID string) error { + // Cancel the update + w.ctxMap[updateID]() + return nil + }, workflow.UpdateHandlerOptions{ + Validator: func(ctx workflow.Context, updateID string) error { + // Validate that the update ID is known + if _, ok := w.ctxMap[updateID]; !ok { + return errors.New("unknown update ID") + } + return nil + }, + }) + if err != nil { + return nil, err + } + return w.Next.ExecuteWorkflow(ctx, in) +} + +func (w *workflowInboundInterceptor) ExecuteUpdate(ctx workflow.Context, in *interceptor.UpdateInput) (interface{}, error) { + ctx, cancel := workflow.WithCancel(ctx) + w.ctxMap[workflow.GetUpdateInfo(ctx).ID] = cancel + return w.Next.ExecuteUpdate(ctx, in) +} diff --git a/update-cancel/starter/main.go b/update-cancel/starter/main.go new file mode 100644 index 00000000..06e93b27 --- /dev/null +++ b/update-cancel/starter/main.go @@ -0,0 +1,80 @@ +package main + +import ( + "context" + "log" + "time" + + update_cancel "github.com/temporalio/samples-go/update-cancel" + enumspb "go.temporal.io/api/enums/v1" + updatepb "go.temporal.io/api/update/v1" + "go.temporal.io/sdk/client" +) + +func main() { + c, err := client.Dial(client.Options{}) + if err != nil { + log.Fatalln("Unable to create client", err) + } + defer c.Close() + + workflowOptions := client.StartWorkflowOptions{ + ID: "update_cancel-workflow-ID", + TaskQueue: "update_cancel", + } + + we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, update_cancel.UpdateWorkflow) + if err != nil { + log.Fatalln("Unable to execute workflow", err) + } + + log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID()) + + cancellableUpdateID := "cancellable-update-ID" + log.Println("Sending update", "UpdateID", cancellableUpdateID) + + // Send an async update request. + handle, err := c.UpdateWorkflowWithOptions(context.Background(), &client.UpdateWorkflowWithOptionsRequest{ + WorkflowID: we.GetID(), + RunID: we.GetRunID(), + UpdateName: update_cancel.UpdateHandle, + UpdateID: cancellableUpdateID, + WaitPolicy: &updatepb.WaitPolicy{ + LifecycleStage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED, + }, + Args: []interface{}{ + 4 * time.Hour, + }, + }) + if err != nil { + log.Fatalln("Unable to execute update", err) + } + log.Println("Sent update") + + log.Println("Waiting 5s to send cancel") + time.Sleep(5 * time.Second) + log.Println("Sending cancel to update", "UpdateID", cancellableUpdateID) + + _, err = c.UpdateWorkflow(context.Background(), we.GetID(), we.GetRunID(), update_cancel.UpdateCancelHandle, cancellableUpdateID) + if err != nil { + log.Fatalln("Unable to send cancel", err) + } + log.Println("Sent cancel") + + var sleepTime time.Duration + err = handle.Get(context.Background(), &sleepTime) + if err != nil { + log.Fatalln("Unable to get update result", err) + } + // Update will only sleep for 5s because it was cancelled. + log.Println("Update slept for:", sleepTime) + + if err = c.SignalWorkflow(context.Background(), we.GetID(), we.GetRunID(), update_cancel.Done, nil); err != nil { + log.Fatalf("failed to send %q signal to workflow: %v", update_cancel.Done, err) + } + var wfresult int + if err = we.Get(context.Background(), &wfresult); err != nil { + log.Fatalf("unable get workflow result: %v", err) + } + log.Println("workflow result:", wfresult) +} diff --git a/update-cancel/update.go b/update-cancel/update.go new file mode 100644 index 00000000..33078b21 --- /dev/null +++ b/update-cancel/update.go @@ -0,0 +1,29 @@ +package update_cancel + +import ( + "time" + + "go.temporal.io/sdk/workflow" +) + +const ( + UpdateHandle = "update_handle" + Done = "done" +) + +func UpdateWorkflow(ctx workflow.Context) error { + if err := workflow.SetUpdateHandler( + ctx, + UpdateHandle, + func(ctx workflow.Context, sleepTime time.Duration) (time.Duration, error) { + dt := workflow.Now(ctx) + workflow.Sleep(ctx, sleepTime) + return workflow.Now(ctx).Sub(dt), nil + }, + ); err != nil { + return err + } + + _ = workflow.GetSignalChannel(ctx, Done).Receive(ctx, nil) + return ctx.Err() +} diff --git a/update-cancel/worker/main.go b/update-cancel/worker/main.go new file mode 100644 index 00000000..5aa47c13 --- /dev/null +++ b/update-cancel/worker/main.go @@ -0,0 +1,30 @@ +package main + +import ( + "log" + + update_cancel "github.com/temporalio/samples-go/update-cancel" + "go.temporal.io/sdk/client" + sdkinterceptor "go.temporal.io/sdk/interceptor" + "go.temporal.io/sdk/worker" +) + +func main() { + // The client and worker are heavyweight objects that should be created once per process. + c, err := client.Dial(client.Options{}) + if err != nil { + log.Fatalln("Unable to create client", err) + } + defer c.Close() + + w := worker.New(c, "update_cancel", worker.Options{ + Interceptors: []sdkinterceptor.WorkerInterceptor{update_cancel.NewWorkerInterceptor()}, + }) + + w.RegisterWorkflow(update_cancel.UpdateWorkflow) + + err = w.Run(worker.InterruptCh()) + if err != nil { + log.Fatalln("Unable to start worker", err) + } +}