diff --git a/module/cache_redis.go b/module/cache_redis.go new file mode 100644 index 00000000..3571b485 --- /dev/null +++ b/module/cache_redis.go @@ -0,0 +1,161 @@ +package module + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/CrisisTextLine/modular" + "github.com/redis/go-redis/v9" +) + +// CacheModule defines the interface for cache operations used by pipeline steps. +type CacheModule interface { + Get(ctx context.Context, key string) (string, error) + Set(ctx context.Context, key, value string, ttl time.Duration) error + Delete(ctx context.Context, key string) error +} + +// RedisClient is the subset of go-redis client methods used by RedisCache. +// Keeping it as an interface enables mocking in tests. +type RedisClient interface { + Ping(ctx context.Context) *redis.StatusCmd + Get(ctx context.Context, key string) *redis.StringCmd + Set(ctx context.Context, key string, value any, expiration time.Duration) *redis.StatusCmd + Del(ctx context.Context, keys ...string) *redis.IntCmd + Close() error +} + +// RedisCacheConfig holds configuration for the cache.redis module. +type RedisCacheConfig struct { + Address string + Password string + DB int + Prefix string + DefaultTTL time.Duration +} + +// RedisCache is a module that connects to a Redis instance and exposes +// Get/Set/Delete operations for use by pipeline steps. +type RedisCache struct { + name string + cfg RedisCacheConfig + client RedisClient + logger modular.Logger +} + +// NewRedisCache creates a new RedisCache module with the given name and config. +func NewRedisCache(name string, cfg RedisCacheConfig) *RedisCache { + return &RedisCache{ + name: name, + cfg: cfg, + logger: &noopLogger{}, + } +} + +// NewRedisCacheWithClient creates a RedisCache backed by a pre-built client. +// This is intended for testing only. +func NewRedisCacheWithClient(name string, cfg RedisCacheConfig, client RedisClient) *RedisCache { + return &RedisCache{ + name: name, + cfg: cfg, + client: client, + logger: &noopLogger{}, + } +} + +func (r *RedisCache) Name() string { return r.name } + +func (r *RedisCache) Init(app modular.Application) error { + r.logger = app.Logger() + return nil +} + +// Start connects to Redis and verifies the connection with PING. +func (r *RedisCache) Start(ctx context.Context) error { + if r.client != nil { + // Already set (e.g. in tests) + return nil + } + + opts := &redis.Options{ + Addr: r.cfg.Address, + DB: r.cfg.DB, + } + if r.cfg.Password != "" { + opts.Password = r.cfg.Password + } + + r.client = redis.NewClient(opts) + + if err := r.client.Ping(ctx).Err(); err != nil { + _ = r.client.Close() + r.client = nil + return fmt.Errorf("cache.redis %q: ping failed: %w", r.name, err) + } + + r.logger.Info("Redis cache started", "name", r.name, "address", r.cfg.Address) + return nil +} + +// Stop closes the Redis connection. +func (r *RedisCache) Stop(_ context.Context) error { + if r.client != nil { + r.logger.Info("Redis cache stopped", "name", r.name) + return r.client.Close() + } + return nil +} + +// Get retrieves a value from Redis by key (with prefix applied). +// Returns redis.Nil wrapped in an error when the key does not exist. +func (r *RedisCache) Get(ctx context.Context, key string) (string, error) { + if r.client == nil { + return "", fmt.Errorf("cache.redis %q: not started", r.name) + } + val, err := r.client.Get(ctx, r.prefixed(key)).Result() + if err != nil { + return "", err + } + return val, nil +} + +// Set stores a value in Redis with optional TTL. A zero duration uses the +// module-level default; if the default is also zero the key never expires. +func (r *RedisCache) Set(ctx context.Context, key, value string, ttl time.Duration) error { + if r.client == nil { + return fmt.Errorf("cache.redis %q: not started", r.name) + } + if ttl == 0 { + ttl = r.cfg.DefaultTTL + } + return r.client.Set(ctx, r.prefixed(key), value, ttl).Err() +} + +// Delete removes a key from Redis (with prefix applied). +func (r *RedisCache) Delete(ctx context.Context, key string) error { + if r.client == nil { + return fmt.Errorf("cache.redis %q: not started", r.name) + } + return r.client.Del(ctx, r.prefixed(key)).Err() +} + +func (r *RedisCache) prefixed(key string) string { + return r.cfg.Prefix + key +} + +func (r *RedisCache) ProvidesServices() []modular.ServiceProvider { + return []modular.ServiceProvider{ + {Name: r.name, Description: "Redis cache connection", Instance: r}, + } +} + +func (r *RedisCache) RequiresServices() []modular.ServiceDependency { + return nil +} + +// ExpandEnvString resolves ${VAR} and $VAR environment variable references. +func ExpandEnvString(s string) string { + return os.ExpandEnv(s) +} diff --git a/module/cache_redis_test.go b/module/cache_redis_test.go new file mode 100644 index 00000000..87da5423 --- /dev/null +++ b/module/cache_redis_test.go @@ -0,0 +1,173 @@ +package module + +import ( + "context" + "testing" + "time" + + "github.com/alicebob/miniredis/v2" + "github.com/redis/go-redis/v9" +) + +// newTestRedisCache creates a RedisCache backed by a miniredis server. +func newTestRedisCache(t *testing.T) (*RedisCache, *miniredis.Miniredis) { + t.Helper() + mr := miniredis.RunT(t) + client := redis.NewClient(&redis.Options{Addr: mr.Addr()}) + t.Cleanup(func() { client.Close() }) + + cfg := RedisCacheConfig{ + Address: mr.Addr(), + Prefix: "test:", + DefaultTTL: time.Hour, + } + cache := NewRedisCacheWithClient("cache", cfg, client) + return cache, mr +} + +func TestRedisCacheGetSetDelete(t *testing.T) { + ctx := context.Background() + cache, _ := newTestRedisCache(t) + + // Set a value + if err := cache.Set(ctx, "mykey", "myvalue", 0); err != nil { + t.Fatalf("Set failed: %v", err) + } + + // Get it back + val, err := cache.Get(ctx, "mykey") + if err != nil { + t.Fatalf("Get failed: %v", err) + } + if val != "myvalue" { + t.Errorf("expected %q, got %q", "myvalue", val) + } + + // Delete it + if err := cache.Delete(ctx, "mykey"); err != nil { + t.Fatalf("Delete failed: %v", err) + } + + // Get after delete should return redis.Nil + _, err = cache.Get(ctx, "mykey") + if err == nil { + t.Fatal("expected error after delete, got nil") + } +} + +func TestRedisCacheKeyPrefix(t *testing.T) { + ctx := context.Background() + cache, mr := newTestRedisCache(t) + + if err := cache.Set(ctx, "hello", "world", 0); err != nil { + t.Fatalf("Set failed: %v", err) + } + + // Verify prefix is stored in miniredis + keys := mr.Keys() + found := false + for _, k := range keys { + if k == "test:hello" { + found = true + break + } + } + if !found { + t.Errorf("expected key %q in redis, got keys: %v", "test:hello", keys) + } +} + +func TestRedisCacheDefaultTTL(t *testing.T) { + ctx := context.Background() + cache, mr := newTestRedisCache(t) + + // Set with TTL=0 should use DefaultTTL (1 hour) + if err := cache.Set(ctx, "ttlkey", "ttlval", 0); err != nil { + t.Fatalf("Set failed: %v", err) + } + + ttl := mr.TTL("test:ttlkey") + if ttl <= 0 { + t.Errorf("expected positive TTL, got %v", ttl) + } +} + +func TestRedisCacheExplicitTTL(t *testing.T) { + ctx := context.Background() + cache, mr := newTestRedisCache(t) + + // Set with explicit TTL=30m + if err := cache.Set(ctx, "short", "val", 30*time.Minute); err != nil { + t.Fatalf("Set failed: %v", err) + } + + ttl := mr.TTL("test:short") + // miniredis reports TTL in seconds-level precision; just verify it's set + if ttl <= 0 { + t.Errorf("expected positive TTL, got %v", ttl) + } + if ttl > time.Hour { + t.Errorf("expected TTL <= 1h, got %v", ttl) + } +} + +func TestRedisCacheMiss(t *testing.T) { + ctx := context.Background() + cache, _ := newTestRedisCache(t) + + _, err := cache.Get(ctx, "nonexistent") + if err == nil { + t.Fatal("expected error for missing key") + } +} + +func TestRedisCacheNotStarted(t *testing.T) { + ctx := context.Background() + cfg := RedisCacheConfig{Address: "localhost:6379", Prefix: "wf:"} + cache := NewRedisCache("cache", cfg) + + if _, err := cache.Get(ctx, "k"); err == nil { + t.Error("expected error from Get when not started") + } + if err := cache.Set(ctx, "k", "v", 0); err == nil { + t.Error("expected error from Set when not started") + } + if err := cache.Delete(ctx, "k"); err == nil { + t.Error("expected error from Delete when not started") + } +} + +func TestRedisCacheInit(t *testing.T) { + cfg := RedisCacheConfig{Address: "localhost:6379", Prefix: "wf:"} + cache := NewRedisCache("cache", cfg) + app := NewMockApplication() + + if err := cache.Init(app); err != nil { + t.Fatalf("Init failed: %v", err) + } +} + +func TestRedisCacheStop(t *testing.T) { + ctx := context.Background() + cache, _ := newTestRedisCache(t) + + if err := cache.Stop(ctx); err != nil { + t.Fatalf("Stop failed: %v", err) + } + // Stop when already nil is a no-op + cache2 := NewRedisCache("cache2", RedisCacheConfig{}) + if err := cache2.Stop(ctx); err != nil { + t.Fatalf("Stop on uninitialised cache failed: %v", err) + } +} + +func TestRedisCacheProvidesServices(t *testing.T) { + cache := NewRedisCache("mycache", RedisCacheConfig{}) + svcs := cache.ProvidesServices() + if len(svcs) != 1 { + t.Fatalf("expected 1 service, got %d", len(svcs)) + } + if svcs[0].Name != "mycache" { + t.Errorf("expected service name %q, got %q", "mycache", svcs[0].Name) + } +} diff --git a/module/pipeline_step_build_from_config.go b/module/pipeline_step_build_from_config.go new file mode 100644 index 00000000..6c709f37 --- /dev/null +++ b/module/pipeline_step_build_from_config.go @@ -0,0 +1,246 @@ +package module + +import ( + "bytes" + "context" + "fmt" + "io" + "os" + "os/exec" + "path/filepath" + "strings" + + "github.com/CrisisTextLine/modular" +) + +// PluginSpec describes a plugin binary to include in the built image. +type PluginSpec struct { + Name string + Binary string +} + +// BuildFromConfigStep reads a workflow config YAML file, assembles a Docker +// build context with the server binary and any required plugin binaries, +// generates a Dockerfile, builds the image, and optionally pushes it. +type BuildFromConfigStep struct { + name string + configFile string + baseImage string + serverBinary string + tag string + push bool + plugins []PluginSpec + + // execCommand is the function used to create exec.Cmd instances. + // Defaults to exec.CommandContext; overridable in tests. + execCommand func(ctx context.Context, name string, args ...string) *exec.Cmd +} + +// NewBuildFromConfigStepFactory returns a StepFactory that creates BuildFromConfigStep instances. +func NewBuildFromConfigStepFactory() StepFactory { + return func(name string, config map[string]any, _ modular.Application) (PipelineStep, error) { + configFile, _ := config["config_file"].(string) + if configFile == "" { + return nil, fmt.Errorf("build_from_config step %q: 'config_file' is required", name) + } + + tag, _ := config["tag"].(string) + if tag == "" { + return nil, fmt.Errorf("build_from_config step %q: 'tag' is required", name) + } + + baseImage, _ := config["base_image"].(string) + if baseImage == "" { + baseImage = "ghcr.io/gocodealone/workflow-runtime:latest" + } + + serverBinary, _ := config["server_binary"].(string) + if serverBinary == "" { + serverBinary = "/usr/local/bin/workflow-server" + } + + push, _ := config["push"].(bool) + + var plugins []PluginSpec + if pluginsRaw, ok := config["plugins"].([]any); ok { + for i, p := range pluginsRaw { + m, ok := p.(map[string]any) + if !ok { + return nil, fmt.Errorf("build_from_config step %q: plugins[%d] must be a map", name, i) + } + pName, _ := m["name"].(string) + pBinary, _ := m["binary"].(string) + if pName == "" || pBinary == "" { + return nil, fmt.Errorf("build_from_config step %q: plugins[%d] requires 'name' and 'binary'", name, i) + } + plugins = append(plugins, PluginSpec{Name: pName, Binary: pBinary}) + } + } + + return &BuildFromConfigStep{ + name: name, + configFile: configFile, + baseImage: baseImage, + serverBinary: serverBinary, + tag: tag, + push: push, + plugins: plugins, + execCommand: exec.CommandContext, + }, nil + } +} + +// Name returns the step name. +func (s *BuildFromConfigStep) Name() string { return s.name } + +// Execute assembles the build context, generates a Dockerfile, builds the +// Docker image, and optionally pushes it. +func (s *BuildFromConfigStep) Execute(ctx context.Context, _ *PipelineContext) (*StepResult, error) { + // Validate that the config file exists. + if _, err := os.Stat(s.configFile); err != nil { + return nil, fmt.Errorf("build_from_config step %q: config_file %q not found: %w", s.name, s.configFile, err) + } + + // Validate that the server binary exists. + if _, err := os.Stat(s.serverBinary); err != nil { + return nil, fmt.Errorf("build_from_config step %q: server_binary %q not found: %w", s.name, s.serverBinary, err) + } + + // Create a temporary build context directory. + buildDir, err := os.MkdirTemp("", "workflow-build-*") + if err != nil { + return nil, fmt.Errorf("build_from_config step %q: failed to create temp build dir: %w", s.name, err) + } + defer os.RemoveAll(buildDir) + + // Copy config file into build context as config.yaml. + if err := copyFile(s.configFile, filepath.Join(buildDir, "config.yaml")); err != nil { + return nil, fmt.Errorf("build_from_config step %q: failed to copy config file: %w", s.name, err) + } + + // Copy server binary into build context as server. + serverDst := filepath.Join(buildDir, "server") + if err := copyFile(s.serverBinary, serverDst); err != nil { + return nil, fmt.Errorf("build_from_config step %q: failed to copy server binary: %w", s.name, err) + } + if err := os.Chmod(serverDst, 0755); err != nil { //nolint:gosec // G302: intentionally executable + return nil, fmt.Errorf("build_from_config step %q: failed to chmod server binary: %w", s.name, err) + } + + // Copy plugin binaries into build context under plugins//. + pluginsDir := filepath.Join(buildDir, "plugins") + for _, plugin := range s.plugins { + if _, err := os.Stat(plugin.Binary); err != nil { + return nil, fmt.Errorf("build_from_config step %q: plugin %q binary %q not found: %w", + s.name, plugin.Name, plugin.Binary, err) + } + pluginDir := filepath.Join(pluginsDir, plugin.Name) + if err := os.MkdirAll(pluginDir, 0750); err != nil { + return nil, fmt.Errorf("build_from_config step %q: failed to create plugin dir for %q: %w", + s.name, plugin.Name, err) + } + pluginBinaryName := filepath.Base(plugin.Binary) + pluginDst := filepath.Join(pluginDir, pluginBinaryName) + if err := copyFile(plugin.Binary, pluginDst); err != nil { + return nil, fmt.Errorf("build_from_config step %q: failed to copy plugin %q binary: %w", + s.name, plugin.Name, err) + } + if err := os.Chmod(pluginDst, 0755); err != nil { //nolint:gosec // G302: intentionally executable + return nil, fmt.Errorf("build_from_config step %q: failed to chmod plugin %q binary: %w", + s.name, plugin.Name, err) + } + } + + // Generate Dockerfile content. + dockerfileContent := s.generateDockerfile() + + // Write Dockerfile into build context. + dockerfilePath := filepath.Join(buildDir, "Dockerfile") + if err := os.WriteFile(dockerfilePath, []byte(dockerfileContent), 0600); err != nil { + return nil, fmt.Errorf("build_from_config step %q: failed to write Dockerfile: %w", s.name, err) + } + + // Execute docker build. + if err := s.runDockerBuild(ctx, buildDir); err != nil { + return nil, fmt.Errorf("build_from_config step %q: docker build failed: %w", s.name, err) + } + + // Optionally push the image. + if s.push { + if err := s.runDockerPush(ctx); err != nil { + return nil, fmt.Errorf("build_from_config step %q: docker push failed: %w", s.name, err) + } + } + + return &StepResult{ + Output: map[string]any{ + "image_tag": s.tag, + "dockerfile_content": dockerfileContent, + }, + }, nil +} + +// generateDockerfile returns a Dockerfile string for the build context layout. +func (s *BuildFromConfigStep) generateDockerfile() string { + var sb strings.Builder + + fmt.Fprintf(&sb, "FROM %s\n", s.baseImage) + sb.WriteString("COPY server /server\n") + sb.WriteString("COPY config.yaml /app/config.yaml\n") + + if len(s.plugins) > 0 { + sb.WriteString("COPY plugins/ /app/data/plugins/\n") + } + + sb.WriteString("WORKDIR /app\n") + sb.WriteString("ENTRYPOINT [\"/server\"]\n") + sb.WriteString("CMD [\"-config\", \"/app/config.yaml\", \"-data-dir\", \"/app/data\"]\n") + + return sb.String() +} + +// runDockerBuild executes "docker build -t ". +func (s *BuildFromConfigStep) runDockerBuild(ctx context.Context, buildDir string) error { + var stdout, stderr bytes.Buffer + cmd := s.execCommand(ctx, "docker", "build", "-t", s.tag, buildDir) //nolint:gosec // G204: tag from trusted pipeline config + cmd.Stdout = &stdout + cmd.Stderr = &stderr + + if err := cmd.Run(); err != nil { + return fmt.Errorf("%w\nstdout: %s\nstderr: %s", err, stdout.String(), stderr.String()) + } + return nil +} + +// runDockerPush executes "docker push ". +func (s *BuildFromConfigStep) runDockerPush(ctx context.Context) error { + var stdout, stderr bytes.Buffer + cmd := s.execCommand(ctx, "docker", "push", s.tag) //nolint:gosec // G204: tag from trusted pipeline config + cmd.Stdout = &stdout + cmd.Stderr = &stderr + + if err := cmd.Run(); err != nil { + return fmt.Errorf("%w\nstdout: %s\nstderr: %s", err, stdout.String(), stderr.String()) + } + return nil +} + +// copyFile copies src to dst, creating dst if it does not exist. +func copyFile(src, dst string) error { + in, err := os.Open(src) //nolint:gosec // G304: path from trusted pipeline config + if err != nil { + return fmt.Errorf("open %q: %w", src, err) + } + defer in.Close() + + out, err := os.OpenFile(dst, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) + if err != nil { + return fmt.Errorf("create %q: %w", dst, err) + } + defer out.Close() + + if _, err := io.Copy(out, in); err != nil { + return fmt.Errorf("copy %q -> %q: %w", src, dst, err) + } + return nil +} diff --git a/module/pipeline_step_build_from_config_test.go b/module/pipeline_step_build_from_config_test.go new file mode 100644 index 00000000..5ee6b210 --- /dev/null +++ b/module/pipeline_step_build_from_config_test.go @@ -0,0 +1,517 @@ +package module + +import ( + "context" + "fmt" + "io" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" +) + +// setupBuildFromConfigFiles creates a temporary directory with a fake config +// file and a fake server binary (empty files). It returns the directory path +// and a cleanup function. +func setupBuildFromConfigFiles(t *testing.T) (configFile, serverBinary string, cleanup func()) { + t.Helper() + dir := t.TempDir() + + configFile = filepath.Join(dir, "app.yaml") + if err := os.WriteFile(configFile, []byte("version: 1\n"), 0600); err != nil { + t.Fatalf("failed to create config file: %v", err) + } + + serverBinary = filepath.Join(dir, "workflow-server") + if err := os.WriteFile(serverBinary, []byte("#!/bin/sh\n"), 0755); err != nil { //nolint:gosec + t.Fatalf("failed to create server binary: %v", err) + } + + return configFile, serverBinary, func() {} // t.TempDir cleans up automatically +} + +// noopExecCommand returns a mock exec.CommandContext function that succeeds +// without running any real process. +func noopExecCommand(_ context.Context, name string, args ...string) *exec.Cmd { + // Invoke a real no-op command so cmd.Run() succeeds. + return exec.Command("true") +} + +// failingExecCommand returns a mock that always fails with an exit error. +func failingExecCommand(_ context.Context, _ string, _ ...string) *exec.Cmd { + return exec.Command("false") +} + +func TestBuildFromConfigStep_FactoryRequiresConfigFile(t *testing.T) { + factory := NewBuildFromConfigStepFactory() + _, err := factory("bfc", map[string]any{"tag": "my-app:latest"}, nil) + if err == nil { + t.Fatal("expected error when config_file is missing") + } + if !strings.Contains(err.Error(), "config_file") { + t.Errorf("expected error to mention config_file, got: %v", err) + } +} + +func TestBuildFromConfigStep_FactoryRequiresTag(t *testing.T) { + factory := NewBuildFromConfigStepFactory() + _, err := factory("bfc", map[string]any{"config_file": "app.yaml"}, nil) + if err == nil { + t.Fatal("expected error when tag is missing") + } + if !strings.Contains(err.Error(), "tag") { + t.Errorf("expected error to mention tag, got: %v", err) + } +} + +func TestBuildFromConfigStep_FactoryPluginMissingFields(t *testing.T) { + factory := NewBuildFromConfigStepFactory() + _, err := factory("bfc", map[string]any{ + "config_file": "app.yaml", + "tag": "my-app:latest", + "plugins": []any{ + map[string]any{"name": "admin"}, // missing binary + }, + }, nil) + if err == nil { + t.Fatal("expected error when plugin binary is missing") + } + if !strings.Contains(err.Error(), "binary") { + t.Errorf("expected error to mention binary, got: %v", err) + } +} + +func TestBuildFromConfigStep_FactoryPluginInvalidEntry(t *testing.T) { + factory := NewBuildFromConfigStepFactory() + _, err := factory("bfc", map[string]any{ + "config_file": "app.yaml", + "tag": "my-app:latest", + "plugins": []any{"not-a-map"}, + }, nil) + if err == nil { + t.Fatal("expected error for non-map plugin entry") + } +} + +func TestBuildFromConfigStep_Name(t *testing.T) { + factory := NewBuildFromConfigStepFactory() + step, err := factory("my-build", map[string]any{ + "config_file": "app.yaml", + "tag": "my-app:latest", + }, nil) + if err != nil { + t.Fatalf("unexpected factory error: %v", err) + } + if step.Name() != "my-build" { + t.Errorf("expected name %q, got %q", "my-build", step.Name()) + } +} + +func TestBuildFromConfigStep_DefaultBaseImage(t *testing.T) { + factory := NewBuildFromConfigStepFactory() + raw, err := factory("bfc", map[string]any{ + "config_file": "app.yaml", + "tag": "my-app:latest", + }, nil) + if err != nil { + t.Fatalf("unexpected factory error: %v", err) + } + bfc := raw.(*BuildFromConfigStep) + if bfc.baseImage != "ghcr.io/gocodealone/workflow-runtime:latest" { + t.Errorf("unexpected default base_image: %q", bfc.baseImage) + } +} + +func TestBuildFromConfigStep_GenerateDockerfile_NoPLugins(t *testing.T) { + s := &BuildFromConfigStep{ + name: "bfc", + baseImage: "gcr.io/distroless/static-debian12:nonroot", + tag: "my-app:latest", + plugins: nil, + } + + got := s.generateDockerfile() + + expectedLines := []string{ + "FROM gcr.io/distroless/static-debian12:nonroot", + "COPY server /server", + "COPY config.yaml /app/config.yaml", + "WORKDIR /app", + "ENTRYPOINT [\"/server\"]", + `CMD ["-config", "/app/config.yaml", "-data-dir", "/app/data"]`, + } + + for _, line := range expectedLines { + if !strings.Contains(got, line) { + t.Errorf("Dockerfile missing line %q\nGot:\n%s", line, got) + } + } + + // Without plugins, there should be no plugins COPY line. + if strings.Contains(got, "COPY plugins/") { + t.Errorf("Dockerfile should not contain plugins COPY when no plugins configured") + } +} + +func TestBuildFromConfigStep_GenerateDockerfile_WithPlugins(t *testing.T) { + s := &BuildFromConfigStep{ + name: "bfc", + baseImage: "gcr.io/distroless/static-debian12:nonroot", + tag: "my-app:latest", + plugins: []PluginSpec{ + {Name: "admin", Binary: "data/plugins/admin/admin"}, + }, + } + + got := s.generateDockerfile() + + if !strings.Contains(got, "COPY plugins/ /app/data/plugins/") { + t.Errorf("Dockerfile should contain plugins COPY line when plugins are configured\nGot:\n%s", got) + } +} + +func TestBuildFromConfigStep_Execute_MissingConfigFile(t *testing.T) { + s := &BuildFromConfigStep{ + name: "bfc", + configFile: "/nonexistent/app.yaml", + serverBinary: "/nonexistent/server", + tag: "my-app:latest", + execCommand: noopExecCommand, + } + + _, err := s.Execute(context.Background(), &PipelineContext{}) + if err == nil { + t.Fatal("expected error for missing config_file") + } + if !strings.Contains(err.Error(), "config_file") { + t.Errorf("expected error to mention config_file, got: %v", err) + } +} + +func TestBuildFromConfigStep_Execute_MissingServerBinary(t *testing.T) { + configFile, _, _ := setupBuildFromConfigFiles(t) + + s := &BuildFromConfigStep{ + name: "bfc", + configFile: configFile, + serverBinary: "/nonexistent/server", + tag: "my-app:latest", + execCommand: noopExecCommand, + } + + _, err := s.Execute(context.Background(), &PipelineContext{}) + if err == nil { + t.Fatal("expected error for missing server_binary") + } + if !strings.Contains(err.Error(), "server_binary") { + t.Errorf("expected error to mention server_binary, got: %v", err) + } +} + +func TestBuildFromConfigStep_Execute_MissingPluginBinary(t *testing.T) { + configFile, serverBinary, _ := setupBuildFromConfigFiles(t) + + s := &BuildFromConfigStep{ + name: "bfc", + configFile: configFile, + serverBinary: serverBinary, + tag: "my-app:latest", + plugins: []PluginSpec{ + {Name: "admin", Binary: "/nonexistent/admin"}, + }, + execCommand: noopExecCommand, + } + + _, err := s.Execute(context.Background(), &PipelineContext{}) + if err == nil { + t.Fatal("expected error for missing plugin binary") + } + if !strings.Contains(err.Error(), "plugin") { + t.Errorf("expected error to mention plugin, got: %v", err) + } +} + +func TestBuildFromConfigStep_Execute_DockerBuildFailure(t *testing.T) { + configFile, serverBinary, _ := setupBuildFromConfigFiles(t) + + s := &BuildFromConfigStep{ + name: "bfc", + configFile: configFile, + serverBinary: serverBinary, + tag: "my-app:latest", + execCommand: failingExecCommand, + } + + _, err := s.Execute(context.Background(), &PipelineContext{}) + if err == nil { + t.Fatal("expected error when docker build fails") + } + if !strings.Contains(err.Error(), "docker build") { + t.Errorf("expected error to mention docker build, got: %v", err) + } +} + +func TestBuildFromConfigStep_Execute_DockerPushFailure(t *testing.T) { + configFile, serverBinary, _ := setupBuildFromConfigFiles(t) + + callCount := 0 + s := &BuildFromConfigStep{ + name: "bfc", + configFile: configFile, + serverBinary: serverBinary, + tag: "my-app:latest", + push: true, + execCommand: func(ctx context.Context, name string, args ...string) *exec.Cmd { + callCount++ + if callCount == 1 { + // First call is docker build — succeed. + return exec.Command("true") + } + // Second call is docker push — fail. + return exec.Command("false") + }, + } + + _, err := s.Execute(context.Background(), &PipelineContext{}) + if err == nil { + t.Fatal("expected error when docker push fails") + } + if !strings.Contains(err.Error(), "docker push") { + t.Errorf("expected error to mention docker push, got: %v", err) + } +} + +func TestBuildFromConfigStep_Execute_NoPush(t *testing.T) { + configFile, serverBinary, _ := setupBuildFromConfigFiles(t) + + buildCalled := false + pushCalled := false + + s := &BuildFromConfigStep{ + name: "bfc", + configFile: configFile, + serverBinary: serverBinary, + baseImage: "gcr.io/distroless/static-debian12:nonroot", + tag: "my-app:latest", + push: false, + execCommand: func(ctx context.Context, name string, args ...string) *exec.Cmd { + if name == "docker" && len(args) > 0 { + switch args[0] { + case "build": + buildCalled = true + case "push": + pushCalled = true + } + } + return exec.Command("true") + }, + } + + result, err := s.Execute(context.Background(), &PipelineContext{}) + if err != nil { + t.Fatalf("Execute failed: %v", err) + } + + if !buildCalled { + t.Error("expected docker build to be called") + } + if pushCalled { + t.Error("expected docker push NOT to be called when push=false") + } + + if result.Output["image_tag"] != "my-app:latest" { + t.Errorf("expected image_tag %q, got %v", "my-app:latest", result.Output["image_tag"]) + } + + dockerfileContent, ok := result.Output["dockerfile_content"].(string) + if !ok || dockerfileContent == "" { + t.Error("expected dockerfile_content to be non-empty string") + } +} + +func TestBuildFromConfigStep_Execute_WithPush(t *testing.T) { + configFile, serverBinary, _ := setupBuildFromConfigFiles(t) + + var dockerCalls []string + s := &BuildFromConfigStep{ + name: "bfc", + configFile: configFile, + serverBinary: serverBinary, + baseImage: "gcr.io/distroless/static-debian12:nonroot", + tag: "my-app:latest", + push: true, + execCommand: func(ctx context.Context, name string, args ...string) *exec.Cmd { + if name == "docker" && len(args) > 0 { + dockerCalls = append(dockerCalls, args[0]) + } + return exec.Command("true") + }, + } + + result, err := s.Execute(context.Background(), &PipelineContext{}) + if err != nil { + t.Fatalf("Execute failed: %v", err) + } + + if len(dockerCalls) != 2 { + t.Fatalf("expected 2 docker calls (build + push), got %d: %v", len(dockerCalls), dockerCalls) + } + if dockerCalls[0] != "build" { + t.Errorf("expected first docker call to be 'build', got %q", dockerCalls[0]) + } + if dockerCalls[1] != "push" { + t.Errorf("expected second docker call to be 'push', got %q", dockerCalls[1]) + } + + if result.Output["image_tag"] != "my-app:latest" { + t.Errorf("expected image_tag %q, got %v", "my-app:latest", result.Output["image_tag"]) + } +} + +func TestBuildFromConfigStep_Execute_WithPlugins(t *testing.T) { + configFile, serverBinary, _ := setupBuildFromConfigFiles(t) + + // Create fake plugin binaries. + pluginDir := t.TempDir() + adminBinary := filepath.Join(pluginDir, "admin") + if err := os.WriteFile(adminBinary, []byte("#!/bin/sh\n"), 0755); err != nil { //nolint:gosec + t.Fatalf("failed to create admin binary: %v", err) + } + bentoBinary := filepath.Join(pluginDir, "workflow-plugin-bento") + if err := os.WriteFile(bentoBinary, []byte("#!/bin/sh\n"), 0755); err != nil { //nolint:gosec + t.Fatalf("failed to create bento binary: %v", err) + } + + var buildArgs []string + s := &BuildFromConfigStep{ + name: "bfc", + configFile: configFile, + serverBinary: serverBinary, + baseImage: "gcr.io/distroless/static-debian12:nonroot", + tag: "my-app:latest", + push: false, + plugins: []PluginSpec{ + {Name: "admin", Binary: adminBinary}, + {Name: "bento", Binary: bentoBinary}, + }, + execCommand: func(ctx context.Context, name string, args ...string) *exec.Cmd { + if name == "docker" && len(args) > 0 && args[0] == "build" { + buildArgs = args + } + return exec.Command("true") + }, + } + + result, err := s.Execute(context.Background(), &PipelineContext{}) + if err != nil { + t.Fatalf("Execute failed: %v", err) + } + + // Verify the Dockerfile includes the plugins COPY line. + dockerfileContent, _ := result.Output["dockerfile_content"].(string) + if !strings.Contains(dockerfileContent, "COPY plugins/ /app/data/plugins/") { + t.Errorf("Dockerfile should contain plugins COPY line\nGot:\n%s", dockerfileContent) + } + + // Verify docker build was called with a context dir argument. + if len(buildArgs) < 3 { + t.Fatalf("expected docker build -t , got args: %v", buildArgs) + } +} + +func TestBuildFromConfigStep_Execute_BuildContextLayout(t *testing.T) { + configFile, serverBinary, _ := setupBuildFromConfigFiles(t) + + pluginDir := t.TempDir() + adminBinary := filepath.Join(pluginDir, "admin") + if err := os.WriteFile(adminBinary, []byte("#!/bin/sh\n"), 0755); err != nil { //nolint:gosec + t.Fatalf("failed to create plugin binary: %v", err) + } + + var capturedBuildDir string + s := &BuildFromConfigStep{ + name: "bfc", + configFile: configFile, + serverBinary: serverBinary, + baseImage: "alpine:latest", + tag: "my-app:latest", + plugins: []PluginSpec{ + {Name: "admin", Binary: adminBinary}, + }, + execCommand: func(ctx context.Context, name string, args ...string) *exec.Cmd { + // Capture the build context dir (last argument to docker build). + if name == "docker" && len(args) > 0 && args[0] == "build" { + capturedBuildDir = args[len(args)-1] + // Make a copy so we can inspect it after Execute returns + // (Execute defers RemoveAll on buildDir). + copyDir := t.TempDir() + _ = copyDirRecursive(capturedBuildDir, copyDir) + capturedBuildDir = copyDir + } + return exec.Command("true") + }, + } + + _, err := s.Execute(context.Background(), &PipelineContext{}) + if err != nil { + t.Fatalf("Execute failed: %v", err) + } + + // Check expected files in the copied build context. + expectedFiles := []string{ + "Dockerfile", + "config.yaml", + "server", + filepath.Join("plugins", "admin", "admin"), + } + for _, f := range expectedFiles { + if _, err := os.Stat(filepath.Join(capturedBuildDir, f)); err != nil { + t.Errorf("build context missing expected file %q: %v", f, err) + } + } +} + +// copyDirRecursive copies the contents of src into dst directory. +func copyDirRecursive(src, dst string) error { + return filepath.Walk(src, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + rel, err := filepath.Rel(src, path) + if err != nil { + return err + } + dstPath := filepath.Join(dst, rel) + if info.IsDir() { + return os.MkdirAll(dstPath, info.Mode()) + } + return func() error { + in, err := os.Open(path) //nolint:gosec + if err != nil { + return err + } + defer in.Close() + out, err := os.OpenFile(dstPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, info.Mode()) + if err != nil { + return err + } + defer out.Close() + _, err = fmt.Fprintf(out, "") + if err != nil { + return err + } + _, err = out.Seek(0, 0) + if err != nil { + return err + } + f, err := os.Open(path) //nolint:gosec + if err != nil { + return err + } + defer f.Close() + _, copyErr := io.Copy(out, f) + return copyErr + }() + }) +} diff --git a/module/pipeline_step_cache_delete.go b/module/pipeline_step_cache_delete.go new file mode 100644 index 00000000..2073929c --- /dev/null +++ b/module/pipeline_step_cache_delete.go @@ -0,0 +1,78 @@ +package module + +import ( + "context" + "fmt" + + "github.com/CrisisTextLine/modular" +) + +// CacheDeleteStep removes a key from a named CacheModule. +type CacheDeleteStep struct { + name string + cache string // service name of the CacheModule + key string // key template + app modular.Application + tmpl *TemplateEngine +} + +// NewCacheDeleteStepFactory returns a StepFactory that creates CacheDeleteStep instances. +func NewCacheDeleteStepFactory() StepFactory { + return func(name string, config map[string]any, app modular.Application) (PipelineStep, error) { + cache, _ := config["cache"].(string) + if cache == "" { + return nil, fmt.Errorf("cache_delete step %q: 'cache' is required", name) + } + + key, _ := config["key"].(string) + if key == "" { + return nil, fmt.Errorf("cache_delete step %q: 'key' is required", name) + } + + return &CacheDeleteStep{ + name: name, + cache: cache, + key: key, + app: app, + tmpl: NewTemplateEngine(), + }, nil + } +} + +func (s *CacheDeleteStep) Name() string { return s.name } + +func (s *CacheDeleteStep) Execute(ctx context.Context, pc *PipelineContext) (*StepResult, error) { + if s.app == nil { + return nil, fmt.Errorf("cache_delete step %q: no application context", s.name) + } + + cm, err := s.resolveCache() + if err != nil { + return nil, err + } + + resolvedKey, err := s.tmpl.Resolve(s.key, pc) + if err != nil { + return nil, fmt.Errorf("cache_delete step %q: failed to resolve key template: %w", s.name, err) + } + + if err := cm.Delete(ctx, resolvedKey); err != nil { + return nil, fmt.Errorf("cache_delete step %q: delete failed: %w", s.name, err) + } + + return &StepResult{Output: map[string]any{ + "deleted": true, + }}, nil +} + +func (s *CacheDeleteStep) resolveCache() (CacheModule, error) { + svc, ok := s.app.SvcRegistry()[s.cache] + if !ok { + return nil, fmt.Errorf("cache_delete step %q: cache service %q not found", s.name, s.cache) + } + cm, ok := svc.(CacheModule) + if !ok { + return nil, fmt.Errorf("cache_delete step %q: service %q does not implement CacheModule", s.name, s.cache) + } + return cm, nil +} diff --git a/module/pipeline_step_cache_delete_test.go b/module/pipeline_step_cache_delete_test.go new file mode 100644 index 00000000..59a39223 --- /dev/null +++ b/module/pipeline_step_cache_delete_test.go @@ -0,0 +1,110 @@ +package module + +import ( + "context" + "errors" + "testing" +) + +func TestCacheDeleteStep_Basic(t *testing.T) { + cm := newMockCacheModule() + cm.data["user:42"] = "cached" + app := mockAppWithCache("cache", cm) + + factory := NewCacheDeleteStepFactory() + step, err := factory("del-user", map[string]any{ + "cache": "cache", + "key": "user:{{.user_id}}", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(map[string]any{"user_id": "42"}, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + if result.Output["deleted"] != true { + t.Errorf("expected deleted=true, got %v", result.Output["deleted"]) + } + if _, exists := cm.data["user:42"]; exists { + t.Error("expected key to be removed from mock cache") + } +} + +func TestCacheDeleteStep_DeleteError(t *testing.T) { + cm := newMockCacheModule() + cm.deleteErr = errors.New("delete failed") + app := mockAppWithCache("cache", cm) + + factory := NewCacheDeleteStepFactory() + step, err := factory("del-err", map[string]any{ + "cache": "cache", + "key": "k", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + _, err = step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error from underlying Delete") + } +} + +func TestCacheDeleteStep_MissingCache(t *testing.T) { + factory := NewCacheDeleteStepFactory() + _, err := factory("bad", map[string]any{"key": "k"}, nil) + if err == nil { + t.Fatal("expected error for missing cache") + } +} + +func TestCacheDeleteStep_MissingKey(t *testing.T) { + factory := NewCacheDeleteStepFactory() + _, err := factory("bad", map[string]any{"cache": "c"}, nil) + if err == nil { + t.Fatal("expected error for missing key") + } +} + +func TestCacheDeleteStep_ServiceNotFound(t *testing.T) { + app := NewMockApplication() + factory := NewCacheDeleteStepFactory() + step, err := factory("del-missing-svc", map[string]any{ + "cache": "nonexistent", + "key": "k", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + _, err = step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error for missing service") + } +} + +func TestCacheDeleteStep_ServiceWrongType(t *testing.T) { + app := NewMockApplication() + app.Services["cache"] = "not-a-cache" + + factory := NewCacheDeleteStepFactory() + step, err := factory("del-wrong-type", map[string]any{ + "cache": "cache", + "key": "k", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + _, err = step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error for wrong service type") + } +} diff --git a/module/pipeline_step_cache_get.go b/module/pipeline_step_cache_get.go new file mode 100644 index 00000000..ebd1e67b --- /dev/null +++ b/module/pipeline_step_cache_get.go @@ -0,0 +1,107 @@ +package module + +import ( + "context" + "errors" + "fmt" + + "github.com/CrisisTextLine/modular" + "github.com/redis/go-redis/v9" +) + +// CacheGetStep reads a value from a named CacheModule and stores it in the +// pipeline context under a configurable output field. +type CacheGetStep struct { + name string + cache string // service name of the CacheModule + key string // key template, e.g. "user:{{.user_id}}" + output string // output field name (default: "value") + missOK bool // when true a cache miss is not an error + app modular.Application + tmpl *TemplateEngine +} + +// NewCacheGetStepFactory returns a StepFactory that creates CacheGetStep instances. +func NewCacheGetStepFactory() StepFactory { + return func(name string, config map[string]any, app modular.Application) (PipelineStep, error) { + cache, _ := config["cache"].(string) + if cache == "" { + return nil, fmt.Errorf("cache_get step %q: 'cache' is required", name) + } + + key, _ := config["key"].(string) + if key == "" { + return nil, fmt.Errorf("cache_get step %q: 'key' is required", name) + } + + output, _ := config["output"].(string) + if output == "" { + output = "value" + } + + missOK := true + if v, ok := config["miss_ok"].(bool); ok { + missOK = v + } + + return &CacheGetStep{ + name: name, + cache: cache, + key: key, + output: output, + missOK: missOK, + app: app, + tmpl: NewTemplateEngine(), + }, nil + } +} + +func (s *CacheGetStep) Name() string { return s.name } + +func (s *CacheGetStep) Execute(ctx context.Context, pc *PipelineContext) (*StepResult, error) { + if s.app == nil { + return nil, fmt.Errorf("cache_get step %q: no application context", s.name) + } + + cm, err := s.resolveCache() + if err != nil { + return nil, err + } + + resolvedKey, err := s.tmpl.Resolve(s.key, pc) + if err != nil { + return nil, fmt.Errorf("cache_get step %q: failed to resolve key template: %w", s.name, err) + } + + val, err := cm.Get(ctx, resolvedKey) + if err != nil { + if errors.Is(err, redis.Nil) { + // Cache miss + if !s.missOK { + return nil, fmt.Errorf("cache_get step %q: cache miss for key %q", s.name, resolvedKey) + } + return &StepResult{Output: map[string]any{ + s.output: "", + "cache_hit": false, + }}, nil + } + return nil, fmt.Errorf("cache_get step %q: get failed: %w", s.name, err) + } + + return &StepResult{Output: map[string]any{ + s.output: val, + "cache_hit": true, + }}, nil +} + +func (s *CacheGetStep) resolveCache() (CacheModule, error) { + svc, ok := s.app.SvcRegistry()[s.cache] + if !ok { + return nil, fmt.Errorf("cache_get step %q: cache service %q not found", s.name, s.cache) + } + cm, ok := svc.(CacheModule) + if !ok { + return nil, fmt.Errorf("cache_get step %q: service %q does not implement CacheModule", s.name, s.cache) + } + return cm, nil +} diff --git a/module/pipeline_step_cache_get_test.go b/module/pipeline_step_cache_get_test.go new file mode 100644 index 00000000..ec3fc9e9 --- /dev/null +++ b/module/pipeline_step_cache_get_test.go @@ -0,0 +1,236 @@ +package module + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/redis/go-redis/v9" +) + +// mockCacheModule is an in-memory CacheModule for testing pipeline steps. +type mockCacheModule struct { + data map[string]string + getErr error + setErr error + deleteErr error +} + +func newMockCacheModule() *mockCacheModule { + return &mockCacheModule{data: make(map[string]string)} +} + +func (m *mockCacheModule) Get(_ context.Context, key string) (string, error) { + if m.getErr != nil { + return "", m.getErr + } + v, ok := m.data[key] + if !ok { + return "", redis.Nil + } + return v, nil +} + +func (m *mockCacheModule) Set(_ context.Context, key, value string, _ time.Duration) error { + if m.setErr != nil { + return m.setErr + } + m.data[key] = value + return nil +} + +func (m *mockCacheModule) Delete(_ context.Context, key string) error { + if m.deleteErr != nil { + return m.deleteErr + } + delete(m.data, key) + return nil +} + +// mockAppWithCache creates a MockApplication with a CacheModule service registered. +func mockAppWithCache(name string, cm CacheModule) *MockApplication { + app := NewMockApplication() + app.Services[name] = cm + return app +} + +// ---- tests ---- + +func TestCacheGetStep_Hit(t *testing.T) { + cm := newMockCacheModule() + cm.data["user:42"] = `{"id":42}` + app := mockAppWithCache("cache", cm) + + factory := NewCacheGetStepFactory() + step, err := factory("get-user", map[string]any{ + "cache": "cache", + "key": "user:{{.user_id}}", + "output": "user_data", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(map[string]any{"user_id": "42"}, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + if result.Output["user_data"] != `{"id":42}` { + t.Errorf("expected user_data=%q, got %v", `{"id":42}`, result.Output["user_data"]) + } + if result.Output["cache_hit"] != true { + t.Errorf("expected cache_hit=true, got %v", result.Output["cache_hit"]) + } +} + +func TestCacheGetStep_MissOK(t *testing.T) { + cm := newMockCacheModule() + app := mockAppWithCache("cache", cm) + + factory := NewCacheGetStepFactory() + step, err := factory("get-user", map[string]any{ + "cache": "cache", + "key": "user:99", + "miss_ok": true, + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + if result.Output["value"] != "" { + t.Errorf("expected empty value on miss, got %v", result.Output["value"]) + } + if result.Output["cache_hit"] != false { + t.Errorf("expected cache_hit=false on miss, got %v", result.Output["cache_hit"]) + } +} + +func TestCacheGetStep_MissNotOK(t *testing.T) { + cm := newMockCacheModule() + app := mockAppWithCache("cache", cm) + + factory := NewCacheGetStepFactory() + step, err := factory("get-user", map[string]any{ + "cache": "cache", + "key": "user:99", + "miss_ok": false, + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + _, err = step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error on cache miss with miss_ok=false") + } +} + +func TestCacheGetStep_DefaultOutput(t *testing.T) { + cm := newMockCacheModule() + cm.data["thekey"] = "thevalue" + app := mockAppWithCache("cache", cm) + + factory := NewCacheGetStepFactory() + step, err := factory("get-val", map[string]any{ + "cache": "cache", + "key": "thekey", + // output not set → default "value" + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + if result.Output["value"] != "thevalue" { + t.Errorf("expected output[value]=%q, got %v", "thevalue", result.Output["value"]) + } +} + +func TestCacheGetStep_GetError(t *testing.T) { + cm := newMockCacheModule() + cm.getErr = errors.New("connection refused") + app := mockAppWithCache("cache", cm) + + factory := NewCacheGetStepFactory() + step, err := factory("get-err", map[string]any{ + "cache": "cache", + "key": "k", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + _, err = step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error from underlying Get") + } +} + +func TestCacheGetStep_MissingCache(t *testing.T) { + factory := NewCacheGetStepFactory() + _, err := factory("bad", map[string]any{"key": "k"}, nil) + if err == nil { + t.Fatal("expected error for missing cache") + } +} + +func TestCacheGetStep_MissingKey(t *testing.T) { + factory := NewCacheGetStepFactory() + _, err := factory("bad", map[string]any{"cache": "c"}, nil) + if err == nil { + t.Fatal("expected error for missing key") + } +} + +func TestCacheGetStep_ServiceNotFound(t *testing.T) { + app := NewMockApplication() + factory := NewCacheGetStepFactory() + step, err := factory("get-missing-svc", map[string]any{ + "cache": "nonexistent", + "key": "k", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + _, err = step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error for missing service") + } +} + +func TestCacheGetStep_ServiceWrongType(t *testing.T) { + app := NewMockApplication() + app.Services["cache"] = "not-a-cache-module" + + factory := NewCacheGetStepFactory() + step, err := factory("get-wrong-type", map[string]any{ + "cache": "cache", + "key": "k", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + _, err = step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error for wrong service type") + } +} diff --git a/module/pipeline_step_cache_set.go b/module/pipeline_step_cache_set.go new file mode 100644 index 00000000..bb5dc74a --- /dev/null +++ b/module/pipeline_step_cache_set.go @@ -0,0 +1,102 @@ +package module + +import ( + "context" + "fmt" + "time" + + "github.com/CrisisTextLine/modular" +) + +// CacheSetStep writes a value to a named CacheModule. +type CacheSetStep struct { + name string + cache string // service name of the CacheModule + key string // key template + value string // value template + ttl time.Duration // 0 means use the module default + app modular.Application + tmpl *TemplateEngine +} + +// NewCacheSetStepFactory returns a StepFactory that creates CacheSetStep instances. +func NewCacheSetStepFactory() StepFactory { + return func(name string, config map[string]any, app modular.Application) (PipelineStep, error) { + cache, _ := config["cache"].(string) + if cache == "" { + return nil, fmt.Errorf("cache_set step %q: 'cache' is required", name) + } + + key, _ := config["key"].(string) + if key == "" { + return nil, fmt.Errorf("cache_set step %q: 'key' is required", name) + } + + value, _ := config["value"].(string) + if value == "" { + return nil, fmt.Errorf("cache_set step %q: 'value' is required", name) + } + + var ttl time.Duration + if ttlStr, ok := config["ttl"].(string); ok && ttlStr != "" { + parsed, err := time.ParseDuration(ttlStr) + if err != nil { + return nil, fmt.Errorf("cache_set step %q: invalid 'ttl' %q: %w", name, ttlStr, err) + } + ttl = parsed + } + + return &CacheSetStep{ + name: name, + cache: cache, + key: key, + value: value, + ttl: ttl, + app: app, + tmpl: NewTemplateEngine(), + }, nil + } +} + +func (s *CacheSetStep) Name() string { return s.name } + +func (s *CacheSetStep) Execute(ctx context.Context, pc *PipelineContext) (*StepResult, error) { + if s.app == nil { + return nil, fmt.Errorf("cache_set step %q: no application context", s.name) + } + + cm, err := s.resolveCache() + if err != nil { + return nil, err + } + + resolvedKey, err := s.tmpl.Resolve(s.key, pc) + if err != nil { + return nil, fmt.Errorf("cache_set step %q: failed to resolve key template: %w", s.name, err) + } + + resolvedValue, err := s.tmpl.Resolve(s.value, pc) + if err != nil { + return nil, fmt.Errorf("cache_set step %q: failed to resolve value template: %w", s.name, err) + } + + if err := cm.Set(ctx, resolvedKey, resolvedValue, s.ttl); err != nil { + return nil, fmt.Errorf("cache_set step %q: set failed: %w", s.name, err) + } + + return &StepResult{Output: map[string]any{ + "cached": true, + }}, nil +} + +func (s *CacheSetStep) resolveCache() (CacheModule, error) { + svc, ok := s.app.SvcRegistry()[s.cache] + if !ok { + return nil, fmt.Errorf("cache_set step %q: cache service %q not found", s.name, s.cache) + } + cm, ok := svc.(CacheModule) + if !ok { + return nil, fmt.Errorf("cache_set step %q: service %q does not implement CacheModule", s.name, s.cache) + } + return cm, nil +} diff --git a/module/pipeline_step_cache_set_test.go b/module/pipeline_step_cache_set_test.go new file mode 100644 index 00000000..29f0d71d --- /dev/null +++ b/module/pipeline_step_cache_set_test.go @@ -0,0 +1,145 @@ +package module + +import ( + "context" + "errors" + "testing" +) + +func TestCacheSetStep_Basic(t *testing.T) { + cm := newMockCacheModule() + app := mockAppWithCache("cache", cm) + + factory := NewCacheSetStepFactory() + step, err := factory("set-user", map[string]any{ + "cache": "cache", + "key": "user:{{.user_id}}", + "value": "{{.profile}}", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(map[string]any{ + "user_id": "42", + "profile": `{"name":"Alice"}`, + }, nil) + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + if result.Output["cached"] != true { + t.Errorf("expected cached=true, got %v", result.Output["cached"]) + } + if cm.data["user:42"] != `{"name":"Alice"}` { + t.Errorf("expected stored value, got %v", cm.data["user:42"]) + } +} + +func TestCacheSetStep_WithTTL(t *testing.T) { + cm := newMockCacheModule() + app := mockAppWithCache("cache", cm) + + factory := NewCacheSetStepFactory() + step, err := factory("set-ttl", map[string]any{ + "cache": "cache", + "key": "k", + "value": "v", + "ttl": "30m", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + if result.Output["cached"] != true { + t.Errorf("expected cached=true") + } + if cm.data["k"] != "v" { + t.Errorf("expected stored value %q, got %q", "v", cm.data["k"]) + } +} + +func TestCacheSetStep_InvalidTTL(t *testing.T) { + factory := NewCacheSetStepFactory() + _, err := factory("bad-ttl", map[string]any{ + "cache": "cache", + "key": "k", + "value": "v", + "ttl": "notaduration", + }, nil) + if err == nil { + t.Fatal("expected error for invalid TTL") + } +} + +func TestCacheSetStep_SetError(t *testing.T) { + cm := newMockCacheModule() + cm.setErr = errors.New("redis unavailable") + app := mockAppWithCache("cache", cm) + + factory := NewCacheSetStepFactory() + step, err := factory("set-err", map[string]any{ + "cache": "cache", + "key": "k", + "value": "v", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + _, err = step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error from underlying Set") + } +} + +func TestCacheSetStep_MissingCache(t *testing.T) { + factory := NewCacheSetStepFactory() + _, err := factory("bad", map[string]any{"key": "k", "value": "v"}, nil) + if err == nil { + t.Fatal("expected error for missing cache") + } +} + +func TestCacheSetStep_MissingKey(t *testing.T) { + factory := NewCacheSetStepFactory() + _, err := factory("bad", map[string]any{"cache": "c", "value": "v"}, nil) + if err == nil { + t.Fatal("expected error for missing key") + } +} + +func TestCacheSetStep_MissingValue(t *testing.T) { + factory := NewCacheSetStepFactory() + _, err := factory("bad", map[string]any{"cache": "c", "key": "k"}, nil) + if err == nil { + t.Fatal("expected error for missing value") + } +} + +func TestCacheSetStep_ServiceNotFound(t *testing.T) { + app := NewMockApplication() + factory := NewCacheSetStepFactory() + step, err := factory("set-missing-svc", map[string]any{ + "cache": "nonexistent", + "key": "k", + "value": "v", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + _, err = step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error for missing service") + } +} diff --git a/module/pipeline_step_statemachine_get.go b/module/pipeline_step_statemachine_get.go new file mode 100644 index 00000000..293f1bd4 --- /dev/null +++ b/module/pipeline_step_statemachine_get.go @@ -0,0 +1,88 @@ +package module + +import ( + "context" + "fmt" + + "github.com/CrisisTextLine/modular" +) + +// StateMachineGetStep reads the current state of a workflow instance. +type StateMachineGetStep struct { + name string + statemachine string + entityID string + app modular.Application + tmpl *TemplateEngine +} + +// NewStateMachineGetStepFactory returns a StepFactory for step.statemachine_get. +// +// Config: +// +// type: step.statemachine_get +// config: +// statemachine: "order-sm" # service name of the StateMachineEngine +// entity_id: "{{.order_id}}" # which instance to look up (template) +// +// Outputs: current_state (string), entity_id (string). +// Returns an error (stopping the pipeline) when the instance is not found. +func NewStateMachineGetStepFactory() StepFactory { + return func(name string, config map[string]any, app modular.Application) (PipelineStep, error) { + sm, _ := config["statemachine"].(string) + if sm == "" { + return nil, fmt.Errorf("statemachine_get step %q: 'statemachine' is required", name) + } + + entityID, _ := config["entity_id"].(string) + if entityID == "" { + return nil, fmt.Errorf("statemachine_get step %q: 'entity_id' is required", name) + } + + return &StateMachineGetStep{ + name: name, + statemachine: sm, + entityID: entityID, + app: app, + tmpl: NewTemplateEngine(), + }, nil + } +} + +// Name returns the step name. +func (s *StateMachineGetStep) Name() string { return s.name } + +// Execute resolves the entity_id template, looks up the StateMachineEngine, and +// returns the current state of the workflow instance. +func (s *StateMachineGetStep) Execute(_ context.Context, pc *PipelineContext) (*StepResult, error) { + if s.app == nil { + return nil, fmt.Errorf("statemachine_get step %q: no application context", s.name) + } + + svc, ok := s.app.SvcRegistry()[s.statemachine] + if !ok { + return nil, fmt.Errorf("statemachine_get step %q: statemachine service %q not found", s.name, s.statemachine) + } + + engine, ok := svc.(*StateMachineEngine) + if !ok { + return nil, fmt.Errorf("statemachine_get step %q: service %q is not a StateMachineEngine", s.name, s.statemachine) + } + + entityID, err := s.tmpl.Resolve(s.entityID, pc) + if err != nil { + return nil, fmt.Errorf("statemachine_get step %q: failed to resolve entity_id: %w", s.name, err) + } + + instance, err := engine.GetInstance(entityID) + if err != nil { + return nil, fmt.Errorf("statemachine_get step %q: instance not found: %w", s.name, err) + } + + return &StepResult{ + Output: map[string]any{ + "current_state": instance.CurrentState, + "entity_id": entityID, + }, + }, nil +} diff --git a/module/pipeline_step_statemachine_get_test.go b/module/pipeline_step_statemachine_get_test.go new file mode 100644 index 00000000..320dfb24 --- /dev/null +++ b/module/pipeline_step_statemachine_get_test.go @@ -0,0 +1,194 @@ +package module + +import ( + "context" + "testing" +) + +// --- Factory validation tests --- + +func TestStateMachineGetStep_MissingStatemachine(t *testing.T) { + factory := NewStateMachineGetStepFactory() + _, err := factory("get-state", map[string]any{ + "entity_id": "order-1", + }, nil) + if err == nil { + t.Fatal("expected error for missing statemachine") + } +} + +func TestStateMachineGetStep_MissingEntityID(t *testing.T) { + factory := NewStateMachineGetStepFactory() + _, err := factory("get-state", map[string]any{ + "statemachine": "order-sm", + }, nil) + if err == nil { + t.Fatal("expected error for missing entity_id") + } +} + +// --- Execution tests --- + +func TestStateMachineGetStep_ReturnsCurrentState(t *testing.T) { + engine := setupOrderStateMachine(t, "order-1", "") + app := newAppWithSM(engine) + + factory := NewStateMachineGetStepFactory() + step, err := factory("get-order-state", map[string]any{ + "statemachine": "order-sm", + "entity_id": "order-1", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + state, _ := result.Output["current_state"].(string) + if state != "pending" { + t.Errorf("expected current_state='pending', got %q", state) + } + entityID, _ := result.Output["entity_id"].(string) + if entityID != "order-1" { + t.Errorf("expected entity_id='order-1', got %q", entityID) + } +} + +func TestStateMachineGetStep_TemplatedEntityID(t *testing.T) { + engine := setupOrderStateMachine(t, "order-99", "") + app := newAppWithSM(engine) + + factory := NewStateMachineGetStepFactory() + step, err := factory("get-state-template", map[string]any{ + "statemachine": "order-sm", + "entity_id": "{{.order_id}}", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(map[string]any{"order_id": "order-99"}, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + state, _ := result.Output["current_state"].(string) + if state != "pending" { + t.Errorf("expected current_state='pending', got %q", state) + } + entityID, _ := result.Output["entity_id"].(string) + if entityID != "order-99" { + t.Errorf("expected entity_id='order-99', got %q", entityID) + } +} + +func TestStateMachineGetStep_ReturnsStateAfterTransition(t *testing.T) { + engine := setupOrderStateMachine(t, "order-1", "") + app := newAppWithSM(engine) + + // Trigger a transition first + if err := engine.TriggerTransition(context.Background(), "order-1", "approve", nil); err != nil { + t.Fatalf("trigger transition: %v", err) + } + + factory := NewStateMachineGetStepFactory() + step, err := factory("get-approved-state", map[string]any{ + "statemachine": "order-sm", + "entity_id": "order-1", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + state, _ := result.Output["current_state"].(string) + if state != "approved" { + t.Errorf("expected current_state='approved', got %q", state) + } +} + +func TestStateMachineGetStep_InstanceNotFound(t *testing.T) { + engine := setupOrderStateMachine(t, "", "") // no instances created + app := newAppWithSM(engine) + + factory := NewStateMachineGetStepFactory() + step, err := factory("get-missing", map[string]any{ + "statemachine": "order-sm", + "entity_id": "nonexistent", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + _, err = step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error for nonexistent instance") + } +} + +func TestStateMachineGetStep_ServiceNotFound(t *testing.T) { + app := NewMockApplication() + + factory := NewStateMachineGetStepFactory() + step, err := factory("get-state", map[string]any{ + "statemachine": "nonexistent-sm", + "entity_id": "order-1", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + _, err = step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error for missing service") + } +} + +func TestStateMachineGetStep_ServiceWrongType(t *testing.T) { + app := NewMockApplication() + app.Services["order-sm"] = "not-an-engine" + + factory := NewStateMachineGetStepFactory() + step, err := factory("get-state", map[string]any{ + "statemachine": "order-sm", + "entity_id": "order-1", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + _, err = step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error for wrong service type") + } +} + +func TestStateMachineGetStep_NoAppContext(t *testing.T) { + factory := NewStateMachineGetStepFactory() + step, err := factory("get-state", map[string]any{ + "statemachine": "order-sm", + "entity_id": "order-1", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + _, err = step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error for nil app") + } +} diff --git a/module/pipeline_step_statemachine_transition.go b/module/pipeline_step_statemachine_transition.go new file mode 100644 index 00000000..c193d05b --- /dev/null +++ b/module/pipeline_step_statemachine_transition.go @@ -0,0 +1,188 @@ +package module + +import ( + "context" + "fmt" + + "github.com/CrisisTextLine/modular" +) + +// StateMachineTransitionStep triggers a state machine transition from within a pipeline. +type StateMachineTransitionStep struct { + name string + statemachine string + entityID string + event string + data map[string]any + failOnError bool + app modular.Application + tmpl *TemplateEngine +} + +// NewStateMachineTransitionStepFactory returns a StepFactory for step.statemachine_transition. +// +// Config: +// +// type: step.statemachine_transition +// config: +// statemachine: "order-sm" # service name of the StateMachineEngine +// entity_id: "{{.order_id}}" # which instance to transition (template) +// event: "approve" # transition name +// data: # optional data map (values may use templates) +// approved_by: "{{.user_id}}" +// fail_on_error: false # stop pipeline on invalid transition (default: false) +// +// Outputs: transition_ok (bool), new_state (string), error (string, only on failure). +func NewStateMachineTransitionStepFactory() StepFactory { + return func(name string, config map[string]any, app modular.Application) (PipelineStep, error) { + sm, _ := config["statemachine"].(string) + if sm == "" { + return nil, fmt.Errorf("statemachine_transition step %q: 'statemachine' is required", name) + } + + entityID, _ := config["entity_id"].(string) + if entityID == "" { + return nil, fmt.Errorf("statemachine_transition step %q: 'entity_id' is required", name) + } + + event, _ := config["event"].(string) + if event == "" { + return nil, fmt.Errorf("statemachine_transition step %q: 'event' is required", name) + } + + var data map[string]any + if d, ok := config["data"].(map[string]any); ok { + data = d + } + + failOnError, _ := config["fail_on_error"].(bool) + + return &StateMachineTransitionStep{ + name: name, + statemachine: sm, + entityID: entityID, + event: event, + data: data, + failOnError: failOnError, + app: app, + tmpl: NewTemplateEngine(), + }, nil + } +} + +// Name returns the step name. +func (s *StateMachineTransitionStep) Name() string { return s.name } + +// Execute resolves templates, looks up the StateMachineEngine by service name, and +// triggers the requested transition. On success it sets transition_ok=true and +// new_state to the resulting state. On failure it sets transition_ok=false and +// error to the error message; if fail_on_error is true the pipeline is stopped. +func (s *StateMachineTransitionStep) Execute(ctx context.Context, pc *PipelineContext) (*StepResult, error) { + if s.app == nil { + return nil, fmt.Errorf("statemachine_transition step %q: no application context", s.name) + } + + // Resolve statemachine engine from service registry + svc, ok := s.app.SvcRegistry()[s.statemachine] + if !ok { + return nil, fmt.Errorf("statemachine_transition step %q: statemachine service %q not found", s.name, s.statemachine) + } + + engine, ok := svc.(*StateMachineEngine) + if !ok { + // Also accept the TransitionTrigger interface for testability / mocking + trigger, ok := svc.(TransitionTrigger) + if !ok { + return nil, fmt.Errorf("statemachine_transition step %q: service %q does not implement StateMachineEngine or TransitionTrigger", s.name, s.statemachine) + } + return s.executeViaTrigger(ctx, pc, trigger) + } + + return s.executeViaEngine(ctx, pc, engine) +} + +func (s *StateMachineTransitionStep) executeViaEngine(ctx context.Context, pc *PipelineContext, engine *StateMachineEngine) (*StepResult, error) { + entityID, err := s.tmpl.Resolve(s.entityID, pc) + if err != nil { + return nil, fmt.Errorf("statemachine_transition step %q: failed to resolve entity_id: %w", s.name, err) + } + + event, err := s.tmpl.Resolve(s.event, pc) + if err != nil { + return nil, fmt.Errorf("statemachine_transition step %q: failed to resolve event: %w", s.name, err) + } + + data, err := s.tmpl.ResolveMap(s.data, pc) + if err != nil { + return nil, fmt.Errorf("statemachine_transition step %q: failed to resolve data: %w", s.name, err) + } + + transErr := engine.TriggerTransition(ctx, entityID, event, data) + if transErr != nil { + if s.failOnError { + return nil, fmt.Errorf("statemachine_transition step %q: transition failed: %w", s.name, transErr) + } + return &StepResult{ + Output: map[string]any{ + "transition_ok": false, + "error": transErr.Error(), + }, + }, nil + } + + // Fetch the new state from the engine + instance, err := engine.GetInstance(entityID) + if err != nil { + // Transition succeeded but we can't read new state — treat as success with unknown state + return &StepResult{ + Output: map[string]any{ + "transition_ok": true, + "new_state": "", + }, + }, nil + } + + return &StepResult{ + Output: map[string]any{ + "transition_ok": true, + "new_state": instance.CurrentState, + }, + }, nil +} + +func (s *StateMachineTransitionStep) executeViaTrigger(ctx context.Context, pc *PipelineContext, trigger TransitionTrigger) (*StepResult, error) { + entityID, err := s.tmpl.Resolve(s.entityID, pc) + if err != nil { + return nil, fmt.Errorf("statemachine_transition step %q: failed to resolve entity_id: %w", s.name, err) + } + + event, err := s.tmpl.Resolve(s.event, pc) + if err != nil { + return nil, fmt.Errorf("statemachine_transition step %q: failed to resolve event: %w", s.name, err) + } + + data, err := s.tmpl.ResolveMap(s.data, pc) + if err != nil { + return nil, fmt.Errorf("statemachine_transition step %q: failed to resolve data: %w", s.name, err) + } + + transErr := trigger.TriggerTransition(ctx, entityID, event, data) + if transErr != nil { + if s.failOnError { + return nil, fmt.Errorf("statemachine_transition step %q: transition failed: %w", s.name, transErr) + } + return &StepResult{ + Output: map[string]any{ + "transition_ok": false, + "error": transErr.Error(), + }, + }, nil + } + + return &StepResult{ + Output: map[string]any{ + "transition_ok": true, + "new_state": "", + }, + }, nil +} diff --git a/module/pipeline_step_statemachine_transition_test.go b/module/pipeline_step_statemachine_transition_test.go new file mode 100644 index 00000000..3cb27865 --- /dev/null +++ b/module/pipeline_step_statemachine_transition_test.go @@ -0,0 +1,419 @@ +package module + +import ( + "context" + "errors" + "testing" +) + +// mockTransitionTrigger implements TransitionTrigger for testing without a real engine. +type mockTransitionTrigger struct { + triggerErr error + capturedID string + capturedEvt string + capturedData map[string]any +} + +func (m *mockTransitionTrigger) TriggerTransition(_ context.Context, workflowID, transitionName string, data map[string]any) error { + m.capturedID = workflowID + m.capturedEvt = transitionName + m.capturedData = data + return m.triggerErr +} + +// setupOrderStateMachine creates a StateMachineEngine with a simple order workflow. +func setupOrderStateMachine(t *testing.T, instanceID, initialState string) *StateMachineEngine { + t.Helper() + + engine := NewStateMachineEngine("order-sm") + def := &StateMachineDefinition{ + Name: "order", + InitialState: "pending", + States: map[string]*State{ + "pending": {Name: "pending"}, + "approved": {Name: "approved"}, + "rejected": {Name: "rejected", IsFinal: true}, + }, + Transitions: map[string]*Transition{ + "approve": {Name: "approve", FromState: "pending", ToState: "approved"}, + "reject": {Name: "reject", FromState: "pending", ToState: "rejected"}, + }, + } + if err := engine.RegisterDefinition(def); err != nil { + t.Fatalf("register definition: %v", err) + } + + if instanceID != "" { + instance, err := engine.CreateWorkflow("order", instanceID, nil) + if err != nil { + t.Fatalf("create workflow: %v", err) + } + // If caller wants a non-initial state, force it directly for test setup + if initialState != "" && initialState != def.InitialState { + instance.CurrentState = initialState + } + } + + return engine +} + +// newAppWithSM registers the engine under "order-sm" in a MockApplication. +func newAppWithSM(engine *StateMachineEngine) *MockApplication { + app := NewMockApplication() + app.Services["order-sm"] = engine + return app +} + +// --- Factory validation tests --- + +func TestStateMachineTransitionStep_MissingStatemachine(t *testing.T) { + factory := NewStateMachineTransitionStepFactory() + _, err := factory("step1", map[string]any{ + "entity_id": "order-1", + "event": "approve", + }, nil) + if err == nil { + t.Fatal("expected error for missing statemachine") + } +} + +func TestStateMachineTransitionStep_MissingEntityID(t *testing.T) { + factory := NewStateMachineTransitionStepFactory() + _, err := factory("step1", map[string]any{ + "statemachine": "order-sm", + "event": "approve", + }, nil) + if err == nil { + t.Fatal("expected error for missing entity_id") + } +} + +func TestStateMachineTransitionStep_MissingEvent(t *testing.T) { + factory := NewStateMachineTransitionStepFactory() + _, err := factory("step1", map[string]any{ + "statemachine": "order-sm", + "entity_id": "order-1", + }, nil) + if err == nil { + t.Fatal("expected error for missing event") + } +} + +// --- Execution tests: using real StateMachineEngine --- + +func TestStateMachineTransitionStep_SuccessfulTransition(t *testing.T) { + engine := setupOrderStateMachine(t, "order-1", "") + app := newAppWithSM(engine) + + factory := NewStateMachineTransitionStepFactory() + step, err := factory("approve-order", map[string]any{ + "statemachine": "order-sm", + "entity_id": "order-1", + "event": "approve", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + ok, _ := result.Output["transition_ok"].(bool) + if !ok { + t.Error("expected transition_ok=true") + } + newState, _ := result.Output["new_state"].(string) + if newState != "approved" { + t.Errorf("expected new_state='approved', got %q", newState) + } +} + +func TestStateMachineTransitionStep_TemplatedEntityIDAndEvent(t *testing.T) { + engine := setupOrderStateMachine(t, "order-42", "") + app := newAppWithSM(engine) + + factory := NewStateMachineTransitionStepFactory() + step, err := factory("dynamic-approve", map[string]any{ + "statemachine": "order-sm", + "entity_id": "{{.order_id}}", + "event": "{{.action}}", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(map[string]any{ + "order_id": "order-42", + "action": "approve", + }, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + ok, _ := result.Output["transition_ok"].(bool) + if !ok { + t.Error("expected transition_ok=true") + } +} + +func TestStateMachineTransitionStep_InvalidTransition_FailOnErrorFalse(t *testing.T) { + engine := setupOrderStateMachine(t, "order-1", "") + app := newAppWithSM(engine) + + factory := NewStateMachineTransitionStepFactory() + // "reject" is valid, but "approve" from "approved" is not — trigger approve twice + step, err := factory("double-approve", map[string]any{ + "statemachine": "order-sm", + "entity_id": "order-1", + "event": "approve", + "fail_on_error": false, + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + // First transition succeeds + _, err = step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("first execute error: %v", err) + } + + // Second transition: "approve" from "approved" is invalid + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("expected no pipeline error with fail_on_error=false, got: %v", err) + } + + ok, _ := result.Output["transition_ok"].(bool) + if ok { + t.Error("expected transition_ok=false for invalid transition") + } + errMsg, _ := result.Output["error"].(string) + if errMsg == "" { + t.Error("expected error message in output") + } +} + +func TestStateMachineTransitionStep_InvalidTransition_FailOnErrorTrue(t *testing.T) { + engine := setupOrderStateMachine(t, "order-1", "") + app := newAppWithSM(engine) + + factory := NewStateMachineTransitionStepFactory() + step, err := factory("strict-approve", map[string]any{ + "statemachine": "order-sm", + "entity_id": "order-1", + "event": "approve", + "fail_on_error": true, + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + // First transition succeeds + _, err = step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("first execute error: %v", err) + } + + // Second transition should fail with pipeline error + _, err = step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected pipeline error with fail_on_error=true") + } +} + +func TestStateMachineTransitionStep_WithData(t *testing.T) { + engine := setupOrderStateMachine(t, "order-1", "") + app := newAppWithSM(engine) + + factory := NewStateMachineTransitionStepFactory() + step, err := factory("approve-with-data", map[string]any{ + "statemachine": "order-sm", + "entity_id": "order-1", + "event": "approve", + "data": map[string]any{ + "approved_by": "{{.user_id}}", + }, + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(map[string]any{"user_id": "u-99"}, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + ok, _ := result.Output["transition_ok"].(bool) + if !ok { + t.Error("expected transition_ok=true") + } + + // Verify data was merged into instance + instance, err := engine.GetInstance("order-1") + if err != nil { + t.Fatalf("get instance: %v", err) + } + approvedBy, _ := instance.Data["approved_by"].(string) + if approvedBy != "u-99" { + t.Errorf("expected approved_by='u-99', got %q", approvedBy) + } +} + +func TestStateMachineTransitionStep_ServiceNotFound(t *testing.T) { + app := NewMockApplication() + + factory := NewStateMachineTransitionStepFactory() + step, err := factory("step1", map[string]any{ + "statemachine": "nonexistent-sm", + "entity_id": "order-1", + "event": "approve", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + _, err = step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error for missing service") + } +} + +func TestStateMachineTransitionStep_ServiceWrongType(t *testing.T) { + app := NewMockApplication() + // Register something that is neither *StateMachineEngine nor TransitionTrigger + app.Services["order-sm"] = "not-an-engine" + + factory := NewStateMachineTransitionStepFactory() + step, err := factory("step1", map[string]any{ + "statemachine": "order-sm", + "entity_id": "order-1", + "event": "approve", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + _, err = step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error for wrong service type") + } +} + +func TestStateMachineTransitionStep_NoAppContext(t *testing.T) { + factory := NewStateMachineTransitionStepFactory() + step, err := factory("step1", map[string]any{ + "statemachine": "order-sm", + "entity_id": "order-1", + "event": "approve", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + _, err = step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error for nil app") + } +} + +// --- Execution tests: using mock TransitionTrigger --- + +func TestStateMachineTransitionStep_MockTrigger_Success(t *testing.T) { + mock := &mockTransitionTrigger{} + app := NewMockApplication() + app.Services["order-sm"] = mock + + factory := NewStateMachineTransitionStepFactory() + step, err := factory("mock-approve", map[string]any{ + "statemachine": "order-sm", + "entity_id": "order-1", + "event": "approve", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + if mock.capturedID != "order-1" { + t.Errorf("expected capturedID='order-1', got %q", mock.capturedID) + } + if mock.capturedEvt != "approve" { + t.Errorf("expected capturedEvt='approve', got %q", mock.capturedEvt) + } + + ok, _ := result.Output["transition_ok"].(bool) + if !ok { + t.Error("expected transition_ok=true") + } +} + +func TestStateMachineTransitionStep_MockTrigger_Error_NoFail(t *testing.T) { + mock := &mockTransitionTrigger{triggerErr: errors.New("invalid transition")} + app := NewMockApplication() + app.Services["order-sm"] = mock + + factory := NewStateMachineTransitionStepFactory() + step, err := factory("mock-fail", map[string]any{ + "statemachine": "order-sm", + "entity_id": "order-1", + "event": "approve", + "fail_on_error": false, + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("expected no pipeline error, got: %v", err) + } + + ok, _ := result.Output["transition_ok"].(bool) + if ok { + t.Error("expected transition_ok=false") + } + errMsg, _ := result.Output["error"].(string) + if errMsg == "" { + t.Error("expected error in output") + } +} + +func TestStateMachineTransitionStep_MockTrigger_Error_Fail(t *testing.T) { + mock := &mockTransitionTrigger{triggerErr: errors.New("invalid transition")} + app := NewMockApplication() + app.Services["order-sm"] = mock + + factory := NewStateMachineTransitionStepFactory() + step, err := factory("mock-strict-fail", map[string]any{ + "statemachine": "order-sm", + "entity_id": "order-1", + "event": "approve", + "fail_on_error": true, + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + _, err = step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected pipeline error with fail_on_error=true") + } +} diff --git a/plugins/cicd/plugin.go b/plugins/cicd/plugin.go index 91544b89..145c695d 100644 --- a/plugins/cicd/plugin.go +++ b/plugins/cicd/plugin.go @@ -1,6 +1,7 @@ // Package cicd provides a plugin that registers CI/CD pipeline step types: // shell_exec, artifact_pull, artifact_push, docker_build, docker_push, -// docker_run, scan_sast, scan_container, scan_deps, deploy, gate, build_ui. +// docker_run, scan_sast, scan_container, scan_deps, deploy, gate, build_ui, +// build_from_config. package cicd import ( @@ -22,13 +23,13 @@ func New() *Plugin { BaseNativePlugin: plugin.BaseNativePlugin{ PluginName: "cicd", PluginVersion: "1.0.0", - PluginDescription: "CI/CD pipeline step types (shell exec, Docker, artifact management, security scanning, deploy, gate)", + PluginDescription: "CI/CD pipeline step types (shell exec, Docker, artifact management, security scanning, deploy, gate, build from config)", }, Manifest: plugin.PluginManifest{ Name: "cicd", Version: "1.0.0", Author: "GoCodeAlone", - Description: "CI/CD pipeline step types (shell exec, Docker, artifact management, security scanning, deploy, gate)", + Description: "CI/CD pipeline step types (shell exec, Docker, artifact management, security scanning, deploy, gate, build from config)", Tier: plugin.TierCore, StepTypes: []string{ "step.shell_exec", @@ -43,6 +44,7 @@ func New() *Plugin { "step.deploy", "step.gate", "step.build_ui", + "step.build_from_config", }, Capabilities: []plugin.CapabilityDecl{ {Name: "cicd-pipeline", Role: "provider", Priority: 50}, @@ -57,7 +59,7 @@ func (p *Plugin) Capabilities() []capability.Contract { return []capability.Contract{ { Name: "cicd-pipeline", - Description: "CI/CD pipeline operations: shell exec, Docker, artifact management, security scanning, deploy, gate", + Description: "CI/CD pipeline operations: shell exec, Docker, artifact management, security scanning, deploy, gate, build from config", }, } } @@ -65,18 +67,19 @@ func (p *Plugin) Capabilities() []capability.Contract { // StepFactories returns the CI/CD step factories. func (p *Plugin) StepFactories() map[string]plugin.StepFactory { return map[string]plugin.StepFactory{ - "step.shell_exec": wrapStepFactory(module.NewShellExecStepFactory()), - "step.artifact_pull": wrapStepFactory(module.NewArtifactPullStepFactory()), - "step.artifact_push": wrapStepFactory(module.NewArtifactPushStepFactory()), - "step.docker_build": wrapStepFactory(module.NewDockerBuildStepFactory()), - "step.docker_push": wrapStepFactory(module.NewDockerPushStepFactory()), - "step.docker_run": wrapStepFactory(module.NewDockerRunStepFactory()), - "step.scan_sast": wrapStepFactory(module.NewScanSASTStepFactory()), - "step.scan_container": wrapStepFactory(module.NewScanContainerStepFactory()), - "step.scan_deps": wrapStepFactory(module.NewScanDepsStepFactory()), - "step.deploy": wrapStepFactory(module.NewDeployStepFactory()), - "step.gate": wrapStepFactory(module.NewGateStepFactory()), - "step.build_ui": wrapStepFactory(module.NewBuildUIStepFactory()), + "step.shell_exec": wrapStepFactory(module.NewShellExecStepFactory()), + "step.artifact_pull": wrapStepFactory(module.NewArtifactPullStepFactory()), + "step.artifact_push": wrapStepFactory(module.NewArtifactPushStepFactory()), + "step.docker_build": wrapStepFactory(module.NewDockerBuildStepFactory()), + "step.docker_push": wrapStepFactory(module.NewDockerPushStepFactory()), + "step.docker_run": wrapStepFactory(module.NewDockerRunStepFactory()), + "step.scan_sast": wrapStepFactory(module.NewScanSASTStepFactory()), + "step.scan_container": wrapStepFactory(module.NewScanContainerStepFactory()), + "step.scan_deps": wrapStepFactory(module.NewScanDepsStepFactory()), + "step.deploy": wrapStepFactory(module.NewDeployStepFactory()), + "step.gate": wrapStepFactory(module.NewGateStepFactory()), + "step.build_ui": wrapStepFactory(module.NewBuildUIStepFactory()), + "step.build_from_config": wrapStepFactory(module.NewBuildFromConfigStepFactory()), } } diff --git a/plugins/cicd/plugin_test.go b/plugins/cicd/plugin_test.go index cacea13c..0f598a05 100644 --- a/plugins/cicd/plugin_test.go +++ b/plugins/cicd/plugin_test.go @@ -43,6 +43,7 @@ func TestStepFactories(t *testing.T) { "step.deploy", "step.gate", "step.build_ui", + "step.build_from_config", } for _, stepType := range expectedSteps { @@ -54,6 +55,7 @@ func TestStepFactories(t *testing.T) { if len(factories) != len(expectedSteps) { t.Errorf("expected %d step factories, got %d", len(expectedSteps), len(factories)) } + } func TestPluginLoads(t *testing.T) { @@ -64,7 +66,7 @@ func TestPluginLoads(t *testing.T) { } steps := loader.StepFactories() - if len(steps) != 12 { - t.Fatalf("expected 12 step factories after load, got %d", len(steps)) + if len(steps) != 13 { + t.Fatalf("expected 13 step factories after load, got %d", len(steps)) } } diff --git a/plugins/pipelinesteps/plugin.go b/plugins/pipelinesteps/plugin.go index c02f74f9..72213659 100644 --- a/plugins/pipelinesteps/plugin.go +++ b/plugins/pipelinesteps/plugin.go @@ -69,6 +69,9 @@ func New() *Plugin { "step.validate_request_body", "step.foreach", "step.webhook_verify", + "step.cache_get", + "step.cache_set", + "step.cache_delete", }, WorkflowTypes: []string{"pipeline"}, Capabilities: []plugin.CapabilityDecl{ @@ -114,6 +117,9 @@ func (p *Plugin) StepFactories() map[string]plugin.StepFactory { return p.concreteStepRegistry }, nil)), "step.webhook_verify": wrapStepFactory(module.NewWebhookVerifyStepFactory()), + "step.cache_get": wrapStepFactory(module.NewCacheGetStepFactory()), + "step.cache_set": wrapStepFactory(module.NewCacheSetStepFactory()), + "step.cache_delete": wrapStepFactory(module.NewCacheDeleteStepFactory()), } } diff --git a/plugins/pipelinesteps/plugin_test.go b/plugins/pipelinesteps/plugin_test.go index 8314fc0e..1c540050 100644 --- a/plugins/pipelinesteps/plugin_test.go +++ b/plugins/pipelinesteps/plugin_test.go @@ -49,6 +49,9 @@ func TestStepFactories(t *testing.T) { "step.validate_request_body", "step.foreach", "step.webhook_verify", + "step.cache_get", + "step.cache_set", + "step.cache_delete", } for _, stepType := range expectedSteps { @@ -70,7 +73,7 @@ func TestPluginLoads(t *testing.T) { } steps := loader.StepFactories() - if len(steps) != 18 { - t.Fatalf("expected 18 step factories after load, got %d", len(steps)) + if len(steps) != 21 { + t.Fatalf("expected 21 step factories after load, got %d", len(steps)) } } diff --git a/plugins/statemachine/plugin.go b/plugins/statemachine/plugin.go index d9ebf4d0..2bc303ff 100644 --- a/plugins/statemachine/plugin.go +++ b/plugins/statemachine/plugin.go @@ -37,6 +37,10 @@ func New() *Plugin { "state.tracker", "state.connector", }, + StepTypes: []string{ + "step.statemachine_transition", + "step.statemachine_get", + }, WorkflowTypes: []string{"statemachine"}, Capabilities: []plugin.CapabilityDecl{ {Name: "state-machine", Role: "provider", Priority: 10}, @@ -98,6 +102,22 @@ func (p *Plugin) WorkflowHandlers() map[string]plugin.WorkflowHandlerFactory { } } +// StepFactories returns the pipeline step factories for state machine operations. +func (p *Plugin) StepFactories() map[string]plugin.StepFactory { + return map[string]plugin.StepFactory{ + "step.statemachine_transition": wrapStepFactory(module.NewStateMachineTransitionStepFactory()), + "step.statemachine_get": wrapStepFactory(module.NewStateMachineGetStepFactory()), + } +} + +// wrapStepFactory converts a module.StepFactory to a plugin.StepFactory, +// threading the modular.Application through so steps can access the service registry. +func wrapStepFactory(f module.StepFactory) plugin.StepFactory { + return func(name string, cfg map[string]any, app modular.Application) (any, error) { + return f(name, cfg, app) + } +} + // ModuleSchemas returns UI schema definitions for state machine module types. func (p *Plugin) ModuleSchemas() []*schema.ModuleSchema { return []*schema.ModuleSchema{ diff --git a/plugins/statemachine/plugin_test.go b/plugins/statemachine/plugin_test.go index ec34d514..26d19671 100644 --- a/plugins/statemachine/plugin_test.go +++ b/plugins/statemachine/plugin_test.go @@ -24,6 +24,9 @@ func TestPluginManifest(t *testing.T) { if len(m.ModuleTypes) != 3 { t.Errorf("expected 3 module types, got %d", len(m.ModuleTypes)) } + if len(m.StepTypes) != 2 { + t.Errorf("expected 2 step types, got %d", len(m.StepTypes)) + } if len(m.WorkflowTypes) != 1 { t.Errorf("expected 1 workflow type, got %d", len(m.WorkflowTypes)) } @@ -100,6 +103,21 @@ func TestWorkflowHandlers(t *testing.T) { } } +func TestStepFactories(t *testing.T) { + p := New() + factories := p.StepFactories() + + expectedTypes := []string{"step.statemachine_transition", "step.statemachine_get"} + for _, typ := range expectedTypes { + if _, ok := factories[typ]; !ok { + t.Errorf("missing step factory for %q", typ) + } + } + if len(factories) != len(expectedTypes) { + t.Errorf("expected %d step factories, got %d", len(expectedTypes), len(factories)) + } +} + func TestModuleSchemas(t *testing.T) { p := New() schemas := p.ModuleSchemas() diff --git a/plugins/storage/plugin.go b/plugins/storage/plugin.go index 601c9aaf..c6b59836 100644 --- a/plugins/storage/plugin.go +++ b/plugins/storage/plugin.go @@ -1,6 +1,8 @@ package storage import ( + "time" + "github.com/CrisisTextLine/modular" "github.com/GoCodeAlone/workflow/capability" "github.com/GoCodeAlone/workflow/config" @@ -10,8 +12,8 @@ import ( ) // Plugin provides storage and database capabilities: storage.s3, storage.local, -// storage.gcs, storage.sqlite, database.workflow, persistence.store modules, -// and the step.db_query / step.db_exec pipeline step factories. +// storage.gcs, storage.sqlite, database.workflow, persistence.store, cache.redis +// modules, and the step.db_query / step.db_exec pipeline step factories. type Plugin struct { plugin.BaseEnginePlugin } @@ -23,13 +25,13 @@ func New() *Plugin { BaseNativePlugin: plugin.BaseNativePlugin{ PluginName: "storage", PluginVersion: "1.0.0", - PluginDescription: "Storage, database, and persistence modules with DB pipeline steps", + PluginDescription: "Storage, database, persistence, and cache modules with DB pipeline steps", }, Manifest: plugin.PluginManifest{ Name: "storage", Version: "1.0.0", Author: "GoCodeAlone", - Description: "Storage, database, and persistence modules with DB pipeline steps", + Description: "Storage, database, persistence, and cache modules with DB pipeline steps", Tier: plugin.TierCore, ModuleTypes: []string{ "storage.s3", @@ -38,11 +40,13 @@ func New() *Plugin { "storage.sqlite", "database.workflow", "persistence.store", + "cache.redis", }, Capabilities: []plugin.CapabilityDecl{ {Name: "storage", Role: "provider", Priority: 10}, {Name: "database", Role: "provider", Priority: 10}, {Name: "persistence", Role: "provider", Priority: 10}, + {Name: "cache", Role: "provider", Priority: 10}, }, }, }, @@ -64,6 +68,10 @@ func (p *Plugin) Capabilities() []capability.Contract { Name: "persistence", Description: "Persistence layer that uses a database service for storage", }, + { + Name: "cache", + Description: "Redis-backed key/value cache for pipeline data", + }, } } @@ -141,6 +149,31 @@ func (p *Plugin) ModuleFactories() map[string]plugin.ModuleFactory { } return module.NewPersistenceStore(name, dbServiceName) }, + "cache.redis": func(name string, cfg map[string]any) modular.Module { + redisCfg := module.RedisCacheConfig{ + Address: "localhost:6379", + Prefix: "wf:", + DefaultTTL: time.Hour, + } + if addr, ok := cfg["address"].(string); ok && addr != "" { + redisCfg.Address = module.ExpandEnvString(addr) + } + if pw, ok := cfg["password"].(string); ok { + redisCfg.Password = module.ExpandEnvString(pw) + } + if db, ok := cfg["db"].(float64); ok { + redisCfg.DB = int(db) + } + if prefix, ok := cfg["prefix"].(string); ok && prefix != "" { + redisCfg.Prefix = prefix + } + if ttlStr, ok := cfg["defaultTTL"].(string); ok && ttlStr != "" { + if d, err := time.ParseDuration(ttlStr); err == nil { + redisCfg.DefaultTTL = d + } + } + return module.NewRedisCache(name, redisCfg) + }, } } @@ -226,5 +259,25 @@ func (p *Plugin) ModuleSchemas() []*schema.ModuleSchema { }, DefaultConfig: map[string]any{"database": "database"}, }, + { + Type: "cache.redis", + Label: "Redis Cache", + Category: "cache", + Description: "Redis-backed key/value cache for pipeline step data", + Outputs: []schema.ServiceIODef{{Name: "cache", Type: "CacheModule", Description: "Redis cache service"}}, + ConfigFields: []schema.ConfigFieldDef{ + {Key: "address", Label: "Address", Type: schema.FieldTypeString, DefaultValue: "localhost:6379", Description: "Redis server address (host:port)", Placeholder: "localhost:6379"}, + {Key: "password", Label: "Password", Type: schema.FieldTypeString, Description: "Redis password (optional)", Sensitive: true}, + {Key: "db", Label: "Database", Type: schema.FieldTypeNumber, DefaultValue: 0, Description: "Redis database number"}, + {Key: "prefix", Label: "Key Prefix", Type: schema.FieldTypeString, DefaultValue: "wf:", Description: "Prefix applied to all cache keys"}, + {Key: "defaultTTL", Label: "Default TTL", Type: schema.FieldTypeString, DefaultValue: "1h", Description: "Default time-to-live for cached values (e.g. 30m, 1h, 24h)"}, + }, + DefaultConfig: map[string]any{ + "address": "localhost:6379", + "db": 0, + "prefix": "wf:", + "defaultTTL": "1h", + }, + }, } } diff --git a/plugins/storage/plugin_test.go b/plugins/storage/plugin_test.go index 34e9031e..627a0568 100644 --- a/plugins/storage/plugin_test.go +++ b/plugins/storage/plugin_test.go @@ -21,8 +21,8 @@ func TestPluginManifest(t *testing.T) { if m.Name != "storage" { t.Errorf("expected name %q, got %q", "storage", m.Name) } - if len(m.ModuleTypes) != 6 { - t.Errorf("expected 6 module types, got %d", len(m.ModuleTypes)) + if len(m.ModuleTypes) != 7 { + t.Errorf("expected 7 module types, got %d", len(m.ModuleTypes)) } if len(m.StepTypes) != 0 { t.Errorf("expected 0 step types, got %d", len(m.StepTypes)) @@ -32,14 +32,14 @@ func TestPluginManifest(t *testing.T) { func TestPluginCapabilities(t *testing.T) { p := New() caps := p.Capabilities() - if len(caps) != 3 { - t.Fatalf("expected 3 capabilities, got %d", len(caps)) + if len(caps) != 4 { + t.Fatalf("expected 4 capabilities, got %d", len(caps)) } names := map[string]bool{} for _, c := range caps { names[c.Name] = true } - for _, expected := range []string{"storage", "database", "persistence"} { + for _, expected := range []string{"storage", "database", "persistence", "cache"} { if !names[expected] { t.Errorf("missing capability %q", expected) } @@ -53,6 +53,7 @@ func TestModuleFactories(t *testing.T) { expectedTypes := []string{ "storage.s3", "storage.local", "storage.gcs", "storage.sqlite", "database.workflow", "persistence.store", + "cache.redis", } for _, typ := range expectedTypes { factory, ok := factories[typ] @@ -133,8 +134,8 @@ func TestStepFactories(t *testing.T) { func TestModuleSchemas(t *testing.T) { p := New() schemas := p.ModuleSchemas() - if len(schemas) != 6 { - t.Fatalf("expected 6 module schemas, got %d", len(schemas)) + if len(schemas) != 7 { + t.Fatalf("expected 7 module schemas, got %d", len(schemas)) } types := map[string]bool{} @@ -144,6 +145,7 @@ func TestModuleSchemas(t *testing.T) { expectedTypes := []string{ "storage.s3", "storage.local", "storage.gcs", "storage.sqlite", "database.workflow", "persistence.store", + "cache.redis", } for _, expected := range expectedTypes { if !types[expected] { @@ -151,3 +153,31 @@ func TestModuleSchemas(t *testing.T) { } } } + +func TestCacheRedisFactory(t *testing.T) { + p := New() + factories := p.ModuleFactories() + + factory, ok := factories["cache.redis"] + if !ok { + t.Fatal("missing factory for cache.redis") + } + + // Default config + mod := factory("cache", map[string]any{}) + if mod == nil { + t.Fatal("cache.redis factory returned nil with empty config") + } + + // Full config + mod = factory("cache", map[string]any{ + "address": "redis:6379", + "password": "secret", + "db": float64(1), + "prefix": "myapp:", + "defaultTTL": "30m", + }) + if mod == nil { + t.Fatal("cache.redis factory returned nil with full config") + } +}