From 1c42ae440fbf96730aacea6cc685d2360b6a58d2 Mon Sep 17 00:00:00 2001 From: KshitijaChoudhari Date: Fri, 6 Mar 2026 12:27:37 +0530 Subject: [PATCH 1/3] WIP: reapply local changes on bug/agentpoolworkspaces --- examples/run_tasks_integration.py | 265 +++++++++++++++ src/pytfe/models/__init__.py | 25 ++ src/pytfe/resources/run_tasks_integration.py | 68 ++++ tests/units/test_run_tasks_integration.py | 324 +++++++++++++++++++ 4 files changed, 682 insertions(+) create mode 100644 examples/run_tasks_integration.py create mode 100644 src/pytfe/resources/run_tasks_integration.py create mode 100644 tests/units/test_run_tasks_integration.py diff --git a/examples/run_tasks_integration.py b/examples/run_tasks_integration.py new file mode 100644 index 0000000..878d38e --- /dev/null +++ b/examples/run_tasks_integration.py @@ -0,0 +1,265 @@ +#!/usr/bin/env python +""" +Run Tasks Integration Example - Real TFC/TFE Testing + +This example shows how to create a webhook server that integrates with +Terraform Cloud/Enterprise run tasks to validate runs and send results back. + +STEP-BY-STEP TESTING WITH REAL TFC/TFE: + +1. START THE SERVER: + python examples/run_tasks_integration.py --port 8888 + +2. MAKE IT ACCESSIBLE (choose one): + + Option A - Using ngrok (for local testing): + - Install: https://ngrok.com/download + - Run: ngrok http 8888 + - Copy the public URL (e.g., https://abc123.ngrok.io) + + Option B - Deploy to cloud (recommended for production): + + AWS EC2: + - Launch EC2 instance (t2.micro sufficient for testing) + - Upload this file: scp run_tasks_integration.py ec2-user@YOUR-IP:~/ + - SSH in: ssh ec2-user@YOUR-IP + - Install Python 3.11+: sudo dnf install python3.11 python3.11-pip + - Install dependencies: python3.11 -m pip install --user pytfe + - Run server: python3.11 run_tasks_integration.py --port 8888 + - Configure security group: Allow port 8888 from 0.0.0.0/0 + - Use public IP: http://YOUR-EC2-IP:8888 + + Heroku (easiest): + - Create Procfile: web: python run_tasks_integration.py --port $PORT + - Create requirements.txt: pytfe>=0.1.0 + - Deploy: git push heroku main + - Use Heroku URL: https://your-app.herokuapp.com + + Google Cloud Run: + - Create Dockerfile: FROM python:3.11 / RUN pip install pytfe / COPY . . / CMD ["python", "run_tasks_integration.py", "--port", "8080"] + - Deploy: gcloud run deploy --source . + - Use Cloud Run URL: https://your-service-hash.run.app + + DigitalOcean Droplet: + - Create Ubuntu droplet + - Upload file and install Python/pytfe + - Run with: python3 run_tasks_integration.py --port 8888 + - Use droplet IP: http://YOUR-DROPLET-IP:8888 + + Benefits of cloud deployment: + - Permanent URL (no ngrok reconnections) + - Better reliability and uptime + - Can handle production workloads + - SSL/HTTPS support available + - Scalable if needed + +3. CREATE RUN TASK IN TFC/TFE: + - Go to: https://app.terraform.io/app/YOUR_ORG/settings/tasks + - Click "Create run task" + - Name: "python-tfe-test" + - URL: Your public URL from step 2 + - Save and wait for verification (check mark) + +4. ATTACH TO WORKSPACE: + - Go to workspace settings → Run Tasks + - Click "Add run task" + - Select "python-tfe-test" + - Enforcement: Advisory (for testing) + - Stage: Pre-plan + - Save + +5. TRIGGER A RUN: + - Go to your workspace + - Click "Actions" → "Start new run" + - Watch this terminal for webhook activity! + - Check TFC/TFE UI for run task results + +CUSTOMIZE VALIDATION LOGIC: +Edit the section around line 80 to add your custom checks: +- Cost validation +- Security scanning (Checkov, tfsec) +- Policy enforcement +- Custom approval workflows + +API Documentation: + https://developer.hashicorp.com/terraform/enterprise/api-docs/run-tasks/run-tasks-integration +""" + +import json +from http.server import BaseHTTPRequestHandler, HTTPServer + +from pytfe import TFEClient +from pytfe.models import RunTaskRequest +from pytfe.models import ( + TaskResultCallbackOptions, + TaskResultOutcome, + TaskResultTag, +) + + +class RunTaskHandler(BaseHTTPRequestHandler): + """HTTP handler for run task callbacks from TFC/TFE.""" + + def do_POST(self): + """Handle POST request from TFC/TFE run task webhook.""" + # Read the request body + content_length = int(self.headers["Content-Length"]) + body = self.rfile.read(content_length) + + try: + # Parse the incoming run task request + payload = json.loads(body) + print("\n" + "=" * 60) + print("Received Run Task Request") + print("=" * 60) + + # Parse into RunTaskRequest model + request = RunTaskRequest.model_validate(payload) + + print(f"Run ID: {request.run_id}") + print(f"Organization: {request.organization_name}") + print(f"Workspace: {request.workspace_name}") + print(f"Workspace ID: {request.workspace_id}") + print(f"Stage: {request.stage}") + print(f"Callback URL: {request.task_result_callback_url}") + print(f"Is Speculative: {request.is_speculative}") + + # Handle verification requests (test webhooks from TFC/TFE) + if ( + request.organization_name == "test-org" + or request.workspace_name == "test-workspace" + ): + print("\n[OK] Verification request detected - responding with 200 OK") + print("=" * 60 + "\n") + self.send_response(200) + self.send_header("Content-type", "application/json") + self.end_headers() + self.wfile.write(json.dumps({"status": "ok"}).encode()) + return + + # =============================================================== + # CUSTOMIZE YOUR VALIDATION LOGIC HERE + # =============================================================== + # This is where you add your custom checks and validation. + # Examples: + # + # 1. Cost Control: + # if estimated_cost > 1000: + # result_status = "failed" + # result_message = f"Cost ${estimated_cost} exceeds limit" + # + # 2. Security Scanning: + # scan_results = run_checkov(request.configuration_version_download_url) + # if scan_results.failed: + # result_status = "failed" + # result_message = "Security scan failed" + # + # 3. Policy Enforcement: + # if not workspace_has_required_tags(request.workspace_name): + # result_status = "failed" + # result_message = "Workspace missing required tags" + # + # 4. Custom Approval: + # if request.workspace_name.startswith("prod-"): + # result_status = "failed" + # result_message = "Production changes require manual approval" + + # For this example, we'll just pass the task + result_status = "passed" + result_message = "All checks passed successfully" + + # Create detailed outcomes (optional but recommended) + outcomes = [ + TaskResultOutcome( + outcome_id="check-1", + description="Configuration validation passed", + body="All Terraform configurations are valid and follow best practices.", + url="https://example.com/results/check-1", + tags={ + "Status": [TaskResultTag(label="Passed", level="info")], + "Category": [TaskResultTag(label="Validation")], + }, + ) + ] + + # Create callback options + callback_options = TaskResultCallbackOptions( + status=result_status, + message=result_message, + url="https://example.com/full-results", + outcomes=outcomes, + ) + + # Initialize client and send callback + print("\nInitializing TFEClient...") + print(f"Access token from webhook: {request.access_token[:10]}***") + client = TFEClient() + print("Client initialized successfully") + + print(f"Sending callback to: {request.task_result_callback_url[:50]}...") + client.run_tasks_integration.callback( + callback_url=request.task_result_callback_url, + access_token=request.access_token, + options=callback_options, + ) + + print(f"\n[SUCCESS] Callback sent successfully: {result_status}") + print("=" * 60 + "\n") + + # Respond to TFC/TFE + self.send_response(200) + self.send_header("Content-type", "application/json") + self.end_headers() + self.wfile.write(json.dumps({"status": "received"}).encode()) + + except Exception as e: + print(f"Error processing request: {e}") + self.send_response(500) + self.send_header("Content-type", "application/json") + self.end_headers() + self.wfile.write(json.dumps({"error": str(e)}).encode()) + + def log_message(self, format, *args): + """Suppress default HTTP logging.""" + pass + + +def run_server(port=8080): + """Start the run task callback server.""" + server_address = ("", port) + httpd = HTTPServer(server_address, RunTaskHandler) + + print("=" * 60) + print("Run Tasks Integration Callback Server") + print("=" * 60) + print(f"Listening on http://localhost:{port}") + print("\nFor local testing:") + print(" 1. Use ngrok or similar tool to expose this server:") + print(f" ngrok http {port}") + print(" 2. Configure your run task in TFC/TFE with the ngrok URL") + print(" 3. Trigger a run in your workspace") + print("\nWaiting for requests from TFC/TFE...") + print("=" * 60 + "\n") + + try: + httpd.serve_forever() + except KeyboardInterrupt: + print("\n\nShutting down server...") + httpd.shutdown() + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser( + description="Run Tasks Integration callback server" + ) + parser.add_argument( + "--port", + type=int, + default=8080, + help="Port to listen on (default: 8080)", + ) + args = parser.parse_args() + + run_server(port=args.port) diff --git a/src/pytfe/models/__init__.py b/src/pytfe/models/__init__.py index 8524e6b..6449b6a 100644 --- a/src/pytfe/models/__init__.py +++ b/src/pytfe/models/__init__.py @@ -273,6 +273,15 @@ Stage, TaskEnforcementLevel, ) +from .run_task_request import ( + RunTaskRequest, + RunTaskRequestCapabilities, +) +from .run_tasks_integration import ( + TaskResultCallbackOptions, + TaskResultOutcome, + TaskResultTag, +) from .run_trigger import ( RunTrigger, RunTriggerCreateOptions, @@ -580,6 +589,22 @@ "RunTaskCreateOptions", "RunTaskUpdateOptions", "RunTaskReadOptions", + "RunTaskRequest", + "RunTaskRequestCapabilities", + "TaskResultCallbackOptions", + "TaskResultOutcome", + "TaskResultTag", + "TaskResult", + "TaskResultStatus", + "TaskResultStatusTimestamps", + "Actions", + "Permissions", + "TaskStage", + "TaskStageListOptions", + "TaskStageOverrideOptions", + "TaskStageReadOptions", + "TaskStageStatus", + "TaskStageStatusTimestamps", # Run triggers "RunTrigger", "RunTriggerCreateOptions", diff --git a/src/pytfe/resources/run_tasks_integration.py b/src/pytfe/resources/run_tasks_integration.py new file mode 100644 index 0000000..7478a1a --- /dev/null +++ b/src/pytfe/resources/run_tasks_integration.py @@ -0,0 +1,68 @@ +"""Run Tasks Integration resource for python-tfe. + +This module provides the callback functionality for external run task servers +to send results back to Terraform Cloud/Enterprise. +""" + +from __future__ import annotations + +from typing import Any + +from ..errors import TFEError +from ..models.task_result import TaskResultStatus +from ..models.run_tasks_integration import ( + TaskResultTag, + TaskResultOutcome, + TaskResultCallbackOptions, +) +from ._base import _Service + + +class RunTasksIntegration(_Service): + """Run Tasks Integration API for sending callbacks to TFC/TFE. + + This service is used by external run task servers to send task results + back to Terraform Cloud/Enterprise. + + API Documentation: + https://developer.hashicorp.com/terraform/enterprise/api-docs/run-tasks/run-tasks-integration + """ + + def callback( + self, + callback_url: str, + access_token: str, + options: TaskResultCallbackOptions, + ) -> None: + """Send task result callback to TFC/TFE. + + Args: + callback_url: The callback URL from the run task request + access_token: The access token from the run task request + options: Task result callback options + + Raises: + TFEError: If callback_url or access_token is invalid + TFEError: If options validation fails + """ + if not callback_url or not callback_url.strip(): + raise TFEError("callback_url cannot be empty") + + if not access_token or not access_token.strip(): + raise TFEError("access_token cannot be empty") + + options.validate() + + # Create custom headers with the access token from the request + headers = { + "Authorization": f"Bearer {access_token}", + "Content-Type": "application/vnd.api+json", + } + + # Send PATCH request to callback URL + self.t.request( + "PATCH", + callback_url, + json_body=options.to_dict(), + headers=headers, + ) diff --git a/tests/units/test_run_tasks_integration.py b/tests/units/test_run_tasks_integration.py new file mode 100644 index 0000000..15bef5c --- /dev/null +++ b/tests/units/test_run_tasks_integration.py @@ -0,0 +1,324 @@ +"""Unit tests for Run Tasks Integration.""" + +from __future__ import annotations + +from unittest.mock import MagicMock + +import pytest + +from pytfe.errors import TFEError +from pytfe.models.run_task_request import RunTaskRequest +from pytfe.models import ( + TaskResultCallbackOptions, + TaskResultOutcome, + TaskResultStatus, + TaskResultTag, +) +from pytfe.resources.run_tasks_integration import RunTasksIntegration + + +class TestRunTaskRequest: + """Tests for RunTaskRequest model.""" + + def test_run_task_request_minimal(self): + """Test parsing minimal run task request.""" + data = { + "access_token": "test-token-123", + "is_speculative": False, + "organization_name": "my-org", + "payload_version": 1, + "run_app_url": "https://app.terraform.io/app/my-org/my-workspace/runs/run-123", + "run_created_at": "2025-12-22T10:00:00Z", + "run_created_by": "user@example.com", + "run_id": "run-123", + "run_message": "Test run", + "stage": "post_plan", + "task_result_callback_url": "https://app.terraform.io/api/v2/task-results/tr-123/callback", + "task_result_enforcement_level": "mandatory", + "task_result_id": "tr-123", + "workspace_app_url": "https://app.terraform.io/app/my-org/my-workspace", + "workspace_id": "ws-123", + "workspace_name": "my-workspace", + } + + request = RunTaskRequest(**data) + + assert request.access_token == "test-token-123" + assert request.organization_name == "my-org" + assert request.run_id == "run-123" + assert request.stage == "post_plan" + assert ( + request.task_result_callback_url + == "https://app.terraform.io/api/v2/task-results/tr-123/callback" + ) + + def test_run_task_request_complete(self): + """Test parsing complete run task request with all fields.""" + data = { + "access_token": "test-token-456", + "capabilities": {"outcomes": True}, + "configuration_version_download_url": "https://app.terraform.io/api/v2/configuration-versions/cv-123/download", + "configuration_version_id": "cv-123", + "is_speculative": True, + "organization_name": "test-org", + "payload_version": 1, + "plan_json_api_url": "https://app.terraform.io/api/v2/plans/plan-123/json-output", + "run_app_url": "https://app.terraform.io/app/test-org/test-workspace/runs/run-456", + "run_created_at": "2025-12-22T11:30:00Z", + "run_created_by": "admin@example.com", + "run_id": "run-456", + "run_message": "Test with VCS", + "stage": "pre_plan", + "task_result_callback_url": "https://app.terraform.io/api/v2/task-results/tr-456/callback", + "task_result_enforcement_level": "advisory", + "task_result_id": "tr-456", + "vcs_branch": "main", + "vcs_commit_url": "https://github.com/org/repo/commit/abc123", + "vcs_pull_request_url": "https://github.com/org/repo/pull/42", + "vcs_repo_url": "https://github.com/org/repo", + "workspace_app_url": "https://app.terraform.io/app/test-org/test-workspace", + "workspace_id": "ws-456", + "workspace_name": "test-workspace", + "workspace_working_directory": "terraform/", + } + + request = RunTaskRequest(**data) + + assert request.access_token == "test-token-456" + assert request.capabilities is not None + assert request.capabilities.outcomes is True + assert request.configuration_version_id == "cv-123" + assert request.vcs_branch == "main" + assert request.vcs_commit_url == "https://github.com/org/repo/commit/abc123" + assert request.workspace_working_directory == "terraform/" + + +class TestTaskResultTag: + """Tests for TaskResultTag.""" + + def test_tag_with_level(self): + """Test tag with level.""" + tag = TaskResultTag(label="High", level="error") + data = tag.to_dict() + + assert data["label"] == "High" + assert data["level"] == "error" + + def test_tag_without_level(self): + """Test tag without level.""" + tag = TaskResultTag(label="Passed") + data = tag.to_dict() + + assert data["label"] == "Passed" + assert "level" not in data + + +class TestTaskResultOutcome: + """Tests for TaskResultOutcome.""" + + def test_outcome_complete(self): + """Test complete outcome with all fields.""" + tags = { + "Status": [TaskResultTag(label="Failed", level="error")], + "Severity": [TaskResultTag(label="High", level="error")], + } + + outcome = TaskResultOutcome( + outcome_id="ISSUE-123", + description="Security issue found", + body="# Details\n\nSecurity vulnerability detected.", + url="https://example.com/issues/123", + tags=tags, + ) + + data = outcome.to_dict() + + assert data["type"] == "task-result-outcomes" + assert data["attributes"]["outcome-id"] == "ISSUE-123" + assert data["attributes"]["description"] == "Security issue found" + assert ( + data["attributes"]["body"] + == "# Details\n\nSecurity vulnerability detected." + ) + assert data["attributes"]["url"] == "https://example.com/issues/123" + assert "Status" in data["attributes"]["tags"] + + def test_outcome_minimal(self): + """Test minimal outcome.""" + outcome = TaskResultOutcome() + data = outcome.to_dict() + + assert data["type"] == "task-result-outcomes" + assert "attributes" in data + + +class TestTaskResultCallbackOptions: + """Tests for TaskResultCallbackOptions.""" + + def test_callback_options_passed(self): + """Test callback options with passed status.""" + options = TaskResultCallbackOptions( + status=TaskResultStatus.PASSED, + message="All checks passed", + url="https://example.com/results/123", + ) + + options.validate() + data = options.to_dict() + + assert data["data"]["type"] == "task-results" + assert data["data"]["attributes"]["status"] == "passed" + assert data["data"]["attributes"]["message"] == "All checks passed" + assert data["data"]["attributes"]["url"] == "https://example.com/results/123" + + def test_callback_options_with_outcomes(self): + """Test callback options with outcomes.""" + outcome = TaskResultOutcome( + outcome_id="ISSUE-1", + description="Test issue", + ) + + options = TaskResultCallbackOptions( + status=TaskResultStatus.FAILED, + message="1 issue found", + outcomes=[outcome], + ) + + data = options.to_dict() + + assert "relationships" in data["data"] + assert "outcomes" in data["data"]["relationships"] + assert len(data["data"]["relationships"]["outcomes"]["data"]) == 1 + + def test_validate_invalid_status(self): + """Test validation fails with invalid status.""" + options = TaskResultCallbackOptions(status="invalid") + + with pytest.raises(TFEError) as exc_info: + options.validate() + + assert "Invalid task result status" in str(exc_info.value) + + def test_validate_valid_statuses(self): + """Test validation passes with all valid statuses.""" + for status in [ + TaskResultStatus.PASSED, + TaskResultStatus.FAILED, + TaskResultStatus.RUNNING, + ]: + options = TaskResultCallbackOptions(status=status) + options.validate() # Should not raise + + +class TestRunTasksIntegration: + """Tests for RunTasksIntegration service.""" + + def test_callback_success(self): + """Test successful callback.""" + mock_transport = MagicMock() + integration = RunTasksIntegration(mock_transport) + + options = TaskResultCallbackOptions( + status=TaskResultStatus.PASSED, + message="All tests passed", + ) + + integration.callback( + callback_url="https://app.terraform.io/api/v2/task-results/tr-123/callback", + access_token="test-token-123", + options=options, + ) + + # Verify request was made + mock_transport.request.assert_called_once() + call_args = mock_transport.request.call_args + + assert call_args[0][0] == "PATCH" + assert ( + call_args[0][1] + == "https://app.terraform.io/api/v2/task-results/tr-123/callback" + ) + assert "Authorization" in call_args[1]["headers"] + assert call_args[1]["headers"]["Authorization"] == "Bearer test-token-123" + + def test_callback_empty_url(self): + """Test callback fails with empty URL.""" + mock_transport = MagicMock() + integration = RunTasksIntegration(mock_transport) + + options = TaskResultCallbackOptions(status=TaskResultStatus.PASSED) + + with pytest.raises(TFEError) as exc_info: + integration.callback( + callback_url="", + access_token="test-token", + options=options, + ) + + assert "callback_url cannot be empty" in str(exc_info.value) + + def test_callback_empty_token(self): + """Test callback fails with empty token.""" + mock_transport = MagicMock() + integration = RunTasksIntegration(mock_transport) + + options = TaskResultCallbackOptions(status=TaskResultStatus.PASSED) + + with pytest.raises(TFEError) as exc_info: + integration.callback( + callback_url="https://example.com/callback", + access_token="", + options=options, + ) + + assert "access_token cannot be empty" in str(exc_info.value) + + def test_callback_invalid_status(self): + """Test callback fails with invalid status.""" + mock_transport = MagicMock() + integration = RunTasksIntegration(mock_transport) + + options = TaskResultCallbackOptions(status="invalid-status") + + with pytest.raises(TFEError) as exc_info: + integration.callback( + callback_url="https://example.com/callback", + access_token="test-token", + options=options, + ) + + assert "Invalid task result status" in str(exc_info.value) + + def test_callback_with_outcomes(self): + """Test callback with detailed outcomes.""" + mock_transport = MagicMock() + integration = RunTasksIntegration(mock_transport) + + outcome = TaskResultOutcome( + outcome_id="CHECK-1", + description="Policy violation", + body="## Issue\n\nPolicy check failed.", + url="https://example.com/check-1", + tags={ + "Severity": [TaskResultTag(label="High", level="error")], + }, + ) + + options = TaskResultCallbackOptions( + status=TaskResultStatus.FAILED, + message="Policy check failed", + url="https://example.com/results", + outcomes=[outcome], + ) + + integration.callback( + callback_url="https://app.terraform.io/api/v2/task-results/tr-123/callback", + access_token="test-token-123", + options=options, + ) + + call_args = mock_transport.request.call_args + body = call_args[1]["json_body"] + + assert "relationships" in body["data"] + assert "outcomes" in body["data"]["relationships"] From ebe6696a841eb330a87afadadc8224553560117f Mon Sep 17 00:00:00 2001 From: KshitijaChoudhari Date: Mon, 9 Mar 2026 10:54:50 +0530 Subject: [PATCH 2/3] chore: apply ruff formatting --- examples/agent_pool.py | 34 +- examples/run_tasks_integration.py | 265 --------------- src/pytfe/models/__init__.py | 25 -- src/pytfe/models/agent.py | 8 + src/pytfe/resources/agent_pools.py | 105 ++++-- src/pytfe/resources/run_tasks_integration.py | 68 ---- tests/units/test_agent_pools.py | 85 ++++- tests/units/test_run_tasks_integration.py | 324 ------------------- 8 files changed, 201 insertions(+), 713 deletions(-) delete mode 100644 examples/run_tasks_integration.py delete mode 100644 src/pytfe/resources/run_tasks_integration.py delete mode 100644 tests/units/test_run_tasks_integration.py diff --git a/examples/agent_pool.py b/examples/agent_pool.py index bbaf14e..4fb1509 100644 --- a/examples/agent_pool.py +++ b/examples/agent_pool.py @@ -3,13 +3,14 @@ This example demonstrates: 1. Agent Pool CRUD operations (Create, Read, Update, Delete) 2. Agent token creation and management -3. Using the organization SDK client +3. Workspace assignment using assign_to_workspaces and remove_from_workspaces 4. Proper error handling Make sure to set the following environment variables: - TFE_TOKEN: Your Terraform Cloud/Enterprise API token - TFE_ADDRESS: Your Terraform Cloud/Enterprise URL (optional, defaults to https://app.terraform.io) - TFE_ORG: Your organization name +- TFE_WORKSPACE_ID: A workspace ID for testing workspace assignment (optional) Usage: export TFE_TOKEN="your-token-here" @@ -24,8 +25,10 @@ from pytfe.errors import NotFound from pytfe.models import ( AgentPoolAllowedWorkspacePolicy, + AgentPoolAssignToWorkspacesOptions, AgentPoolCreateOptions, AgentPoolListOptions, + AgentPoolRemoveFromWorkspacesOptions, AgentPoolUpdateOptions, AgentTokenCreateOptions, ) @@ -37,6 +40,9 @@ def main(): token = os.environ.get("TFE_TOKEN") org = os.environ.get("TFE_ORG") address = os.environ.get("TFE_ADDRESS", "https://app.terraform.io") + workspace_id = os.environ.get( + "TFE_WORKSPACE_ID" + ) # optional, for workspace assignment if not token: print("TFE_TOKEN environment variable is required") @@ -96,7 +102,27 @@ def main(): updated_pool = client.agent_pools.update(new_pool.id, update_options) print(f"Updated agent pool name to: {updated_pool.name}") - # Example 5: Create an agent token + # Example 5: Workspace assignment + # assign_to_workspaces sends PATCH /agent-pools/:id with relationships.allowed-workspaces + # remove_from_workspaces sends PATCH /agent-pools/:id with relationships.excluded-workspaces + if workspace_id: + print("\n Assigning workspace to agent pool...") + client.agent_pools.assign_to_workspaces( + new_pool.id, + AgentPoolAssignToWorkspacesOptions(workspace_ids=[workspace_id]), + ) + print(f" Assigned workspace {workspace_id} to pool") + + print("\n Removing workspace from agent pool...") + client.agent_pools.remove_from_workspaces( + new_pool.id, + AgentPoolRemoveFromWorkspacesOptions(workspace_ids=[workspace_id]), + ) + print(f" Excluded workspace {workspace_id} from pool") + else: + print("\n Skipping workspace assignment (set TFE_WORKSPACE_ID to test)") + + # Example 6: Create an agent token print("\n Creating agent token...") token_options = AgentTokenCreateOptions( description="SDK example token" # Optional description @@ -107,7 +133,7 @@ def main(): if agent_token.token: print(f" Token (first 10 chars): {agent_token.token[:10]}...") - # Example 6: List agent tokens + # Example 7: List agent tokens print("\n Listing agent tokens...") tokens = client.agent_tokens.list(new_pool.id) @@ -117,7 +143,7 @@ def main(): for token in token_list: print(f" - {token.description or 'No description'} (ID: {token.id})") - # Example 7: Clean up - delete the token and pool + # Example 8: Clean up - delete the token and pool print("\n Cleaning up...") client.agent_tokens.delete(agent_token.id) print("Deleted agent token") diff --git a/examples/run_tasks_integration.py b/examples/run_tasks_integration.py deleted file mode 100644 index 878d38e..0000000 --- a/examples/run_tasks_integration.py +++ /dev/null @@ -1,265 +0,0 @@ -#!/usr/bin/env python -""" -Run Tasks Integration Example - Real TFC/TFE Testing - -This example shows how to create a webhook server that integrates with -Terraform Cloud/Enterprise run tasks to validate runs and send results back. - -STEP-BY-STEP TESTING WITH REAL TFC/TFE: - -1. START THE SERVER: - python examples/run_tasks_integration.py --port 8888 - -2. MAKE IT ACCESSIBLE (choose one): - - Option A - Using ngrok (for local testing): - - Install: https://ngrok.com/download - - Run: ngrok http 8888 - - Copy the public URL (e.g., https://abc123.ngrok.io) - - Option B - Deploy to cloud (recommended for production): - - AWS EC2: - - Launch EC2 instance (t2.micro sufficient for testing) - - Upload this file: scp run_tasks_integration.py ec2-user@YOUR-IP:~/ - - SSH in: ssh ec2-user@YOUR-IP - - Install Python 3.11+: sudo dnf install python3.11 python3.11-pip - - Install dependencies: python3.11 -m pip install --user pytfe - - Run server: python3.11 run_tasks_integration.py --port 8888 - - Configure security group: Allow port 8888 from 0.0.0.0/0 - - Use public IP: http://YOUR-EC2-IP:8888 - - Heroku (easiest): - - Create Procfile: web: python run_tasks_integration.py --port $PORT - - Create requirements.txt: pytfe>=0.1.0 - - Deploy: git push heroku main - - Use Heroku URL: https://your-app.herokuapp.com - - Google Cloud Run: - - Create Dockerfile: FROM python:3.11 / RUN pip install pytfe / COPY . . / CMD ["python", "run_tasks_integration.py", "--port", "8080"] - - Deploy: gcloud run deploy --source . - - Use Cloud Run URL: https://your-service-hash.run.app - - DigitalOcean Droplet: - - Create Ubuntu droplet - - Upload file and install Python/pytfe - - Run with: python3 run_tasks_integration.py --port 8888 - - Use droplet IP: http://YOUR-DROPLET-IP:8888 - - Benefits of cloud deployment: - - Permanent URL (no ngrok reconnections) - - Better reliability and uptime - - Can handle production workloads - - SSL/HTTPS support available - - Scalable if needed - -3. CREATE RUN TASK IN TFC/TFE: - - Go to: https://app.terraform.io/app/YOUR_ORG/settings/tasks - - Click "Create run task" - - Name: "python-tfe-test" - - URL: Your public URL from step 2 - - Save and wait for verification (check mark) - -4. ATTACH TO WORKSPACE: - - Go to workspace settings → Run Tasks - - Click "Add run task" - - Select "python-tfe-test" - - Enforcement: Advisory (for testing) - - Stage: Pre-plan - - Save - -5. TRIGGER A RUN: - - Go to your workspace - - Click "Actions" → "Start new run" - - Watch this terminal for webhook activity! - - Check TFC/TFE UI for run task results - -CUSTOMIZE VALIDATION LOGIC: -Edit the section around line 80 to add your custom checks: -- Cost validation -- Security scanning (Checkov, tfsec) -- Policy enforcement -- Custom approval workflows - -API Documentation: - https://developer.hashicorp.com/terraform/enterprise/api-docs/run-tasks/run-tasks-integration -""" - -import json -from http.server import BaseHTTPRequestHandler, HTTPServer - -from pytfe import TFEClient -from pytfe.models import RunTaskRequest -from pytfe.models import ( - TaskResultCallbackOptions, - TaskResultOutcome, - TaskResultTag, -) - - -class RunTaskHandler(BaseHTTPRequestHandler): - """HTTP handler for run task callbacks from TFC/TFE.""" - - def do_POST(self): - """Handle POST request from TFC/TFE run task webhook.""" - # Read the request body - content_length = int(self.headers["Content-Length"]) - body = self.rfile.read(content_length) - - try: - # Parse the incoming run task request - payload = json.loads(body) - print("\n" + "=" * 60) - print("Received Run Task Request") - print("=" * 60) - - # Parse into RunTaskRequest model - request = RunTaskRequest.model_validate(payload) - - print(f"Run ID: {request.run_id}") - print(f"Organization: {request.organization_name}") - print(f"Workspace: {request.workspace_name}") - print(f"Workspace ID: {request.workspace_id}") - print(f"Stage: {request.stage}") - print(f"Callback URL: {request.task_result_callback_url}") - print(f"Is Speculative: {request.is_speculative}") - - # Handle verification requests (test webhooks from TFC/TFE) - if ( - request.organization_name == "test-org" - or request.workspace_name == "test-workspace" - ): - print("\n[OK] Verification request detected - responding with 200 OK") - print("=" * 60 + "\n") - self.send_response(200) - self.send_header("Content-type", "application/json") - self.end_headers() - self.wfile.write(json.dumps({"status": "ok"}).encode()) - return - - # =============================================================== - # CUSTOMIZE YOUR VALIDATION LOGIC HERE - # =============================================================== - # This is where you add your custom checks and validation. - # Examples: - # - # 1. Cost Control: - # if estimated_cost > 1000: - # result_status = "failed" - # result_message = f"Cost ${estimated_cost} exceeds limit" - # - # 2. Security Scanning: - # scan_results = run_checkov(request.configuration_version_download_url) - # if scan_results.failed: - # result_status = "failed" - # result_message = "Security scan failed" - # - # 3. Policy Enforcement: - # if not workspace_has_required_tags(request.workspace_name): - # result_status = "failed" - # result_message = "Workspace missing required tags" - # - # 4. Custom Approval: - # if request.workspace_name.startswith("prod-"): - # result_status = "failed" - # result_message = "Production changes require manual approval" - - # For this example, we'll just pass the task - result_status = "passed" - result_message = "All checks passed successfully" - - # Create detailed outcomes (optional but recommended) - outcomes = [ - TaskResultOutcome( - outcome_id="check-1", - description="Configuration validation passed", - body="All Terraform configurations are valid and follow best practices.", - url="https://example.com/results/check-1", - tags={ - "Status": [TaskResultTag(label="Passed", level="info")], - "Category": [TaskResultTag(label="Validation")], - }, - ) - ] - - # Create callback options - callback_options = TaskResultCallbackOptions( - status=result_status, - message=result_message, - url="https://example.com/full-results", - outcomes=outcomes, - ) - - # Initialize client and send callback - print("\nInitializing TFEClient...") - print(f"Access token from webhook: {request.access_token[:10]}***") - client = TFEClient() - print("Client initialized successfully") - - print(f"Sending callback to: {request.task_result_callback_url[:50]}...") - client.run_tasks_integration.callback( - callback_url=request.task_result_callback_url, - access_token=request.access_token, - options=callback_options, - ) - - print(f"\n[SUCCESS] Callback sent successfully: {result_status}") - print("=" * 60 + "\n") - - # Respond to TFC/TFE - self.send_response(200) - self.send_header("Content-type", "application/json") - self.end_headers() - self.wfile.write(json.dumps({"status": "received"}).encode()) - - except Exception as e: - print(f"Error processing request: {e}") - self.send_response(500) - self.send_header("Content-type", "application/json") - self.end_headers() - self.wfile.write(json.dumps({"error": str(e)}).encode()) - - def log_message(self, format, *args): - """Suppress default HTTP logging.""" - pass - - -def run_server(port=8080): - """Start the run task callback server.""" - server_address = ("", port) - httpd = HTTPServer(server_address, RunTaskHandler) - - print("=" * 60) - print("Run Tasks Integration Callback Server") - print("=" * 60) - print(f"Listening on http://localhost:{port}") - print("\nFor local testing:") - print(" 1. Use ngrok or similar tool to expose this server:") - print(f" ngrok http {port}") - print(" 2. Configure your run task in TFC/TFE with the ngrok URL") - print(" 3. Trigger a run in your workspace") - print("\nWaiting for requests from TFC/TFE...") - print("=" * 60 + "\n") - - try: - httpd.serve_forever() - except KeyboardInterrupt: - print("\n\nShutting down server...") - httpd.shutdown() - - -if __name__ == "__main__": - import argparse - - parser = argparse.ArgumentParser( - description="Run Tasks Integration callback server" - ) - parser.add_argument( - "--port", - type=int, - default=8080, - help="Port to listen on (default: 8080)", - ) - args = parser.parse_args() - - run_server(port=args.port) diff --git a/src/pytfe/models/__init__.py b/src/pytfe/models/__init__.py index 6449b6a..8524e6b 100644 --- a/src/pytfe/models/__init__.py +++ b/src/pytfe/models/__init__.py @@ -273,15 +273,6 @@ Stage, TaskEnforcementLevel, ) -from .run_task_request import ( - RunTaskRequest, - RunTaskRequestCapabilities, -) -from .run_tasks_integration import ( - TaskResultCallbackOptions, - TaskResultOutcome, - TaskResultTag, -) from .run_trigger import ( RunTrigger, RunTriggerCreateOptions, @@ -589,22 +580,6 @@ "RunTaskCreateOptions", "RunTaskUpdateOptions", "RunTaskReadOptions", - "RunTaskRequest", - "RunTaskRequestCapabilities", - "TaskResultCallbackOptions", - "TaskResultOutcome", - "TaskResultTag", - "TaskResult", - "TaskResultStatus", - "TaskResultStatusTimestamps", - "Actions", - "Permissions", - "TaskStage", - "TaskStageListOptions", - "TaskStageOverrideOptions", - "TaskStageReadOptions", - "TaskStageStatus", - "TaskStageStatusTimestamps", # Run triggers "RunTrigger", "RunTriggerCreateOptions", diff --git a/src/pytfe/models/agent.py b/src/pytfe/models/agent.py index 48c3055..08f40e1 100644 --- a/src/pytfe/models/agent.py +++ b/src/pytfe/models/agent.py @@ -82,6 +82,10 @@ class AgentPoolCreateOptions(BaseModel): organization_scoped: bool | None = None # Optional: Allowed workspace policy allowed_workspace_policy: AgentPoolAllowedWorkspacePolicy | None = None + # Optional: IDs of workspaces allowed to use this pool (sent as relationships.allowed-workspaces) + allowed_workspace_ids: list[str] = Field(default_factory=list) + # Optional: IDs of workspaces excluded from this pool (sent as relationships.excluded-workspaces) + excluded_workspace_ids: list[str] = Field(default_factory=list) class AgentPoolUpdateOptions(BaseModel): @@ -93,6 +97,10 @@ class AgentPoolUpdateOptions(BaseModel): organization_scoped: bool | None = None # Optional: Allowed workspace policy allowed_workspace_policy: AgentPoolAllowedWorkspacePolicy | None = None + # Optional: Full replacement list of workspace IDs allowed to use this pool + allowed_workspace_ids: list[str] = Field(default_factory=list) + # Optional: Full replacement list of workspace IDs excluded from this pool + excluded_workspace_ids: list[str] = Field(default_factory=list) class AgentPoolReadOptions(BaseModel): diff --git a/src/pytfe/resources/agent_pools.py b/src/pytfe/resources/agent_pools.py index e0ff776..6ff5c40 100644 --- a/src/pytfe/resources/agent_pools.py +++ b/src/pytfe/resources/agent_pools.py @@ -203,7 +203,27 @@ def create(self, organization: str, options: AgentPoolCreateOptions) -> AgentPoo options.allowed_workspace_policy.value ) - payload = {"data": {"type": "agent-pools", "attributes": attributes}} + relationships: dict[str, Any] = {} + if options.allowed_workspace_ids: + relationships["allowed-workspaces"] = { + "data": [ + {"type": "workspaces", "id": ws_id} + for ws_id in options.allowed_workspace_ids + ] + } + if options.excluded_workspace_ids: + relationships["excluded-workspaces"] = { + "data": [ + {"type": "workspaces", "id": ws_id} + for ws_id in options.excluded_workspace_ids + ] + } + + payload: dict[str, Any] = { + "data": {"type": "agent-pools", "attributes": attributes} + } + if relationships: + payload["data"]["relationships"] = relationships response = self.t.request("POST", path, json_body=payload) data = response.json()["data"] @@ -320,13 +340,31 @@ def update(self, agent_pool_id: str, options: AgentPoolUpdateOptions) -> AgentPo options.allowed_workspace_policy.value ) - payload = { + relationships: dict[str, Any] = {} + if options.allowed_workspace_ids: + relationships["allowed-workspaces"] = { + "data": [ + {"type": "workspaces", "id": ws_id} + for ws_id in options.allowed_workspace_ids + ] + } + if options.excluded_workspace_ids: + relationships["excluded-workspaces"] = { + "data": [ + {"type": "workspaces", "id": ws_id} + for ws_id in options.excluded_workspace_ids + ] + } + + payload: dict[str, Any] = { "data": { "type": "agent-pools", "id": agent_pool_id, "attributes": attributes, } } + if relationships: + payload["data"]["relationships"] = relationships response = self.t.request("PATCH", path, json_body=payload) data = response.json()["data"] @@ -372,7 +410,11 @@ def delete(self, agent_pool_id: str) -> None: def assign_to_workspaces( self, agent_pool_id: str, options: AgentPoolAssignToWorkspacesOptions ) -> None: - """Assign an agent pool to workspaces. + """Assign an agent pool to workspaces by updating the allowed-workspaces + relationship via PATCH /agent-pools/:id. + + The provided workspace IDs become the new complete list of allowed + workspaces for this pool (full replacement, not append). Args: agent_pool_id: Agent pool ID @@ -388,26 +430,41 @@ def assign_to_workspaces( if not options.workspace_ids: raise ValueError("At least one workspace ID is required") - path = f"/api/v2/agent-pools/{agent_pool_id}/relationships/workspaces" - - # Create data payload with workspace references - workspace_data = [] for workspace_id in options.workspace_ids: if not valid_string_id(workspace_id): raise ValueError(f"Invalid workspace ID: {workspace_id}") - workspace_data.append({"type": "workspaces", "id": workspace_id}) - payload = {"data": workspace_data} - self.t.request("POST", path, json_body=payload) + path = f"/api/v2/agent-pools/{agent_pool_id}" + payload: dict[str, Any] = { + "data": { + "type": "agent-pools", + "id": agent_pool_id, + "attributes": {}, + "relationships": { + "allowed-workspaces": { + "data": [ + {"type": "workspaces", "id": ws_id} + for ws_id in options.workspace_ids + ] + } + }, + } + } + self.t.request("PATCH", path, json_body=payload) def remove_from_workspaces( self, agent_pool_id: str, options: AgentPoolRemoveFromWorkspacesOptions ) -> None: - """Remove an agent pool from workspaces. + """Exclude workspaces from an agent pool by updating the excluded-workspaces + relationship via PATCH /agent-pools/:id. + + Use this for organization-scoped pools where most workspaces are allowed + but you want to block specific ones. The provided list becomes the new + complete excluded-workspaces list (full replacement, not append). Args: agent_pool_id: Agent pool ID - options: Removal options containing workspace IDs + options: Removal options containing workspace IDs to exclude Raises: ValueError: If parameters are invalid @@ -419,14 +476,24 @@ def remove_from_workspaces( if not options.workspace_ids: raise ValueError("At least one workspace ID is required") - path = f"/api/v2/agent-pools/{agent_pool_id}/relationships/workspaces" - - # Create data payload with workspace references - workspace_data = [] for workspace_id in options.workspace_ids: if not valid_string_id(workspace_id): raise ValueError(f"Invalid workspace ID: {workspace_id}") - workspace_data.append({"type": "workspaces", "id": workspace_id}) - payload = {"data": workspace_data} - self.t.request("DELETE", path, json_body=payload) + path = f"/api/v2/agent-pools/{agent_pool_id}" + payload: dict[str, Any] = { + "data": { + "type": "agent-pools", + "id": agent_pool_id, + "attributes": {}, + "relationships": { + "excluded-workspaces": { + "data": [ + {"type": "workspaces", "id": ws_id} + for ws_id in options.workspace_ids + ] + } + }, + } + } + self.t.request("PATCH", path, json_body=payload) diff --git a/src/pytfe/resources/run_tasks_integration.py b/src/pytfe/resources/run_tasks_integration.py deleted file mode 100644 index 7478a1a..0000000 --- a/src/pytfe/resources/run_tasks_integration.py +++ /dev/null @@ -1,68 +0,0 @@ -"""Run Tasks Integration resource for python-tfe. - -This module provides the callback functionality for external run task servers -to send results back to Terraform Cloud/Enterprise. -""" - -from __future__ import annotations - -from typing import Any - -from ..errors import TFEError -from ..models.task_result import TaskResultStatus -from ..models.run_tasks_integration import ( - TaskResultTag, - TaskResultOutcome, - TaskResultCallbackOptions, -) -from ._base import _Service - - -class RunTasksIntegration(_Service): - """Run Tasks Integration API for sending callbacks to TFC/TFE. - - This service is used by external run task servers to send task results - back to Terraform Cloud/Enterprise. - - API Documentation: - https://developer.hashicorp.com/terraform/enterprise/api-docs/run-tasks/run-tasks-integration - """ - - def callback( - self, - callback_url: str, - access_token: str, - options: TaskResultCallbackOptions, - ) -> None: - """Send task result callback to TFC/TFE. - - Args: - callback_url: The callback URL from the run task request - access_token: The access token from the run task request - options: Task result callback options - - Raises: - TFEError: If callback_url or access_token is invalid - TFEError: If options validation fails - """ - if not callback_url or not callback_url.strip(): - raise TFEError("callback_url cannot be empty") - - if not access_token or not access_token.strip(): - raise TFEError("access_token cannot be empty") - - options.validate() - - # Create custom headers with the access token from the request - headers = { - "Authorization": f"Bearer {access_token}", - "Content-Type": "application/vnd.api+json", - } - - # Send PATCH request to callback URL - self.t.request( - "PATCH", - callback_url, - json_body=options.to_dict(), - headers=headers, - ) diff --git a/tests/units/test_agent_pools.py b/tests/units/test_agent_pools.py index a113ac6..23703a0 100644 --- a/tests/units/test_agent_pools.py +++ b/tests/units/test_agent_pools.py @@ -6,6 +6,7 @@ 3. Agent token management 4. Request building and parameter handling 5. Response parsing and error handling +6. Workspace assignment (assign_to_workspaces / remove_from_workspaces bug fix) Run with: pytest tests/units/test_agent_pools.py -v @@ -19,8 +20,10 @@ from pytfe.models.agent import ( AgentPool, AgentPoolAllowedWorkspacePolicy, + AgentPoolAssignToWorkspacesOptions, AgentPoolCreateOptions, AgentPoolListOptions, + AgentPoolRemoveFromWorkspacesOptions, AgentPoolUpdateOptions, AgentTokenCreateOptions, ) @@ -85,6 +88,26 @@ def test_agent_pool_create_options(self): == AgentPoolAllowedWorkspacePolicy.SPECIFIC_WORKSPACES ) + def test_agent_pool_create_options_workspace_ids(self): + """Test AgentPoolCreateOptions with allowed/excluded workspace IDs (bug fix)""" + options = AgentPoolCreateOptions( + name="scoped-pool", + organization_scoped=False, + allowed_workspace_ids=["ws-aaa", "ws-bbb"], + excluded_workspace_ids=["ws-ccc"], + ) + assert options.allowed_workspace_ids == ["ws-aaa", "ws-bbb"] + assert options.excluded_workspace_ids == ["ws-ccc"] + + def test_agent_pool_update_options_workspace_ids(self): + """Test AgentPoolUpdateOptions with allowed/excluded workspace IDs (bug fix)""" + options = AgentPoolUpdateOptions( + allowed_workspace_ids=["ws-aaa"], + excluded_workspace_ids=["ws-bbb"], + ) + assert options.allowed_workspace_ids == ["ws-aaa"] + assert options.excluded_workspace_ids == ["ws-bbb"] + class TestAgentPoolOperations: """Test agent pool CRUD operations""" @@ -121,7 +144,6 @@ def test_list_agent_pools(self, agent_pools_service, mock_transport): } mock_transport.request.return_value.json.return_value = mock_response - agent_pools = list(agent_pools_service.list("test-org")) assert len(agent_pools) == 1 @@ -171,7 +193,6 @@ def test_create_agent_pool(self, agent_pools_service, mock_transport): } mock_transport.request.return_value.json.return_value = mock_response - options = AgentPoolCreateOptions( name="new-pool", organization_scoped=True, @@ -207,7 +228,6 @@ def test_read_agent_pool(self, agent_pools_service, mock_transport): } mock_transport.request.return_value.json.return_value = mock_response - agent_pool = agent_pools_service.read("apool-123456789abcdef0") assert agent_pool.id == "apool-123456789abcdef0" @@ -238,9 +258,7 @@ def test_update_agent_pool(self, agent_pools_service, mock_transport): } mock_transport.request.return_value.json.return_value = mock_response - options = AgentPoolUpdateOptions(name="updated-pool", organization_scoped=False) - agent_pool = agent_pools_service.update("apool-123456789abcdef0", options) assert agent_pool.id == "apool-123456789abcdef0" @@ -263,6 +281,60 @@ def test_delete_agent_pool(self, agent_pools_service, mock_transport): assert call_args[0][0] == "DELETE" assert "agent-pools/apool-123456789abcdef0" in call_args[0][1] + def test_assign_to_workspaces(self, agent_pools_service, mock_transport): + """assign_to_workspaces must PATCH /agent-pools/:id with relationships.allowed-workspaces. + + Previously (broken): POST /agent-pools/:id/relationships/workspaces -> 404 + Fixed: PATCH /agent-pools/:id with relationships.allowed-workspaces body + """ + pool_id = "apool-123456789abcdef0" + ws_id = "ws-aaaaaaaaaaaaaaa1" + + agent_pools_service.assign_to_workspaces( + pool_id, + AgentPoolAssignToWorkspacesOptions(workspace_ids=[ws_id]), + ) + + call_args = mock_transport.request.call_args + # Must be PATCH, not POST + assert call_args[0][0] == "PATCH" + # Must target the pool URL, not a /relationships/workspaces sub-resource + assert call_args[0][1] == f"/api/v2/agent-pools/{pool_id}" + # Payload must use relationships.allowed-workspaces + body = call_args[1]["json_body"]["data"] + assert body["type"] == "agent-pools" + assert body["id"] == pool_id + ws_data = body["relationships"]["allowed-workspaces"]["data"] + assert ws_data[0]["id"] == ws_id + assert ws_data[0]["type"] == "workspaces" + + def test_remove_from_workspaces(self, agent_pools_service, mock_transport): + """remove_from_workspaces must PATCH /agent-pools/:id with relationships.excluded-workspaces. + + Previously (broken): DELETE /agent-pools/:id/relationships/workspaces -> 404 + Fixed: PATCH /agent-pools/:id with relationships.excluded-workspaces body + """ + pool_id = "apool-123456789abcdef0" + ws_id = "ws-aaaaaaaaaaaaaaa1" + + agent_pools_service.remove_from_workspaces( + pool_id, + AgentPoolRemoveFromWorkspacesOptions(workspace_ids=[ws_id]), + ) + + call_args = mock_transport.request.call_args + # Must be PATCH, not DELETE + assert call_args[0][0] == "PATCH" + # Must target the pool URL, not a /relationships/workspaces sub-resource + assert call_args[0][1] == f"/api/v2/agent-pools/{pool_id}" + # Payload must use relationships.excluded-workspaces + body = call_args[1]["json_body"]["data"] + assert body["type"] == "agent-pools" + assert body["id"] == pool_id + ws_data = body["relationships"]["excluded-workspaces"]["data"] + assert ws_data[0]["id"] == ws_id + assert ws_data[0]["type"] == "workspaces" + class TestAgentTokenOperations: """Test agent token operations""" @@ -297,7 +369,6 @@ def test_list_agent_tokens(self, agent_tokens_service, mock_transport): } mock_transport.request.return_value.json.return_value = mock_response - tokens = list(agent_tokens_service.list("apool-123456789abcdef0")) assert len(tokens) == 1 @@ -328,7 +399,6 @@ def test_create_agent_token(self, agent_tokens_service, mock_transport): } mock_transport.request.return_value.json.return_value = mock_response - options = AgentTokenCreateOptions(description="New token") token = agent_tokens_service.create("apool-123456789abcdef0", options) @@ -361,7 +431,6 @@ def test_read_agent_token(self, agent_tokens_service, mock_transport): } mock_transport.request.return_value.json.return_value = mock_response - token = agent_tokens_service.read("at-123456789abcdef0") assert token.id == "at-123456789abcdef0" diff --git a/tests/units/test_run_tasks_integration.py b/tests/units/test_run_tasks_integration.py deleted file mode 100644 index 15bef5c..0000000 --- a/tests/units/test_run_tasks_integration.py +++ /dev/null @@ -1,324 +0,0 @@ -"""Unit tests for Run Tasks Integration.""" - -from __future__ import annotations - -from unittest.mock import MagicMock - -import pytest - -from pytfe.errors import TFEError -from pytfe.models.run_task_request import RunTaskRequest -from pytfe.models import ( - TaskResultCallbackOptions, - TaskResultOutcome, - TaskResultStatus, - TaskResultTag, -) -from pytfe.resources.run_tasks_integration import RunTasksIntegration - - -class TestRunTaskRequest: - """Tests for RunTaskRequest model.""" - - def test_run_task_request_minimal(self): - """Test parsing minimal run task request.""" - data = { - "access_token": "test-token-123", - "is_speculative": False, - "organization_name": "my-org", - "payload_version": 1, - "run_app_url": "https://app.terraform.io/app/my-org/my-workspace/runs/run-123", - "run_created_at": "2025-12-22T10:00:00Z", - "run_created_by": "user@example.com", - "run_id": "run-123", - "run_message": "Test run", - "stage": "post_plan", - "task_result_callback_url": "https://app.terraform.io/api/v2/task-results/tr-123/callback", - "task_result_enforcement_level": "mandatory", - "task_result_id": "tr-123", - "workspace_app_url": "https://app.terraform.io/app/my-org/my-workspace", - "workspace_id": "ws-123", - "workspace_name": "my-workspace", - } - - request = RunTaskRequest(**data) - - assert request.access_token == "test-token-123" - assert request.organization_name == "my-org" - assert request.run_id == "run-123" - assert request.stage == "post_plan" - assert ( - request.task_result_callback_url - == "https://app.terraform.io/api/v2/task-results/tr-123/callback" - ) - - def test_run_task_request_complete(self): - """Test parsing complete run task request with all fields.""" - data = { - "access_token": "test-token-456", - "capabilities": {"outcomes": True}, - "configuration_version_download_url": "https://app.terraform.io/api/v2/configuration-versions/cv-123/download", - "configuration_version_id": "cv-123", - "is_speculative": True, - "organization_name": "test-org", - "payload_version": 1, - "plan_json_api_url": "https://app.terraform.io/api/v2/plans/plan-123/json-output", - "run_app_url": "https://app.terraform.io/app/test-org/test-workspace/runs/run-456", - "run_created_at": "2025-12-22T11:30:00Z", - "run_created_by": "admin@example.com", - "run_id": "run-456", - "run_message": "Test with VCS", - "stage": "pre_plan", - "task_result_callback_url": "https://app.terraform.io/api/v2/task-results/tr-456/callback", - "task_result_enforcement_level": "advisory", - "task_result_id": "tr-456", - "vcs_branch": "main", - "vcs_commit_url": "https://github.com/org/repo/commit/abc123", - "vcs_pull_request_url": "https://github.com/org/repo/pull/42", - "vcs_repo_url": "https://github.com/org/repo", - "workspace_app_url": "https://app.terraform.io/app/test-org/test-workspace", - "workspace_id": "ws-456", - "workspace_name": "test-workspace", - "workspace_working_directory": "terraform/", - } - - request = RunTaskRequest(**data) - - assert request.access_token == "test-token-456" - assert request.capabilities is not None - assert request.capabilities.outcomes is True - assert request.configuration_version_id == "cv-123" - assert request.vcs_branch == "main" - assert request.vcs_commit_url == "https://github.com/org/repo/commit/abc123" - assert request.workspace_working_directory == "terraform/" - - -class TestTaskResultTag: - """Tests for TaskResultTag.""" - - def test_tag_with_level(self): - """Test tag with level.""" - tag = TaskResultTag(label="High", level="error") - data = tag.to_dict() - - assert data["label"] == "High" - assert data["level"] == "error" - - def test_tag_without_level(self): - """Test tag without level.""" - tag = TaskResultTag(label="Passed") - data = tag.to_dict() - - assert data["label"] == "Passed" - assert "level" not in data - - -class TestTaskResultOutcome: - """Tests for TaskResultOutcome.""" - - def test_outcome_complete(self): - """Test complete outcome with all fields.""" - tags = { - "Status": [TaskResultTag(label="Failed", level="error")], - "Severity": [TaskResultTag(label="High", level="error")], - } - - outcome = TaskResultOutcome( - outcome_id="ISSUE-123", - description="Security issue found", - body="# Details\n\nSecurity vulnerability detected.", - url="https://example.com/issues/123", - tags=tags, - ) - - data = outcome.to_dict() - - assert data["type"] == "task-result-outcomes" - assert data["attributes"]["outcome-id"] == "ISSUE-123" - assert data["attributes"]["description"] == "Security issue found" - assert ( - data["attributes"]["body"] - == "# Details\n\nSecurity vulnerability detected." - ) - assert data["attributes"]["url"] == "https://example.com/issues/123" - assert "Status" in data["attributes"]["tags"] - - def test_outcome_minimal(self): - """Test minimal outcome.""" - outcome = TaskResultOutcome() - data = outcome.to_dict() - - assert data["type"] == "task-result-outcomes" - assert "attributes" in data - - -class TestTaskResultCallbackOptions: - """Tests for TaskResultCallbackOptions.""" - - def test_callback_options_passed(self): - """Test callback options with passed status.""" - options = TaskResultCallbackOptions( - status=TaskResultStatus.PASSED, - message="All checks passed", - url="https://example.com/results/123", - ) - - options.validate() - data = options.to_dict() - - assert data["data"]["type"] == "task-results" - assert data["data"]["attributes"]["status"] == "passed" - assert data["data"]["attributes"]["message"] == "All checks passed" - assert data["data"]["attributes"]["url"] == "https://example.com/results/123" - - def test_callback_options_with_outcomes(self): - """Test callback options with outcomes.""" - outcome = TaskResultOutcome( - outcome_id="ISSUE-1", - description="Test issue", - ) - - options = TaskResultCallbackOptions( - status=TaskResultStatus.FAILED, - message="1 issue found", - outcomes=[outcome], - ) - - data = options.to_dict() - - assert "relationships" in data["data"] - assert "outcomes" in data["data"]["relationships"] - assert len(data["data"]["relationships"]["outcomes"]["data"]) == 1 - - def test_validate_invalid_status(self): - """Test validation fails with invalid status.""" - options = TaskResultCallbackOptions(status="invalid") - - with pytest.raises(TFEError) as exc_info: - options.validate() - - assert "Invalid task result status" in str(exc_info.value) - - def test_validate_valid_statuses(self): - """Test validation passes with all valid statuses.""" - for status in [ - TaskResultStatus.PASSED, - TaskResultStatus.FAILED, - TaskResultStatus.RUNNING, - ]: - options = TaskResultCallbackOptions(status=status) - options.validate() # Should not raise - - -class TestRunTasksIntegration: - """Tests for RunTasksIntegration service.""" - - def test_callback_success(self): - """Test successful callback.""" - mock_transport = MagicMock() - integration = RunTasksIntegration(mock_transport) - - options = TaskResultCallbackOptions( - status=TaskResultStatus.PASSED, - message="All tests passed", - ) - - integration.callback( - callback_url="https://app.terraform.io/api/v2/task-results/tr-123/callback", - access_token="test-token-123", - options=options, - ) - - # Verify request was made - mock_transport.request.assert_called_once() - call_args = mock_transport.request.call_args - - assert call_args[0][0] == "PATCH" - assert ( - call_args[0][1] - == "https://app.terraform.io/api/v2/task-results/tr-123/callback" - ) - assert "Authorization" in call_args[1]["headers"] - assert call_args[1]["headers"]["Authorization"] == "Bearer test-token-123" - - def test_callback_empty_url(self): - """Test callback fails with empty URL.""" - mock_transport = MagicMock() - integration = RunTasksIntegration(mock_transport) - - options = TaskResultCallbackOptions(status=TaskResultStatus.PASSED) - - with pytest.raises(TFEError) as exc_info: - integration.callback( - callback_url="", - access_token="test-token", - options=options, - ) - - assert "callback_url cannot be empty" in str(exc_info.value) - - def test_callback_empty_token(self): - """Test callback fails with empty token.""" - mock_transport = MagicMock() - integration = RunTasksIntegration(mock_transport) - - options = TaskResultCallbackOptions(status=TaskResultStatus.PASSED) - - with pytest.raises(TFEError) as exc_info: - integration.callback( - callback_url="https://example.com/callback", - access_token="", - options=options, - ) - - assert "access_token cannot be empty" in str(exc_info.value) - - def test_callback_invalid_status(self): - """Test callback fails with invalid status.""" - mock_transport = MagicMock() - integration = RunTasksIntegration(mock_transport) - - options = TaskResultCallbackOptions(status="invalid-status") - - with pytest.raises(TFEError) as exc_info: - integration.callback( - callback_url="https://example.com/callback", - access_token="test-token", - options=options, - ) - - assert "Invalid task result status" in str(exc_info.value) - - def test_callback_with_outcomes(self): - """Test callback with detailed outcomes.""" - mock_transport = MagicMock() - integration = RunTasksIntegration(mock_transport) - - outcome = TaskResultOutcome( - outcome_id="CHECK-1", - description="Policy violation", - body="## Issue\n\nPolicy check failed.", - url="https://example.com/check-1", - tags={ - "Severity": [TaskResultTag(label="High", level="error")], - }, - ) - - options = TaskResultCallbackOptions( - status=TaskResultStatus.FAILED, - message="Policy check failed", - url="https://example.com/results", - outcomes=[outcome], - ) - - integration.callback( - callback_url="https://app.terraform.io/api/v2/task-results/tr-123/callback", - access_token="test-token-123", - options=options, - ) - - call_args = mock_transport.request.call_args - body = call_args[1]["json_body"] - - assert "relationships" in body["data"] - assert "outcomes" in body["data"]["relationships"] From 361b94e9a77ca4f501ccac1a46ad475e89a22d18 Mon Sep 17 00:00:00 2001 From: KshitijaChoudhari Date: Tue, 24 Mar 2026 11:03:02 +0530 Subject: [PATCH 3/3] agent_pools: return AgentPool from assign/remove workspace operations; update tests and example --- examples/agent_pool.py | 8 ++-- src/pytfe/resources/agent_pools.py | 60 ++++++++++++++++++++++++++++-- tests/units/test_agent_pools.py | 40 +++++++++++++++++++- 3 files changed, 98 insertions(+), 10 deletions(-) diff --git a/examples/agent_pool.py b/examples/agent_pool.py index 4fb1509..432c706 100644 --- a/examples/agent_pool.py +++ b/examples/agent_pool.py @@ -107,18 +107,18 @@ def main(): # remove_from_workspaces sends PATCH /agent-pools/:id with relationships.excluded-workspaces if workspace_id: print("\n Assigning workspace to agent pool...") - client.agent_pools.assign_to_workspaces( + updated_pool = client.agent_pools.assign_to_workspaces( new_pool.id, AgentPoolAssignToWorkspacesOptions(workspace_ids=[workspace_id]), ) - print(f" Assigned workspace {workspace_id} to pool") + print(f" Assigned workspace {workspace_id} to pool {updated_pool.name}") print("\n Removing workspace from agent pool...") - client.agent_pools.remove_from_workspaces( + updated_pool = client.agent_pools.remove_from_workspaces( new_pool.id, AgentPoolRemoveFromWorkspacesOptions(workspace_ids=[workspace_id]), ) - print(f" Excluded workspace {workspace_id} from pool") + print(f" Removed workspace {workspace_id} from pool {updated_pool.name}") else: print("\n Skipping workspace assignment (set TFE_WORKSPACE_ID to test)") diff --git a/src/pytfe/resources/agent_pools.py b/src/pytfe/resources/agent_pools.py index 6ff5c40..7a90f2c 100644 --- a/src/pytfe/resources/agent_pools.py +++ b/src/pytfe/resources/agent_pools.py @@ -409,7 +409,7 @@ def delete(self, agent_pool_id: str) -> None: def assign_to_workspaces( self, agent_pool_id: str, options: AgentPoolAssignToWorkspacesOptions - ) -> None: + ) -> AgentPool: """Assign an agent pool to workspaces by updating the allowed-workspaces relationship via PATCH /agent-pools/:id. @@ -420,6 +420,9 @@ def assign_to_workspaces( agent_pool_id: Agent pool ID options: Assignment options containing workspace IDs + Returns: + Updated AgentPool object + Raises: ValueError: If parameters are invalid TFEError: If API request fails @@ -450,11 +453,34 @@ def assign_to_workspaces( }, } } - self.t.request("PATCH", path, json_body=payload) + response = self.t.request("PATCH", path, json_body=payload) + data = response.json()["data"] + + # Extract agent pool data from response + attr = data.get("attributes", {}) or {} + agent_pool_data = { + "id": _safe_str(data.get("id")), + "name": _safe_str(attr.get("name")), + "created_at": attr.get("created-at"), + "organization_scoped": attr.get("organization-scoped"), + "allowed_workspace_policy": attr.get("allowed-workspace-policy"), + "agent_count": attr.get("agent-count", 0), + } + + return AgentPool( + id=_safe_str(agent_pool_data["id"]) or "", + name=_safe_str(agent_pool_data["name"]), + created_at=cast(Any, agent_pool_data["created_at"]), + organization_scoped=_safe_bool(agent_pool_data["organization_scoped"]), + allowed_workspace_policy=_safe_workspace_policy( + agent_pool_data["allowed_workspace_policy"] + ), + agent_count=_safe_int(agent_pool_data["agent_count"]), + ) def remove_from_workspaces( self, agent_pool_id: str, options: AgentPoolRemoveFromWorkspacesOptions - ) -> None: + ) -> AgentPool: """Exclude workspaces from an agent pool by updating the excluded-workspaces relationship via PATCH /agent-pools/:id. @@ -466,6 +492,9 @@ def remove_from_workspaces( agent_pool_id: Agent pool ID options: Removal options containing workspace IDs to exclude + Returns: + Updated AgentPool object + Raises: ValueError: If parameters are invalid TFEError: If API request fails @@ -496,4 +525,27 @@ def remove_from_workspaces( }, } } - self.t.request("PATCH", path, json_body=payload) + response = self.t.request("PATCH", path, json_body=payload) + data = response.json()["data"] + + # Extract agent pool data from response + attr = data.get("attributes", {}) or {} + agent_pool_data = { + "id": _safe_str(data.get("id")), + "name": _safe_str(attr.get("name")), + "created_at": attr.get("created-at"), + "organization_scoped": attr.get("organization-scoped"), + "allowed_workspace_policy": attr.get("allowed-workspace-policy"), + "agent_count": attr.get("agent-count", 0), + } + + return AgentPool( + id=_safe_str(agent_pool_data["id"]) or "", + name=_safe_str(agent_pool_data["name"]), + created_at=cast(Any, agent_pool_data["created_at"]), + organization_scoped=_safe_bool(agent_pool_data["organization_scoped"]), + allowed_workspace_policy=_safe_workspace_policy( + agent_pool_data["allowed_workspace_policy"] + ), + agent_count=_safe_int(agent_pool_data["agent_count"]), + ) diff --git a/tests/units/test_agent_pools.py b/tests/units/test_agent_pools.py index 23703a0..5022ffd 100644 --- a/tests/units/test_agent_pools.py +++ b/tests/units/test_agent_pools.py @@ -290,11 +290,29 @@ def test_assign_to_workspaces(self, agent_pools_service, mock_transport): pool_id = "apool-123456789abcdef0" ws_id = "ws-aaaaaaaaaaaaaaa1" - agent_pools_service.assign_to_workspaces( + mock_response = { + "data": { + "id": pool_id, + "type": "agent-pools", + "attributes": { + "name": "test-pool", + "created-at": "2023-01-01T00:00:00Z", + "organization-scoped": True, + "allowed-workspace-policy": "all-workspaces", + "agent-count": 0, + }, + } + } + mock_transport.request.return_value.json.return_value = mock_response + + agent_pool = agent_pools_service.assign_to_workspaces( pool_id, AgentPoolAssignToWorkspacesOptions(workspace_ids=[ws_id]), ) + assert agent_pool.id == pool_id + assert agent_pool.name == "test-pool" + call_args = mock_transport.request.call_args # Must be PATCH, not POST assert call_args[0][0] == "PATCH" @@ -317,11 +335,29 @@ def test_remove_from_workspaces(self, agent_pools_service, mock_transport): pool_id = "apool-123456789abcdef0" ws_id = "ws-aaaaaaaaaaaaaaa1" - agent_pools_service.remove_from_workspaces( + mock_response = { + "data": { + "id": pool_id, + "type": "agent-pools", + "attributes": { + "name": "test-pool", + "created-at": "2023-01-01T00:00:00Z", + "organization-scoped": True, + "allowed-workspace-policy": "all-workspaces", + "agent-count": 0, + }, + } + } + mock_transport.request.return_value.json.return_value = mock_response + + agent_pool = agent_pools_service.remove_from_workspaces( pool_id, AgentPoolRemoveFromWorkspacesOptions(workspace_ids=[ws_id]), ) + assert agent_pool.id == pool_id + assert agent_pool.name == "test-pool" + call_args = mock_transport.request.call_args # Must be PATCH, not DELETE assert call_args[0][0] == "PATCH"