Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/PLUGIN_ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,9 @@ message ExecuteStepRequest {
map<string, google.protobuf.Struct> step_outputs = 3;
google.protobuf.Struct current = 4;
google.protobuf.Struct metadata = 5;
// config carries the step's config with template expressions resolved
// against the current pipeline context.
google.protobuf.Struct config = 6;
}

// Step execution response
Expand Down
14 changes: 9 additions & 5 deletions docs/PLUGIN_DEVELOPMENT_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -616,15 +616,19 @@ When a workflow step is backed by an external plugin, the execution flow is:

```
1. Pipeline engine calls step.Execute(ctx, pipelineContext)
2. RemoteStep converts PipelineContext to protobuf:
2. RemoteStep resolves template expressions in step config against the current
pipeline context (e.g. {{ index .steps "fetch-user" "row" "id" }} becomes
the actual value from a previous step's output).
3. RemoteStep converts PipelineContext to protobuf:
- TriggerData -> google.protobuf.Struct
- StepOutputs -> map<string, google.protobuf.Struct>
- Current -> google.protobuf.Struct
- Metadata -> google.protobuf.Struct
3. gRPC call: ExecuteStep(ExecuteStepRequest) -> ExecuteStepResponse
4. Plugin deserializes, runs logic, returns output
5. RemoteStep converts response back to StepResult{Output, Stop}
6. Pipeline engine merges output into Current state
- Config -> google.protobuf.Struct (resolved step config)
4. gRPC call: ExecuteStep(ExecuteStepRequest) -> ExecuteStepResponse
5. Plugin deserializes, runs logic using resolved config, returns output
6. RemoteStep converts response back to StepResult{Output, Stop}
7. Pipeline engine merges output into Current state
```

All complex data structures are serialized through `google.protobuf.Struct`, which represents JSON-compatible maps. This means plugin data must be expressible as JSON (strings, numbers, booleans, arrays, nested objects). Binary data should be base64-encoded.
Expand Down
2 changes: 1 addition & 1 deletion examples/external-plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type uppercaseStep struct {
name string
}

