Conversation
Implements step.build_from_config (Phase 5.1 roadmap) — a pipeline step that assembles a self-contained Docker image from a workflow config YAML file, a server binary, and optional plugin binaries. - Creates a temp build context, copies config + server + plugin binaries - Generates a Dockerfile with correct ENTRYPOINT/CMD for workflow server - Executes docker build (and optional docker push) via exec.Command - exec.Command is injectable for deterministic unit testing - 17 tests cover factory validation, Dockerfile generation, error paths, push flag, plugin inclusion, and build context file layout - Registers step.build_from_config in plugins/cicd manifest and factory map Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…line steps Implements two new pipeline steps for interacting with state machine workflow instances directly from pipeline execution: - step.statemachine_transition: triggers a named transition on a workflow instance, supports data templates, fail_on_error flag, and the TransitionTrigger interface for testability via mocks. - step.statemachine_get: reads the current state of a workflow instance. Both steps resolve entity_id and data fields from Go templates using the existing TemplateEngine, look up the named StateMachineEngine by service name from the app registry, and return structured output (transition_ok, new_state, current_state, entity_id). Steps are registered in plugins/statemachine with StepFactories() and declared in the manifest's StepTypes list. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…teps Implements Redis-backed caching for the workflow engine: - module/cache_redis.go: CacheModule interface + RedisCache module (cache.redis type) with Get/Set/Delete ops, key prefixing, default TTL, and RedisClient interface for testability - module/pipeline_step_cache_get.go: step.cache_get — reads from cache with template key, configurable output field, miss_ok flag - module/pipeline_step_cache_set.go: step.cache_set — writes to cache with template key+value, optional TTL override - module/pipeline_step_cache_delete.go: step.cache_delete — removes a key from cache - Full test coverage using miniredis (already a project dependency) for the Redis module, and a mockCacheModule for the pipeline step tests - plugins/storage/plugin.go: registers cache.redis module factory and schema (7 module types) - plugins/pipelinesteps/plugin.go: registers step.cache_get/set/delete factories (21 step types) All existing tests continue to pass. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
This pull request adds Redis-based caching capabilities, state machine pipeline steps, and a Docker build-from-config CI/CD step to the workflow engine. However, the PR title and description only mention the cache-related functionality, creating a significant mismatch with the actual scope of changes.
Changes:
- Adds
cache.redismodule with Get/Set/Delete operations and three corresponding pipeline steps (step.cache_get,step.cache_set,step.cache_delete) - Adds state machine pipeline steps (
step.statemachine_transition,step.statemachine_get) for triggering and querying workflow state transitions - Adds
step.build_from_configCI/CD step that builds Docker images from workflow config files with optional plugin binaries
Reviewed changes
Copilot reviewed 22 out of 22 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
module/cache_redis.go |
Implements CacheModule interface and RedisCache module with prefix support and configurable TTL |
module/cache_redis_test.go |
Tests RedisCache using miniredis for all operations |
module/pipeline_step_cache_get.go |
Implements cache read step with template support and miss_ok flag |
module/pipeline_step_cache_get_test.go |
Tests cache_get step with mock CacheModule |
module/pipeline_step_cache_set.go |
Implements cache write step with TTL support |
module/pipeline_step_cache_set_test.go |
Tests cache_set step including TTL validation |
module/pipeline_step_cache_delete.go |
Implements cache deletion step |
module/pipeline_step_cache_delete_test.go |
Tests cache_delete step |
module/pipeline_step_statemachine_transition.go |
Implements step for triggering state machine transitions from pipelines |
module/pipeline_step_statemachine_transition_test.go |
Comprehensive tests including mock TransitionTrigger and real StateMachineEngine |
module/pipeline_step_statemachine_get.go |
Implements step for reading current workflow instance state |
module/pipeline_step_statemachine_get_test.go |
Tests state retrieval step |
module/pipeline_step_build_from_config.go |
Implements Docker image builder from workflow config with plugin support |
module/pipeline_step_build_from_config_test.go |
Tests build step with injectable exec.CommandContext for testability |
plugins/storage/plugin.go |
Registers cache.redis module factory with default config handling |
plugins/storage/plugin_test.go |
Updates test counts (6→7 modules, 3→4 capabilities) and adds cache.redis factory test |
plugins/pipelinesteps/plugin.go |
Registers three cache step factories |
plugins/pipelinesteps/plugin_test.go |
Updates test count (18→21 steps) |
plugins/statemachine/plugin.go |
Adds StepFactories method returning two state machine step factories |
plugins/statemachine/plugin_test.go |
Adds TestStepFactories verifying 2 step types |
plugins/cicd/plugin.go |
Registers step.build_from_config factory and updates descriptions |
plugins/cicd/plugin_test.go |
Updates test count (12→13 steps) and adds build_from_config to expected list |
| package module | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
|
|
||
| "github.com/CrisisTextLine/modular" | ||
| ) | ||
|
|
||
| // StateMachineTransitionStep triggers a state machine transition from within a pipeline. | ||
| type StateMachineTransitionStep struct { | ||
| name string | ||
| statemachine string | ||
| entityID string | ||
| event string | ||
| data map[string]any | ||
| failOnError bool | ||
| app modular.Application | ||
| tmpl *TemplateEngine | ||
| } | ||
|
|
||
| // NewStateMachineTransitionStepFactory returns a StepFactory for step.statemachine_transition. | ||
| // | ||
| // Config: | ||
| // | ||
| // type: step.statemachine_transition | ||
| // config: | ||
| // statemachine: "order-sm" # service name of the StateMachineEngine | ||
| // entity_id: "{{.order_id}}" # which instance to transition (template) | ||
| // event: "approve" # transition name | ||
| // data: # optional data map (values may use templates) | ||
| // approved_by: "{{.user_id}}" | ||
| // fail_on_error: false # stop pipeline on invalid transition (default: false) | ||
| // | ||
| // Outputs: transition_ok (bool), new_state (string), error (string, only on failure). | ||
| func NewStateMachineTransitionStepFactory() StepFactory { | ||
| return func(name string, config map[string]any, app modular.Application) (PipelineStep, error) { | ||
| sm, _ := config["statemachine"].(string) | ||
| if sm == "" { | ||
| return nil, fmt.Errorf("statemachine_transition step %q: 'statemachine' is required", name) | ||
| } | ||
|
|
||
| entityID, _ := config["entity_id"].(string) | ||
| if entityID == "" { | ||
| return nil, fmt.Errorf("statemachine_transition step %q: 'entity_id' is required", name) | ||
| } | ||
|
|
||
| event, _ := config["event"].(string) | ||
| if event == "" { | ||
| return nil, fmt.Errorf("statemachine_transition step %q: 'event' is required", name) | ||
| } | ||
|
|
||
| var data map[string]any | ||
| if d, ok := config["data"].(map[string]any); ok { | ||
| data = d | ||
| } | ||
|
|
||
| failOnError, _ := config["fail_on_error"].(bool) | ||
|
|
||
| return &StateMachineTransitionStep{ | ||
| name: name, | ||
| statemachine: sm, | ||
| entityID: entityID, | ||
| event: event, | ||
| data: data, | ||
| failOnError: failOnError, | ||
| app: app, | ||
| tmpl: NewTemplateEngine(), | ||
| }, nil | ||
| } | ||
| } | ||
|
|
||
| // Name returns the step name. | ||
| func (s *StateMachineTransitionStep) Name() string { return s.name } | ||
|
|
||
| // Execute resolves templates, looks up the StateMachineEngine by service name, and | ||
| // triggers the requested transition. On success it sets transition_ok=true and | ||
| // new_state to the resulting state. On failure it sets transition_ok=false and | ||
| // error to the error message; if fail_on_error is true the pipeline is stopped. | ||
| func (s *StateMachineTransitionStep) Execute(ctx context.Context, pc *PipelineContext) (*StepResult, error) { | ||
| if s.app == nil { | ||
| return nil, fmt.Errorf("statemachine_transition step %q: no application context", s.name) | ||
| } | ||
|
|
||
| // Resolve statemachine engine from service registry | ||
| svc, ok := s.app.SvcRegistry()[s.statemachine] | ||
| if !ok { | ||
| return nil, fmt.Errorf("statemachine_transition step %q: statemachine service %q not found", s.name, s.statemachine) | ||
| } | ||
|
|
||
| engine, ok := svc.(*StateMachineEngine) | ||
| if !ok { | ||
| // Also accept the TransitionTrigger interface for testability / mocking | ||
| trigger, ok := svc.(TransitionTrigger) | ||
| if !ok { | ||
| return nil, fmt.Errorf("statemachine_transition step %q: service %q does not implement StateMachineEngine or TransitionTrigger", s.name, s.statemachine) | ||
| } | ||
| return s.executeViaTrigger(ctx, pc, trigger) | ||
| } | ||
|
|
||
| return s.executeViaEngine(ctx, pc, engine) | ||
| } | ||
|
|
||
| func (s *StateMachineTransitionStep) executeViaEngine(ctx context.Context, pc *PipelineContext, engine *StateMachineEngine) (*StepResult, error) { | ||
| entityID, err := s.tmpl.Resolve(s.entityID, pc) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("statemachine_transition step %q: failed to resolve entity_id: %w", s.name, err) | ||
| } | ||
|
|
||
| event, err := s.tmpl.Resolve(s.event, pc) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("statemachine_transition step %q: failed to resolve event: %w", s.name, err) | ||
| } | ||
|
|
||
| data, err := s.tmpl.ResolveMap(s.data, pc) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("statemachine_transition step %q: failed to resolve data: %w", s.name, err) | ||
| } | ||
|
|
||
| transErr := engine.TriggerTransition(ctx, entityID, event, data) | ||
| if transErr != nil { | ||
| if s.failOnError { | ||
| return nil, fmt.Errorf("statemachine_transition step %q: transition failed: %w", s.name, transErr) | ||
| } | ||
| return &StepResult{ | ||
| Output: map[string]any{ | ||
| "transition_ok": false, | ||
| "error": transErr.Error(), | ||
| }, | ||
| }, nil | ||
| } | ||
|
|
||
| // Fetch the new state from the engine | ||
| instance, err := engine.GetInstance(entityID) | ||
| if err != nil { | ||
| // Transition succeeded but we can't read new state — treat as success with unknown state | ||
| return &StepResult{ | ||
| Output: map[string]any{ | ||
| "transition_ok": true, | ||
| "new_state": "", | ||
| }, | ||
| }, nil | ||
| } | ||
|
|
||
| return &StepResult{ | ||
| Output: map[string]any{ | ||
| "transition_ok": true, | ||
| "new_state": instance.CurrentState, | ||
| }, | ||
| }, nil | ||
| } | ||
|
|
||
| func (s *StateMachineTransitionStep) executeViaTrigger(ctx context.Context, pc *PipelineContext, trigger TransitionTrigger) (*StepResult, error) { | ||
| entityID, err := s.tmpl.Resolve(s.entityID, pc) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("statemachine_transition step %q: failed to resolve entity_id: %w", s.name, err) | ||
| } | ||
|
|
||
| event, err := s.tmpl.Resolve(s.event, pc) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("statemachine_transition step %q: failed to resolve event: %w", s.name, err) | ||
| } | ||
|
|
||
| data, err := s.tmpl.ResolveMap(s.data, pc) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("statemachine_transition step %q: failed to resolve data: %w", s.name, err) | ||
| } | ||
|
|
||
| transErr := trigger.TriggerTransition(ctx, entityID, event, data) | ||
| if transErr != nil { | ||
| if s.failOnError { | ||
| return nil, fmt.Errorf("statemachine_transition step %q: transition failed: %w", s.name, transErr) | ||
| } | ||
| return &StepResult{ | ||
| Output: map[string]any{ | ||
| "transition_ok": false, | ||
| "error": transErr.Error(), | ||
| }, | ||
| }, nil | ||
| } | ||
|
|
||
| return &StepResult{ | ||
| Output: map[string]any{ | ||
| "transition_ok": true, | ||
| "new_state": "", | ||
| }, | ||
| }, nil | ||
| } |
There was a problem hiding this comment.
This file (pipeline_step_statemachine_transition.go) and its test file are included in this PR, but the PR title and description only mention cache-related functionality. The PR description does not mention the addition of state machine pipeline steps (step.statemachine_transition and step.statemachine_get). This creates a discrepancy between what the PR claims to do and what it actually changes. Consider either updating the PR description to include these state machine steps, or moving them to a separate PR for better change tracking and review.
| package module | ||
|
|
||
| import ( | ||
| "bytes" | ||
| "context" | ||
| "fmt" | ||
| "io" | ||
| "os" | ||
| "os/exec" | ||
| "path/filepath" | ||
| "strings" | ||
|
|
||
| "github.com/CrisisTextLine/modular" | ||
| ) | ||
|
|
||
| // PluginSpec describes a plugin binary to include in the built image. | ||
| type PluginSpec struct { | ||
| Name string | ||
| Binary string | ||
| } | ||
|
|
||
| // BuildFromConfigStep reads a workflow config YAML file, assembles a Docker | ||
| // build context with the server binary and any required plugin binaries, | ||
| // generates a Dockerfile, builds the image, and optionally pushes it. | ||
| type BuildFromConfigStep struct { | ||
| name string | ||
| configFile string | ||
| baseImage string | ||
| serverBinary string | ||
| tag string | ||
| push bool | ||
| plugins []PluginSpec | ||
|
|
||
| // execCommand is the function used to create exec.Cmd instances. | ||
| // Defaults to exec.CommandContext; overridable in tests. | ||
| execCommand func(ctx context.Context, name string, args ...string) *exec.Cmd | ||
| } | ||
|
|
||
| // NewBuildFromConfigStepFactory returns a StepFactory that creates BuildFromConfigStep instances. | ||
| func NewBuildFromConfigStepFactory() StepFactory { | ||
| return func(name string, config map[string]any, _ modular.Application) (PipelineStep, error) { | ||
| configFile, _ := config["config_file"].(string) | ||
| if configFile == "" { | ||
| return nil, fmt.Errorf("build_from_config step %q: 'config_file' is required", name) | ||
| } | ||
|
|
||
| tag, _ := config["tag"].(string) | ||
| if tag == "" { | ||
| return nil, fmt.Errorf("build_from_config step %q: 'tag' is required", name) | ||
| } | ||
|
|
||
| baseImage, _ := config["base_image"].(string) | ||
| if baseImage == "" { | ||
| baseImage = "ghcr.io/gocodealone/workflow-runtime:latest" | ||
| } | ||
|
|
||
| serverBinary, _ := config["server_binary"].(string) | ||
| if serverBinary == "" { | ||
| serverBinary = "/usr/local/bin/workflow-server" | ||
| } | ||
|
|
||
| push, _ := config["push"].(bool) | ||
|
|
||
| var plugins []PluginSpec | ||
| if pluginsRaw, ok := config["plugins"].([]any); ok { | ||
| for i, p := range pluginsRaw { | ||
| m, ok := p.(map[string]any) | ||
| if !ok { | ||
| return nil, fmt.Errorf("build_from_config step %q: plugins[%d] must be a map", name, i) | ||
| } | ||
| pName, _ := m["name"].(string) | ||
| pBinary, _ := m["binary"].(string) | ||
| if pName == "" || pBinary == "" { | ||
| return nil, fmt.Errorf("build_from_config step %q: plugins[%d] requires 'name' and 'binary'", name, i) | ||
| } | ||
| plugins = append(plugins, PluginSpec{Name: pName, Binary: pBinary}) | ||
| } | ||
| } | ||
|
|
||
| return &BuildFromConfigStep{ | ||
| name: name, | ||
| configFile: configFile, | ||
| baseImage: baseImage, | ||
| serverBinary: serverBinary, | ||
| tag: tag, | ||
| push: push, | ||
| plugins: plugins, | ||
| execCommand: exec.CommandContext, | ||
| }, nil | ||
| } | ||
| } | ||
|
|
||
| // Name returns the step name. | ||
| func (s *BuildFromConfigStep) Name() string { return s.name } | ||
|
|
||
| // Execute assembles the build context, generates a Dockerfile, builds the | ||
| // Docker image, and optionally pushes it. | ||
| func (s *BuildFromConfigStep) Execute(ctx context.Context, _ *PipelineContext) (*StepResult, error) { | ||
| // Validate that the config file exists. | ||
| if _, err := os.Stat(s.configFile); err != nil { | ||
| return nil, fmt.Errorf("build_from_config step %q: config_file %q not found: %w", s.name, s.configFile, err) | ||
| } | ||
|
|
||
| // Validate that the server binary exists. | ||
| if _, err := os.Stat(s.serverBinary); err != nil { | ||
| return nil, fmt.Errorf("build_from_config step %q: server_binary %q not found: %w", s.name, s.serverBinary, err) | ||
| } | ||
|
|
||
| // Create a temporary build context directory. | ||
| buildDir, err := os.MkdirTemp("", "workflow-build-*") | ||
| if err != nil { | ||
| return nil, fmt.Errorf("build_from_config step %q: failed to create temp build dir: %w", s.name, err) | ||
| } | ||
| defer os.RemoveAll(buildDir) | ||
|
|
||
| // Copy config file into build context as config.yaml. | ||
| if err := copyFile(s.configFile, filepath.Join(buildDir, "config.yaml")); err != nil { | ||
| return nil, fmt.Errorf("build_from_config step %q: failed to copy config file: %w", s.name, err) | ||
| } | ||
|
|
||
| // Copy server binary into build context as server. | ||
| serverDst := filepath.Join(buildDir, "server") | ||
| if err := copyFile(s.serverBinary, serverDst); err != nil { | ||
| return nil, fmt.Errorf("build_from_config step %q: failed to copy server binary: %w", s.name, err) | ||
| } | ||
| if err := os.Chmod(serverDst, 0755); err != nil { //nolint:gosec // G302: intentionally executable | ||
| return nil, fmt.Errorf("build_from_config step %q: failed to chmod server binary: %w", s.name, err) | ||
| } | ||
|
|
||
| // Copy plugin binaries into build context under plugins/<name>/. | ||
| pluginsDir := filepath.Join(buildDir, "plugins") | ||
| for _, plugin := range s.plugins { | ||
| if _, err := os.Stat(plugin.Binary); err != nil { | ||
| return nil, fmt.Errorf("build_from_config step %q: plugin %q binary %q not found: %w", | ||
| s.name, plugin.Name, plugin.Binary, err) | ||
| } | ||
| pluginDir := filepath.Join(pluginsDir, plugin.Name) | ||
| if err := os.MkdirAll(pluginDir, 0750); err != nil { | ||
| return nil, fmt.Errorf("build_from_config step %q: failed to create plugin dir for %q: %w", | ||
| s.name, plugin.Name, err) | ||
| } | ||
| pluginBinaryName := filepath.Base(plugin.Binary) | ||
| pluginDst := filepath.Join(pluginDir, pluginBinaryName) | ||
| if err := copyFile(plugin.Binary, pluginDst); err != nil { | ||
| return nil, fmt.Errorf("build_from_config step %q: failed to copy plugin %q binary: %w", | ||
| s.name, plugin.Name, err) | ||
| } | ||
| if err := os.Chmod(pluginDst, 0755); err != nil { //nolint:gosec // G302: intentionally executable | ||
| return nil, fmt.Errorf("build_from_config step %q: failed to chmod plugin %q binary: %w", | ||
| s.name, plugin.Name, err) | ||
| } | ||
| } | ||
|
|
||
| // Generate Dockerfile content. | ||
| dockerfileContent := s.generateDockerfile() | ||
|
|
||
| // Write Dockerfile into build context. | ||
| dockerfilePath := filepath.Join(buildDir, "Dockerfile") | ||
| if err := os.WriteFile(dockerfilePath, []byte(dockerfileContent), 0600); err != nil { | ||
| return nil, fmt.Errorf("build_from_config step %q: failed to write Dockerfile: %w", s.name, err) | ||
| } | ||
|
|
||
| // Execute docker build. | ||
| if err := s.runDockerBuild(ctx, buildDir); err != nil { | ||
| return nil, fmt.Errorf("build_from_config step %q: docker build failed: %w", s.name, err) | ||
| } | ||
|
|
||
| // Optionally push the image. | ||
| if s.push { | ||
| if err := s.runDockerPush(ctx); err != nil { | ||
| return nil, fmt.Errorf("build_from_config step %q: docker push failed: %w", s.name, err) | ||
| } | ||
| } | ||
|
|
||
| return &StepResult{ | ||
| Output: map[string]any{ | ||
| "image_tag": s.tag, | ||
| "dockerfile_content": dockerfileContent, | ||
| }, | ||
| }, nil | ||
| } | ||
|
|
||
| // generateDockerfile returns a Dockerfile string for the build context layout. | ||
| func (s *BuildFromConfigStep) generateDockerfile() string { | ||
| var sb strings.Builder | ||
|
|
||
| fmt.Fprintf(&sb, "FROM %s\n", s.baseImage) | ||
| sb.WriteString("COPY server /server\n") | ||
| sb.WriteString("COPY config.yaml /app/config.yaml\n") | ||
|
|
||
| if len(s.plugins) > 0 { | ||
| sb.WriteString("COPY plugins/ /app/data/plugins/\n") | ||
| } | ||
|
|
||
| sb.WriteString("WORKDIR /app\n") | ||
| sb.WriteString("ENTRYPOINT [\"/server\"]\n") | ||
| sb.WriteString("CMD [\"-config\", \"/app/config.yaml\", \"-data-dir\", \"/app/data\"]\n") | ||
|
|
||
| return sb.String() | ||
| } | ||
|
|
||
| // runDockerBuild executes "docker build -t <tag> <buildDir>". | ||
| func (s *BuildFromConfigStep) runDockerBuild(ctx context.Context, buildDir string) error { | ||
| var stdout, stderr bytes.Buffer | ||
| cmd := s.execCommand(ctx, "docker", "build", "-t", s.tag, buildDir) //nolint:gosec // G204: tag from trusted pipeline config | ||
| cmd.Stdout = &stdout | ||
| cmd.Stderr = &stderr | ||
|
|
||
| if err := cmd.Run(); err != nil { | ||
| return fmt.Errorf("%w\nstdout: %s\nstderr: %s", err, stdout.String(), stderr.String()) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // runDockerPush executes "docker push <tag>". | ||
| func (s *BuildFromConfigStep) runDockerPush(ctx context.Context) error { | ||
| var stdout, stderr bytes.Buffer | ||
| cmd := s.execCommand(ctx, "docker", "push", s.tag) //nolint:gosec // G204: tag from trusted pipeline config | ||
| cmd.Stdout = &stdout | ||
| cmd.Stderr = &stderr | ||
|
|
||
| if err := cmd.Run(); err != nil { | ||
| return fmt.Errorf("%w\nstdout: %s\nstderr: %s", err, stdout.String(), stderr.String()) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // copyFile copies src to dst, creating dst if it does not exist. | ||
| func copyFile(src, dst string) error { | ||
| in, err := os.Open(src) //nolint:gosec // G304: path from trusted pipeline config | ||
| if err != nil { | ||
| return fmt.Errorf("open %q: %w", src, err) | ||
| } | ||
| defer in.Close() | ||
|
|
||
| out, err := os.OpenFile(dst, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) | ||
| if err != nil { | ||
| return fmt.Errorf("create %q: %w", dst, err) | ||
| } | ||
| defer out.Close() | ||
|
|
||
| if _, err := io.Copy(out, in); err != nil { | ||
| return fmt.Errorf("copy %q -> %q: %w", src, dst, err) | ||
| } | ||
| return nil | ||
| } |
There was a problem hiding this comment.
This file (pipeline_step_build_from_config.go) and its test file are included in this PR, but the PR title and description only mention cache-related functionality. The PR description does not mention the addition of the CI/CD build_from_config pipeline step. This creates a discrepancy between what the PR claims to do and what it actually changes. Consider either updating the PR description to include this build step, or moving it to a separate PR for better change tracking and review.
| if errors.Is(err, redis.Nil) { | ||
| // Cache miss | ||
| if !s.missOK { | ||
| return nil, fmt.Errorf("cache_get step %q: cache miss for key %q", s.name, resolvedKey) | ||
| } | ||
| return &StepResult{Output: map[string]any{ | ||
| s.output: "", | ||
| "cache_hit": false, | ||
| }}, nil |
There was a problem hiding this comment.
The CacheGetStep is tightly coupled to the Redis implementation by directly checking for redis.Nil at line 78. This creates a layer violation because the step should only depend on the CacheModule interface, not on specific implementation details from go-redis. If a different cache implementation (e.g., Memcached, in-memory) were added in the future, it would need to return redis.Nil errors to work with this step.
Consider defining a custom sentinel error like ErrCacheMiss in the CacheModule interface or module package, and having RedisCache wrap redis.Nil with this error. This would maintain proper abstraction boundaries and make the code more extensible.
| } | ||
| } | ||
|
|
||
| func TestBuildFromConfigStep_GenerateDockerfile_NoPLugins(t *testing.T) { |
There was a problem hiding this comment.
The test function name contains a typo: "NoPLugins" should be "NoPlugins" (lowercase 'l').
| func TestBuildFromConfigStep_GenerateDockerfile_NoPLugins(t *testing.T) { | |
| func TestBuildFromConfigStep_GenerateDockerfile_NoPlugins(t *testing.T) { |
| // generateDockerfile returns a Dockerfile string for the build context layout. | ||
| func (s *BuildFromConfigStep) generateDockerfile() string { | ||
| var sb strings.Builder | ||
|
|
||
| fmt.Fprintf(&sb, "FROM %s\n", s.baseImage) |
There was a problem hiding this comment.
The baseImage field is directly interpolated into the Dockerfile using fmt.Fprintf without any validation or sanitization (line 187). While the nolint comment on lines 205 and 218 mentions "trusted pipeline config," a malicious or misconfigured baseImage value could potentially inject arbitrary Dockerfile commands through newline characters.
Consider validating the baseImage field to ensure it doesn't contain newline characters or other Dockerfile meta-characters that could be used for injection. For example:
- Reject values containing '\n' or '\r'
- Optionally validate against a regex pattern for valid Docker image names
| // generateDockerfile returns a Dockerfile string for the build context layout. | |
| func (s *BuildFromConfigStep) generateDockerfile() string { | |
| var sb strings.Builder | |
| fmt.Fprintf(&sb, "FROM %s\n", s.baseImage) | |
| func sanitizeBaseImage(baseImage string) string { | |
| // Remove any newline or carriage return characters to prevent Dockerfile injection, | |
| // then trim surrounding whitespace. If the result is empty, fall back to a safe base image. | |
| baseImage = strings.ReplaceAll(baseImage, "\r", "") | |
| baseImage = strings.ReplaceAll(baseImage, "\n", "") | |
| baseImage = strings.TrimSpace(baseImage) | |
| if baseImage == "" { | |
| return "scratch" | |
| } | |
| return baseImage | |
| } | |
| // generateDockerfile returns a Dockerfile string for the build context layout. | |
| func (s *BuildFromConfigStep) generateDockerfile() string { | |
| var sb strings.Builder | |
| baseImage := sanitizeBaseImage(s.baseImage) | |
| fmt.Fprintf(&sb, "FROM %s\n", baseImage) |
Summary
cache.redismodule backed bygithub.com/redis/go-redis/v9(already in go.mod) that provides aCacheModuleinterface withGet/Set/Deleteoperations, configurable key prefix, and default TTLstep.cache_get,step.cache_set,step.cache_delete— all support Go template expressions in key/value fields and resolve the cache by named service referenceplugins/storage(7 module types) and the new steps inplugins/pipelinesteps(21 step types)cache_redis_test.gousesminiredis(already a project dependency); step tests use amockCacheModuleimplementing theCacheModuleinterfaceNew files
module/cache_redis.goCacheModuleinterface +RedisCachemodule implementationmodule/pipeline_step_cache_get.gostep.cache_getfactorymodule/pipeline_step_cache_set.gostep.cache_setfactorymodule/pipeline_step_cache_delete.gostep.cache_deletefactorymodule/cache_redis_test.gomodule/pipeline_step_cache_get_test.gomodule/pipeline_step_cache_set_test.gomodule/pipeline_step_cache_delete_test.goExample YAML config
Test plan
go build ./...passesgo test ./module/...passes (12.8s)go test ./plugins/storage/...passesgo test ./plugins/pipelinesteps/...passesgo test ./...— full suite passes with no regressions🤖 Generated with Claude Code