Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 138 additions & 18 deletions plumb/programs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from __future__ import annotations

import json as _json
import os
import shutil
import subprocess
from pathlib import Path

import dspy
Expand All @@ -10,53 +13,170 @@

_configured = False

# Default model for direct API access
_DEFAULT_API_MODEL = "anthropic/claude-sonnet-4-20250514"
# Default model alias for Claude Code CLI fallback
_DEFAULT_CLI_MODEL = "sonnet"


class ClaudeCodeLM(dspy.LM):
"""DSPy LM backend that routes through the Claude Code CLI.

This enables Plumb to work for users on Claude Max/Pro plans who
authenticate via OAuth through Claude Code, without needing a separate
``ANTHROPIC_API_KEY``.

Calls ``claude -p --model <model> --output-format json
--no-session-persistence`` as a subprocess for each inference request.
"""

def __init__(self, model: str = _DEFAULT_CLI_MODEL, max_tokens: int = 28000, **kwargs):
self.cli_model = model
self._max_tokens = max_tokens
super().__init__(model=f"claude-code/{model}", model_type="chat", **kwargs)

def forward(self, prompt=None, messages=None, **kwargs):
# Build prompt text from either a raw string or a messages list
if prompt is not None:
prompt_text = prompt if isinstance(prompt, str) else str(prompt)
elif messages:
parts = []
for msg in messages:
if isinstance(msg, dict):
content = msg.get("content", "")
if isinstance(content, list):
content = "\n".join(
c.get("text", str(c)) if isinstance(c, dict) else str(c)
for c in content
)
parts.append(content)
else:
parts.append(str(msg))
prompt_text = "\n\n".join(parts)
else:
prompt_text = ""

try:
result = subprocess.run(
[
"claude", "-p",
"--model", self.cli_model,
"--output-format", "json",
"--no-session-persistence",
],
input=prompt_text,
capture_output=True,
text=True,
timeout=120,
)
except subprocess.TimeoutExpired as e:
raise PlumbInferenceError(f"Claude Code CLI timed out: {e}") from e
except FileNotFoundError:
raise PlumbAuthError(
"Claude Code CLI ('claude') not found on PATH. "
"Install it from https://claude.ai/code or set ANTHROPIC_API_KEY instead."
)

if result.returncode != 0:
raise PlumbInferenceError(
f"Claude Code CLI exited {result.returncode}: {result.stderr[:500]}"
)

# Parse JSON output — array of event objects; text is in the final
# "result" event.
try:
events = _json.loads(result.stdout)
if isinstance(events, list):
for event in reversed(events):
if isinstance(event, dict) and event.get("type") == "result":
return [event.get("result", "")]
return [result.stdout]
except _json.JSONDecodeError:
return [result.stdout]

def __call__(self, prompt=None, messages=None, **kwargs):
return self.forward(prompt=prompt, messages=messages, **kwargs)


def _claude_code_available() -> bool:
"""Return True if the ``claude`` CLI is installed and runnable."""
return shutil.which("claude") is not None


def get_lm() -> dspy.LM:
return dspy.LM("anthropic/claude-sonnet-4-20250514", max_tokens=28000)
"""Return the best available LM backend.

Resolution order:
1. ``ANTHROPIC_API_KEY`` is set → direct Anthropic API via LiteLLM (fast).
2. ``claude`` CLI is on PATH → Claude Code CLI fallback (works with
Max/Pro plan OAuth, no API key needed).
3. Neither available → raise ``PlumbAuthError``.
"""
from dotenv import load_dotenv
load_dotenv(override=False)

if os.environ.get("ANTHROPIC_API_KEY"):
return dspy.LM(_DEFAULT_API_MODEL, max_tokens=28000)

if _claude_code_available():
return ClaudeCodeLM(model=_DEFAULT_CLI_MODEL, max_tokens=28000)

raise PlumbAuthError(
"No LLM backend available. Plumb needs one of:\n"
" 1. ANTHROPIC_API_KEY set in environment or .env file, OR\n"
" 2. Claude Code CLI installed (https://claude.ai/code) with an active session.\n"
"Set ANTHROPIC_API_KEY or install Claude Code to continue."
)


def configure_dspy() -> None:
"""Lazy DSPy configuration. No-op if already configured.
Never call at import time — ANTHROPIC_API_KEY absence would break
non-LLM commands like plumb status."""
Never call at import time — missing credentials would break
non-LLM commands like ``plumb status``."""
global _configured
if _configured:
return
from dotenv import load_dotenv
load_dotenv(override=False)
lm = get_lm()
dspy.configure(lm=lm, adapter=XMLAdapter())
_configured = True


def validate_api_access() -> None:
"""Check that ANTHROPIC_API_KEY is set and works. Loads .env first, then
falls back to exported environment variables. Performs a smoke test to
verify the key is valid. Raises PlumbAuthError if not found or invalid."""
from dotenv import load_dotenv
"""Verify that a working LLM backend is available.

