Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ validate-secrets check-file input/secrets_file.json --file-format json --format
| `google_api_key` | Google API Keys | AIza... format |
| `microsoft_teams_webhook` | Microsoft Teams/Office 365 Webhooks | webhook.office.com URLs |
| `snyk_api_token` | Snyk API Tokens | API tokens |
| `databricks_token` | Databricks Personal Access Tokens | `dapi...` format |

Note: Most accurate way to see available validators is to run `validate-secrets list-validators` command.

Expand Down Expand Up @@ -200,6 +201,15 @@ With the `--output` option you can also specify the file to write the output to:
validate-secrets check-file secrets.txt google_api_key --file-format csv --output results.csv
```

### Databricks Token Validation

Validate Databricks Personal Access Tokens against a workspace. The `--host-url` flag provides the workspace URL:

```bash
# Validate a single token
validate-secrets --host-url https://my-workspace.cloud.databricks.com validate "dapi1234abcd..." databricks_token
````

## License

This project is licensed under the terms of the MIT open source license. Please refer to [LICENSE.md](LICENSE.md) for the full terms.
Expand Down
10 changes: 9 additions & 1 deletion src/validate_secrets/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@
is_flag=True,
help="Enable debug logging. To use, add the flag as a first argument!",
)
@click.option(
"--host-url",
help="Base URL of the service to validate against",
)
@click.pass_context
def cli(ctx, config, debug):
def cli(ctx, config, debug, host_url):
"""Extensible secret validation tool."""
ctx.ensure_object(dict)

Expand All @@ -48,6 +52,7 @@ def cli(ctx, config, debug):
ctx.obj["config"].setup_logging()

ctx.obj["debug"] = debug
ctx.obj["host_url"] = host_url


@cli.command()
Expand Down Expand Up @@ -125,6 +130,7 @@ def check_file(ctx, file_path, secret_type, output, output_format, file_format,
notify=notify or validation_config["notifications"],
debug=ctx.obj["debug"],
timeout=validation_config["timeout"],
host_url=ctx.obj.get("host_url"),
)

for secret_data in track(
Expand Down Expand Up @@ -245,6 +251,7 @@ def check_github(ctx, org, repo, secret_type, state, validity, output, output_fo
notify=notify or validation_config["notifications"],
debug=ctx.obj["debug"],
timeout=validation_config["timeout"],
host_url=ctx.obj.get("host_url"),
)

status = validator.check(secret)
Expand Down Expand Up @@ -334,6 +341,7 @@ def validate(ctx, secret, secret_type, notify):
notify=notify or validation_config["notifications"],
debug=ctx.obj["debug"],
timeout=validation_config["timeout"],
host_url=ctx.obj.get("host_url"),
)

# Validate secret
Expand Down
10 changes: 9 additions & 1 deletion src/validate_secrets/core/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,25 @@ class Checker(ABC):
name: str = ""
description: str = ""

def __init__(self, notify: bool = False, debug: bool = False, timeout: int = 30) -> None:
def __init__(
self,
notify: bool = False,
debug: bool = False,
timeout: int = 30,
host_url: Optional[str] = None,
) -> None:
"""Initialize the checker.

Args:
notify: Whether to send notifications to endpoints
debug: Enable debug logging
timeout: Timeout in seconds for validation
host_url: Base URL of the service to validate against
"""
self.notify = notify
self.debug = debug
self.timeout = timeout
self.host_url = host_url.rstrip("/") if host_url else None

if self.debug:
logging.getLogger().setLevel(logging.DEBUG)
Expand Down
77 changes: 77 additions & 0 deletions src/validate_secrets/validators/databricks_token.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#!/usr/bin/env python3

"""Validator for Databricks Personal Access Tokens."""

import os
import requests
import logging
from typing import Optional

from ..core.base import Checker

LOG = logging.getLogger(__name__)


class DatabricksTokenChecker(Checker):
"""Class to check if a Databricks Personal Access Token is valid."""

name = "databricks_token"
description = "Validates Databricks Personal Access Tokens"

def __init__(
self,
notify: bool = False,
debug: bool = False,
timeout: int = 30,
host_url: Optional[str] = None,
) -> None:
super().__init__(notify, debug, timeout, host_url)
self.session = requests.Session()
self.session.headers.update({"Content-Type": "application/json"})

