From bc2431317dcb5f5e217dd9870dcba5628f4fc0ec Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Tue, 24 Feb 2026 23:02:20 -0500 Subject: [PATCH] feat: add wfctl pipeline run/list commands Implements `wfctl pipeline list -c ` to enumerate pipelines and `wfctl pipeline run -c -p ` to execute a pipeline locally without starting the HTTP server. - Adds `GetPipeline(name)` method to StdEngine to expose the compiled pipeline registry for CLI access - Builds a minimal engine (pipeline-steps plugin only, no HTTP triggers) from the config, then looks up the named pipeline and executes it - Supports `--var key=value` for injecting variables into trigger data - Supports `--input ` for passing structured input - Supports `--verbose` for detailed step output - Prints step-by-step progress (Step N/M: name ... OK (elapsed)) - Registers the `pipeline` top-level command in wfctl main dispatch Co-Authored-By: Claude Opus 4.6 --- cmd/wfctl/main.go | 4 +- cmd/wfctl/pipeline.go | 302 ++++++++++++++++++++++++++++++++++++ cmd/wfctl/pipeline_test.go | 307 +++++++++++++++++++++++++++++++++++++ engine.go | 9 ++ 4 files changed, 621 insertions(+), 1 deletion(-) create mode 100644 cmd/wfctl/pipeline.go create mode 100644 cmd/wfctl/pipeline_test.go diff --git a/cmd/wfctl/main.go b/cmd/wfctl/main.go index e47c2ad1..6cd2d745 100644 --- a/cmd/wfctl/main.go +++ b/cmd/wfctl/main.go @@ -13,6 +13,7 @@ var commands = map[string]func([]string) error{ "inspect": runInspect, "run": runRun, "plugin": runPlugin, + "pipeline": runPipeline, "schema": runSchema, "manifest": runManifest, "migrate": runMigrate, @@ -32,7 +33,8 @@ Commands: validate Validate a workflow configuration file inspect Inspect modules, workflows, and triggers in a config run Run a workflow engine from a config file - plugin Plugin management (init, docs) + plugin Plugin management (init, docs, search, install, list, update, remove) + pipeline Pipeline management (list, run) schema Generate JSON Schema for workflow configs manifest Analyze config and report infrastructure requirements migrate Manage database schema migrations diff --git a/cmd/wfctl/pipeline.go b/cmd/wfctl/pipeline.go new file mode 100644 index 00000000..326f32e4 --- /dev/null +++ b/cmd/wfctl/pipeline.go @@ -0,0 +1,302 @@ +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "log/slog" + "os" + "sort" + "strings" + "time" + + "github.com/CrisisTextLine/modular" + "github.com/GoCodeAlone/workflow" + "github.com/GoCodeAlone/workflow/config" + "github.com/GoCodeAlone/workflow/handlers" + "github.com/GoCodeAlone/workflow/module" + pluginpipeline "github.com/GoCodeAlone/workflow/plugins/pipelinesteps" +) + +func runPipeline(args []string) error { + if len(args) < 1 { + return pipelineUsage() + } + switch args[0] { + case "list": + return runPipelineList(args[1:]) + case "run": + return runPipelineRun(args[1:]) + default: + return pipelineUsage() + } +} + +func pipelineUsage() error { + fmt.Fprintf(flag.CommandLine.Output(), `Usage: wfctl pipeline [options] + +Subcommands: + list List available pipelines in a config file + run Execute a pipeline from a config file +`) + return fmt.Errorf("pipeline subcommand is required") +} + +// runPipelineList lists all pipelines defined in a config file. +func runPipelineList(args []string) error { + fs := flag.NewFlagSet("pipeline list", flag.ContinueOnError) + configPath := fs.String("c", "", "Path to workflow config YAML file (required)") + fs.Usage = func() { + fmt.Fprintf(fs.Output(), "Usage: wfctl pipeline list -c \n\nList available pipelines in a config file.\n\nOptions:\n") + fs.PrintDefaults() + } + if err := fs.Parse(args); err != nil { + return err + } + if *configPath == "" { + fs.Usage() + return fmt.Errorf("-c (config file) is required") + } + + cfg, err := config.LoadFromFile(*configPath) + if err != nil { + return fmt.Errorf("failed to load config: %w", err) + } + + if len(cfg.Pipelines) == 0 { + fmt.Println("No pipelines defined in config.") + return nil + } + + // Sort pipeline names for stable output + names := make([]string, 0, len(cfg.Pipelines)) + for name := range cfg.Pipelines { + names = append(names, name) + } + sort.Strings(names) + + fmt.Printf("Pipelines (%d):\n", len(names)) + for _, name := range names { + // Extract step count if possible + stepCount := 0 + if rawCfg, ok := cfg.Pipelines[name].(map[string]any); ok { + if steps, ok := rawCfg["steps"].([]any); ok { + stepCount = len(steps) + } + } + if stepCount > 0 { + fmt.Printf(" %-40s (%d steps)\n", name, stepCount) + } else { + fmt.Printf(" %s\n", name) + } + } + return nil +} + +// stringSliceFlag is a flag.Value that accumulates multiple --var key=value flags. +type stringSliceFlag []string + +func (s *stringSliceFlag) String() string { + return strings.Join(*s, ", ") +} + +func (s *stringSliceFlag) Set(v string) error { + *s = append(*s, v) + return nil +} + +// runPipelineRun executes a named pipeline from a config file. +func runPipelineRun(args []string) error { + fs := flag.NewFlagSet("pipeline run", flag.ContinueOnError) + configPath := fs.String("c", "", "Path to workflow config YAML file (required)") + pipelineName := fs.String("p", "", "Name of the pipeline to run (required)") + inputJSON := fs.String("input", "", "Input data as JSON object") + verbose := fs.Bool("verbose", false, "Show detailed step output") + var vars stringSliceFlag + fs.Var(&vars, "var", "Variable in key=value format (repeatable)") + fs.Usage = func() { + fmt.Fprintf(fs.Output(), `Usage: wfctl pipeline run -c -p [options] + +Execute a pipeline locally from a config file. + +Examples: + wfctl pipeline run -c app.yaml -p build-and-deploy + wfctl pipeline run -c app.yaml -p deploy --var env=staging --var version=1.2.3 + wfctl pipeline run -c app.yaml -p process-data --input '{"items":[1,2,3]}' + +Options: +`) + fs.PrintDefaults() + } + if err := fs.Parse(args); err != nil { + return err + } + if *configPath == "" { + fs.Usage() + return fmt.Errorf("-c (config file) is required") + } + if *pipelineName == "" { + fs.Usage() + return fmt.Errorf("-p (pipeline name) is required") + } + + // Build initial trigger data from --input JSON + triggerData := make(map[string]any) + if *inputJSON != "" { + if err := json.Unmarshal([]byte(*inputJSON), &triggerData); err != nil { + return fmt.Errorf("invalid --input JSON: %w", err) + } + } + + // Inject --var entries into trigger data + for _, kv := range vars { + idx := strings.IndexByte(kv, '=') + if idx < 0 { + return fmt.Errorf("invalid --var %q: expected key=value format", kv) + } + triggerData[kv[:idx]] = kv[idx+1:] + } + + // Load config + cfg, err := config.LoadFromFile(*configPath) + if err != nil { + return fmt.Errorf("failed to load config: %w", err) + } + + // Verify the pipeline exists before building the engine + if _, ok := cfg.Pipelines[*pipelineName]; !ok { + available := make([]string, 0, len(cfg.Pipelines)) + for name := range cfg.Pipelines { + available = append(available, name) + } + sort.Strings(available) + if len(available) == 0 { + return fmt.Errorf("pipeline %q not found (no pipelines defined in config)", *pipelineName) + } + return fmt.Errorf("pipeline %q not found; available: %s", *pipelineName, strings.Join(available, ", ")) + } + + // Set up a logger — suppress engine noise unless --verbose + logLevel := slog.LevelError + if *verbose { + logLevel = slog.LevelDebug + } + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: logLevel})) + + // Build a minimal engine that can run pipelines without starting an HTTP server. + // Strategy: register the pipeline workflow handler and the pipeline-steps plugin, + // build from config (which wires all step factories and compiles pipelines), + // then look up the named pipeline from the engine's pipeline registry directly. + // We deliberately skip engine.Start() so no HTTP servers or triggers are started. + app := modular.NewStdApplication(nil, logger) + eng := workflow.NewStdEngine(app, logger) + + // Register the pipeline workflow handler (required for configurePipelines to find a PipelineAdder). + eng.RegisterWorkflowHandler(handlers.NewPipelineWorkflowHandler()) + + // Load the pipeline-steps plugin (registers step.log, step.set, step.validate, etc.) + if err := eng.LoadPlugin(pluginpipeline.New()); err != nil { + return fmt.Errorf("failed to load pipeline-steps plugin: %w", err) + } + + // BuildFromConfig registers modules, compiles pipeline steps, and populates + // the engine's pipeline registry. It does NOT start the HTTP server. + if err := eng.BuildFromConfig(cfg); err != nil { + return fmt.Errorf("failed to build engine from config: %w", err) + } + + // Retrieve the compiled pipeline from the engine's registry. + pipeline, ok := eng.GetPipeline(*pipelineName) + if !ok { + return fmt.Errorf("pipeline %q was not compiled by the engine (check config)", *pipelineName) + } + + // Attach a progress-reporting logger to the pipeline steps + pipeline.Logger = slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + + // Print execution header + fmt.Printf("Pipeline: %s\n", *pipelineName) + if len(triggerData) > 0 { + inputBytes, _ := json.Marshal(triggerData) + fmt.Printf("Input: %s\n", inputBytes) + } + fmt.Println() + + totalStart := time.Now() + + // Execute the pipeline, printing step progress inline. + pc, execErr := executePipelineWithProgress(context.Background(), pipeline, triggerData, *verbose) + + totalElapsed := time.Since(totalStart) + + if execErr != nil { + fmt.Printf("\nPipeline FAILED in %s\n", totalElapsed.Round(time.Millisecond)) + return execErr + } + + fmt.Printf("Pipeline completed successfully in %s\n", totalElapsed.Round(time.Millisecond)) + + if *verbose && pc != nil && len(pc.Current) > 0 { + fmt.Println("\nFinal context:") + for k, v := range pc.Current { + fmt.Printf(" %s = %v\n", k, v) + } + } + + return nil +} + +// executePipelineWithProgress wraps pipeline.Execute and prints step-by-step progress to stdout. +// It intercepts step execution by wrapping each step in a progressStep decorator. +func executePipelineWithProgress(ctx context.Context, p *module.Pipeline, triggerData map[string]any, verbose bool) (*module.PipelineContext, error) { + // Wrap each step with a progress reporter + original := p.Steps + wrapped := make([]module.PipelineStep, len(original)) + for i, step := range original { + wrapped[i] = &progressStep{ + inner: step, + index: i, + total: len(original), + verbose: verbose, + } + } + p.Steps = wrapped + defer func() { p.Steps = original }() + + return p.Execute(ctx, triggerData) +} + +// progressStep wraps a PipelineStep and prints progress before/after execution. +type progressStep struct { + inner module.PipelineStep + index int + total int + verbose bool +} + +func (ps *progressStep) Name() string { return ps.inner.Name() } + +func (ps *progressStep) Execute(ctx context.Context, pc *module.PipelineContext) (*module.StepResult, error) { + start := time.Now() + fmt.Printf("Step %d/%d: %s ... ", ps.index+1, ps.total, ps.inner.Name()) + + result, err := ps.inner.Execute(ctx, pc) + elapsed := time.Since(start) + + if err != nil { + fmt.Printf("FAILED (%s)\n", elapsed.Round(time.Millisecond)) + fmt.Printf(" Error: %v\n", err) + return result, err + } + + fmt.Printf("OK (%s)\n", elapsed.Round(time.Millisecond)) + + if ps.verbose && result != nil && len(result.Output) > 0 { + for k, v := range result.Output { + fmt.Printf(" %s = %v\n", k, v) + } + } + + return result, nil +} diff --git a/cmd/wfctl/pipeline_test.go b/cmd/wfctl/pipeline_test.go new file mode 100644 index 00000000..b73e31d7 --- /dev/null +++ b/cmd/wfctl/pipeline_test.go @@ -0,0 +1,307 @@ +package main + +import ( + "os" + "path/filepath" + "strings" + "testing" +) + +// pipelineConfig is a minimal config with two pipelines using only +// step.log and step.set, which are always available via the pipeline-steps plugin. +const pipelineConfig = ` +modules: [] + +pipelines: + greet: + trigger: + type: http + config: + path: /greet + method: POST + steps: + - name: say-hello + type: step.log + config: + level: info + message: "Hello, world!" + - name: set-result + type: step.set + config: + values: + greeted: "true" + on_error: stop + + echo: + trigger: + type: http + config: + path: /echo + method: POST + steps: + - name: log-input + type: step.log + config: + level: info + message: "Echo: {{ .message }}" + on_error: stop +` + +// pipelineSingleConfig has only one pipeline, for tests that need a single-pipeline config. +const pipelineSingleConfig = ` +modules: [] + +pipelines: + hello: + trigger: + type: http + config: + path: /hello + method: GET + steps: + - name: log-hello + type: step.log + config: + level: info + message: "Hello from pipeline!" + on_error: stop +` + +// noPipelinesConfig has no pipelines section. +const noPipelinesConfig = ` +modules: + - name: server + type: http.server + config: + address: ":8080" +` + +func writePipelineConfig(t *testing.T, dir, name, content string) string { + t.Helper() + path := filepath.Join(dir, name) + if err := os.WriteFile(path, []byte(content), 0644); err != nil { + t.Fatalf("failed to write config: %v", err) + } + return path +} + +// --- pipeline subcommand routing --- + +func TestRunPipelineMissingSubcommand(t *testing.T) { + err := runPipeline([]string{}) + if err == nil { + t.Fatal("expected error when no pipeline subcommand given") + } + if !strings.Contains(err.Error(), "subcommand") { + t.Errorf("expected subcommand error, got: %v", err) + } +} + +func TestRunPipelineUnknownSubcommand(t *testing.T) { + err := runPipeline([]string{"bogus"}) + if err == nil { + t.Fatal("expected error for unknown subcommand") + } +} + +// --- pipeline list --- + +func TestRunPipelineListMissingConfig(t *testing.T) { + err := runPipelineList([]string{}) + if err == nil { + t.Fatal("expected error when -c is missing") + } + if !strings.Contains(err.Error(), "-c") { + t.Errorf("expected -c error, got: %v", err) + } +} + +func TestRunPipelineListNoPipelines(t *testing.T) { + dir := t.TempDir() + path := writePipelineConfig(t, dir, "no-pipelines.yaml", noPipelinesConfig) + if err := runPipelineList([]string{"-c", path}); err != nil { + t.Fatalf("expected no error for empty pipelines, got: %v", err) + } +} + +func TestRunPipelineListWithPipelines(t *testing.T) { + dir := t.TempDir() + path := writePipelineConfig(t, dir, "config.yaml", pipelineConfig) + if err := runPipelineList([]string{"-c", path}); err != nil { + t.Fatalf("pipeline list failed: %v", err) + } +} + +func TestRunPipelineListSinglePipeline(t *testing.T) { + dir := t.TempDir() + path := writePipelineConfig(t, dir, "single.yaml", pipelineSingleConfig) + if err := runPipelineList([]string{"-c", path}); err != nil { + t.Fatalf("pipeline list failed: %v", err) + } +} + +func TestRunPipelineListInvalidConfig(t *testing.T) { + dir := t.TempDir() + path := writePipelineConfig(t, dir, "bad.yaml", "not: valid: yaml: ::::") + // LoadFromFile uses yaml.Unmarshal which is lenient; test a nonexistent file + _ = path + err := runPipelineList([]string{"-c", filepath.Join(dir, "nonexistent.yaml")}) + if err == nil { + t.Fatal("expected error for missing file") + } +} + +// --- pipeline run --- + +func TestRunPipelineRunMissingConfig(t *testing.T) { + err := runPipelineRun([]string{"-p", "greet"}) + if err == nil { + t.Fatal("expected error when -c is missing") + } + if !strings.Contains(err.Error(), "-c") { + t.Errorf("expected -c error, got: %v", err) + } +} + +func TestRunPipelineRunMissingPipelineName(t *testing.T) { + dir := t.TempDir() + path := writePipelineConfig(t, dir, "config.yaml", pipelineConfig) + err := runPipelineRun([]string{"-c", path}) + if err == nil { + t.Fatal("expected error when -p is missing") + } + if !strings.Contains(err.Error(), "-p") { + t.Errorf("expected -p error, got: %v", err) + } +} + +func TestRunPipelineRunUnknownPipeline(t *testing.T) { + dir := t.TempDir() + path := writePipelineConfig(t, dir, "config.yaml", pipelineConfig) + err := runPipelineRun([]string{"-c", path, "-p", "does-not-exist"}) + if err == nil { + t.Fatal("expected error for unknown pipeline name") + } + if !strings.Contains(err.Error(), "does-not-exist") { + t.Errorf("expected pipeline name in error, got: %v", err) + } +} + +func TestRunPipelineRunUnknownPipelineShowsAvailable(t *testing.T) { + dir := t.TempDir() + path := writePipelineConfig(t, dir, "config.yaml", pipelineConfig) + err := runPipelineRun([]string{"-c", path, "-p", "missing"}) + if err == nil { + t.Fatal("expected error for unknown pipeline") + } + // Should list available pipelines in the error + if !strings.Contains(err.Error(), "available") { + t.Errorf("expected 'available' in error, got: %v", err) + } +} + +func TestRunPipelineRunNoPipelinesInConfig(t *testing.T) { + dir := t.TempDir() + path := writePipelineConfig(t, dir, "config.yaml", noPipelinesConfig) + err := runPipelineRun([]string{"-c", path, "-p", "anything"}) + if err == nil { + t.Fatal("expected error when no pipelines defined") + } + if !strings.Contains(err.Error(), "no pipelines defined") { + t.Errorf("expected 'no pipelines defined' in error, got: %v", err) + } +} + +func TestRunPipelineRunSuccess(t *testing.T) { + dir := t.TempDir() + path := writePipelineConfig(t, dir, "config.yaml", pipelineSingleConfig) + if err := runPipelineRun([]string{"-c", path, "-p", "hello"}); err != nil { + t.Fatalf("pipeline run failed: %v", err) + } +} + +func TestRunPipelineRunWithVars(t *testing.T) { + dir := t.TempDir() + path := writePipelineConfig(t, dir, "config.yaml", pipelineSingleConfig) + if err := runPipelineRun([]string{"-c", path, "-p", "hello", "--var", "env=test", "--var", "version=1.0"}); err != nil { + t.Fatalf("pipeline run with vars failed: %v", err) + } +} + +func TestRunPipelineRunWithInputJSON(t *testing.T) { + dir := t.TempDir() + path := writePipelineConfig(t, dir, "config.yaml", pipelineSingleConfig) + if err := runPipelineRun([]string{"-c", path, "-p", "hello", "--input", `{"key":"value"}`}); err != nil { + t.Fatalf("pipeline run with input JSON failed: %v", err) + } +} + +func TestRunPipelineRunWithInvalidInputJSON(t *testing.T) { + dir := t.TempDir() + path := writePipelineConfig(t, dir, "config.yaml", pipelineSingleConfig) + err := runPipelineRun([]string{"-c", path, "-p", "hello", "--input", `not-json`}) + if err == nil { + t.Fatal("expected error for invalid JSON input") + } + if !strings.Contains(err.Error(), "JSON") { + t.Errorf("expected JSON error, got: %v", err) + } +} + +func TestRunPipelineRunWithInvalidVar(t *testing.T) { + dir := t.TempDir() + path := writePipelineConfig(t, dir, "config.yaml", pipelineSingleConfig) + err := runPipelineRun([]string{"-c", path, "-p", "hello", "--var", "noequals"}) + if err == nil { + t.Fatal("expected error for invalid --var format") + } + if !strings.Contains(err.Error(), "key=value") { + t.Errorf("expected key=value error, got: %v", err) + } +} + +func TestRunPipelineRunVerbose(t *testing.T) { + dir := t.TempDir() + path := writePipelineConfig(t, dir, "config.yaml", pipelineSingleConfig) + if err := runPipelineRun([]string{"-c", path, "-p", "hello", "--verbose"}); err != nil { + t.Fatalf("pipeline run verbose failed: %v", err) + } +} + +func TestRunPipelineRunMultiStep(t *testing.T) { + dir := t.TempDir() + path := writePipelineConfig(t, dir, "config.yaml", pipelineConfig) + if err := runPipelineRun([]string{"-c", path, "-p", "greet"}); err != nil { + t.Fatalf("multi-step pipeline run failed: %v", err) + } +} + +func TestRunPipelineRunEchoPipeline(t *testing.T) { + dir := t.TempDir() + path := writePipelineConfig(t, dir, "config.yaml", pipelineConfig) + // Echo pipeline logs .message — pass it via --var + if err := runPipelineRun([]string{"-c", path, "-p", "echo", "--var", "message=hello"}); err != nil { + t.Fatalf("echo pipeline run failed: %v", err) + } +} + +// --- stringSliceFlag --- + +func TestStringSliceFlag(t *testing.T) { + var f stringSliceFlag + if err := f.Set("key=val"); err != nil { + t.Fatalf("Set failed: %v", err) + } + if err := f.Set("foo=bar"); err != nil { + t.Fatalf("Set failed: %v", err) + } + if len(f) != 2 { + t.Errorf("expected 2 entries, got %d", len(f)) + } + if f[0] != "key=val" || f[1] != "foo=bar" { + t.Errorf("unexpected values: %v", f) + } + if !strings.Contains(f.String(), "key=val") { + t.Errorf("String() missing entry: %s", f.String()) + } +} diff --git a/engine.go b/engine.go index 8e6a49d9..af98fbfc 100644 --- a/engine.go +++ b/engine.go @@ -917,6 +917,15 @@ func (e *StdEngine) GetApp() modular.Application { return e.app } +// GetPipeline returns the named pipeline from the engine's pipeline registry. +// Returns nil and false if no pipeline with the given name exists. +// This is useful for CLI tools (e.g., wfctl pipeline run) that need to +// execute a pipeline directly without starting the HTTP server. +func (e *StdEngine) GetPipeline(name string) (*module.Pipeline, bool) { + p, ok := e.pipelineRegistry[name] + return p, ok +} + // LoadedPlugins returns all engine plugins that were loaded via LoadPlugin. func (e *StdEngine) LoadedPlugins() []plugin.EnginePlugin { out := make([]plugin.EnginePlugin, len(e.enginePlugins))