Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 163 additions & 0 deletions judgment/api_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
"""Shared HTTP client for the Judgment platform REST API.

Resolves the API key from (in priority order):
1. JUDGMENT_API_KEY environment variable
2. System keyring (service="judgment-cli")
3. Config file (~/.judgment/config.toml [auth] api_key)

Resolves the API URL from:
1. JUDGMENT_API_URL environment variable
2. Config file (~/.judgment/config.toml [defaults] api_url)
3. Default: https://api.judgment.com
"""

from __future__ import annotations

import os
from typing import Any, Optional

import httpx
import typer

from .config import get_config_value, DEFAULT_API_URL
from .output import print_error

# Keyring is optional β€” if the backend is not available we silently skip it.
try:
import keyring as _keyring
except Exception: # noqa: BLE001
_keyring = None # type: ignore[assignment]

KEYRING_SERVICE = "judgment-cli"
KEYRING_USERNAME = "api_key"

_REQUEST_TIMEOUT = 30.0


def _resolve_api_key() -> Optional[str]:
"""Return the API key using the priority chain, or None."""
# 1. Environment variable
key = os.environ.get("JUDGMENT_API_KEY")
if key:
return key

# 2. System keyring
if _keyring is not None:
try:
key = _keyring.get_password(KEYRING_SERVICE, KEYRING_USERNAME)
if key:
return key
except Exception: # noqa: BLE001
pass

# 3. Config file
key = get_config_value("auth", "api_key")
if key:
return str(key)

return None


def _resolve_api_url() -> str:
"""Return the base API URL using the priority chain."""
# 1. Environment variable
url = os.environ.get("JUDGMENT_API_URL")
if url:
return url.rstrip("/")

# 2. Config file
url = get_config_value("defaults", "api_url")
if url:
return str(url).rstrip("/")

return DEFAULT_API_URL


def _handle_error_response(response: httpx.Response) -> None:
"""Inspect the HTTP response and emit user-friendly messages for common errors."""
if response.is_success:
return

status = response.status_code

if status == 401:
print_error("Authentication failed. Run [bold]judgment login[/bold] to set a valid API key.")
raise typer.Exit(code=1)
elif status == 403:
print_error("Permission denied. You do not have access to this resource.")
raise typer.Exit(code=1)
elif status == 404:
print_error("Resource not found. Please check the ID or path and try again.")
raise typer.Exit(code=1)
elif status >= 500:
print_error(f"Server error ({status}). Please try again later or contact support.")
raise typer.Exit(code=1)
else:
# Generic fallback
try:
detail = response.json()
except Exception:
detail = response.text
print_error(f"Request failed ({status}): {detail}")
raise typer.Exit(code=1)


class JudgmentAPIClient:
"""Thin wrapper around httpx that handles auth and error formatting."""

def __init__(
self,
api_key: Optional[str] = None,
api_url: Optional[str] = None,
) -> None:
self._api_key = api_key or _resolve_api_key()
self._api_url = api_url or _resolve_api_url()

# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------

def _ensure_auth(self) -> str:
if not self._api_key:
print_error("No API key found. Run [bold]judgment login[/bold] to authenticate.")
raise typer.Exit(code=1)
return self._api_key

def _headers(self) -> dict[str, str]:
key = self._ensure_auth()
return {
"Authorization": f"Bearer {key}",
"Content-Type": "application/json",
}

def _url(self, path: str) -> str:
return f"{self._api_url}/{path.lstrip('/')}"

# ------------------------------------------------------------------
# Public HTTP methods
# ------------------------------------------------------------------

def get(self, path: str, **kwargs: Any) -> httpx.Response:
response = httpx.get(self._url(path), headers=self._headers(), timeout=_REQUEST_TIMEOUT, **kwargs)
_handle_error_response(response)
return response

def post(self, path: str, **kwargs: Any) -> httpx.Response:
response = httpx.post(self._url(path), headers=self._headers(), timeout=_REQUEST_TIMEOUT, **kwargs)
_handle_error_response(response)
return response

def put(self, path: str, **kwargs: Any) -> httpx.Response:
response = httpx.put(self._url(path), headers=self._headers(), timeout=_REQUEST_TIMEOUT, **kwargs)
_handle_error_response(response)
return response

def patch(self, path: str, **kwargs: Any) -> httpx.Response:
response = httpx.patch(self._url(path), headers=self._headers(), timeout=_REQUEST_TIMEOUT, **kwargs)
_handle_error_response(response)
return response

def delete(self, path: str, **kwargs: Any) -> httpx.Response:
response = httpx.delete(self._url(path), headers=self._headers(), timeout=_REQUEST_TIMEOUT, **kwargs)
_handle_error_response(response)
return response
31 changes: 27 additions & 4 deletions judgment/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,23 @@
from .command_utils.self_host import deploy, create_https_listener
from typing_extensions import Annotated

from .output import set_output_options
from .commands.config_cmd import config_app
from .commands.auth import auth_app
from .commands.orgs import orgs_app
from .commands.projects import projects_app

app = typer.Typer(help="Judgment CLI tool for managing self-hosted instances.", add_completion=False)

app = typer.Typer(help="Judgment CLI tool for managing self-hosted instances and the Judgment platform.", add_completion=False)
self_host_app = typer.Typer(help="Commands for self-hosting Judgment", add_completion=False)
app.add_typer(self_host_app, name="self-host")