func (s *uppercaseStep) Execute(_ context.Context, _ map[string]any, _ map[string]map[string]any, current map[string]any, _ map[string]any) (*sdk.StepResult, error) {
func (s *uppercaseStep) Execute(_ context.Context, _ map[string]any, _ map[string]map[string]any, current map[string]any, _ map[string]any, _ map[string]any) (*sdk.StepResult, error) {
input, _ := current["input"].(string)
return &sdk.StepResult{
Output: map[string]any{
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ require (
github.com/redis/go-redis/v9 v9.18.0
github.com/stripe/stripe-go/v82 v82.5.1
github.com/tliron/glsp v0.2.2
github.com/xdg-go/scram v1.2.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.65.0
go.opentelemetry.io/otel v1.40.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.40.0
Expand Down Expand Up @@ -208,7 +209,6 @@ require (
github.com/tliron/commonlog v0.2.8 // indirect
github.com/tliron/kutil v0.3.11 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.2.0 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/yosida95/uritemplate/v3 v3.0.2 // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect
Expand Down
2 changes: 1 addition & 1 deletion plugin/external/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (a *ExternalPluginAdapter) StepFactories() map[string]plugin.StepFactory {
if createResp.Error != "" {
return nil, fmt.Errorf("create remote step %s: %s", tn, createResp.Error)
}
return NewRemoteStep(name, createResp.HandleId, a.client.client), nil
return NewRemoteStep(name, createResp.HandleId, a.client.client, cfg), nil
}
}
return factories
Expand Down
149 changes: 81 additions & 68 deletions plugin/external/proto/plugin.pb.go

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions plugin/external/proto/plugin.proto
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@ message ExecuteStepRequest {
map<string, google.protobuf.Struct> step_outputs = 3;
google.protobuf.Struct current = 4;
google.protobuf.Struct metadata = 5;
// config carries the step's config with template expressions resolved against
// the current pipeline context. Plugin steps should prefer this over any
// config stored at CreateStep time so that dynamic values are honoured.
google.protobuf.Struct config = 6;
}

// ExecuteStepResponse returns the step result.
Expand Down
2 changes: 1 addition & 1 deletion plugin/external/proto/plugin_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 21 additions & 1 deletion plugin/external/remote_step.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,21 @@ import (
type RemoteStep struct {
name string
handleID string
config map[string]any
client pb.PluginServiceClient
tmpl *module.TemplateEngine
}

// NewRemoteStep creates a remote step proxy.
func NewRemoteStep(name, handleID string, client pb.PluginServiceClient) *RemoteStep {
// config holds the raw (possibly template-containing) step configuration that
// will be resolved against the live pipeline context on each Execute call.
func NewRemoteStep(name, handleID string, client pb.PluginServiceClient, config map[string]any) *RemoteStep {
return &RemoteStep{
name: name,
handleID: handleID,
config: config,
client: client,
tmpl: module.NewTemplateEngine(),
}
}

Expand All @@ -30,6 +36,19 @@ func (s *RemoteStep) Name() string {
}

func (s *RemoteStep) Execute(ctx context.Context, pc *module.PipelineContext) (*module.StepResult, error) {
// Resolve template expressions in the step config against the current
// pipeline context so that dynamic values (e.g. outputs of earlier steps)
// are available to the plugin. When no config was provided, skip resolution
// and leave resolvedConfig nil so the Config proto field is omitted.
var resolvedConfig map[string]any
if s.config != nil {
var err error
resolvedConfig, err = s.tmpl.ResolveMap(s.config, pc)
if err != nil {
return nil, fmt.Errorf("remote step %q (handle %s) config resolve: %w", s.name, s.handleID, err)
}
}

// Convert step outputs to proto map
stepOutputs := make(map[string]*structpb.Struct)
for k, v := range pc.StepOutputs {
Expand All @@ -42,6 +61,7 @@ func (s *RemoteStep) Execute(ctx context.Context, pc *module.PipelineContext) (*
StepOutputs: stepOutputs,
Current: mapToStruct(pc.Current),
Metadata: mapToStruct(pc.Metadata),
Config: mapToStruct(resolvedConfig),
})
if err != nil {
return nil, fmt.Errorf("remote step execute: %w", err)
Expand Down
160 changes: 160 additions & 0 deletions plugin/external/remote_step_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package external

import (
"context"
"testing"

"github.com/GoCodeAlone/workflow/module"
pb "github.com/GoCodeAlone/workflow/plugin/external/proto"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/structpb"
)

// stubPluginServiceClient is a minimal PluginServiceClient that captures
// ExecuteStep requests for assertion in tests.
type stubPluginServiceClient struct {
pb.UnimplementedPluginServiceServer // provides no-op implementations

lastRequest *pb.ExecuteStepRequest
response *pb.ExecuteStepResponse
}

// ExecuteStep records the request and returns the configured response.
func (c *stubPluginServiceClient) ExecuteStep(_ context.Context, req *pb.ExecuteStepRequest, _ ...grpc.CallOption) (*pb.ExecuteStepResponse, error) {
c.lastRequest = req
if c.response != nil {
return c.response, nil
}
return &pb.ExecuteStepResponse{Output: &structpb.Struct{}}, nil
}

// Implement the remaining PluginServiceClient interface methods as no-ops.
func (c *stubPluginServiceClient) GetManifest(_ context.Context, _ *emptypb.Empty, _ ...grpc.CallOption) (*pb.Manifest, error) {
return &pb.Manifest{}, nil
}
func (c *stubPluginServiceClient) GetModuleTypes(_ context.Context, _ *emptypb.Empty, _ ...grpc.CallOption) (*pb.TypeList, error) {
return &pb.TypeList{}, nil
}
func (c *stubPluginServiceClient) GetStepTypes(_ context.Context, _ *emptypb.Empty, _ ...grpc.CallOption) (*pb.TypeList, error) {
return &pb.TypeList{}, nil
}
func (c *stubPluginServiceClient) GetTriggerTypes(_ context.Context, _ *emptypb.Empty, _ ...grpc.CallOption) (*pb.TypeList, error) {
return &pb.TypeList{}, nil
}
func (c *stubPluginServiceClient) GetModuleSchemas(_ context.Context, _ *emptypb.Empty, _ ...grpc.CallOption) (*pb.ModuleSchemaList, error) {
return &pb.ModuleSchemaList{}, nil
}
func (c *stubPluginServiceClient) CreateModule(_ context.Context, _ *pb.CreateModuleRequest, _ ...grpc.CallOption) (*pb.HandleResponse, error) {
return &pb.HandleResponse{}, nil
}
func (c *stubPluginServiceClient) InitModule(_ context.Context, _ *pb.HandleRequest, _ ...grpc.CallOption) (*pb.ErrorResponse, error) {
return &pb.ErrorResponse{}, nil
}
func (c *stubPluginServiceClient) StartModule(_ context.Context, _ *pb.HandleRequest, _ ...grpc.CallOption) (*pb.ErrorResponse, error) {
return &pb.ErrorResponse{}, nil
}
func (c *stubPluginServiceClient) StopModule(_ context.Context, _ *pb.HandleRequest, _ ...grpc.CallOption) (*pb.ErrorResponse, error) {
return &pb.ErrorResponse{}, nil
}
func (c *stubPluginServiceClient) DestroyModule(_ context.Context, _ *pb.HandleRequest, _ ...grpc.CallOption) (*pb.ErrorResponse, error) {
return &pb.ErrorResponse{}, nil
}
func (c *stubPluginServiceClient) CreateStep(_ context.Context, _ *pb.CreateStepRequest, _ ...grpc.CallOption) (*pb.HandleResponse, error) {
return &pb.HandleResponse{}, nil
}
func (c *stubPluginServiceClient) DestroyStep(_ context.Context, _ *pb.HandleRequest, _ ...grpc.CallOption) (*pb.ErrorResponse, error) {
return &pb.ErrorResponse{}, nil
}
func (c *stubPluginServiceClient) InvokeService(_ context.Context, _ *pb.InvokeServiceRequest, _ ...grpc.CallOption) (*pb.InvokeServiceResponse, error) {
return &pb.InvokeServiceResponse{}, nil
}
func (c *stubPluginServiceClient) DeliverMessage(_ context.Context, _ *pb.DeliverMessageRequest, _ ...grpc.CallOption) (*pb.DeliverMessageResponse, error) {
return &pb.DeliverMessageResponse{}, nil
}
func (c *stubPluginServiceClient) GetConfigFragment(_ context.Context, _ *emptypb.Empty, _ ...grpc.CallOption) (*pb.ConfigFragmentResponse, error) {
return &pb.ConfigFragmentResponse{}, nil
}
func (c *stubPluginServiceClient) GetAsset(_ context.Context, _ *pb.GetAssetRequest, _ ...grpc.CallOption) (*pb.GetAssetResponse, error) {
return &pb.GetAssetResponse{}, nil
}

// TestRemoteStep_Execute_ResolvesTemplatesInConfig verifies that template
// expressions in step config are resolved against the PipelineContext before
// the gRPC ExecuteStep call is made.
func TestRemoteStep_Execute_ResolvesTemplatesInConfig(t *testing.T) {
stub := &stubPluginServiceClient{}
cfg := map[string]any{
"user_id": `{{ index .steps "fetch-user" "row" "id" }}`,
"static": "plain-value",
}
step := NewRemoteStep("test-step", "handle-1", stub, cfg)

pc := module.NewPipelineContext(nil, nil)
pc.MergeStepOutput("fetch-user", map[string]any{
"row": map[string]any{"id": "user-42"},
})

_, err := step.Execute(context.Background(), pc)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

if stub.lastRequest == nil {
t.Fatal("expected ExecuteStep to be called")
}

sent := stub.lastRequest.Config.AsMap()

if sent["user_id"] != "user-42" {
t.Errorf("expected config user_id='user-42', got %q", sent["user_id"])
}
if sent["static"] != "plain-value" {
t.Errorf("expected config static='plain-value', got %q", sent["static"])
}
}

// TestRemoteStep_Execute_NilConfig verifies that a step with no config does
// not panic and sends a nil Config in the request (so the plugin receives nil,
// not an empty map).
func TestRemoteStep_Execute_NilConfig(t *testing.T) {
stub := &stubPluginServiceClient{}
step := NewRemoteStep("test-step", "handle-2", stub, nil)

pc := module.NewPipelineContext(nil, nil)

_, err := step.Execute(context.Background(), pc)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if stub.lastRequest == nil {
t.Fatal("expected ExecuteStep to be called")
}
if stub.lastRequest.Config != nil {
t.Errorf("expected nil Config for step with no config, got %v", stub.lastRequest.Config)
}
}

// TestRemoteStep_Execute_StaticConfigPassthrough verifies that a config with
// no template expressions is passed through unmodified.
func TestRemoteStep_Execute_StaticConfigPassthrough(t *testing.T) {
stub := &stubPluginServiceClient{}
cfg := map[string]any{
"endpoint": "https://api.example.com",
"timeout": float64(30),
}
step := NewRemoteStep("test-step", "handle-3", stub, cfg)

_, err := step.Execute(context.Background(), module.NewPipelineContext(nil, nil))
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

sent := stub.lastRequest.Config.AsMap()
if sent["endpoint"] != "https://api.example.com" {
t.Errorf("expected endpoint='https://api.example.com', got %q", sent["endpoint"])
}
if sent["timeout"] != float64(30) {
t.Errorf("expected timeout=30, got %v", sent["timeout"])
}
}
2 changes: 1 addition & 1 deletion plugin/external/sdk/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func (s *grpcServer) ExecuteStep(ctx context.Context, req *pb.ExecuteStepRequest
stepOutputs[k] = structToMap(v)
}

result, err := inst.Execute(ctx, structToMap(req.TriggerData), stepOutputs, structToMap(req.Current), structToMap(req.Metadata))
result, err := inst.Execute(ctx, structToMap(req.TriggerData), stepOutputs, structToMap(req.Current), structToMap(req.Metadata), structToMap(req.Config))
if err != nil {
return &pb.ExecuteStepResponse{Error: err.Error()}, nil //nolint:nilerr // app error in response field
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/external/sdk/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type StepProvider interface {

// StepInstance is a remote pipeline step.
type StepInstance interface {
Execute(ctx context.Context, triggerData map[string]any, stepOutputs map[string]map[string]any, current map[string]any, metadata map[string]any) (*StepResult, error)
Execute(ctx context.Context, triggerData map[string]any, stepOutputs map[string]map[string]any, current map[string]any, metadata map[string]any, config map[string]any) (*StepResult, error)
}

// StepResult is the output of a step execution.
Expand Down
Loading