diff --git a/README.md b/README.md index 8a9d771..56aadad 100644 --- a/README.md +++ b/README.md @@ -93,6 +93,7 @@ validate-secrets check-file input/secrets_file.json --file-format json --format | `google_api_key` | Google API Keys | AIza... format | | `microsoft_teams_webhook` | Microsoft Teams/Office 365 Webhooks | webhook.office.com URLs | | `snyk_api_token` | Snyk API Tokens | API tokens | +| `databricks_token` | Databricks Personal Access Tokens | `dapi...` format | Note: Most accurate way to see available validators is to run `validate-secrets list-validators` command. @@ -200,6 +201,15 @@ With the `--output` option you can also specify the file to write the output to: validate-secrets check-file secrets.txt google_api_key --file-format csv --output results.csv ``` +### Databricks Token Validation + +Validate Databricks Personal Access Tokens against a workspace. The global `--host-url` option, given before the subcommand, provides the workspace URL; if it is omitted, the `DATABRICKS_HOST` environment variable is used: + +```bash +# Validate a single token +validate-secrets --host-url https://my-workspace.cloud.databricks.com validate "dapi1234abcd..." databricks_token +``` + ## License This project is licensed under the terms of the MIT open source license. Please refer to [LICENSE.md](LICENSE.md) for the full terms. diff --git a/src/validate_secrets/cli.py b/src/validate_secrets/cli.py index 481f8ab..ff7a037 100644 --- a/src/validate_secrets/cli.py +++ b/src/validate_secrets/cli.py @@ -33,8 +33,12 @@ is_flag=True, help="Enable debug logging. 
To use, add the flag as a first argument!", ) +@click.option( + "--host-url", + help="Base URL of the service to validate against", +) @click.pass_context -def cli(ctx, config, debug): +def cli(ctx, config, debug, host_url): """Extensible secret validation tool.""" ctx.ensure_object(dict) @@ -48,6 +52,7 @@ def cli(ctx, config, debug): ctx.obj["config"].setup_logging() ctx.obj["debug"] = debug + ctx.obj["host_url"] = host_url @cli.command() @@ -125,6 +130,7 @@ def check_file(ctx, file_path, secret_type, output, output_format, file_format, notify=notify or validation_config["notifications"], debug=ctx.obj["debug"], timeout=validation_config["timeout"], + host_url=ctx.obj.get("host_url"), ) for secret_data in track( @@ -245,6 +251,7 @@ def check_github(ctx, org, repo, secret_type, state, validity, output, output_fo notify=notify or validation_config["notifications"], debug=ctx.obj["debug"], timeout=validation_config["timeout"], + host_url=ctx.obj.get("host_url"), ) status = validator.check(secret) @@ -334,6 +341,7 @@ def validate(ctx, secret, secret_type, notify): notify=notify or validation_config["notifications"], debug=ctx.obj["debug"], timeout=validation_config["timeout"], + host_url=ctx.obj.get("host_url"), ) # Validate secret diff --git a/src/validate_secrets/core/base.py b/src/validate_secrets/core/base.py index 44f295c..432eb41 100644 --- a/src/validate_secrets/core/base.py +++ b/src/validate_secrets/core/base.py @@ -54,17 +54,25 @@ class Checker(ABC): name: str = "" description: str = "" - def __init__(self, notify: bool = False, debug: bool = False, timeout: int = 30) -> None: + def __init__( + self, + notify: bool = False, + debug: bool = False, + timeout: int = 30, + host_url: Optional[str] = None, + ) -> None: """Initialize the checker. 
class DatabricksTokenChecker(Checker):
    """Check whether a Databricks Personal Access Token is still valid.

    The token is presented as a bearer credential to the workspace's
    ``/api/2.0/token/list`` endpoint. The workspace URL comes from the
    ``host_url`` argument or, when that is not given, from the
    ``DATABRICKS_HOST`` environment variable.
    """

    name = "databricks_token"
    description = "Validates Databricks Personal Access Tokens"

    # Placeholder written to logs in place of credential material.
    _REDACTED = "<redacted>"

    def __init__(
        self,
        notify: bool = False,
        debug: bool = False,
        timeout: int = 30,
        host_url: Optional[str] = None,
    ) -> None:
        """Initialize the checker.

        Args:
            notify: Whether to send notifications to endpoints (not supported
                for Databricks tokens; only a debug message is logged).
            debug: Enable debug logging.
            timeout: Timeout in seconds for the validation request.
            host_url: Base URL of the Databricks workspace. Falls back to the
                ``DATABRICKS_HOST`` environment variable when not provided.
        """
        super().__init__(notify, debug, timeout, host_url)
        self.session = requests.Session()
        self.session.headers.update({"Content-Type": "application/json"})

        # Fall back to DATABRICKS_HOST env var if host_url not provided
        if not self.host_url:
            env_host = os.environ.get("DATABRICKS_HOST", "").rstrip("/")
            if env_host:
                self.host_url = env_host

    @classmethod
    def _redact_headers(cls, headers) -> dict:
        """Return a copy of *headers* that is safe to log (no Authorization value)."""
        return {
            key: (cls._REDACTED if key.lower() == "authorization" else value)
            for key, value in headers.items()
        }

    @classmethod
    def _scrub(cls, text: str, token: str) -> str:
        """Remove any verbatim occurrence of *token* from *text* before logging."""
        return text.replace(token, cls._REDACTED) if token else text

    def check(self, token: str) -> Optional[bool]:
        """Check if a Databricks token is still active.

        Args:
            token: The candidate token; surrounding whitespace is ignored.

        Returns:
            ``True`` when the workspace answers 200, ``False`` when it answers
            401 or 403, and ``None`` when no host is configured, the request
            fails, or any other status code is returned.
        """
        token = token.strip()

        if not self.host_url:
            LOG.error(
                "No host URL configured. Use --host-url or set DATABRICKS_HOST env var."
            )
            return None

        if self.notify:
            LOG.debug("Cannot notify Databricks tokens")

        try:
            api_url = f"{self.host_url}/api/2.0/token/list"
            request = self.session.prepare_request(
                requests.Request("GET", api_url, headers={"Authorization": f"Bearer {token}"})
            )
            LOG.debug("Request URL: %s", api_url)
            # Never log the Authorization header verbatim: it carries the very
            # secret this tool is validating.
            if LOG.isEnabledFor(logging.DEBUG):
                LOG.debug("Headers: %s", self._redact_headers(request.headers))
            response = self.session.send(request, timeout=self.timeout)

            LOG.debug("Response status: %s", response.status_code)
            LOG.debug("Response text: %s", response.text)

            if response.status_code == 200:
                return True
            elif response.status_code in (401, 403):
                return False
            else:
                LOG.error(
                    "Error for token %s: %s; %s",
                    token[:10] + "...",
                    response.status_code,
                    response.text,
                )
                return None
        except Exception as e:
            # Deliberately broad: a single bad token or host must not abort a
            # batch run. Exception text may echo the header value, so scrub the
            # token before it reaches the log.
            LOG.error("Error validating Databricks token: %s", self._scrub(str(e), token))
            return None