# Fall back to DATABRICKS_HOST env var if host_url not provided
if not self.host_url:
env_host = os.environ.get("DATABRICKS_HOST", "").rstrip("/")
if env_host:
self.host_url = env_host

def check(self, token: str) -> Optional[bool]:
"""Check if a Databricks token is still active."""
token = token.strip()

if not self.host_url:
LOG.error(
"No host URL configured. Use --host-url <url> or set DATABRICKS_HOST env var."
)
return None

if self.notify:
LOG.debug("Cannot notify Databricks tokens")

try:
api_url = f"{self.host_url}/api/2.0/token/list"
request = self.session.prepare_request(
requests.Request("GET", api_url, headers={"Authorization": f"Bearer {token}"})
)
LOG.debug("Request URL: %s", api_url)
LOG.debug("Headers: %s", request.headers)
response = self.session.send(request, timeout=self.timeout)

LOG.debug("Response status: %s", response.status_code)
LOG.debug("Response text: %s", response.text)

if response.status_code == 200:
return True
elif response.status_code in (401, 403):
return False
else:
LOG.error(
"Error for token %s: %s; %s",
token[:10] + "...",
response.status_code,
response.text,
)
return None
except Exception as e:
LOG.error(f"Error validating Databricks token: {e}")
return None
1 change: 1 addition & 0 deletions tests/test_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def test_load_validators(self):
assert "google_api_key" in validators
assert "microsoft_teams_webhook" in validators
assert "snyk_api_token" in validators
assert "databricks_token" in validators

def test_get_validator(self):
"""Test getting a specific validator."""
Expand Down
36 changes: 36 additions & 0 deletions tests/test_validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,40 @@
from validate_secrets.validators.google_api_keys import GoogleApiKeyChecker
from validate_secrets.validators.microsoft_teams_webhook import OfficeWebHookChecker
from validate_secrets.validators.snyk_api_token import SnykAPITokenChecker
from validate_secrets.validators.databricks_token import DatabricksTokenChecker


class TestDatabricksTokenChecker:
"""Test the Databricks token validator with host_url parameter."""

def test_host_from_named_parameter(self):
"""Test that host_url can be set as a named parameter."""
checker = DatabricksTokenChecker(host_url="https://my-workspace.databricks.com")
assert checker.host_url == "https://my-workspace.databricks.com"

def test_host_strips_trailing_slash(self):
"""Test that trailing slash is stripped from host."""
checker = DatabricksTokenChecker(host_url="https://my-workspace.databricks.com/")
assert checker.host_url == "https://my-workspace.databricks.com"

def test_host_from_env_var_fallback(self, monkeypatch):
"""Test that DATABRICKS_HOST env var is used as fallback."""
monkeypatch.setenv("DATABRICKS_HOST", "https://env-workspace.databricks.com")
checker = DatabricksTokenChecker()
assert checker.host_url == "https://env-workspace.databricks.com"

def test_named_param_overrides_env_var(self, monkeypatch):
"""Test that host_url parameter takes precedence over env var."""
monkeypatch.setenv("DATABRICKS_HOST", "https://env-workspace.databricks.com")
checker = DatabricksTokenChecker(host_url="https://cli-workspace.databricks.com")
assert checker.host_url == "https://cli-workspace.databricks.com"

def test_missing_host_returns_none(self, monkeypatch):
"""Test that check returns None when host is not configured."""
monkeypatch.delenv("DATABRICKS_HOST", raising=False)
checker = DatabricksTokenChecker()
result = checker.check("dapi_fake_token_123")
assert result is None


class TestFodselsNummerChecker:
Expand Down Expand Up @@ -114,6 +148,7 @@ class TestValidatorMetadata:
def test_all_validators_have_names(self):
"""Test that all validators have proper names."""
validators = [
DatabricksTokenChecker(host_url="https://test.databricks.com"),
FodselsNummerChecker(),
GoogleApiKeyChecker(),
OfficeWebHookChecker(),
Expand All @@ -128,6 +163,7 @@ def test_all_validators_have_names(self):
def test_all_validators_have_descriptions(self):
"""Test that all validators have descriptions."""
validators = [
DatabricksTokenChecker(host_url="https://test.databricks.com"),
FodselsNummerChecker(),
GoogleApiKeyChecker(),
OfficeWebHookChecker(),
Expand Down