Checks for ``ANTHROPIC_API_KEY`` first, then falls back to the Claude
Code CLI. Performs a smoke test to confirm the backend actually works.
Raises ``PlumbAuthError`` if neither is available or functional.
"""
from dotenv import load_dotenv
load_dotenv(override=False)
if not os.environ.get("ANTHROPIC_API_KEY"):

has_api_key = bool(os.environ.get("ANTHROPIC_API_KEY"))
has_cli = _claude_code_available()

if not has_api_key and not has_cli:
raise PlumbAuthError(
"ANTHROPIC_API_KEY is not set. "
"Plumb requires a valid Anthropic API key to analyze commits.\n"
"Set it in a .env file or export it: export ANTHROPIC_API_KEY=your-key-here"
"No LLM backend available. Plumb needs one of:\n"
" 1. ANTHROPIC_API_KEY set in environment or .env file, OR\n"
" 2. Claude Code CLI installed (https://claude.ai/code) with an active session."
)

# Smoke test: verify the key actually works
# Smoke test whichever backend we'll use
lm = get_lm()
try:
response = lm("Reply with only the word: hello")
if not response:
raise PlumbAuthError("API returned empty response - key may be invalid")
if not response or not str(response[0]).strip():
raise PlumbAuthError("LLM backend returned empty response")
except PlumbAuthError:
raise
except Exception as e:
err_str = str(e).lower()
if "auth" in err_str or "api key" in err_str or "401" in err_str:
raise PlumbAuthError(
f"ANTHROPIC_API_KEY is invalid or rejected: {e}"
f"API key is invalid or rejected: {e}"
) from e
raise PlumbAuthError(
f"Failed to verify API access: {e}"
f"Failed to verify LLM access: {e}"
) from e


Expand Down
147 changes: 140 additions & 7 deletions tests/test_programs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
import dspy
import pytest

from plumb.programs import run_with_retries, configure_dspy, validate_api_access, get_program_lm
from plumb.programs import (
run_with_retries, configure_dspy, validate_api_access, get_program_lm,
ClaudeCodeLM, _claude_code_available, get_lm,
)
from plumb.config import PlumbConfig, save_config, ensure_plumb_dir
from plumb import PlumbAuthError, PlumbInferenceError
from plumb.programs.diff_analyzer import (
Expand Down Expand Up @@ -39,23 +42,37 @@


class TestValidateApiAccess:
def test_raises_when_key_missing(self):
def test_raises_when_no_backend_available(self):
# plumb:req-60f97012
# plumb:req-ab686eaa
# plumb:req-222ddbbd
with patch("dotenv.load_dotenv"), \
patch.dict("os.environ", {}, clear=True):
patch.dict("os.environ", {}, clear=True), \
patch("plumb.programs._claude_code_available", return_value=False):
import os
os.environ.pop("ANTHROPIC_API_KEY", None)
with pytest.raises(PlumbAuthError, match="ANTHROPIC_API_KEY is not set"):
with pytest.raises(PlumbAuthError, match="No LLM backend available"):
validate_api_access()

def test_raises_when_key_empty(self):
def test_raises_when_key_empty_and_no_cli(self):
with patch("dotenv.load_dotenv"), \
patch.dict("os.environ", {"ANTHROPIC_API_KEY": ""}):
with pytest.raises(PlumbAuthError, match="ANTHROPIC_API_KEY is not set"):
patch.dict("os.environ", {"ANTHROPIC_API_KEY": ""}), \
patch("plumb.programs._claude_code_available", return_value=False):
with pytest.raises(PlumbAuthError, match="No LLM backend available"):
validate_api_access()

def test_falls_back_to_cli_when_no_key(self):
"""When no API key but claude CLI is available, fallback works."""
mock_lm = MagicMock(return_value=["hello"])
with patch("dotenv.load_dotenv"), \
patch.dict("os.environ", {}, clear=True), \
patch("plumb.programs._claude_code_available", return_value=True), \
patch("plumb.programs.get_lm", return_value=mock_lm):
import os
os.environ.pop("ANTHROPIC_API_KEY", None)
validate_api_access() # should not raise — CLI fallback works
mock_lm.assert_called_once()

def test_passes_when_key_set_and_api_works(self):
mock_lm = MagicMock(return_value="hello")
with patch("dotenv.load_dotenv"), \
Expand Down Expand Up @@ -91,6 +108,122 @@ def test_loads_dotenv_file(self):
mock_load.assert_called_once_with(override=False)


class TestClaudeCodeLM:
"""Tests for the ClaudeCodeLM backend that routes through claude CLI."""

def test_instantiation(self):
lm = ClaudeCodeLM(model="haiku")
assert lm.cli_model == "haiku"
assert lm.model == "claude-code/haiku"

def test_forward_parses_json_result_event(self):
"""ClaudeCodeLM extracts text from the final 'result' event."""
fake_output = json.dumps([
{"type": "system", "subtype": "init"},
{"type": "assistant", "message": {"content": [{"type": "text", "text": "hello"}]}},
{"type": "result", "subtype": "success", "result": "hello world"},
])
mock_result = MagicMock(returncode=0, stdout=fake_output, stderr="")
with patch("subprocess.run", return_value=mock_result):
lm = ClaudeCodeLM(model="haiku")
response = lm.forward(prompt="test prompt")
assert response == ["hello world"]

def test_forward_handles_plain_text_fallback(self):
"""Falls back to raw stdout if JSON parsing fails."""
mock_result = MagicMock(returncode=0, stdout="just plain text", stderr="")
with patch("subprocess.run", return_value=mock_result):
lm = ClaudeCodeLM(model="haiku")
response = lm.forward(prompt="test")
assert response == ["just plain text"]

def test_forward_raises_on_timeout(self):
import subprocess
with patch("subprocess.run", side_effect=subprocess.TimeoutExpired(cmd="claude", timeout=120)):
lm = ClaudeCodeLM()
with pytest.raises(PlumbInferenceError, match="timed out"):
lm.forward(prompt="test")

def test_forward_raises_on_missing_cli(self):
with patch("subprocess.run", side_effect=FileNotFoundError):
lm = ClaudeCodeLM()
with pytest.raises(PlumbAuthError, match="not found"):
lm.forward(prompt="test")

def test_forward_raises_on_nonzero_exit(self):
mock_result = MagicMock(returncode=1, stdout="", stderr="something failed")
with patch("subprocess.run", return_value=mock_result):
lm = ClaudeCodeLM()
with pytest.raises(PlumbInferenceError, match="exited 1"):
lm.forward(prompt="test")

def test_forward_builds_prompt_from_messages(self):
"""Messages list is concatenated into prompt text."""
fake_output = json.dumps([{"type": "result", "result": "ok"}])
mock_result = MagicMock(returncode=0, stdout=fake_output, stderr="")
with patch("subprocess.run", return_value=mock_result) as mock_run:
lm = ClaudeCodeLM()
lm.forward(messages=[
{"role": "system", "content": "You are helpful."},
{"role": "user", "content": "Say hi."},
])
call_kwargs = mock_run.call_args
assert "You are helpful." in call_kwargs.kwargs["input"]
assert "Say hi." in call_kwargs.kwargs["input"]

def test_callable_delegates_to_forward(self):
fake_output = json.dumps([{"type": "result", "result": "called"}])
mock_result = MagicMock(returncode=0, stdout=fake_output, stderr="")
with patch("subprocess.run", return_value=mock_result):
lm = ClaudeCodeLM()
result = lm("hello")
assert result == ["called"]


class TestClaudeCodeAvailable:
def test_returns_true_when_claude_on_path(self):
with patch("shutil.which", return_value="/usr/local/bin/claude"):
assert _claude_code_available() is True

def test_returns_false_when_not_installed(self):
with patch("shutil.which", return_value=None):
assert _claude_code_available() is False


class TestGetLm:
def test_returns_api_lm_when_key_set(self):
with patch("dotenv.load_dotenv"), \
patch.dict("os.environ", {"ANTHROPIC_API_KEY": "sk-ant-test"}):
lm = get_lm()
assert "anthropic" in lm.model

def test_returns_cli_lm_when_no_key_but_cli_available(self):
with patch("dotenv.load_dotenv"), \
patch.dict("os.environ", {}, clear=True), \
patch("plumb.programs._claude_code_available", return_value=True):
import os
os.environ.pop("ANTHROPIC_API_KEY", None)
lm = get_lm()
assert isinstance(lm, ClaudeCodeLM)

def test_raises_when_nothing_available(self):
with patch("dotenv.load_dotenv"), \
patch.dict("os.environ", {}, clear=True), \
patch("plumb.programs._claude_code_available", return_value=False):
import os
os.environ.pop("ANTHROPIC_API_KEY", None)
with pytest.raises(PlumbAuthError, match="No LLM backend available"):
get_lm()

def test_api_key_takes_precedence_over_cli(self):
"""When both are available, API key wins (faster)."""
with patch("dotenv.load_dotenv"), \
patch.dict("os.environ", {"ANTHROPIC_API_KEY": "sk-ant-test"}), \
patch("plumb.programs._claude_code_available", return_value=True):
lm = get_lm()
assert not isinstance(lm, ClaudeCodeLM)


class TestRunWithRetries:
def test_success_first_try(self):
result = run_with_retries(lambda: 42)
Expand Down