parameter.""" + checker = DatabricksTokenChecker(host_url="https://my-workspace.databricks.com") + assert checker.host_url == "https://my-workspace.databricks.com" + + def test_host_strips_trailing_slash(self): + """Test that trailing slash is stripped from host.""" + checker = DatabricksTokenChecker(host_url="https://my-workspace.databricks.com/") + assert checker.host_url == "https://my-workspace.databricks.com" + + def test_host_from_env_var_fallback(self, monkeypatch): + """Test that DATABRICKS_HOST env var is used as fallback.""" + monkeypatch.setenv("DATABRICKS_HOST", "https://env-workspace.databricks.com") + checker = DatabricksTokenChecker() + assert checker.host_url == "https://env-workspace.databricks.com" + + def test_named_param_overrides_env_var(self, monkeypatch): + """Test that host_url parameter takes precedence over env var.""" + monkeypatch.setenv("DATABRICKS_HOST", "https://env-workspace.databricks.com") + checker = DatabricksTokenChecker(host_url="https://cli-workspace.databricks.com") + assert checker.host_url == "https://cli-workspace.databricks.com" + + def test_missing_host_returns_none(self, monkeypatch): + """Test that check returns None when host is not configured.""" + monkeypatch.delenv("DATABRICKS_HOST", raising=False) + checker = DatabricksTokenChecker() + result = checker.check("dapi_fake_token_123") + assert result is None class TestFodselsNummerChecker: @@ -114,6 +148,7 @@ class TestValidatorMetadata: def test_all_validators_have_names(self): """Test that all validators have proper names.""" validators = [ + DatabricksTokenChecker(host_url="https://test.databricks.com"), FodselsNummerChecker(), GoogleApiKeyChecker(), OfficeWebHookChecker(), @@ -128,6 +163,7 @@ def test_all_validators_have_names(self): def test_all_validators_have_descriptions(self): """Test that all validators have descriptions.""" validators = [ + DatabricksTokenChecker(host_url="https://test.databricks.com"), FodselsNummerChecker(), GoogleApiKeyChecker(), 
OfficeWebHookChecker(),