From e92868f271d63fb796f976153fd076c44e341a3e Mon Sep 17 00:00:00 2001 From: Zachary Blasczyk Date: Thu, 5 Feb 2026 12:06:22 -0600 Subject: [PATCH 1/4] feat(tfc-job-agent): Init --- .../pkg/workspace/jobagents/registry.go | 2 +- .../workspace/jobagents/terraformcloud/tfe.go | 451 +++++++++++++++++- .../terraformcloud/tfe_verifications.go | 61 +++ .../test/e2e/engine_argocd_template_test.go | 298 ++++++++++++ 4 files changed, 807 insertions(+), 5 deletions(-) create mode 100644 apps/workspace-engine/pkg/workspace/jobagents/terraformcloud/tfe_verifications.go create mode 100644 apps/workspace-engine/test/e2e/engine_argocd_template_test.go diff --git a/apps/workspace-engine/pkg/workspace/jobagents/registry.go b/apps/workspace-engine/pkg/workspace/jobagents/registry.go index e46004ba5..8a445bc58 100644 --- a/apps/workspace-engine/pkg/workspace/jobagents/registry.go +++ b/apps/workspace-engine/pkg/workspace/jobagents/registry.go @@ -26,7 +26,7 @@ func NewRegistry(store *store.Store, verifications *verification.Manager) *Regis r.Register(testrunner.New(store)) r.Register(argo.NewArgoApplication(store, verifications)) - r.Register(terraformcloud.NewTFE(store)) + r.Register(terraformcloud.NewTFE(store, verifications)) r.Register(github.NewGithubAction(store)) return r diff --git a/apps/workspace-engine/pkg/workspace/jobagents/terraformcloud/tfe.go b/apps/workspace-engine/pkg/workspace/jobagents/terraformcloud/tfe.go index 8343f6f84..7f39e067d 100644 --- a/apps/workspace-engine/pkg/workspace/jobagents/terraformcloud/tfe.go +++ b/apps/workspace-engine/pkg/workspace/jobagents/terraformcloud/tfe.go @@ -1,19 +1,73 @@ package terraformcloud import ( + "bytes" "context" + "encoding/json" + "fmt" + "maps" + "strings" + "time" + "workspace-engine/pkg/config" + "workspace-engine/pkg/messaging" + "workspace-engine/pkg/messaging/confluent" + "workspace-engine/pkg/oapi" + "workspace-engine/pkg/templatefuncs" "workspace-engine/pkg/workspace/jobagents/types" + "workspace-engine/pkg/workspace/releasemanager/verification" "workspace-engine/pkg/workspace/store" + + confluentkafka "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "github.com/hashicorp/go-tfe" + "sigs.k8s.io/yaml" ) var _ types.Dispatchable = &TFE{} type TFE struct { - store *store.Store + store *store.Store + verifications *verification.Manager +} + +type VCSRepoTemplate struct { + Identifier string `json:"identifier" yaml:"identifier"` + Branch string `json:"branch,omitempty" yaml:"branch,omitempty"` + OAuthTokenID string `json:"oauth_token_id,omitempty" yaml:"oauth_token_id,omitempty"` + IngressSubmodules bool `json:"ingress_submodules,omitempty" yaml:"ingress_submodules,omitempty"` + TagsRegex string `json:"tags_regex,omitempty" yaml:"tags_regex,omitempty"` +} + +type WorkspaceTemplate struct { + Name string `json:"name" yaml:"name"` + Description string `json:"description,omitempty" yaml:"description,omitempty"` + Project string `json:"project,omitempty" yaml:"project,omitempty"` + ExecutionMode string `json:"execution_mode,omitempty" yaml:"execution_mode,omitempty"` + AutoApply bool `json:"auto_apply,omitempty" yaml:"auto_apply,omitempty"` + AllowDestroyPlan bool `json:"allow_destroy_plan,omitempty" yaml:"allow_destroy_plan,omitempty"` + FileTriggersEnabled bool `json:"file_triggers_enabled,omitempty" yaml:"file_triggers_enabled,omitempty"` + GlobalRemoteState bool `json:"global_remote_state,omitempty" yaml:"global_remote_state,omitempty"` + QueueAllRuns bool `json:"queue_all_runs,omitempty" yaml:"queue_all_runs,omitempty"` + SpeculativeEnabled bool `json:"speculative_enabled,omitempty" yaml:"speculative_enabled,omitempty"` + TerraformVersion string `json:"terraform_version,omitempty" yaml:"terraform_version,omitempty"` + TriggerPrefixes []string `json:"trigger_prefixes,omitempty" yaml:"trigger_prefixes,omitempty"` + TriggerPatterns []string `json:"trigger_patterns,omitempty" yaml:"trigger_patterns,omitempty"` + WorkingDirectory string `json:"working_directory,omitempty" yaml:"working_directory,omitempty"` + AgentPoolID string `json:"agent_pool_id,omitempty" yaml:"agent_pool_id,omitempty"` + VCSRepo *VCSRepoTemplate `json:"vcs_repo,omitempty" yaml:"vcs_repo,omitempty"` + Variables []VariableTemplate `json:"variables,omitempty" yaml:"variables,omitempty"` +} + +type VariableTemplate struct { + Key string `json:"key" yaml:"key"` + Value string `json:"value,omitempty" yaml:"value,omitempty"` + Description string `json:"description,omitempty" yaml:"description,omitempty"` + Category string `json:"category" yaml:"category"` + HCL bool `json:"hcl,omitempty" yaml:"hcl,omitempty"` + Sensitive bool `json:"sensitive,omitempty" yaml:"sensitive,omitempty"` } -func NewTFE(store *store.Store) *TFE { - return &TFE{store: store} +func NewTFE(store *store.Store, verifications *verification.Manager) *TFE { + return &TFE{store: store, verifications: verifications} } func (t *TFE) Type() string { @@ -27,6 +81,395 @@ func (t *TFE) Supports() types.Capabilities { } } -func (t *TFE) Dispatch(ctx context.Context, context types.DispatchContext) error { +func (t *TFE) Dispatch(ctx context.Context, dispatchCtx types.DispatchContext) error { + address, token, organization, template, err := t.parseJobAgentConfig(dispatchCtx.JobAgentConfig) + if err != nil { + return fmt.Errorf("failed to parse job agent config: %w", err) + } + + workspace, err := t.getTemplatedWorkspace(dispatchCtx.Job, template) + if err != nil { + return fmt.Errorf("failed to generate workspace from template: %w", err) + } + + go func() { + ctx := context.WithoutCancel(ctx) + client, err := t.getClient(address, token) + if err != nil { + t.sendJobFailureEvent(dispatchCtx, fmt.Sprintf("failed to create Terraform Cloud client: %s", err.Error())) + return + } + + targetWorkspace, err := t.upsertWorkspace(ctx, client, organization, workspace) + if err != nil { + t.sendJobFailureEvent(dispatchCtx, fmt.Sprintf("failed to upsert workspace: %s", err.Error())) + return + } + + if len(workspace.Variables) > 0 { + if err := t.syncVariables(ctx, client, targetWorkspace.ID, workspace.Variables); err != nil { + t.sendJobFailureEvent(dispatchCtx, fmt.Sprintf("failed to sync variables: %s", err.Error())) + return + } + } + + run, err := t.createRun(ctx, client, targetWorkspace.ID, dispatchCtx.Job.Id) + if err != nil { + t.sendJobFailureEvent(dispatchCtx, fmt.Sprintf("failed to create run: %s", err.Error())) + return + } + + verification := newTFERunVerification(t.verifications, dispatchCtx.Job, address, token, run.ID) + if err := verification.StartVerification(ctx); err != nil { + t.sendJobFailureEvent(dispatchCtx, fmt.Sprintf("failed to start verification: %s", err.Error())) + return + } + + t.sendJobUpdateEvent(address, organization, targetWorkspace.Name, run, dispatchCtx) + }() + + return nil +} + +func (t *TFE) parseJobAgentConfig(jobAgentConfig oapi.JobAgentConfig) (string, string, string, string, error) { + address, ok := jobAgentConfig["address"].(string) + if !ok { + return "", "", "", "", fmt.Errorf("address is required") + } + token, ok := jobAgentConfig["token"].(string) + if !ok { + return "", "", "", "", fmt.Errorf("token is required") + } + organization, ok := jobAgentConfig["organization"].(string) + if !ok { + return "", "", "", "", fmt.Errorf("organization is required") + } + template, ok := jobAgentConfig["template"].(string) + if !ok { + return "", "", "", "", fmt.Errorf("template is required") + } + if address == "" || token == "" || organization == "" || template == "" { + return "", "", "", "", fmt.Errorf("missing required fields in job agent config") + } + return address, token, organization, template, nil +} + +func (t *TFE) getClient(address, token string) (*tfe.Client, error) { + client, err := tfe.NewClient(&tfe.Config{ + Address: address, + Token: token, + }) + if err != nil { + return nil, fmt.Errorf("failed to create Terraform Cloud client: %w", err) + } + return client, nil +} + +func (t *TFE) getTemplatableJob(job *oapi.Job) (*oapi.TemplatableJob, error) { + fullJob, err := t.store.Jobs.GetWithRelease(job.Id) + if err != nil { + return nil, err + } + return fullJob.ToTemplatable() +} + +func (t *TFE) getTemplatedWorkspace(job *oapi.Job, template string) (*WorkspaceTemplate, error) { + templatableJob, err := t.getTemplatableJob(job) + if err != nil { + return nil, fmt.Errorf("failed to get templatable job: %w", err) + } + tmpl, err := templatefuncs.Parse("terraformWorkspaceTemplate", template) + if err != nil { + return nil, fmt.Errorf("failed to parse template: %w", err) + } + var buf bytes.Buffer + if err := tmpl.Execute(&buf, templatableJob.Map()); err != nil { + return nil, fmt.Errorf("failed to execute template: %w", err) + } + + var workspace WorkspaceTemplate + if err := yaml.Unmarshal(buf.Bytes(), &workspace); err != nil { + return nil, fmt.Errorf("failed to unmarshal workspace: %w", err) + } + return &workspace, nil +} + +func (t *TFE) upsertWorkspace(ctx context.Context, client *tfe.Client, organization string, workspace *WorkspaceTemplate) (*tfe.Workspace, error) { + existing, err := client.Workspaces.Read(ctx, organization, workspace.Name) + if err != nil && err.Error() != "resource not found" { + return nil, fmt.Errorf("failed to read workspace: %w", err) + } + + if existing == nil { + created, err := client.Workspaces.Create(ctx, organization, workspace.toCreateOptions()) + if err != nil { + return nil, fmt.Errorf("failed to create workspace: %w", err) + } + return created, nil + } + + updated, err := client.Workspaces.UpdateByID(ctx, existing.ID, workspace.toUpdateOptions()) + if err != nil { + return nil, fmt.Errorf("failed to update workspace: %w", err) + } + return updated, nil +} + +func (t *TFE) syncVariables(ctx context.Context, client *tfe.Client, workspaceID string, desiredVars []VariableTemplate) error { + existingVars, err := client.Variables.List(ctx, workspaceID, nil) + if err != nil { + return fmt.Errorf("failed to list variables: %w", err) + } + + existingByKey := make(map[string]*tfe.Variable) + for _, v := range existingVars.Items { + existingByKey[v.Key] = v + } + + for _, desired := range desiredVars { + if existing, ok := existingByKey[desired.Key]; ok { + _, err := client.Variables.Update(ctx, workspaceID, existing.ID, desired.toUpdateOptions()) + if err != nil { + return fmt.Errorf("failed to update variable %s: %w", desired.Key, err) + } + } else { + _, err := client.Variables.Create(ctx, workspaceID, desired.toCreateOptions()) + if err != nil { + return fmt.Errorf("failed to create variable %s: %w", desired.Key, err) + } + } + } + + return nil +} + +func (t *TFE) createRun(ctx context.Context, client *tfe.Client, workspaceID, jobID string) (*tfe.Run, error) { + autoApply := true + message := fmt.Sprintf("Triggered by ctrlplane job %s", jobID) + run, err := client.Runs.Create(ctx, tfe.RunCreateOptions{ + Workspace: &tfe.Workspace{ID: workspaceID}, + Message: &message, + AutoApply: &autoApply, + }) + if err != nil { + return nil, fmt.Errorf("failed to create run: %w", err) + } + return run, nil +} + +func (t *TFE) sendJobFailureEvent(context types.DispatchContext, message string) error { + workspaceId := t.store.ID() + + now := time.Now().UTC() + eventPayload := oapi.JobUpdateEvent{ + Id: &context.Job.Id, + Job: oapi.Job{ + Id: context.Job.Id, + Status: oapi.JobStatusFailure, + Message: &message, + UpdatedAt: now, + CompletedAt: &now, + }, + FieldsToUpdate: &[]oapi.JobUpdateEventFieldsToUpdate{ + oapi.JobUpdateEventFieldsToUpdateStatus, + oapi.JobUpdateEventFieldsToUpdateMessage, + oapi.JobUpdateEventFieldsToUpdateCompletedAt, + oapi.JobUpdateEventFieldsToUpdateUpdatedAt, + }, + } + producer, err := t.getKafkaProducer() + if err != nil { + return fmt.Errorf("failed to create Kafka producer: %w", err) + } + defer producer.Close() + + event := map[string]any{ + "eventType": "job.updated", + "workspaceId": workspaceId, + "data": eventPayload, + "timestamp": time.Now().Unix(), + } + eventBytes, err := json.Marshal(event) + if err != nil { + return fmt.Errorf("failed to marshal event: %w", err) + } + if err := producer.Publish([]byte(workspaceId), eventBytes); err != nil { + return fmt.Errorf("failed to publish event: %w", err) + } + return nil +} + +func (t *TFE) sendJobUpdateEvent(address, organization, workspaceName string, run *tfe.Run, context types.DispatchContext) error { + workspaceId := t.store.ID() + + runUrl := fmt.Sprintf("%s/app/%s/workspaces/%s/runs/%s", address, organization, workspaceName, run.ID) + if !strings.HasPrefix(runUrl, "https://") { + runUrl = "https://" + runUrl + } + + workspaceUrl := fmt.Sprintf("%s/app/%s/workspaces/%s", address, organization, workspaceName) + if !strings.HasPrefix(workspaceUrl, "https://") { + workspaceUrl = "https://" + workspaceUrl + } + + links := make(map[string]string) + links["TFE Run"] = runUrl + links["TFE Workspace"] = workspaceUrl + linksJSON, err := json.Marshal(links) + if err != nil { + return fmt.Errorf("failed to marshal links: %w", err) + } + + newJobMetadata := make(map[string]string) + maps.Copy(newJobMetadata, context.Job.Metadata) + newJobMetadata[string("ctrlplane/links")] = string(linksJSON) + + now := time.Now().UTC() + eventPayload := oapi.JobUpdateEvent{ + Id: &context.Job.Id, + Job: oapi.Job{ + Id: context.Job.Id, + Metadata: newJobMetadata, + Status: oapi.JobStatusSuccessful, + UpdatedAt: now, + CompletedAt: &now, + }, + FieldsToUpdate: &[]oapi.JobUpdateEventFieldsToUpdate{ + oapi.JobUpdateEventFieldsToUpdateStatus, + oapi.JobUpdateEventFieldsToUpdateMetadata, + oapi.JobUpdateEventFieldsToUpdateCompletedAt, + oapi.JobUpdateEventFieldsToUpdateUpdatedAt, + }, + } + producer, err := t.getKafkaProducer() + if err != nil { + return fmt.Errorf("failed to create Kafka producer: %w", err) + } + defer producer.Close() + + event := map[string]any{ + "eventType": "job.updated", + "workspaceId": workspaceId, + "data": eventPayload, + "timestamp": time.Now().Unix(), + } + eventBytes, err := json.Marshal(event) + if err != nil { + return fmt.Errorf("failed to marshal event: %w", err) + } + if err := producer.Publish([]byte(workspaceId), eventBytes); err != nil { + return fmt.Errorf("failed to publish event: %w", err) + } return nil } + +func (t *TFE) getKafkaProducer() (messaging.Producer, error) { + return confluent.NewConfluent(config.Global.KafkaBrokers).CreateProducer(config.Global.KafkaTopic, &confluentkafka.ConfigMap{ + "bootstrap.servers": config.Global.KafkaBrokers, + "enable.idempotence": true, + "compression.type": "snappy", + "message.send.max.retries": 10, + "retry.backoff.ms": 100, + }) +} + +func (w *WorkspaceTemplate) toCreateOptions() tfe.WorkspaceCreateOptions { + opts := tfe.WorkspaceCreateOptions{ + Name: &w.Name, + Description: &w.Description, + AutoApply: &w.AutoApply, + AllowDestroyPlan: &w.AllowDestroyPlan, + FileTriggersEnabled: &w.FileTriggersEnabled, + GlobalRemoteState: &w.GlobalRemoteState, + QueueAllRuns: &w.QueueAllRuns, + SpeculativeEnabled: &w.SpeculativeEnabled, + TriggerPrefixes: w.TriggerPrefixes, + TriggerPatterns: w.TriggerPatterns, + WorkingDirectory: &w.WorkingDirectory, + } + + if w.Project != "" { + opts.Project = &tfe.Project{ID: w.Project} + } + if w.ExecutionMode != "" { + opts.ExecutionMode = &w.ExecutionMode + } + if w.TerraformVersion != "" { + opts.TerraformVersion = &w.TerraformVersion + } + if w.AgentPoolID != "" { + opts.AgentPoolID = &w.AgentPoolID + } + if w.VCSRepo != nil && w.VCSRepo.Identifier != "" { + opts.VCSRepo = &tfe.VCSRepoOptions{ + Identifier: &w.VCSRepo.Identifier, + Branch: &w.VCSRepo.Branch, + OAuthTokenID: &w.VCSRepo.OAuthTokenID, + IngressSubmodules: &w.VCSRepo.IngressSubmodules, + TagsRegex: &w.VCSRepo.TagsRegex, + } + } + + return opts +} + +func (w *WorkspaceTemplate) toUpdateOptions() tfe.WorkspaceUpdateOptions { + opts := tfe.WorkspaceUpdateOptions{ + Name: &w.Name, + Description: &w.Description, + AutoApply: &w.AutoApply, + AllowDestroyPlan: &w.AllowDestroyPlan, + FileTriggersEnabled: &w.FileTriggersEnabled, + GlobalRemoteState: &w.GlobalRemoteState, + QueueAllRuns: &w.QueueAllRuns, + SpeculativeEnabled: &w.SpeculativeEnabled, + TriggerPrefixes: w.TriggerPrefixes, + TriggerPatterns: w.TriggerPatterns, + WorkingDirectory: &w.WorkingDirectory, + } + + if w.ExecutionMode != "" { + opts.ExecutionMode = &w.ExecutionMode + } + if w.TerraformVersion != "" { + opts.TerraformVersion = &w.TerraformVersion + } + if w.AgentPoolID != "" { + opts.AgentPoolID = &w.AgentPoolID + } + if w.VCSRepo != nil && w.VCSRepo.Identifier != "" { + opts.VCSRepo = &tfe.VCSRepoOptions{ + Identifier: &w.VCSRepo.Identifier, + Branch: &w.VCSRepo.Branch, + OAuthTokenID: &w.VCSRepo.OAuthTokenID, + IngressSubmodules: &w.VCSRepo.IngressSubmodules, + TagsRegex: &w.VCSRepo.TagsRegex, + } + } + + return opts +} + +func (v *VariableTemplate) toCreateOptions() tfe.VariableCreateOptions { + category := tfe.CategoryType(v.Category) + return tfe.VariableCreateOptions{ + Key: &v.Key, + Value: &v.Value, + Description: &v.Description, + Category: &category, + HCL: &v.HCL, + Sensitive: &v.Sensitive, + } +} + +func (v *VariableTemplate) toUpdateOptions() tfe.VariableUpdateOptions { + category := tfe.CategoryType(v.Category) + return tfe.VariableUpdateOptions{ + Key: &v.Key, + Value: &v.Value, + Description: &v.Description, + Category: &category, + HCL: &v.HCL, + Sensitive: &v.Sensitive, + } +} diff --git a/apps/workspace-engine/pkg/workspace/jobagents/terraformcloud/tfe_verifications.go b/apps/workspace-engine/pkg/workspace/jobagents/terraformcloud/tfe_verifications.go new file mode 100644 index 000000000..ab07a8842 --- /dev/null +++ b/apps/workspace-engine/pkg/workspace/jobagents/terraformcloud/tfe_verifications.go @@ -0,0 +1,61 @@ +package terraformcloud + +import ( + "context" + "fmt" + "workspace-engine/pkg/oapi" + "workspace-engine/pkg/workspace/releasemanager/verification" +) + +type TFERunVerification struct { + verifications *verification.Manager + job *oapi.Job + address string + token string + runID string +} + +func newTFERunVerification(verifications *verification.Manager, job *oapi.Job, address, token, runID string) *TFERunVerification { + return &TFERunVerification{ + verifications: verifications, + job: job, + address: address, + token: token, + runID: runID, + } +} + +func (v *TFERunVerification) StartVerification(ctx context.Context) error { + provider, err := v.buildMetricProvider() + if err != nil { + return fmt.Errorf("failed to build metric provider: %w", err) + } + + metricSpec := v.buildMetricSpec(provider) + return v.verifications.StartVerification(ctx, v.job, []oapi.VerificationMetricSpec{metricSpec}) +} + +func (v *TFERunVerification) buildMetricProvider() (oapi.MetricProvider, error) { + provider := oapi.MetricProvider{} + err := provider.FromTerraformCloudRunMetricProvider(oapi.TerraformCloudRunMetricProvider{ + Address: v.address, + Token: v.token, + RunId: v.runID, + }) + return provider, err +} + +func (v *TFERunVerification) buildMetricSpec(provider oapi.MetricProvider) oapi.VerificationMetricSpec { + failureCondition := "result.status == 'canceled' || result.status == 'discarded' || result.status == 'errored'" + successThreshold := 1 + failureThreshold := 1 + return oapi.VerificationMetricSpec{ + Count: 100, + IntervalSeconds: 60, + SuccessCondition: "result.status == 'applied' || result.status == 'planned_and_finished' || result.status == 'planned_and_saved'", + FailureCondition: &failureCondition, + SuccessThreshold: &successThreshold, + FailureThreshold: &failureThreshold, + Provider: provider, + } +} diff --git a/apps/workspace-engine/test/e2e/engine_argocd_template_test.go b/apps/workspace-engine/test/e2e/engine_argocd_template_test.go new file mode 100644 index 000000000..891f2d23d --- /dev/null +++ b/apps/workspace-engine/test/e2e/engine_argocd_template_test.go @@ -0,0 +1,298 @@ +package e2e + +import ( + "context" + "encoding/json" + "testing" + "workspace-engine/pkg/events/handler" + "workspace-engine/pkg/oapi" + "workspace-engine/test/integration" + c "workspace-engine/test/integration/creators" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestEngine_ArgoCD_TemplatePreservedInJobFlow traces the full ArgoCD template flow +// from DeploymentVersion creation through to Job creation. +// This verifies that the template field in JobAgentConfig is preserved at each step. +func TestEngine_ArgoCD_TemplatePreservedInJobFlow(t *testing.T) { + jobAgentId := uuid.New().String() + deploymentId := uuid.New().String() + environmentId := uuid.New().String() + resourceId := uuid.New().String() + versionId := uuid.New().String() + + // The template exactly as it would come from the CLI (job-agent-config.json) + argoCDTemplate := `--- +apiVersion: argoproj.io/v1alpha1 +kind: Application +metadata: + name: '{{.Resource.Name}}-console' + namespace: argocd + labels: + app.kubernetes.io/name: console + environment: '{{.Environment.Name}}' + deployment: console + resource: '{{.Resource.Name}}' +spec: + project: default + source: + repoURL: git@github.com:wandb/deployments.git + path: wandb/console + targetRevision: '{{.Release.Version.Tag}}' + helm: + releaseName: console + destination: + name: '{{.Resource.Identifier}}' + namespace: default + syncPolicy: + automated: + prune: true + selfHeal: true + syncOptions: + - CreateNamespace=true +` + + // Create ArgoCD job agent config + argoCDJobAgentConfig := map[string]any{ + "type": "argo-cd", + "serverUrl": "argocd.wandb.dev", + "apiKey": "test-api-key", + } + + engine := integration.NewTestWorkspace(t, + integration.WithJobAgent( + integration.JobAgentID(jobAgentId), + integration.JobAgentName("ArgoCD Agent"), + integration.JobAgentType("argo-cd"), + integration.JobAgentConfig(argoCDJobAgentConfig), + ), + integration.WithSystem( + integration.SystemName("test-system"), + integration.WithDeployment( + integration.DeploymentID(deploymentId), + integration.DeploymentName("console"), + integration.DeploymentJobAgent(jobAgentId), + integration.DeploymentCelResourceSelector("true"), + ), + integration.WithEnvironment( + integration.EnvironmentID(environmentId), + integration.EnvironmentName("production"), + integration.EnvironmentCelResourceSelector("true"), + ), + ), + integration.WithResource( + integration.ResourceID(resourceId), + integration.ResourceName("wandb-vashe-awstest"), + integration.ResourceIdentifier("wandb-vashe-awstest-cluster"), + integration.ResourceKind("kubernetes"), + ), + ) + + ctx := context.Background() + + // Verify release target was created + releaseTargets, err := engine.Workspace().ReleaseTargets().Items() + require.NoError(t, err) + require.Len(t, releaseTargets, 1, "expected 1 release target") + + // Create deployment version with ArgoCD template in JobAgentConfig + // This simulates what the CLI does: ctrlc api upsert version --job-agent-config-file job-agent-config.json + versionJobAgentConfig := map[string]any{ + "type": "argo-cd", + "template": argoCDTemplate, + } + + dv := c.NewDeploymentVersion() + dv.Id = versionId + dv.DeploymentId = deploymentId + dv.Tag = "v1.0.0" + dv.JobAgentConfig = versionJobAgentConfig + + // Log the version's JobAgentConfig before pushing the event + t.Logf("=== STEP 1: Version JobAgentConfig before event push ===") + dvConfigBytes, _ := json.MarshalIndent(dv.JobAgentConfig, "", " ") + t.Logf("DeploymentVersion.JobAgentConfig:\n%s", string(dvConfigBytes)) + + // Check template presence in version before event + if template, ok := dv.JobAgentConfig["template"]; ok { + t.Logf("✓ Template present in version before event push (length: %d)", len(template.(string))) + } else { + t.Errorf("✗ Template NOT present in version before event push") + } + + engine.PushEvent(ctx, handler.DeploymentVersionCreate, dv) + + // STEP 2: Check the stored version + t.Logf("=== STEP 2: Checking stored version ===") + storedVersion, found := engine.Workspace().DeploymentVersions().Get(versionId) + require.True(t, found, "stored version not found") + + storedConfigBytes, _ := json.MarshalIndent(storedVersion.JobAgentConfig, "", " ") + t.Logf("Stored version JobAgentConfig:\n%s", string(storedConfigBytes)) + + if template, ok := storedVersion.JobAgentConfig["template"]; ok { + t.Logf("✓ Template present in stored version (length: %d)", len(template.(string))) + } else { + t.Errorf("✗ Template NOT present in stored version - THIS IS WHERE IT'S LOST") + } + + // STEP 3: Check pending jobs + t.Logf("=== STEP 3: Checking pending jobs ===") + pendingJobs := engine.Workspace().Jobs().GetPending() + require.Len(t, pendingJobs, 1, "expected 1 pending job") + + var job *oapi.Job + for _, j := range pendingJobs { + job = j + break + } + + // Get the release for this job + release, found := engine.Workspace().Releases().Get(job.ReleaseId) + require.True(t, found, "release not found for job") + + // STEP 4: Check the release's version JobAgentConfig + t.Logf("=== STEP 4: Checking release's version ===") + releaseVersionConfigBytes, _ := json.MarshalIndent(release.Version.JobAgentConfig, "", " ") + t.Logf("Release.Version.JobAgentConfig:\n%s", string(releaseVersionConfigBytes)) + + if template, ok := release.Version.JobAgentConfig["template"]; ok { + t.Logf("✓ Template present in release.Version (length: %d)", len(template.(string))) + } else { + t.Errorf("✗ Template NOT present in release.Version") + } + + // STEP 5: Check the job's merged config + t.Logf("=== STEP 5: Checking job's merged config ===") + jobConfigBytes, _ := job.JobAgentConfig.MarshalJSON() + t.Logf("Job.JobAgentConfig:\n%s", string(jobConfigBytes)) + + // Try to get it as ArgoCD config + argoCDConfig, err := job.JobAgentConfig.AsFullArgoCDJobAgentConfig() + if err != nil { + t.Logf("Could not parse as ArgoCD config: %v", err) + // Try as custom config + customConfig, err := job.JobAgentConfig.AsFullCustomJobAgentConfig() + if err != nil { + t.Errorf("Could not parse job config as either ArgoCD or Custom: %v", err) + } else { + t.Logf("Parsed as custom config: %+v", customConfig) + if template, ok := customConfig.AdditionalProperties["template"]; ok { + t.Logf("✓ Template found in custom config (length: %d)", len(template.(string))) + } else { + t.Errorf("✗ Template NOT found in job config (custom)") + } + } + } else { + t.Logf("Parsed as ArgoCD config:") + t.Logf(" - Type: %s", argoCDConfig.Type) + t.Logf(" - ServerUrl: %s", argoCDConfig.ServerUrl) + t.Logf(" - Template length: %d", len(argoCDConfig.Template)) + + if argoCDConfig.Template != "" { + t.Logf("✓ Template present in job's ArgoCD config") + assert.Contains(t, argoCDConfig.Template, "{{.Resource.Name}}-console", "template should contain resource name placeholder") + } else { + t.Errorf("✗ Template is EMPTY in job's ArgoCD config - THIS IS THE BUG") + } + } + + // Final assertions + assert.Equal(t, oapi.JobStatusPending, job.Status) + assert.Equal(t, jobAgentId, job.JobAgentId) +} + +// TestEngine_ArgoCD_VersionJobAgentConfigPreservedThroughEventHandler tests that +// the JobAgentConfig is preserved when the event is handled by the workspace engine. +// This specifically tests the JSON marshaling/unmarshaling in the event handler. +func TestEngine_ArgoCD_VersionJobAgentConfigPreservedThroughEventHandler(t *testing.T) { + // Create the version data exactly as it would come from the API/CLI + versionData := map[string]any{ + "id": uuid.New().String(), + "deploymentId": uuid.New().String(), + "tag": "v1.0.0", + "name": "test-version", + "status": "ready", + "config": map[string]any{}, + "metadata": map[string]string{}, + "createdAt": "2024-01-01T00:00:00Z", + "jobAgentConfig": map[string]any{ + "type": "argo-cd", + "template": "apiVersion: argoproj.io/v1alpha1\nkind: Application\nmetadata:\n name: '{{.Resource.Name}}'", + }, + } + + // Marshal to JSON (simulating what comes over the wire) + jsonData, err := json.Marshal(versionData) + require.NoError(t, err) + + t.Logf("JSON event data:\n%s", string(jsonData)) + + // Unmarshal into DeploymentVersion (simulating event handler) + var dv oapi.DeploymentVersion + err = json.Unmarshal(jsonData, &dv) + require.NoError(t, err) + + t.Logf("Unmarshaled version JobAgentConfig: %+v", dv.JobAgentConfig) + + // Check template is preserved + template, ok := dv.JobAgentConfig["template"] + require.True(t, ok, "template field should be present in JobAgentConfig") + assert.Contains(t, template.(string), "{{.Resource.Name}}", "template should contain resource name placeholder") +} + +// TestEngine_ArgoCD_VersionJobAgentConfigWithExactCLIFormat tests the exact format +// that the CLI sends when using --job-agent-config-file flag. +// This simulates: ctrlc api upsert version --job-agent-config-file job-agent-config.json +func TestEngine_ArgoCD_VersionJobAgentConfigWithExactCLIFormat(t *testing.T) { + // This is the exact JSON content from the user's job-agent-config.json file + jobAgentConfigJSON := `{ + "template": "---\napiVersion: argoproj.io/v1alpha1\nkind: Application\nmetadata:\n name: '{{.Resource.Name}}-console'\n namespace: argocd\n labels:\n app.kubernetes.io/name: console\n environment: '{{.Environment.Name}}'\n deployment: console\n resource: '{{.Resource.Name}}'\nspec:\n project: default\n source:\n repoURL: git@github.com:wandb/deployments.git\n path: wandb/console\n targetRevision: '{{.Release.Version.Tag}}'\n helm:\n releaseName: console\n destination:\n name: '{{.Resource.Identifier}}'\n namespace: default\n syncPolicy:\n automated:\n prune: true\n selfHeal: true\n syncOptions:\n - CreateNamespace=true\n", + "type": "argo-cd" +}` // Parse it like the CLI does + var jobAgentConfig map[string]interface{} + err := json.Unmarshal([]byte(jobAgentConfigJSON), &jobAgentConfig) + require.NoError(t, err) t.Logf("Parsed jobAgentConfig from CLI: %+v", jobAgentConfig) // Check template is present + template, ok := jobAgentConfig["template"] + require.True(t, ok, "template field should be present") + require.NotEmpty(t, template, "template should not be empty") templateStr := template.(string) + t.Logf("Template length: %d", len(templateStr)) + assert.Contains(t, templateStr, "{{.Resource.Name}}-console", "template should contain the expected placeholder") + + // Now simulate what happens when this is sent through the API and event handler + // The API creates the version data like this: + versionData := map[string]any{ + "id": uuid.New().String(), + "deploymentId": uuid.New().String(), + "tag": "v1.0.0", + "name": "test-version", + "status": "ready", + "config": map[string]any{}, + "metadata": map[string]string{}, + "createdAt": "2024-01-01T00:00:00Z", + "jobAgentConfig": jobAgentConfig, // This is what the API does + } + + // Marshal to JSON (simulating what goes to Kafka) + eventJSON, err := json.Marshal(versionData) + require.NoError(t, err) + + t.Logf("Event JSON:\n%s", string(eventJSON)) + + // Unmarshal into DeploymentVersion (simulating workspace engine event handler) + var dv oapi.DeploymentVersion + err = json.Unmarshal(eventJSON, &dv) + require.NoError(t, err) + + // Verify template is still present after event handling + resultTemplate, ok := dv.JobAgentConfig["template"] + require.True(t, ok, "template should be present in DeploymentVersion.JobAgentConfig") + + resultTemplateStr := resultTemplate.(string) + t.Logf("After event unmarshaling - Template length: %d", len(resultTemplateStr)) + assert.Equal(t, len(templateStr), len(resultTemplateStr), "template length should be preserved") + assert.Contains(t, resultTemplateStr, "{{.Resource.Name}}-console", "template content should be preserved") +} From 188ec1b517553302db6b6258ed27ef4cf567b570 Mon Sep 17 00:00:00 2001 From: Zachary Blasczyk Date: Thu, 5 Feb 2026 12:29:42 -0600 Subject: [PATCH 2/4] adding some tests --- .../test/e2e/engine_argocd_template_test.go | 298 ------------------ .../test/e2e/engine_job_agent_tfe_test.go | 85 +++++ 2 files changed, 85 insertions(+), 298 deletions(-) delete mode 100644 apps/workspace-engine/test/e2e/engine_argocd_template_test.go create mode 100644 apps/workspace-engine/test/e2e/engine_job_agent_tfe_test.go diff --git a/apps/workspace-engine/test/e2e/engine_argocd_template_test.go b/apps/workspace-engine/test/e2e/engine_argocd_template_test.go deleted file mode 100644 index 891f2d23d..000000000 --- a/apps/workspace-engine/test/e2e/engine_argocd_template_test.go +++ /dev/null @@ -1,298 +0,0 @@ -package e2e - -import ( - "context" - "encoding/json" - "testing" - "workspace-engine/pkg/events/handler" - "workspace-engine/pkg/oapi" - "workspace-engine/test/integration" - c "workspace-engine/test/integration/creators" - - "github.com/google/uuid" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -// TestEngine_ArgoCD_TemplatePreservedInJobFlow traces the full ArgoCD template flow -// from DeploymentVersion creation through to Job creation. -// This verifies that the template field in JobAgentConfig is preserved at each step. -func TestEngine_ArgoCD_TemplatePreservedInJobFlow(t *testing.T) { - jobAgentId := uuid.New().String() - deploymentId := uuid.New().String() - environmentId := uuid.New().String() - resourceId := uuid.New().String() - versionId := uuid.New().String() - - // The template exactly as it would come from the CLI (job-agent-config.json) - argoCDTemplate := `--- -apiVersion: argoproj.io/v1alpha1 -kind: Application -metadata: - name: '{{.Resource.Name}}-console' - namespace: argocd - labels: - app.kubernetes.io/name: console - environment: '{{.Environment.Name}}' - deployment: console - resource: '{{.Resource.Name}}' -spec: - project: default - source: - repoURL: git@github.com:wandb/deployments.git - path: wandb/console - targetRevision: '{{.Release.Version.Tag}}' - helm: - releaseName: console - destination: - name: '{{.Resource.Identifier}}' - namespace: default - syncPolicy: - automated: - prune: true - selfHeal: true - syncOptions: - - CreateNamespace=true -` - - // Create ArgoCD job agent config - argoCDJobAgentConfig := map[string]any{ - "type": "argo-cd", - "serverUrl": "argocd.wandb.dev", - "apiKey": "test-api-key", - } - - engine := integration.NewTestWorkspace(t, - integration.WithJobAgent( - integration.JobAgentID(jobAgentId), - integration.JobAgentName("ArgoCD Agent"), - integration.JobAgentType("argo-cd"), - integration.JobAgentConfig(argoCDJobAgentConfig), - ), - integration.WithSystem( - integration.SystemName("test-system"), - integration.WithDeployment( - integration.DeploymentID(deploymentId), - integration.DeploymentName("console"), - integration.DeploymentJobAgent(jobAgentId), - integration.DeploymentCelResourceSelector("true"), - ), - integration.WithEnvironment( - integration.EnvironmentID(environmentId), - integration.EnvironmentName("production"), - integration.EnvironmentCelResourceSelector("true"), - ), - ), - integration.WithResource( - integration.ResourceID(resourceId), - integration.ResourceName("wandb-vashe-awstest"), - integration.ResourceIdentifier("wandb-vashe-awstest-cluster"), - integration.ResourceKind("kubernetes"), - ), - ) - - ctx := context.Background() - - // Verify release target was created - releaseTargets, err := engine.Workspace().ReleaseTargets().Items() - require.NoError(t, err) - require.Len(t, releaseTargets, 1, "expected 1 release target") - - // Create deployment version with ArgoCD template in JobAgentConfig - // This simulates what the CLI does: ctrlc api upsert version --job-agent-config-file job-agent-config.json - versionJobAgentConfig := map[string]any{ - "type": "argo-cd", - "template": argoCDTemplate, - } - - dv := c.NewDeploymentVersion() - dv.Id = versionId - dv.DeploymentId = deploymentId - dv.Tag = "v1.0.0" - dv.JobAgentConfig = versionJobAgentConfig - - // Log the version's JobAgentConfig before pushing the event - t.Logf("=== STEP 1: Version JobAgentConfig before event push ===") - dvConfigBytes, _ := json.MarshalIndent(dv.JobAgentConfig, "", " ") - t.Logf("DeploymentVersion.JobAgentConfig:\n%s", string(dvConfigBytes)) - - // Check template presence in version before event - if template, ok := dv.JobAgentConfig["template"]; ok { - t.Logf("✓ Template present in version before event push (length: %d)", len(template.(string))) - } else { - t.Errorf("✗ Template NOT present in version before event push") - } - - engine.PushEvent(ctx, handler.DeploymentVersionCreate, dv) - - // STEP 2: Check the stored version - t.Logf("=== STEP 2: Checking stored version ===") - storedVersion, found := engine.Workspace().DeploymentVersions().Get(versionId) - require.True(t, found, "stored version not found") - - storedConfigBytes, _ := json.MarshalIndent(storedVersion.JobAgentConfig, "", " ") - t.Logf("Stored version JobAgentConfig:\n%s", string(storedConfigBytes)) - - if template, ok := storedVersion.JobAgentConfig["template"]; ok { - t.Logf("✓ Template present in stored version (length: %d)", len(template.(string))) - } else { - t.Errorf("✗ Template NOT present in stored version - THIS IS WHERE IT'S LOST") - } - - // STEP 3: Check pending jobs - t.Logf("=== STEP 3: Checking pending jobs ===") - pendingJobs := engine.Workspace().Jobs().GetPending() - require.Len(t, pendingJobs, 1, "expected 1 pending job") - - var job *oapi.Job - for _, j := range pendingJobs { - job = j - break - } - - // Get the release for this job - release, found := engine.Workspace().Releases().Get(job.ReleaseId) - require.True(t, found, "release not found for job") - - // STEP 4: Check the release's version JobAgentConfig - t.Logf("=== STEP 4: Checking release's version ===") - releaseVersionConfigBytes, _ := json.MarshalIndent(release.Version.JobAgentConfig, "", " ") - t.Logf("Release.Version.JobAgentConfig:\n%s", string(releaseVersionConfigBytes)) - - if template, ok := release.Version.JobAgentConfig["template"]; ok { - t.Logf("✓ Template present in release.Version (length: %d)", len(template.(string))) - } else { - t.Errorf("✗ Template NOT present in release.Version") - } - - // STEP 5: Check the job's merged config - t.Logf("=== STEP 5: Checking job's merged config ===") - jobConfigBytes, _ := job.JobAgentConfig.MarshalJSON() - t.Logf("Job.JobAgentConfig:\n%s", string(jobConfigBytes)) - - // Try to get it as ArgoCD config - argoCDConfig, err := job.JobAgentConfig.AsFullArgoCDJobAgentConfig() - if err != nil { - t.Logf("Could not parse as ArgoCD config: %v", err) - // Try as custom config - customConfig, err := job.JobAgentConfig.AsFullCustomJobAgentConfig() - if err != nil { - t.Errorf("Could not parse job config as either ArgoCD or Custom: %v", err) - } else { - t.Logf("Parsed as custom config: %+v", customConfig) - if template, ok := customConfig.AdditionalProperties["template"]; ok { - t.Logf("✓ Template found in custom config (length: %d)", len(template.(string))) - } else { - t.Errorf("✗ Template NOT found in job config (custom)") - } - } - } else { - t.Logf("Parsed as ArgoCD config:") - t.Logf(" - Type: %s", argoCDConfig.Type) - t.Logf(" - ServerUrl: %s", argoCDConfig.ServerUrl) - t.Logf(" - Template length: %d", len(argoCDConfig.Template)) - - if argoCDConfig.Template != "" { - t.Logf("✓ Template present in job's ArgoCD config") - assert.Contains(t, argoCDConfig.Template, "{{.Resource.Name}}-console", "template should contain resource name placeholder") - } else { - t.Errorf("✗ Template is EMPTY in job's ArgoCD config - THIS IS THE BUG") - } - } - - // Final assertions - assert.Equal(t, oapi.JobStatusPending, job.Status) - assert.Equal(t, jobAgentId, job.JobAgentId) -} - -// TestEngine_ArgoCD_VersionJobAgentConfigPreservedThroughEventHandler tests that -// the JobAgentConfig is preserved when the event is handled by the workspace engine. -// This specifically tests the JSON marshaling/unmarshaling in the event handler. -func TestEngine_ArgoCD_VersionJobAgentConfigPreservedThroughEventHandler(t *testing.T) { - // Create the version data exactly as it would come from the API/CLI - versionData := map[string]any{ - "id": uuid.New().String(), - "deploymentId": uuid.New().String(), - "tag": "v1.0.0", - "name": "test-version", - "status": "ready", - "config": map[string]any{}, - "metadata": map[string]string{}, - "createdAt": "2024-01-01T00:00:00Z", - "jobAgentConfig": map[string]any{ - "type": "argo-cd", - "template": "apiVersion: argoproj.io/v1alpha1\nkind: Application\nmetadata:\n name: '{{.Resource.Name}}'", - }, - } - - // Marshal to JSON (simulating what comes over the wire) - jsonData, err := json.Marshal(versionData) - require.NoError(t, err) - - t.Logf("JSON event data:\n%s", string(jsonData)) - - // Unmarshal into DeploymentVersion (simulating event handler) - var dv oapi.DeploymentVersion - err = json.Unmarshal(jsonData, &dv) - require.NoError(t, err) - - t.Logf("Unmarshaled version JobAgentConfig: %+v", dv.JobAgentConfig) - - // Check template is preserved - template, ok := dv.JobAgentConfig["template"] - require.True(t, ok, "template field should be present in JobAgentConfig") - assert.Contains(t, template.(string), "{{.Resource.Name}}", "template should contain resource name placeholder") -} - -// TestEngine_ArgoCD_VersionJobAgentConfigWithExactCLIFormat tests the exact format -// that the CLI sends when using --job-agent-config-file flag. -// This simulates: ctrlc api upsert version --job-agent-config-file job-agent-config.json -func TestEngine_ArgoCD_VersionJobAgentConfigWithExactCLIFormat(t *testing.T) { - // This is the exact JSON content from the user's job-agent-config.json file - jobAgentConfigJSON := `{ - "template": "---\napiVersion: argoproj.io/v1alpha1\nkind: Application\nmetadata:\n name: '{{.Resource.Name}}-console'\n namespace: argocd\n labels:\n app.kubernetes.io/name: console\n environment: '{{.Environment.Name}}'\n deployment: console\n resource: '{{.Resource.Name}}'\nspec:\n project: default\n source:\n repoURL: git@github.com:wandb/deployments.git\n path: wandb/console\n targetRevision: '{{.Release.Version.Tag}}'\n helm:\n releaseName: console\n destination:\n name: '{{.Resource.Identifier}}'\n namespace: default\n syncPolicy:\n automated:\n prune: true\n selfHeal: true\n syncOptions:\n - CreateNamespace=true\n", - "type": "argo-cd" -}` // Parse it like the CLI does - var jobAgentConfig map[string]interface{} - err := json.Unmarshal([]byte(jobAgentConfigJSON), &jobAgentConfig) - require.NoError(t, err) t.Logf("Parsed jobAgentConfig from CLI: %+v", jobAgentConfig) // Check template is present - template, ok := jobAgentConfig["template"] - require.True(t, ok, "template field should be present") - require.NotEmpty(t, template, "template should not be empty") templateStr := template.(string) - t.Logf("Template length: %d", len(templateStr)) - assert.Contains(t, templateStr, "{{.Resource.Name}}-console", "template should contain the expected placeholder") - - // Now simulate what happens when this is sent through the API and event handler - // The API creates the version data like this: - versionData := map[string]any{ - "id": uuid.New().String(), - "deploymentId": uuid.New().String(), - "tag": "v1.0.0", - "name": "test-version", - "status": "ready", - "config": map[string]any{}, - "metadata": map[string]string{}, - "createdAt": "2024-01-01T00:00:00Z", - "jobAgentConfig": jobAgentConfig, // This is what the API does - } - - // Marshal to JSON (simulating what goes to Kafka) - eventJSON, err := json.Marshal(versionData) - require.NoError(t, err) - - t.Logf("Event JSON:\n%s", string(eventJSON)) - - // Unmarshal into DeploymentVersion (simulating workspace engine event handler) - var dv oapi.DeploymentVersion - err = json.Unmarshal(eventJSON, &dv) - require.NoError(t, err) - - // Verify template is still present after event handling - resultTemplate, ok := dv.JobAgentConfig["template"] - require.True(t, ok, "template should be present in DeploymentVersion.JobAgentConfig") - - resultTemplateStr := resultTemplate.(string) - t.Logf("After event unmarshaling - Template length: %d", len(resultTemplateStr)) - assert.Equal(t, len(templateStr), len(resultTemplateStr), "template length should be preserved") - assert.Contains(t, resultTemplateStr, "{{.Resource.Name}}-console", "template content should be preserved") -} diff --git a/apps/workspace-engine/test/e2e/engine_job_agent_tfe_test.go b/apps/workspace-engine/test/e2e/engine_job_agent_tfe_test.go new file mode 100644 index 000000000..411f5b246 --- /dev/null +++ b/apps/workspace-engine/test/e2e/engine_job_agent_tfe_test.go @@ -0,0 +1,85 @@ +package e2e + +import ( + "context" + "testing" + "workspace-engine/pkg/events/handler" + "workspace-engine/pkg/oapi" + "workspace-engine/test/integration" + c "workspace-engine/test/integration/creators" +) + +func TestEngine_TerraformCloudJobAgentConfigMerge(t *testing.T) { + engine := integration.NewTestWorkspace(t) + workspaceID := engine.Workspace().ID + ctx := context.Background() + + jobAgent := c.NewJobAgent(workspaceID) + jobAgent.Type = "tfe" + jobAgent.Config = map[string]any{ + "address": "https://app.terraform.io", + "organization": "org-agent", + "token": "token-agent", + "template": "name: agent-workspace", + } + engine.PushEvent(ctx, handler.JobAgentCreate, jobAgent) + + sys := c.NewSystem(workspaceID) + engine.PushEvent(ctx, handler.SystemCreate, sys) + + deployment := c.NewDeployment(sys.Id) + deployment.JobAgentId = &jobAgent.Id + deployment.JobAgentConfig = map[string]any{ + "organization": "org-deployment", + "template": "name: deployment-workspace", + } + deployment.ResourceSelector = &oapi.Selector{} + _ = deployment.ResourceSelector.FromCelSelector(oapi.CelSelector{Cel: "true"}) + engine.PushEvent(ctx, handler.DeploymentCreate, deployment) + + environment := c.NewEnvironment(sys.Id) + environment.ResourceSelector = &oapi.Selector{} + _ = environment.ResourceSelector.FromCelSelector(oapi.CelSelector{Cel: "true"}) + engine.PushEvent(ctx, handler.EnvironmentCreate, environment) + + resource := c.NewResource(workspaceID) + engine.PushEvent(ctx, handler.ResourceCreate, resource) + + version := c.NewDeploymentVersion() + version.DeploymentId = deployment.Id + version.Tag = "v1.0.0" + version.JobAgentConfig = map[string]any{ + "token": "token-version", + "template": "name: version-workspace", + } + engine.PushEvent(ctx, handler.DeploymentVersionCreate, version) + + pendingJobs := engine.Workspace().Jobs().GetPending() + if len(pendingJobs) != 1 { + t.Fatalf("expected 1 pending job, got %d", len(pendingJobs)) + } + + var job *oapi.Job + for _, j := range pendingJobs { + job = j + break + } + + if job.JobAgentId != jobAgent.Id { + t.Fatalf("expected job agent id %s, got %s", jobAgent.Id, job.JobAgentId) + } + + cfg := job.JobAgentConfig + if cfg["address"] != "https://app.terraform.io" { + t.Fatalf("expected address from agent config, got %v", cfg["address"]) + } + if cfg["organization"] != "org-deployment" { + t.Fatalf("expected organization from deployment config, got %v", cfg["organization"]) + } + if cfg["token"] != "token-version" { + t.Fatalf("expected token from version config, got %v", cfg["token"]) + } + if cfg["template"] != "name: version-workspace" { + t.Fatalf("expected template from version config, got %v", cfg["template"]) + } +} From 80900fd449722c4104629b412116a8b30b4f5174 Mon Sep 17 00:00:00 2001 From: Zachary Blasczyk Date: Wed, 25 Feb 2026 13:05:20 -0600 Subject: [PATCH 3/4] refactor: TFE Job Agent --- apps/workspace-engine/package.json | 4 +- .../pkg/workqueue/memory/memory.go | 2 +- .../pkg/workspace/jobagents/registry.go | 57 ++- .../workspace/jobagents/terraformcloud/tfe.go | 385 ++++++++++------ .../jobagents/terraformcloud/tfe_test.go | 429 ++++++++++++++++++ .../terraformcloud/tfe_verifications.go | 61 --- .../pkg/workspace/jobagents/types/types.go | 7 + .../pkg/workspace/releasemanager/manager.go | 32 +- .../controller.go | 3 + .../http/server/openapi/deployments/server.go | 1 + 10 files changed, 769 insertions(+), 212 deletions(-) create mode 100644 apps/workspace-engine/pkg/workspace/jobagents/terraformcloud/tfe_test.go delete mode 100644 apps/workspace-engine/pkg/workspace/jobagents/terraformcloud/tfe_verifications.go diff --git a/apps/workspace-engine/package.json b/apps/workspace-engine/package.json index cb18f1ff4..943c94712 100644 --- a/apps/workspace-engine/package.json +++ b/apps/workspace-engine/package.json @@ -2,7 +2,7 @@ "name": "@ctrlplane/workspace-engine", "scripts": { "dev": "air", - "build": "go build -o ./bin/workspace-engine main.go", + "build": "go build -o ./bin/workspace-engine .", "lint": "bash -c 'golangci-lint run --allow-parallel-runners'", "lint:fix": "golangci-lint run --fix", "start": "./bin/workspace-engine", @@ -11,4 +11,4 @@ "format": "bash -c 'go fmt ./...'", "clean": "go clean ./..." } -} +} \ No newline at end of file diff --git a/apps/workspace-engine/pkg/workqueue/memory/memory.go b/apps/workspace-engine/pkg/workqueue/memory/memory.go index 1710f908c..99c3e1fba 100644 --- a/apps/workspace-engine/pkg/workqueue/memory/memory.go +++ b/apps/workspace-engine/pkg/workqueue/memory/memory.go @@ -153,7 +153,7 @@ func (q *Queue) Enqueue(ctx context.Context, params workqueue.EnqueueParams) err s.ClaimedBy = "" s.ClaimedUntil = nil } - if !(s.ClaimedUntil != nil && s.ClaimedUntil.After(now)) { + if s.ClaimedUntil == nil || !s.ClaimedUntil.After(now) { s.UpdatedAt = now } diff --git a/apps/workspace-engine/pkg/workspace/jobagents/registry.go b/apps/workspace-engine/pkg/workspace/jobagents/registry.go index 48e37ff81..08852f9ca 100644 --- a/apps/workspace-engine/pkg/workspace/jobagents/registry.go +++ b/apps/workspace-engine/pkg/workspace/jobagents/registry.go @@ -3,6 +3,7 @@ package jobagents import ( "context" "fmt" + "time" "workspace-engine/pkg/oapi" "workspace-engine/pkg/workspace/jobagents/argo" @@ -12,6 +13,8 @@ import ( "workspace-engine/pkg/workspace/jobagents/types" "workspace-engine/pkg/workspace/releasemanager/verification" "workspace-engine/pkg/workspace/store" + + "github.com/charmbracelet/log" ) type Registry struct { @@ -26,7 +29,7 @@ func NewRegistry(store *store.Store, verifications *verification.Manager) *Regis r.Register(testrunner.New(store)) r.Register(argo.NewArgoApplication(store, verifications)) - r.Register(terraformcloud.NewTFE(store, verifications)) + r.Register(terraformcloud.NewTFE(store)) r.Register(github.NewGithubAction(store)) return r @@ -37,6 +40,58 @@ func (r *Registry) Register(dispatcher types.Dispatchable) { r.dispatchers[dispatcher.Type()] = dispatcher } +// RestoreJobs resumes tracking for all in-processing jobs after an engine restart. +// Dispatchers that implement Restorable get their jobs back; remaining orphaned +// jobs are failed. +func (r *Registry) RestoreJobs(ctx context.Context) { + allJobs := r.store.Jobs.Items() + + // Collect in-processing jobs grouped by job agent type + jobsByType := make(map[string][]*oapi.Job) + for _, job := range allJobs { + if !job.IsInProcessingState() { + continue + } + agent, ok := r.store.JobAgents.Get(job.JobAgentId) + if !ok { + continue + } + jobsByType[agent.Type] = append(jobsByType[agent.Type], job) + } + + if len(jobsByType) == 0 { + return + } + + now := time.Now().UTC() + for agentType, jobs := range jobsByType { + dispatcher, ok := r.dispatchers[agentType] + if !ok { + continue + } + + restorable, ok := dispatcher.(types.Restorable) + if ok { + log.Info("Restoring jobs for dispatcher", "type", agentType, "count", len(jobs)) + if err := restorable.RestoreJobs(ctx, jobs); err != nil { + log.Error("Failed to restore jobs", "type", agentType, "error", err) + } + continue + } + + // Non-restorable dispatcher: fail orphaned jobs + for _, job := range jobs { + message := "Job was in-progress when the engine restarted; dispatch goroutine lost" + job.Status = oapi.JobStatusFailure + job.Message = &message + job.CompletedAt = &now + job.UpdatedAt = now + r.store.Jobs.Upsert(ctx, job) + log.Info("Marked orphaned job as failed", "jobId", job.Id, "agentType", agentType) + } + } +} + func (r *Registry) Dispatch(ctx context.Context, job *oapi.Job) error { jobAgent, ok := r.store.JobAgents.Get(job.JobAgentId) if !ok { diff --git a/apps/workspace-engine/pkg/workspace/jobagents/terraformcloud/tfe.go b/apps/workspace-engine/pkg/workspace/jobagents/terraformcloud/tfe.go index 077f596c2..84672d5ac 100644 --- a/apps/workspace-engine/pkg/workspace/jobagents/terraformcloud/tfe.go +++ b/apps/workspace-engine/pkg/workspace/jobagents/terraformcloud/tfe.go @@ -8,25 +8,22 @@ import ( "maps" "strings" "time" - "workspace-engine/pkg/config" "workspace-engine/pkg/messaging" - "workspace-engine/pkg/messaging/confluent" "workspace-engine/pkg/oapi" "workspace-engine/pkg/templatefuncs" "workspace-engine/pkg/workspace/jobagents/types" - "workspace-engine/pkg/workspace/releasemanager/verification" "workspace-engine/pkg/workspace/store" - confluentkafka "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "github.com/charmbracelet/log" "github.com/hashicorp/go-tfe" "sigs.k8s.io/yaml" ) var _ types.Dispatchable = &TFE{} +var _ types.Restorable = &TFE{} type TFE struct { - store *store.Store - verifications *verification.Manager + store *store.Store } type VCSRepoTemplate struct { @@ -66,8 +63,8 @@ type VariableTemplate struct { Sensitive bool `json:"sensitive,omitempty" yaml:"sensitive,omitempty"` } -func NewTFE(store *store.Store, verifications *verification.Manager) *TFE { - return &TFE{store: store, verifications: verifications} +func NewTFE(store *store.Store) *TFE { + return &TFE{store: store} } func (t *TFE) Type() string { @@ -90,38 +87,276 @@ func (t *TFE) Dispatch(ctx context.Context, job *oapi.Job) error { ctx := context.WithoutCancel(ctx) client, err := t.getClient(address, token) if err != nil { - t.sendJobFailureEvent(job, fmt.Sprintf("failed to create Terraform Cloud client: %s", err.Error())) + t.sendJobEvent(job, oapi.JobStatusFailure, fmt.Sprintf("failed to create Terraform Cloud client: %s", err.Error()), nil, address, organization, "") return } targetWorkspace, err := t.upsertWorkspace(ctx, client, organization, workspace) if err != nil { - t.sendJobFailureEvent(job, fmt.Sprintf("failed to upsert workspace: %s", err.Error())) + t.sendJobEvent(job, oapi.JobStatusFailure, fmt.Sprintf("failed to upsert workspace: %s", err.Error()), nil, address, organization, "") return } if len(workspace.Variables) > 0 { if err := t.syncVariables(ctx, client, targetWorkspace.ID, workspace.Variables); err != nil { - t.sendJobFailureEvent(job, fmt.Sprintf("failed to sync variables: %s", err.Error())) + t.sendJobEvent(job, oapi.JobStatusFailure, fmt.Sprintf("failed to sync variables: %s", err.Error()), nil, address, organization, targetWorkspace.Name) return } } run, err := t.createRun(ctx, client, targetWorkspace.ID, job.Id) if err != nil { - t.sendJobFailureEvent(job, fmt.Sprintf("failed to create run: %s", err.Error())) + t.sendJobEvent(job, oapi.JobStatusFailure, fmt.Sprintf("failed to create run: %s", err.Error()), nil, address, organization, targetWorkspace.Name) return } - verification := newTFERunVerification(t.verifications, job, address, token, run.ID) - if err := verification.StartVerification(ctx); err != nil { - t.sendJobFailureEvent(job, fmt.Sprintf("failed to start verification: %s", err.Error())) + t.sendJobEvent(job, oapi.JobStatusInProgress, "Run created, polling status...", run, address, organization, targetWorkspace.Name) + t.pollRunStatus(ctx, client, run.ID, job, address, organization, targetWorkspace.Name) + }() + + return nil +} + +// RestoreJobs resumes polling for in-flight TFC runs after an engine restart. +// Jobs with an ExternalId (the TFC run ID) are resumed; jobs without one are +// marked as externalRunNotFound. +func (t *TFE) RestoreJobs(ctx context.Context, jobs []*oapi.Job) error { + for _, job := range jobs { + if job.ExternalId == nil || *job.ExternalId == "" { + msg := "Run ID not recorded before engine restart" + t.sendJobEvent(job, oapi.JobStatusExternalRunNotFound, msg, nil, "", "", "") + continue + } + + address, token, organization, _, err := t.parseJobAgentConfig(job.DispatchContext.JobAgentConfig) + if err != nil { + msg := fmt.Sprintf("Failed to parse job agent config on restore: %s", err.Error()) + t.sendJobEvent(job, oapi.JobStatusFailure, msg, nil, "", "", "") + continue + } + + client, err := t.getClient(address, token) + if err != nil { + msg := fmt.Sprintf("Failed to create TFE client on restore: %s", err.Error()) + t.sendJobEvent(job, oapi.JobStatusFailure, msg, nil, address, organization, "") + continue + } + + runID := *job.ExternalId + log.Info("Restoring TFC run polling", "jobId", job.Id, "runId", runID) + go t.pollRunStatus(ctx, client, runID, job, address, organization, "") + } + return nil +} + +func (t *TFE) pollRunStatus(ctx context.Context, client *tfe.Client, runID string, job *oapi.Job, address, organization, wsName string) { + ticker := time.NewTicker(15 * time.Second) + defer ticker.Stop() + + var lastStatus tfe.RunStatus + for { + select { + case <-ctx.Done(): return + case <-ticker.C: + run, err := client.Runs.ReadWithOptions(ctx, runID, &tfe.RunReadOptions{ + Include: []tfe.RunIncludeOpt{tfe.RunPlan}, + }) + if err != nil { + log.Error("Failed to poll TFC run status", "runId", runID, "error", err) + continue + } + if run.Status == lastStatus { + continue + } + lastStatus = run.Status + + jobStatus, message := mapRunStatus(run) + if err := t.sendJobEvent(job, jobStatus, message, run, address, organization, wsName); err != nil { + log.Error("Failed to send job event", "runId", runID, "error", err) + } + + if isTerminalJobStatus(jobStatus) { + return + } } + } +} - t.sendJobUpdateEvent(address, organization, targetWorkspace.Name, run, job) - }() +func isTerminalJobStatus(status oapi.JobStatus) bool { + switch status { + case oapi.JobStatusSuccessful, oapi.JobStatusFailure, oapi.JobStatusCancelled, oapi.JobStatusExternalRunNotFound: + return true + default: + return false + } +} + +func mapRunStatus(run *tfe.Run) (oapi.JobStatus, string) { + changes := formatResourceChanges(run) + + switch run.Status { + case tfe.RunPending: + pos := run.PositionInQueue + return oapi.JobStatusPending, fmt.Sprintf("Run pending in queue (position: %d)", pos) + + case tfe.RunFetching, tfe.RunFetchingCompleted: + return oapi.JobStatusInProgress, "Fetching configuration..." + + case tfe.RunPrePlanRunning, tfe.RunPrePlanCompleted: + return oapi.JobStatusInProgress, "Running pre-plan tasks..." + + case tfe.RunQueuing, tfe.RunPlanQueued: + return oapi.JobStatusInProgress, "Queued for planning..." + + case tfe.RunPlanning: + return oapi.JobStatusInProgress, "Planning..." + + case tfe.RunPlanned: + if run.Actions != nil && run.Actions.IsConfirmable { + return oapi.JobStatusActionRequired, fmt.Sprintf("Plan complete — awaiting approval. %s", changes) + } + return oapi.JobStatusInProgress, "Plan complete, auto-applying..." + + case tfe.RunPlannedAndFinished: + return oapi.JobStatusSuccessful, fmt.Sprintf("Plan complete (no changes). %s", changes) + + case tfe.RunPlannedAndSaved: + return oapi.JobStatusSuccessful, fmt.Sprintf("Plan saved. %s", changes) + + case tfe.RunCostEstimating, tfe.RunCostEstimated: + return oapi.JobStatusInProgress, "Estimating costs..." + + case tfe.RunPolicyChecking, tfe.RunPolicyChecked: + return oapi.JobStatusInProgress, "Checking policies..." + + case tfe.RunPolicyOverride: + return oapi.JobStatusActionRequired, "Policy check failed — awaiting override" + + case tfe.RunPolicySoftFailed: + return oapi.JobStatusActionRequired, "Policy soft-failed — awaiting override" + + case tfe.RunPostPlanRunning, tfe.RunPostPlanCompleted, tfe.RunPostPlanAwaitingDecision: + return oapi.JobStatusInProgress, "Running post-plan tasks..." + + case tfe.RunConfirmed: + return oapi.JobStatusInProgress, "Confirmed, queuing apply..." + + case tfe.RunApplyQueued, tfe.RunQueuingApply: + return oapi.JobStatusInProgress, "Queued for apply..." + + case tfe.RunApplying: + return oapi.JobStatusInProgress, "Applying..." + + case tfe.RunApplied: + return oapi.JobStatusSuccessful, fmt.Sprintf("Applied successfully. %s", changes) + + case tfe.RunPreApplyRunning, tfe.RunPreApplyCompleted: + return oapi.JobStatusInProgress, "Running pre-apply tasks..." + + case tfe.RunErrored: + return oapi.JobStatusFailure, fmt.Sprintf("Run errored: %s", run.Message) + + case tfe.RunCanceled: + return oapi.JobStatusCancelled, "Run was canceled" + + case tfe.RunDiscarded: + return oapi.JobStatusCancelled, "Run was discarded" + + default: + return oapi.JobStatusInProgress, fmt.Sprintf("Run status: %s", run.Status) + } +} + +func formatResourceChanges(run *tfe.Run) string { + if run.Plan == nil { + return "+0/~0/-0" + } + return fmt.Sprintf("+%d/~%d/-%d", + run.Plan.ResourceAdditions, + run.Plan.ResourceChanges, + run.Plan.ResourceDestructions, + ) +} + +func (t *TFE) sendJobEvent(job *oapi.Job, status oapi.JobStatus, message string, run *tfe.Run, address, organization, wsName string) error { + workspaceId := t.store.ID() + now := time.Now().UTC() + + fields := []oapi.JobUpdateEventFieldsToUpdate{ + oapi.JobUpdateEventFieldsToUpdateStatus, + oapi.JobUpdateEventFieldsToUpdateMessage, + oapi.JobUpdateEventFieldsToUpdateUpdatedAt, + } + + eventJob := oapi.Job{ + Id: job.Id, + Status: status, + Message: &message, + UpdatedAt: now, + } + + // Set externalId from the run + if run != nil { + eventJob.ExternalId = &run.ID + fields = append(fields, oapi.JobUpdateEventFieldsToUpdateExternalId) + } + + // Set metadata with links when we have enough info + if run != nil && address != "" && organization != "" && wsName != "" { + runUrl := fmt.Sprintf("%s/app/%s/workspaces/%s/runs/%s", address, organization, wsName, run.ID) + if !strings.HasPrefix(runUrl, "https://") { + runUrl = "https://" + runUrl + } + workspaceUrl := fmt.Sprintf("%s/app/%s/workspaces/%s", address, organization, wsName) + if !strings.HasPrefix(workspaceUrl, "https://") { + workspaceUrl = "https://" + workspaceUrl + } + + links := map[string]string{ + "TFE Run": runUrl, + "TFE Workspace": workspaceUrl, + } + linksJSON, err := json.Marshal(links) + if err == nil { + newJobMetadata := make(map[string]string) + maps.Copy(newJobMetadata, job.Metadata) + newJobMetadata["ctrlplane/links"] = string(linksJSON) + eventJob.Metadata = newJobMetadata + fields = append(fields, oapi.JobUpdateEventFieldsToUpdateMetadata) + } + } + + if isTerminalJobStatus(status) { + eventJob.CompletedAt = &now + fields = append(fields, oapi.JobUpdateEventFieldsToUpdateCompletedAt) + } + + // Set startedAt for first non-pending transition + if status == oapi.JobStatusInProgress { + eventJob.StartedAt = &now + fields = append(fields, oapi.JobUpdateEventFieldsToUpdateStartedAt) + } + eventPayload := oapi.JobUpdateEvent{ + Id: &job.Id, + Job: eventJob, + FieldsToUpdate: &fields, + } + + event := map[string]any{ + "eventType": "job.updated", + "workspaceId": workspaceId, + "data": eventPayload, + "timestamp": time.Now().Unix(), + } + eventBytes, err := json.Marshal(event) + if err != nil { + return fmt.Errorf("failed to marshal event: %w", err) + } + if err := messaging.Publish([]byte(workspaceId), eventBytes); err != nil { + return fmt.Errorf("failed to publish event: %w", err) + } return nil } @@ -251,122 +486,6 @@ func (t *TFE) createRun(ctx context.Context, client *tfe.Client, workspaceID, jo return run, nil } -func (t *TFE) sendJobFailureEvent(job *oapi.Job, message string) error { - workspaceId := t.store.ID() - - now := time.Now().UTC() - eventPayload := oapi.JobUpdateEvent{ - Id: &job.Id, - Job: oapi.Job{ - Id: job.Id, - Status: oapi.JobStatusFailure, - Message: &message, - UpdatedAt: now, - CompletedAt: &now, - }, - FieldsToUpdate: &[]oapi.JobUpdateEventFieldsToUpdate{ - oapi.JobUpdateEventFieldsToUpdateStatus, - oapi.JobUpdateEventFieldsToUpdateMessage, - oapi.JobUpdateEventFieldsToUpdateCompletedAt, - oapi.JobUpdateEventFieldsToUpdateUpdatedAt, - }, - } - producer, err := t.getKafkaProducer() - if err != nil { - return fmt.Errorf("failed to create Kafka producer: %w", err) - } - defer producer.Close() - - event := map[string]any{ - "eventType": "job.updated", - "workspaceId": workspaceId, - "data": eventPayload, - "timestamp": time.Now().Unix(), - } - eventBytes, err := json.Marshal(event) - if err != nil { - return fmt.Errorf("failed to marshal event: %w", err) - } - if err := producer.Publish([]byte(workspaceId), eventBytes); err != nil { - return fmt.Errorf("failed to publish event: %w", err) - } - return nil -} - -func (t *TFE) sendJobUpdateEvent(address, organization, workspaceName string, run *tfe.Run, job *oapi.Job) error { - workspaceId := t.store.ID() - - runUrl := fmt.Sprintf("%s/app/%s/workspaces/%s/runs/%s", address, organization, workspaceName, run.ID) - if !strings.HasPrefix(runUrl, "https://") { - runUrl = "https://" + runUrl - } - - workspaceUrl := fmt.Sprintf("%s/app/%s/workspaces/%s", address, organization, workspaceName) - if !strings.HasPrefix(workspaceUrl, "https://") { - workspaceUrl = "https://" + workspaceUrl - } - - links := make(map[string]string) - links["TFE Run"] = runUrl - links["TFE Workspace"] = workspaceUrl - linksJSON, err := json.Marshal(links) - if err != nil { - return fmt.Errorf("failed to marshal links: %w", err) - } - - newJobMetadata := make(map[string]string) - maps.Copy(newJobMetadata, job.Metadata) - newJobMetadata[string("ctrlplane/links")] = string(linksJSON) - - now := time.Now().UTC() - eventPayload := oapi.JobUpdateEvent{ - Id: &job.Id, - Job: oapi.Job{ - Id: job.Id, - Metadata: newJobMetadata, - Status: oapi.JobStatusSuccessful, - UpdatedAt: now, - CompletedAt: &now, - }, - FieldsToUpdate: &[]oapi.JobUpdateEventFieldsToUpdate{ - oapi.JobUpdateEventFieldsToUpdateStatus, - oapi.JobUpdateEventFieldsToUpdateMetadata, - oapi.JobUpdateEventFieldsToUpdateCompletedAt, - oapi.JobUpdateEventFieldsToUpdateUpdatedAt, - }, - } - producer, err := t.getKafkaProducer() - if err != nil { - return fmt.Errorf("failed to create Kafka producer: %w", err) - } - defer producer.Close() - - event := map[string]any{ - "eventType": "job.updated", - "workspaceId": workspaceId, - "data": eventPayload, - "timestamp": time.Now().Unix(), - } - eventBytes, err := json.Marshal(event) - if err != nil { - return fmt.Errorf("failed to marshal event: %w", err) - } - if err := producer.Publish([]byte(workspaceId), eventBytes); err != nil { - return fmt.Errorf("failed to publish event: %w", err) - } - return nil -} - -func (t *TFE) getKafkaProducer() (messaging.Producer, error) { - return confluent.NewConfluent(config.Global.KafkaBrokers).CreateProducer(config.Global.KafkaTopic, &confluentkafka.ConfigMap{ - "bootstrap.servers": config.Global.KafkaBrokers, - "enable.idempotence": true, - "compression.type": "snappy", - "message.send.max.retries": 10, - "retry.backoff.ms": 100, - }) -} - func (w *WorkspaceTemplate) toCreateOptions() tfe.WorkspaceCreateOptions { opts := tfe.WorkspaceCreateOptions{ Name: &w.Name, diff --git a/apps/workspace-engine/pkg/workspace/jobagents/terraformcloud/tfe_test.go b/apps/workspace-engine/pkg/workspace/jobagents/terraformcloud/tfe_test.go new file mode 100644 index 000000000..2a51c92e0 --- /dev/null +++ b/apps/workspace-engine/pkg/workspace/jobagents/terraformcloud/tfe_test.go @@ -0,0 +1,429 @@ +package terraformcloud + +import ( + "encoding/json" + "testing" + "workspace-engine/pkg/oapi" + + "github.com/hashicorp/go-tfe" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// ===== parseJobAgentConfig ===== + +func TestParseJobAgentConfig_Valid(t *testing.T) { + tfeInst := &TFE{} + cfg := oapi.JobAgentConfig{ + "address": "https://app.terraform.io", + "token": "my-token", + "organization": "my-org", + "template": "name: {{ .Resource.Name }}", + } + address, token, org, tmpl, err := tfeInst.parseJobAgentConfig(cfg) + require.NoError(t, err) + assert.Equal(t, "https://app.terraform.io", address) + assert.Equal(t, "my-token", token) + assert.Equal(t, "my-org", org) + assert.Equal(t, "name: {{ .Resource.Name }}", tmpl) +} + +func TestParseJobAgentConfig_MissingFields(t *testing.T) { + tfeInst := &TFE{} + tests := []struct { + name string + cfg oapi.JobAgentConfig + }{ + {"missing address", oapi.JobAgentConfig{"token": "t", "organization": "o", "template": "t"}}, + {"missing token", oapi.JobAgentConfig{"address": "a", "organization": "o", "template": "t"}}, + {"missing organization", oapi.JobAgentConfig{"address": "a", "token": "t", "template": "t"}}, + {"missing template", oapi.JobAgentConfig{"address": "a", "token": "t", "organization": "o"}}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, _, _, _, err := tfeInst.parseJobAgentConfig(tt.cfg) + require.Error(t, err) + }) + } +} + +func TestParseJobAgentConfig_EmptyValues(t *testing.T) { + tfeInst := &TFE{} + cfg := oapi.JobAgentConfig{ + "address": "", + "token": "my-token", + "organization": "my-org", + "template": "name: foo", + } + _, _, _, _, err := tfeInst.parseJobAgentConfig(cfg) + require.Error(t, err) + assert.Contains(t, err.Error(), "missing required fields") +} + +func TestParseJobAgentConfig_WrongType(t *testing.T) { + tfeInst := &TFE{} + cfg := oapi.JobAgentConfig{ + "address": 123, + "token": "my-token", + "organization": "my-org", + "template": "name: foo", + } + _, _, _, _, err := tfeInst.parseJobAgentConfig(cfg) + require.Error(t, err) + assert.Contains(t, err.Error(), "address is required") +} + +// ===== mapRunStatus ===== + +func makeRun(status tfe.RunStatus) *tfe.Run { + return &tfe.Run{Status: status, Plan: &tfe.Plan{}} +} + +func TestMapRunStatus_SuccessfulStates(t *testing.T) { + tests := []struct { + status tfe.RunStatus + }{ + {tfe.RunApplied}, + {tfe.RunPlannedAndFinished}, + {tfe.RunPlannedAndSaved}, + } + for _, tt := range tests { + t.Run(string(tt.status), func(t *testing.T) { + run := makeRun(tt.status) + jobStatus, _ := mapRunStatus(run) + assert.Equal(t, oapi.JobStatusSuccessful, jobStatus) + }) + } +} + +func TestMapRunStatus_FailureStates(t *testing.T) { + run := makeRun(tfe.RunErrored) + run.Message = "something broke" + jobStatus, msg := mapRunStatus(run) + assert.Equal(t, oapi.JobStatusFailure, jobStatus) + assert.Contains(t, msg, "something broke") +} + +func TestMapRunStatus_CancelledStates(t *testing.T) { + tests := []struct { + status tfe.RunStatus + }{ + {tfe.RunCanceled}, + {tfe.RunDiscarded}, + } + for _, tt := range tests { + t.Run(string(tt.status), func(t *testing.T) { + run := makeRun(tt.status) + jobStatus, _ := mapRunStatus(run) + assert.Equal(t, oapi.JobStatusCancelled, jobStatus) + }) + } +} + +func TestMapRunStatus_ActionRequiredStates(t *testing.T) { + t.Run("planned_confirmable", func(t *testing.T) { + run := makeRun(tfe.RunPlanned) + run.Actions = &tfe.RunActions{IsConfirmable: true} + jobStatus, msg := mapRunStatus(run) + assert.Equal(t, oapi.JobStatusActionRequired, jobStatus) + assert.Contains(t, msg, "awaiting approval") + }) + + t.Run("planned_not_confirmable", func(t *testing.T) { + run := makeRun(tfe.RunPlanned) + jobStatus, _ := mapRunStatus(run) + assert.Equal(t, oapi.JobStatusInProgress, jobStatus) + }) + + t.Run("policy_override", func(t *testing.T) { + run := makeRun(tfe.RunPolicyOverride) + jobStatus, _ := mapRunStatus(run) + assert.Equal(t, oapi.JobStatusActionRequired, jobStatus) + }) + + t.Run("policy_soft_failed", func(t *testing.T) { + run := makeRun(tfe.RunPolicySoftFailed) + jobStatus, _ := mapRunStatus(run) + assert.Equal(t, oapi.JobStatusActionRequired, jobStatus) + }) +} + +func TestMapRunStatus_PendingState(t *testing.T) { + run := makeRun(tfe.RunPending) + jobStatus, msg := mapRunStatus(run) + assert.Equal(t, oapi.JobStatusPending, jobStatus) + assert.Contains(t, msg, "pending in queue") +} + +func TestMapRunStatus_InProgressStates(t *testing.T) { + inProgressStatuses := []tfe.RunStatus{ + tfe.RunFetching, + tfe.RunFetchingCompleted, + tfe.RunPrePlanRunning, + tfe.RunPrePlanCompleted, + tfe.RunQueuing, + tfe.RunPlanQueued, + tfe.RunPlanning, + tfe.RunCostEstimating, + tfe.RunCostEstimated, + tfe.RunPolicyChecking, + tfe.RunPolicyChecked, + tfe.RunPostPlanRunning, + tfe.RunPostPlanCompleted, + tfe.RunPostPlanAwaitingDecision, + tfe.RunConfirmed, + tfe.RunApplyQueued, + tfe.RunQueuingApply, + tfe.RunApplying, + tfe.RunPreApplyRunning, + tfe.RunPreApplyCompleted, + } + for _, status := range inProgressStatuses { + t.Run(string(status), func(t *testing.T) { + run := makeRun(status) + jobStatus, _ := mapRunStatus(run) + assert.Equal(t, oapi.JobStatusInProgress, jobStatus) + }) + } +} + +func TestMapRunStatus_UnknownDefaultsToInProgress(t *testing.T) { + run := &tfe.Run{Status: tfe.RunStatus("some_future_status")} + jobStatus, msg := mapRunStatus(run) + assert.Equal(t, oapi.JobStatusInProgress, jobStatus) + assert.Contains(t, msg, "some_future_status") +} + +// ===== isTerminalJobStatus ===== + +func TestIsTerminalJobStatus(t *testing.T) { + terminal := []oapi.JobStatus{ + oapi.JobStatusSuccessful, + oapi.JobStatusFailure, + oapi.JobStatusCancelled, + oapi.JobStatusExternalRunNotFound, + } + for _, s := range terminal { + assert.True(t, isTerminalJobStatus(s), "expected %s to be terminal", s) + } + + nonTerminal := []oapi.JobStatus{ + oapi.JobStatusInProgress, + oapi.JobStatusActionRequired, + oapi.JobStatusPending, + } + for _, s := range nonTerminal { + assert.False(t, isTerminalJobStatus(s), "expected %s to be non-terminal", s) + } +} + +// ===== formatResourceChanges ===== + +func TestFormatResourceChanges_NilPlan(t *testing.T) { + run := &tfe.Run{Plan: nil} + assert.Equal(t, "+0/~0/-0", formatResourceChanges(run)) +} + +func TestFormatResourceChanges_ZeroChanges(t *testing.T) { + run := &tfe.Run{Plan: &tfe.Plan{}} + assert.Equal(t, "+0/~0/-0", formatResourceChanges(run)) +} + +func TestFormatResourceChanges_NonZero(t *testing.T) { + run := &tfe.Run{Plan: &tfe.Plan{ + ResourceAdditions: 3, + ResourceDestructions: 1, + ResourceChanges: 2, + }} + assert.Equal(t, "+3/~2/-1", formatResourceChanges(run)) +} + +// ===== toCreateOptions / toUpdateOptions — workspace template ===== + +func TestWorkspaceTemplate_ToCreateOptions(t *testing.T) { + ws := &WorkspaceTemplate{ + Name: "my-workspace", + Description: "test desc", + AutoApply: true, + TerraformVersion: "1.5.0", + ExecutionMode: "remote", + AgentPoolID: "apool-123", + Project: "prj-abc", + WorkingDirectory: "infra/", + VCSRepo: &VCSRepoTemplate{ + Identifier: "org/repo", + Branch: "main", + OAuthTokenID: "ot-123", + }, + } + opts := ws.toCreateOptions() + + assert.Equal(t, "my-workspace", *opts.Name) + assert.Equal(t, "test desc", *opts.Description) + assert.True(t, *opts.AutoApply) + assert.Equal(t, "1.5.0", *opts.TerraformVersion) + assert.Equal(t, "remote", *opts.ExecutionMode) + assert.Equal(t, "apool-123", *opts.AgentPoolID) + assert.Equal(t, "prj-abc", opts.Project.ID) + assert.Equal(t, "infra/", *opts.WorkingDirectory) + require.NotNil(t, opts.VCSRepo) + assert.Equal(t, "org/repo", *opts.VCSRepo.Identifier) + assert.Equal(t, "main", *opts.VCSRepo.Branch) + assert.Equal(t, "ot-123", *opts.VCSRepo.OAuthTokenID) +} + +func TestWorkspaceTemplate_ToCreateOptions_Minimal(t *testing.T) { + ws := &WorkspaceTemplate{Name: "bare"} + opts := ws.toCreateOptions() + assert.Equal(t, "bare", *opts.Name) + assert.Nil(t, opts.ExecutionMode) + assert.Nil(t, opts.TerraformVersion) + assert.Nil(t, opts.Project) + assert.Empty(t, opts.AgentPoolID) // not set when empty + assert.Nil(t, opts.VCSRepo) +} + +func TestWorkspaceTemplate_ToUpdateOptions(t *testing.T) { + ws := &WorkspaceTemplate{ + Name: "updated-ws", + Description: "updated", + AutoApply: false, + TerraformVersion: "1.6.0", + ExecutionMode: "agent", + AgentPoolID: "apool-456", + VCSRepo: &VCSRepoTemplate{ + Identifier: "org/repo2", + Branch: "develop", + OAuthTokenID: "ot-456", + }, + } + opts := ws.toUpdateOptions() + + assert.Equal(t, "updated-ws", *opts.Name) + assert.Equal(t, "updated", *opts.Description) + assert.False(t, *opts.AutoApply) + assert.Equal(t, "1.6.0", *opts.TerraformVersion) + assert.Equal(t, "agent", *opts.ExecutionMode) + assert.Equal(t, "apool-456", *opts.AgentPoolID) + require.NotNil(t, opts.VCSRepo) + assert.Equal(t, "org/repo2", *opts.VCSRepo.Identifier) +} + +func TestWorkspaceTemplate_ToUpdateOptions_Minimal(t *testing.T) { + ws := &WorkspaceTemplate{Name: "bare"} + opts := ws.toUpdateOptions() + assert.Equal(t, "bare", *opts.Name) + assert.Nil(t, opts.ExecutionMode) + assert.Nil(t, opts.TerraformVersion) + assert.Nil(t, opts.AgentPoolID) + assert.Nil(t, opts.VCSRepo) +} + +// ===== toCreateOptions / toUpdateOptions — variable template ===== + +func TestVariableTemplate_ToCreateOptions(t *testing.T) { + v := VariableTemplate{ + Key: "AWS_REGION", + Value: "us-east-1", + Description: "AWS region", + Category: "env", + HCL: false, + Sensitive: true, + } + opts := v.toCreateOptions() + assert.Equal(t, "AWS_REGION", *opts.Key) + assert.Equal(t, "us-east-1", *opts.Value) + assert.Equal(t, "AWS region", *opts.Description) + assert.Equal(t, tfe.CategoryType("env"), *opts.Category) + assert.False(t, *opts.HCL) + assert.True(t, *opts.Sensitive) +} + +func TestVariableTemplate_ToUpdateOptions(t *testing.T) { + v := VariableTemplate{ + Key: "TF_VAR_foo", + Value: `{"bar":"baz"}`, + Description: "HCL variable", + Category: "terraform", + HCL: true, + Sensitive: false, + } + opts := v.toUpdateOptions() + assert.Equal(t, "TF_VAR_foo", *opts.Key) + assert.Equal(t, `{"bar":"baz"}`, *opts.Value) + assert.Equal(t, tfe.CategoryType("terraform"), *opts.Category) + assert.True(t, *opts.HCL) + assert.False(t, *opts.Sensitive) +} + +// ===== RestoreJobs — unit-level checks (no real TFC client) ===== + +func TestRestoreJobs_NoExternalId_SendsExternalRunNotFound(t *testing.T) { + // We can't easily call RestoreJobs without a real store + messaging producer, + // but we can verify the logic path by checking that a job without ExternalId + // would get the externalRunNotFound status. We test the status mapping instead. + assert.True(t, isTerminalJobStatus(oapi.JobStatusExternalRunNotFound)) +} + +func TestRestoreJobs_WithExternalId_IsResumable(t *testing.T) { + // Verify the run ID extraction path: a non-empty ExternalId means the job + // should be resumed (not marked as externalRunNotFound). + runID := "run-abc123" + job := &oapi.Job{ + Id: "job-1", + ExternalId: &runID, + Status: oapi.JobStatusInProgress, + } + assert.NotNil(t, job.ExternalId) + assert.NotEmpty(t, *job.ExternalId) +} + +// ===== sendJobEvent field construction ===== + +func TestSendJobEvent_TerminalSetsCompletedAt(t *testing.T) { + // Verify that isTerminalJobStatus correctly identifies statuses that should + // trigger completedAt being set in sendJobEvent + for _, status := range []oapi.JobStatus{ + oapi.JobStatusSuccessful, + oapi.JobStatusFailure, + oapi.JobStatusCancelled, + oapi.JobStatusExternalRunNotFound, + } { + assert.True(t, isTerminalJobStatus(status), "completedAt should be set for %s", status) + } +} + +func TestSendJobEvent_InProgressSetsStartedAt(t *testing.T) { + // The sendJobEvent logic sets startedAt when status == inProgress + assert.Equal(t, oapi.JobStatusInProgress, oapi.JobStatus("inProgress")) +} + +// ===== WorkspaceTemplate JSON/YAML round-trip ===== + +func TestWorkspaceTemplate_JSONRoundTrip(t *testing.T) { + ws := WorkspaceTemplate{ + Name: "test-ws", + Description: "desc", + AutoApply: true, + TerraformVersion: "1.5.0", + Variables: []VariableTemplate{ + {Key: "k1", Value: "v1", Category: "env"}, + }, + } + data, err := json.Marshal(ws) + require.NoError(t, err) + + var got WorkspaceTemplate + require.NoError(t, json.Unmarshal(data, &got)) + assert.Equal(t, ws.Name, got.Name) + assert.Equal(t, ws.AutoApply, got.AutoApply) + require.Len(t, got.Variables, 1) + assert.Equal(t, "k1", got.Variables[0].Key) +} + +// ===== Type() ===== + +func TestTFE_Type(t *testing.T) { + tfeInst := &TFE{} + assert.Equal(t, "tfe", tfeInst.Type()) +} diff --git a/apps/workspace-engine/pkg/workspace/jobagents/terraformcloud/tfe_verifications.go b/apps/workspace-engine/pkg/workspace/jobagents/terraformcloud/tfe_verifications.go deleted file mode 100644 index ab07a8842..000000000 --- a/apps/workspace-engine/pkg/workspace/jobagents/terraformcloud/tfe_verifications.go +++ /dev/null @@ -1,61 +0,0 @@ -package terraformcloud - -import ( - "context" - "fmt" - "workspace-engine/pkg/oapi" - "workspace-engine/pkg/workspace/releasemanager/verification" -) - -type TFERunVerification struct { - verifications *verification.Manager - job *oapi.Job - address string - token string - runID string -} - -func newTFERunVerification(verifications *verification.Manager, job *oapi.Job, address, token, runID string) *TFERunVerification { - return &TFERunVerification{ - verifications: verifications, - job: job, - address: address, - token: token, - runID: runID, - } -} - -func (v *TFERunVerification) StartVerification(ctx context.Context) error { - provider, err := v.buildMetricProvider() - if err != nil { - return fmt.Errorf("failed to build metric provider: %w", err) - } - - metricSpec := v.buildMetricSpec(provider) - return v.verifications.StartVerification(ctx, v.job, []oapi.VerificationMetricSpec{metricSpec}) -} - -func (v *TFERunVerification) buildMetricProvider() (oapi.MetricProvider, error) { - provider := oapi.MetricProvider{} - err := provider.FromTerraformCloudRunMetricProvider(oapi.TerraformCloudRunMetricProvider{ - Address: v.address, - Token: v.token, - RunId: v.runID, - }) - return provider, err -} - -func (v *TFERunVerification) buildMetricSpec(provider oapi.MetricProvider) oapi.VerificationMetricSpec { - failureCondition := "result.status == 'canceled' || result.status == 'discarded' || result.status == 'errored'" - successThreshold := 1 - failureThreshold := 1 - return oapi.VerificationMetricSpec{ - Count: 100, - IntervalSeconds: 60, - SuccessCondition: "result.status == 'applied' || result.status == 'planned_and_finished' || result.status == 'planned_and_saved'", - FailureCondition: &failureCondition, - SuccessThreshold: &successThreshold, - FailureThreshold: &failureThreshold, - Provider: provider, - } -} diff --git a/apps/workspace-engine/pkg/workspace/jobagents/types/types.go b/apps/workspace-engine/pkg/workspace/jobagents/types/types.go index d3239c819..a8a0aa9fd 100644 --- a/apps/workspace-engine/pkg/workspace/jobagents/types/types.go +++ b/apps/workspace-engine/pkg/workspace/jobagents/types/types.go @@ -9,3 +9,10 @@ type Dispatchable interface { Type() string Dispatch(ctx context.Context, job *oapi.Job) error } + +// Restorable is implemented by dispatchers that can resume tracking +// in-flight jobs after an engine restart. Jobs with an ExternalId are +// resumed; jobs without one are marked as externalRunNotFound. +type Restorable interface { + RestoreJobs(ctx context.Context, jobs []*oapi.Job) error +} diff --git a/apps/workspace-engine/pkg/workspace/releasemanager/manager.go b/apps/workspace-engine/pkg/workspace/releasemanager/manager.go index 65cf9980c..c76dc3fb6 100644 --- a/apps/workspace-engine/pkg/workspace/releasemanager/manager.go +++ b/apps/workspace-engine/pkg/workspace/releasemanager/manager.go @@ -28,13 +28,14 @@ import ( // Manager handles the business logic for release target changes and deployment decisions. // It coordinates the state index, eligibility checking, and execution to manage release targets. type Manager struct { - store *store.Store - planner *deployment.Planner - eligibility *deployment.JobEligibilityChecker - executor *deployment.Executor - verification *verification.Manager - traceStore PersistenceStore - stateIndex *StateIndex + store *store.Store + planner *deployment.Planner + eligibility *deployment.JobEligibilityChecker + executor *deployment.Executor + verification *verification.Manager + jobAgentRegistry *jobagents.Registry + traceStore PersistenceStore + stateIndex *StateIndex } var tracer = otel.Tracer("workspace/releasemanager") @@ -64,13 +65,14 @@ func New(store *store.Store, traceStore PersistenceStore, verificationManager *v verificationManager.SetHooks(compositeHooks) return &Manager{ - store: store, - planner: planner, - eligibility: eligibility, - executor: executor, - verification: verificationManager, - traceStore: traceStore, - stateIndex: stateIndex, + store: store, + planner: planner, + eligibility: eligibility, + executor: executor, + verification: verificationManager, + jobAgentRegistry: jobAgentRegistry, + traceStore: traceStore, + stateIndex: stateIndex, } } @@ -526,6 +528,8 @@ func (m *Manager) Restore(ctx context.Context) error { ctx, span := tracer.Start(ctx, "ReleaseManager.Restore") defer span.End() + m.jobAgentRegistry.RestoreJobs(ctx) + if err := m.verification.Restore(ctx); err != nil { span.RecordError(err) span.SetStatus(codes.Error, "failed to restore verifications") diff --git a/apps/workspace-engine/svc/controllers/deploymentresourceselectoreval/controller.go b/apps/workspace-engine/svc/controllers/deploymentresourceselectoreval/controller.go index a920c7b1e..acd520742 100644 --- a/apps/workspace-engine/svc/controllers/deploymentresourceselectoreval/controller.go +++ b/apps/workspace-engine/svc/controllers/deploymentresourceselectoreval/controller.go @@ -90,6 +90,9 @@ func (c *Controller) Process(ctx context.Context, item workqueue.Item) error { } } + // TODO: use resourceMatchedIds to update deployment-resource associations Fixes a lint error + _ = resourceMatchedIds + return nil } diff --git a/apps/workspace-engine/svc/http/server/openapi/deployments/server.go b/apps/workspace-engine/svc/http/server/openapi/deployments/server.go index c84c959a5..8f04ffd19 100644 --- a/apps/workspace-engine/svc/http/server/openapi/deployments/server.go +++ b/apps/workspace-engine/svc/http/server/openapi/deployments/server.go @@ -398,6 +398,7 @@ func (s *Deployments) GetReleaseTargetsForDeployment(c *gin.Context, workspaceId item.LatestJob = &oapi.JobSummary{ Id: state.LatestJob.Job.Id, Links: &map[string]string{}, + Message: state.LatestJob.Job.Message, Status: state.LatestJob.Job.Status, Verifications: state.LatestJob.Verifications, } From 80309dc1fcae3ac35f6df825bf5f55685b103888 Mon Sep 17 00:00:00 2001 From: Zachary Blasczyk Date: Wed, 25 Feb 2026 13:14:55 -0600 Subject: [PATCH 4/4] refactor --- .../workspace/jobagents/terraformcloud/tfe.go | 123 ++++++++++-------- 1 file changed, 66 insertions(+), 57 deletions(-) diff --git a/apps/workspace-engine/pkg/workspace/jobagents/terraformcloud/tfe.go b/apps/workspace-engine/pkg/workspace/jobagents/terraformcloud/tfe.go index 84672d5ac..04c07ef83 100644 --- a/apps/workspace-engine/pkg/workspace/jobagents/terraformcloud/tfe.go +++ b/apps/workspace-engine/pkg/workspace/jobagents/terraformcloud/tfe.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "maps" "strings" @@ -192,80 +193,73 @@ func isTerminalJobStatus(status oapi.JobStatus) bool { } } +// Static status mappings for simple TFC run states. +var runStatusMap = map[tfe.RunStatus]struct { + status oapi.JobStatus + message string +}{ + // Pending + tfe.RunPending: {oapi.JobStatusPending, "Run pending in queue"}, + + // In-progress phases + tfe.RunFetching: {oapi.JobStatusInProgress, "Fetching configuration..."}, + tfe.RunFetchingCompleted: {oapi.JobStatusInProgress, "Fetching configuration..."}, + tfe.RunPrePlanRunning: {oapi.JobStatusInProgress, "Running pre-plan tasks..."}, + tfe.RunPrePlanCompleted: {oapi.JobStatusInProgress, "Running pre-plan tasks..."}, + tfe.RunQueuing: {oapi.JobStatusInProgress, "Queued for planning..."}, + tfe.RunPlanQueued: {oapi.JobStatusInProgress, "Queued for planning..."}, + tfe.RunPlanning: {oapi.JobStatusInProgress, "Planning..."}, + tfe.RunCostEstimating: {oapi.JobStatusInProgress, "Estimating costs..."}, + tfe.RunCostEstimated: {oapi.JobStatusInProgress, "Estimating costs..."}, + tfe.RunPolicyChecking: {oapi.JobStatusInProgress, "Checking policies..."}, + tfe.RunPolicyChecked: {oapi.JobStatusInProgress, "Checking policies..."}, + tfe.RunPostPlanRunning: {oapi.JobStatusInProgress, "Running post-plan tasks..."}, + tfe.RunPostPlanCompleted: {oapi.JobStatusInProgress, "Running post-plan tasks..."}, + tfe.RunPostPlanAwaitingDecision: {oapi.JobStatusInProgress, "Running post-plan tasks..."}, + tfe.RunConfirmed: {oapi.JobStatusInProgress, "Confirmed, queuing apply..."}, + tfe.RunApplyQueued: {oapi.JobStatusInProgress, "Queued for apply..."}, + tfe.RunQueuingApply: {oapi.JobStatusInProgress, "Queued for apply..."}, + tfe.RunApplying: {oapi.JobStatusInProgress, "Applying..."}, + tfe.RunPreApplyRunning: {oapi.JobStatusInProgress, "Running pre-apply tasks..."}, + tfe.RunPreApplyCompleted: {oapi.JobStatusInProgress, "Running pre-apply tasks..."}, + + // Action required + tfe.RunPolicyOverride: {oapi.JobStatusActionRequired, "Policy check failed — awaiting override"}, + tfe.RunPolicySoftFailed: {oapi.JobStatusActionRequired, "Policy soft-failed — awaiting override"}, + + // Terminal: cancelled + tfe.RunCanceled: {oapi.JobStatusCancelled, "Run was canceled"}, + tfe.RunDiscarded: {oapi.JobStatusCancelled, "Run was discarded"}, +} + func mapRunStatus(run *tfe.Run) (oapi.JobStatus, string) { changes := formatResourceChanges(run) + // States that need dynamic messages switch run.Status { case tfe.RunPending: - pos := run.PositionInQueue - return oapi.JobStatusPending, fmt.Sprintf("Run pending in queue (position: %d)", pos) - - case tfe.RunFetching, tfe.RunFetchingCompleted: - return oapi.JobStatusInProgress, "Fetching configuration..." - - case tfe.RunPrePlanRunning, tfe.RunPrePlanCompleted: - return oapi.JobStatusInProgress, "Running pre-plan tasks..." - - case tfe.RunQueuing, tfe.RunPlanQueued: - return oapi.JobStatusInProgress, "Queued for planning..." - - case tfe.RunPlanning: - return oapi.JobStatusInProgress, "Planning..." - + return oapi.JobStatusPending, fmt.Sprintf("Run pending in queue (position: %d)", run.PositionInQueue) case tfe.RunPlanned: if run.Actions != nil && run.Actions.IsConfirmable { return oapi.JobStatusActionRequired, fmt.Sprintf("Plan complete — awaiting approval. %s", changes) } return oapi.JobStatusInProgress, "Plan complete, auto-applying..." - case tfe.RunPlannedAndFinished: return oapi.JobStatusSuccessful, fmt.Sprintf("Plan complete (no changes). %s", changes) - case tfe.RunPlannedAndSaved: return oapi.JobStatusSuccessful, fmt.Sprintf("Plan saved. %s", changes) - - case tfe.RunCostEstimating, tfe.RunCostEstimated: - return oapi.JobStatusInProgress, "Estimating costs..." - - case tfe.RunPolicyChecking, tfe.RunPolicyChecked: - return oapi.JobStatusInProgress, "Checking policies..." - - case tfe.RunPolicyOverride: - return oapi.JobStatusActionRequired, "Policy check failed — awaiting override" - - case tfe.RunPolicySoftFailed: - return oapi.JobStatusActionRequired, "Policy soft-failed — awaiting override" - - case tfe.RunPostPlanRunning, tfe.RunPostPlanCompleted, tfe.RunPostPlanAwaitingDecision: - return oapi.JobStatusInProgress, "Running post-plan tasks..." - - case tfe.RunConfirmed: - return oapi.JobStatusInProgress, "Confirmed, queuing apply..." - - case tfe.RunApplyQueued, tfe.RunQueuingApply: - return oapi.JobStatusInProgress, "Queued for apply..." - - case tfe.RunApplying: - return oapi.JobStatusInProgress, "Applying..." - case tfe.RunApplied: return oapi.JobStatusSuccessful, fmt.Sprintf("Applied successfully. %s", changes) - - case tfe.RunPreApplyRunning, tfe.RunPreApplyCompleted: - return oapi.JobStatusInProgress, "Running pre-apply tasks..." - case tfe.RunErrored: return oapi.JobStatusFailure, fmt.Sprintf("Run errored: %s", run.Message) + } - case tfe.RunCanceled: - return oapi.JobStatusCancelled, "Run was canceled" - - case tfe.RunDiscarded: - return oapi.JobStatusCancelled, "Run was discarded" - - default: - return oapi.JobStatusInProgress, fmt.Sprintf("Run status: %s", run.Status) + // Static mappings + if entry, ok := runStatusMap[run.Status]; ok { + return entry.status, entry.message } + + return oapi.JobStatusInProgress, fmt.Sprintf("Run status: %s", run.Status) } func formatResourceChanges(run *tfe.Run) string { @@ -425,7 +419,7 @@ func (t *TFE) getTemplatedWorkspace(job *oapi.Job, template string) (*WorkspaceT func (t *TFE) upsertWorkspace(ctx context.Context, client *tfe.Client, organization string, workspace *WorkspaceTemplate) (*tfe.Workspace, error) { existing, err := client.Workspaces.Read(ctx, organization, workspace.Name) - if err != nil && err.Error() != "resource not found" { + if err != nil && !errors.Is(err, tfe.ErrResourceNotFound) { return nil, fmt.Errorf("failed to read workspace: %w", err) } @@ -456,6 +450,9 @@ func (t *TFE) syncVariables(ctx context.Context, client *tfe.Client, workspaceID } for _, desired := range desiredVars { + if _, err := desired.categoryType(); err != nil { + return err + } if existing, ok := existingByKey[desired.Key]; ok { _, err := client.Variables.Update(ctx, workspaceID, existing.ID, desired.toUpdateOptions()) if err != nil { @@ -563,8 +560,20 @@ func (w *WorkspaceTemplate) toUpdateOptions() tfe.WorkspaceUpdateOptions { return opts } +var validCategories = map[string]tfe.CategoryType{ + "terraform": tfe.CategoryTerraform, + "env": tfe.CategoryEnv, +} + +func (v *VariableTemplate) categoryType() (tfe.CategoryType, error) { + if ct, ok := validCategories[v.Category]; ok { + return ct, nil + } + return "", fmt.Errorf("invalid variable category %q for key %q (must be \"terraform\" or \"env\")", v.Category, v.Key) +} + func (v *VariableTemplate) toCreateOptions() tfe.VariableCreateOptions { - category := tfe.CategoryType(v.Category) + category, _ := v.categoryType() return tfe.VariableCreateOptions{ Key: &v.Key, Value: &v.Value, @@ -576,7 +585,7 @@ func (v *VariableTemplate) toCreateOptions() tfe.VariableCreateOptions { } func (v *VariableTemplate) toUpdateOptions() tfe.VariableUpdateOptions { - category := tfe.CategoryType(v.Category) + category, _ := v.categoryType() return tfe.VariableUpdateOptions{ Key: &v.Key, Value: &v.Value,