From 6e9680711ca4ca84e917ab8eb1abbee95ae5ef79 Mon Sep 17 00:00:00 2001 From: levi Date: Thu, 3 Apr 2025 11:14:06 +0800 Subject: [PATCH] feat: Implement resource competition and queue scheduling mechanisms - Added a flexible resource pool management system using Temporal. - Implemented dynamic adjustment of resource pool size. - Enabled cancellation of resource requests in the waiting queue. - Introduced real-time monitoring capabilities for resource pool status. - Enhanced workflow management with preemptive resource acquisition and queuing mechanisms. - Updated README to reflect new features and usage instructions. --- queue/README.md | 73 ++ queue/consume/main.go | 92 +++ queue/queue_workflow.go | 429 +++++++++++ queue/queue_workflow_test.go | 96 +++ queue/resource/main.go | 48 ++ queue/resource_pool.go | 1303 ++++++++++++++++++++++++++++++++++ queue/starter/main.go | 535 ++++++++++++++ 7 files changed, 2576 insertions(+) create mode 100644 queue/README.md create mode 100644 queue/consume/main.go create mode 100644 queue/queue_workflow.go create mode 100644 queue/queue_workflow_test.go create mode 100644 queue/resource/main.go create mode 100644 queue/resource_pool.go create mode 100644 queue/starter/main.go diff --git a/queue/README.md b/queue/README.md new file mode 100644 index 00000000..a450a1e9 --- /dev/null +++ b/queue/README.md @@ -0,0 +1,73 @@ +# Resource Competition and Queue Scheduling Example + +This example demonstrates how to implement resource competition and queue scheduling mechanisms using Temporal, providing a flexible resource pool management system that supports dynamic adjustment of resource pool size, cancellation of resource requests, and monitoring functions. + +## Overview + +This example implements a resource pool and queue system based on Temporal, featuring the following characteristics: + +1. **Parallel Execution**: Supports the parallel execution of multiple workflow instances, making full use of system resources. +2. **Shared Resource Pool**: Provides a limited number of shared resources for workflows to use. +3. **Preemptive Resource Acquisition**: Workflows acquire resources through a preemptive mechanism to ensure efficient utilization. +4. **Waiting Queue**: When resources are unavailable, workflows enter a waiting queue and continue execution after automatically acquiring resources. +5. **Dynamic Adjustment of Resource Pool**: Supports dynamic adjustment of the resource pool size (expansion or reduction) at runtime. +6. **Cancellation of Resource Requests**: Supports cancellation of resource requests in the waiting queue, terminating workflows that are no longer needed. +7. **Real-time Monitoring**: Provides real-time monitoring capabilities to track the status and changes of the resource pool. + +## Component Description + +- **ResourcePool**: The structure of the resource pool, providing resource acquisition, release, and management functions. +- **ResourcePoolWorkflow**: The workflow that manages resource allocation and queues. +- **SampleWorkflowWithResourcePool**: An example workflow that uses the resource pool, demonstrating resource acquisition and release. +- **UpdateResourcePool**: The functionality to dynamically adjust the size of the resource pool. +- **CancelResourceRequest**: Cancels resource requests in the waiting queue. +- **ResourcePoolInitializer**: An interface for customizing resource pool initialization and scaling behavior. + +## Key Features + +- **Resource Allocation**: The resource pool workflow manages the allocation of limited resources to ensure efficient utilization. +- **Queuing Mechanism**: Workflows that have not acquired resources enter a waiting queue and continue execution after automatically acquiring resources. +- **Dynamic Scaling**: Supports adjusting the size of the resource pool at runtime to meet varying load demands. +- **Request Cancellation**: Supports terminating resource requests in the waiting queue to avoid unnecessary resource occupation. +- **Signal Communication**: Uses Temporal's signal mechanism for communication between workflows. +- **Persistence**: Even if the system crashes, the state of waiting workflows and resources can be restored. +- **Delayed Scaling Down**: Intelligently waits for resources to be released before completing scaling down, without affecting resources currently in use. + +# Usage Example + +### Start Workflow + +You can start the workflow with the following command: + +```bash +go run queue/starter/main.go -test=basic +``` + +### Query Resource Pool Status + +To query the status of the resource pool, you can use the following command: + +```bash +go run queue/starter/main.go -test=pool -poolid=resource-pool:{namespace}:{resourceID} +``` + +### Running Tests + +The project includes unit tests, and you can run the tests using the following command: + +```bash +go test ./... +``` + +## Dependencies + +- [Temporal Go SDK](https://github.com/temporalio/sdk-go) +- [Testify](https://github.com/stretchr/testify) + +## Contribution + +Contributions of any kind are welcome! Please submit issues or pull requests. + +## License + +This project is licensed under the MIT License. For more details, please see the LICENSE file. diff --git a/queue/consume/main.go b/queue/consume/main.go new file mode 100644 index 00000000..2a3173eb --- /dev/null +++ b/queue/consume/main.go @@ -0,0 +1,92 @@ +package main + +import ( + "context" + "log" + + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" + "go.temporal.io/sdk/workflow" + + "github.com/temporalio/samples-go/queue" +) + +// helper workflow to cancel resource request +func cancelResourceRequest(ctx workflow.Context, resourcePoolWorkflowID string, targetWorkflowID string) error { + logger := workflow.GetLogger(ctx) + logger.Info("start cancel resource request workflow", + "resourcePoolWorkflowID", resourcePoolWorkflowID, + "targetWorkflowID", targetWorkflowID) + + // create resource pool workflow execution reference + execution := workflow.Execution{ + ID: resourcePoolWorkflowID, + } + + // send cancel command + err := queue.CancelResourceRequest(ctx, execution, targetWorkflowID) + if err != nil { + logger.Error("failed to cancel resource request", "Error", err) + return err + } + + logger.Info("successfully canceled resource request") + return nil +} + +// helper workflow to update resource pool size +func updateResourcePoolSize(ctx workflow.Context, resourcePoolWorkflowID string, newSize int) error { + logger := workflow.GetLogger(ctx) + logger.Info("start update resource pool size workflow", + "resourcePoolWorkflowID", resourcePoolWorkflowID, + "newSize", newSize) + + // create resource pool workflow execution reference + execution := workflow.Execution{ + ID: resourcePoolWorkflowID, + } + + // send update command + err := queue.UpdateResourcePool(ctx, execution, newSize) + if err != nil { + logger.Error("failed to update resource pool size", "Error", err) + return err + } + + logger.Info("successfully updated resource pool size") + return nil +} + +func main() { + // create temporal client + c, err := client.Dial(client.Options{ + HostPort: "localhost:7233", + }) + if err != nil { + log.Fatalln("can't create client", err) + } + defer c.Close() + // create sample workflow worker - also need to set context + sampleWorker := worker.New(c, "queue-sample", worker.Options{ + BackgroundActivityContext: context.WithValue(context.Background(), queue.ClientContextKey, c), + }) + + // register sample workflow + sampleWorker.RegisterWorkflow(queue.SampleWorkflowWithResourcePool) + // register resource pool management workflow + sampleWorker.RegisterWorkflow(cancelResourceRequest) + sampleWorker.RegisterWorkflow(updateResourcePoolSize) + + // start all workers + workerErr := make(chan error, 1) + + go func() { + workerErr <- sampleWorker.Run(worker.InterruptCh()) + }() + + // wait for any worker to fail + err = <-workerErr + if err != nil { + log.Fatalln("worker run failed", err) + } +} diff --git a/queue/queue_workflow.go b/queue/queue_workflow.go new file mode 100644 index 00000000..dd224cdb --- /dev/null +++ b/queue/queue_workflow.go @@ -0,0 +1,429 @@ +package queue + +import ( + "fmt" + "time" + + "go.temporal.io/sdk/workflow" +) + +// SampleWorkflowWithResourcePool 使用资源池的示例工作流 +func SampleWorkflowWithResourcePool( + ctx workflow.Context, + resourceID string, + processTime time.Duration, + cancelable bool, +) error { + currentWorkflowID := workflow.GetInfo(ctx).WorkflowExecution.ID + logger := workflow.GetLogger(ctx) + logger.Info("workflow start", + "workflowID", currentWorkflowID, + "resourceID", resourceID, + "cancelable", cancelable) + + // 创建资源池 + resourcePool := NewResourcePool(currentWorkflowID, "ResourcePoolDemo") + + // 尝试获取资源,如果指定了可取消,则在等待队列中可以被取消 + releaseFunc, resourceInfo, err := resourcePool.AcquireResource(ctx, resourceID, 30*time.Minute, cancelable) + if err != nil { + logger.Error("failed to acquire resource", "Error", err) + return err + } + + logger.Info("successfully acquired resource, start processing", + "resourceID", resourceInfo.GetID(), + "resourceIndex", resourceInfo.GetIndex(), + "acquiredTime", resourceInfo.GetAcquiredTime()) + + // 模拟处理过程 + logger.Info("start critical operation") + if processTime <= 0 { + processTime = 10 * time.Second // 默认处理时间 + } + _ = workflow.Sleep(ctx, processTime) + logger.Info("critical operation completed") + + // 释放资源 + err = releaseFunc() + if err != nil { + logger.Error("failed to release resource", "Error", err) + return err + } + + logger.Info("resource released, workflow completed") + return nil +} + +// CustomResourcePoolInitializer 自定义资源池初始化器示例 +type CustomResourcePoolInitializer struct { + Prefix string // 资源ID前缀 + ExtraProps map[string]interface{} // 额外属性 +} + +// Initialize 实现自定义资源池初始化 +func (ci *CustomResourcePoolInitializer) Initialize( + ctx workflow.Context, + resourceID string, + totalResources int, +) *ResourcePoolState { + logger := workflow.GetLogger(ctx) + logger.Info("using custom resource pool initializer", + "resourceID", resourceID, + "prefix", ci.Prefix, + "totalResources", totalResources) + + // 创建资源池状态 + state := &ResourcePoolState{ + ResourcePool: make([]InternalResource, totalResources), + AvailableCount: totalResources, + WaitingQueue: []ResourceRequest{}, + ActiveRequests: make(map[string]int), + } + + // 添加自定义属性到元数据 + baseMetadata := map[string]interface{}{ + "initTime": workflow.Now(ctx), + } + + // 合并额外属性 + for k, v := range ci.ExtraProps { + baseMetadata[k] = v + } + + // 初始化资源池 + for i := 0; i < totalResources; i++ { + // 为每个资源复制基础元数据 + metadata := make(map[string]interface{}) + for k, v := range baseMetadata { + metadata[k] = v + } + + // 添加资源特定属性 + metadata["poolIndex"] = i + + // 创建资源 + state.ResourcePool[i] = InternalResource{ + Info: ResourceInfo{ + ResourceID: fmt.Sprintf("%s:%s-%d", ci.Prefix, resourceID, i), + Metadata: metadata, + ResourceIndex: i, + }, + Available: true, + } + } + + return state +} + +// ExpandPool 自定义资源池扩展实现 +func (ci *CustomResourcePoolInitializer) ExpandPool( + ctx workflow.Context, + state *ResourcePoolState, + currentSize int, + newSize int, + resourceID string, +) int { + logger := workflow.GetLogger(ctx) + logger.Info("using custom expander to expand resource pool", + "from", currentSize, + "to", newSize) + + // 创建基础元数据 + baseMetadata := map[string]interface{}{ + "expandTime": workflow.Now(ctx), + } + + // 合并额外属性 + for k, v := range ci.ExtraProps { + baseMetadata[k] = v + } + + // 扩展资源池 + for i := currentSize; i < newSize; i++ { + // 为每个资源复制基础元数据 + metadata := make(map[string]interface{}) + for k, v := range baseMetadata { + metadata[k] = v + } + + // 添加资源特定属性 + metadata["poolIndex"] = i + metadata["expanded"] = true + + // 创建资源 + state.ResourcePool = append(state.ResourcePool, InternalResource{ + Info: ResourceInfo{ + ResourceID: fmt.Sprintf("%s:%s-%d", ci.Prefix, resourceID, i), + Metadata: metadata, + ResourceIndex: i, + }, + Available: true, + }) + } + + // 返回新增的资源数量 + return newSize - currentSize +} + +// ShrinkPool 自定义资源池缩减实现 +func (ci *CustomResourcePoolInitializer) ShrinkPool( + ctx workflow.Context, + state *ResourcePoolState, + currentSize int, + newSize int, +) int { + logger := workflow.GetLogger(ctx) + logger.Info("using custom shrinker to shrink resource pool", + "from", currentSize, + "to", newSize) + + // 优先缩减具有'expanded'标记的资源 + availableReduceCount := 0 + reducedIndices := make([]int, 0, currentSize-newSize) + + // 第一轮:寻找并处理标记为expanded的可用资源 + for i := currentSize - 1; i >= 0 && availableReduceCount < currentSize-newSize; i-- { + if state.ResourcePool[i].Available { + metadata := state.ResourcePool[i].Info.Metadata + if expanded, ok := metadata["expanded"].(bool); ok && expanded { + // 标记为不可用 + state.ResourcePool[i].Available = false + availableReduceCount++ + reducedIndices = append(reducedIndices, i) + } + } + } + + // 第二轮:如果还需要缩减更多资源,处理剩余的可用资源 + if availableReduceCount < currentSize-newSize { + for i := currentSize - 1; i >= 0 && availableReduceCount < currentSize-newSize; i-- { + // 检查此索引是否已经被处理过 + alreadyProcessed := false + for _, idx := range reducedIndices { + if idx == i { + alreadyProcessed = true + break + } + } + + if !alreadyProcessed && state.ResourcePool[i].Available { + // 标记为不可用 + state.ResourcePool[i].Available = false + availableReduceCount++ + } + } + } + + // 裁剪资源池大小 + if len(state.ActiveRequests) == 0 { + // 如果没有活动请求,可以直接缩减资源池 + state.ResourcePool = state.ResourcePool[:newSize] + logger.Info("resource pool physical size reduced", "new size", len(state.ResourcePool)) + } else { + logger.Info("resource pool has active requests, will reduce physical size after requests complete") + } + + // 返回减少的可用资源数 + return availableReduceCount +} + +// SampleWorkflowWithCustomResourcePool 使用自定义资源池的示例工作流 +func SampleWorkflowWithCustomResourcePool( + ctx workflow.Context, + resourceID string, + processTime time.Duration, + cancelable bool, +) error { + currentWorkflowID := workflow.GetInfo(ctx).WorkflowExecution.ID + logger := workflow.GetLogger(ctx) + logger.Info("custom resource pool workflow start", + "workflowID", currentWorkflowID, + "resourceID", resourceID, + "cancelable", cancelable) + + // 创建自定义初始化器 + customInitializer := &CustomResourcePoolInitializer{ + Prefix: "custom", + ExtraProps: map[string]interface{}{ + "creator": currentWorkflowID, + "purpose": "demonstration", + }, + } + + // 创建带自定义初始化器的资源池 + resourcePool := NewResourcePoolWithInitializer( + currentWorkflowID, + "CustomResourcePoolDemo", + customInitializer, + ) + + // 尝试获取资源 + releaseFunc, resourceInfo, err := resourcePool.AcquireResource(ctx, resourceID, 30*time.Minute, cancelable) + if err != nil { + logger.Error("failed to acquire resource", "Error", err) + return err + } + + logger.Info("successfully acquired custom resource", + "resourceID", resourceInfo.GetID(), + "resourceIndex", resourceInfo.GetIndex(), + "metadata", resourceInfo.GetMetadata(), + "acquiredTime", resourceInfo.GetAcquiredTime()) + + // 模拟处理过程 + logger.Info("start critical operation") + if processTime <= 0 { + processTime = 10 * time.Second // 默认处理时间 + } + _ = workflow.Sleep(ctx, processTime) + logger.Info("critical operation completed") + + // 释放资源 + err = releaseFunc() + if err != nil { + logger.Error("failed to release resource", "Error", err) + return err + } + + logger.Info("resource released, custom resource pool workflow completed") + return nil +} + +// SampleWorkflowWithResourcePoolResizing 演示资源池动态扩展缩减的示例工作流 +func SampleWorkflowWithResourcePoolResizing( + ctx workflow.Context, + resourceID string, + initialSize int, + expandSize int, + shrinkSize int, +) error { + currentWorkflowID := workflow.GetInfo(ctx).WorkflowExecution.ID + logger := workflow.GetLogger(ctx) + logger.Info("resource pool resizing demonstration workflow start", + "workflowID", currentWorkflowID, + "resourceID", resourceID, + "initialSize", initialSize, + "expandSize", expandSize, + "shrinkSize", shrinkSize) + + if initialSize <= 0 { + initialSize = 2 + } + if expandSize <= initialSize { + expandSize = initialSize + 2 + } + if shrinkSize >= initialSize || shrinkSize <= 0 { + shrinkSize = initialSize - 1 + } + + // 创建自定义初始化器 + customInitializer := &CustomResourcePoolInitializer{ + Prefix: "dynamic", + ExtraProps: map[string]interface{}{ + "creator": currentWorkflowID, + "purpose": "resize-demonstration", + }, + } + + // 创建带自定义初始化器的资源池 + resourcePool := NewResourcePoolWithInitializer( + currentWorkflowID, + "DynamicResourcePoolDemo", + customInitializer, + ) + + // 获取初始的两个资源用于演示 + logger.Info("start acquiring initial resources") + var resources []Resource + var releaseFuncs []ReleaseResourceFunc + + // 获取多个资源 + for i := 0; i < initialSize; i++ { + releaseFunc, resource, err := resourcePool.AcquireResource(ctx, resourceID, 30*time.Minute, false) + if err != nil { + logger.Error("failed to acquire resource", "Error", err) + return err + } + + resources = append(resources, resource) + releaseFuncs = append(releaseFuncs, releaseFunc) + + logger.Info("acquired initial resource", + "index", i, + "resourceID", resource.GetID(), + "metadata", resource.GetMetadata()) + } + + // 模拟处理一些时间 + _ = workflow.Sleep(ctx, 2*time.Second) + + // 扩展资源池 + logger.Info("start expanding resource pool", "new size", expandSize) + err := resourcePool.ResizePool(ctx, expandSize) + if err != nil { + logger.Error("failed to expand resource pool", "Error", err) + return err + } + + // 等待扩展生效 + _ = workflow.Sleep(ctx, 2*time.Second) + + // 获取新增的资源 + logger.Info("start acquiring expanded resources") + for i := 0; i < expandSize-initialSize; i++ { + releaseFunc, resource, err := resourcePool.AcquireResource(ctx, resourceID, 30*time.Minute, false) + if err != nil { + logger.Error("failed to acquire expanded resource", "Error", err) + return err + } + + resources = append(resources, resource) + releaseFuncs = append(releaseFuncs, releaseFunc) + + logger.Info("acquired expanded resource", + "index", initialSize+i, + "resourceID", resource.GetID(), + "metadata", resource.GetMetadata()) + } + + // 释放一些资源,为缩减做准备 + for i := 0; i < expandSize-shrinkSize; i++ { + logger.Info("release resource to prepare for shrink", + "index", i, + "resourceID", resources[i].GetID()) + + err := releaseFuncs[i]() + if err != nil { + logger.Error("failed to release resource", "Error", err) + } + } + + // 等待释放生效 + _ = workflow.Sleep(ctx, 2*time.Second) + + // 缩减资源池 + logger.Info("start shrinking resource pool", "new size", shrinkSize) + err = resourcePool.ResizePool(ctx, shrinkSize) + if err != nil { + logger.Error("failed to shrink resource pool", "Error", err) + } + + // 等待缩减生效 + _ = workflow.Sleep(ctx, 2*time.Second) + + // 释放剩余资源 + for i := expandSize - shrinkSize; i < len(releaseFuncs); i++ { + logger.Info("release remaining resources", + "index", i, + "resourceID", resources[i].GetID()) + + err := releaseFuncs[i]() + if err != nil { + logger.Error("failed to release resource", "Error", err) + } + } + + logger.Info("resource pool resizing demonstration workflow completed") + return nil +} diff --git a/queue/queue_workflow_test.go b/queue/queue_workflow_test.go new file mode 100644 index 00000000..b299fc59 --- /dev/null +++ b/queue/queue_workflow_test.go @@ -0,0 +1,96 @@ +package queue + +import ( + "testing" + "time" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + "go.temporal.io/sdk/testsuite" + "go.temporal.io/sdk/workflow" +) + +type UnitTestSuite struct { + suite.Suite + testsuite.WorkflowTestSuite +} + +// 测试资源获取和释放流程 +func (s *UnitTestSuite) Test_ResourceAcquisitionAndRelease() { + // 设置测试环境 + env := s.NewTestWorkflowEnvironment() + + // 模拟SignalWithStartResourcePoolWorkflowActivity的行为 + execution := &workflow.Execution{ID: "mockResourcePool", RunID: "mockRunID"} + env.OnActivity(SignalWithStartResourcePoolWorkflowActivity, + mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(execution, nil) + + // 模拟发送资源获取信号 + env.RegisterDelayedCallback(func() { + env.SignalWorkflow(ResourceAcquiredSignalName, "mockResourceChannelName") + }, time.Millisecond*100) + + // 模拟发送到外部工作流的信号 + env.OnSignalExternalWorkflow(mock.Anything, mock.Anything, mock.Anything, + mock.Anything, mock.Anything).Return(nil) + + // 执行示例工作流 + env.ExecuteWorkflow(SampleWorkflowWithResourcePool, "test-resource-1", 2*time.Second) + + // 验证工作流执行完成且无错误 + s.True(env.IsWorkflowCompleted()) + s.NoError(env.GetWorkflowError()) + + // 验证活动和信号 + env.AssertExpectations(s.T()) +} + +// 测试资源池中多个工作流争抢资源 +func (s *UnitTestSuite) Test_ResourcePoolWorkflow() { + // 设置测试环境 + env := s.NewTestWorkflowEnvironment() + + // 执行资源池工作流 + env.ExecuteWorkflow(ResourcePoolWorkflow, "TestNamespace", "test-resource-2", 1, 10*time.Second) + + // 模拟第一个工作流请求资源 + request1 := ResourceRequest{ + WorkflowID: "workflow-1", + Priority: 1, + } + env.SignalWorkflow(RequestResourceSignalName, request1) + + // 等待处理第一个请求 + env.ExecuteWorkflow(func(ctx workflow.Context) error { + return workflow.Sleep(ctx, 100*time.Millisecond) + }) + + // 模拟第二个工作流请求资源(此时资源已被占用) + request2 := ResourceRequest{ + WorkflowID: "workflow-2", + Priority: 2, + } + env.SignalWorkflow(RequestResourceSignalName, request2) + + // 等待处理第二个请求 + env.ExecuteWorkflow(func(ctx workflow.Context) error { + return workflow.Sleep(ctx, 100*time.Millisecond) + }) + + // 模拟第一个工作流释放资源 + env.SignalWorkflow("resource-channel-workflow-1", "release") + + // 等待处理资源释放和分配给第二个工作流 + env.ExecuteWorkflow(func(ctx workflow.Context) error { + return workflow.Sleep(ctx, 100*time.Millisecond) + }) + + // 验证工作流仍在运行(资源池工作流永远不会自行完成) + s.False(env.IsWorkflowCompleted()) +} + +// 运行所有测试 +func TestUnitTestSuite(t *testing.T) { + suite.Run(t, new(UnitTestSuite)) +} diff --git a/queue/resource/main.go b/queue/resource/main.go new file mode 100644 index 00000000..fbb5bcc6 --- /dev/null +++ b/queue/resource/main.go @@ -0,0 +1,48 @@ +package main + +import ( + "context" + "log" + + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" + + "github.com/temporalio/samples-go/queue" +) + +func main() { + // create temporal client + c, err := client.Dial(client.Options{ + HostPort: "localhost:7233", + }) + if err != nil { + log.Fatalln("can't create client", err) + } + defer c.Close() + + // create resource pool task queue worker + resourcePoolWorker := worker.New(c, "resource-pool", worker.Options{ + BackgroundActivityContext: context.WithValue(context.Background(), queue.ClientContextKey, c), + }) + + // register resource pool related activities and workflows + resourcePoolWorker.RegisterActivity(queue.SignalWithStartResourcePoolWorkflowActivity) + resourcePoolWorker.RegisterWorkflow(queue.ResourcePoolWorkflow) + resourcePoolWorker.RegisterWorkflow(queue.ResourcePoolWorkflowWithInitializer) + + resourcePoolWorker.RegisterActivity(queue.QueryResourcePoolStatusActivity) + resourcePoolWorker.RegisterActivity(queue.QueryResourceAllocationActivity) + + // start all workers + workerErr := make(chan error, 1) + + go func() { + workerErr <- resourcePoolWorker.Run(worker.InterruptCh()) + }() + + // wait for any worker to fail + err = <-workerErr + if err != nil { + log.Fatalln("worker run failed", err) + } +} diff --git a/queue/resource_pool.go b/queue/resource_pool.go new file mode 100644 index 00000000..aa242145 --- /dev/null +++ b/queue/resource_pool.go @@ -0,0 +1,1303 @@ +package queue + +import ( + "context" + "fmt" + "time" + + "go.temporal.io/sdk/activity" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/temporal" + "go.temporal.io/sdk/workflow" +) + +// 资源池相关的信号常量 +const ( + // ResourceAcquiredSignalName 资源获取信号通道名称 + ResourceAcquiredSignalName = "resource-acquired-signal" + // RequestResourceSignalName 请求资源信号通道名称 + RequestResourceSignalName = "request-resource-signal" + // ResourceReleasedSignalName 资源释放信号通道名称 + ResourceReleasedSignalName = "resource-released-signal" + // UpdateResourcePoolSignalName 更新资源池大小信号 + UpdateResourcePoolSignalName = "update-resource-pool-signal" + // CancelRequestSignalName 取消资源请求信号 + CancelRequestSignalName = "cancel-request-signal" + // RequestCancelledSignalName 请求已取消信号 + RequestCancelledSignalName = "request-cancelled-signal" + + // 资源池查询相关常量 + // GetResourcePoolStatusQuery 获取资源池状态查询名称 + GetResourcePoolStatusQuery = "get-resource-pool-status-query" + // GetResourceAllocationQuery 获取资源分配情况查询名称 + GetResourceAllocationQuery = "get-resource-allocation-query" + + // ClientContextKey 客户端上下文键 + ClientContextKey ContextKey = "Client" + + // InitializerContextKey 初始化器上下文键 + InitializerContextKey ContextKey = "ResourcePoolInitializer" +) + +// 资源定义 +type Resource interface { + // GetID 获取资源ID + GetID() string + // GetMetadata 获取资源元数据 + GetMetadata() map[string]interface{} + // GetAcquiredTime 获取资源获取时间 + GetAcquiredTime() time.Time + // GetIndex 获取资源索引 + GetIndex() int +} + +// ResourcePool 资源池接口 +type ResourcePoolInterface interface { + // AcquireResource 获取资源 + AcquireResource(ctx workflow.Context, resourceID string, releaseTimeout time.Duration, cancelable bool) (ReleaseResourceFunc, Resource, error) + // GetNamespace 获取资源池命名空间 + GetNamespace() string + // GetRequesterID 获取请求方ID + GetRequesterID() string + // QueryResourcePoolStatus 查询资源池状态 + QueryResourcePoolStatus(ctx workflow.Context) (*ResourcePoolStatus, error) + // QueryResourceAllocation 查询资源分配详情 + QueryResourceAllocation(ctx workflow.Context) (*ResourceAllocation, error) +} + +// ResourcePoolInitializer 资源池初始化器接口 +type ResourcePoolInitializer interface { + // Initialize 初始化资源池 + Initialize(ctx workflow.Context, resourceID string, totalResources int) *ResourcePoolState + + // ExpandPool 扩展资源池 + ExpandPool(ctx workflow.Context, state *ResourcePoolState, currentSize int, newSize int, resourceID string) int + + // ShrinkPool 缩减资源池 + ShrinkPool(ctx workflow.Context, state *ResourcePoolState, currentSize int, newSize int) int +} + +// 类型定义 +type ( + // ContextKey 上下文键类型 + ContextKey string + + // ReleaseResourceFunc 释放资源的函数类型 + ReleaseResourceFunc func() error + + // InternalResource 内部资源结构 + InternalResource struct { + Info ResourceInfo // 资源信息 + Available bool // 是否可用 + } + + // ResourceRequest 资源请求结构 + ResourceRequest struct { + WorkflowID string // 请求工作流ID + Priority int // 优先级,数值越低优先级越高 + } + + // ResourceResponse 资源请求响应结构 + ResourceResponse struct { + ResourceChannelName string // 资源通道名称 + ResourceInfo ResourceInfo // 资源详细信息 + } + + // ResourcePoolState 资源池状态 + ResourcePoolState struct { + ResourcePool []InternalResource // 资源池 + AvailableCount int // 可用资源数 + WaitingQueue []ResourceRequest // 等待队列 + ActiveRequests map[string]int // 工作流ID -> 资源索引 + } + + // UpdateResourcePoolRequest 更新资源池请求 + UpdateResourcePoolRequest struct { + NewSize int // 新的资源池大小 + } + + // CancelRequestCommand 取消资源请求命令 + CancelRequestCommand struct { + WorkflowID string // 要取消的工作流ID + } + + // ResourcePoolStatus 资源池状态查询结果 + ResourcePoolStatus struct { + ResourceID string // 资源ID + TotalResources int // 总资源数 + AvailableCount int // 可用资源数 + WaitingCount int // 等待队列长度 + AllocatedCount int // 已分配资源数 + } + + // ResourceAllocation 资源分配详情 + ResourceAllocation struct { + Resources []ResourceDetail // 资源详情列表 + WaitingQueue []WaitingRequestInfo // 等待队列信息 + AllocatedCount int // 已分配资源数 + } + + // ResourceDetail 资源详情 + ResourceDetail struct { + ResourceID string // 资源ID + ResourceIndex int // 资源索引 + Available bool // 是否可用 + Metadata map[string]interface{} // 资源元数据 + AcquiredTime time.Time // 资源获取时间,仅当资源被占用时有效 + AssignedTo string // 分配给的工作流ID,仅当资源被占用时有效 + } + + // WaitingRequestInfo 等待队列中的请求信息 + WaitingRequestInfo struct { + WorkflowID string // 工作流ID + Priority int // 优先级 + QueuePosition int // 在队列中的位置 + } +) + +// ResourceInfo 资源信息结构,实现Resource接口 +type ResourceInfo struct { + ResourceID string // 资源ID + Metadata map[string]interface{} // 资源元数据 + AcquiredTime time.Time // 资源获取时间 + ResourceIndex int // 资源在池中的索引 +} + +// Resource接口实现 +func (ri *ResourceInfo) GetID() string { + return ri.ResourceID +} + +func (ri *ResourceInfo) GetMetadata() map[string]interface{} { + return ri.Metadata +} + +func (ri *ResourceInfo) GetAcquiredTime() time.Time { + return ri.AcquiredTime +} + +func (ri *ResourceInfo) GetIndex() int { + return ri.ResourceIndex +} + +// IsDefaultInitializer 检查是否为默认初始化器 +func IsDefaultInitializer(initializer ResourcePoolInitializer) bool { + _, ok := initializer.(*DefaultResourcePoolInitializer) + return ok +} + +// ResourcePoolClient 资源池客户端结构 +type ResourcePoolClient struct { + RequesterID string // 请求方工作流ID + PoolNamespace string // 资源池命名空间 + PoolExecution *workflow.Execution + Initializer ResourcePoolInitializer // 资源池初始化器 +} + +// NewResourcePool 初始化资源池客户端 +func NewResourcePool(requesterID string, poolNamespace string) *ResourcePoolClient { + return &ResourcePoolClient{ + RequesterID: requesterID, + PoolNamespace: poolNamespace, + Initializer: &DefaultResourcePoolInitializer{}, + } +} + +// NewResourcePoolWithInitializer 使用自定义初始化器创建资源池客户端 +func NewResourcePoolWithInitializer( + requesterID string, + poolNamespace string, + initializer ResourcePoolInitializer, +) *ResourcePoolClient { + return &ResourcePoolClient{ + RequesterID: requesterID, + PoolNamespace: poolNamespace, + Initializer: initializer, + } +} + +// GetNamespace 获取资源池命名空间 +func (rp *ResourcePoolClient) GetNamespace() string { + return rp.PoolNamespace +} + +// GetRequesterID 获取请求方ID +func (rp *ResourcePoolClient) GetRequesterID() string { + return rp.RequesterID +} + +// AcquireResource 获取资源,支持资源请求取消 +func (rp *ResourcePoolClient) AcquireResource(ctx workflow.Context, + resourceID string, releaseTimeout time.Duration, cancelable bool, +) (ReleaseResourceFunc, Resource, error) { + // 设置本地活动选项 + activityCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{ + ScheduleToCloseTimeout: time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: time.Second, + BackoffCoefficient: 2.0, + MaximumInterval: time.Minute, + MaximumAttempts: 5, + }, + }) + + // 获取当前workflow的信息用于日志 + logger := workflow.GetLogger(ctx) + logger.Info("尝试获取资源", "workflowID", rp.RequesterID, "resourceID", resourceID) + + var resourceResponse ResourceResponse + var execution workflow.Execution + + // 如果使用自定义初始化器,则使用带初始化器的活动 + if rp.Initializer != nil && !IsDefaultInitializer(rp.Initializer) { + // 发送信号并启动自定义资源池workflow + err := workflow.ExecuteLocalActivity(activityCtx, + SignalWithStartCustomResourcePoolWorkflowActivity, rp.PoolNamespace, + resourceID, rp.RequesterID, 0, releaseTimeout, + ResourcePoolWorkflowWithInitializer, + rp.PoolNamespace, resourceID, 1, releaseTimeout, rp.Initializer).Get(ctx, &execution) + if err != nil { + return nil, nil, err + } + } else { + // 使用默认初始化器 + err := workflow.ExecuteLocalActivity(activityCtx, + SignalWithStartResourcePoolWorkflowActivity, rp.PoolNamespace, + resourceID, rp.RequesterID, 0, releaseTimeout).Get(ctx, &execution) + if err != nil { + return nil, nil, err + } + } + + // 保存执行信息 + rp.PoolExecution = &execution + + // 如果请求可取消,提供等待与取消机制 + if cancelable { + // 等待资源获取或取消信号 + resourceCh := workflow.GetSignalChannel(ctx, ResourceAcquiredSignalName) + cancelCh := workflow.GetSignalChannel(ctx, RequestCancelledSignalName) + + // 使用选择器等待任一信号 + selector := workflow.NewSelector(ctx) + received := false + + // 处理资源获取信号 + selector.AddReceive(resourceCh, func(c workflow.ReceiveChannel, more bool) { + c.Receive(ctx, &resourceResponse) + received = true + }) + + // 处理取消信号 + selector.AddReceive(cancelCh, func(c workflow.ReceiveChannel, more bool) { + c.Receive(ctx, nil) + logger.Info("资源请求被取消", "workflowID", rp.RequesterID) + received = false // 标记为未收到资源 + }) + + // 等待任一信号 + selector.Select(ctx) + + // 如果未收到资源获取信号,表示取消了 + if !received { + return nil, nil, fmt.Errorf("资源请求已被取消") + } + } else { + // 普通不可取消的获取,直接等待获取信号 + workflow.GetSignalChannel(ctx, ResourceAcquiredSignalName). + Receive(ctx, &resourceResponse) + } + + logger.Info("成功获取资源", "workflowID", rp.RequesterID, "resourceID", resourceID, + "resourceInfo", resourceResponse.ResourceInfo) + + // 创建资源释放函数 + releaseFunc := func() error { + logger.Info("释放资源", "workflowID", rp.RequesterID, "resourceID", resourceID) + return workflow.SignalExternalWorkflow(ctx, execution.ID, execution.RunID, + resourceResponse.ResourceChannelName, "releaseResource").Get(ctx, nil) + } + return releaseFunc, &resourceResponse.ResourceInfo, nil +} + +// ResizePool 调整资源池大小 +func (rp *ResourcePoolClient) ResizePool(ctx workflow.Context, newSize int) error { + if rp.PoolExecution == nil { + return fmt.Errorf("资源池未初始化") + } + + return UpdateResourcePool(ctx, *rp.PoolExecution, newSize) +} + +// CancelRequest 取消资源请求 +func (rp *ResourcePoolClient) CancelRequest(ctx workflow.Context, workflowID string) error { + if rp.PoolExecution == nil { + return fmt.Errorf("资源池未初始化") + } + + return CancelResourceRequest(ctx, *rp.PoolExecution, workflowID) +} + +// ResourcePoolWorkflowImpl 资源池工作流结构体 +type ResourcePoolWorkflowImpl struct { + Namespace string // 命名空间 + ResourceID string // 资源ID + ReleaseTimeout time.Duration // 资源释放超时时间 + Initializer ResourcePoolInitializer // 资源池初始化器 + State *ResourcePoolState // 资源池状态 +} + +// NewResourcePoolWorkflow 创建新的资源池工作流 +func NewResourcePoolWorkflow( + ctx workflow.Context, + namespace string, + resourceID string, + totalResources int, + releaseTimeout time.Duration, + initializer ResourcePoolInitializer, +) *ResourcePoolWorkflowImpl { + if initializer == nil { + initializer = &DefaultResourcePoolInitializer{} + } + + rpw := &ResourcePoolWorkflowImpl{ + Namespace: namespace, + ResourceID: resourceID, + ReleaseTimeout: releaseTimeout, + Initializer: initializer, + } + + // 将初始化器存入上下文中 + ctx = workflow.WithValue(ctx, InitializerContextKey, initializer) + + // 使用初始化器创建资源池状态 + if totalResources <= 0 { + totalResources = 1 // 默认至少有一个资源 + } + + rpw.State = initializer.Initialize(ctx, resourceID, totalResources) + return rpw +} + +// Run 运行资源池工作流 +func (rpw *ResourcePoolWorkflowImpl) Run(ctx workflow.Context) error { + logger := workflow.GetLogger(ctx) + workflowInfo := workflow.GetInfo(ctx) + logger.Info("资源池工作流启动", + "workflowID", workflowInfo.WorkflowExecution.ID, + "resourceID", rpw.ResourceID) + + // 设置信号处理通道 + requestResourceCh := workflow.GetSignalChannel(ctx, RequestResourceSignalName) + updateResourcePoolCh := workflow.GetSignalChannel(ctx, UpdateResourcePoolSignalName) + cancelRequestCh := workflow.GetSignalChannel(ctx, CancelRequestSignalName) + + // 注册查询处理器 + if err := workflow.SetQueryHandler(ctx, GetResourcePoolStatusQuery, rpw.handleResourcePoolStatusQuery); err != nil { + logger.Error("注册资源池状态查询处理器失败", "Error", err) + return err + } + + if err := workflow.SetQueryHandler(ctx, GetResourceAllocationQuery, rpw.handleResourceAllocationQuery); err != nil { + logger.Error("注册资源分配详情查询处理器失败", "Error", err) + return err + } + + // 无限循环处理资源请求 + for { + // 使用选择器处理不同的事件 + selector := workflow.NewSelector(ctx) + + // 处理资源请求信号 + selector.AddReceive(requestResourceCh, func(c workflow.ReceiveChannel, _ bool) { + var request ResourceRequest + c.Receive(ctx, &request) + rpw.HandleResourceRequest(ctx, request) + }) + + // 处理资源池大小更新信号 + selector.AddReceive(updateResourcePoolCh, func(c workflow.ReceiveChannel, _ bool) { + var updateRequest UpdateResourcePoolRequest + c.Receive(ctx, &updateRequest) + rpw.HandlePoolSizeUpdate(ctx, updateRequest) + }) + + // 处理取消资源请求信号 + selector.AddReceive(cancelRequestCh, func(c workflow.ReceiveChannel, _ bool) { + var cancelCmd CancelRequestCommand + c.Receive(ctx, &cancelCmd) + rpw.HandleCancelRequest(ctx, cancelCmd) + }) + + // 执行选择器 + selector.Select(ctx) + } +} + +// HandleResourceRequest 处理资源请求 +func (rpw *ResourcePoolWorkflowImpl) HandleResourceRequest( + ctx workflow.Context, + request ResourceRequest, +) { + logger := workflow.GetLogger(ctx) + logger.Info("收到资源请求", + "requesterID", request.WorkflowID, + "availableCount", rpw.State.AvailableCount) + + if rpw.State.AvailableCount > 0 { + // 有可用资源,查找可用资源索引 + resourceIndex := rpw.FindAvailableResource() + if resourceIndex == -1 { + logger.Error("资源计数与实际可用资源不一致", + "availableCount", rpw.State.AvailableCount) + return + } + + // 分配资源 + rpw.AllocateResource(ctx, resourceIndex, request) + } else { + // 没有可用资源,加入等待队列 + logger.Info("资源不可用,添加到等待队列", "requesterID", request.WorkflowID) + rpw.State.WaitingQueue = append(rpw.State.WaitingQueue, request) + } +} + +// FindAvailableResource 查找可用资源 +func (rpw *ResourcePoolWorkflowImpl) FindAvailableResource() int { + for i, res := range rpw.State.ResourcePool { + if res.Available { + return i + } + } + return -1 +} + +// AllocateResource 分配资源 +func (rpw *ResourcePoolWorkflowImpl) AllocateResource( + ctx workflow.Context, + resourceIndex int, + request ResourceRequest, +) { + // 标记资源为已占用 + rpw.State.ResourcePool[resourceIndex].Available = false + rpw.State.ResourcePool[resourceIndex].Info.AcquiredTime = workflow.Now(ctx) + rpw.State.AvailableCount-- + rpw.State.ActiveRequests[request.WorkflowID] = resourceIndex + + // 生成唯一的资源通道名称 - 不再使用SideEffect + resourceChannelName := generateResourceChannelName(request.WorkflowID) + + // 准备资源响应 + response := ResourceResponse{ + ResourceChannelName: resourceChannelName, + ResourceInfo: rpw.State.ResourcePool[resourceIndex].Info, + } + + // 通知请求方已获取资源 + err := workflow.SignalExternalWorkflow(ctx, request.WorkflowID, "", + ResourceAcquiredSignalName, response).Get(ctx, nil) + if err != nil { + logger := workflow.GetLogger(ctx) + logger.Info("通知请求方失败,资源将被重新释放", "Error", err) + rpw.State.ResourcePool[resourceIndex].Available = true + rpw.State.AvailableCount++ + delete(rpw.State.ActiveRequests, request.WorkflowID) + return + } + + // 创建资源释放通道 + releaseChannel := workflow.GetSignalChannel(ctx, resourceChannelName) + + // 在新协程中处理资源释放和超时 + workflow.Go(ctx, func(childCtx workflow.Context) { + childSelector := workflow.NewSelector(childCtx) + + // 添加超时处理 + if rpw.ReleaseTimeout > 0 { + childSelector.AddFuture(workflow.NewTimer(childCtx, rpw.ReleaseTimeout), func(f workflow.Future) { + rpw.HandleResourceRelease(childCtx, resourceIndex, request.WorkflowID, "timeout") + }) + } + + // 添加资源释放信号处理 + childSelector.AddReceive(releaseChannel, func(c workflow.ReceiveChannel, more bool) { + var ack string + c.Receive(childCtx, &ack) + rpw.HandleResourceRelease(childCtx, resourceIndex, request.WorkflowID, "release") + }) + + // 等待任一事件发生 + childSelector.Select(childCtx) + }) +} + +// HandleResourceRelease 处理资源释放 +func (rpw *ResourcePoolWorkflowImpl) HandleResourceRelease( + ctx workflow.Context, + resourceIndex int, + requesterID string, + reason string, +) { + logger := workflow.GetLogger(ctx) + if reason == "timeout" { + logger.Info("资源使用超时,自动释放", "requesterID", requesterID) + } else { + logger.Info("资源已被释放", "requesterID", requesterID) + } + + // 释放资源 + rpw.State.ResourcePool[resourceIndex].Available = true + rpw.State.AvailableCount++ + delete(rpw.State.ActiveRequests, requesterID) + + // 处理等待队列中的下一个请求 + if len(rpw.State.WaitingQueue) > 0 { + rpw.ProcessNextRequest(ctx) + } +} + +// HandlePoolSizeUpdate 处理资源池大小更新 +func (rpw *ResourcePoolWorkflowImpl) HandlePoolSizeUpdate( + ctx workflow.Context, + updateRequest UpdateResourcePoolRequest, +) { + logger := workflow.GetLogger(ctx) + logger.Info("收到资源池大小更新请求", + "当前大小", len(rpw.State.ResourcePool), + "新大小", updateRequest.NewSize, + "可用资源", rpw.State.AvailableCount) + + if updateRequest.NewSize <= 0 { + logger.Info("资源池大小必须大于0,忽略本次更新") + return + } + + currentSize := len(rpw.State.ResourcePool) + // 处理扩容 + if updateRequest.NewSize > currentSize { + rpw.ExpandResourcePool(ctx, updateRequest.NewSize) + } else if updateRequest.NewSize < currentSize { + rpw.ShrinkResourcePool(ctx, updateRequest.NewSize) + } +} + +// ExpandResourcePool 扩展资源池 +func (rpw *ResourcePoolWorkflowImpl) ExpandResourcePool( + ctx workflow.Context, + newSize int, +) { + logger := workflow.GetLogger(ctx) + currentSize := len(rpw.State.ResourcePool) + + // 使用初始化器扩展资源池 + addedCount := rpw.Initializer.ExpandPool(ctx, rpw.State, currentSize, newSize, rpw.ResourceID) + + // 更新可用资源计数 + rpw.State.AvailableCount += addedCount + + logger.Info("资源池扩大", + "新增资源数", addedCount, + "当前可用资源", rpw.State.AvailableCount) + + // 处理等待队列中的请求 + rpw.ProcessWaitingRequests(ctx) +} + +// ShrinkResourcePool 缩减资源池 +func (rpw *ResourcePoolWorkflowImpl) ShrinkResourcePool( + ctx workflow.Context, + newSize int, +) { + logger := workflow.GetLogger(ctx) + currentSize := len(rpw.State.ResourcePool) + + // 使用初始化器缩减资源池 + reducedCount := rpw.Initializer.ShrinkPool(ctx, rpw.State, currentSize, newSize) + + // 更新可用计数 + if rpw.State.AvailableCount >= reducedCount { + rpw.State.AvailableCount -= reducedCount + logger.Info("资源池缩小", + "减少资源数", reducedCount, + "当前可用资源", rpw.State.AvailableCount) + } else { + logger.Info("资源计数不一致,将进行修正", + "oldCount", rpw.State.AvailableCount, + "newCount", 0) + rpw.State.AvailableCount = 0 + } +} + +// HandleCancelRequest 处理取消资源请求 +func (rpw *ResourcePoolWorkflowImpl) HandleCancelRequest( + ctx workflow.Context, + cancelCmd CancelRequestCommand, +) { + logger := workflow.GetLogger(ctx) + logger.Info("收到取消资源请求命令", "workflowID", cancelCmd.WorkflowID) + + // 检查是否在等待队列中 + for i, req := range rpw.State.WaitingQueue { + if req.WorkflowID == cancelCmd.WorkflowID { + // 从等待队列中移除 + rpw.State.WaitingQueue = append(rpw.State.WaitingQueue[:i], rpw.State.WaitingQueue[i+1:]...) + logger.Info("已从等待队列中移除工作流", "workflowID", cancelCmd.WorkflowID) + + // 通知工作流资源请求已取消 + err := workflow.SignalExternalWorkflow(ctx, cancelCmd.WorkflowID, "", + RequestCancelledSignalName, nil).Get(ctx, nil) + if err != nil { + logger.Info("通知工作流取消失败", "Error", err) + } + + return + } + } + + logger.Info("工作流不在等待队列中", "workflowID", cancelCmd.WorkflowID) +} + +// ProcessNextRequest 处理等待队列中的下一个请求 +func (rpw *ResourcePoolWorkflowImpl) ProcessNextRequest( + ctx workflow.Context, +) { + if len(rpw.State.WaitingQueue) == 0 || rpw.State.AvailableCount <= 0 { + return + } + + logger := workflow.GetLogger(ctx) + + // 查找可用资源 + resourceIndex := rpw.FindAvailableResource() + if resourceIndex == -1 { + logger.Error("资源计数与实际可用资源不一致", + "availableCount", rpw.State.AvailableCount) + return + } + + // 取出下一个请求 + nextRequest := rpw.State.WaitingQueue[0] + rpw.State.WaitingQueue = rpw.State.WaitingQueue[1:] + + // 分配资源 + rpw.AllocateResource(ctx, resourceIndex, nextRequest) +} + +// ProcessWaitingRequests 处理所有等待中的请求 +func (rpw *ResourcePoolWorkflowImpl) ProcessWaitingRequests( + ctx workflow.Context, +) { + // 循环处理等待队列中的请求,直到没有可用资源或队列为空 + for rpw.State.AvailableCount > 0 && len(rpw.State.WaitingQueue) > 0 { + // 处理下一个请求 + rpw.ProcessNextRequest(ctx) + } + + logger := workflow.GetLogger(ctx) + logger.Info("等待队列处理完成", + "剩余可用资源", rpw.State.AvailableCount, + "等待队列长度", len(rpw.State.WaitingQueue)) +} + +// ResourcePoolWorkflow 资源池工作流,管理资源的分配和释放 +func ResourcePoolWorkflow( + ctx workflow.Context, + namespace string, + resourceID string, + totalResources int, + releaseTimeout time.Duration, +) error { + return ResourcePoolWorkflowWithInitializer( + ctx, + namespace, + resourceID, + totalResources, + releaseTimeout, + &DefaultResourcePoolInitializer{}, + ) +} + +// ResourcePoolWorkflowWithInitializer 带初始化器的资源池工作流 +func ResourcePoolWorkflowWithInitializer( + ctx workflow.Context, + namespace string, + resourceID string, + totalResources int, + releaseTimeout time.Duration, + initializer ResourcePoolInitializer, +) error { + // 创建工作流结构体并运行 + rpw := NewResourcePoolWorkflow( + ctx, + namespace, + resourceID, + totalResources, + releaseTimeout, + initializer, + ) + + return rpw.Run(ctx) +} + +// 以下为现有的外部接口,保持兼容性 + +// SignalWithStartResourcePoolWorkflowActivity 发送信号并启动资源池工作流 +func SignalWithStartResourcePoolWorkflowActivity( + ctx context.Context, + namespace string, + resourceID string, + requesterWorkflowID string, + priority int, + releaseTimeout time.Duration, +) (*workflow.Execution, error) { + c, ok := ctx.Value(ClientContextKey).(client.Client) + if !ok || c == nil { + return nil, fmt.Errorf("无法从上下文中获取有效的 Temporal 客户端") + } + + // 生成资源池工作流的唯一ID + workflowID := fmt.Sprintf( + "%s:%s:%s", + "resource-pool", + namespace, + resourceID, + ) + + // 配置工作流选项 + workflowOptions := client.StartWorkflowOptions{ + ID: workflowID, + TaskQueue: "resource-pool", + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: time.Second, + BackoffCoefficient: 2.0, + MaximumInterval: time.Minute, + MaximumAttempts: 5, + }, + } + + // 创建资源请求 + request := ResourceRequest{ + WorkflowID: requesterWorkflowID, + Priority: priority, + } + + // 发送信号并启动工作流 + wr, err := c.SignalWithStartWorkflow( + ctx, workflowID, RequestResourceSignalName, request, + workflowOptions, ResourcePoolWorkflow, namespace, resourceID, 1, releaseTimeout) + if err != nil { + activity.GetLogger(ctx).Error("无法发送信号和启动工作流", "Error", err) + return nil, err + } + + activity.GetLogger(ctx).Info("信号发送和工作流启动成功", + "WorkflowID", wr.GetID(), + "RunID", wr.GetRunID()) + + return &workflow.Execution{ + ID: wr.GetID(), + RunID: wr.GetRunID(), + }, nil +} + +// SignalWithStartCustomResourcePoolWorkflowActivity 发送信号并启动自定义资源池工作流 +func SignalWithStartCustomResourcePoolWorkflowActivity( + ctx context.Context, + namespace string, + resourceID string, + requesterWorkflowID string, + priority int, + releaseTimeout time.Duration, + workflowFunc interface{}, + workflowArgs ...interface{}, +) (*workflow.Execution, error) { + c, ok := ctx.Value(ClientContextKey).(client.Client) + if !ok || c == nil { + return nil, fmt.Errorf("无法从上下文中获取有效的 Temporal 客户端") + } + + // 生成资源池工作流的唯一ID + workflowID := fmt.Sprintf( + "%s:%s:%s", + "resource-pool", + namespace, + resourceID, + ) + + // 配置工作流选项 + workflowOptions := client.StartWorkflowOptions{ + ID: workflowID, + TaskQueue: "resource-pool", + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: time.Second, + BackoffCoefficient: 2.0, + MaximumInterval: time.Minute, + MaximumAttempts: 5, + }, + } + + // 创建资源请求 + request := ResourceRequest{ + WorkflowID: requesterWorkflowID, + Priority: priority, + } + + // 发送信号并启动工作流 + wr, err := c.SignalWithStartWorkflow( + ctx, workflowID, RequestResourceSignalName, request, + workflowOptions, workflowFunc, workflowArgs...) + if err != nil { + activity.GetLogger(ctx).Error("无法发送信号和启动自定义工作流", "Error", err) + return nil, err + } + + activity.GetLogger(ctx).Info("信号发送和自定义工作流启动成功", + "WorkflowID", wr.GetID(), + "RunID", wr.GetRunID()) + + return &workflow.Execution{ + ID: wr.GetID(), + RunID: wr.GetRunID(), + }, nil +} + +// generateResourceChannelName 生成资源通道名称 +func generateResourceChannelName(requesterWorkflowID string) string { + return fmt.Sprintf("resource-channel-%s", requesterWorkflowID) +} + +// UpdateResourcePool 更新资源池大小 +func UpdateResourcePool( + ctx workflow.Context, + execution workflow.Execution, + newSize int, +) error { + logger := workflow.GetLogger(ctx) + + // 创建更新请求 + updateRequest := UpdateResourcePoolRequest{ + NewSize: newSize, + } + + // 发送更新信号 + err := workflow.SignalExternalWorkflow(ctx, execution.ID, execution.RunID, + UpdateResourcePoolSignalName, updateRequest).Get(ctx, nil) + if err != nil { + logger.Error("更新资源池失败", "Error", err) + return err + } + + logger.Info("已发送资源池更新请求", "newSize", newSize) + return nil +} + +// CancelResourceRequest 取消在等待队列中的资源请求 +func CancelResourceRequest( + ctx workflow.Context, + execution workflow.Execution, + workflowID string, +) error { + logger := workflow.GetLogger(ctx) + + // 创建取消命令 + cancelCmd := CancelRequestCommand{ + WorkflowID: workflowID, + } + + // 发送取消信号 + err := workflow.SignalExternalWorkflow(ctx, execution.ID, execution.RunID, + CancelRequestSignalName, cancelCmd).Get(ctx, nil) + if err != nil { + logger.Error("取消资源请求失败", "Error", err) + return err + } + + logger.Info("已发送取消资源请求命令", "workflowID", workflowID) + return nil +} + +// ================================================ +// 默认资源池初始化器 +// ================================================ + +// DefaultResourcePoolInitializer 默认资源池初始化器 +type DefaultResourcePoolInitializer struct{} + +// Initialize 初始化默认资源池 +func (di *DefaultResourcePoolInitializer) Initialize( + ctx workflow.Context, + resourceID string, + totalResources int, +) *ResourcePoolState { + // 创建资源池状态 + state := &ResourcePoolState{ + ResourcePool: make([]InternalResource, totalResources), + AvailableCount: totalResources, + WaitingQueue: []ResourceRequest{}, + ActiveRequests: make(map[string]int), + } + + // 初始化资源池 + for i := 0; i < totalResources; i++ { + state.ResourcePool[i] = InternalResource{ + Info: ResourceInfo{ + ResourceID: fmt.Sprintf("%s-%d", resourceID, i), + Metadata: map[string]interface{}{"poolIndex": i}, + ResourceIndex: i, + }, + Available: true, + } + } + + return state +} + +// ExpandPool 默认的资源池扩展实现 +func (di *DefaultResourcePoolInitializer) ExpandPool( + ctx workflow.Context, + state *ResourcePoolState, + currentSize int, + newSize int, + resourceID string, +) int { + // 扩展资源池 + for i := currentSize; i < newSize; i++ { + state.ResourcePool = append(state.ResourcePool, InternalResource{ + Info: ResourceInfo{ + ResourceID: fmt.Sprintf("%s-%d", resourceID, i), + Metadata: map[string]interface{}{"poolIndex": i}, + ResourceIndex: i, + }, + Available: true, + }) + } + + // 返回新增的可用资源数量 + return newSize - currentSize +} + +// ShrinkPool 默认的资源池缩减实现 +func (di *DefaultResourcePoolInitializer) ShrinkPool( + ctx workflow.Context, + state *ResourcePoolState, + currentSize int, + newSize int, +) int { + logger := workflow.GetLogger(ctx) + + // 计算需要减少的资源数量 + reduceCount := currentSize - newSize + if reduceCount <= 0 { + return 0 // 没有需要缩减的资源 + } + + // 处理缩容,先统计可用资源数量 + availableCount := 0 + for i := currentSize - 1; i >= 0; i-- { + if state.ResourcePool[i].Available { + availableCount++ + } + } + + logger.Info("开始资源池缩容", + "当前资源数", currentSize, + "目标资源数", newSize, + "可用资源数", availableCount, + "已使用资源数", currentSize-availableCount) + + // 确定可以立即缩减的资源数量 + immediateReduceCount := min(availableCount, reduceCount) + + // 标记要缩减的资源 + availableReduceCount := 0 + pendingReduceIndexes := make([]int, 0, reduceCount-immediateReduceCount) + + // 从后向前缩减可用资源 + for i := currentSize - 1; i >= 0 && availableReduceCount < immediateReduceCount; i-- { + if state.ResourcePool[i].Available { + // 标记为不可用的资源 + state.ResourcePool[i].Available = false + availableReduceCount++ + } else if availableReduceCount+len(pendingReduceIndexes) < reduceCount { + // 记录需要延迟缩减的资源索引 + pendingReduceIndexes = append(pendingReduceIndexes, i) + } + } + + logger.Info("资源池缩容进展", + "立即缩减资源数", availableReduceCount, + "待延迟缩减资源数", len(pendingReduceIndexes)) + + // 如果有需要延迟缩减的资源,则启动观察协程 + if len(pendingReduceIndexes) > 0 { + workflow.Go(ctx, func(ctx workflow.Context) { + di.monitorPendingResourcesForShrink(ctx, state, pendingReduceIndexes) + }) + } + + // 返回已经减少的可用资源数 + return availableReduceCount +} + +// monitorPendingResourcesForShrink 监控延迟缩减的资源 +func (di *DefaultResourcePoolInitializer) monitorPendingResourcesForShrink( + ctx workflow.Context, + state *ResourcePoolState, + pendingIndexes []int, +) { + logger := workflow.GetLogger(ctx) + logger.Info("启动延迟缩容监控", "待缩减资源数", len(pendingIndexes)) + + // 创建待观察资源的映射,方便查找 + pendingMap := make(map[int]bool) + for _, idx := range pendingIndexes { + pendingMap[idx] = true + } + + // 每隔一段时间检查一次资源状态 + for len(pendingMap) > 0 { + // 等待一段时间再检查 + _ = workflow.Sleep(ctx, 30*time.Second) + + // 检查是否有资源已经释放 + for idx := range pendingMap { + if idx < len(state.ResourcePool) && state.ResourcePool[idx].Available { + // 资源已释放,可以缩减 + state.ResourcePool[idx].Available = false + logger.Info("延迟缩容:资源已释放,标记为不可用", "resourceIndex", idx) + delete(pendingMap, idx) + + // 更新可用资源计数 + if state.AvailableCount > 0 { + state.AvailableCount-- + } + } + } + + // 记录剩余待缩减资源数 + if len(pendingMap) > 0 { + logger.Info("延迟缩容进行中", "剩余待缩减资源数", len(pendingMap)) + } + } + + logger.Info("延迟缩容完成,所有标记的资源都已缩减") +} + +// min 返回两个整数中的较小值 +func min(a, b int) int { + if a < b { + return a + } + return b +} + +// QueryResourcePoolStatus 查询资源池状态 +func (rp *ResourcePoolClient) QueryResourcePoolStatus(ctx workflow.Context) (*ResourcePoolStatus, error) { + if rp.PoolExecution == nil { + return nil, fmt.Errorf("资源池未初始化") + } + + // 使用查询接口获取资源池状态 + logger := workflow.GetLogger(ctx) + logger.Info("查询资源池状态", "resourcePoolID", rp.PoolExecution.ID) + + // 设置本地活动选项 + activityCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{ + ScheduleToCloseTimeout: time.Minute, + }) + + var result ResourcePoolStatus + err := workflow.ExecuteLocalActivity(activityCtx, QueryResourcePoolStatusActivity, + rp.PoolExecution.ID, rp.PoolExecution.RunID).Get(ctx, &result) + if err != nil { + logger.Error("查询资源池状态失败", "Error", err) + return nil, err + } + + return &result, nil +} + +// QueryResourceAllocation 查询资源分配详情 +func (rp *ResourcePoolClient) QueryResourceAllocation(ctx workflow.Context) (*ResourceAllocation, error) { + if rp.PoolExecution == nil { + return nil, fmt.Errorf("资源池未初始化") + } + + // 使用查询接口获取资源分配详情 + logger := workflow.GetLogger(ctx) + logger.Info("查询资源分配详情", "resourcePoolID", rp.PoolExecution.ID) + + // 设置本地活动选项 + activityCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{ + ScheduleToCloseTimeout: time.Minute, + }) + + var result ResourceAllocation + err := workflow.ExecuteLocalActivity(activityCtx, QueryResourceAllocationActivity, + rp.PoolExecution.ID, rp.PoolExecution.RunID).Get(ctx, &result) + if err != nil { + logger.Error("查询资源分配详情失败", "Error", err) + return nil, err + } + + return &result, nil +} + +// QueryResourcePoolStatus 查询资源池状态 +func QueryResourcePoolStatus( + ctx workflow.Context, + execution workflow.Execution, +) (*ResourcePoolStatus, error) { + logger := workflow.GetLogger(ctx) + + // 设置本地活动选项 + activityCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{ + ScheduleToCloseTimeout: time.Minute, + }) + + var result ResourcePoolStatus + err := workflow.ExecuteLocalActivity(activityCtx, QueryResourcePoolStatusActivity, + execution.ID, execution.RunID).Get(ctx, &result) + if err != nil { + logger.Error("查询资源池状态失败", "Error", err) + return nil, err + } + + return &result, nil +} + +// QueryResourceAllocation 查询资源分配详情 +func QueryResourceAllocation( + ctx workflow.Context, + execution workflow.Execution, +) (*ResourceAllocation, error) { + logger := workflow.GetLogger(ctx) + + // 设置本地活动选项 + activityCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{ + ScheduleToCloseTimeout: time.Minute, + }) + + var result ResourceAllocation + err := workflow.ExecuteLocalActivity(activityCtx, QueryResourceAllocationActivity, + execution.ID, execution.RunID).Get(ctx, &result) + if err != nil { + logger.Error("查询资源分配详情失败", "Error", err) + return nil, err + } + + return &result, nil +} + +// QueryResourcePoolStatusActivity 查询资源池状态活动 +func QueryResourcePoolStatusActivity( + ctx context.Context, + workflowID string, + runID string, +) (ResourcePoolStatus, error) { + c, ok := ctx.Value(ClientContextKey).(client.Client) + if !ok || c == nil { + return ResourcePoolStatus{}, fmt.Errorf("无法从上下文中获取有效的 Temporal 客户端") + } + + // 使用客户端查询工作流 + resp, err := c.QueryWorkflow(ctx, workflowID, runID, GetResourcePoolStatusQuery) + if err != nil { + activity.GetLogger(ctx).Error("查询资源池状态失败", "Error", err) + return ResourcePoolStatus{}, err + } + + var result ResourcePoolStatus + if err := resp.Get(&result); err != nil { + activity.GetLogger(ctx).Error("解析查询结果失败", "Error", err) + return ResourcePoolStatus{}, err + } + + return result, nil +} + +// QueryResourceAllocationActivity 查询资源分配详情活动 +func QueryResourceAllocationActivity( + ctx context.Context, + workflowID string, + runID string, +) (ResourceAllocation, error) { + c, ok := ctx.Value(ClientContextKey).(client.Client) + if !ok || c == nil { + return ResourceAllocation{}, fmt.Errorf("无法从上下文中获取有效的 Temporal 客户端") + } + + // 使用客户端查询工作流 + resp, err := c.QueryWorkflow(ctx, workflowID, runID, GetResourceAllocationQuery) + if err != nil { + activity.GetLogger(ctx).Error("查询资源分配详情失败", "Error", err) + return ResourceAllocation{}, err + } + + var result ResourceAllocation + if err := resp.Get(&result); err != nil { + activity.GetLogger(ctx).Error("解析查询结果失败", "Error", err) + return ResourceAllocation{}, err + } + + return result, nil +} + +// handleResourcePoolStatusQuery 处理资源池状态查询 +func (rpw *ResourcePoolWorkflowImpl) handleResourcePoolStatusQuery() (ResourcePoolStatus, error) { + totalResources := len(rpw.State.ResourcePool) + allocatedCount := totalResources - rpw.State.AvailableCount + + return ResourcePoolStatus{ + ResourceID: rpw.ResourceID, + TotalResources: totalResources, + AvailableCount: rpw.State.AvailableCount, + WaitingCount: len(rpw.State.WaitingQueue), + AllocatedCount: allocatedCount, + }, nil +} + +// handleResourceAllocationQuery 处理资源分配详情查询 +func (rpw *ResourcePoolWorkflowImpl) handleResourceAllocationQuery() (ResourceAllocation, error) { + // 构建资源详情列表 + resources := make([]ResourceDetail, len(rpw.State.ResourcePool)) + + // 填充资源详情 + for i, res := range rpw.State.ResourcePool { + resourceDetail := ResourceDetail{ + ResourceID: res.Info.ResourceID, + ResourceIndex: res.Info.ResourceIndex, + Available: res.Available, + Metadata: res.Info.Metadata, + } + + // 如果资源不可用,查找它被分配给的工作流 + if !res.Available { + for workflowID, resIndex := range rpw.State.ActiveRequests { + if resIndex == i { + resourceDetail.AssignedTo = workflowID + resourceDetail.AcquiredTime = res.Info.AcquiredTime + break + } + } + } + + resources[i] = resourceDetail + } + + // 构建等待队列信息 + waitingQueue := make([]WaitingRequestInfo, len(rpw.State.WaitingQueue)) + for i, req := range rpw.State.WaitingQueue { + waitingQueue[i] = WaitingRequestInfo{ + WorkflowID: req.WorkflowID, + Priority: req.Priority, + QueuePosition: i + 1, + } + } + + return ResourceAllocation{ + Resources: resources, + WaitingQueue: waitingQueue, + AllocatedCount: len(rpw.State.ActiveRequests), + }, nil +} diff --git a/queue/starter/main.go b/queue/starter/main.go new file mode 100644 index 00000000..c54757dd --- /dev/null +++ b/queue/starter/main.go @@ -0,0 +1,535 @@ +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "log" + "strings" + "time" + + "github.com/pborman/uuid" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/workflow" + + "github.com/temporalio/samples-go/queue" +) + +// define command line parameters +var ( + testMode = flag.String("test", "basic", "test mode: basic=basic competition test, query=query function test, pool=inspect internal resource pool") + poolID = flag.String("poolid", "", "specify the resource pool ID to query, format: 'resource-pool:{namespace}:{resourceID}'") +) + +func main() { + // parse command line parameters + flag.Parse() + + // create temporal client + c, err := client.Dial(client.Options{ + HostPort: "localhost:7233", + }) + if err != nil { + log.Fatalln("can't create client", err) + } + defer c.Close() + + // execute different tests based on test mode + switch *testMode { + case "basic": + runBasicTest(c) + case "query": + runQueryTest(c) + case "pool": + if *poolID == "" { + log.Fatalln("when using pool mode, you must specify the resource pool ID through the -poolid parameter") + } + runPoolInspectorTest(c, *poolID) + default: + log.Fatalln("unknown test mode:", *testMode) + } +} + +// runBasicTest 运行基本的资源竞争测试 +func runBasicTest(c client.Client) { + // 生成一个随机的资源ID,此ID可以是业务逻辑标识符 + resourceID := "resource-" + uuid.New() + + // 准备资源池相关信息 + resourcePoolWorkflowID := "resource-pool:ResourcePoolDemo:" + resourceID + + // start multiple workflow instances that will compete for the same resource + // each workflow will attempt to acquire the resource, complete work, and then release the resource + + // configure first workflow + workflow1ID := "QueueWorkflow1_" + uuid.New() + workflow1Options := client.StartWorkflowOptions{ + ID: workflow1ID, + TaskQueue: "queue-sample", + } + + // configure second workflow + workflow2ID := "QueueWorkflow2_" + uuid.New() + workflow2Options := client.StartWorkflowOptions{ + ID: workflow2ID, + TaskQueue: "queue-sample", + } + + // configure third workflow + workflow3ID := "QueueWorkflow3_" + uuid.New() + workflow3Options := client.StartWorkflowOptions{ + ID: workflow3ID, + TaskQueue: "queue-sample", + } + + // configure fourth workflow + workflow4ID := "QueueWorkflow4_" + uuid.New() + workflow4Options := client.StartWorkflowOptions{ + ID: workflow4ID, + TaskQueue: "queue-sample", + } + + // start first workflow (processing time 5 seconds, cancelable) + we, err := c.ExecuteWorkflow(context.Background(), workflow1Options, + queue.SampleWorkflowWithResourcePool, resourceID, 5*time.Second, true) + if err != nil { + log.Fatalln("can't execute workflow1", err) + } + log.Println("workflow1 started", "WorkflowID", we.GetID(), "RunID", we.GetRunID()) + + // start second workflow (processing time 7 seconds, cancelable) + we, err = c.ExecuteWorkflow(context.Background(), workflow2Options, + queue.SampleWorkflowWithResourcePool, resourceID, 7*time.Second, true) + if err != nil { + log.Fatalln("failed to execute workflow2", err) + } + log.Println("workflow2 started", "WorkflowID", we.GetID(), "RunID", we.GetRunID()) + + // start third workflow (processing time 3 seconds, cancelable) + we, err = c.ExecuteWorkflow(context.Background(), workflow3Options, + queue.SampleWorkflowWithResourcePool, resourceID, 3*time.Second, true) + if err != nil { + log.Fatalln("failed to execute workflow3", err) + } + log.Println("workflow3 started", "WorkflowID", we.GetID(), "RunID", we.GetRunID()) + + // wait 2 seconds, there should be one workflow in progress and two waiting workflows in the resource pool + time.Sleep(2 * time.Second) + + // execute the following operations in a separate goroutine to avoid blocking the main process + go func() { + // wait 3 seconds, one of the first or second workflows should have acquired the resource and started processing + time.Sleep(3 * time.Second) + + // try to cancel the resource request of the third workflow + cancelWorkflow, err := c.ExecuteWorkflow(context.Background(), + client.StartWorkflowOptions{ + ID: "CancelResourceRequest_" + uuid.New(), + TaskQueue: "queue-sample", + }, + cancelResourceRequest, resourcePoolWorkflowID, workflow3ID) + if err != nil { + log.Println("failed to cancel resource request workflow", err) + } else { + log.Println("cancel resource request workflow started", cancelWorkflow.GetID()) + } + + // wait 2 seconds and then increase the resource pool size + time.Sleep(2 * time.Second) + updateResourcePoolWorkflow, err := c.ExecuteWorkflow(context.Background(), + client.StartWorkflowOptions{ + ID: "UpdateResourcePool_" + uuid.New(), + TaskQueue: "queue-sample", + }, + updateResourcePoolSize, resourcePoolWorkflowID, 3) // 增加到3个资源 + if err != nil { + log.Println("failed to update resource pool size workflow", err) + } else { + log.Println("update resource pool size workflow started", updateResourcePoolWorkflow.GetID()) + } + + // wait 2 seconds and then start the fourth workflow + time.Sleep(2 * time.Second) + we, err = c.ExecuteWorkflow(context.Background(), workflow4Options, + queue.SampleWorkflowWithResourcePool, resourceID, 4*time.Second, true) + if err != nil { + log.Println("failed to execute workflow4", err) + } else { + log.Println("workflow4 started", "WorkflowID", we.GetID(), "RunID", we.GetRunID()) + } + + // wait 5 seconds and then reduce the resource pool size + time.Sleep(5 * time.Second) + updateResourcePoolWorkflow, err = c.ExecuteWorkflow(context.Background(), + client.StartWorkflowOptions{ + ID: "UpdateResourcePool2_" + uuid.New(), + TaskQueue: "queue-sample", + }, + updateResourcePoolSize, resourcePoolWorkflowID, 1) // reduce to 1 resource + if err != nil { + log.Println("failed to update resource pool size workflow", err) + } else { + log.Println("update resource pool size workflow started", updateResourcePoolWorkflow.GetID()) + } + }() + + log.Println("all workflows have been started, they will compete for the same resource. please check the workflow logs for resource competition details.") + + // main thread waits long enough for all examples to complete + time.Sleep(60 * time.Second) +} + +// runQueryTest 运行资源池查询测试 +func runQueryTest(c client.Client) { + // 生成一个随机的资源ID,此ID可以是业务逻辑标识符 + resourceID := "resource-" + uuid.New() + + // 准备资源池相关信息 + resourcePoolWorkflowID := "resource-pool:ResourcePoolDemo:" + resourceID + + // start multiple workflow instances that will compete for the same resource + log.Println("start resource pool query test, resource ID:", resourceID) + log.Println("resource pool workflow ID:", resourcePoolWorkflowID) + + // start 5 workflows that will compete for the resource + workflowIDs := make([]string, 5) + for i := 0; i < 5; i++ { + // configure workflow + workflowID := fmt.Sprintf("QueryTestWorkflow%d_%s", i+1, uuid.New()) + workflowIDs[i] = workflowID + workflowOptions := client.StartWorkflowOptions{ + ID: workflowID, + TaskQueue: "queue-sample", + } + + // processing time increases with index + processingTime := time.Duration(i+5) * time.Second + + // start workflow + we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, + queue.SampleWorkflowWithResourcePool, resourceID, processingTime, true) + if err != nil { + log.Fatalln("failed to execute workflow", i+1, err) + } + log.Printf("workflow%d started, WorkflowID: %s, RunID: %s", i+1, we.GetID(), we.GetRunID()) + + // wait a short time after starting the workflow to better observe the query results + time.Sleep(500 * time.Millisecond) + } + + // wait 2 seconds, there should be one workflow in progress and four waiting workflows in the resource pool + time.Sleep(2 * time.Second) + + // execute first query - initial state, default resource pool size is 1 + queryAndPrintResourcePool(c, resourcePoolWorkflowID, "initial state") + + // update resource pool size to 3 + updatePoolWorkflow, err := c.ExecuteWorkflow(context.Background(), + client.StartWorkflowOptions{ + ID: "UpdateResourcePoolForQuery_" + uuid.New(), + TaskQueue: "queue-sample", + }, + updateResourcePoolSize, resourcePoolWorkflowID, 3) + if err != nil { + log.Println("failed to update resource pool size workflow", err) + } else { + log.Println("update resource pool size workflow started", updatePoolWorkflow.GetID()) + } + + // wait 3 seconds for the resource pool to update + time.Sleep(3 * time.Second) + + // execute second query - resource pool size is 3 + queryAndPrintResourcePool(c, resourcePoolWorkflowID, "expanded to 3 resources") + + // try to cancel the resource request of the fifth workflow + cancelWorkflow, err := c.ExecuteWorkflow(context.Background(), + client.StartWorkflowOptions{ + ID: "CancelResourceRequestForQuery_" + uuid.New(), + TaskQueue: "queue-sample", + }, + cancelResourceRequest, resourcePoolWorkflowID, workflowIDs[4]) + if err != nil { + log.Println("failed to cancel resource request workflow", err) + } else { + log.Println("cancel resource request workflow started", cancelWorkflow.GetID()) + } + + // wait 2 seconds + time.Sleep(2 * time.Second) + + // execute third query - after canceling one request + queryAndPrintResourcePool(c, resourcePoolWorkflowID, "after canceling one request") + + // wait a while for some workflows to complete + time.Sleep(10 * time.Second) + + // execute fourth query - after some workflows have completed + queryAndPrintResourcePool(c, resourcePoolWorkflowID, "after some workflows have completed") + + // update resource pool size to 1 + updatePoolWorkflow, err = c.ExecuteWorkflow(context.Background(), + client.StartWorkflowOptions{ + ID: "ShrinkResourcePoolForQuery_" + uuid.New(), + TaskQueue: "queue-sample", + }, + updateResourcePoolSize, resourcePoolWorkflowID, 1) + if err != nil { + log.Println("failed to update resource pool size workflow", err) + } else { + log.Println("update resource pool size workflow started", updatePoolWorkflow.GetID()) + } + + // wait 3 seconds for the resource pool to update + time.Sleep(3 * time.Second) + + // execute fifth query - after shrinking back to 1 resource + queryAndPrintResourcePool(c, resourcePoolWorkflowID, "after shrinking back to 1 resource") + + // main thread waits long enough for all workflows to complete + log.Println("waiting for all workflows to complete...") + time.Sleep(30 * time.Second) + + // execute last query - after all workflows have completed + queryAndPrintResourcePool(c, resourcePoolWorkflowID, "after all workflows have completed") + + log.Println("query test completed!") +} + +// runPoolInspectorTest 运行资源池内部状态检查测试 +func runPoolInspectorTest(c client.Client, poolID string) { + ctx := context.Background() + + log.Printf("start checking the internal state of resource pool %s\n", poolID) + + // query resource pool basic status + statusResp, err := c.QueryWorkflow(ctx, poolID, "", queue.GetResourcePoolStatusQuery) + if err != nil { + log.Fatalf("failed to query resource pool status: %v", err) + } + + var status queue.ResourcePoolStatus + if err := statusResp.Get(&status); err != nil { + log.Fatalf("failed to parse resource pool status: %v", err) + } + + // print resource pool basic information + log.Println("==== resource pool basic information ====") + log.Printf("resource ID: %s\n", status.ResourceID) + log.Printf("total resources: %d\n", status.TotalResources) + log.Printf("available resources: %d\n", status.AvailableCount) + log.Printf("allocated resources: %d\n", status.AllocatedCount) + log.Printf("waiting queue length: %d\n", status.WaitingCount) + + // query resource pool detailed allocation details + allocationResp, err := c.QueryWorkflow(ctx, poolID, "", queue.GetResourceAllocationQuery) + if err != nil { + log.Fatalf("failed to query resource allocation details: %v", err) + } + + var allocation queue.ResourceAllocation + if err := allocationResp.Get(&allocation); err != nil { + log.Fatalf("failed to parse resource allocation details: %v", err) + } + + // print resource pool internal state JSON + allocationJSON, err := json.MarshalIndent(allocation, "", " ") + if err != nil { + log.Fatalf("failed to format JSON: %v", err) + } + log.Println("\n==== resource pool internal state (JSON) ====") + log.Println(string(allocationJSON)) + + // print resource details table + log.Println("\n==== resource details ====") + log.Printf("%-20s %-10s %-10s %-30s %-25s\n", "resource ID", "index", "status", "assigned to", "acquired time") + log.Println(strings.Repeat("-", 100)) + + for _, res := range allocation.Resources { + status := "available" + assignedTo := "-" + acquiredTime := "-" + + if !res.Available { + status = "allocated" + assignedTo = res.AssignedTo + acquiredTime = res.AcquiredTime.Format("2006-01-02 15:04:05") + } + + log.Printf("%-20s %-10d %-10s %-30s %-25s\n", + res.ResourceID, + res.ResourceIndex, + status, + assignedTo, + acquiredTime) + } + + // print waiting queue + if len(allocation.WaitingQueue) > 0 { + log.Println("\n==== waiting queue ====") + log.Printf("%-5s %-30s %-10s\n", "position", "workflow ID", "priority") + log.Println(strings.Repeat("-", 50)) + + for _, req := range allocation.WaitingQueue { + log.Printf("%-5d %-30s %-10d\n", + req.QueuePosition, + req.WorkflowID, + req.Priority) + } + } else { + log.Println("\nwaiting queue is empty") + } + + // start monitoring resource pool changes (update every 2 seconds, press Ctrl+C to stop) + log.Println("\n==== start monitoring resource pool changes (update every 2 seconds, press Ctrl+C to stop) ====") + log.Println("time total resources available allocated waiting queue") + log.Println(strings.Repeat("-", 70)) + + startTime := time.Now() + for time.Since(startTime) < 60*time.Second { + // query resource pool status + statusResp, err := c.QueryWorkflow(ctx, poolID, "", queue.GetResourcePoolStatusQuery) + if err != nil { + log.Printf("[%s] failed to query: %v\n", time.Now().Format("15:04:05"), err) + time.Sleep(2 * time.Second) + continue + } + + if err := statusResp.Get(&status); err != nil { + log.Printf("[%s] failed to parse: %v\n", time.Now().Format("15:04:05"), err) + time.Sleep(2 * time.Second) + continue + } + + log.Printf("%-23s %-8d %-8d %-8d %-8d\n", + time.Now().Format("2006-01-02 15:04:05"), + status.TotalResources, + status.AvailableCount, + status.AllocatedCount, + status.WaitingCount) + + time.Sleep(2 * time.Second) + } +} + +// queryAndPrintResourcePool 查询资源池并打印结果 +func queryAndPrintResourcePool(c client.Client, resourcePoolWorkflowID string, stage string) { + ctx := context.Background() + + log.Printf("\n===== query stage: %s =====\n", stage) + + // query resource pool status + statusResp, err := c.QueryWorkflow(ctx, resourcePoolWorkflowID, "", queue.GetResourcePoolStatusQuery) + if err != nil { + log.Printf("failed to query resource pool status: %v\n", err) + return + } + + var status queue.ResourcePoolStatus + if err := statusResp.Get(&status); err != nil { + log.Printf("failed to parse resource pool status: %v\n", err) + return + } + + // print basic status information + log.Println("resource pool status:") + log.Printf(" resource ID: %s\n", status.ResourceID) + log.Printf(" total resources: %d\n", status.TotalResources) + log.Printf(" available resources: %d\n", status.AvailableCount) + log.Printf(" allocated resources: %d\n", status.AllocatedCount) + log.Printf(" waiting queue length: %d\n", status.WaitingCount) + + // query resource allocation details + allocationResp, err := c.QueryWorkflow(ctx, resourcePoolWorkflowID, "", queue.GetResourceAllocationQuery) + if err != nil { + log.Printf("failed to query resource allocation details: %v\n", err) + return + } + + var allocation queue.ResourceAllocation + if err := allocationResp.Get(&allocation); err != nil { + log.Printf("failed to parse resource allocation details: %v\n", err) + return + } + + // print resource details + log.Println("\nresource details:") + for i, res := range allocation.Resources { + log.Printf(" resource #%d:\n", i+1) + log.Printf(" ID: %s\n", res.ResourceID) + log.Printf(" index: %d\n", res.ResourceIndex) + log.Printf(" available: %v\n", res.Available) + + // if the resource is allocated, display the allocation details + if !res.Available { + log.Printf(" assigned to: %s\n", res.AssignedTo) + log.Printf(" acquired time: %s\n", res.AcquiredTime.Format("2006-01-02 15:04:05")) + } + + // display metadata + if len(res.Metadata) > 0 { + metadataJSON, _ := json.MarshalIndent(res.Metadata, " ", " ") + log.Printf(" metadata: %s\n", string(metadataJSON)) + } + } + + // if there is a waiting queue, display the waiting queue information + if len(allocation.WaitingQueue) > 0 { + log.Println("\nwaiting queue:") + for i, req := range allocation.WaitingQueue { + log.Printf(" request #%d:\n", i+1) + log.Printf(" workflow ID: %s\n", req.WorkflowID) + log.Printf(" priority: %d\n", req.Priority) + log.Printf(" queue position: %d\n", req.QueuePosition) + } + } + + log.Println("") +} + +// cancelResourceRequest 取消资源请求的辅助工作流 +func cancelResourceRequest(ctx workflow.Context, resourcePoolWorkflowID string, targetWorkflowID string) error { + logger := workflow.GetLogger(ctx) + logger.Info("start cancel resource request workflow", + "resourcePoolWorkflowID", resourcePoolWorkflowID, + "targetWorkflowID", targetWorkflowID) + + // create resource pool workflow execution reference + execution := workflow.Execution{ + ID: resourcePoolWorkflowID, + } + + // send cancel command + err := queue.CancelResourceRequest(ctx, execution, targetWorkflowID) + if err != nil { + logger.Error("failed to cancel resource request", "Error", err) + return err + } + + logger.Info("successfully canceled resource request") + return nil +} + +// updateResourcePoolSize 更新资源池大小的辅助工作流 +func updateResourcePoolSize(ctx workflow.Context, resourcePoolWorkflowID string, newSize int) error { + logger := workflow.GetLogger(ctx) + logger.Info("start update resource pool size workflow", + "resourcePoolWorkflowID", resourcePoolWorkflowID, + "newSize", newSize) + + // create resource pool workflow execution reference + execution := workflow.Execution{ + ID: resourcePoolWorkflowID, + } + + // send update command + err := queue.UpdateResourcePool(ctx, execution, newSize) + if err != nil { + logger.Error("failed to update resource pool size", "Error", err) + return err + } + + logger.Info("successfully updated resource pool size") + return nil +}