From bd3cc2e1c35d4202b399de0b79200bf922346aa8 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 27 Feb 2026 05:20:45 +0000 Subject: [PATCH 1/4] Initial plan From bebf3d9b250fa821279b3c005cfb4d3850a30f25 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 27 Feb 2026 05:36:24 +0000 Subject: [PATCH 2/4] feat: add step.s3_upload pipeline step for S3-compatible object storage upload Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- module/app_container.go | 22 +- module/argo_workflows.go | 10 +- module/cloud_account_aws_creds.go | 8 +- module/cloud_account_azure.go | 10 +- module/cloud_account_do.go | 6 +- module/cloud_account_gcp.go | 12 +- module/cloud_account_k8s.go | 6 +- module/gitlab_client_module.go | 10 +- module/gitlab_webhook.go | 20 +- module/jwt_auth.go | 28 +- module/nosql_dynamodb.go | 2 +- module/nosql_memory.go | 8 +- module/nosql_mongodb.go | 2 +- module/pipeline_step_argo.go | 16 +- module/pipeline_step_artifact.go | 3 +- module/pipeline_step_build_binary.go | 20 +- module/pipeline_step_cache_get.go | 4 +- module/pipeline_step_dlq_test.go | 10 +- module/pipeline_step_event_publish_test.go | 8 +- module/pipeline_step_foreach_test.go | 4 +- module/pipeline_step_gitlab.go | 52 +-- module/pipeline_step_s3_upload.go | 228 ++++++++++++ module/pipeline_step_s3_upload_test.go | 335 ++++++++++++++++++ .../pipeline_step_statemachine_transition.go | 16 +- ...eline_step_statemachine_transition_test.go | 12 +- module/platform_apigateway.go | 14 +- module/platform_dns_backends.go | 2 +- module/platform_do_app.go | 26 +- module/platform_do_networking.go | 10 +- module/platform_ecs.go | 28 +- module/platform_kubernetes_kind.go | 34 +- module/platform_provider.go | 2 +- module/policy_engine.go | 9 +- module/tracing_propagation.go | 4 +- plugins/pipelinesteps/plugin.go | 9 +- plugins/pipelinesteps/plugin_test.go | 5 +- 36 files changed, 780 insertions(+), 215 deletions(-) create mode 100644 module/pipeline_step_s3_upload.go create mode 100644 module/pipeline_step_s3_upload_test.go diff --git a/module/app_container.go b/module/app_container.go index 64613387..41f048e5 100644 --- a/module/app_container.go +++ b/module/app_container.go @@ -82,9 +82,9 @@ type ECSAppManifests struct { // ECSAppTaskDef represents an ECS task definition for an app container. type ECSAppTaskDef struct { - Family string `json:"family"` - CPU string `json:"cpu"` - Memory string `json:"memory"` + Family string `json:"family"` + CPU string `json:"cpu"` + Memory string `json:"memory"` Containers []ECSContainer `json:"containers"` } @@ -292,17 +292,17 @@ type K8sDeploymentManifest struct { // K8sServiceManifest represents a Kubernetes Service resource. type K8sServiceManifest struct { - APIVersion string `json:"apiVersion"` - Kind string `json:"kind"` - Metadata K8sObjectMeta `json:"metadata"` + APIVersion string `json:"apiVersion"` + Kind string `json:"kind"` + Metadata K8sObjectMeta `json:"metadata"` Spec K8sServiceSpec `json:"spec"` } // K8sIngressManifest represents a Kubernetes Ingress resource. type K8sIngressManifest struct { - APIVersion string `json:"apiVersion"` - Kind string `json:"kind"` - Metadata K8sObjectMeta `json:"metadata"` + APIVersion string `json:"apiVersion"` + Kind string `json:"kind"` + Metadata K8sObjectMeta `json:"metadata"` Spec K8sIngressSpec `json:"spec"` } @@ -408,8 +408,8 @@ type K8sIngressHTTP struct { // K8sIngressPath defines an HTTP path in an ingress rule. type K8sIngressPath struct { - Path string `json:"path"` - PathType string `json:"pathType"` + Path string `json:"path"` + PathType string `json:"pathType"` Backend K8sIngressBackend `json:"backend"` } diff --git a/module/argo_workflows.go b/module/argo_workflows.go index eeabd54a..6ffec49c 100644 --- a/module/argo_workflows.go +++ b/module/argo_workflows.go @@ -35,9 +35,9 @@ type ArgoWorkflowSpec struct { // ArgoTemplate is a single template (DAG or step list) within an Argo Workflow. type ArgoTemplate struct { - Name string `json:"name"` - Kind string `json:"kind"` // dag, steps, container - DAG []ArgoDAGTask `json:"dag,omitempty"` + Name string `json:"name"` + Kind string `json:"kind"` // dag, steps, container + DAG []ArgoDAGTask `json:"dag,omitempty"` Container *ArgoContainer `json:"container,omitempty"` } @@ -408,8 +408,8 @@ func (b *argoMockBackend) listWorkflows(m *ArgoWorkflowsModule, labelSelector st // argoRealBackend implements argoBackend using the Argo Workflows REST API. // It targets the Argo Server HTTP API (default port 2746). type argoRealBackend struct { - endpoint string // e.g. http://argo-server.argo.svc.cluster.local:2746 - token string // Bearer token (optional) + endpoint string // e.g. http://argo-server.argo.svc.cluster.local:2746 + token string // Bearer token (optional) httpClient *http.Client } diff --git a/module/cloud_account_aws_creds.go b/module/cloud_account_aws_creds.go index 49e2b4d4..8edc3e0f 100644 --- a/module/cloud_account_aws_creds.go +++ b/module/cloud_account_aws_creds.go @@ -21,7 +21,7 @@ func init() { // awsStaticResolver resolves AWS credentials from static config fields. type awsStaticResolver struct{} -func (r *awsStaticResolver) Provider() string { return "aws" } +func (r *awsStaticResolver) Provider() string { return "aws" } func (r *awsStaticResolver) CredentialType() string { return "static" } func (r *awsStaticResolver) Resolve(m *CloudAccount) error { @@ -38,7 +38,7 @@ func (r *awsStaticResolver) Resolve(m *CloudAccount) error { // awsEnvResolver resolves AWS credentials from environment variables. type awsEnvResolver struct{} -func (r *awsEnvResolver) Provider() string { return "aws" } +func (r *awsEnvResolver) Provider() string { return "aws" } func (r *awsEnvResolver) CredentialType() string { return "env" } func (r *awsEnvResolver) Resolve(m *CloudAccount) error { @@ -59,7 +59,7 @@ func (r *awsEnvResolver) Resolve(m *CloudAccount) error { // using aws-sdk-go-v2/config.LoadDefaultConfig with WithSharedConfigProfile. type awsProfileResolver struct{} -func (r *awsProfileResolver) Provider() string { return "aws" } +func (r *awsProfileResolver) Provider() string { return "aws" } func (r *awsProfileResolver) CredentialType() string { return "profile" } func (r *awsProfileResolver) Resolve(m *CloudAccount) error { @@ -102,7 +102,7 @@ func (r *awsProfileResolver) Resolve(m *CloudAccount) error { // sts:AssumeRole to obtain temporary credentials for the target role. type awsRoleARNResolver struct{} -func (r *awsRoleARNResolver) Provider() string { return "aws" } +func (r *awsRoleARNResolver) Provider() string { return "aws" } func (r *awsRoleARNResolver) CredentialType() string { return "role_arn" } func (r *awsRoleARNResolver) Resolve(m *CloudAccount) error { diff --git a/module/cloud_account_azure.go b/module/cloud_account_azure.go index 7d2dc492..54eb0de5 100644 --- a/module/cloud_account_azure.go +++ b/module/cloud_account_azure.go @@ -16,7 +16,7 @@ func init() { // azureStaticResolver resolves Azure credentials from static config fields. type azureStaticResolver struct{} -func (r *azureStaticResolver) Provider() string { return "azure" } +func (r *azureStaticResolver) Provider() string { return "azure" } func (r *azureStaticResolver) CredentialType() string { return "static" } func (r *azureStaticResolver) Resolve(m *CloudAccount) error { @@ -36,7 +36,7 @@ func (r *azureStaticResolver) Resolve(m *CloudAccount) error { // azureEnvResolver resolves Azure credentials from environment variables. type azureEnvResolver struct{} -func (r *azureEnvResolver) Provider() string { return "azure" } +func (r *azureEnvResolver) Provider() string { return "azure" } func (r *azureEnvResolver) CredentialType() string { return "env" } func (r *azureEnvResolver) Resolve(m *CloudAccount) error { @@ -52,7 +52,7 @@ func (r *azureEnvResolver) Resolve(m *CloudAccount) error { // azureClientCredentialsResolver resolves Azure service principal client credentials. type azureClientCredentialsResolver struct{} -func (r *azureClientCredentialsResolver) Provider() string { return "azure" } +func (r *azureClientCredentialsResolver) Provider() string { return "azure" } func (r *azureClientCredentialsResolver) CredentialType() string { return "client_credentials" } func (r *azureClientCredentialsResolver) Resolve(m *CloudAccount) error { @@ -74,7 +74,7 @@ func (r *azureClientCredentialsResolver) Resolve(m *CloudAccount) error { // Production: use github.com/Azure/azure-sdk-for-go/sdk/azidentity ManagedIdentityCredential. type azureManagedIdentityResolver struct{} -func (r *azureManagedIdentityResolver) Provider() string { return "azure" } +func (r *azureManagedIdentityResolver) Provider() string { return "azure" } func (r *azureManagedIdentityResolver) CredentialType() string { return "managed_identity" } func (r *azureManagedIdentityResolver) Resolve(m *CloudAccount) error { @@ -95,7 +95,7 @@ func (r *azureManagedIdentityResolver) Resolve(m *CloudAccount) error { // Production: use github.com/Azure/azure-sdk-for-go/sdk/azidentity AzureCLICredential. type azureCLIResolver struct{} -func (r *azureCLIResolver) Provider() string { return "azure" } +func (r *azureCLIResolver) Provider() string { return "azure" } func (r *azureCLIResolver) CredentialType() string { return "cli" } func (r *azureCLIResolver) Resolve(m *CloudAccount) error { diff --git a/module/cloud_account_do.go b/module/cloud_account_do.go index a194c3fa..12748c1c 100644 --- a/module/cloud_account_do.go +++ b/module/cloud_account_do.go @@ -18,7 +18,7 @@ func init() { // doStaticResolver resolves DigitalOcean credentials from static config fields. type doStaticResolver struct{} -func (r *doStaticResolver) Provider() string { return "digitalocean" } +func (r *doStaticResolver) Provider() string { return "digitalocean" } func (r *doStaticResolver) CredentialType() string { return "static" } func (r *doStaticResolver) Resolve(m *CloudAccount) error { @@ -32,7 +32,7 @@ func (r *doStaticResolver) Resolve(m *CloudAccount) error { // doEnvResolver resolves DigitalOcean credentials from environment variables. type doEnvResolver struct{} -func (r *doEnvResolver) Provider() string { return "digitalocean" } +func (r *doEnvResolver) Provider() string { return "digitalocean" } func (r *doEnvResolver) CredentialType() string { return "env" } func (r *doEnvResolver) Resolve(m *CloudAccount) error { @@ -46,7 +46,7 @@ func (r *doEnvResolver) Resolve(m *CloudAccount) error { // doAPITokenResolver resolves a DigitalOcean API token from explicit config. type doAPITokenResolver struct{} -func (r *doAPITokenResolver) Provider() string { return "digitalocean" } +func (r *doAPITokenResolver) Provider() string { return "digitalocean" } func (r *doAPITokenResolver) CredentialType() string { return "api_token" } func (r *doAPITokenResolver) Resolve(m *CloudAccount) error { diff --git a/module/cloud_account_gcp.go b/module/cloud_account_gcp.go index 601ba04d..d7cfa73b 100644 --- a/module/cloud_account_gcp.go +++ b/module/cloud_account_gcp.go @@ -17,7 +17,7 @@ func init() { // gcpStaticResolver resolves GCP credentials from static config fields. type gcpStaticResolver struct{} -func (r *gcpStaticResolver) Provider() string { return "gcp" } +func (r *gcpStaticResolver) Provider() string { return "gcp" } func (r *gcpStaticResolver) CredentialType() string { return "static" } func (r *gcpStaticResolver) Resolve(m *CloudAccount) error { @@ -37,7 +37,7 @@ func (r *gcpStaticResolver) Resolve(m *CloudAccount) error { // gcpEnvResolver resolves GCP credentials from environment variables. type gcpEnvResolver struct{} -func (r *gcpEnvResolver) Provider() string { return "gcp" } +func (r *gcpEnvResolver) Provider() string { return "gcp" } func (r *gcpEnvResolver) CredentialType() string { return "env" } func (r *gcpEnvResolver) Resolve(m *CloudAccount) error { @@ -59,7 +59,7 @@ func (r *gcpEnvResolver) Resolve(m *CloudAccount) error { // gcpServiceAccountJSONResolver reads a GCP service account JSON key file from the given path. type gcpServiceAccountJSONResolver struct{} -func (r *gcpServiceAccountJSONResolver) Provider() string { return "gcp" } +func (r *gcpServiceAccountJSONResolver) Provider() string { return "gcp" } func (r *gcpServiceAccountJSONResolver) CredentialType() string { return "service_account_json" } func (r *gcpServiceAccountJSONResolver) Resolve(m *CloudAccount) error { @@ -82,7 +82,7 @@ func (r *gcpServiceAccountJSONResolver) Resolve(m *CloudAccount) error { // gcpServiceAccountKeyResolver uses an inline GCP service account JSON key. type gcpServiceAccountKeyResolver struct{} -func (r *gcpServiceAccountKeyResolver) Provider() string { return "gcp" } +func (r *gcpServiceAccountKeyResolver) Provider() string { return "gcp" } func (r *gcpServiceAccountKeyResolver) CredentialType() string { return "service_account_key" } func (r *gcpServiceAccountKeyResolver) Resolve(m *CloudAccount) error { @@ -102,7 +102,7 @@ func (r *gcpServiceAccountKeyResolver) Resolve(m *CloudAccount) error { // Production: use golang.org/x/oauth2/google with google.FindDefaultCredentials. type gcpWorkloadIdentityResolver struct{} -func (r *gcpWorkloadIdentityResolver) Provider() string { return "gcp" } +func (r *gcpWorkloadIdentityResolver) Provider() string { return "gcp" } func (r *gcpWorkloadIdentityResolver) CredentialType() string { return "workload_identity" } func (r *gcpWorkloadIdentityResolver) Resolve(m *CloudAccount) error { @@ -117,7 +117,7 @@ func (r *gcpWorkloadIdentityResolver) Resolve(m *CloudAccount) error { // Reads GOOGLE_APPLICATION_CREDENTIALS if set; otherwise records the ADC source. type gcpApplicationDefaultResolver struct{} -func (r *gcpApplicationDefaultResolver) Provider() string { return "gcp" } +func (r *gcpApplicationDefaultResolver) Provider() string { return "gcp" } func (r *gcpApplicationDefaultResolver) CredentialType() string { return "application_default" } func (r *gcpApplicationDefaultResolver) Resolve(m *CloudAccount) error { diff --git a/module/cloud_account_k8s.go b/module/cloud_account_k8s.go index 773cf2fe..523cf9c9 100644 --- a/module/cloud_account_k8s.go +++ b/module/cloud_account_k8s.go @@ -14,7 +14,7 @@ func init() { // k8sStaticResolver resolves Kubernetes credentials from static config fields. type k8sStaticResolver struct{} -func (r *k8sStaticResolver) Provider() string { return "kubernetes" } +func (r *k8sStaticResolver) Provider() string { return "kubernetes" } func (r *k8sStaticResolver) CredentialType() string { return "static" } func (r *k8sStaticResolver) Resolve(m *CloudAccount) error { @@ -32,7 +32,7 @@ func (r *k8sStaticResolver) Resolve(m *CloudAccount) error { // k8sEnvResolver resolves Kubernetes credentials from the KUBECONFIG environment variable. type k8sEnvResolver struct{} -func (r *k8sEnvResolver) Provider() string { return "kubernetes" } +func (r *k8sEnvResolver) Provider() string { return "kubernetes" } func (r *k8sEnvResolver) CredentialType() string { return "env" } func (r *k8sEnvResolver) Resolve(m *CloudAccount) error { @@ -52,7 +52,7 @@ func (r *k8sEnvResolver) Resolve(m *CloudAccount) error { // k8sKubeconfigResolver resolves Kubernetes credentials from a kubeconfig file or inline content. type k8sKubeconfigResolver struct{} -func (r *k8sKubeconfigResolver) Provider() string { return "kubernetes" } +func (r *k8sKubeconfigResolver) Provider() string { return "kubernetes" } func (r *k8sKubeconfigResolver) CredentialType() string { return "kubeconfig" } func (r *k8sKubeconfigResolver) Resolve(m *CloudAccount) error { diff --git a/module/gitlab_client_module.go b/module/gitlab_client_module.go index ae86f95c..0a4c3eea 100644 --- a/module/gitlab_client_module.go +++ b/module/gitlab_client_module.go @@ -13,11 +13,11 @@ import ( // // Config: // -// - name: gitlab-client -// type: gitlab.client -// config: -// url: "https://gitlab.com" # or self-hosted URL; use "mock://" for testing -// token: "${GITLAB_TOKEN}" +// - name: gitlab-client +// type: gitlab.client +// config: +// url: "https://gitlab.com" # or self-hosted URL; use "mock://" for testing +// token: "${GITLAB_TOKEN}" type GitLabClientModule struct { name string config map[string]any diff --git a/module/gitlab_webhook.go b/module/gitlab_webhook.go index 110729ce..4bbab18a 100644 --- a/module/gitlab_webhook.go +++ b/module/gitlab_webhook.go @@ -34,12 +34,12 @@ type GitEvent struct { // // Config: // -// - name: gitlab-hooks -// type: gitlab.webhook -// config: -// secret: "${GITLAB_WEBHOOK_SECRET}" -// path: /webhooks/gitlab # optional, default: /webhooks/gitlab -// events: [push, merge_request, tag_push, pipeline] +// - name: gitlab-hooks +// type: gitlab.webhook +// config: +// secret: "${GITLAB_WEBHOOK_SECRET}" +// path: /webhooks/gitlab # optional, default: /webhooks/gitlab +// events: [push, merge_request, tag_push, pipeline] type GitLabWebhookModule struct { name string config map[string]any @@ -255,10 +255,10 @@ func normalizeGitLabEventType(header string) string { // GitLabWebhookParseStep is a pipeline step that parses a GitLab webhook from // the HTTP request in the pipeline context. // -// - name: parse-webhook -// type: step.gitlab_parse_webhook -// config: -// secret: "${GITLAB_WEBHOOK_SECRET}" # optional; skips validation if empty +// - name: parse-webhook +// type: step.gitlab_parse_webhook +// config: +// secret: "${GITLAB_WEBHOOK_SECRET}" # optional; skips validation if empty type GitLabWebhookParseStep struct { name string secret string diff --git a/module/jwt_auth.go b/module/jwt_auth.go index d2d86157..f047e0d1 100644 --- a/module/jwt_auth.go +++ b/module/jwt_auth.go @@ -30,20 +30,20 @@ type User struct { // When an auth.user-store service is available, it delegates user CRUD to it; // otherwise it uses its own internal map for backward compatibility. type JWTAuthModule struct { - name string - secret string - tokenExpiry time.Duration - issuer string - seedFile string - responseFormat string // "standard" (default) or "v1" (access_token/refresh_token) - users map[string]*User // keyed by email (used when no external userStore) - mu sync.RWMutex - nextID int - app modular.Application - logger modular.Logger - persistence *PersistenceStore // optional write-through backend - userStore *UserStore // optional external user store (from auth.user-store module) - allowRegistration bool // when true, any visitor may self-register + name string + secret string + tokenExpiry time.Duration + issuer string + seedFile string + responseFormat string // "standard" (default) or "v1" (access_token/refresh_token) + users map[string]*User // keyed by email (used when no external userStore) + mu sync.RWMutex + nextID int + app modular.Application + logger modular.Logger + persistence *PersistenceStore // optional write-through backend + userStore *UserStore // optional external user store (from auth.user-store module) + allowRegistration bool // when true, any visitor may self-register } // NewJWTAuthModule creates a new JWT auth module diff --git a/module/nosql_dynamodb.go b/module/nosql_dynamodb.go index 48e5afd2..4037bf36 100644 --- a/module/nosql_dynamodb.go +++ b/module/nosql_dynamodb.go @@ -18,7 +18,7 @@ import ( type DynamoDBNoSQLConfig struct { TableName string `json:"tableName" yaml:"tableName"` Region string `json:"region" yaml:"region"` - Endpoint string `json:"endpoint" yaml:"endpoint"` // "local" => in-memory fallback + Endpoint string `json:"endpoint" yaml:"endpoint"` // "local" => in-memory fallback Credentials string `json:"credentials" yaml:"credentials"` // ref to cloud.account module name } diff --git a/module/nosql_memory.go b/module/nosql_memory.go index 6356116a..9a4ddd73 100644 --- a/module/nosql_memory.go +++ b/module/nosql_memory.go @@ -17,10 +17,10 @@ type MemoryNoSQLConfig struct { // MemoryNoSQL is a thread-safe in-memory NoSQL store. // type: nosql.memory — useful for testing and local scenarios. type MemoryNoSQL struct { - name string - cfg MemoryNoSQLConfig - mu sync.RWMutex - items map[string]map[string]any + name string + cfg MemoryNoSQLConfig + mu sync.RWMutex + items map[string]map[string]any } // NewMemoryNoSQL creates a new MemoryNoSQL module. diff --git a/module/nosql_mongodb.go b/module/nosql_mongodb.go index f12cb875..1f5aca09 100644 --- a/module/nosql_mongodb.go +++ b/module/nosql_mongodb.go @@ -14,7 +14,7 @@ import ( // // When uri == "memory://" the module falls back to the in-memory backend. type MongoDBNoSQLConfig struct { - URI string `json:"uri" yaml:"uri"` // "memory://" => in-memory fallback + URI string `json:"uri" yaml:"uri"` // "memory://" => in-memory fallback Database string `json:"database" yaml:"database"` Collection string `json:"collection" yaml:"collection"` } diff --git a/module/pipeline_step_argo.go b/module/pipeline_step_argo.go index b5502a27..7861f7d9 100644 --- a/module/pipeline_step_argo.go +++ b/module/pipeline_step_argo.go @@ -54,11 +54,11 @@ func (s *ArgoSubmitStep) Execute(_ context.Context, _ *PipelineContext) (*StepRe return nil, fmt.Errorf("argo_submit step %q: %w", s.name, err) } return &StepResult{Output: map[string]any{ - "workflow_run": runName, - "service": s.service, + "workflow_run": runName, + "service": s.service, "workflow_name": s.wfName, - "namespace": spec.Namespace, - "templates": len(spec.Templates), + "namespace": spec.Namespace, + "templates": len(spec.Templates), }}, nil } @@ -66,10 +66,10 @@ func (s *ArgoSubmitStep) Execute(_ context.Context, _ *PipelineContext) (*StepRe // ArgoStatusStep checks the execution status of an Argo Workflow run. type ArgoStatusStep struct { - name string - service string - workflowRun string - app modular.Application + name string + service string + workflowRun string + app modular.Application } // NewArgoStatusStepFactory returns a StepFactory for step.argo_status. diff --git a/module/pipeline_step_artifact.go b/module/pipeline_step_artifact.go index 734eba6f..d06c5981 100644 --- a/module/pipeline_step_artifact.go +++ b/module/pipeline_step_artifact.go @@ -281,7 +281,7 @@ func (s *ArtifactListStep) Execute(ctx context.Context, pc *PipelineContext) (*S return &StepResult{Output: map[string]any{ s.output: items, - "count": len(items), + "count": len(items), }}, nil } @@ -340,4 +340,3 @@ func (s *ArtifactDeleteStep) Execute(ctx context.Context, pc *PipelineContext) ( "deleted": true, }}, nil } - diff --git a/module/pipeline_step_build_binary.go b/module/pipeline_step_build_binary.go index 144b84f8..d95547a2 100644 --- a/module/pipeline_step_build_binary.go +++ b/module/pipeline_step_build_binary.go @@ -16,16 +16,16 @@ import ( // BuildBinaryStep reads a workflow config YAML, generates a self-contained Go // project that embeds the config, and compiles it into a standalone binary. type BuildBinaryStep struct { - name string - configFile string - output string - targetOS string - targetArch string - embedConfig bool - modulePath string - goVersion string - dryRun bool - execCommand func(ctx context.Context, name string, args ...string) *exec.Cmd + name string + configFile string + output string + targetOS string + targetArch string + embedConfig bool + modulePath string + goVersion string + dryRun bool + execCommand func(ctx context.Context, name string, args ...string) *exec.Cmd } // NewBuildBinaryStepFactory returns a StepFactory that creates BuildBinaryStep instances. diff --git a/module/pipeline_step_cache_get.go b/module/pipeline_step_cache_get.go index ebd1e67b..72a21c61 100644 --- a/module/pipeline_step_cache_get.go +++ b/module/pipeline_step_cache_get.go @@ -81,7 +81,7 @@ func (s *CacheGetStep) Execute(ctx context.Context, pc *PipelineContext) (*StepR return nil, fmt.Errorf("cache_get step %q: cache miss for key %q", s.name, resolvedKey) } return &StepResult{Output: map[string]any{ - s.output: "", + s.output: "", "cache_hit": false, }}, nil } @@ -89,7 +89,7 @@ func (s *CacheGetStep) Execute(ctx context.Context, pc *PipelineContext) (*StepR } return &StepResult{Output: map[string]any{ - s.output: val, + s.output: val, "cache_hit": true, }}, nil } diff --git a/module/pipeline_step_dlq_test.go b/module/pipeline_step_dlq_test.go index 686b35f1..652c4571 100644 --- a/module/pipeline_step_dlq_test.go +++ b/module/pipeline_step_dlq_test.go @@ -27,11 +27,11 @@ type dlqMockBroker struct { producer *dlqMockProducer } -func (b *dlqMockBroker) Producer() MessageProducer { return b.producer } -func (b *dlqMockBroker) Consumer() MessageConsumer { return nil } -func (b *dlqMockBroker) Subscribe(_ string, _ MessageHandler) error { return nil } -func (b *dlqMockBroker) Start(_ context.Context) error { return nil } -func (b *dlqMockBroker) Stop(_ context.Context) error { return nil } +func (b *dlqMockBroker) Producer() MessageProducer { return b.producer } +func (b *dlqMockBroker) Consumer() MessageConsumer { return nil } +func (b *dlqMockBroker) Subscribe(_ string, _ MessageHandler) error { return nil } +func (b *dlqMockBroker) Start(_ context.Context) error { return nil } +func (b *dlqMockBroker) Stop(_ context.Context) error { return nil } // newAppWithDLQBroker creates a MockApplication with the broker registered under "test-broker". func newAppWithDLQBroker() (*MockApplication, *dlqMockProducer) { diff --git a/module/pipeline_step_event_publish_test.go b/module/pipeline_step_event_publish_test.go index e324788f..1c3ae1b5 100644 --- a/module/pipeline_step_event_publish_test.go +++ b/module/pipeline_step_event_publish_test.go @@ -36,11 +36,11 @@ func newMockBroker() *mockBroker { return &mockBroker{producer: &mockBrokerProducer{}} } -func (b *mockBroker) Producer() MessageProducer { return b.producer } -func (b *mockBroker) Consumer() MessageConsumer { return nil } +func (b *mockBroker) Producer() MessageProducer { return b.producer } +func (b *mockBroker) Consumer() MessageConsumer { return nil } func (b *mockBroker) Subscribe(_ string, _ MessageHandler) error { return nil } -func (b *mockBroker) Start(_ context.Context) error { return nil } -func (b *mockBroker) Stop(_ context.Context) error { return nil } +func (b *mockBroker) Start(_ context.Context) error { return nil } +func (b *mockBroker) Stop(_ context.Context) error { return nil } func mockAppWithBroker(name string, broker MessageBroker) *MockApplication { app := NewMockApplication() diff --git a/module/pipeline_step_foreach_test.go b/module/pipeline_step_foreach_test.go index fa4ba18e..ad4142c4 100644 --- a/module/pipeline_step_foreach_test.go +++ b/module/pipeline_step_foreach_test.go @@ -470,8 +470,8 @@ func TestForEachStep_StepAndStepsMutuallyExclusive(t *testing.T) { _, err := buildTestForEachStep(t, "foreach-both", map[string]any{ "collection": "items", "step": map[string]any{ - "type": "step.set", - "name": "s", + "type": "step.set", + "name": "s", "values": map[string]any{"x": "1"}, }, "steps": []any{ diff --git a/module/pipeline_step_gitlab.go b/module/pipeline_step_gitlab.go index 33381f9d..20d9db2b 100644 --- a/module/pipeline_step_gitlab.go +++ b/module/pipeline_step_gitlab.go @@ -10,13 +10,13 @@ import ( // NewGitLabTriggerPipelineStepFactory returns a StepFactory for step.gitlab_trigger_pipeline. // -// - type: step.gitlab_trigger_pipeline -// config: -// client: gitlab-client # name of the gitlab.client module -// project: "group/project" # project path or numeric ID -// ref: main -// variables: -// KEY: value +// - type: step.gitlab_trigger_pipeline +// config: +// client: gitlab-client # name of the gitlab.client module +// project: "group/project" # project path or numeric ID +// ref: main +// variables: +// KEY: value func NewGitLabTriggerPipelineStepFactory() StepFactory { return func(name string, config map[string]any, app modular.Application) (PipelineStep, error) { clientName, _ := config["client"].(string) @@ -82,11 +82,11 @@ func (s *gitLabTriggerPipelineStep) Execute(_ context.Context, _ *PipelineContex // NewGitLabPipelineStatusStepFactory returns a StepFactory for step.gitlab_pipeline_status. // -// - type: step.gitlab_pipeline_status -// config: -// client: gitlab-client -// project: "group/project" -// pipeline_id: "42" # string or int +// - type: step.gitlab_pipeline_status +// config: +// client: gitlab-client +// project: "group/project" +// pipeline_id: "42" # string or int func NewGitLabPipelineStatusStepFactory() StepFactory { return func(name string, config map[string]any, app modular.Application) (PipelineStep, error) { clientName, _ := config["client"].(string) @@ -152,14 +152,14 @@ func (s *gitLabPipelineStatusStep) Execute(_ context.Context, _ *PipelineContext // NewGitLabCreateMRStepFactory returns a StepFactory for step.gitlab_create_mr. // -// - type: step.gitlab_create_mr -// config: -// client: gitlab-client -// project: "group/project" -// source_branch: feature-x -// target_branch: main -// title: "Feature X" -// description: "Optional description" +// - type: step.gitlab_create_mr +// config: +// client: gitlab-client +// project: "group/project" +// source_branch: feature-x +// target_branch: main +// title: "Feature X" +// description: "Optional description" func NewGitLabCreateMRStepFactory() StepFactory { return func(name string, config map[string]any, app modular.Application) (PipelineStep, error) { clientName, _ := config["client"].(string) @@ -237,12 +237,12 @@ func (s *gitLabCreateMRStep) Execute(_ context.Context, _ *PipelineContext) (*St // NewGitLabMRCommentStepFactory returns a StepFactory for step.gitlab_mr_comment. // -// - type: step.gitlab_mr_comment -// config: -// client: gitlab-client -// project: "group/project" -// mr_iid: 42 -// body: "Pipeline passed!" +// - type: step.gitlab_mr_comment +// config: +// client: gitlab-client +// project: "group/project" +// mr_iid: 42 +// body: "Pipeline passed!" func NewGitLabMRCommentStepFactory() StepFactory { return func(name string, config map[string]any, app modular.Application) (PipelineStep, error) { clientName, _ := config["client"].(string) diff --git a/module/pipeline_step_s3_upload.go b/module/pipeline_step_s3_upload.go new file mode 100644 index 00000000..5a42ee7b --- /dev/null +++ b/module/pipeline_step_s3_upload.go @@ -0,0 +1,228 @@ +package module + +import ( + "bytes" + "context" + "encoding/base64" + "fmt" + "os" + "strings" + + "github.com/CrisisTextLine/modular" + awsconfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/s3" +) + +// s3PutObjectAPI is the minimal S3 interface needed by S3UploadStep, +// allowing injection of a mock client in tests. +type s3PutObjectAPI interface { + PutObject(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error) +} + +// S3UploadStep uploads binary data (base64-encoded in the pipeline context) +// to S3-compatible object storage and returns the public URL, key, and bucket. +type S3UploadStep struct { + name string + bucket string + region string + key string // may contain Go template expressions (e.g. "avatars/{{.user_id}}/{{uuid}}.{{.ext}}") + bodyFrom string // dot-path to base64-encoded body in the pipeline context + contentTypeFrom string // dot-path to MIME type (optional) + contentType string // static MIME type (optional) + endpoint string // custom S3 endpoint for MinIO/LocalStack (optional) + tmpl *TemplateEngine + s3Client s3PutObjectAPI // injected in tests; nil triggers lazy init +} + +// NewS3UploadStepFactory returns a StepFactory that creates S3UploadStep instances. +func NewS3UploadStepFactory() StepFactory { + return func(name string, config map[string]any, _ modular.Application) (PipelineStep, error) { + bucket := os.ExpandEnv(s3UploadStringConfig(config, "bucket")) + if bucket == "" { + return nil, fmt.Errorf("s3_upload step %q: 'bucket' is required", name) + } + + region := os.ExpandEnv(s3UploadStringConfig(config, "region")) + if region == "" { + return nil, fmt.Errorf("s3_upload step %q: 'region' is required", name) + } + + key := s3UploadStringConfig(config, "key") + if key == "" { + return nil, fmt.Errorf("s3_upload step %q: 'key' is required", name) + } + + bodyFrom := s3UploadStringConfig(config, "body_from") + if bodyFrom == "" { + return nil, fmt.Errorf("s3_upload step %q: 'body_from' is required", name) + } + + return &S3UploadStep{ + name: name, + bucket: bucket, + region: region, + key: key, + bodyFrom: bodyFrom, + contentTypeFrom: s3UploadStringConfig(config, "content_type_from"), + contentType: s3UploadStringConfig(config, "content_type"), + endpoint: os.ExpandEnv(s3UploadStringConfig(config, "endpoint")), + tmpl: NewTemplateEngine(), + }, nil + } +} + +// s3UploadStringConfig extracts a string value from a config map, returning "" +// if the key is absent or the value is not a string. +func s3UploadStringConfig(config map[string]any, key string) string { + v, _ := config[key].(string) + return v +} + +// Name returns the step name. +func (s *S3UploadStep) Name() string { return s.name } + +// Execute uploads binary data from the pipeline context to S3 and returns the +// public URL, the resolved key, and the bucket name as step output. +func (s *S3UploadStep) Execute(ctx context.Context, pc *PipelineContext) (*StepResult, error) { + // Resolve the key template (supports {{ .field }} and {{ uuid }} etc.) + resolvedKey, err := s.tmpl.Resolve(s.key, pc) + if err != nil { + return nil, fmt.Errorf("s3_upload step %q: failed to resolve key template: %w", s.name, err) + } + + // Resolve body_from dot-path to obtain the base64-encoded body. + bodyVal, err := s.resolveFromPath(pc, s.bodyFrom) + if err != nil { + return nil, fmt.Errorf("s3_upload step %q: body_from %q: %w", s.name, s.bodyFrom, err) + } + + bodyStr, ok := bodyVal.(string) + if !ok { + return nil, fmt.Errorf("s3_upload step %q: body_from value must be a base64-encoded string, got %T", s.name, bodyVal) + } + + bodyBytes, err := s3UploadDecodeBase64(bodyStr) + if err != nil { + return nil, fmt.Errorf("s3_upload step %q: failed to base64-decode body: %w", s.name, err) + } + + // Resolve content type (content_type_from takes precedence over content_type). + contentType, err := s.resolveContentType(pc) + if err != nil { + return nil, fmt.Errorf("s3_upload step %q: %w", s.name, err) + } + + // Obtain the S3 client (injected or built from config). + client, err := s.getClient(ctx) + if err != nil { + return nil, fmt.Errorf("s3_upload step %q: failed to build S3 client: %w", s.name, err) + } + + input := &s3.PutObjectInput{ + Bucket: &s.bucket, + Key: &resolvedKey, + Body: bytes.NewReader(bodyBytes), + } + if contentType != "" { + input.ContentType = &contentType + } + + if _, err = client.PutObject(ctx, input); err != nil { + return nil, fmt.Errorf("s3_upload step %q: PutObject failed: %w", s.name, err) + } + + return &StepResult{Output: map[string]any{ + "url": s.buildURL(resolvedKey), + "key": resolvedKey, + "bucket": s.bucket, + }}, nil +} + +// resolveFromPath walks a dot-separated path (e.g. "steps.parse.data") through +// the pipeline context, including step outputs under the "steps" key. +func (s *S3UploadStep) resolveFromPath(pc *PipelineContext, path string) (any, error) { + data := make(map[string]any, len(pc.Current)+1) + for k, v := range pc.Current { + data[k] = v + } + if len(pc.StepOutputs) > 0 { + steps := make(map[string]any, len(pc.StepOutputs)) + for k, v := range pc.StepOutputs { + steps[k] = v + } + data["steps"] = steps + } + return resolveDottedPath(data, path) +} + +// resolveContentType returns the effective content type: +// content_type_from (dot-path lookup) takes precedence; falls back to the +// static content_type field. +func (s *S3UploadStep) resolveContentType(pc *PipelineContext) (string, error) { + if s.contentTypeFrom != "" { + ctVal, err := s.resolveFromPath(pc, s.contentTypeFrom) + if err != nil { + return "", fmt.Errorf("content_type_from %q: %w", s.contentTypeFrom, err) + } + if ct, ok := ctVal.(string); ok { + return ct, nil + } + } + return s.contentType, nil +} + +// getClient returns the injected client if set, otherwise builds a new AWS S3 +// client from the step's region and optional custom endpoint. +func (s *S3UploadStep) getClient(ctx context.Context) (s3PutObjectAPI, error) { + if s.s3Client != nil { + return s.s3Client, nil + } + + cfg, err := awsconfig.LoadDefaultConfig(ctx, awsconfig.WithRegion(s.region)) + if err != nil { + return nil, fmt.Errorf("failed to load AWS config: %w", err) + } + + var s3Opts []func(*s3.Options) + if s.endpoint != "" { + ep := s.endpoint + s3Opts = append(s3Opts, func(o *s3.Options) { + o.BaseEndpoint = &ep + o.UsePathStyle = true + }) + } + + return s3.NewFromConfig(cfg, s3Opts...), nil +} + +// buildURL constructs the public URL for the uploaded object. +// When a custom endpoint is configured (MinIO, LocalStack, etc.) it uses +// path-style: {endpoint}/{bucket}/{key}. +// Otherwise it uses the standard AWS virtual-hosted URL: +// https://{bucket}.s3.{region}.amazonaws.com/{key}. +func (s *S3UploadStep) buildURL(key string) string { + if s.endpoint != "" { + ep := strings.TrimRight(s.endpoint, "/") + return fmt.Sprintf("%s/%s/%s", ep, s.bucket, key) + } + return fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s.bucket, s.region, key) +} + +// s3UploadDecodeBase64 attempts standard base64, then URL-safe base64, +// then the raw (no-padding) variants, returning the first successful decode. +func s3UploadDecodeBase64(encoded string) ([]byte, error) { + if b, err := base64.StdEncoding.DecodeString(encoded); err == nil { + return b, nil + } + if b, err := base64.URLEncoding.DecodeString(encoded); err == nil { + return b, nil + } + if b, err := base64.RawStdEncoding.DecodeString(encoded); err == nil { + return b, nil + } + b, err := base64.RawURLEncoding.DecodeString(encoded) + if err != nil { + return nil, err + } + return b, nil +} diff --git a/module/pipeline_step_s3_upload_test.go b/module/pipeline_step_s3_upload_test.go new file mode 100644 index 00000000..6429b728 --- /dev/null +++ b/module/pipeline_step_s3_upload_test.go @@ -0,0 +1,335 @@ +package module + +import ( + "context" + "encoding/base64" + "strings" + "testing" + + "github.com/aws/aws-sdk-go-v2/service/s3" +) + +// mockS3Uploader is an in-memory S3 client for testing. +type mockS3Uploader struct { + lastInput *s3.PutObjectInput + err error +} + +func (m *mockS3Uploader) PutObject(_ context.Context, input *s3.PutObjectInput, _ ...func(*s3.Options)) (*s3.PutObjectOutput, error) { + m.lastInput = input + return &s3.PutObjectOutput{}, m.err +} + +func TestS3UploadStep_BasicUpload(t *testing.T) { + mock := &mockS3Uploader{} + factory := NewS3UploadStepFactory() + step, err := factory("upload", map[string]any{ + "bucket": "my-bucket", + "region": "us-east-1", + "key": "files/test.png", + "body_from": "file_data", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + step.(*S3UploadStep).s3Client = mock + + body := base64.StdEncoding.EncodeToString([]byte("hello world")) + pc := NewPipelineContext(map[string]any{"file_data": body}, nil) + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + if result.Output["key"] != "files/test.png" { + t.Errorf("expected key 'files/test.png', got %v", result.Output["key"]) + } + if result.Output["bucket"] != "my-bucket" { + t.Errorf("expected bucket 'my-bucket', got %v", result.Output["bucket"]) + } + want := "https://my-bucket.s3.us-east-1.amazonaws.com/files/test.png" + if result.Output["url"] != want { + t.Errorf("expected url %q, got %v", want, result.Output["url"]) + } +} + +func TestS3UploadStep_TemplatedKey(t *testing.T) { + mock := &mockS3Uploader{} + factory := NewS3UploadStepFactory() + step, err := factory("upload-tmpl", map[string]any{ + "bucket": "avatars", + "region": "us-west-2", + "key": "avatars/{{ .user_id }}/photo.{{ .ext }}", + "body_from": "data", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + step.(*S3UploadStep).s3Client = mock + + body := base64.StdEncoding.EncodeToString([]byte("png-data")) + pc := NewPipelineContext(map[string]any{ + "data": body, + "user_id": "u-123", + "ext": "png", + }, nil) + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + if result.Output["key"] != "avatars/u-123/photo.png" { + t.Errorf("expected resolved key, got %v", result.Output["key"]) + } +} + +func TestS3UploadStep_StaticContentType(t *testing.T) { + mock := &mockS3Uploader{} + factory := NewS3UploadStepFactory() + step, err := factory("upload-ct", map[string]any{ + "bucket": "my-bucket", + "region": "us-east-1", + "key": "img.png", + "body_from": "data", + "content_type": "image/png", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + step.(*S3UploadStep).s3Client = mock + + body := base64.StdEncoding.EncodeToString([]byte("png")) + pc := NewPipelineContext(map[string]any{"data": body}, nil) + + _, err = step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + if mock.lastInput == nil { + t.Fatal("expected PutObject to be called") + } + if mock.lastInput.ContentType == nil || *mock.lastInput.ContentType != "image/png" { + t.Errorf("expected ContentType 'image/png', got %v", mock.lastInput.ContentType) + } +} + +func TestS3UploadStep_ContentTypeFrom(t *testing.T) { + mock := &mockS3Uploader{} + factory := NewS3UploadStepFactory() + step, err := factory("upload-ctf", map[string]any{ + "bucket": "my-bucket", + "region": "us-east-1", + "key": "upload", + "body_from": "data", + "content_type_from": "mime", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + step.(*S3UploadStep).s3Client = mock + + body := base64.StdEncoding.EncodeToString([]byte("bytes")) + pc := NewPipelineContext(map[string]any{ + "data": body, + "mime": "image/jpeg", + }, nil) + + _, err = step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + if mock.lastInput == nil || mock.lastInput.ContentType == nil || *mock.lastInput.ContentType != "image/jpeg" { + t.Errorf("expected ContentType 'image/jpeg', got %v", mock.lastInput.ContentType) + } +} + +func TestS3UploadStep_CustomEndpoint(t *testing.T) { + mock := &mockS3Uploader{} + factory := NewS3UploadStepFactory() + step, err := factory("upload-minio", map[string]any{ + "bucket": "mybucket", + "region": "us-east-1", + "key": "obj/key", + "body_from": "data", + "endpoint": "http://localhost:9000", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + step.(*S3UploadStep).s3Client = mock + + body := base64.StdEncoding.EncodeToString([]byte("data")) + pc := NewPipelineContext(map[string]any{"data": body}, nil) + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + want := "http://localhost:9000/mybucket/obj/key" + if result.Output["url"] != want { + t.Errorf("expected url %q, got %v", want, result.Output["url"]) + } +} + +func TestS3UploadStep_BodyFromStepOutput(t *testing.T) { + mock := &mockS3Uploader{} + factory := NewS3UploadStepFactory() + step, err := factory("upload-step", map[string]any{ + "bucket": "bucket", + "region": "us-east-1", + "key": "file", + "body_from": "steps.parse.raw_data", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + step.(*S3UploadStep).s3Client = mock + + body := base64.StdEncoding.EncodeToString([]byte("content")) + pc := NewPipelineContext(nil, nil) + pc.MergeStepOutput("parse", map[string]any{"raw_data": body}) + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + if result.Output["key"] != "file" { + t.Errorf("expected key 'file', got %v", result.Output["key"]) + } +} + +func TestS3UploadStep_MissingRequiredFields(t *testing.T) { + factory := NewS3UploadStepFactory() + + tests := []struct { + name string + config map[string]any + wantErr string + }{ + { + name: "missing bucket", + config: map[string]any{"region": "us-east-1", "key": "k", "body_from": "b"}, + wantErr: "'bucket' is required", + }, + { + name: "missing region", + config: map[string]any{"bucket": "b", "key": "k", "body_from": "b"}, + wantErr: "'region' is required", + }, + { + name: "missing key", + config: map[string]any{"bucket": "b", "region": "us-east-1", "body_from": "b"}, + wantErr: "'key' is required", + }, + { + name: "missing body_from", + config: map[string]any{"bucket": "b", "region": "us-east-1", "key": "k"}, + wantErr: "'body_from' is required", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := factory("test-step", tt.config, nil) + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), tt.wantErr) { + t.Errorf("expected error containing %q, got %q", tt.wantErr, err.Error()) + } + }) + } +} + +func TestS3UploadStep_InvalidBase64(t *testing.T) { + mock := &mockS3Uploader{} + factory := NewS3UploadStepFactory() + step, err := factory("upload-bad", map[string]any{ + "bucket": "b", + "region": "us-east-1", + "key": "k", + "body_from": "data", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + step.(*S3UploadStep).s3Client = mock + + pc := NewPipelineContext(map[string]any{"data": "not-valid-base64!!!"}, nil) + _, err = step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error for invalid base64") + } + if !strings.Contains(err.Error(), "base64-decode") { + t.Errorf("expected base64 decode error, got %q", err.Error()) + } +} + +func TestS3UploadStep_NonStringBody(t *testing.T) { + mock := &mockS3Uploader{} + factory := NewS3UploadStepFactory() + step, err := factory("upload-nonstr", map[string]any{ + "bucket": "b", + "region": "us-east-1", + "key": "k", + "body_from": "data", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + step.(*S3UploadStep).s3Client = mock + + pc := NewPipelineContext(map[string]any{"data": 12345}, nil) + _, err = step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error for non-string body") + } + if !strings.Contains(err.Error(), "base64-encoded string") { + t.Errorf("expected type error, got %q", err.Error()) + } +} + +func TestS3UploadStep_URLEncoding(t *testing.T) { + // Ensure URL-safe base64 is also accepted. + mock := &mockS3Uploader{} + factory := NewS3UploadStepFactory() + step, err := factory("upload-urlb64", map[string]any{ + "bucket": "b", + "region": "us-east-1", + "key": "k", + "body_from": "data", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + step.(*S3UploadStep).s3Client = mock + + // Use URL-safe base64 encoding. + body := base64.URLEncoding.EncodeToString([]byte("url-safe content")) + pc := NewPipelineContext(map[string]any{"data": body}, nil) + _, err = step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } +} + +func TestS3UploadStep_Name(t *testing.T) { + factory := NewS3UploadStepFactory() + step, err := factory("my-upload", map[string]any{ + "bucket": "b", + "region": "us-east-1", + "key": "k", + "body_from": "d", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + if step.Name() != "my-upload" { + t.Errorf("expected name 'my-upload', got %q", step.Name()) + } +} diff --git a/module/pipeline_step_statemachine_transition.go b/module/pipeline_step_statemachine_transition.go index 7b83f00c..450d9323 100644 --- a/module/pipeline_step_statemachine_transition.go +++ b/module/pipeline_step_statemachine_transition.go @@ -9,14 +9,14 @@ import ( // StateMachineTransitionStep triggers a state machine transition from within a pipeline. type StateMachineTransitionStep struct { - name string - statemachine string - entityID string - event string - data map[string]any - failOnError bool - app modular.Application - tmpl *TemplateEngine + name string + statemachine string + entityID string + event string + data map[string]any + failOnError bool + app modular.Application + tmpl *TemplateEngine } // NewStateMachineTransitionStepFactory returns a StepFactory for step.statemachine_transition. diff --git a/module/pipeline_step_statemachine_transition_test.go b/module/pipeline_step_statemachine_transition_test.go index 3cb27865..12cfa008 100644 --- a/module/pipeline_step_statemachine_transition_test.go +++ b/module/pipeline_step_statemachine_transition_test.go @@ -167,9 +167,9 @@ func TestStateMachineTransitionStep_InvalidTransition_FailOnErrorFalse(t *testin factory := NewStateMachineTransitionStepFactory() // "reject" is valid, but "approve" from "approved" is not — trigger approve twice step, err := factory("double-approve", map[string]any{ - "statemachine": "order-sm", - "entity_id": "order-1", - "event": "approve", + "statemachine": "order-sm", + "entity_id": "order-1", + "event": "approve", "fail_on_error": false, }, app) if err != nil { @@ -370,9 +370,9 @@ func TestStateMachineTransitionStep_MockTrigger_Error_NoFail(t *testing.T) { factory := NewStateMachineTransitionStepFactory() step, err := factory("mock-fail", map[string]any{ - "statemachine": "order-sm", - "entity_id": "order-1", - "event": "approve", + "statemachine": "order-sm", + "entity_id": "order-1", + "event": "approve", "fail_on_error": false, }, app) if err != nil { diff --git a/module/platform_apigateway.go b/module/platform_apigateway.go index 859b8fd1..82014f1d 100644 --- a/module/platform_apigateway.go +++ b/module/platform_apigateway.go @@ -386,9 +386,9 @@ func (b *awsAPIGatewayBackend) apply(m *PlatformAPIGateway) (*PlatformGatewaySta } if cors := m.platformCORS(); cors != nil { createInput.CorsConfiguration = &apigwtypes.Cors{ - AllowOrigins: cors.AllowOrigins, - AllowMethods: cors.AllowMethods, - AllowHeaders: cors.AllowHeaders, + AllowOrigins: cors.AllowOrigins, + AllowMethods: cors.AllowMethods, + AllowHeaders: cors.AllowHeaders, } } apiOut, err := client.CreateApi(context.Background(), createInput) @@ -419,10 +419,10 @@ func (b *awsAPIGatewayBackend) apply(m *PlatformAPIGateway) (*PlatformGatewaySta for _, route := range routes { // Create integration for the route target integOut, err := client.CreateIntegration(context.Background(), &apigatewayv2.CreateIntegrationInput{ - ApiId: aws.String(apiID), - IntegrationType: apigwtypes.IntegrationTypeHttpProxy, - IntegrationUri: aws.String(route.Target), - IntegrationMethod: aws.String(route.Method), + ApiId: aws.String(apiID), + IntegrationType: apigwtypes.IntegrationTypeHttpProxy, + IntegrationUri: aws.String(route.Target), + IntegrationMethod: aws.String(route.Method), PayloadFormatVersion: aws.String("1.0"), }) if err != nil { diff --git a/module/platform_dns_backends.go b/module/platform_dns_backends.go index a9a0aff7..106b84a3 100644 --- a/module/platform_dns_backends.go +++ b/module/platform_dns_backends.go @@ -171,7 +171,7 @@ func (b *route53Backend) applyDNS(m *PlatformDNS) (*DNSState, error) { Name: aws.String(zone.Name), CallerReference: aws.String(fmt.Sprintf("workflow-%s", zone.Name)), HostedZoneConfig: &r53types.HostedZoneConfig{ - Comment: aws.String(zone.Comment), + Comment: aws.String(zone.Comment), PrivateZone: zone.Private, }, }) diff --git a/module/platform_do_app.go b/module/platform_do_app.go index 37e85b9b..ec630cc2 100644 --- a/module/platform_do_app.go +++ b/module/platform_do_app.go @@ -11,15 +11,15 @@ import ( // DOAppState holds the current state of a DigitalOcean App Platform app. type DOAppState struct { - ID string `json:"id"` - Name string `json:"name"` - Region string `json:"region"` - Status string `json:"status"` // pending, deploying, running, error, deleted - LiveURL string `json:"liveUrl"` - Instances int `json:"instances"` - Image string `json:"image"` - DeployedAt time.Time `json:"deployedAt"` - DeploymentID string `json:"deploymentId"` + ID string `json:"id"` + Name string `json:"name"` + Region string `json:"region"` + Status string `json:"status"` // pending, deploying, running, error, deleted + LiveURL string `json:"liveUrl"` + Instances int `json:"instances"` + Image string `json:"image"` + DeployedAt time.Time `json:"deployedAt"` + DeploymentID string `json:"deploymentId"` } // doAppBackend is the interface DO App Platform backends implement. @@ -198,10 +198,10 @@ func (m *PlatformDOApp) buildAppSpec() *godo.AppSpec { RegistryType: godo.ImageSourceSpecRegistryType_DockerHub, Repository: m.state.Image, }, - InstanceCount: int64(m.state.Instances), - InstanceSizeSlug: "basic-xxs", - HTTPPort: int64(m.httpPort()), - Envs: appEnvs, + InstanceCount: int64(m.state.Instances), + InstanceSizeSlug: "basic-xxs", + HTTPPort: int64(m.httpPort()), + Envs: appEnvs, }, }, } diff --git a/module/platform_do_networking.go b/module/platform_do_networking.go index 8f94f189..47ba86d5 100644 --- a/module/platform_do_networking.go +++ b/module/platform_do_networking.go @@ -22,15 +22,15 @@ type DOVPCState struct { // DOFirewallRule describes a single firewall rule (inbound or outbound). type DOFirewallRule struct { - Protocol string `json:"protocol"` // tcp, udp, icmp + Protocol string `json:"protocol"` // tcp, udp, icmp PortRange string `json:"portRange"` // e.g. "80" or "8000-9000" Sources string `json:"sources"` // CIDR, tag, or load_balancer_uid } // DOFirewallConfig describes a DigitalOcean firewall. type DOFirewallConfig struct { - Name string `json:"name"` - InboundRules []DOFirewallRule `json:"inboundRules"` + Name string `json:"name"` + InboundRules []DOFirewallRule `json:"inboundRules"` OutboundRules []DOFirewallRule `json:"outboundRules"` } @@ -257,9 +257,9 @@ func (b *doNetworkingRealBackend) plan(m *PlatformDONetworking) (*DONetworkPlan, func (b *doNetworkingRealBackend) apply(m *PlatformDONetworking) (*DOVPCState, error) { req := &godo.VPCCreateRequest{ - Name: m.state.Name, + Name: m.state.Name, RegionSlug: m.state.Region, - IPRange: m.state.IPRange, + IPRange: m.state.IPRange, } vpc, _, err := b.client.VPCs.Create(context.Background(), req) if err != nil { diff --git a/module/platform_ecs.go b/module/platform_ecs.go index 9b3aa9f0..f6870fcd 100644 --- a/module/platform_ecs.go +++ b/module/platform_ecs.go @@ -14,24 +14,24 @@ import ( // ECSServiceState holds the current state of a managed ECS service. type ECSServiceState struct { - Name string `json:"name"` - Cluster string `json:"cluster"` - Region string `json:"region"` - LaunchType string `json:"launchType"` - Status string `json:"status"` // pending, creating, running, deleting, deleted - DesiredCount int `json:"desiredCount"` - RunningCount int `json:"runningCount"` - TaskDefinition ECSTaskDefinition `json:"taskDefinition"` - LoadBalancer *ECSLoadBalancer `json:"loadBalancer,omitempty"` - CreatedAt time.Time `json:"createdAt"` + Name string `json:"name"` + Cluster string `json:"cluster"` + Region string `json:"region"` + LaunchType string `json:"launchType"` + Status string `json:"status"` // pending, creating, running, deleting, deleted + DesiredCount int `json:"desiredCount"` + RunningCount int `json:"runningCount"` + TaskDefinition ECSTaskDefinition `json:"taskDefinition"` + LoadBalancer *ECSLoadBalancer `json:"loadBalancer,omitempty"` + CreatedAt time.Time `json:"createdAt"` } // ECSTaskDefinition describes an ECS task definition. type ECSTaskDefinition struct { - Family string `json:"family"` - Revision int `json:"revision"` - CPU string `json:"cpu"` - Memory string `json:"memory"` + Family string `json:"family"` + Revision int `json:"revision"` + CPU string `json:"cpu"` + Memory string `json:"memory"` Containers []ECSContainer `json:"containers"` } diff --git a/module/platform_kubernetes_kind.go b/module/platform_kubernetes_kind.go index ea1d6991..7f2d693d 100644 --- a/module/platform_kubernetes_kind.go +++ b/module/platform_kubernetes_kind.go @@ -413,9 +413,9 @@ func (b *gkeBackend) apply(k *PlatformKubernetes) (*PlatformResult, error) { parent := fmt.Sprintf("projects/%s/locations/%s", project, location) req := &container.CreateClusterRequest{ Cluster: &container.Cluster{ - Name: k.clusterName(), + Name: k.clusterName(), InitialClusterVersion: version, - NodePools: nodePools, + NodePools: nodePools, }, } @@ -611,8 +611,8 @@ func (b *aksBackend) apply(k *PlatformKubernetes) (*PlatformResult, error) { } if creds.ClientID != "" { clusterBody["properties"].(map[string]any)["servicePrincipalProfile"] = map[string]any{ - "clientId": creds.ClientID, - "secret": creds.ClientSecret, + "clientId": creds.ClientID, + "secret": creds.ClientSecret, } } @@ -762,11 +762,11 @@ func (b *aksBackend) buildAgentPools(k *PlatformKubernetes) []map[string]any { groups := k.nodeGroups() if len(groups) == 0 { return []map[string]any{{ - "name": "default", - "count": 1, - "vmSize": "Standard_DS2_v2", - "mode": "System", - "osType": "Linux", + "name": "default", + "count": 1, + "vmSize": "Standard_DS2_v2", + "mode": "System", + "osType": "Linux", }} } var pools []map[string]any @@ -780,14 +780,14 @@ func (b *aksBackend) buildAgentPools(k *PlatformKubernetes) []map[string]any { mode = "System" } pools = append(pools, map[string]any{ - "name": ng.Name, - "count": ng.Min, - "minCount": ng.Min, - "maxCount": ng.Max, - "vmSize": vmSize, - "mode": mode, - "osType": "Linux", - "enableAutoScaling": true, + "name": ng.Name, + "count": ng.Min, + "minCount": ng.Min, + "maxCount": ng.Max, + "vmSize": vmSize, + "mode": mode, + "osType": "Linux", + "enableAutoScaling": true, }) } return pools diff --git a/module/platform_provider.go b/module/platform_provider.go index fbc6a2c8..125a7257 100644 --- a/module/platform_provider.go +++ b/module/platform_provider.go @@ -18,7 +18,7 @@ type PlatformPlan struct { // PlatformAction describes a single change within a plan. type PlatformAction struct { - Type string `json:"type"` // create, update, delete, noop + Type string `json:"type"` // create, update, delete, noop Resource string `json:"resource"` Detail string `json:"detail"` } diff --git a/module/policy_engine.go b/module/policy_engine.go index f2df66bb..f7ff06d0 100644 --- a/module/policy_engine.go +++ b/module/policy_engine.go @@ -140,8 +140,8 @@ func (e *mockPolicyEngine) Evaluate(_ context.Context, input map[string]any) (*P // Check input for explicit deny action. if action, _ := input["action"].(string); action == "deny" { return &PolicyDecision{ - Allowed: false, - Reasons: []string{"input action is deny"}, + Allowed: false, + Reasons: []string{"input action is deny"}, Metadata: map[string]any{"backend": "mock", "input": input}, }, nil } @@ -150,8 +150,8 @@ func (e *mockPolicyEngine) Evaluate(_ context.Context, input map[string]any) (*P for name, content := range e.policies { if containsString(content, "deny") { return &PolicyDecision{ - Allowed: false, - Reasons: []string{fmt.Sprintf("policy %q contains deny rule", name)}, + Allowed: false, + Reasons: []string{fmt.Sprintf("policy %q contains deny rule", name)}, Metadata: map[string]any{"backend": "mock", "matched_policy": name}, }, nil } @@ -175,4 +175,3 @@ func containsString(s, substr string) bool { return false }()) } - diff --git a/module/tracing_propagation.go b/module/tracing_propagation.go index cabf4343..ecc69551 100644 --- a/module/tracing_propagation.go +++ b/module/tracing_propagation.go @@ -34,8 +34,8 @@ func NewMapCarrier(m map[string]string) MapCarrier { return MapCarrier{m: m} } -func (c MapCarrier) Get(key string) string { return c.m[key] } -func (c MapCarrier) Set(key, value string) { c.m[key] = value } +func (c MapCarrier) Get(key string) string { return c.m[key] } +func (c MapCarrier) Set(key, value string) { c.m[key] = value } func (c MapCarrier) Keys() []string { keys := make([]string, 0, len(c.m)) for k := range c.m { diff --git a/plugins/pipelinesteps/plugin.go b/plugins/pipelinesteps/plugin.go index fe739edd..e906afd4 100644 --- a/plugins/pipelinesteps/plugin.go +++ b/plugins/pipelinesteps/plugin.go @@ -3,7 +3,8 @@ // http_call, request_parse, db_query, db_exec, json_response, // validate_path_param, validate_pagination, validate_request_body, // foreach, webhook_verify, ui_scaffold, ui_scaffold_analyze, -// dlq_send, dlq_replay, retry_with_backoff, circuit_breaker (wrapping). +// dlq_send, dlq_replay, retry_with_backoff, circuit_breaker (wrapping), +// s3_upload. // It also provides the PipelineWorkflowHandler for composable pipelines. package pipelinesteps @@ -80,6 +81,7 @@ func New() *Plugin { "step.dlq_replay", "step.retry_with_backoff", "step.resilient_circuit_breaker", + "step.s3_upload", }, WorkflowTypes: []string{"pipeline"}, Capabilities: []plugin.CapabilityDecl{ @@ -111,7 +113,7 @@ func (p *Plugin) StepFactories() map[string]plugin.StepFactory { "step.delegate": wrapStepFactory(module.NewDelegateStepFactory()), "step.jq": wrapStepFactory(module.NewJQStepFactory()), "step.publish": wrapStepFactory(module.NewPublishStepFactory()), - "step.event_publish": wrapStepFactory(module.NewEventPublishStepFactory()), + "step.event_publish": wrapStepFactory(module.NewEventPublishStepFactory()), "step.http_call": wrapStepFactory(module.NewHTTPCallStepFactory()), "step.request_parse": wrapStepFactory(module.NewRequestParseStepFactory()), "step.db_query": wrapStepFactory(module.NewDBQueryStepFactory()), @@ -125,7 +127,7 @@ func (p *Plugin) StepFactories() map[string]plugin.StepFactory { "step.foreach": wrapStepFactory(module.NewForEachStepFactory(func() *module.StepRegistry { return p.concreteStepRegistry })), - "step.webhook_verify": wrapStepFactory(module.NewWebhookVerifyStepFactory()), + "step.webhook_verify": wrapStepFactory(module.NewWebhookVerifyStepFactory()), "step.cache_get": wrapStepFactory(module.NewCacheGetStepFactory()), "step.cache_set": wrapStepFactory(module.NewCacheSetStepFactory()), "step.cache_delete": wrapStepFactory(module.NewCacheDeleteStepFactory()), @@ -140,6 +142,7 @@ func (p *Plugin) StepFactories() map[string]plugin.StepFactory { "step.resilient_circuit_breaker": wrapStepFactory(module.NewResilienceCircuitBreakerStepFactory(func() *module.StepRegistry { return p.concreteStepRegistry })), + "step.s3_upload": wrapStepFactory(module.NewS3UploadStepFactory()), } } diff --git a/plugins/pipelinesteps/plugin_test.go b/plugins/pipelinesteps/plugin_test.go index df833648..6876cf59 100644 --- a/plugins/pipelinesteps/plugin_test.go +++ b/plugins/pipelinesteps/plugin_test.go @@ -59,6 +59,7 @@ func TestStepFactories(t *testing.T) { "step.dlq_replay", "step.retry_with_backoff", "step.resilient_circuit_breaker", + "step.s3_upload", } for _, stepType := range expectedSteps { @@ -80,7 +81,7 @@ func TestPluginLoads(t *testing.T) { } steps := loader.StepFactories() - if len(steps) != 28 { - t.Fatalf("expected 28 step factories after load, got %d", len(steps)) + if len(steps) != 29 { + t.Fatalf("expected 29 step factories after load, got %d", len(steps)) } } From e895ca6bc560a8dda8cc6d14b73f469782758c88 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 27 Feb 2026 06:05:08 +0000 Subject: [PATCH 3/4] fix: add step.s3_upload to schema coreModuleTypes and module_schema registry Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- schema/module_schema.go | 18 ++++++++++++++++++ schema/schema.go | 1 + 2 files changed, 19 insertions(+) diff --git a/schema/module_schema.go b/schema/module_schema.go index 017bb7fe..0108f352 100644 --- a/schema/module_schema.go +++ b/schema/module_schema.go @@ -1010,6 +1010,24 @@ func (r *ModuleSchemaRegistry) registerBuiltins() { }, }) + r.Register(&ModuleSchema{ + Type: "step.s3_upload", + Label: "S3 Upload", + Category: "pipeline", + Description: "Uploads base64-encoded binary data from the pipeline context to AWS S3 or S3-compatible storage (MinIO, LocalStack). Returns the public URL, resolved object key, and bucket name.", + Inputs: []ServiceIODef{{Name: "context", Type: "PipelineContext", Description: "Pipeline context with binary data (base64-encoded) and optional MIME type"}}, + Outputs: []ServiceIODef{{Name: "result", Type: "StepResult", Description: "Upload result with url, key, and bucket fields"}}, + ConfigFields: []ConfigFieldDef{ + {Key: "bucket", Label: "Bucket", Type: FieldTypeString, Required: true, Description: "S3 bucket name (supports ${ENV_VAR} expansion)", Placeholder: "${AVATAR_BUCKET}"}, + {Key: "region", Label: "Region", Type: FieldTypeString, Required: true, Description: "AWS region (supports ${ENV_VAR} expansion)", Placeholder: "${AWS_REGION}"}, + {Key: "key", Label: "Object Key", Type: FieldTypeString, Required: true, Description: "S3 object key (supports {{ .field }} templates and {{ uuid }})", Placeholder: "avatars/{{ .user_id }}/{{ uuid }}.{{ .ext }}"}, + {Key: "body_from", Label: "Body From", Type: FieldTypeString, Required: true, Description: "Dot-path to the base64-encoded binary data in the pipeline context (e.g. steps.parse.image_data)", Placeholder: "steps.parse.image_data"}, + {Key: "content_type_from", Label: "Content Type From", Type: FieldTypeString, Description: "Dot-path to the MIME type in the pipeline context (takes precedence over content_type)", Placeholder: "steps.parse.mime_type"}, + {Key: "content_type", Label: "Content Type", Type: FieldTypeString, Description: "Static MIME type for the uploaded object (e.g. image/png)", Placeholder: "image/png"}, + {Key: "endpoint", Label: "Endpoint", Type: FieldTypeString, Description: "Custom S3 endpoint URL for MinIO, LocalStack, or other S3-compatible storage (supports ${ENV_VAR} expansion). Leave empty for AWS.", Placeholder: "http://localhost:9000"}, + }, + }) + // ---- CI/CD Pipeline Steps Category ---- r.Register(&ModuleSchema{ diff --git a/schema/schema.go b/schema/schema.go index da0a492e..90ad0148 100644 --- a/schema/schema.go +++ b/schema/schema.go @@ -133,6 +133,7 @@ var coreModuleTypes = []string{ "step.publish", "step.rate_limit", "step.request_parse", + "step.s3_upload", "step.scan_container", "step.scan_deps", "step.scan_sast", From 6b088e291af859fee5fc19a5b29c9b1b6c6ef69d Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 27 Feb 2026 07:39:24 +0000 Subject: [PATCH 4/4] fix: update TestPluginLoads to use dynamic count after merge from main Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- plugins/pipelinesteps/plugin_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/pipelinesteps/plugin_test.go b/plugins/pipelinesteps/plugin_test.go index c05502f7..05f9a6e2 100644 --- a/plugins/pipelinesteps/plugin_test.go +++ b/plugins/pipelinesteps/plugin_test.go @@ -83,7 +83,7 @@ func TestPluginLoads(t *testing.T) { } steps := loader.StepFactories() - if len(steps) != 29 { - t.Fatalf("expected 29 step factories after load, got %d", len(steps)) + if len(steps) != len(p.StepFactories()) { + t.Fatalf("expected %d step factories after load, got %d", len(p.StepFactories()), len(steps)) } }