From ac1c10b09a2ff8ab012b8c1b25d3014f17c25d47 Mon Sep 17 00:00:00 2001 From: Sean Sinclair <146738689+sean-sinclair@users.noreply.github.com> Date: Fri, 27 Feb 2026 12:01:33 +0100 Subject: [PATCH 1/3] Add host_url option to CLI and Checker initialization for service validation --- src/validate_secrets/cli.py | 10 +++++++++- src/validate_secrets/core/base.py | 10 +++++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/src/validate_secrets/cli.py b/src/validate_secrets/cli.py index 481f8ab..ff7a037 100644 --- a/src/validate_secrets/cli.py +++ b/src/validate_secrets/cli.py @@ -33,8 +33,12 @@ is_flag=True, help="Enable debug logging. To use, add the flag as a first argument!", ) +@click.option( + "--host-url", + help="Base URL of the service to validate against", +) @click.pass_context -def cli(ctx, config, debug): +def cli(ctx, config, debug, host_url): """Extensible secret validation tool.""" ctx.ensure_object(dict) @@ -48,6 +52,7 @@ def cli(ctx, config, debug): ctx.obj["config"].setup_logging() ctx.obj["debug"] = debug + ctx.obj["host_url"] = host_url @cli.command() @@ -125,6 +130,7 @@ def check_file(ctx, file_path, secret_type, output, output_format, file_format, notify=notify or validation_config["notifications"], debug=ctx.obj["debug"], timeout=validation_config["timeout"], + host_url=ctx.obj.get("host_url"), ) for secret_data in track( @@ -245,6 +251,7 @@ def check_github(ctx, org, repo, secret_type, state, validity, output, output_fo notify=notify or validation_config["notifications"], debug=ctx.obj["debug"], timeout=validation_config["timeout"], + host_url=ctx.obj.get("host_url"), ) status = validator.check(secret) @@ -334,6 +341,7 @@ def validate(ctx, secret, secret_type, notify): notify=notify or validation_config["notifications"], debug=ctx.obj["debug"], timeout=validation_config["timeout"], + host_url=ctx.obj.get("host_url"), ) # Validate secret diff --git a/src/validate_secrets/core/base.py b/src/validate_secrets/core/base.py index 44f295c..432eb41 100644 --- a/src/validate_secrets/core/base.py +++ b/src/validate_secrets/core/base.py @@ -54,17 +54,25 @@ class Checker(ABC): name: str = "" description: str = "" - def __init__(self, notify: bool = False, debug: bool = False, timeout: int = 30) -> None: + def __init__( + self, + notify: bool = False, + debug: bool = False, + timeout: int = 30, + host_url: Optional[str] = None, + ) -> None: """Initialize the checker. Args: notify: Whether to send notifications to endpoints debug: Enable debug logging timeout: Timeout in seconds for validation + host_url: Base URL of the service to validate against """ self.notify = notify self.debug = debug self.timeout = timeout + self.host_url = host_url.rstrip("/") if host_url else None if self.debug: logging.getLogger().setLevel(logging.DEBUG) From b1a4f35d7333c5a5a074a10a8de9df693a5fa777 Mon Sep 17 00:00:00 2001 From: Sean Sinclair <146738689+sean-sinclair@users.noreply.github.com> Date: Fri, 27 Feb 2026 12:13:44 +0100 Subject: [PATCH 2/3] Add Databricks token validator and update README with usage instructions --- README.md | 10 +++ .../validators/databricks_token.py | 77 +++++++++++++++++++ tests/test_registry.py | 1 + tests/test_validators.py | 45 +++++++++++ 4 files changed, 133 insertions(+) create mode 100644 src/validate_secrets/validators/databricks_token.py diff --git a/README.md b/README.md index 8a9d771..56aadad 100644 --- a/README.md +++ b/README.md @@ -93,6 +93,7 @@ validate-secrets check-file input/secrets_file.json --file-format json --format | `google_api_key` | Google API Keys | AIza... format | | `microsoft_teams_webhook` | Microsoft Teams/Office 365 Webhooks | webhook.office.com URLs | | `snyk_api_token` | Snyk API Tokens | API tokens | +| `databricks_token` | Databricks Personal Access Tokens | `dapi...` format | Note: Most accurate way to see available validators is to run `validate-secrets list-validators` command. @@ -200,6 +201,15 @@ With the `--output` option you can also specify the file to write the output to: validate-secrets check-file secrets.txt google_api_key --file-format csv --output results.csv ``` +### Databricks Token Validation + +Validate Databricks Personal Access Tokens against a workspace. The `--host-url` flag provides the workspace URL: + +```bash +# Validate a single token +validate-secrets --host-url https://my-workspace.cloud.databricks.com validate "dapi1234abcd..." databricks_token +```` + ## License This project is licensed under the terms of the MIT open source license. Please refer to [LICENSE.md](LICENSE.md) for the full terms. diff --git a/src/validate_secrets/validators/databricks_token.py b/src/validate_secrets/validators/databricks_token.py new file mode 100644 index 0000000..406fa83 --- /dev/null +++ b/src/validate_secrets/validators/databricks_token.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python3 + +"""Validator for Databricks Personal Access Tokens.""" + +import os +import requests +import logging +from typing import Optional + +from ..core.base import Checker + +LOG = logging.getLogger(__name__) + + +class DatabricksTokenChecker(Checker): + """Class to check if a Databricks Personal Access Token is valid.""" + + name = "databricks_token" + description = "Validates Databricks Personal Access Tokens" + + def __init__( + self, + notify: bool = False, + debug: bool = False, + timeout: int = 30, + host_url: Optional[str] = None, + ) -> None: + super().__init__(notify, debug, timeout, host_url) + self.session = requests.Session() + self.session.headers.update({"Content-Type": "application/json"}) + + # Fall back to DATABRICKS_HOST env var if host_url not provided + if not self.host_url: + env_host = os.environ.get("DATABRICKS_HOST", "").rstrip("/") + if env_host: + self.host_url = env_host + + def check(self, token: str) -> Optional[bool]: + """Check if a Databricks token is still active.""" + token = token.strip() + + if not self.host_url: + LOG.error( + "No host URL configured. Use --host-url or set DATABRICKS_HOST env var." + ) + return None + + if self.notify: + LOG.debug("Cannot notify Databricks tokens") + + try: + api_url = f"{self.host_url}/api/2.0/token/list" + request = self.session.prepare_request( + requests.Request("GET", api_url, headers={"Authorization": f"Bearer {token}"}) + ) + LOG.debug("Request URL: %s", api_url) + LOG.debug("Headers: %s", request.headers) + response = self.session.send(request, timeout=self.timeout) + + LOG.debug("Response status: %s", response.status_code) + LOG.debug("Response text: %s", response.text) + + if response.status_code == 200: + return True + elif response.status_code in (401, 403): + return False + else: + LOG.error( + "Error for token %s: %s; %s", + token[:10] + "...", + response.status_code, + response.text, + ) + return None + except Exception as e: + LOG.error(f"Error validating Databricks token: {e}") + return None diff --git a/tests/test_registry.py b/tests/test_registry.py index f33bd7f..94c6a6f 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -31,6 +31,7 @@ def test_load_validators(self): assert "google_api_key" in validators assert "microsoft_teams_webhook" in validators assert "snyk_api_token" in validators + assert "databricks_token" in validators def test_get_validator(self): """Test getting a specific validator.""" diff --git a/tests/test_validators.py b/tests/test_validators.py index 68c50fc..a49f59e 100644 --- a/tests/test_validators.py +++ b/tests/test_validators.py @@ -12,6 +12,45 @@ from validate_secrets.validators.google_api_keys import GoogleApiKeyChecker from validate_secrets.validators.microsoft_teams_webhook import OfficeWebHookChecker from validate_secrets.validators.snyk_api_token import SnykAPITokenChecker +from validate_secrets.validators.databricks_token import DatabricksTokenChecker + +class TestDatabricksTokenChecker: + """Test the Databricks token validator with host_url parameter.""" + + def test_host_from_named_parameter(self): + """Test that host_url can be set as a named parameter.""" + checker = DatabricksTokenChecker( + host_url="https://my-workspace.databricks.com" + ) + assert checker.host_url == "https://my-workspace.databricks.com" + + def test_host_strips_trailing_slash(self): + """Test that trailing slash is stripped from host.""" + checker = DatabricksTokenChecker( + host_url="https://my-workspace.databricks.com/" + ) + assert checker.host_url == "https://my-workspace.databricks.com" + + def test_host_from_env_var_fallback(self, monkeypatch): + """Test that DATABRICKS_HOST env var is used as fallback.""" + monkeypatch.setenv("DATABRICKS_HOST", "https://env-workspace.databricks.com") + checker = DatabricksTokenChecker() + assert checker.host_url == "https://env-workspace.databricks.com" + + def test_named_param_overrides_env_var(self, monkeypatch): + """Test that host_url parameter takes precedence over env var.""" + monkeypatch.setenv("DATABRICKS_HOST", "https://env-workspace.databricks.com") + checker = DatabricksTokenChecker( + host_url="https://cli-workspace.databricks.com" + ) + assert checker.host_url == "https://cli-workspace.databricks.com" + + def test_missing_host_returns_none(self, monkeypatch): + """Test that check returns None when host is not configured.""" + monkeypatch.delenv("DATABRICKS_HOST", raising=False) + checker = DatabricksTokenChecker() + result = checker.check("dapi_fake_token_123") + assert result is None class TestFodselsNummerChecker: @@ -114,6 +153,9 @@ class TestValidatorMetadata: def test_all_validators_have_names(self): """Test that all validators have proper names.""" validators = [ + DatabricksTokenChecker( + host_url="https://test.databricks.com" + ), FodselsNummerChecker(), GoogleApiKeyChecker(), OfficeWebHookChecker(), @@ -128,6 +170,9 @@ def test_all_validators_have_names(self): def test_all_validators_have_descriptions(self): """Test that all validators have descriptions.""" validators = [ + DatabricksTokenChecker( + host_url="https://test.databricks.com" + ), FodselsNummerChecker(), GoogleApiKeyChecker(), OfficeWebHookChecker(), From 70dd9a280a382167856b85ed4930eeac20b544cd Mon Sep 17 00:00:00 2001 From: Sean Sinclair <146738689+sean-sinclair@users.noreply.github.com> Date: Fri, 27 Feb 2026 12:17:05 +0100 Subject: [PATCH 3/3] Refactor DatabricksTokenChecker test cases for improved readability --- tests/test_validators.py | 21 ++++++--------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/tests/test_validators.py b/tests/test_validators.py index a49f59e..f7baec9 100644 --- a/tests/test_validators.py +++ b/tests/test_validators.py @@ -14,21 +14,18 @@ from validate_secrets.validators.snyk_api_token import SnykAPITokenChecker from validate_secrets.validators.databricks_token import DatabricksTokenChecker + class TestDatabricksTokenChecker: """Test the Databricks token validator with host_url parameter.""" def test_host_from_named_parameter(self): """Test that host_url can be set as a named parameter.""" - checker = DatabricksTokenChecker( - host_url="https://my-workspace.databricks.com" - ) + checker = DatabricksTokenChecker(host_url="https://my-workspace.databricks.com") assert checker.host_url == "https://my-workspace.databricks.com" def test_host_strips_trailing_slash(self): """Test that trailing slash is stripped from host.""" - checker = DatabricksTokenChecker( - host_url="https://my-workspace.databricks.com/" - ) + checker = DatabricksTokenChecker(host_url="https://my-workspace.databricks.com/") assert checker.host_url == "https://my-workspace.databricks.com" def test_host_from_env_var_fallback(self, monkeypatch): @@ -40,9 +37,7 @@ def test_host_from_env_var_fallback(self, monkeypatch): def test_named_param_overrides_env_var(self, monkeypatch): """Test that host_url parameter takes precedence over env var.""" monkeypatch.setenv("DATABRICKS_HOST", "https://env-workspace.databricks.com") - checker = DatabricksTokenChecker( - host_url="https://cli-workspace.databricks.com" - ) + checker = DatabricksTokenChecker(host_url="https://cli-workspace.databricks.com") assert checker.host_url == "https://cli-workspace.databricks.com" def test_missing_host_returns_none(self, monkeypatch): @@ -153,9 +148,7 @@ class TestValidatorMetadata: def test_all_validators_have_names(self): """Test that all validators have proper names.""" validators = [ - DatabricksTokenChecker( - host_url="https://test.databricks.com" - ), + DatabricksTokenChecker(host_url="https://test.databricks.com"), FodselsNummerChecker(), GoogleApiKeyChecker(), OfficeWebHookChecker(), @@ -170,9 +163,7 @@ def test_all_validators_have_names(self): def test_all_validators_have_descriptions(self): """Test that all validators have descriptions.""" validators = [ - DatabricksTokenChecker( - host_url="https://test.databricks.com" - ), + DatabricksTokenChecker(host_url="https://test.databricks.com"), FodselsNummerChecker(), GoogleApiKeyChecker(), OfficeWebHookChecker(),