Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cmd/wfctl/multi_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ func NewMultiRegistry(cfg *RegistryConfig) *MultiRegistry {
case "github":
sources = append(sources, NewGitHubRegistrySource(sc))
case "static":
staticSrc, staticErr := NewStaticRegistrySource(sc)
if staticErr != nil {
fmt.Fprintf(os.Stderr, "warning: %v, skipping\n", staticErr)
src, err := NewStaticRegistrySource(sc)
if err != nil {
fmt.Fprintf(os.Stderr, "warning: %v, skipping\n", err)
continue
}
sources = append(sources, staticSrc)
sources = append(sources, src)
default:
// Skip unknown types
fmt.Fprintf(os.Stderr, "warning: unknown registry type %q for %q, skipping\n", sc.Type, sc.Name)
Expand Down
60 changes: 32 additions & 28 deletions cmd/wfctl/plugin_install.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,19 @@ func runPluginInstall(args []string) error {
return err
}

// Enforce mutual exclusivity: at most one of --url, --local, or positional args.
exclusiveCount := 0
// Validate mutual exclusivity of install modes.
modes := 0
if *directURL != "" {
exclusiveCount++
modes++
}
if *localPath != "" {
exclusiveCount++
modes++
}
if fs.NArg() > 0 {
exclusiveCount++
modes++
}
if exclusiveCount > 1 {
return fmt.Errorf("--url, --local, and <name> are mutually exclusive; specify only one")
if modes > 1 {
return fmt.Errorf("specify only one of: <name>, --url, or --local")
}

if *directURL != "" {
Expand Down Expand Up @@ -165,13 +165,15 @@ func runPluginInstall(args []string) error {

// Update .wfctl.yaml lockfile if name@version was provided.
if _, ver := parseNameVersion(nameArg); ver != "" {
// Hash the installed binary (not the archive) so verifyInstalledChecksum matches.
pluginName = normalizePluginName(pluginName)
binaryChecksum := ""
binaryPath := filepath.Join(pluginDirVal, pluginName, pluginName)
sha, hashErr := hashFileSHA256(binaryPath)
if hashErr != nil {
fmt.Fprintf(os.Stderr, "warning: could not hash installed binary: %v\n", hashErr)
if cs, hashErr := hashFileSHA256(binaryPath); hashErr == nil {
binaryChecksum = cs
} else {
fmt.Fprintf(os.Stderr, "warning: could not hash binary %s: %v (lockfile will have no checksum)\n", binaryPath, hashErr)
}
updateLockfileWithChecksum(pluginName, manifest.Version, manifest.Repository, sourceName, sha)
updateLockfileWithChecksum(pluginName, manifest.Version, manifest.Repository, sourceName, binaryChecksum)
}

return nil
Expand Down Expand Up @@ -516,7 +518,7 @@ func installFromURL(url, pluginDir string) error {
}

if err := ensurePluginBinary(destDir, pluginName); err != nil {
return fmt.Errorf("normalize binary name: %w", err)
return fmt.Errorf("could not normalize binary name: %w", err)
}

// Validate the installed plugin (same checks as registry installs).
Expand All @@ -536,6 +538,20 @@ func installFromURL(url, pluginDir string) error {
return nil
}

// hashFileSHA256 computes the SHA-256 hex digest of the file at path using streaming I/O.
func hashFileSHA256(path string) (string, error) {
f, err := os.Open(path)
if err != nil {
return "", fmt.Errorf("hash file %s: %w", path, err)
}
defer f.Close()
h := sha256.New()
if _, err := io.Copy(h, f); err != nil {
return "", fmt.Errorf("hash file %s: %w", path, err)
}
return hex.EncodeToString(h.Sum(nil)), nil
}

// verifyInstalledChecksum reads the plugin binary and verifies its SHA-256 checksum.
func verifyInstalledChecksum(pluginDir, pluginName, expectedSHA256 string) error {
binaryPath := filepath.Join(pluginDir, pluginName)
Expand Down Expand Up @@ -590,13 +606,11 @@ func installFromLocal(srcDir, pluginDir string) error {
return err
}

// Update lockfile with binary checksum for consistency with other install paths.
installedBinary := filepath.Join(destDir, pluginName)
sha, hashErr := hashFileSHA256(installedBinary)
binaryChecksum, hashErr := hashFileSHA256(filepath.Join(destDir, pluginName))
if hashErr != nil {
fmt.Fprintf(os.Stderr, "warning: could not hash installed binary: %v\n", hashErr)
fmt.Fprintf(os.Stderr, "warning: could not compute binary checksum: %v\n", hashErr)
}
updateLockfileWithChecksum(pluginName, pj.Version, "", "", sha)
updateLockfileWithChecksum(pluginName, pj.Version, "", "", binaryChecksum)

fmt.Printf("Installed %s v%s from %s to %s\n", pluginName, pj.Version, srcDir, destDir)
return nil
Expand Down Expand Up @@ -693,16 +707,6 @@ func parseGitHubRepoURL(repoURL string) (owner, repo string, err error) {
return parts[1], repoName, nil
}

// hashFileSHA256 returns the hex-encoded SHA-256 hash of the file at path.
func hashFileSHA256(path string) (string, error) {
data, err := os.ReadFile(path)
if err != nil {
return "", fmt.Errorf("hash file %s: %w", path, err)
}
h := sha256.Sum256(data)
return hex.EncodeToString(h[:]), nil
}

// extractTarGz decompresses and extracts a .tar.gz archive into destDir.
// It guards against path traversal (zip-slip) attacks.
func extractTarGz(data []byte, destDir string) error {
Expand Down
6 changes: 4 additions & 2 deletions cmd/wfctl/plugin_lockfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,10 @@ func installFromLockfile(pluginDir, cfgPath string) error {
if entry.SHA256 != "" {
pluginInstallDir := filepath.Join(pluginDir, name)
if verifyErr := verifyInstalledChecksum(pluginInstallDir, name, entry.SHA256); verifyErr != nil {
fmt.Fprintf(os.Stderr, "CHECKSUM MISMATCH for %s: %v — removing plugin\n", name, verifyErr)
_ = os.RemoveAll(pluginInstallDir)
fmt.Fprintf(os.Stderr, "CHECKSUM MISMATCH for %s: %v\n", name, verifyErr)
if removeErr := os.RemoveAll(pluginInstallDir); removeErr != nil {
fmt.Fprintf(os.Stderr, "warning: could not remove plugin dir: %v\n", removeErr)
}
failed = append(failed, name)
continue
}
Expand Down
20 changes: 15 additions & 5 deletions cmd/wfctl/registry_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (g *GitHubRegistrySource) ListPlugins() ([]string, error) {
req.Header.Set("Authorization", "Bearer "+token)
}

resp, err := http.DefaultClient.Do(req)
resp, err := registryHTTPClient.Do(req)
if err != nil {
return nil, fmt.Errorf("list registry plugins from %s: %w", g.name, err)
}
Expand All @@ -90,7 +90,11 @@ func (g *GitHubRegistrySource) FetchManifest(name string) (*RegistryManifest, er
"https://raw.githubusercontent.com/%s/%s/%s/plugins/%s/manifest.json",
g.owner, g.repo, g.branch, name,
)
resp, err := http.Get(url) //nolint:gosec // URL constructed from configured registry
req, err := http.NewRequest(http.MethodGet, url, nil) //nolint:gosec // URL constructed from configured registry
if err != nil {
return nil, fmt.Errorf("build request: %w", err)
}
resp, err := registryHTTPClient.Do(req)
if err != nil {
return nil, fmt.Errorf("fetch manifest for %q from %s: %w", name, g.name, err)
}
Expand Down Expand Up @@ -206,8 +210,13 @@ func (s *StaticRegistrySource) SearchPlugins(query string) ([]PluginSearchResult
strings.Contains(strings.ToLower(e.Name), q) ||
strings.Contains(strings.ToLower(e.Description), q) {
results = append(results, PluginSearchResult{
PluginSummary: PluginSummary(e),
Source: s.name,
PluginSummary: PluginSummary{ //nolint:staticcheck // S1016: explicit fields for clarity across struct tag boundaries
Name: e.Name,
Version: e.Version,
Description: e.Description,
Tier: e.Tier,
},
Source: s.name,
})
}
}
Expand All @@ -226,7 +235,8 @@ func (s *StaticRegistrySource) ListPlugins() ([]string, error) {
return names, nil
}

// registryHTTPClient is used for all registry HTTP requests with a reasonable timeout.
// registryHTTPClient is used for all registry HTTP requests (both GitHub and static
// sources) with a reasonable timeout to avoid hangs on network issues.
var registryHTTPClient = &http.Client{Timeout: 30 * time.Second}

// fetch performs an HTTP GET with optional auth token.
Expand Down
19 changes: 19 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,25 @@ func MergeApplicationConfig(appCfg *ApplicationConfig) (*WorkflowConfig, error)
}

combined.Modules = append(combined.Modules, wfCfg.Modules...)

// Merge external plugin declarations — deduplicate by name (first definition wins).
if wfCfg.Plugins != nil && len(wfCfg.Plugins.External) > 0 {
if combined.Plugins == nil {
combined.Plugins = &PluginsConfig{}
}
existingPlugins := make(map[string]struct{}, len(combined.Plugins.External))
for _, ep := range combined.Plugins.External {
existingPlugins[ep.Name] = struct{}{}
}
for _, ep := range wfCfg.Plugins.External {
if _, exists := existingPlugins[ep.Name]; exists {
continue
}
combined.Plugins.External = append(combined.Plugins.External, ep)
existingPlugins[ep.Name] = struct{}{}
}
}
Comment on lines +423 to +439

for k, v := range wfCfg.Workflows {
if existing, exists := combined.Workflows[k]; exists {
// If the existing value is nil (e.g. `http:` with no body in YAML),
Expand Down
66 changes: 66 additions & 0 deletions config/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"os"
"path/filepath"
"reflect"
"strings"
"testing"
Expand Down Expand Up @@ -533,3 +534,68 @@ func writeFileContent(path, content string) error {
func contains(s, substr string) bool {
return strings.Contains(s, substr)
}

func TestMergeApplicationConfig_PluginDedup(t *testing.T) {
dir := t.TempDir()

// Workflow A declares plugin "foo"
wfA := filepath.Join(dir, "a.yaml")
if err := writeFileContent(wfA, `
plugins:
external:
- name: foo
version: "1.0"
repository: "https://example.com/foo"
modules: []
`); err != nil {
t.Fatalf("write a.yaml: %v", err)
}

// Workflow B declares plugin "foo" (duplicate) and "bar"
wfB := filepath.Join(dir, "b.yaml")
if err := writeFileContent(wfB, `
plugins:
external:
- name: foo
version: "2.0"
repository: "https://example.com/foo-v2"
- name: bar
version: "1.0"
repository: "https://example.com/bar"
modules: []
`); err != nil {
t.Fatalf("write b.yaml: %v", err)
}

appCfg := &ApplicationConfig{
Application: ApplicationInfo{
Workflows: []WorkflowRef{
{File: wfA},
{File: wfB},
},
},
}

cfg, err := MergeApplicationConfig(appCfg)
if err != nil {
t.Fatalf("MergeApplicationConfig: %v", err)
}

if cfg.Plugins == nil {
t.Fatal("expected Plugins to be non-nil after merge")
}
if len(cfg.Plugins.External) != 2 {
t.Fatalf("expected 2 plugins (foo + bar), got %d", len(cfg.Plugins.External))
}

// First definition wins — foo should have version "1.0" from workflow A
var fooVer string
for _, p := range cfg.Plugins.External {
if p.Name == "foo" {
fooVer = p.Version
}
}
if fooVer != "1.0" {
t.Errorf("expected foo version 1.0 (first definition wins), got %s", fooVer)
}
}
12 changes: 11 additions & 1 deletion docs/PLUGIN_AUTHORING.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,17 @@ func (p *Provider) CreateModule(typeName, name string, config map[string]any) (s

## Plugin Manifest

The `plugin.json` declares what your plugin provides. The name should match what you passed to `wfctl plugin init`:
The `plugin.json` at the project root declares what your plugin provides. The `name`
field **must match the short name** you passed to `wfctl plugin init` (e.g. `my-plugin`).
This is the name used by the engine for plugin discovery, the `requires.plugins` dependency
check, and `wfctl plugin install`.

> **Note:** The scaffolded `internal/provider.go` returns a manifest with the name prefixed
> as `workflow-plugin-<short-name>` (e.g. `workflow-plugin-my-plugin`). That longer form is
> the canonical name used in the **public registry** (`workflow-registry`) and in release
> artifact URLs. When referencing your plugin in a workflow config's `requires.plugins` or
> `plugins.external`, use the same short name you put in `plugin.json` — the engine resolves
> both forms automatically.

```json
{
Expand Down
19 changes: 14 additions & 5 deletions module/pipeline_step_workflow_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type WorkflowCallStep struct {
name string
workflow string // target pipeline name
mode WorkflowCallMode // "sync" (default) or "async"
stopPipeline bool // if true, stop parent pipeline after this step completes
inputMapping map[string]string
outputMapping map[string]string
timeout time.Duration
Expand Down Expand Up @@ -86,6 +87,10 @@ func NewWorkflowCallStepFactory(lookup PipelineLookupFn) StepFactory {
}
}

if v, ok := cfg["stop_pipeline"].(bool); ok {
step.stopPipeline = v
}

return step, nil
}
}
Expand All @@ -101,9 +106,13 @@ func (s *WorkflowCallStep) Execute(ctx context.Context, pc *PipelineContext) (*S
if s.lookup == nil {
return nil, fmt.Errorf("workflow_call step %q: no pipeline lookup function configured", s.name)
}
target, ok := s.lookup(s.workflow)
workflowName, resolveErr := s.tmpl.Resolve(s.workflow, pc)
if resolveErr != nil {
return nil, fmt.Errorf("workflow_call step %q: failed to resolve workflow name %q: %w", s.name, s.workflow, resolveErr)
}
target, ok := s.lookup(workflowName)
Comment on lines +109 to +113
if !ok {
return nil, fmt.Errorf("workflow_call step %q: pipeline %q not found — ensure it is defined in the application config", s.name, s.workflow)
return nil, fmt.Errorf("workflow_call step %q: pipeline %q not found — ensure it is defined in the application config", s.name, workflowName)
}

// Build trigger data from input mapping or fall back to passing all current data
Expand All @@ -130,7 +139,7 @@ func (s *WorkflowCallStep) Execute(ctx context.Context, pc *PipelineContext) (*S
defer cancel()
_, _ = target.Execute(asyncCtx, data) //nolint:errcheck
}(ctx, triggerData)
return &StepResult{Output: map[string]any{"workflow": s.workflow, "mode": "async", "dispatched": true}}, nil
return &StepResult{Output: map[string]any{"workflow": workflowName, "mode": "async", "dispatched": true}, Stop: s.stopPipeline}, nil
}

// Sync mode: apply timeout and wait for result
Expand All @@ -139,7 +148,7 @@ func (s *WorkflowCallStep) Execute(ctx context.Context, pc *PipelineContext) (*S

childCtx, err := target.Execute(syncCtx, triggerData)
if err != nil {
return nil, fmt.Errorf("workflow_call step %q: workflow %q failed: %w", s.name, s.workflow, err)
return nil, fmt.Errorf("workflow_call step %q: workflow %q failed: %w", s.name, workflowName, err)
}

// Map outputs back to parent context
Expand All @@ -153,7 +162,7 @@ func (s *WorkflowCallStep) Execute(ctx context.Context, pc *PipelineContext) (*S
output["result"] = childCtx.Current
}

return &StepResult{Output: output}, nil
return &StepResult{Output: output, Stop: s.stopPipeline}, nil
}

// Ensure interface satisfaction at compile time.
Expand Down
Loading
Loading