From ee15bc41198ef908f5ac31e4201748515429ae7f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 1 Mar 2026 08:47:31 +0000 Subject: [PATCH 1/5] Initial plan From de73823604f2b9a3a41bfbd5ea5ec3c431f6bd26 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 1 Mar 2026 08:52:59 +0000 Subject: [PATCH 2/5] Initial plan: resolve template expressions in RemoteStep.Execute() Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- plugin/external/proto/plugin.pb.go | 2 +- plugin/external/proto/plugin_grpc.pb.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/plugin/external/proto/plugin.pb.go b/plugin/external/proto/plugin.pb.go index b34ebd31..15473c7f 100644 --- a/plugin/external/proto/plugin.pb.go +++ b/plugin/external/proto/plugin.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 -// protoc v6.33.4 +// protoc v5.28.3 // source: plugin.proto package proto diff --git a/plugin/external/proto/plugin_grpc.pb.go b/plugin/external/proto/plugin_grpc.pb.go index 8bf79585..0c53862c 100644 --- a/plugin/external/proto/plugin_grpc.pb.go +++ b/plugin/external/proto/plugin_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.6.1 -// - protoc v6.33.4 +// - protoc v5.28.3 // source: plugin.proto package proto From 90895869e3b445e369e8d138e6f9149215383497 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 1 Mar 2026 08:58:30 +0000 Subject: [PATCH 3/5] RemoteStep: resolve template expressions in step config before gRPC call Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- docs/PLUGIN_ARCHITECTURE.md | 3 + docs/PLUGIN_DEVELOPMENT_GUIDE.md | 14 ++- examples/external-plugin/main.go | 2 +- plugin/external/adapter.go | 2 +- plugin/external/proto/plugin.pb.go | 147 ++++++++++++++------------ plugin/external/proto/plugin.proto | 4 + plugin/external/remote_step.go | 17 ++- plugin/external/remote_step_test.go | 156 ++++++++++++++++++++++++++++ plugin/external/sdk/grpc_server.go | 2 +- plugin/external/sdk/interfaces.go | 2 +- 10 files changed, 272 insertions(+), 77 deletions(-) create mode 100644 plugin/external/remote_step_test.go diff --git a/docs/PLUGIN_ARCHITECTURE.md b/docs/PLUGIN_ARCHITECTURE.md index 5c94b26a..4b0b624e 100644 --- a/docs/PLUGIN_ARCHITECTURE.md +++ b/docs/PLUGIN_ARCHITECTURE.md @@ -358,6 +358,9 @@ message ExecuteStepRequest { map step_outputs = 3; google.protobuf.Struct current = 4; google.protobuf.Struct metadata = 5; + // config carries the step's config with template expressions resolved + // against the current pipeline context. + google.protobuf.Struct config = 6; } // Step execution response diff --git a/docs/PLUGIN_DEVELOPMENT_GUIDE.md b/docs/PLUGIN_DEVELOPMENT_GUIDE.md index e060af39..8682ff6d 100644 --- a/docs/PLUGIN_DEVELOPMENT_GUIDE.md +++ b/docs/PLUGIN_DEVELOPMENT_GUIDE.md @@ -616,15 +616,19 @@ When a workflow step is backed by an external plugin, the execution flow is: ``` 1. Pipeline engine calls step.Execute(ctx, pipelineContext) -2. RemoteStep converts PipelineContext to protobuf: +2. RemoteStep resolves template expressions in step config against the current + pipeline context (e.g. {{ index .steps "fetch-user" "row" "id" }} becomes + the actual value from a previous step's output). +3. RemoteStep converts PipelineContext to protobuf: - TriggerData -> google.protobuf.Struct - StepOutputs -> map - Current -> google.protobuf.Struct - Metadata -> google.protobuf.Struct -3. gRPC call: ExecuteStep(ExecuteStepRequest) -> ExecuteStepResponse -4. Plugin deserializes, runs logic, returns output -5. RemoteStep converts response back to StepResult{Output, Stop} -6. Pipeline engine merges output into Current state + - Config -> google.protobuf.Struct (resolved step config) +4. gRPC call: ExecuteStep(ExecuteStepRequest) -> ExecuteStepResponse +5. Plugin deserializes, runs logic using resolved config, returns output +6. RemoteStep converts response back to StepResult{Output, Stop} +7. Pipeline engine merges output into Current state ``` All complex data structures are serialized through `google.protobuf.Struct`, which represents JSON-compatible maps. This means plugin data must be expressible as JSON (strings, numbers, booleans, arrays, nested objects). Binary data should be base64-encoded. diff --git a/examples/external-plugin/main.go b/examples/external-plugin/main.go index e38c08e4..28d17d84 100644 --- a/examples/external-plugin/main.go +++ b/examples/external-plugin/main.go @@ -38,7 +38,7 @@ type uppercaseStep struct { name string } -func (s *uppercaseStep) Execute(_ context.Context, _ map[string]any, _ map[string]map[string]any, current map[string]any, _ map[string]any) (*sdk.StepResult, error) { +func (s *uppercaseStep) Execute(_ context.Context, _ map[string]any, _ map[string]map[string]any, current map[string]any, _ map[string]any, _ map[string]any) (*sdk.StepResult, error) { input, _ := current["input"].(string) return &sdk.StepResult{ Output: map[string]any{ diff --git a/plugin/external/adapter.go b/plugin/external/adapter.go index 7380f1fc..36dd81d9 100644 --- a/plugin/external/adapter.go +++ b/plugin/external/adapter.go @@ -165,7 +165,7 @@ func (a *ExternalPluginAdapter) StepFactories() map[string]plugin.StepFactory { if createResp.Error != "" { return nil, fmt.Errorf("create remote step %s: %s", tn, createResp.Error) } - return NewRemoteStep(name, createResp.HandleId, a.client.client), nil + return NewRemoteStep(name, createResp.HandleId, a.client.client, cfg), nil } } return factories diff --git a/plugin/external/proto/plugin.pb.go b/plugin/external/proto/plugin.pb.go index 15473c7f..8239823b 100644 --- a/plugin/external/proto/plugin.pb.go +++ b/plugin/external/proto/plugin.pb.go @@ -810,12 +810,16 @@ func (x *ErrorResponse) GetError() string { // ExecuteStepRequest runs a step with pipeline context data. type ExecuteStepRequest struct { - state protoimpl.MessageState `protogen:"open.v1"` - HandleId string `protobuf:"bytes,1,opt,name=handle_id,json=handleId,proto3" json:"handle_id,omitempty"` - TriggerData *structpb.Struct `protobuf:"bytes,2,opt,name=trigger_data,json=triggerData,proto3" json:"trigger_data,omitempty"` - StepOutputs map[string]*structpb.Struct `protobuf:"bytes,3,rep,name=step_outputs,json=stepOutputs,proto3" json:"step_outputs,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` - Current *structpb.Struct `protobuf:"bytes,4,opt,name=current,proto3" json:"current,omitempty"` - Metadata *structpb.Struct `protobuf:"bytes,5,opt,name=metadata,proto3" json:"metadata,omitempty"` + state protoimpl.MessageState `protogen:"open.v1"` + HandleId string `protobuf:"bytes,1,opt,name=handle_id,json=handleId,proto3" json:"handle_id,omitempty"` + TriggerData *structpb.Struct `protobuf:"bytes,2,opt,name=trigger_data,json=triggerData,proto3" json:"trigger_data,omitempty"` + StepOutputs map[string]*structpb.Struct `protobuf:"bytes,3,rep,name=step_outputs,json=stepOutputs,proto3" json:"step_outputs,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + Current *structpb.Struct `protobuf:"bytes,4,opt,name=current,proto3" json:"current,omitempty"` + Metadata *structpb.Struct `protobuf:"bytes,5,opt,name=metadata,proto3" json:"metadata,omitempty"` + // config carries the step's config with template expressions resolved against + // the current pipeline context. Plugin steps should prefer this over any + // config stored at CreateStep time so that dynamic values are honoured. + Config *structpb.Struct `protobuf:"bytes,6,opt,name=config,proto3" json:"config,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -885,6 +889,13 @@ func (x *ExecuteStepRequest) GetMetadata() *structpb.Struct { return nil } +func (x *ExecuteStepRequest) GetConfig() *structpb.Struct { + if x != nil { + return x.Config + } + return nil +} + // ExecuteStepResponse returns the step result. type ExecuteStepResponse struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -1746,13 +1757,14 @@ const file_plugin_proto_rawDesc = "" + "\rHandleRequest\x12\x1b\n" + "\thandle_id\x18\x01 \x01(\tR\bhandleId\"%\n" + "\rErrorResponse\x12\x14\n" + - "\x05error\x18\x01 \x01(\tR\x05error\"\x8a\x03\n" + + "\x05error\x18\x01 \x01(\tR\x05error\"\xbb\x03\n" + "\x12ExecuteStepRequest\x12\x1b\n" + "\thandle_id\x18\x01 \x01(\tR\bhandleId\x12:\n" + "\ftrigger_data\x18\x02 \x01(\v2\x17.google.protobuf.StructR\vtriggerData\x12Z\n" + "\fstep_outputs\x18\x03 \x03(\v27.workflow.plugin.v1.ExecuteStepRequest.StepOutputsEntryR\vstepOutputs\x121\n" + "\acurrent\x18\x04 \x01(\v2\x17.google.protobuf.StructR\acurrent\x123\n" + - "\bmetadata\x18\x05 \x01(\v2\x17.google.protobuf.StructR\bmetadata\x1aW\n" + + "\bmetadata\x18\x05 \x01(\v2\x17.google.protobuf.StructR\bmetadata\x12/\n" + + "\x06config\x18\x06 \x01(\v2\x17.google.protobuf.StructR\x06config\x1aW\n" + "\x10StepOutputsEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12-\n" + "\x05value\x18\x02 \x01(\v2\x17.google.protobuf.StructR\x05value:\x028\x01\"\x81\x01\n" + @@ -1907,65 +1919,66 @@ var file_plugin_proto_depIdxs = []int32{ 28, // 7: workflow.plugin.v1.ExecuteStepRequest.step_outputs:type_name -> workflow.plugin.v1.ExecuteStepRequest.StepOutputsEntry 31, // 8: workflow.plugin.v1.ExecuteStepRequest.current:type_name -> google.protobuf.Struct 31, // 9: workflow.plugin.v1.ExecuteStepRequest.metadata:type_name -> google.protobuf.Struct - 31, // 10: workflow.plugin.v1.ExecuteStepResponse.output:type_name -> google.protobuf.Struct - 31, // 11: workflow.plugin.v1.InvokeServiceRequest.args:type_name -> google.protobuf.Struct - 31, // 12: workflow.plugin.v1.InvokeServiceResponse.result:type_name -> google.protobuf.Struct - 31, // 13: workflow.plugin.v1.TriggerWorkflowRequest.data:type_name -> google.protobuf.Struct - 31, // 14: workflow.plugin.v1.LogRequest.fields:type_name -> google.protobuf.Struct - 29, // 15: workflow.plugin.v1.PublishMessageRequest.metadata:type_name -> workflow.plugin.v1.PublishMessageRequest.MetadataEntry - 30, // 16: workflow.plugin.v1.DeliverMessageRequest.metadata:type_name -> workflow.plugin.v1.DeliverMessageRequest.MetadataEntry - 31, // 17: workflow.plugin.v1.ExecuteStepRequest.StepOutputsEntry.value:type_name -> google.protobuf.Struct - 32, // 18: workflow.plugin.v1.PluginService.GetManifest:input_type -> google.protobuf.Empty - 32, // 19: workflow.plugin.v1.PluginService.GetModuleTypes:input_type -> google.protobuf.Empty - 32, // 20: workflow.plugin.v1.PluginService.GetStepTypes:input_type -> google.protobuf.Empty - 32, // 21: workflow.plugin.v1.PluginService.GetTriggerTypes:input_type -> google.protobuf.Empty - 32, // 22: workflow.plugin.v1.PluginService.GetModuleSchemas:input_type -> google.protobuf.Empty - 8, // 23: workflow.plugin.v1.PluginService.CreateModule:input_type -> workflow.plugin.v1.CreateModuleRequest - 11, // 24: workflow.plugin.v1.PluginService.InitModule:input_type -> workflow.plugin.v1.HandleRequest - 11, // 25: workflow.plugin.v1.PluginService.StartModule:input_type -> workflow.plugin.v1.HandleRequest - 11, // 26: workflow.plugin.v1.PluginService.StopModule:input_type -> workflow.plugin.v1.HandleRequest - 11, // 27: workflow.plugin.v1.PluginService.DestroyModule:input_type -> workflow.plugin.v1.HandleRequest - 9, // 28: workflow.plugin.v1.PluginService.CreateStep:input_type -> workflow.plugin.v1.CreateStepRequest - 13, // 29: workflow.plugin.v1.PluginService.ExecuteStep:input_type -> workflow.plugin.v1.ExecuteStepRequest - 11, // 30: workflow.plugin.v1.PluginService.DestroyStep:input_type -> workflow.plugin.v1.HandleRequest - 15, // 31: workflow.plugin.v1.PluginService.InvokeService:input_type -> workflow.plugin.v1.InvokeServiceRequest - 25, // 32: workflow.plugin.v1.PluginService.DeliverMessage:input_type -> workflow.plugin.v1.DeliverMessageRequest - 32, // 33: workflow.plugin.v1.PluginService.GetConfigFragment:input_type -> google.protobuf.Empty - 1, // 34: workflow.plugin.v1.PluginService.GetAsset:input_type -> workflow.plugin.v1.GetAssetRequest - 17, // 35: workflow.plugin.v1.EngineCallbackService.TriggerWorkflow:input_type -> workflow.plugin.v1.TriggerWorkflowRequest - 18, // 36: workflow.plugin.v1.EngineCallbackService.GetService:input_type -> workflow.plugin.v1.GetServiceRequest - 20, // 37: workflow.plugin.v1.EngineCallbackService.Log:input_type -> workflow.plugin.v1.LogRequest - 21, // 38: workflow.plugin.v1.EngineCallbackService.PublishMessage:input_type -> workflow.plugin.v1.PublishMessageRequest - 23, // 39: workflow.plugin.v1.EngineCallbackService.Subscribe:input_type -> workflow.plugin.v1.SubscribeRequest - 24, // 40: workflow.plugin.v1.EngineCallbackService.Unsubscribe:input_type -> workflow.plugin.v1.UnsubscribeRequest - 0, // 41: workflow.plugin.v1.PluginService.GetManifest:output_type -> workflow.plugin.v1.Manifest - 3, // 42: workflow.plugin.v1.PluginService.GetModuleTypes:output_type -> workflow.plugin.v1.TypeList - 3, // 43: workflow.plugin.v1.PluginService.GetStepTypes:output_type -> workflow.plugin.v1.TypeList - 3, // 44: workflow.plugin.v1.PluginService.GetTriggerTypes:output_type -> workflow.plugin.v1.TypeList - 4, // 45: workflow.plugin.v1.PluginService.GetModuleSchemas:output_type -> workflow.plugin.v1.ModuleSchemaList - 10, // 46: workflow.plugin.v1.PluginService.CreateModule:output_type -> workflow.plugin.v1.HandleResponse - 12, // 47: workflow.plugin.v1.PluginService.InitModule:output_type -> workflow.plugin.v1.ErrorResponse - 12, // 48: workflow.plugin.v1.PluginService.StartModule:output_type -> workflow.plugin.v1.ErrorResponse - 12, // 49: workflow.plugin.v1.PluginService.StopModule:output_type -> workflow.plugin.v1.ErrorResponse - 12, // 50: workflow.plugin.v1.PluginService.DestroyModule:output_type -> workflow.plugin.v1.ErrorResponse - 10, // 51: workflow.plugin.v1.PluginService.CreateStep:output_type -> workflow.plugin.v1.HandleResponse - 14, // 52: workflow.plugin.v1.PluginService.ExecuteStep:output_type -> workflow.plugin.v1.ExecuteStepResponse - 12, // 53: workflow.plugin.v1.PluginService.DestroyStep:output_type -> workflow.plugin.v1.ErrorResponse - 16, // 54: workflow.plugin.v1.PluginService.InvokeService:output_type -> workflow.plugin.v1.InvokeServiceResponse - 26, // 55: workflow.plugin.v1.PluginService.DeliverMessage:output_type -> workflow.plugin.v1.DeliverMessageResponse - 27, // 56: workflow.plugin.v1.PluginService.GetConfigFragment:output_type -> workflow.plugin.v1.ConfigFragmentResponse - 2, // 57: workflow.plugin.v1.PluginService.GetAsset:output_type -> workflow.plugin.v1.GetAssetResponse - 12, // 58: workflow.plugin.v1.EngineCallbackService.TriggerWorkflow:output_type -> workflow.plugin.v1.ErrorResponse - 19, // 59: workflow.plugin.v1.EngineCallbackService.GetService:output_type -> workflow.plugin.v1.GetServiceResponse - 32, // 60: workflow.plugin.v1.EngineCallbackService.Log:output_type -> google.protobuf.Empty - 22, // 61: workflow.plugin.v1.EngineCallbackService.PublishMessage:output_type -> workflow.plugin.v1.PublishMessageResponse - 12, // 62: workflow.plugin.v1.EngineCallbackService.Subscribe:output_type -> workflow.plugin.v1.ErrorResponse - 12, // 63: workflow.plugin.v1.EngineCallbackService.Unsubscribe:output_type -> workflow.plugin.v1.ErrorResponse - 41, // [41:64] is the sub-list for method output_type - 18, // [18:41] is the sub-list for method input_type - 18, // [18:18] is the sub-list for extension type_name - 18, // [18:18] is the sub-list for extension extendee - 0, // [0:18] is the sub-list for field type_name + 31, // 10: workflow.plugin.v1.ExecuteStepRequest.config:type_name -> google.protobuf.Struct + 31, // 11: workflow.plugin.v1.ExecuteStepResponse.output:type_name -> google.protobuf.Struct + 31, // 12: workflow.plugin.v1.InvokeServiceRequest.args:type_name -> google.protobuf.Struct + 31, // 13: workflow.plugin.v1.InvokeServiceResponse.result:type_name -> google.protobuf.Struct + 31, // 14: workflow.plugin.v1.TriggerWorkflowRequest.data:type_name -> google.protobuf.Struct + 31, // 15: workflow.plugin.v1.LogRequest.fields:type_name -> google.protobuf.Struct + 29, // 16: workflow.plugin.v1.PublishMessageRequest.metadata:type_name -> workflow.plugin.v1.PublishMessageRequest.MetadataEntry + 30, // 17: workflow.plugin.v1.DeliverMessageRequest.metadata:type_name -> workflow.plugin.v1.DeliverMessageRequest.MetadataEntry + 31, // 18: workflow.plugin.v1.ExecuteStepRequest.StepOutputsEntry.value:type_name -> google.protobuf.Struct + 32, // 19: workflow.plugin.v1.PluginService.GetManifest:input_type -> google.protobuf.Empty + 32, // 20: workflow.plugin.v1.PluginService.GetModuleTypes:input_type -> google.protobuf.Empty + 32, // 21: workflow.plugin.v1.PluginService.GetStepTypes:input_type -> google.protobuf.Empty + 32, // 22: workflow.plugin.v1.PluginService.GetTriggerTypes:input_type -> google.protobuf.Empty + 32, // 23: workflow.plugin.v1.PluginService.GetModuleSchemas:input_type -> google.protobuf.Empty + 8, // 24: workflow.plugin.v1.PluginService.CreateModule:input_type -> workflow.plugin.v1.CreateModuleRequest + 11, // 25: workflow.plugin.v1.PluginService.InitModule:input_type -> workflow.plugin.v1.HandleRequest + 11, // 26: workflow.plugin.v1.PluginService.StartModule:input_type -> workflow.plugin.v1.HandleRequest + 11, // 27: workflow.plugin.v1.PluginService.StopModule:input_type -> workflow.plugin.v1.HandleRequest + 11, // 28: workflow.plugin.v1.PluginService.DestroyModule:input_type -> workflow.plugin.v1.HandleRequest + 9, // 29: workflow.plugin.v1.PluginService.CreateStep:input_type -> workflow.plugin.v1.CreateStepRequest + 13, // 30: workflow.plugin.v1.PluginService.ExecuteStep:input_type -> workflow.plugin.v1.ExecuteStepRequest + 11, // 31: workflow.plugin.v1.PluginService.DestroyStep:input_type -> workflow.plugin.v1.HandleRequest + 15, // 32: workflow.plugin.v1.PluginService.InvokeService:input_type -> workflow.plugin.v1.InvokeServiceRequest + 25, // 33: workflow.plugin.v1.PluginService.DeliverMessage:input_type -> workflow.plugin.v1.DeliverMessageRequest + 32, // 34: workflow.plugin.v1.PluginService.GetConfigFragment:input_type -> google.protobuf.Empty + 1, // 35: workflow.plugin.v1.PluginService.GetAsset:input_type -> workflow.plugin.v1.GetAssetRequest + 17, // 36: workflow.plugin.v1.EngineCallbackService.TriggerWorkflow:input_type -> workflow.plugin.v1.TriggerWorkflowRequest + 18, // 37: workflow.plugin.v1.EngineCallbackService.GetService:input_type -> workflow.plugin.v1.GetServiceRequest + 20, // 38: workflow.plugin.v1.EngineCallbackService.Log:input_type -> workflow.plugin.v1.LogRequest + 21, // 39: workflow.plugin.v1.EngineCallbackService.PublishMessage:input_type -> workflow.plugin.v1.PublishMessageRequest + 23, // 40: workflow.plugin.v1.EngineCallbackService.Subscribe:input_type -> workflow.plugin.v1.SubscribeRequest + 24, // 41: workflow.plugin.v1.EngineCallbackService.Unsubscribe:input_type -> workflow.plugin.v1.UnsubscribeRequest + 0, // 42: workflow.plugin.v1.PluginService.GetManifest:output_type -> workflow.plugin.v1.Manifest + 3, // 43: workflow.plugin.v1.PluginService.GetModuleTypes:output_type -> workflow.plugin.v1.TypeList + 3, // 44: workflow.plugin.v1.PluginService.GetStepTypes:output_type -> workflow.plugin.v1.TypeList + 3, // 45: workflow.plugin.v1.PluginService.GetTriggerTypes:output_type -> workflow.plugin.v1.TypeList + 4, // 46: workflow.plugin.v1.PluginService.GetModuleSchemas:output_type -> workflow.plugin.v1.ModuleSchemaList + 10, // 47: workflow.plugin.v1.PluginService.CreateModule:output_type -> workflow.plugin.v1.HandleResponse + 12, // 48: workflow.plugin.v1.PluginService.InitModule:output_type -> workflow.plugin.v1.ErrorResponse + 12, // 49: workflow.plugin.v1.PluginService.StartModule:output_type -> workflow.plugin.v1.ErrorResponse + 12, // 50: workflow.plugin.v1.PluginService.StopModule:output_type -> workflow.plugin.v1.ErrorResponse + 12, // 51: workflow.plugin.v1.PluginService.DestroyModule:output_type -> workflow.plugin.v1.ErrorResponse + 10, // 52: workflow.plugin.v1.PluginService.CreateStep:output_type -> workflow.plugin.v1.HandleResponse + 14, // 53: workflow.plugin.v1.PluginService.ExecuteStep:output_type -> workflow.plugin.v1.ExecuteStepResponse + 12, // 54: workflow.plugin.v1.PluginService.DestroyStep:output_type -> workflow.plugin.v1.ErrorResponse + 16, // 55: workflow.plugin.v1.PluginService.InvokeService:output_type -> workflow.plugin.v1.InvokeServiceResponse + 26, // 56: workflow.plugin.v1.PluginService.DeliverMessage:output_type -> workflow.plugin.v1.DeliverMessageResponse + 27, // 57: workflow.plugin.v1.PluginService.GetConfigFragment:output_type -> workflow.plugin.v1.ConfigFragmentResponse + 2, // 58: workflow.plugin.v1.PluginService.GetAsset:output_type -> workflow.plugin.v1.GetAssetResponse + 12, // 59: workflow.plugin.v1.EngineCallbackService.TriggerWorkflow:output_type -> workflow.plugin.v1.ErrorResponse + 19, // 60: workflow.plugin.v1.EngineCallbackService.GetService:output_type -> workflow.plugin.v1.GetServiceResponse + 32, // 61: workflow.plugin.v1.EngineCallbackService.Log:output_type -> google.protobuf.Empty + 22, // 62: workflow.plugin.v1.EngineCallbackService.PublishMessage:output_type -> workflow.plugin.v1.PublishMessageResponse + 12, // 63: workflow.plugin.v1.EngineCallbackService.Subscribe:output_type -> workflow.plugin.v1.ErrorResponse + 12, // 64: workflow.plugin.v1.EngineCallbackService.Unsubscribe:output_type -> workflow.plugin.v1.ErrorResponse + 42, // [42:65] is the sub-list for method output_type + 19, // [19:42] is the sub-list for method input_type + 19, // [19:19] is the sub-list for extension type_name + 19, // [19:19] is the sub-list for extension extendee + 0, // [0:19] is the sub-list for field type_name } func init() { file_plugin_proto_init() } diff --git a/plugin/external/proto/plugin.proto b/plugin/external/proto/plugin.proto index 83f780ac..c6a06893 100644 --- a/plugin/external/proto/plugin.proto +++ b/plugin/external/proto/plugin.proto @@ -179,6 +179,10 @@ message ExecuteStepRequest { map step_outputs = 3; google.protobuf.Struct current = 4; google.protobuf.Struct metadata = 5; + // config carries the step's config with template expressions resolved against + // the current pipeline context. Plugin steps should prefer this over any + // config stored at CreateStep time so that dynamic values are honoured. + google.protobuf.Struct config = 6; } // ExecuteStepResponse returns the step result. diff --git a/plugin/external/remote_step.go b/plugin/external/remote_step.go index 6802b49e..34844982 100644 --- a/plugin/external/remote_step.go +++ b/plugin/external/remote_step.go @@ -13,15 +13,21 @@ import ( type RemoteStep struct { name string handleID string + config map[string]any client pb.PluginServiceClient + tmpl *module.TemplateEngine } // NewRemoteStep creates a remote step proxy. -func NewRemoteStep(name, handleID string, client pb.PluginServiceClient) *RemoteStep { +// config holds the raw (possibly template-containing) step configuration that +// will be resolved against the live pipeline context on each Execute call. +func NewRemoteStep(name, handleID string, client pb.PluginServiceClient, config map[string]any) *RemoteStep { return &RemoteStep{ name: name, handleID: handleID, + config: config, client: client, + tmpl: module.NewTemplateEngine(), } } @@ -30,6 +36,14 @@ func (s *RemoteStep) Name() string { } func (s *RemoteStep) Execute(ctx context.Context, pc *module.PipelineContext) (*module.StepResult, error) { + // Resolve template expressions in the step config against the current + // pipeline context so that dynamic values (e.g. outputs of earlier steps) + // are available to the plugin. + resolvedConfig, err := s.tmpl.ResolveMap(s.config, pc) + if err != nil { + return nil, fmt.Errorf("remote step config resolve: %w", err) + } + // Convert step outputs to proto map stepOutputs := make(map[string]*structpb.Struct) for k, v := range pc.StepOutputs { @@ -42,6 +56,7 @@ func (s *RemoteStep) Execute(ctx context.Context, pc *module.PipelineContext) (* StepOutputs: stepOutputs, Current: mapToStruct(pc.Current), Metadata: mapToStruct(pc.Metadata), + Config: mapToStruct(resolvedConfig), }) if err != nil { return nil, fmt.Errorf("remote step execute: %w", err) diff --git a/plugin/external/remote_step_test.go b/plugin/external/remote_step_test.go new file mode 100644 index 00000000..a9ce47ad --- /dev/null +++ b/plugin/external/remote_step_test.go @@ -0,0 +1,156 @@ +package external + +import ( + "context" + "testing" + + "github.com/GoCodeAlone/workflow/module" + pb "github.com/GoCodeAlone/workflow/plugin/external/proto" + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/emptypb" + "google.golang.org/protobuf/types/known/structpb" +) + +// stubPluginServiceClient is a minimal PluginServiceClient that captures +// ExecuteStep requests for assertion in tests. +type stubPluginServiceClient struct { + pb.UnimplementedPluginServiceServer // provides no-op implementations + + lastRequest *pb.ExecuteStepRequest + response *pb.ExecuteStepResponse +} + +// ExecuteStep records the request and returns the configured response. +func (c *stubPluginServiceClient) ExecuteStep(_ context.Context, req *pb.ExecuteStepRequest, _ ...grpc.CallOption) (*pb.ExecuteStepResponse, error) { + c.lastRequest = req + if c.response != nil { + return c.response, nil + } + return &pb.ExecuteStepResponse{Output: &structpb.Struct{}}, nil +} + +// Implement the remaining PluginServiceClient interface methods as no-ops. +func (c *stubPluginServiceClient) GetManifest(_ context.Context, _ *emptypb.Empty, _ ...grpc.CallOption) (*pb.Manifest, error) { + return &pb.Manifest{}, nil +} +func (c *stubPluginServiceClient) GetModuleTypes(_ context.Context, _ *emptypb.Empty, _ ...grpc.CallOption) (*pb.TypeList, error) { + return &pb.TypeList{}, nil +} +func (c *stubPluginServiceClient) GetStepTypes(_ context.Context, _ *emptypb.Empty, _ ...grpc.CallOption) (*pb.TypeList, error) { + return &pb.TypeList{}, nil +} +func (c *stubPluginServiceClient) GetTriggerTypes(_ context.Context, _ *emptypb.Empty, _ ...grpc.CallOption) (*pb.TypeList, error) { + return &pb.TypeList{}, nil +} +func (c *stubPluginServiceClient) GetModuleSchemas(_ context.Context, _ *emptypb.Empty, _ ...grpc.CallOption) (*pb.ModuleSchemaList, error) { + return &pb.ModuleSchemaList{}, nil +} +func (c *stubPluginServiceClient) CreateModule(_ context.Context, _ *pb.CreateModuleRequest, _ ...grpc.CallOption) (*pb.HandleResponse, error) { + return &pb.HandleResponse{}, nil +} +func (c *stubPluginServiceClient) InitModule(_ context.Context, _ *pb.HandleRequest, _ ...grpc.CallOption) (*pb.ErrorResponse, error) { + return &pb.ErrorResponse{}, nil +} +func (c *stubPluginServiceClient) StartModule(_ context.Context, _ *pb.HandleRequest, _ ...grpc.CallOption) (*pb.ErrorResponse, error) { + return &pb.ErrorResponse{}, nil +} +func (c *stubPluginServiceClient) StopModule(_ context.Context, _ *pb.HandleRequest, _ ...grpc.CallOption) (*pb.ErrorResponse, error) { + return &pb.ErrorResponse{}, nil +} +func (c *stubPluginServiceClient) DestroyModule(_ context.Context, _ *pb.HandleRequest, _ ...grpc.CallOption) (*pb.ErrorResponse, error) { + return &pb.ErrorResponse{}, nil +} +func (c *stubPluginServiceClient) CreateStep(_ context.Context, _ *pb.CreateStepRequest, _ ...grpc.CallOption) (*pb.HandleResponse, error) { + return &pb.HandleResponse{}, nil +} +func (c *stubPluginServiceClient) DestroyStep(_ context.Context, _ *pb.HandleRequest, _ ...grpc.CallOption) (*pb.ErrorResponse, error) { + return &pb.ErrorResponse{}, nil +} +func (c *stubPluginServiceClient) InvokeService(_ context.Context, _ *pb.InvokeServiceRequest, _ ...grpc.CallOption) (*pb.InvokeServiceResponse, error) { + return &pb.InvokeServiceResponse{}, nil +} +func (c *stubPluginServiceClient) DeliverMessage(_ context.Context, _ *pb.DeliverMessageRequest, _ ...grpc.CallOption) (*pb.DeliverMessageResponse, error) { + return &pb.DeliverMessageResponse{}, nil +} +func (c *stubPluginServiceClient) GetConfigFragment(_ context.Context, _ *emptypb.Empty, _ ...grpc.CallOption) (*pb.ConfigFragmentResponse, error) { + return &pb.ConfigFragmentResponse{}, nil +} +func (c *stubPluginServiceClient) GetAsset(_ context.Context, _ *pb.GetAssetRequest, _ ...grpc.CallOption) (*pb.GetAssetResponse, error) { + return &pb.GetAssetResponse{}, nil +} + +// TestRemoteStep_Execute_ResolvesTemplatesInConfig verifies that template +// expressions in step config are resolved against the PipelineContext before +// the gRPC ExecuteStep call is made. +func TestRemoteStep_Execute_ResolvesTemplatesInConfig(t *testing.T) { + stub := &stubPluginServiceClient{} + cfg := map[string]any{ + "user_id": `{{ index .steps "fetch-user" "row" "id" }}`, + "static": "plain-value", + } + step := NewRemoteStep("test-step", "handle-1", stub, cfg) + + pc := module.NewPipelineContext(nil, nil) + pc.MergeStepOutput("fetch-user", map[string]any{ + "row": map[string]any{"id": "user-42"}, + }) + + _, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if stub.lastRequest == nil { + t.Fatal("expected ExecuteStep to be called") + } + + sent := stub.lastRequest.Config.AsMap() + + if sent["user_id"] != "user-42" { + t.Errorf("expected config user_id='user-42', got %q", sent["user_id"]) + } + if sent["static"] != "plain-value" { + t.Errorf("expected config static='plain-value', got %q", sent["static"]) + } +} + +// TestRemoteStep_Execute_NilConfig verifies that a step with no config does +// not panic and sends an empty (or nil) config in the request. +func TestRemoteStep_Execute_NilConfig(t *testing.T) { + stub := &stubPluginServiceClient{} + step := NewRemoteStep("test-step", "handle-2", stub, nil) + + pc := module.NewPipelineContext(nil, nil) + + _, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if stub.lastRequest == nil { + t.Fatal("expected ExecuteStep to be called") + } +} + +// TestRemoteStep_Execute_StaticConfigPassthrough verifies that a config with +// no template expressions is passed through unmodified. +func TestRemoteStep_Execute_StaticConfigPassthrough(t *testing.T) { + stub := &stubPluginServiceClient{} + cfg := map[string]any{ + "endpoint": "https://api.example.com", + "timeout": float64(30), + } + step := NewRemoteStep("test-step", "handle-3", stub, cfg) + + _, err := step.Execute(context.Background(), module.NewPipelineContext(nil, nil)) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + sent := stub.lastRequest.Config.AsMap() + if sent["endpoint"] != "https://api.example.com" { + t.Errorf("expected endpoint='https://api.example.com', got %q", sent["endpoint"]) + } + if sent["timeout"] != float64(30) { + t.Errorf("expected timeout=30, got %v", sent["timeout"]) + } +} diff --git a/plugin/external/sdk/grpc_server.go b/plugin/external/sdk/grpc_server.go index cdff51c4..ff5231af 100644 --- a/plugin/external/sdk/grpc_server.go +++ b/plugin/external/sdk/grpc_server.go @@ -332,7 +332,7 @@ func (s *grpcServer) ExecuteStep(ctx context.Context, req *pb.ExecuteStepRequest stepOutputs[k] = structToMap(v) } - result, err := inst.Execute(ctx, structToMap(req.TriggerData), stepOutputs, structToMap(req.Current), structToMap(req.Metadata)) + result, err := inst.Execute(ctx, structToMap(req.TriggerData), stepOutputs, structToMap(req.Current), structToMap(req.Metadata), structToMap(req.Config)) if err != nil { return &pb.ExecuteStepResponse{Error: err.Error()}, nil //nolint:nilerr // app error in response field } diff --git a/plugin/external/sdk/interfaces.go b/plugin/external/sdk/interfaces.go index 1596b2e2..cd8751a5 100644 --- a/plugin/external/sdk/interfaces.go +++ b/plugin/external/sdk/interfaces.go @@ -50,7 +50,7 @@ type StepProvider interface { // StepInstance is a remote pipeline step. type StepInstance interface { - Execute(ctx context.Context, triggerData map[string]any, stepOutputs map[string]map[string]any, current map[string]any, metadata map[string]any) (*StepResult, error) + Execute(ctx context.Context, triggerData map[string]any, stepOutputs map[string]map[string]any, current map[string]any, metadata map[string]any, config map[string]any) (*StepResult, error) } // StepResult is the output of a step execution. From 0e7c4516a32591589ee97f41cc563ea79b6f668f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 1 Mar 2026 09:02:52 +0000 Subject: [PATCH 4/5] RemoteStep: resolve template expressions in step config before gRPC call Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index db1134f3..2b2c47b8 100644 --- a/go.mod +++ b/go.mod @@ -50,6 +50,7 @@ require ( github.com/redis/go-redis/v9 v9.18.0 github.com/stripe/stripe-go/v82 v82.5.1 github.com/tliron/glsp v0.2.2 + github.com/xdg-go/scram v1.2.0 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.65.0 go.opentelemetry.io/otel v1.40.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.40.0 @@ -208,7 +209,6 @@ require ( github.com/tliron/commonlog v0.2.8 // indirect github.com/tliron/kutil v0.3.11 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect - github.com/xdg-go/scram v1.2.0 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect github.com/yosida95/uritemplate/v3 v3.0.2 // indirect github.com/yuin/gopher-lua v1.1.1 // indirect From 8673d9eaa35d1876b68342e52640f2c4c2253677 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 1 Mar 2026 10:19:28 +0000 Subject: [PATCH 5/5] remote_step: handle nil config, improve error message, strengthen nil config test Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- plugin/external/remote_step.go | 13 +++++++++---- plugin/external/remote_step_test.go | 6 +++++- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/plugin/external/remote_step.go b/plugin/external/remote_step.go index 34844982..6f977233 100644 --- a/plugin/external/remote_step.go +++ b/plugin/external/remote_step.go @@ -38,10 +38,15 @@ func (s *RemoteStep) Name() string { func (s *RemoteStep) Execute(ctx context.Context, pc *module.PipelineContext) (*module.StepResult, error) { // Resolve template expressions in the step config against the current // pipeline context so that dynamic values (e.g. outputs of earlier steps) - // are available to the plugin. - resolvedConfig, err := s.tmpl.ResolveMap(s.config, pc) - if err != nil { - return nil, fmt.Errorf("remote step config resolve: %w", err) + // are available to the plugin. When no config was provided, skip resolution + // and leave resolvedConfig nil so the Config proto field is omitted. + var resolvedConfig map[string]any + if s.config != nil { + var err error + resolvedConfig, err = s.tmpl.ResolveMap(s.config, pc) + if err != nil { + return nil, fmt.Errorf("remote step %q (handle %s) config resolve: %w", s.name, s.handleID, err) + } } // Convert step outputs to proto map diff --git a/plugin/external/remote_step_test.go b/plugin/external/remote_step_test.go index a9ce47ad..d3e17197 100644 --- a/plugin/external/remote_step_test.go +++ b/plugin/external/remote_step_test.go @@ -115,7 +115,8 @@ func TestRemoteStep_Execute_ResolvesTemplatesInConfig(t *testing.T) { } // TestRemoteStep_Execute_NilConfig verifies that a step with no config does -// not panic and sends an empty (or nil) config in the request. +// not panic and sends a nil Config in the request (so the plugin receives nil, +// not an empty map). func TestRemoteStep_Execute_NilConfig(t *testing.T) { stub := &stubPluginServiceClient{} step := NewRemoteStep("test-step", "handle-2", stub, nil) @@ -129,6 +130,9 @@ func TestRemoteStep_Execute_NilConfig(t *testing.T) { if stub.lastRequest == nil { t.Fatal("expected ExecuteStep to be called") } + if stub.lastRequest.Config != nil { + t.Errorf("expected nil Config for step with no config, got %v", stub.lastRequest.Config) + } } // TestRemoteStep_Execute_StaticConfigPassthrough verifies that a config with