From b8608a22a87f5b44dba503e0e6ffb3cf0e709186 Mon Sep 17 00:00:00 2001 From: Ozoniuss Date: Fri, 17 Oct 2025 18:19:04 +0300 Subject: [PATCH] Show how to use Temporal as a queue This example shows how you can use Temporal's event history and workflow input as a queue. It implements a batching mechanism where a workflow continuously listens for incoming signals, as well as a timer, and processes the batch when the timer fires, followed by restarting the timer. The workflow keeps track of the number of incoming events and decides to continue-as-new when there are too many events recorded in the event history, passing the accumulated events in the next workflow's input. --- batch-queue/README.md | 20 ++++ batch-queue/accumulate_and_batch_workflow.go | 117 +++++++++++++++++++ batch-queue/signal_new_values_workflow.go | 90 ++++++++++++++ batch-queue/starter/main.go | 39 +++++++ batch-queue/values_received.txt | 54 +++++++++ batch-queue/values_sent.txt | 64 ++++++++++ batch-queue/worker/main.go | 32 +++++ 7 files changed, 416 insertions(+) create mode 100644 batch-queue/README.md create mode 100644 batch-queue/accumulate_and_batch_workflow.go create mode 100644 batch-queue/signal_new_values_workflow.go create mode 100644 batch-queue/starter/main.go create mode 100644 batch-queue/values_received.txt create mode 100644 batch-queue/values_sent.txt create mode 100644 batch-queue/worker/main.go diff --git a/batch-queue/README.md b/batch-queue/README.md new file mode 100644 index 00000000..dc4b8849 --- /dev/null +++ b/batch-queue/README.md @@ -0,0 +1,20 @@ +This sample shows how to implement a workflow that simulates a queue, using the event history and workflow input. It continuously listens for incoming tasks via a channel, while also listening on a different channel to understand when to write a batch. The tasks are sent from a different workflow through Temporal signals and a repeating timer writes on the other channel to signal the workflow when to write a batch. + +The workflow also uses continue-as-new to avoid large history size caused by the timers and signals. This is done when a certain number of events (signal received, timer fired) have been recorded. + +The example also illustrates how to simulate a ticker using timers. + +### Steps to run this sample: +1) Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use). +2) Run the following command to start the worker +``` +go run forever-batch-operations/worker/main.go +``` +3) Run the following command to start the example +``` +go run forever-batch-operations/starter/main.go +``` + +Note that the workflows will continue running even after you stop the worker. + +Compare the values_received.txt and values_sent.txt files. Values should be written in the same order. diff --git a/batch-queue/accumulate_and_batch_workflow.go b/batch-queue/accumulate_and_batch_workflow.go new file mode 100644 index 00000000..f5189aaa --- /dev/null +++ b/batch-queue/accumulate_and_batch_workflow.go @@ -0,0 +1,117 @@ +package batch_queue + +import ( + "context" + "fmt" + "os" + "strings" + "time" + + "go.temporal.io/sdk/workflow" +) + +const ( + SIGNAL_READ_VALS = "read_vals" + SIGNAL_COMMIT_BATCH = "commit_batch" +) + +func GetAccumulateAndBatchWorkflowID() string { + return "AccumulateAndBatchWorkflowID" +} + +type WorkflowTicker struct { + d time.Duration + C workflow.Channel +} + +func NewWorkflowTicker(ctx workflow.Context, d time.Duration) *WorkflowTicker { + wt := WorkflowTicker{ + d: d, + C: workflow.NewChannel(ctx), + } + + workflow.Go(ctx, func(ctx workflow.Context) { + for { + t := workflow.NewTimer(ctx, d) + err := t.Get(ctx, nil) + if err != nil { + workflow.GetLogger(ctx).Error("timer failed, restarting...", "error", err) + continue + } + wt.C.Send(ctx, nil) + } + }) + + return &wt +} + +// AccumulateAndBatchWorkflow keeps accumulating data via signals. A timer +// decides when the accumulated data should be processed as a batch. +func AccumulateAndBatchWorkflow(ctx workflow.Context, vals []string) error { + + logger := workflow.GetLogger(ctx) + logger.Info("AccumulateAndBatchWorkflow workflow started", "vals", vals) + + // listen for incoming data + readValsCh := workflow.GetSignalChannel(ctx, SIGNAL_READ_VALS) + + // batch every 10 seconds + t := NewWorkflowTicker(ctx, 10*time.Second) + + eventsReceived := 0 + + selector := workflow.NewSelector(ctx) + selector.AddReceive(readValsCh, func(c workflow.ReceiveChannel, more bool) { + eventsReceived += 1 + var incoming string + c.Receive(ctx, &incoming) + vals = append(vals, incoming) + }) + selector.AddReceive(t.C, func(c workflow.ReceiveChannel, more bool) { + eventsReceived += 1 + c.Receive(ctx, nil) + logger.Info("commiting batch...") + + ao := workflow.ActivityOptions{ + StartToCloseTimeout: 1 * time.Minute, + } + actx := workflow.WithActivityOptions(ctx, ao) + err := workflow.ExecuteActivity(actx, WriteBatchToFile, vals).Get(ctx, nil) + if err != nil { + // Couldn't write the batch, so do not discard the data. Note that + // activity should be idempotent, so you may need to dedupe. This + // example doesn't dedupe. + logger.Error("failed to write batch", "error", err) + return + } + + // Discard the written data. + vals = []string{} + }) + + // Batch when you received enough events. Using a low number just to + // illustrate the batching, you can go much higher. + for { + selector.Select(ctx) + if eventsReceived > 15 { + // Pass in the existing vals since the signal history is no longer + // available. + return workflow.NewContinueAsNewError(ctx, AccumulateAndBatchWorkflow, vals) + } + } +} + +func WriteBatchToFile(ctx context.Context, vals []string) error { + // Write the values to this file. We can compare values here to values sent + // to the workflow to see the durability. + f, err := os.OpenFile("values_received.txt", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return fmt.Errorf("failed to open file: %w", err) + } + defer f.Close() + + f.WriteString(strings.Join(vals, "\n")) + f.WriteString("\n") + + return nil +} diff --git a/batch-queue/signal_new_values_workflow.go b/batch-queue/signal_new_values_workflow.go new file mode 100644 index 00000000..e564b0b7 --- /dev/null +++ b/batch-queue/signal_new_values_workflow.go @@ -0,0 +1,90 @@ +package batch_queue + +import ( + "context" + "fmt" + "math/rand/v2" + "os" + "strconv" + "time" + + "go.temporal.io/sdk/workflow" +) + +func GetSignalNewValuesWorkflowID() string { + return "SignalNewValuesWorkflowID" +} + +// SignalNewValuesWorkflow is a workflow that keeps signaling new values to +// a forever-running workflow periodically. After each value is signaled, this +// workflow also writes it to a file. This file and the file where the batching +// workflow writes all values can be compared to see the durability. +func SignalNewValuesWorkflow(ctx workflow.Context) error { + + logger := workflow.GetLogger(ctx) + logger.Info("SignalNewValuesWorkflow workflow started") + + var sendValue int + sendValueFuture := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} { + return rand.IntN(1_000) + }) + err := sendValueFuture.Get(&sendValue) + if err != nil { + return err + } + + // Note that to durably send, you can use activities. Here we don't handle + // that, just fail early. + for range 1000 { + + var sendValue int + sendValueFuture := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} { + return rand.IntN(1_000) + }) + err := sendValueFuture.Get(&sendValue) + if err != nil { + return err + } + + // The workflow we're signaling does continue-as-new, so do not pass a + // run ID here. + // + // We can also signal in an activity if we want retries. + err = workflow.SignalExternalWorkflow(ctx, GetAccumulateAndBatchWorkflowID(), "", SIGNAL_READ_VALS, strconv.Itoa(sendValue)).Get(ctx, nil) + if err != nil { + return err + } + + ao := workflow.ActivityOptions{ + StartToCloseTimeout: 1 * time.Minute, + } + actx := workflow.WithActivityOptions(ctx, ao) + err = workflow.ExecuteActivity(actx, WriteValToFile, sendValue).Get(ctx, nil) + if err != nil { + return err + } + + err = workflow.Sleep(ctx, 1*time.Second) + if err != nil { + return err + } + } + + // avoid big history size + return workflow.NewContinueAsNewError(ctx, SignalNewValuesWorkflow) +} + +func WriteValToFile(ctx context.Context, val int) error { + // Write the values to this file. We can compare values here to values sent + // to the workflow to see the durability. + f, err := os.OpenFile("values_sent.txt", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return fmt.Errorf("failed to open file: %w", err) + } + defer f.Close() + + f.WriteString(strconv.Itoa(val)) + f.WriteString("\n") + + return nil +} diff --git a/batch-queue/starter/main.go b/batch-queue/starter/main.go new file mode 100644 index 00000000..d7f1318f --- /dev/null +++ b/batch-queue/starter/main.go @@ -0,0 +1,39 @@ +package main + +import ( + "context" + "log" + + "go.temporal.io/sdk/client" + + batch_queue "github.com/temporalio/samples-go/batch-queue" +) + +func main() { + // The client is a heavyweight object that should be created once per process. + c, err := client.Dial(client.Options{}) + if err != nil { + log.Fatalln("Unable to create client", err) + } + defer c.Close() + + // start forever batching workflow + workflowOptions := client.StartWorkflowOptions{ + ID: batch_queue.GetAccumulateAndBatchWorkflowID(), + TaskQueue: "batch", + } + _, err = c.ExecuteWorkflow(context.Background(), workflowOptions, batch_queue.AccumulateAndBatchWorkflow, nil) + if err != nil { + log.Fatalln("Unable to execute workflow", err) + } + + // start signaling workflow + workflowOptions = client.StartWorkflowOptions{ + ID: batch_queue.GetSignalNewValuesWorkflowID(), + TaskQueue: "batch", + } + _, err = c.ExecuteWorkflow(context.Background(), workflowOptions, batch_queue.SignalNewValuesWorkflow) + if err != nil { + log.Fatalln("Unable to execute workflow", err) + } +} diff --git a/batch-queue/values_received.txt b/batch-queue/values_received.txt new file mode 100644 index 00000000..01492149 --- /dev/null +++ b/batch-queue/values_received.txt @@ -0,0 +1,54 @@ +557 +255 +884 +82 +385 +477 +121 +989 +673 +883 +448 +796 +823 +381 +451 +747 +157 +876 +620 +743 +490 +842 +554 +632 +493 +741 +198 +930 +630 +530 +451 +29 +383 +968 +421 +861 +24 +356 +511 +430 +774 +510 +671 +281 +212 +636 +627 +374 +371 +762 +147 +203 +408 +559 diff --git a/batch-queue/values_sent.txt b/batch-queue/values_sent.txt new file mode 100644 index 00000000..4f1df6f6 --- /dev/null +++ b/batch-queue/values_sent.txt @@ -0,0 +1,64 @@ +557 +255 +884 +82 +385 +477 +121 +989 +673 +883 +448 +796 +823 +381 +451 +747 +157 +876 +620 +743 +490 +842 +554 +632 +493 +741 +198 +930 +630 +530 +451 +29 +383 +968 +421 +861 +24 +356 +511 +430 +774 +510 +671 +281 +212 +636 +627 +374 +371 +762 +147 +203 +408 +559 +201 +541 +34 +22 +993 +0 +502 +709 +970 +837 diff --git a/batch-queue/worker/main.go b/batch-queue/worker/main.go new file mode 100644 index 00000000..652640d6 --- /dev/null +++ b/batch-queue/worker/main.go @@ -0,0 +1,32 @@ +package main + +import ( + "log" + + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" + + batch_queue "github.com/temporalio/samples-go/batch-queue" +) + +func main() { + + // The client and worker are heavyweight objects that should be created once per process. + c, err := client.Dial(client.Options{}) + if err != nil { + log.Fatalln("Unable to create client", err) + } + defer c.Close() + + w := worker.New(c, "batch", worker.Options{}) + + w.RegisterWorkflow(batch_queue.AccumulateAndBatchWorkflow) + w.RegisterWorkflow(batch_queue.SignalNewValuesWorkflow) + w.RegisterActivity(batch_queue.WriteBatchToFile) + w.RegisterActivity(batch_queue.WriteValToFile) + + err = w.Run(worker.InterruptCh()) + if err != nil { + log.Fatalln("Unable to start worker", err) + } +}