Skip to content

Commit 0915b86

Browse files
Copilotintel352
andauthored
RemoteStep: resolve template expressions in step config before gRPC call (#217)
* Initial plan * Initial plan: resolve template expressions in RemoteStep.Execute() Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> * RemoteStep: resolve template expressions in step config before gRPC call Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> * RemoteStep: resolve template expressions in step config before gRPC call Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> * remote_step: handle nil config, improve error message, strengthen nil config test Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: intel352 <77607+intel352@users.noreply.github.com>
1 parent 54b30d2 commit 0915b86

12 files changed

Lines changed: 284 additions & 80 deletions

File tree

docs/PLUGIN_ARCHITECTURE.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,9 @@ message ExecuteStepRequest {
358358
map<string, google.protobuf.Struct> step_outputs = 3;
359359
google.protobuf.Struct current = 4;
360360
google.protobuf.Struct metadata = 5;
361+
// config carries the step's config with template expressions resolved
362+
// against the current pipeline context.
363+
google.protobuf.Struct config = 6;
361364
}
362365
363366
// Step execution response

docs/PLUGIN_DEVELOPMENT_GUIDE.md

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -616,15 +616,19 @@ When a workflow step is backed by an external plugin, the execution flow is:
616616

617617
```
618618
1. Pipeline engine calls step.Execute(ctx, pipelineContext)
619-
2. RemoteStep converts PipelineContext to protobuf:
619+
2. RemoteStep resolves template expressions in step config against the current
620+
pipeline context (e.g. {{ index .steps "fetch-user" "row" "id" }} becomes
621+
the actual value from a previous step's output).
622+
3. RemoteStep converts PipelineContext to protobuf:
620623
- TriggerData -> google.protobuf.Struct
621624
- StepOutputs -> map<string, google.protobuf.Struct>
622625
- Current -> google.protobuf.Struct
623626
- Metadata -> google.protobuf.Struct
624-
3. gRPC call: ExecuteStep(ExecuteStepRequest) -> ExecuteStepResponse
625-
4. Plugin deserializes, runs logic, returns output
626-
5. RemoteStep converts response back to StepResult{Output, Stop}
627-
6. Pipeline engine merges output into Current state
627+
- Config -> google.protobuf.Struct (resolved step config)
628+
4. gRPC call: ExecuteStep(ExecuteStepRequest) -> ExecuteStepResponse
629+
5. Plugin deserializes, runs logic using resolved config, returns output
630+
6. RemoteStep converts response back to StepResult{Output, Stop}
631+
7. Pipeline engine merges output into Current state
628632
```
629633

630634
All complex data structures are serialized through `google.protobuf.Struct`, which represents JSON-compatible maps. This means plugin data must be expressible as JSON (strings, numbers, booleans, arrays, nested objects). Binary data should be base64-encoded.

examples/external-plugin/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ type uppercaseStep struct {
3838
name string
3939
}
4040

41-
func (s *uppercaseStep) Execute(_ context.Context, _ map[string]any, _ map[string]map[string]any, current map[string]any, _ map[string]any) (*sdk.StepResult, error) {
41+
func (s *uppercaseStep) Execute(_ context.Context, _ map[string]any, _ map[string]map[string]any, current map[string]any, _ map[string]any, _ map[string]any) (*sdk.StepResult, error) {
4242
input, _ := current["input"].(string)
4343
return &sdk.StepResult{
4444
Output: map[string]any{

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ require (
5050
github.com/redis/go-redis/v9 v9.18.0
5151
github.com/stripe/stripe-go/v82 v82.5.1
5252
github.com/tliron/glsp v0.2.2
53+
github.com/xdg-go/scram v1.2.0
5354
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.65.0
5455
go.opentelemetry.io/otel v1.40.0
5556
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.40.0
@@ -208,7 +209,6 @@ require (
208209
github.com/tliron/commonlog v0.2.8 // indirect
209210
github.com/tliron/kutil v0.3.11 // indirect
210211
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
211-
github.com/xdg-go/scram v1.2.0 // indirect
212212
github.com/xdg-go/stringprep v1.0.4 // indirect
213213
github.com/yosida95/uritemplate/v3 v3.0.2 // indirect
214214
github.com/yuin/gopher-lua v1.1.1 // indirect

plugin/external/adapter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ func (a *ExternalPluginAdapter) StepFactories() map[string]plugin.StepFactory {
165165
if createResp.Error != "" {
166166
return nil, fmt.Errorf("create remote step %s: %s", tn, createResp.Error)
167167
}
168-
return NewRemoteStep(name, createResp.HandleId, a.client.client), nil
168+
return NewRemoteStep(name, createResp.HandleId, a.client.client, cfg), nil
169169
}
170170
}
171171
return factories

plugin/external/proto/plugin.pb.go

Lines changed: 81 additions & 68 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

plugin/external/proto/plugin.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,10 @@ message ExecuteStepRequest {
179179
map<string, google.protobuf.Struct> step_outputs = 3;
180180
google.protobuf.Struct current = 4;
181181
google.protobuf.Struct metadata = 5;
182+
// config carries the step's config with template expressions resolved against
183+
// the current pipeline context. Plugin steps should prefer this over any
184+
// config stored at CreateStep time so that dynamic values are honoured.
185+
google.protobuf.Struct config = 6;
182186
}
183187

184188
// ExecuteStepResponse returns the step result.

plugin/external/proto/plugin_grpc.pb.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

plugin/external/remote_step.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,21 @@ import (
1313
type RemoteStep struct {
1414
name string
1515
handleID string
16+
config map[string]any
1617
client pb.PluginServiceClient
18+
tmpl *module.TemplateEngine
1719
}
1820

1921
// NewRemoteStep creates a remote step proxy.
20-
func NewRemoteStep(name, handleID string, client pb.PluginServiceClient) *RemoteStep {
22+
// config holds the raw (possibly template-containing) step configuration that
23+
// will be resolved against the live pipeline context on each Execute call.
24+
func NewRemoteStep(name, handleID string, client pb.PluginServiceClient, config map[string]any) *RemoteStep {
2125
return &RemoteStep{
2226
name: name,
2327
handleID: handleID,
28+
config: config,
2429
client: client,
30+
tmpl: module.NewTemplateEngine(),
2531
}
2632
}
2733

@@ -30,6 +36,19 @@ func (s *RemoteStep) Name() string {
3036
}
3137

3238
func (s *RemoteStep) Execute(ctx context.Context, pc *module.PipelineContext) (*module.StepResult, error) {
39+
// Resolve template expressions in the step config against the current
40+
// pipeline context so that dynamic values (e.g. outputs of earlier steps)
41+
// are available to the plugin. When no config was provided, skip resolution
42+
// and leave resolvedConfig nil so the Config proto field is omitted.
43+
var resolvedConfig map[string]any
44+
if s.config != nil {
45+
var err error
46+
resolvedConfig, err = s.tmpl.ResolveMap(s.config, pc)
47+
if err != nil {
48+
return nil, fmt.Errorf("remote step %q (handle %s) config resolve: %w", s.name, s.handleID, err)
49+
}
50+
}
51+
3352
// Convert step outputs to proto map
3453
stepOutputs := make(map[string]*structpb.Struct)
3554
for k, v := range pc.StepOutputs {
@@ -42,6 +61,7 @@ func (s *RemoteStep) Execute(ctx context.Context, pc *module.PipelineContext) (*
4261
StepOutputs: stepOutputs,
4362
Current: mapToStruct(pc.Current),
4463
Metadata: mapToStruct(pc.Metadata),
64+
Config: mapToStruct(resolvedConfig),
4565
})
4666
if err != nil {
4767
return nil, fmt.Errorf("remote step execute: %w", err)
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
package external
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/GoCodeAlone/workflow/module"
8+
pb "github.com/GoCodeAlone/workflow/plugin/external/proto"
9+
"google.golang.org/grpc"
10+
"google.golang.org/protobuf/types/known/emptypb"
11+
"google.golang.org/protobuf/types/known/structpb"
12+
)
13+
14+
// stubPluginServiceClient is a minimal PluginServiceClient that captures
15+
// ExecuteStep requests for assertion in tests.
16+
type stubPluginServiceClient struct {
17+
pb.UnimplementedPluginServiceServer // provides no-op implementations
18+
19+
lastRequest *pb.ExecuteStepRequest
20+
response *pb.ExecuteStepResponse
21+
}
22+
23+
// ExecuteStep records the request and returns the configured response.
24+
func (c *stubPluginServiceClient) ExecuteStep(_ context.Context, req *pb.ExecuteStepRequest, _ ...grpc.CallOption) (*pb.ExecuteStepResponse, error) {
25+
c.lastRequest = req
26+
if c.response != nil {
27+
return c.response, nil
28+
}
29+
return &pb.ExecuteStepResponse{Output: &structpb.Struct{}}, nil
30+
}
31+
32+
// Implement the remaining PluginServiceClient interface methods as no-ops.
33+
func (c *stubPluginServiceClient) GetManifest(_ context.Context, _ *emptypb.Empty, _ ...grpc.CallOption) (*pb.Manifest, error) {
34+
return &pb.Manifest{}, nil
35+
}
36+
func (c *stubPluginServiceClient) GetModuleTypes(_ context.Context, _ *emptypb.Empty, _ ...grpc.CallOption) (*pb.TypeList, error) {
37+
return &pb.TypeList{}, nil
38+
}
39+
func (c *stubPluginServiceClient) GetStepTypes(_ context.Context, _ *emptypb.Empty, _ ...grpc.CallOption) (*pb.TypeList, error) {
40+
return &pb.TypeList{}, nil
41+
}
42+
func (c *stubPluginServiceClient) GetTriggerTypes(_ context.Context, _ *emptypb.Empty, _ ...grpc.CallOption) (*pb.TypeList, error) {
43+
return &pb.TypeList{}, nil
44+
}
45+
func (c *stubPluginServiceClient) GetModuleSchemas(_ context.Context, _ *emptypb.Empty, _ ...grpc.CallOption) (*pb.ModuleSchemaList, error) {
46+
return &pb.ModuleSchemaList{}, nil
47+
}
48+
func (c *stubPluginServiceClient) CreateModule(_ context.Context, _ *pb.CreateModuleRequest, _ ...grpc.CallOption) (*pb.HandleResponse, error) {
49+
return &pb.HandleResponse{}, nil
50+
}
51+
func (c *stubPluginServiceClient) InitModule(_ context.Context, _ *pb.HandleRequest, _ ...grpc.CallOption) (*pb.ErrorResponse, error) {
52+
return &pb.ErrorResponse{}, nil
53+
}
54+
func (c *stubPluginServiceClient) StartModule(_ context.Context, _ *pb.HandleRequest, _ ...grpc.CallOption) (*pb.ErrorResponse, error) {
55+
return &pb.ErrorResponse{}, nil
56+
}
57+
func (c *stubPluginServiceClient) StopModule(_ context.Context, _ *pb.HandleRequest, _ ...grpc.CallOption) (*pb.ErrorResponse, error) {
58+
return &pb.ErrorResponse{}, nil
59+
}
60+
func (c *stubPluginServiceClient) DestroyModule(_ context.Context, _ *pb.HandleRequest, _ ...grpc.CallOption) (*pb.ErrorResponse, error) {
61+
return &pb.ErrorResponse{}, nil
62+
}
63+
func (c *stubPluginServiceClient) CreateStep(_ context.Context, _ *pb.CreateStepRequest, _ ...grpc.CallOption) (*pb.HandleResponse, error) {
64+
return &pb.HandleResponse{}, nil
65+
}
66+
func (c *stubPluginServiceClient) DestroyStep(_ context.Context, _ *pb.HandleRequest, _ ...grpc.CallOption) (*pb.ErrorResponse, error) {
67+
return &pb.ErrorResponse{}, nil
68+
}
69+
func (c *stubPluginServiceClient) InvokeService(_ context.Context, _ *pb.InvokeServiceRequest, _ ...grpc.CallOption) (*pb.InvokeServiceResponse, error) {
70+
return &pb.InvokeServiceResponse{}, nil
71+
}
72+
func (c *stubPluginServiceClient) DeliverMessage(_ context.Context, _ *pb.DeliverMessageRequest, _ ...grpc.CallOption) (*pb.DeliverMessageResponse, error) {
73+
return &pb.DeliverMessageResponse{}, nil
74+
}
75+
func (c *stubPluginServiceClient) GetConfigFragment(_ context.Context, _ *emptypb.Empty, _ ...grpc.CallOption) (*pb.ConfigFragmentResponse, error) {
76+
return &pb.ConfigFragmentResponse{}, nil
77+
}
78+
func (c *stubPluginServiceClient) GetAsset(_ context.Context, _ *pb.GetAssetRequest, _ ...grpc.CallOption) (*pb.GetAssetResponse, error) {
79+
return &pb.GetAssetResponse{}, nil
80+
}
81+
82+
// TestRemoteStep_Execute_ResolvesTemplatesInConfig verifies that template
83+
// expressions in step config are resolved against the PipelineContext before
84+
// the gRPC ExecuteStep call is made.
85+
func TestRemoteStep_Execute_ResolvesTemplatesInConfig(t *testing.T) {
86+
stub := &stubPluginServiceClient{}
87+
cfg := map[string]any{
88+
"user_id": `{{ index .steps "fetch-user" "row" "id" }}`,
89+
"static": "plain-value",
90+
}
91+
step := NewRemoteStep("test-step", "handle-1", stub, cfg)
92+
93+
pc := module.NewPipelineContext(nil, nil)
94+
pc.MergeStepOutput("fetch-user", map[string]any{
95+
"row": map[string]any{"id": "user-42"},
96+
})
97+
98+
_, err := step.Execute(context.Background(), pc)
99+
if err != nil {
100+
t.Fatalf("unexpected error: %v", err)
101+
}
102+
103+
if stub.lastRequest == nil {
104+
t.Fatal("expected ExecuteStep to be called")
105+
}
106+
107+
sent := stub.lastRequest.Config.AsMap()
108+
109+
if sent["user_id"] != "user-42" {
110+
t.Errorf("expected config user_id='user-42', got %q", sent["user_id"])
111+
}
112+
if sent["static"] != "plain-value" {
113+
t.Errorf("expected config static='plain-value', got %q", sent["static"])
114+
}
115+
}
116+
117+
// TestRemoteStep_Execute_NilConfig verifies that a step with no config does
118+
// not panic and sends a nil Config in the request (so the plugin receives nil,
119+
// not an empty map).
120+
func TestRemoteStep_Execute_NilConfig(t *testing.T) {
121+
stub := &stubPluginServiceClient{}
122+
step := NewRemoteStep("test-step", "handle-2", stub, nil)
123+
124+
pc := module.NewPipelineContext(nil, nil)
125+
126+
_, err := step.Execute(context.Background(), pc)
127+
if err != nil {
128+
t.Fatalf("unexpected error: %v", err)
129+
}
130+
if stub.lastRequest == nil {
131+
t.Fatal("expected ExecuteStep to be called")
132+
}
133+
if stub.lastRequest.Config != nil {
134+
t.Errorf("expected nil Config for step with no config, got %v", stub.lastRequest.Config)
135+
}
136+
}
137+
138+
// TestRemoteStep_Execute_StaticConfigPassthrough verifies that a config with
139+
// no template expressions is passed through unmodified.
140+
func TestRemoteStep_Execute_StaticConfigPassthrough(t *testing.T) {
141+
stub := &stubPluginServiceClient{}
142+
cfg := map[string]any{
143+
"endpoint": "https://api.example.com",
144+
"timeout": float64(30),
145+
}
146+
step := NewRemoteStep("test-step", "handle-3", stub, cfg)
147+
148+
_, err := step.Execute(context.Background(), module.NewPipelineContext(nil, nil))
149+
if err != nil {
150+
t.Fatalf("unexpected error: %v", err)
151+
}
152+
153+
sent := stub.lastRequest.Config.AsMap()
154+
if sent["endpoint"] != "https://api.example.com" {
155+
t.Errorf("expected endpoint='https://api.example.com', got %q", sent["endpoint"])
156+
}
157+
if sent["timeout"] != float64(30) {
158+
t.Errorf("expected timeout=30, got %v", sent["timeout"])
159+
}
160+
}

0 commit comments

Comments
 (0)