diff --git a/splitmerge-channel/README.md b/splitmerge-channel/README.md new file mode 100644 index 00000000..bddf6ba6 --- /dev/null +++ b/splitmerge-channel/README.md @@ -0,0 +1,18 @@ +This sample workflow demonstrates how to execute multiple activities in parallel and merge their results using futures. +The futures are awaited using Selector. It allows processing them as soon as they become ready. See `split-merge-future` +sample to see how to process them without Selector in the order of activity invocation instead. + +### Steps to run this sample: + +1) Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use). +2) Run the following command to start the worker + +``` +go run splitmerge-selector/worker/main.go +``` + +3) Run the following command to start the example + +``` +go run splitmerge-selector/starter/main.go +``` diff --git a/splitmerge-channel/local b/splitmerge-channel/local new file mode 100644 index 00000000..1067d851 Binary files /dev/null and b/splitmerge-channel/local differ diff --git a/splitmerge-channel/splitmerge_workflow.go b/splitmerge-channel/splitmerge_workflow.go new file mode 100644 index 00000000..5d13ac25 --- /dev/null +++ b/splitmerge-channel/splitmerge_workflow.go @@ -0,0 +1,96 @@ +package splitmerge_channel + +import ( + "context" + "math/rand" + "time" + + "go.temporal.io/sdk/activity" + "go.temporal.io/sdk/workflow" +) + +/** + * This sample workflow demonstrates how to execute multiple activities in parallel and merge their results using futures. + * The futures are awaited using Selector. It allows processing them as soon as they become ready. See `split-merge-future` sample + * to see how to process them without Selector in the order of activity invocation instead. + */ + +// ChunkResult contains the activity result for this sample +type ChunkResult struct { + NumberOfItemsInChunk int + SumInChunk int +} + +type ActivityNResult struct { + ID int +} +type ActivityMResult struct { +} + +// SampleSplitMergeChannelWorkflow workflow definition +func SampleSplitMergeChannelWorkflow(ctx workflow.Context, n int) (result ChunkResult, err error) { + ao := workflow.ActivityOptions{ + StartToCloseTimeout: 10 * time.Second, + } + ctx = workflow.WithActivityOptions(ctx, ao) + selector := workflow.NewSelector(ctx) + channel := workflow.NewChannel(ctx) + wg := workflow.NewWaitGroup(ctx) + + workflow.Go(ctx, func(ctx workflow.Context) { + for { + var r ActivityNResult + channel.Receive(ctx, &r) + var res ActivityMResult + err1 := workflow.ExecuteActivity(ctx, ActivityM, r.ID).Get(ctx, &res) + if err1 != nil { + err = err1 + return + } + wg.Done() + } + }) + + for i := 0; i < n; i++ { + future := workflow.ExecuteActivity(ctx, ActivityN, i+1) + selector.AddFuture(future, func(f workflow.Future) { + var r ActivityNResult + err1 := f.Get(ctx, &r) + if err1 != nil { + err = err1 + return + } + + wg.Add(1) + workflow.Go(ctx, func(ctx workflow.Context) { + channel.Send(ctx, r) + }) + }) + } + + for i := 0; i < n; i++ { + selector.Select(ctx) + + workflow.GetLogger(ctx).Info("the value of n is ", "n", n, "length", channel.Len()) + } + wg.Wait(ctx) + + workflow.GetLogger(ctx).Info("Workflow completed.") + return ChunkResult{1, 1}, nil +} + +func ActivityN(ctx context.Context, ID int) (ActivityNResult, error) { + time.Sleep(time.Second * time.Duration(rand.Int31n(5))) + + activity.GetLogger(ctx).Info("Activity N processed", "chunkID", ID) + return ActivityNResult{ + ID: ID, + }, nil +} + +func ActivityM(ctx context.Context, ActivityNID int) (ActivityMResult, error) { + time.Sleep(time.Second * 3) + + activity.GetLogger(ctx).Info("Activity M processed", "ActivityNID", ActivityNID) + return ActivityMResult{}, nil +} diff --git a/splitmerge-channel/splitmerge_workflow_test.go b/splitmerge-channel/splitmerge_workflow_test.go new file mode 100644 index 00000000..6981fea5 --- /dev/null +++ b/splitmerge-channel/splitmerge_workflow_test.go @@ -0,0 +1,41 @@ +package splitmerge_channel + +import ( + "testing" + + "github.com/stretchr/testify/suite" + "go.temporal.io/sdk/testsuite" +) + +type UnitTestSuite struct { + suite.Suite + testsuite.WorkflowTestSuite +} + +func TestUnitTestSuite(t *testing.T) { + suite.Run(t, new(UnitTestSuite)) +} + +// +//func (s *UnitTestSuite) Test_Workflow() { +// env := s.NewTestWorkflowEnvironment() +// env.RegisterActivity(ChunkProcessingActivity) +// +// workerCount := 5 +// env.ExecuteWorkflow(SampleSplitMergeSelectorWorkflow, workerCount) +// +// s.True(env.IsWorkflowCompleted()) +// s.NoError(env.GetWorkflowError()) +// +// var result ChunkResult +// _ = env.GetWorkflowResult(&result) +// +// totalItem, totalSum := 0, 0 +// for i := 1; i <= workerCount; i++ { +// totalItem += i +// totalSum += i * i +// } +// +// s.Equal(totalItem, result.NumberOfItemsInChunk) +// s.Equal(totalSum, result.SumInChunk) +//} diff --git a/splitmerge-channel/starter/main.go b/splitmerge-channel/starter/main.go new file mode 100644 index 00000000..3b130e75 --- /dev/null +++ b/splitmerge-channel/starter/main.go @@ -0,0 +1,36 @@ +package main + +import ( + "context" + splitmerge_channel "github.com/temporalio/samples-go/splitmerge-channel" + "log" + + "github.com/pborman/uuid" + "go.temporal.io/sdk/client" +) + +func main() { + // The client is a heavyweight object that should be created once per process. + c, err := client.Dial(client.Options{ + HostPort: client.DefaultHostPort, + }) + if err != nil { + log.Fatalln("Unable to create client", err) + } + defer c.Close() + + workflowOptions := client.StartWorkflowOptions{ + ID: "split_merge_selector_" + uuid.New(), + TaskQueue: "split-merge-selector", + } + + we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, splitmerge_channel.SampleSplitMergeChannelWorkflow, 10) + if err != nil { + log.Fatalln("Unable to execute workflow", err) + } + + log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID()) + _ = we.Get(context.Background(), nil) + log.Println(" Workflow Completed", "WorkflowID", we.GetID(), "RunID", we.GetRunID()) + +} diff --git a/splitmerge-channel/worker/main.go b/splitmerge-channel/worker/main.go new file mode 100644 index 00000000..15131096 --- /dev/null +++ b/splitmerge-channel/worker/main.go @@ -0,0 +1,31 @@ +package main + +import ( + splitmerge_channel "github.com/temporalio/samples-go/splitmerge-channel" + "log" + + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" +) + +func main() { + // The client and worker are heavyweight objects that should be created once per process. + c, err := client.Dial(client.Options{ + HostPort: client.DefaultHostPort, + }) + if err != nil { + log.Fatalln("Unable to create client", err) + } + defer c.Close() + + w := worker.New(c, "split-merge-selector", worker.Options{}) + + w.RegisterWorkflow(splitmerge_channel.SampleSplitMergeChannelWorkflow) + w.RegisterActivity(splitmerge_channel.ActivityN) + w.RegisterActivity(splitmerge_channel.ActivityM) + + err = w.Run(worker.InterruptCh()) + if err != nil { + log.Fatalln("Unable to start worker", err) + } +}