diff --git a/skillopt_sleep/__main__.py b/skillopt_sleep/__main__.py index 608487a2..200c4d44 100644 --- a/skillopt_sleep/__main__.py +++ b/skillopt_sleep/__main__.py @@ -69,13 +69,15 @@ def _report_payload(rep, outcome) -> Dict[str, Any]: def _add_common(p: argparse.ArgumentParser) -> None: p.add_argument("--project", default="") p.add_argument("--scope", default="", choices=["", "all", "invoked"]) - p.add_argument("--backend", default="", choices=["", "mock", "claude", "codex", "copilot"]) + p.add_argument("--backend", default="", choices=["", "mock", "claude", "codex", "copilot", "pi"]) p.add_argument("--model", default="") p.add_argument("--codex-path", default="", help="path to the real @openai/codex binary") + p.add_argument("--pi-path", default="", help="path to the pi binary (default: pi on PATH)") p.add_argument("--claude-home", default="", help="override ~/.claude (also isolates state)") p.add_argument("--codex-home", default="", help="override ~/.codex for archived session harvest") - p.add_argument("--source", default="", choices=["", "claude", "codex", "auto"], + p.add_argument("--source", default="", choices=["", "claude", "codex", "pi", "auto"], help="session transcript source") + p.add_argument("--pi-home", default="", help="override ~/.pi for pi-coding-agent session harvest") p.add_argument("--lookback-hours", type=int, default=None, help="harvest window in hours; 0 = scan full history") p.add_argument("--edit-budget", type=int, default=0) @@ -106,10 +108,14 @@ def _cfg_from_args(args, task_meta: Dict[str, Any] | None = None) -> Any: overrides["model"] = args.model if getattr(args, "codex_path", ""): overrides["codex_path"] = os.path.abspath(args.codex_path) + if getattr(args, "pi_path", ""): + overrides["pi_path"] = args.pi_path if getattr(args, "claude_home", ""): overrides["claude_home"] = os.path.abspath(args.claude_home) if getattr(args, "codex_home", ""): overrides["codex_home"] = os.path.abspath(args.codex_home) + if getattr(args, "pi_home", 
""): + overrides["pi_home"] = os.path.abspath(args.pi_home) if getattr(args, "source", ""): overrides["transcript_source"] = args.source lh = getattr(args, "lookback_hours", None) diff --git a/skillopt_sleep/backend.py b/skillopt_sleep/backend.py index f472da75..ab5a718d 100644 --- a/skillopt_sleep/backend.py +++ b/skillopt_sleep/backend.py @@ -541,6 +541,80 @@ def tokens_used(self) -> int: return self._tokens +# ── pi CLI backend ─────────────────────────────────────── + + +class PiCliBackend(CliBackend): + """Drives the authenticated `pi` CLI: pi -p "<prompt>". + + pi (the pi coding agent) speaks the open Agent Skills standard and supports + a `-p` / `--print` headless mode, so it slots in alongside the claude/codex + CLI backends. Using pi here means the replay model is whatever the user has + configured in pi (e.g. `zai/glm-5.2`), keeping source and backend on the same + agent — which is the design intent of `--source pi`. + """ + + name = "pi" + + def __init__(self, model: str = "", pi_path: str = "pi", timeout: int = 180) -> None: + super().__init__(model=model or os.environ.get("SKILLOPT_SLEEP_PI_MODEL", ""), + timeout=timeout) + self.pi_path = pi_path + + _CLI_ERROR_MARKERS = ( + "Not logged in", + "Authentication required", + "Invalid API key", + "Unauthorized", + "provider not found", + "no provider", + ) + + def _detect_cli_error(self, stdout: str, stderr: str) -> None: + import logging + check_stdout = stdout if len(stdout) < 300 else "" + combined = check_stdout + "\n" + stderr + for marker in self._CLI_ERROR_MARKERS: + if marker.lower() in combined.lower(): + logging.getLogger("skillopt_sleep").warning( + "pi CLI returned a likely auth/config error: %s", + combined[:200].replace("\n", " "), + ) + self.last_call_error = combined[:500] + return + + def _call(self, prompt: str, *, max_tokens: int = 1024) -> str: + # Run ISOLATED so the ambient pi environment does not leak into the + # optimizer/target call: disable tools, skills, and context files, and
+ # run from a clean temp cwd so no project AGENTS.md is picked up. + # --no-tools no tool use during replay + # --no-skills do not load the user's installed skills + # --no-context-files do not load AGENTS.md/CLAUDE.md + # --no-session ephemeral; do not write to session history + # --no-extensions skip extension discovery + import shutil + cmd = [self.pi_path, "-p", "--no-session"] + cmd += ["--no-tools", "--no-skills", "--no-context-files", "--no-extensions"] + if self.model: + cmd += ["--model", self.model] + cmd += [prompt] + clean_cwd = tempfile.mkdtemp(prefix="skillopt_sleep_pi_") + try: + proc = subprocess.run( + cmd, capture_output=True, text=True, timeout=self.timeout, cwd=clean_cwd, + ) + except Exception: + return "" + finally: + try: + shutil.rmtree(clean_cwd, ignore_errors=True) + except Exception: + pass + out = (proc.stdout or "").strip() + self._detect_cli_error(out, proc.stderr or "") + return out + + # ── Claude Code CLI backend ─────────────────────────────────────────────────── class ClaudeCliBackend(CliBackend): @@ -1310,10 +1384,13 @@ def get_backend( model: str = "", claude_path: str = "claude", codex_path: str = "", + pi_path: str = "", azure_endpoint: str = "", project_dir: str = "", ) -> Backend: n = (name or "mock").strip().lower() + if n in {"pi", "pi_cli", "pi_coding_agent", "pi-coding-agent"}: + return PiCliBackend(model=model, pi_path=pi_path or "pi") if n in {"claude", "anthropic", "claude_cli", "claude_code"}: return ClaudeCliBackend(model=model, claude_path=claude_path) if n in {"codex", "codex_cli", "openai_codex"}: diff --git a/skillopt_sleep/config.py b/skillopt_sleep/config.py index 06303e09..ec555916 100644 --- a/skillopt_sleep/config.py +++ b/skillopt_sleep/config.py @@ -19,13 +19,15 @@ HOME_STATE_DIR = os.path.expanduser("~/.skillopt-sleep") CLAUDE_HOME = os.path.expanduser("~/.claude") CODEX_HOME = os.path.expanduser("~/.codex") +PI_HOME = os.path.expanduser("~/.pi") DEFAULTS: Dict[str, Any] = { # ── scope 
────────────────────────────────────────────────────────────── "claude_home": CLAUDE_HOME, "codex_home": CODEX_HOME, - "transcript_source": "claude", # "claude" | "codex" | "auto" + "pi_home": PI_HOME, + "transcript_source": "claude", # "claude" | "codex" | "pi" | "auto" "projects": "invoked", # "invoked" | "all" | [list of abs paths] "invoked_project": "", # filled at runtime (cwd) when projects == "invoked" "lookback_hours": 72, # harvest window when no prior sleep recorded @@ -40,6 +42,7 @@ "model": "", # backend-specific; "" => backend default "gate_mode": "on", # "on" (validation-gated) | "off" (greedy, no hard filter) "codex_path": "", # "" => auto-detect the real @openai/codex binary + "pi_path": "", # "" => use `pi` on PATH "edit_budget": 4, # textual learning rate (max edits/night) "gate_metric": "mixed", # hard | soft | mixed (mixed best for tiny holdouts) "gate_mixed_weight": 0.5, @@ -107,6 +110,10 @@ def transcripts_dir(self) -> str: def codex_archived_sessions_dir(self) -> str: return os.path.join(self.data["codex_home"], "archived_sessions") + @property + def pi_sessions_dir(self) -> str: + return os.path.join(self.data["pi_home"], "agent", "sessions") + @property def history_path(self) -> str: return os.path.join(self.data["claude_home"], "history.jsonl") diff --git a/skillopt_sleep/cycle.py b/skillopt_sleep/cycle.py index 57b06a93..b27c449a 100644 --- a/skillopt_sleep/cycle.py +++ b/skillopt_sleep/cycle.py @@ -114,6 +114,7 @@ def run_sleep_cycle( cfg.get("backend", "mock"), model=cfg.get("model", ""), codex_path=cfg.get("codex_path", ""), + pi_path=cfg.get("pi_path", ""), project_dir=project, ) _progress(cfg, f"night {night}: project={project} backend={backend.name}") diff --git a/skillopt_sleep/harvest_pi.py b/skillopt_sleep/harvest_pi.py new file mode 100644 index 00000000..cbe3d6a5 --- /dev/null +++ b/skillopt_sleep/harvest_pi.py @@ -0,0 +1,221 @@ +"""SkillOpt-Sleep — pi (pi-coding-agent) session harvesting. 
+ +Reads pi session transcript JSONL files (one per session, stored under +``~/.pi/agent/sessions/<project-slug>/<session-id>.jsonl``) and normalizes them +into :class:`SessionDigest` records without copying tool arguments, private +reasoning blocks (``thinking``), or raw tool outputs. + +pi schema (verified against real transcripts): + * A session file is a JSONL stream of entries with a ``type`` discriminator. + * ``type == "session"`` — exactly one per file; carries ``cwd`` + ``timestamp``. + * ``type == "message"`` — a conversational turn. ``message.role`` ∈ + {user, assistant, toolResult}; ``message.content`` is either a string or a + list of content blocks. Block types include ``text`` (kept), ``thinking`` + (private reasoning, skipped), and ``toolCall`` (carries ``name``). + * toolResult messages carry ``isError`` (bool) and ``toolName``. Only + ``toolName`` is harvested (as a tool-usage signal); ``isError`` is + deliberately ignored, because a recovered tool error is not a task failure. + * Other types (``model_change``, ``thinking_level_change``, ``custom``, ...) are + metadata / tool-result payloads and are skipped for digestion. + +This module performs NO writes and NO network calls. +""" +from __future__ import annotations + +import os +import re +from typing import Any, Iterable, List, Optional + +from skillopt_sleep.harvest import ( + _detect_feedback, + _is_headless_replay, + _is_meta_prompt, + _iter_jsonl, + _project_matches, + _text_from_content, +) +from skillopt_sleep.types import SessionDigest + +# Mirror of skillopt_sleep.harvest_codex._SECRET_PATTERNS. Kept duplicated (not +# imported) so each harvester stays self-contained; if another source needs them, +# consider promoting these into a shared ``redact`` module. +_SECRET_PATTERNS: tuple[tuple[re.Pattern[str], str], ...] 
= ( + (re.compile(r"sk-[A-Za-z0-9_-]{10,}"), "[REDACTED_OPENAI_KEY]"), + (re.compile(r"(?i)(Authorization:\s*Bearer\s+)[^\s\"']+"), r"\1[REDACTED]"), + (re.compile(r"(?i)(Authorization:\s*Basic\s+)[^\s\"']+"), r"\1[REDACTED]"), + ( + re.compile(r"(?i)\b(api[_-]?key|token|password|secret)\b(\s*[:=]\s*)[^\s\"']+"), + r"\1\2[REDACTED]", + ), + ( + re.compile(r"(?i)\b(api[_-]?key|token|password|secret)\b(\s+)[^\s\"']+"), + r"\1\2[REDACTED]", + ), + ( + re.compile( + r"-----BEGIN [A-Z ]*PRIVATE KEY-----.*?-----END [A-Z ]*PRIVATE KEY-----", + re.DOTALL, + ), + "[REDACTED_PRIVATE_KEY]", + ), +) + + +def _redact_secrets(text: str) -> str: + for pattern, replacement in _SECRET_PATTERNS: + text = pattern.sub(replacement, text) + return text + + +def _pi_tool_names_from_content(content: Any) -> List[str]: + """Extract tool names from pi content blocks. + + pi uses ``{"type": "toolCall", "name": ...}`` (cf. Claude's ``tool_use``). + """ + names: List[str] = [] + if isinstance(content, list): + for b in content: + if isinstance(b, dict) and b.get("type") == "toolCall" and b.get("name"): + names.append(str(b["name"])) + return names + + +def _sanitize_tool_name(name: str) -> str: + return re.sub(r"[^A-Za-z0-9_.:-]+", "_", str(name))[:80] + + +def _dedup(xs: Iterable[str]) -> List[str]: + seen: set = set() + out: List[str] = [] + for x in xs: + if x not in seen: + seen.add(x) + out.append(x) + return out + + +def digest_pi_session(path: str, project: str = "") -> Optional[SessionDigest]: + """Build a :class:`SessionDigest` from one pi session transcript.""" + session_id = os.path.splitext(os.path.basename(path))[0] + started = "" + ended = "" + session_project = "" + user_prompts: List[str] = [] + assistant_finals: List[str] = [] + tools: List[str] = [] + feedback: List[str] = [] + n_user = 0 + n_asst = 0 + + for rec in _iter_jsonl(path): + rtype = rec.get("type") + ts = rec.get("timestamp") + if isinstance(ts, str) and ts: + if not started: + started = ts + ended = ts + # cwd 
lives on the `session` entry, not on individual messages. + if rtype == "session": + cwd = rec.get("cwd") + if isinstance(cwd, str) and cwd and not session_project: + session_project = cwd + continue + if rtype != "message": + continue + + msg = rec.get("message") + if not isinstance(msg, dict): + continue + role = msg.get("role") + content = msg.get("content") + + if role == "user": + text = _text_from_content(content) + text = _redact_secrets(text).strip() + if text and not _is_meta_prompt(text): + n_user += 1 + user_prompts.append(text) + feedback.extend(_detect_feedback(text)) + elif role == "assistant": + n_asst += 1 + tools.extend(_pi_tool_names_from_content(content)) + text = _text_from_content(content) + if text.strip(): + assistant_finals.append(_redact_secrets(text).strip()) + elif role == "toolResult": + # Corroborating tool-name source: pi records the resolved tool name + # on the result, which catches calls even when the toolCall block's + # `name` was absent. (toolName extraction only; see note below on isError.) + tool_name = msg.get("toolName") + if isinstance(tool_name, str) and tool_name: + tools.append(_sanitize_tool_name(tool_name)) + # NOTE: pi also carries `isError` (bool) here — whether that one tool + # invocation failed mechanically. We deliberately do NOT surface it + # as a feedback signal: intermediate tool errors are normal in + # agentic coding and are frequently followed by recovery and a + # successful final result. Treating every recovered error as + # `neg:` feedback would mislabel successful sessions as failures and + # poison the miner's task-outcome labels. Task outcome should be + # inferred from the user's judgment of the *final* result (the + # lexical feedback phrases above), not from transient tool mechanics. 
+ + if project and not _project_matches(session_project or "", "invoked", project): + return None + if n_user == 0 and n_asst == 0: + return None + + digest = SessionDigest( + session_id=session_id, + project=session_project, + started_at=started, + ended_at=ended, + user_prompts=user_prompts, + assistant_finals=assistant_finals[-5:], + tools_used=_dedup(tools), + files_touched=[], # not extractable from pi transcripts without heuristics + feedback_signals=_dedup(feedback), + n_user_turns=n_user, + n_assistant_turns=n_asst, + raw_path=path, + ) + if _is_headless_replay(digest): + return None + return digest + + +def harvest_pi( + sessions_dir: str, + *, + scope: Any = "all", + invoked_project: str = "", + since_iso: Optional[str] = None, + limit: int = 0, +) -> List[SessionDigest]: + """Walk ``~/.pi/agent/sessions`` (one subdir per project slug) and return digests. + + Parameters mirror :func:`skillopt_sleep.harvest.harvest`. + """ + digests: List[SessionDigest] = [] + if not os.path.isdir(sessions_dir): + return digests + + paths: List[str] = [] + for root, _dirs, files in os.walk(sessions_dir): + for fn in files: + if fn.endswith(".jsonl"): + paths.append(os.path.join(root, fn)) + paths.sort(key=lambda p: os.path.getmtime(p), reverse=True) + + project_hint = invoked_project if scope == "invoked" else "" + for path in paths: + digest = digest_pi_session(path, project=project_hint) + if digest is None: + continue + if not _project_matches(digest.project or "", scope, invoked_project): + continue + if since_iso and digest.ended_at and digest.ended_at < since_iso: + continue + digests.append(digest) + if limit and len(digests) >= limit: + break + return digests diff --git a/skillopt_sleep/harvest_sources.py b/skillopt_sleep/harvest_sources.py index 501aa285..ae1695de 100644 --- a/skillopt_sleep/harvest_sources.py +++ b/skillopt_sleep/harvest_sources.py @@ -5,6 +5,7 @@ from skillopt_sleep.harvest import harvest from skillopt_sleep.harvest_codex import harvest_codex 
+from skillopt_sleep.harvest_pi import harvest_pi from skillopt_sleep.types import SessionDigest @@ -21,6 +22,14 @@ def harvest_for_config(cfg, *, since_iso: Optional[str] = None, limit: int = 0) since_iso=since_iso, limit=limit, ) + if source == "pi": + return harvest_pi( + cfg.pi_sessions_dir, + scope=scope, + invoked_project=invoked_project, + since_iso=since_iso, + limit=limit, + ) if source == "auto": codex_digests = harvest_codex( cfg.codex_archived_sessions_dir, @@ -31,6 +40,15 @@ def harvest_for_config(cfg, *, since_iso: Optional[str] = None, limit: int = 0) ) if codex_digests: return codex_digests + pi_digests = harvest_pi( + cfg.pi_sessions_dir, + scope=scope, + invoked_project=invoked_project, + since_iso=since_iso, + limit=limit, + ) + if pi_digests: + return pi_digests return harvest( cfg.transcripts_dir, diff --git a/tests/test_backend_pi.py b/tests/test_backend_pi.py new file mode 100644 index 00000000..4555fe0f --- /dev/null +++ b/tests/test_backend_pi.py @@ -0,0 +1,64 @@ +"""Tests for the pi CLI backend (`--backend pi`).""" +from __future__ import annotations + +from unittest import mock + +from skillopt_sleep.backend import PiCliBackend, get_backend + + +class _FakeProc: + def __init__(self, stdout: str, stderr: str = ""): + self.stdout = stdout + self.stderr = stderr + + +def test_get_backend_pi_aliases(): + for alias in ("pi", "pi_cli", "pi_coding_agent", "pi-coding-agent", "PI"): + be = get_backend(alias, model="zai/glm-5.2") + assert isinstance(be, PiCliBackend), alias + assert be.name == "pi" + + +def test_default_model_from_env(monkeypatch): + monkeypatch.setenv("SKILLOPT_SLEEP_PI_MODEL", "zai/glm-5.2") + be = PiCliBackend() + assert be.model == "zai/glm-5.2" + assert be.pi_path == "pi" + + +def test_call_builds_isolated_command_and_returns_stdout(): + be = PiCliBackend(model="zai/glm-5.2", pi_path="/usr/local/bin/pi") + captured = {} + + def fake_run(cmd, **kwargs): + captured["cmd"] = cmd + captured["cwd"] = kwargs.get("cwd") + return 
_FakeProc("answer text") + + with mock.patch("skillopt_sleep.backend.subprocess.run", side_effect=fake_run): + out = be._call("do the thing") + + assert out == "answer text" + cmd = captured["cmd"] + assert cmd[0:2] == ["/usr/local/bin/pi", "-p"] + # isolation flags must be present (no ambient skills/context/tools) + assert "--no-tools" in cmd + assert "--no-skills" in cmd + assert "--no-context-files" in cmd + assert "--no-extensions" in cmd + assert "--no-session" in cmd + assert "--model" in cmd and "zai/glm-5.2" in cmd + assert cmd[-1] == "do the thing" + # ran from a clean temp cwd, not inherited + assert captured["cwd"] is not None and captured["cwd"] != "" + + +def test_call_detects_auth_error_and_logs(): + be = PiCliBackend() + with mock.patch( + "skillopt_sleep.backend.subprocess.run", + return_value=_FakeProc("", stderr="Authentication required: not logged in"), + ): + out = be._call("hi") + assert out == "" # empty stdout + assert "Authentication required" in be.last_call_error diff --git a/tests/test_harvest_pi.py b/tests/test_harvest_pi.py new file mode 100644 index 00000000..a0ec477c --- /dev/null +++ b/tests/test_harvest_pi.py @@ -0,0 +1,80 @@ +"""Tests for the pi (pi-coding-agent) transcript harvester.""" +from __future__ import annotations + +import json + +from skillopt_sleep.harvest_pi import _redact_secrets, digest_pi_session, harvest_pi + + +def _write_session(tmp_path, slug, name, entries): + d = tmp_path / slug + d.mkdir(parents=True) + p = d / f"{name}.jsonl" + with open(p, "w") as f: + for rec in entries: + f.write(json.dumps(rec) + "\n") + return str(p) + + +PI_SESSION = [ + {"type": "session", "version": 1, "id": "s1", "timestamp": "2026-06-23T11:52:04.333Z", "cwd": "/home/u/proj"}, + {"type": "model_change", "id": "m", "timestamp": "2026-06-23T11:52:05.000Z", "modelId": "gpt-x"}, + {"type": "message", "id": "a1", "parentId": "s1", "timestamp": "2026-06-23T11:52:06.000Z", + "message": {"role": "user", "content": [{"type": "text", "text": 
"fix the failing tests"}]}}, + {"type": "message", "id": "a2", "parentId": "a1", "timestamp": "2026-06-23T11:52:07.000Z", + "message": {"role": "assistant", "content": [ + {"type": "thinking", "thinking": "private reasoning"}, + {"type": "text", "text": "Running the suite now."}, + {"type": "toolCall", "id": "call_1", "name": "bash", "arguments": {"command": "pytest"}}, + ]}}, + {"type": "message", "id": "a3", "parentId": "a2", "timestamp": "2026-06-23T11:52:08.000Z", + "message": {"role": "toolResult", "toolCallId": "call_1", "toolName": "bash", + "content": [{"type": "text", "text": "1 failed"}], "isError": True}}, + {"type": "message", "id": "a4", "parentId": "a3", "timestamp": "2026-06-23T11:52:30.000Z", + "message": {"role": "user", "content": "thanks, that works now"}}, + {"type": "message", "id": "a5", "parentId": "a4", "timestamp": "2026-06-23T11:52:31.000Z", + "message": {"role": "assistant", "content": [{"type": "text", "text": "Glad it's fixed."}]}}, +] + + +def test_digest_extracts_fields(): + d = digest_pi_session("/tmp/abc-123.jsonl") # missing file -> None + assert d is None + + +def test_digest_full_session(tmp_path): + p = _write_session(tmp_path, "--home-u-proj", "abc-123", PI_SESSION) + d = digest_pi_session(p) + assert d is not None + assert d.project == "/home/u/proj" + assert d.n_user_turns == 2 + assert d.n_assistant_turns == 2 + assert "bash" in d.tools_used # from toolCall block + assert any("works" in f for f in d.feedback_signals) # pos feedback + assert all( + not f.startswith("neg:tool_error") for f in d.feedback_signals + ) # isError deliberately NOT surfaced (recovered errors ≠ task failure) + assert all("private reasoning" not in f for f in d.feedback_signals) + # thinking blocks must not leak into finals + assert "private reasoning" not in " ".join(d.assistant_finals) + assert d.started_at.startswith("2026-06-23T11:52:04") + assert d.ended_at.startswith("2026-06-23T11:52:31") + + +def test_harvest_scope_filter(tmp_path): + 
_write_session(tmp_path, "--home-u-proj", "abc-123", PI_SESSION) + other = list(PI_SESSION) + other[0] = dict(PI_SESSION[0], cwd="/other/place") + _write_session(tmp_path, "--other", "xyz", other) + + all_scope = harvest_pi(str(tmp_path), scope="all") + assert len(all_scope) == 2 + invoked = harvest_pi(str(tmp_path), scope="invoked", invoked_project="/home/u/proj") + assert len(invoked) == 1 + assert invoked[0].project == "/home/u/proj" + + +def test_secret_redaction(): + out = _redact_secrets("Authorization: Bearer sk-1234567890abcdefghij") + assert "sk-1234567890abcdefghij" not in out + assert "[REDACTED]" in out