|
| 1 | +import json |
| 2 | +import os |
| 3 | +import ssl |
| 4 | +import tempfile |
| 5 | +import time |
| 6 | +import unittest |
| 7 | +import urllib.request |
| 8 | +from typing import Optional |
| 9 | + |
| 10 | +from testcontainers.core.container import DockerContainer |
| 11 | + |
| 12 | +from zitadel_client.transport_options import TransportOptions |
| 13 | +from zitadel_client.zitadel import Zitadel |
| 14 | + |
| 15 | + |
| 16 | +class TransportOptionsTest(unittest.TestCase): |
| 17 | + """ |
| 18 | + Test class for verifying transport options (default_headers, ca_cert_path, insecure) |
| 19 | + on the Zitadel factory methods. |
| 20 | +
|
| 21 | + This class starts a Docker container running WireMock with HTTPS support before any |
| 22 | + tests run and stops it after all tests. It registers stubs for OpenID Configuration |
| 23 | + discovery and the token endpoint so that Zitadel.with_client_credentials() can |
| 24 | + complete its initialization flow. |
| 25 | + """ |
| 26 | + |
| 27 | + host: Optional[str] = None |
| 28 | + http_port: Optional[str] = None |
| 29 | + https_port: Optional[str] = None |
| 30 | + ca_cert_path: Optional[str] = None |
| 31 | + wiremock: DockerContainer = None |
| 32 | + |
| 33 | + @classmethod |
| 34 | + def setup_class(cls) -> None: |
| 35 | + cls.wiremock = ( |
| 36 | + DockerContainer("wiremock/wiremock:3.3.1") |
| 37 | + .with_exposed_ports(8080, 8443) |
| 38 | + .with_command("--https-port 8443 --global-response-templating") |
| 39 | + ) |
| 40 | + cls.wiremock.start() |
| 41 | + |
| 42 | + cls.host = cls.wiremock.get_container_host_ip() |
| 43 | + cls.http_port = cls.wiremock.get_exposed_port(8080) |
| 44 | + cls.https_port = cls.wiremock.get_exposed_port(8443) |
| 45 | + |
| 46 | + # Wait for WireMock to be ready by polling the admin API |
| 47 | + admin_url = f"http://{cls.host}:{cls.http_port}/__admin/mappings" |
| 48 | + for _ in range(30): |
| 49 | + try: |
| 50 | + with urllib.request.urlopen(admin_url, timeout=2) as resp: # noqa: S310 |
| 51 | + if resp.status == 200: |
| 52 | + break |
| 53 | + except Exception: # noqa: S110, BLE001 |
| 54 | + pass |
| 55 | + time.sleep(1) |
| 56 | + |
| 57 | + # Register stub for OpenID Configuration discovery |
| 58 | + oidc_stub = json.dumps( |
| 59 | + { |
| 60 | + "request": {"method": "GET", "url": "/.well-known/openid-configuration"}, |
| 61 | + "response": { |
| 62 | + "status": 200, |
| 63 | + "headers": {"Content-Type": "application/json"}, |
| 64 | + "body": ( |
| 65 | + '{"issuer":"{{request.baseUrl}}",' |
| 66 | + '"token_endpoint":"{{request.baseUrl}}/oauth/v2/token",' |
| 67 | + '"authorization_endpoint":"{{request.baseUrl}}/oauth/v2/authorize",' |
| 68 | + '"userinfo_endpoint":"{{request.baseUrl}}/oidc/v1/userinfo",' |
| 69 | + '"jwks_uri":"{{request.baseUrl}}/oauth/v2/keys"}' |
| 70 | + ), |
| 71 | + }, |
| 72 | + } |
| 73 | + ).encode() |
| 74 | + |
| 75 | + req = urllib.request.Request( |
| 76 | + f"http://{cls.host}:{cls.http_port}/__admin/mappings", |
| 77 | + data=oidc_stub, |
| 78 | + headers={"Content-Type": "application/json"}, |
| 79 | + method="POST", |
| 80 | + ) |
| 81 | + with urllib.request.urlopen(req) as resp: # noqa: S310 |
| 82 | + assert resp.status == 201 |
| 83 | + |
| 84 | + # Register stub for the token endpoint |
| 85 | + token_stub = json.dumps( |
| 86 | + { |
| 87 | + "request": {"method": "POST", "url": "/oauth/v2/token"}, |
| 88 | + "response": { |
| 89 | + "status": 200, |
| 90 | + "headers": {"Content-Type": "application/json"}, |
| 91 | + "jsonBody": { |
| 92 | + "access_token": "test-token-12345", |
| 93 | + "token_type": "Bearer", |
| 94 | + "expires_in": 3600, |
| 95 | + }, |
| 96 | + }, |
| 97 | + } |
| 98 | + ).encode() |
| 99 | + |
| 100 | + req = urllib.request.Request( |
| 101 | + f"http://{cls.host}:{cls.http_port}/__admin/mappings", |
| 102 | + data=token_stub, |
| 103 | + headers={"Content-Type": "application/json"}, |
| 104 | + method="POST", |
| 105 | + ) |
| 106 | + with urllib.request.urlopen(req) as resp: # noqa: S310 |
| 107 | + assert resp.status == 201 |
| 108 | + |
| 109 | + # Extract the WireMock HTTPS certificate to a temp file |
| 110 | + pem_cert = ssl.get_server_certificate((cls.host, int(cls.https_port))) |
| 111 | + cert_file = tempfile.NamedTemporaryFile(suffix=".pem", delete=False) |
| 112 | + cert_file.write(pem_cert.encode()) |
| 113 | + cert_file.close() |
| 114 | + cls.ca_cert_path = cert_file.name |
| 115 | + |
| 116 | + @classmethod |
| 117 | + def teardown_class(cls) -> None: |
| 118 | + if cls.ca_cert_path is not None: |
| 119 | + os.unlink(cls.ca_cert_path) |
| 120 | + if cls.wiremock is not None: |
| 121 | + cls.wiremock.stop() |
| 122 | + |
| 123 | + def test_custom_ca_cert(self) -> None: |
| 124 | + zitadel = Zitadel.with_client_credentials( |
| 125 | + f"https://{self.host}:{self.https_port}", |
| 126 | + "dummy-client", |
| 127 | + "dummy-secret", |
| 128 | + ca_cert_path=self.ca_cert_path, |
| 129 | + ) |
| 130 | + self.assertIsNotNone(zitadel) |
| 131 | + |
| 132 | + def test_insecure_mode(self) -> None: |
| 133 | + zitadel = Zitadel.with_client_credentials( |
| 134 | + f"https://{self.host}:{self.https_port}", |
| 135 | + "dummy-client", |
| 136 | + "dummy-secret", |
| 137 | + insecure=True, |
| 138 | + ) |
| 139 | + self.assertIsNotNone(zitadel) |
| 140 | + |
| 141 | + def test_default_headers(self) -> None: |
| 142 | + # Use HTTP to avoid TLS concerns |
| 143 | + zitadel = Zitadel.with_client_credentials( |
| 144 | + f"http://{self.host}:{self.http_port}", |
| 145 | + "dummy-client", |
| 146 | + "dummy-secret", |
| 147 | + default_headers={"X-Custom-Header": "test-value"}, |
| 148 | + ) |
| 149 | + self.assertIsNotNone(zitadel) |
| 150 | + |
| 151 | + # Verify via WireMock request journal |
| 152 | + journal_url = f"http://{self.host}:{self.http_port}/__admin/requests" |
| 153 | + with urllib.request.urlopen(journal_url) as response: # noqa: S310 |
| 154 | + journal = json.loads(response.read().decode()) |
| 155 | + |
| 156 | + found_header = False |
| 157 | + for req in journal.get("requests", []): |
| 158 | + headers = req.get("request", {}).get("headers", {}) |
| 159 | + if "X-Custom-Header" in headers: |
| 160 | + found_header = True |
| 161 | + break |
| 162 | + self.assertTrue(found_header, "Custom header should be present in WireMock request journal") |
| 163 | + |
| 164 | + def test_proxy_url(self) -> None: |
| 165 | + # Use HTTP (not HTTPS) to avoid TLS complications with the proxy |
| 166 | + zitadel = Zitadel.with_client_credentials( |
| 167 | + f"http://{self.host}:{self.http_port}", |
| 168 | + "dummy-client", |
| 169 | + "dummy-secret", |
| 170 | + proxy_url=f"http://{self.host}:{self.http_port}", |
| 171 | + ) |
| 172 | + self.assertIsNotNone(zitadel) |
| 173 | + |
| 174 | + def test_no_ca_cert_fails(self) -> None: |
| 175 | + with self.assertRaises(Exception): # noqa: B017 |
| 176 | + Zitadel.with_client_credentials( |
| 177 | + f"https://{self.host}:{self.https_port}", |
| 178 | + "dummy-client", |
| 179 | + "dummy-secret", |
| 180 | + ) |
| 181 | + |
| 182 | + def test_transport_options_object(self) -> None: |
| 183 | + opts = TransportOptions(insecure=True) |
| 184 | + zitadel = Zitadel.with_client_credentials( |
| 185 | + f"https://{self.host}:{self.https_port}", |
| 186 | + "dummy-client", |
| 187 | + "dummy-secret", |
| 188 | + transport_options=opts, |
| 189 | + ) |
| 190 | + self.assertIsNotNone(zitadel) |
0 commit comments