# Register new command groups
app.add_typer(config_app, name="config")
app.add_typer(auth_app, name="auth")
app.add_typer(orgs_app, name="orgs")
app.add_typer(projects_app, name="projects")

class ComputeSize(str, Enum):
nano = "nano"
micro = "micro"
Expand Down Expand Up @@ -182,10 +194,21 @@ def https_listener():



# Still require calling subcommand even if there is only one
# Global options callback
@app.callback()
def callback():
pass
def callback(
ctx: typer.Context,
json_output: bool = typer.Option(False, "--json", help="Output raw JSON instead of formatted tables"),
no_color: bool = typer.Option(False, "--no-color", help="Disable colored output"),
org_id: Optional[str] = typer.Option(None, "--org-id", help="Override default organization ID"),
project_id: Optional[str] = typer.Option(None, "--project-id", help="Override default project ID"),
) -> None:
"""Judgment CLI tool for managing self-hosted instances and the Judgment platform."""
set_output_options(json_mode=json_output, no_color=no_color)
# Store global options in context meta for subcommands to access
ctx.ensure_object(dict)
ctx.meta["org_id"] = org_id
ctx.meta["project_id"] = project_id

def main():
app()
Expand Down
1 change: 1 addition & 0 deletions judgment/commands/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Judgment CLI command groups."""
92 changes: 92 additions & 0 deletions judgment/commands/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""Auth commands for the Judgment CLI.

judgment login β€” prompt for API key, validate, store
judgment logout β€” remove stored API key
judgment whoami β€” show current user info
"""

from __future__ import annotations

import typer

from ..api_client import JudgmentAPIClient, KEYRING_SERVICE, KEYRING_USERNAME
from ..config import set_config_value
from ..output import print_detail, print_success, print_error, print_info

auth_app = typer.Typer(help="Authenticate with the Judgment platform.", add_completion=False)


@auth_app.command("login")
def login(
api_key: str = typer.Option(
"",
"--api-key",
"-k",
help="API key (omit to be prompted interactively)",
),
) -> None:
"""Authenticate by providing an API key."""
if not api_key:
api_key = typer.prompt("Enter your Judgment API key", hide_input=True)

if not api_key:
print_error("No API key provided.")
raise typer.Exit(code=1)

# Validate by calling /users/me
print_info("Validating API key...")
client = JudgmentAPIClient(api_key=api_key)
try:
response = client.get("/users/me")
except SystemExit:
# Re-raise typer.Exit from error handler
raise
except Exception as exc:
print_error(f"Could not reach the API: {exc}")
raise typer.Exit(code=1)

user = response.json()
set_config_value("auth", "api_key", api_key)

# Also store in keyring if available
try:
import keyring as _keyring

_keyring.set_password(KEYRING_SERVICE, KEYRING_USERNAME, api_key)
except Exception: # noqa: BLE001
pass

print_success(f"Logged in as {user.get('email', user.get('name', 'unknown'))}")


@auth_app.command("logout")
def logout() -> None:
"""Remove the stored API key."""
set_config_value("auth", "api_key", "")

# Also try to remove from keyring
try:
import keyring as _keyring

_keyring.delete_password(KEYRING_SERVICE, KEYRING_USERNAME)
except Exception: # noqa: BLE001
pass

print_success("Logged out. API key removed.")


@auth_app.command("whoami")
def whoami() -> None:
"""Show the currently authenticated user."""
client = JudgmentAPIClient()
try:
response = client.get("/users/me")
except SystemExit:
raise
except Exception as exc:
print_error(f"Could not reach the API: {exc}")
raise typer.Exit(code=1)

user = response.json()
fields = [k for k in user.keys()]
print_detail(user, fields)
65 changes: 65 additions & 0 deletions judgment/commands/config_cmd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""Config commands for the Judgment CLI.

judgment config set <key> <value>
judgment config show
judgment config path
"""

from __future__ import annotations

import typer

from ..config import load_config, set_config_value, get_config_path
from ..output import print_detail, print_success, print_error, get_console, is_json_mode, print_json

config_app = typer.Typer(help="Manage CLI configuration.", add_completion=False)


@config_app.command("set")
def config_set(
key: str = typer.Argument(..., help="Config key in 'section.key' format (e.g. defaults.api_url)"),
value: str = typer.Argument(..., help="Value to set"),
) -> None:
"""Set a configuration value."""
parts = key.split(".", 1)
if len(parts) != 2:
print_error("Key must be in 'section.key' format (e.g. defaults.api_url)")
raise typer.Exit(code=1)

section, config_key = parts
set_config_value(section, config_key, value)
print_success(f"Set {key} = {value}")


@config_app.command("show")
def config_show() -> None:
"""Display the current configuration (API key is masked)."""
config = load_config()

# Mask the API key for display
if "auth" in config and config["auth"].get("api_key"):
raw = str(config["auth"]["api_key"])
if len(raw) > 8:
config["auth"]["api_key"] = raw[:4] + "****" + raw[-4:]
else:
config["auth"]["api_key"] = "****"

if is_json_mode():
print_json(config)
return

console = get_console()
for section, values in config.items():
console.print(f"\n[bold cyan]\\[{section}][/bold cyan]")
if isinstance(values, dict):
for k, v in values.items():
console.print(f" {k} = {v}")
else:
console.print(f" {values}")


@config_app.command("path")
def config_path() -> None:
"""Show the config file path."""
console = get_console()
console.print(str(get_config_path()))
Loading