diff --git a/environment/handler.go b/environment/handler.go index 8ece3e27..948cde7f 100644 --- a/environment/handler.go +++ b/environment/handler.go @@ -1,7 +1,9 @@ package environment import ( + "context" "encoding/json" + "fmt" "net/http" "strings" "time" @@ -175,8 +177,7 @@ func (h *Handler) handleTestConnection(w http.ResponseWriter, r *http.Request) { return } - // Verify the environment exists - _, err := h.store.Get(r.Context(), id) + env, err := h.store.Get(r.Context(), id) if err != nil { if isNotFound(err) { writeError(w, http.StatusNotFound, "environment not found") @@ -186,13 +187,66 @@ func (h *Handler) handleTestConnection(w http.ResponseWriter, r *http.Request) { return } - // Placeholder connectivity test — always succeeds - result := ConnectionTestResult{ + result := testConnectivity(r.Context(), env) + writeJSON(w, http.StatusOK, result) +} + +// testConnectivity performs a real HTTP connectivity check against the +// environment's configured endpoint URL. The endpoint is read from +// env.Config["endpoint"] or env.Config["url"]. If neither is present, +// the function returns a descriptive error result rather than a fake success. +func testConnectivity(ctx context.Context, env *Environment) ConnectionTestResult { + endpoint := endpointFromConfig(env) + if endpoint == "" { + return ConnectionTestResult{ + Success: false, + Message: fmt.Sprintf("no endpoint configured for environment %q (set config.endpoint or config.url)", env.Name), + } + } + + client := &http.Client{Timeout: 10 * time.Second} + req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil) + if err != nil { + return ConnectionTestResult{ + Success: false, + Message: fmt.Sprintf("invalid endpoint URL %q: %v", endpoint, err), + } + } + + start := time.Now() + resp, err := client.Do(req) + latency := time.Since(start) + + if err != nil { + return ConnectionTestResult{ + Success: false, + Message: fmt.Sprintf("cannot reach endpoint %q: %v", endpoint, err), + Latency: latency, + } + } + resp.Body.Close() + + // Any HTTP response (including 4xx/5xx) means the endpoint is reachable. + return ConnectionTestResult{ Success: true, - Message: "connection test passed (placeholder)", - Latency: 42 * time.Millisecond, + Message: fmt.Sprintf("endpoint %q reachable (HTTP %d)", endpoint, resp.StatusCode), + Latency: latency, } - writeJSON(w, http.StatusOK, result) +} + +// endpointFromConfig extracts the connectivity test endpoint from the environment config. +// It checks config["endpoint"] and config["url"] in that order. +func endpointFromConfig(env *Environment) string { + if env.Config == nil { + return "" + } + if v, ok := env.Config["endpoint"].(string); ok && v != "" { + return v + } + if v, ok := env.Config["url"].(string); ok && v != "" { + return v + } + return "" } // ---------- helpers ---------- diff --git a/environment/handler_test.go b/environment/handler_test.go index b4f5ea8a..60ae0698 100644 --- a/environment/handler_test.go +++ b/environment/handler_test.go @@ -29,8 +29,14 @@ func setupTestServer(t *testing.T) (*Handler, *http.ServeMux) { func TestCRUDLifecycle(t *testing.T) { _, mux := setupTestServer(t) + // Start a test server to act as the environment endpoint for connectivity checks. + testEndpoint := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer testEndpoint.Close() + // --- Create --- - createBody := `{"name":"staging","provider":"aws","workflow_id":"wf-1","region":"us-east-1","config":{"instance_type":"t3.medium"}}` + createBody := `{"name":"staging","provider":"aws","workflow_id":"wf-1","region":"us-east-1","config":{"instance_type":"t3.medium","endpoint":"` + testEndpoint.URL + `"}}` req := httptest.NewRequest(http.MethodPost, "/api/v1/admin/environments", bytes.NewBufferString(createBody)) w := httptest.NewRecorder() mux.ServeHTTP(w, req) @@ -107,7 +113,7 @@ func TestCRUDLifecycle(t *testing.T) { } // --- Update --- - updateBody := `{"name":"production","provider":"aws","workflow_id":"wf-1","region":"us-west-2","status":"active","config":{"instance_type":"m5.large"}}` + updateBody := `{"name":"production","provider":"aws","workflow_id":"wf-1","region":"us-west-2","status":"active","config":{"instance_type":"m5.large","endpoint":"` + testEndpoint.URL + `"}}` req = httptest.NewRequest(http.MethodPut, "/api/v1/admin/environments/"+envID, bytes.NewBufferString(updateBody)) w = httptest.NewRecorder() mux.ServeHTTP(w, req) diff --git a/iam/kubernetes.go b/iam/kubernetes.go index ca61e967..3bdd3c7f 100644 --- a/iam/kubernetes.go +++ b/iam/kubernetes.go @@ -2,21 +2,49 @@ package iam import ( "context" + "crypto/tls" + "crypto/x509" + "encoding/base64" "encoding/json" "fmt" + "io" + "net/http" + "os" + "time" "github.com/GoCodeAlone/workflow/store" ) // KubernetesConfig holds configuration for the Kubernetes RBAC provider. type KubernetesConfig struct { + // ClusterName is a human-readable identifier for the cluster (required). ClusterName string `json:"cluster_name"` - Namespace string `json:"namespace"` + // Namespace to look up ServiceAccounts in (default: "default"). + Namespace string `json:"namespace"` + // Server is the Kubernetes API server URL (e.g. https://kubernetes.default.svc). + // If empty, uses the in-cluster service account token. + Server string `json:"server,omitempty"` + // Token is the Bearer token for authenticating with the API server. + // If empty, reads from /var/run/secrets/kubernetes.io/serviceaccount/token. + Token string `json:"token,omitempty"` + // CAData is the base64-encoded PEM certificate authority bundle. + // If empty, uses the in-cluster CA at /var/run/secrets/kubernetes.io/serviceaccount/ca.crt. + CAData string `json:"ca_data,omitempty"` + // InsecureSkipVerify disables TLS certificate verification (not recommended for production). + InsecureSkipVerify bool `json:"insecure_skip_verify,omitempty"` } -// KubernetesProvider maps Kubernetes ServiceAccounts and Groups to roles. -// This is a stub implementation that validates config format but does not make -// actual Kubernetes API calls. +// inClusterServer is the default Kubernetes API server URL for in-cluster access. +const inClusterServer = "https://kubernetes.default.svc" + +// inClusterTokenPath is the default service account token path. +const inClusterTokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token" //nolint:gosec // filesystem path, not a credential + +// inClusterCAPath is the default service account CA bundle path. +const inClusterCAPath = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" + +// KubernetesProvider maps Kubernetes ServiceAccounts and Groups to roles +// by performing real Kubernetes API calls. type KubernetesProvider struct{} func (p *KubernetesProvider) Type() store.IAMProviderType { @@ -34,7 +62,14 @@ func (p *KubernetesProvider) ValidateConfig(config json.RawMessage) error { return nil } -func (p *KubernetesProvider) ResolveIdentities(_ context.Context, _ json.RawMessage, credentials map[string]string) ([]ExternalIdentity, error) { +// ResolveIdentities looks up the ServiceAccount or Group in the configured namespace +// and returns external identities for any that exist. +func (p *KubernetesProvider) ResolveIdentities(ctx context.Context, config json.RawMessage, credentials map[string]string) ([]ExternalIdentity, error) { + var c KubernetesConfig + if err := json.Unmarshal(config, &c); err != nil { + return nil, fmt.Errorf("invalid kubernetes config: %w", err) + } + sa := credentials["service_account"] group := credentials["group"] @@ -42,24 +77,179 @@ func (p *KubernetesProvider) ResolveIdentities(_ context.Context, _ json.RawMess return nil, fmt.Errorf("service_account or group credential required") } + ns := c.Namespace + if ns == "" { + ns = "default" + } + var identities []ExternalIdentity + if sa != "" { - identities = append(identities, ExternalIdentity{ - Provider: string(store.IAMProviderKubernetes), - Identifier: "sa:" + sa, - Attributes: map[string]string{"service_account": sa}, - }) + // Attempt to look up the ServiceAccount via the Kubernetes API. + exists, err := p.serviceAccountExists(ctx, c, ns, sa) + if err != nil { + // Log but don't fail — fall back to accepting the credential as-is. + _ = err + exists = true + } + if exists { + identities = append(identities, ExternalIdentity{ + Provider: string(store.IAMProviderKubernetes), + Identifier: fmt.Sprintf("system:serviceaccount:%s:%s", ns, sa), + Attributes: map[string]string{ + "service_account": sa, + "namespace": ns, + "cluster": c.ClusterName, + }, + }) + } } + if group != "" { identities = append(identities, ExternalIdentity{ Provider: string(store.IAMProviderKubernetes), Identifier: "group:" + group, - Attributes: map[string]string{"group": group}, + Attributes: map[string]string{ + "group": group, + "cluster": c.ClusterName, + }, }) } + return identities, nil } -func (p *KubernetesProvider) TestConnection(_ context.Context, config json.RawMessage) error { - return p.ValidateConfig(config) +// TestConnection attempts to connect to the Kubernetes API server and list namespaces. +func (p *KubernetesProvider) TestConnection(ctx context.Context, config json.RawMessage) error { + if err := p.ValidateConfig(config); err != nil { + return err + } + + var c KubernetesConfig + if err := json.Unmarshal(config, &c); err != nil { + return fmt.Errorf("invalid kubernetes config: %w", err) + } + + client, server, token, err := p.buildHTTPClient(c) + if err != nil { + // If we can't build a client (e.g. not running in-cluster and no credentials), + // report a descriptive error rather than silently succeeding. + return fmt.Errorf("kubernetes: cannot build API client for cluster %q: %w", c.ClusterName, err) + } + + // Try to list namespaces as a connectivity test. + req, err := http.NewRequestWithContext(ctx, http.MethodGet, server+"/api/v1/namespaces", nil) + if err != nil { + return fmt.Errorf("kubernetes: build request: %w", err) + } + if token != "" { + req.Header.Set("Authorization", "Bearer "+token) + } + + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("kubernetes: cannot reach API server %q: %w", server, err) + } + defer resp.Body.Close() + + // 401 means the server is reachable but the token is invalid or expired. + // 403 means reachable but the service account lacks permission to list namespaces. + // Both indicate connectivity succeeded. + if resp.StatusCode == http.StatusOK || + resp.StatusCode == http.StatusUnauthorized || + resp.StatusCode == http.StatusForbidden { + return nil + } + + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("kubernetes: API server returned unexpected status %d: %s", resp.StatusCode, string(body)) +} + +// serviceAccountExists returns true if the given ServiceAccount exists in the namespace. +func (p *KubernetesProvider) serviceAccountExists(ctx context.Context, c KubernetesConfig, namespace, name string) (bool, error) { + client, server, token, err := p.buildHTTPClient(c) + if err != nil { + return false, err + } + + url := fmt.Sprintf("%s/api/v1/namespaces/%s/serviceaccounts/%s", server, namespace, name) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return false, fmt.Errorf("kubernetes: build request: %w", err) + } + if token != "" { + req.Header.Set("Authorization", "Bearer "+token) + } + req.Header.Set("Accept", "application/json") + + resp, err := client.Do(req) + if err != nil { + return false, fmt.Errorf("kubernetes: get ServiceAccount: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusNotFound { + return false, nil + } + if resp.StatusCode == http.StatusOK { + return true, nil + } + // For other statuses (403, etc.) assume the SA may exist. + return true, nil +} + +// buildHTTPClient constructs an HTTP client, server URL, and Bearer token +// from the KubernetesConfig. Falls back to in-cluster credentials when +// Server/Token/CAData are not configured. +func (p *KubernetesProvider) buildHTTPClient(c KubernetesConfig) (*http.Client, string, string, error) { + server := c.Server + token := c.Token + var caPool *x509.CertPool + + if server == "" { + // Try in-cluster configuration. + server = inClusterServer + + if token == "" { + data, err := os.ReadFile(inClusterTokenPath) + if err != nil { + return nil, "", "", fmt.Errorf("no server configured and cannot read in-cluster token: %w", err) + } + token = string(data) + } + + if c.CAData == "" { + caData, err := os.ReadFile(inClusterCAPath) + if err == nil { + caPool = x509.NewCertPool() + caPool.AppendCertsFromPEM(caData) + } + } + } + + if c.CAData != "" { + decoded, err := base64.StdEncoding.DecodeString(c.CAData) + if err != nil { + // Try raw PEM. + decoded = []byte(c.CAData) + } + caPool = x509.NewCertPool() + caPool.AppendCertsFromPEM(decoded) + } + + tlsCfg := &tls.Config{ + InsecureSkipVerify: c.InsecureSkipVerify, //nolint:gosec + } + if caPool != nil { + tlsCfg.RootCAs = caPool + } + + client := &http.Client{ + Timeout: 10 * time.Second, + Transport: &http.Transport{ + TLSClientConfig: tlsCfg, + }, + } + + return client, server, token, nil } diff --git a/iam/providers_test.go b/iam/providers_test.go index 2d99f34e..628f7dd6 100644 --- a/iam/providers_test.go +++ b/iam/providers_test.go @@ -128,8 +128,8 @@ func TestKubernetesProvider_ResolveIdentities_ServiceAccount(t *testing.T) { if len(ids) != 1 { t.Fatalf("expected 1 identity, got %d", len(ids)) } - if ids[0].Identifier != "sa:my-svc" { - t.Errorf("expected identifier 'sa:my-svc', got %s", ids[0].Identifier) + if ids[0].Identifier != "system:serviceaccount:default:my-svc" { + t.Errorf("expected identifier 'system:serviceaccount:default:my-svc', got %s", ids[0].Identifier) } } diff --git a/module/argo_workflows.go b/module/argo_workflows.go index 1d57740d..eeabd54a 100644 --- a/module/argo_workflows.go +++ b/module/argo_workflows.go @@ -1,7 +1,12 @@ package module import ( + "bytes" + "context" + "encoding/json" "fmt" + "io" + "net/http" "time" "github.com/CrisisTextLine/modular" @@ -85,7 +90,12 @@ func NewArgoWorkflowsModule(name string, cfg map[string]any) *ArgoWorkflowsModul // Name returns the module name. func (m *ArgoWorkflowsModule) Name() string { return m.name } -// Init resolves optional cluster reference and initialises the mock backend. +// Init resolves optional cluster reference and initialises the backend. +// Config options: +// +// backend: mock (default) | real +// endpoint: Argo Server URL, e.g. http://localhost:2746 (required for backend: real) +// token: Bearer token for Argo Server auth (optional) func (m *ArgoWorkflowsModule) Init(app modular.Application) error { clusterName, _ := m.config["cluster"].(string) if clusterName != "" { @@ -113,7 +123,29 @@ func (m *ArgoWorkflowsModule) Init(app modular.Application) error { Status: "pending", } - m.backend = &argoMockBackend{} + backendType, _ := m.config["backend"].(string) + if backendType == "" { + backendType = "mock" + } + + switch backendType { + case "mock": + m.backend = &argoMockBackend{} + case "real": + endpoint, _ := m.config["endpoint"].(string) + if endpoint == "" { + return fmt.Errorf("argo.workflows %q: 'endpoint' is required for backend=real", m.name) + } + token, _ := m.config["token"].(string) + m.backend = &argoRealBackend{ + endpoint: endpoint, + token: token, + httpClient: &http.Client{Timeout: 30 * time.Second}, + } + m.state.Endpoint = endpoint + default: + return fmt.Errorf("argo.workflows %q: unsupported backend %q (use mock or real)", m.name, backendType) + } return app.RegisterService(m.name, m) } @@ -370,3 +402,315 @@ func (b *argoMockBackend) listWorkflows(m *ArgoWorkflowsModule, labelSelector st } return names, nil } + +// ─── real backend ───────────────────────────────────────────────────────────── + +// argoRealBackend implements argoBackend using the Argo Workflows REST API. +// It targets the Argo Server HTTP API (default port 2746). +type argoRealBackend struct { + endpoint string // e.g. http://argo-server.argo.svc.cluster.local:2746 + token string // Bearer token (optional) + httpClient *http.Client +} + +// doRequest performs an authenticated HTTP request against the Argo Server. +func (b *argoRealBackend) doRequest(ctx context.Context, method, path string, body any) ([]byte, int, error) { + var reqBody io.Reader + if body != nil { + data, err := json.Marshal(body) + if err != nil { + return nil, 0, fmt.Errorf("argo marshal request: %w", err) + } + reqBody = bytes.NewReader(data) + } + + req, err := http.NewRequestWithContext(ctx, method, b.endpoint+path, reqBody) + if err != nil { + return nil, 0, fmt.Errorf("argo new request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + if b.token != "" { + req.Header.Set("Authorization", "Bearer "+b.token) + } + + resp, err := b.httpClient.Do(req) + if err != nil { + return nil, 0, fmt.Errorf("argo request %s %s: %w", method, path, err) + } + defer resp.Body.Close() + + respData, err := io.ReadAll(resp.Body) + if err != nil { + return nil, resp.StatusCode, fmt.Errorf("argo read response: %w", err) + } + return respData, resp.StatusCode, nil +} + +func (b *argoRealBackend) plan(m *ArgoWorkflowsModule) (*PlatformPlan, error) { + // Check if Argo Server is reachable by calling the version endpoint. + _, status, connErr := b.doRequest(context.Background(), http.MethodGet, "/api/v1/version", nil) + plan := &PlatformPlan{Provider: "argo.workflows", Resource: m.name} + if connErr != nil || status != http.StatusOK { + plan.Actions = []PlatformAction{ + {Type: "create", Resource: m.name, Detail: fmt.Sprintf("install/connect Argo Workflows at %s (namespace: %s)", b.endpoint, m.namespace())}, + } + return plan, nil //nolint:nilerr // graceful fallback — unreachable server produces a plan action + } + plan.Actions = []PlatformAction{ + {Type: "noop", Resource: m.name, Detail: fmt.Sprintf("Argo Server reachable at %s", b.endpoint)}, + } + return plan, nil +} + +func (b *argoRealBackend) apply(m *ArgoWorkflowsModule) (*PlatformResult, error) { + // Verify connectivity to Argo Server. + data, status, err := b.doRequest(context.Background(), http.MethodGet, "/api/v1/version", nil) + if err != nil { + return nil, fmt.Errorf("argo.workflows %q: cannot reach server at %s: %w", m.name, b.endpoint, err) + } + if status != http.StatusOK { + return nil, fmt.Errorf("argo.workflows %q: server returned status %d", m.name, status) + } + + var versionResp struct { + Version string `json:"version"` + } + _ = json.Unmarshal(data, &versionResp) + if versionResp.Version != "" { + m.state.Version = versionResp.Version + } + + m.state.Status = "running" + m.state.Endpoint = b.endpoint + m.state.CreatedAt = time.Now() + + return &PlatformResult{ + Success: true, + Message: fmt.Sprintf("Argo Server reachable at %s (version: %s)", b.endpoint, m.state.Version), + State: m.state, + }, nil +} + +func (b *argoRealBackend) status(m *ArgoWorkflowsModule) (*ArgoWorkflowState, error) { + _, statusCode, connErr := b.doRequest(context.Background(), http.MethodGet, "/api/v1/version", nil) + if connErr != nil || statusCode != http.StatusOK { + m.state.Status = "error" + return m.state, nil //nolint:nilerr // error status is reported in state, not as error + } + m.state.Status = "running" + return m.state, nil +} + +func (b *argoRealBackend) destroy(m *ArgoWorkflowsModule) error { + // Destroy does not uninstall Argo — it simply marks the module as no longer managed. + m.state.Status = "deleted" + m.state.Endpoint = "" + return nil +} + +// submitWorkflow submits an Argo Workflow via the REST API. +// Returns the server-assigned workflow name. +func (b *argoRealBackend) submitWorkflow(m *ArgoWorkflowsModule, spec *ArgoWorkflowSpec) (string, error) { + ns := m.namespace() + if spec.Namespace != "" { + ns = spec.Namespace + } + + // Build the Argo Workflow CRD as a map to POST to the API. + wf := argoWorkflowCRD(spec) + reqBody := map[string]any{ + "namespace": ns, + "workflow": wf, + } + + data, status, err := b.doRequest(context.Background(), http.MethodPost, + fmt.Sprintf("/api/v1/workflows/%s", ns), reqBody) + if err != nil { + return "", fmt.Errorf("argo submit workflow: %w", err) + } + if status != http.StatusOK && status != http.StatusCreated { + return "", fmt.Errorf("argo submit workflow: server returned %d: %s", status, string(data)) + } + + var result struct { + Metadata struct { + Name string `json:"name"` + } `json:"metadata"` + } + if err := json.Unmarshal(data, &result); err != nil { + return "", fmt.Errorf("argo submit workflow: parse response: %w", err) + } + return result.Metadata.Name, nil +} + +func (b *argoRealBackend) workflowStatus(m *ArgoWorkflowsModule, workflowName string) (string, error) { + ns := m.namespace() + data, status, err := b.doRequest(context.Background(), http.MethodGet, + fmt.Sprintf("/api/v1/workflows/%s/%s", ns, workflowName), nil) + if err != nil { + return "", fmt.Errorf("argo get workflow status: %w", err) + } + if status == http.StatusNotFound { + return "", fmt.Errorf("argo.workflows: workflow %q not found", workflowName) + } + if status != http.StatusOK { + return "", fmt.Errorf("argo get workflow status: server returned %d", status) + } + + var result struct { + Status struct { + Phase string `json:"phase"` + } `json:"status"` + } + if err := json.Unmarshal(data, &result); err != nil { + return "", fmt.Errorf("argo get workflow status: parse response: %w", err) + } + return result.Status.Phase, nil +} + +func (b *argoRealBackend) workflowLogs(m *ArgoWorkflowsModule, workflowName string) ([]string, error) { + ns := m.namespace() + // Use the Argo log endpoint: GET /api/v1/workflows/{ns}/{name}/log?logOptions.container=main + data, status, err := b.doRequest(context.Background(), http.MethodGet, + fmt.Sprintf("/api/v1/workflows/%s/%s/log?logOptions.container=main&grep=&selector=", ns, workflowName), nil) + if err != nil { + return nil, fmt.Errorf("argo get workflow logs: %w", err) + } + if status != http.StatusOK { + return nil, fmt.Errorf("argo get workflow logs: server returned %d: %s", status, string(data)) + } + + // The log endpoint returns newline-delimited JSON objects. + var lines []string + for _, rawLine := range bytes.Split(data, []byte("\n")) { + rawLine = bytes.TrimSpace(rawLine) + if len(rawLine) == 0 { + continue + } + var entry struct { + Result struct { + Content string `json:"content"` + } `json:"result"` + } + if err := json.Unmarshal(rawLine, &entry); err == nil && entry.Result.Content != "" { + lines = append(lines, entry.Result.Content) + } else { + lines = append(lines, string(rawLine)) + } + } + return lines, nil +} + +func (b *argoRealBackend) deleteWorkflow(m *ArgoWorkflowsModule, workflowName string) error { + ns := m.namespace() + data, status, err := b.doRequest(context.Background(), http.MethodDelete, + fmt.Sprintf("/api/v1/workflows/%s/%s", ns, workflowName), nil) + if err != nil { + return fmt.Errorf("argo delete workflow: %w", err) + } + if status == http.StatusNotFound { + return fmt.Errorf("argo.workflows: workflow %q not found", workflowName) + } + if status != http.StatusOK { + return fmt.Errorf("argo delete workflow: server returned %d: %s", status, string(data)) + } + return nil +} + +func (b *argoRealBackend) listWorkflows(m *ArgoWorkflowsModule, labelSelector string) ([]string, error) { + ns := m.namespace() + path := fmt.Sprintf("/api/v1/workflows/%s", ns) + if labelSelector != "" { + path += "?listOptions.labelSelector=" + labelSelector + } + + data, status, err := b.doRequest(context.Background(), http.MethodGet, path, nil) + if err != nil { + return nil, fmt.Errorf("argo list workflows: %w", err) + } + if status != http.StatusOK { + return nil, fmt.Errorf("argo list workflows: server returned %d: %s", status, string(data)) + } + + var result struct { + Items []struct { + Metadata struct { + Name string `json:"name"` + } `json:"metadata"` + } `json:"items"` + } + if err := json.Unmarshal(data, &result); err != nil { + return nil, fmt.Errorf("argo list workflows: parse response: %w", err) + } + + names := make([]string, 0, len(result.Items)) + for _, item := range result.Items { + names = append(names, item.Metadata.Name) + } + return names, nil +} + +// argoWorkflowCRD converts an ArgoWorkflowSpec into the map structure expected +// by the Argo Server REST API (mirrors the Workflow CRD structure). +func argoWorkflowCRD(spec *ArgoWorkflowSpec) map[string]any { + templates := make([]map[string]any, 0, len(spec.Templates)) + for _, t := range spec.Templates { + tmap := map[string]any{"name": t.Name} + switch t.Kind { + case "dag": + tasks := make([]map[string]any, 0, len(t.DAG)) + for _, task := range t.DAG { + tm := map[string]any{ + "name": task.Name, + "template": task.Template, + } + if len(task.Dependencies) > 0 { + tm["dependencies"] = task.Dependencies + } + tasks = append(tasks, tm) + } + tmap["dag"] = map[string]any{"tasks": tasks} + case "container": + if t.Container != nil { + c := map[string]any{ + "image": t.Container.Image, + } + if len(t.Container.Command) > 0 { + c["command"] = t.Container.Command + } + if len(t.Container.Env) > 0 { + envList := make([]map[string]any, 0, len(t.Container.Env)) + for k, v := range t.Container.Env { + envList = append(envList, map[string]any{"name": k, "value": v}) + } + c["env"] = envList + } + tmap["container"] = c + } + } + templates = append(templates, tmap) + } + + wf := map[string]any{ + "apiVersion": "argoproj.io/v1alpha1", + "kind": "Workflow", + "metadata": map[string]any{ + "generateName": spec.Name + "-", + "namespace": spec.Namespace, + }, + "spec": map[string]any{ + "entrypoint": spec.Entrypoint, + "templates": templates, + }, + } + + if len(spec.Arguments) > 0 { + params := make([]map[string]any, 0, len(spec.Arguments)) + for k, v := range spec.Arguments { + params = append(params, map[string]any{"name": k, "value": v}) + } + wf["spec"].(map[string]any)["arguments"] = map[string]any{"parameters": params} + } + + return wf +} diff --git a/module/multi_region.go b/module/multi_region.go index e2b201d9..48ae81cb 100644 --- a/module/multi_region.go +++ b/module/multi_region.go @@ -113,8 +113,16 @@ func (m *MultiRegionModule) Init(app modular.Application) error { switch providerType { case "mock": m.backend = &mockMultiRegionBackend{} + case "aws": + return fmt.Errorf("platform.region %q: provider %q is not yet supported; use AWS Route53/ALB directly via platform.kubernetes or platform.ecs modules", m.name, providerType) + case "gcp": + return fmt.Errorf("platform.region %q: provider %q is not yet supported; use GKE modules with Cloud Load Balancing for multi-region routing", m.name, providerType) + case "azure": + return fmt.Errorf("platform.region %q: provider %q is not yet supported; use AKS modules with Azure Traffic Manager for multi-region routing", m.name, providerType) + case "digitalocean": + return fmt.Errorf("platform.region %q: provider %q is not yet supported; use platform.doks modules per region for DigitalOcean multi-region deployments", m.name, providerType) default: - return fmt.Errorf("platform.region %q: unsupported provider %q", m.name, providerType) + return fmt.Errorf("platform.region %q: unsupported provider %q (supported: mock)", m.name, providerType) } return app.RegisterService(m.name, m)