Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions skillopt_sleep/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,15 @@ def _report_payload(rep, outcome) -> Dict[str, Any]:
def _add_common(p: argparse.ArgumentParser) -> None:
p.add_argument("--project", default="")
p.add_argument("--scope", default="", choices=["", "all", "invoked"])
p.add_argument("--backend", default="", choices=["", "mock", "claude", "codex", "copilot"])
p.add_argument("--backend", default="", choices=["", "mock", "claude", "codex", "copilot", "pi"])
p.add_argument("--model", default="")
p.add_argument("--codex-path", default="", help="path to the real @openai/codex binary")
p.add_argument("--pi-path", default="", help="path to the pi binary (default: pi on PATH)")
p.add_argument("--claude-home", default="", help="override ~/.claude (also isolates state)")
p.add_argument("--codex-home", default="", help="override ~/.codex for archived session harvest")
p.add_argument("--source", default="", choices=["", "claude", "codex", "auto"],
p.add_argument("--source", default="", choices=["", "claude", "codex", "pi", "auto"],
help="session transcript source")
p.add_argument("--pi-home", default="", help="override ~/.pi for pi-coding-agent session harvest")
p.add_argument("--lookback-hours", type=int, default=None,
help="harvest window in hours; 0 = scan full history")
p.add_argument("--edit-budget", type=int, default=0)
Expand Down Expand Up @@ -106,10 +108,14 @@ def _cfg_from_args(args, task_meta: Dict[str, Any] | None = None) -> Any:
overrides["model"] = args.model
if getattr(args, "codex_path", ""):
overrides["codex_path"] = os.path.abspath(args.codex_path)
if getattr(args, "pi_path", ""):
overrides["pi_path"] = args.pi_path
if getattr(args, "claude_home", ""):
overrides["claude_home"] = os.path.abspath(args.claude_home)
if getattr(args, "codex_home", ""):
overrides["codex_home"] = os.path.abspath(args.codex_home)
if getattr(args, "pi_home", ""):
overrides["pi_home"] = os.path.abspath(args.pi_home)
if getattr(args, "source", ""):
overrides["transcript_source"] = args.source
lh = getattr(args, "lookback_hours", None)
Expand Down
77 changes: 77 additions & 0 deletions skillopt_sleep/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,80 @@ def tokens_used(self) -> int:
return self._tokens


# ── pi CLI backend ────────────────────────────────────────────────


class PiCliBackend(CliBackend):
    """Drives the authenticated `pi` CLI: pi -p "<prompt>".

    pi (the pi coding agent) speaks the open Agent Skills standard and supports
    a `-p` / `--print` headless mode, so it slots in alongside the claude/codex
    CLI backends. Using pi here means the replay model is whatever the user has
    configured in pi (e.g. `zai/glm-5.2`), keeping source and backend on the same
    agent — which is the design intent of `--source pi`.

    Args:
        model: Model identifier passed to pi via ``--model``. When empty, the
            ``SKILLOPT_SLEEP_PI_MODEL`` environment variable is used; if that is
            empty too, no ``--model`` flag is passed.
        pi_path: Executable to invoke (defaults to ``pi`` resolved on PATH).
        timeout: Per-call subprocess timeout in seconds.
    """

    name = "pi"

    # Substrings (matched case-insensitively) treated as a likely
    # authentication or provider-configuration failure in the CLI output.
    _CLI_ERROR_MARKERS = (
        "Not logged in",
        "Authentication required",
        "Invalid API key",
        "Unauthorized",
        "provider not found",
        "no provider",
    )

    def __init__(self, model: str = "", pi_path: str = "pi", timeout: int = 180) -> None:
        super().__init__(
            model=model or os.environ.get("SKILLOPT_SLEEP_PI_MODEL", ""),
            timeout=timeout,
        )
        self.pi_path = pi_path

    def _detect_cli_error(self, stdout: str, stderr: str) -> None:
        """Flag output that looks like an auth/config error instead of a reply.

        stdout is only inspected when it is shorter than 300 characters;
        stderr is always inspected. On a match a warning is logged and the
        first 500 characters of the inspected text are stored in
        ``self.last_call_error``.
        """
        import logging

        check_stdout = stdout if len(stdout) < 300 else ""
        combined = check_stdout + "\n" + stderr
        haystack = combined.lower()  # lowercase once, not once per marker
        if any(marker.lower() in haystack for marker in self._CLI_ERROR_MARKERS):
            logging.getLogger("skillopt_sleep").warning(
                "pi CLI returned a likely auth/config error: %s",
                combined[:200].replace("\n", " "),
            )
            self.last_call_error = combined[:500]

    def _call(self, prompt: str, *, max_tokens: int = 1024) -> str:
        """Run one headless pi invocation and return its stripped stdout.

        ``max_tokens`` is accepted for interface compatibility but is not
        forwarded to the CLI. Any failure to run the subprocess (missing
        binary, timeout, ...) yields ``""``; the failure is logged and stored
        in ``self.last_call_error`` so it is not mistaken for an empty reply.
        """
        # Run ISOLATED so the ambient pi environment does not leak into the
        # optimizer/target call: disable tools, skills, and context files, and
        # run from a clean temp cwd so no project AGENTS.md is picked up.
        #   --no-tools          no tool use during replay
        #   --no-skills         do not load the user's installed skills
        #   --no-context-files  do not load AGENTS.md/CLAUDE.md
        #   --no-session        ephemeral; do not write to session history
        #   --no-extensions     skip extension discovery
        import logging
        import shutil

        cmd = [self.pi_path, "-p", "--no-session"]
        cmd += ["--no-tools", "--no-skills", "--no-context-files", "--no-extensions"]
        if self.model:
            cmd += ["--model", self.model]
        cmd += [prompt]
        clean_cwd = tempfile.mkdtemp(prefix="skillopt_sleep_pi_")
        try:
            proc = subprocess.run(
                cmd, capture_output=True, text=True, timeout=self.timeout, cwd=clean_cwd,
            )
        except subprocess.TimeoutExpired:
            # str(TimeoutExpired) embeds the full command line (and thus the
            # prompt), so build the message by hand instead of using it.
            failure = f"pi CLI timed out after {self.timeout}s"
        except Exception as exc:
            # Still best-effort (the caller gets ""), but no longer silent.
            failure = f"pi CLI invocation failed: {type(exc).__name__}: {exc}"
        else:
            failure = ""
        finally:
            try:
                shutil.rmtree(clean_cwd, ignore_errors=True)
            except Exception:
                # Cleanup of the scratch directory must never mask the result.
                pass
        if failure:
            logging.getLogger("skillopt_sleep").warning("%s", failure[:200])
            self.last_call_error = failure[:500]
            return ""
        out = (proc.stdout or "").strip()
        self._detect_cli_error(out, proc.stderr or "")
        return out


# ── Claude Code CLI backend ───────────────────────────────────────────────────

class ClaudeCliBackend(CliBackend):
Expand Down Expand Up @@ -1310,10 +1384,13 @@ def get_backend(
model: str = "",
claude_path: str = "claude",
codex_path: str = "",
pi_path: str = "",
azure_endpoint: str = "",
project_dir: str = "",
) -> Backend:
n = (name or "mock").strip().lower()
if n in {"pi", "pi_cli", "pi_coding_agent", "pi-coding-agent"}:
return PiCliBackend(model=model, pi_path=pi_path or "pi")
if n in {"claude", "anthropic", "claude_cli", "claude_code"}:
return ClaudeCliBackend(model=model, claude_path=claude_path)
if n in {"codex", "codex_cli", "openai_codex"}:
Expand Down
9 changes: 8 additions & 1 deletion skillopt_sleep/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
HOME_STATE_DIR = os.path.expanduser("~/.skillopt-sleep")
CLAUDE_HOME = os.path.expanduser("~/.claude")
CODEX_HOME = os.path.expanduser("~/.codex")
PI_HOME = os.path.expanduser("~/.pi")


DEFAULTS: Dict[str, Any] = {
# ── scope ──────────────────────────────────────────────────────────────
"claude_home": CLAUDE_HOME,
"codex_home": CODEX_HOME,
"transcript_source": "claude", # "claude" | "codex" | "auto"
"pi_home": PI_HOME,
"transcript_source": "claude", # "claude" | "codex" | "pi" | "auto"
"projects": "invoked", # "invoked" | "all" | [list of abs paths]
"invoked_project": "", # filled at runtime (cwd) when projects == "invoked"
"lookback_hours": 72, # harvest window when no prior sleep recorded
Expand All @@ -40,6 +42,7 @@
"model": "", # backend-specific; "" => backend default
"gate_mode": "on", # "on" (validation-gated) | "off" (greedy, no hard filter)
"codex_path": "", # "" => auto-detect the real @openai/codex binary
"pi_path": "", # "" => use `pi` on PATH
"edit_budget": 4, # textual learning rate (max edits/night)
"gate_metric": "mixed", # hard | soft | mixed (mixed best for tiny holdouts)
"gate_mixed_weight": 0.5,
Expand Down Expand Up @@ -107,6 +110,10 @@ def transcripts_dir(self) -> str:
def codex_archived_sessions_dir(self) -> str:
return os.path.join(self.data["codex_home"], "archived_sessions")

@property
def pi_sessions_dir(self) -> str:
    """Path of the ``agent/sessions`` directory inside the configured ``pi_home``."""
    pi_home = self.data["pi_home"]
    return os.path.join(pi_home, "agent", "sessions")

@property
def history_path(self) -> str:
    """Path of ``history.jsonl`` inside the configured ``claude_home``."""
    claude_home = self.data["claude_home"]
    return os.path.join(claude_home, "history.jsonl")
Expand Down
1 change: 1 addition & 0 deletions skillopt_sleep/cycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def run_sleep_cycle(
cfg.get("backend", "mock"),
model=cfg.get("model", ""),
codex_path=cfg.get("codex_path", ""),
pi_path=cfg.get("pi_path", ""),
project_dir=project,
)
_progress(cfg, f"night {night}: project={project} backend={backend.name}")
Expand Down
221 changes: 221 additions & 0 deletions skillopt_sleep/harvest_pi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
"""SkillOpt-Sleep — pi (pi-coding-agent) session harvesting.

Reads pi session transcript JSONL files (one per session, stored under
``~/.pi/agent/sessions/<project-slug>/<sessionId>.jsonl``) and normalizes them
into :class:`SessionDigest` records without copying tool arguments, private
reasoning blocks (``thinking``), or raw tool outputs.

pi schema (verified against real transcripts):
* A session file is a JSONL stream of entries with a ``type`` discriminator.
* ``type == "session"`` — exactly one per file; carries ``cwd`` + ``timestamp``.
* ``type == "message"`` — a conversational turn. ``message.role`` ∈
{user, assistant, toolResult}; ``message.content`` is either a string or a
list of content blocks. Block types include ``text`` (kept), ``thinking``
(private reasoning, skipped), and ``toolCall`` (carries ``name``).
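* toolResult messages carry ``isError`` (bool) and ``toolName``. Only
``toolName`` is harvested, as a corroborating source of tool names.
``isError`` is deliberately NOT surfaced as a feedback signal: transient tool
failures are routinely followed by recovery and do not reflect task outcome.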
* Other types (``model_change``, ``thinking_level_change``, ``custom``, ...) are
metadata / tool-result payloads and are skipped for digestion.

This module performs NO writes and NO network calls.
"""
from __future__ import annotations

import os
import re
from typing import Any, Iterable, List, Optional

from skillopt_sleep.harvest import (
_detect_feedback,
_is_headless_replay,
_is_meta_prompt,
_iter_jsonl,
_project_matches,
_text_from_content,
)
from skillopt_sleep.types import SessionDigest

# Mirror of skillopt_sleep.harvest_codex._SECRET_PATTERNS. Kept duplicated (not
# imported) so each harvester stays self-contained; if a third source appears,
# consider promoting these into a shared ``redact`` module.
_SECRET_PATTERNS: tuple[tuple[re.Pattern[str], str], ...] = (
(re.compile(r"sk-[A-Za-z0-9_-]{10,}"), "[REDACTED_OPENAI_KEY]"),
(re.compile(r"(?i)(Authorization:\s*Bearer\s+)[^\s\"']+"), r"\1[REDACTED]"),
(re.compile(r"(?i)(Authorization:\s*Basic\s+)[^\s\"']+"), r"\1[REDACTED]"),
(
re.compile(r"(?i)\b(api[_-]?key|token|password|secret)\b(\s*[:=]\s*)[^\s\"']+"),
r"\1\2[REDACTED]",
),
(
re.compile(r"(?i)\b(api[_-]?key|token|password|secret)\b(\s+)[^\s\"']+"),
r"\1\2[REDACTED]",
),
(
re.compile(
r"-----BEGIN [A-Z ]*PRIVATE KEY-----.*?-----END [A-Z ]*PRIVATE KEY-----",
re.DOTALL,
),
"[REDACTED_PRIVATE_KEY]",
),
)


def _redact_secrets(text: str) -> str:
for pattern, replacement in _SECRET_PATTERNS:
text = pattern.sub(replacement, text)
return text


def _pi_tool_names_from_content(content: Any) -> List[str]:
"""Extract tool names from pi content blocks.

pi uses ``{"type": "toolCall", "name": ...}`` (cf. Claude's ``tool_use``).
"""
names: List[str] = []
if isinstance(content, list):
for b in content:
if isinstance(b, dict) and b.get("type") == "toolCall" and b.get("name"):
names.append(str(b["name"]))
return names


def _sanitize_tool_name(name: str) -> str:
return re.sub(r"[^A-Za-z0-9_.:-]+", "_", str(name))[:80]


def _dedup(xs: Iterable[str]) -> List[str]:
seen: set = set()
out: List[str] = []
for x in xs:
if x not in seen:
seen.add(x)
out.append(x)
return out


def digest_pi_session(path: str, project: str = "") -> Optional[SessionDigest]:
    """Build a :class:`SessionDigest` from one pi session transcript.

    Args:
        path: Path to a pi session ``.jsonl`` file. Its base name without the
            extension becomes the digest's ``session_id``.
        project: Optional project path. When non-empty, the session is kept
            only if ``_project_matches`` accepts the session's recorded ``cwd``
            under the ``"invoked"`` scope.

    Returns:
        The digest, or ``None`` when the session is filtered out by
        ``project``, has no counted user or assistant turns, or is classified
        as a headless replay by ``_is_headless_replay``.
    """
    session_id = os.path.splitext(os.path.basename(path))[0]
    started = ""
    ended = ""
    session_project = ""
    user_prompts: List[str] = []
    assistant_finals: List[str] = []
    tools: List[str] = []
    feedback: List[str] = []
    n_user = 0
    n_asst = 0

    for rec in _iter_jsonl(path):
        rtype = rec.get("type")
        ts = rec.get("timestamp")
        if isinstance(ts, str) and ts:
            # First timestamp seen marks the start; the last one seen, the end.
            if not started:
                started = ts
            ended = ts
        # cwd lives on the `session` entry, not on individual messages.
        if rtype == "session":
            cwd = rec.get("cwd")
            if isinstance(cwd, str) and cwd and not session_project:
                session_project = cwd
            continue
        if rtype != "message":
            continue

        msg = rec.get("message")
        if not isinstance(msg, dict):
            continue
        role = msg.get("role")
        content = msg.get("content")

        if role == "user":
            text = _text_from_content(content)
            text = _redact_secrets(text).strip()
            if text and not _is_meta_prompt(text):
                n_user += 1
                user_prompts.append(text)
                feedback.extend(_detect_feedback(text))
        elif role == "assistant":
            n_asst += 1
            # Sanitize here as well as on the toolResult path below, so both
            # sources yield the same spelling and `_dedup` can merge them.
            tools.extend(
                _sanitize_tool_name(tool)
                for tool in _pi_tool_names_from_content(content)
            )
            text = _text_from_content(content)
            if text.strip():
                assistant_finals.append(_redact_secrets(text).strip())
        elif role == "toolResult":
            # Corroborating tool-name source: pi records the resolved tool name
            # on the result, which catches calls even when the toolCall block's
            # `name` was absent. (toolName extraction only; see note below on isError.)
            tool_name = msg.get("toolName")
            if isinstance(tool_name, str) and tool_name:
                tools.append(_sanitize_tool_name(tool_name))
            # NOTE: pi also carries `isError` (bool) here — whether that one tool
            # invocation failed mechanically. We deliberately do NOT surface it
            # as a feedback signal: intermediate tool errors are normal in
            # agentic coding and are frequently followed by recovery and a
            # successful final result. Treating every recovered error as
            # `neg:` feedback would mislabel successful sessions as failures and
            # poison the miner's task-outcome labels. Task outcome should be
            # inferred from the user's judgment of the *final* result (the
            # lexical feedback phrases above), not from transient tool mechanics.

    if project and not _project_matches(session_project or "", "invoked", project):
        return None
    if n_user == 0 and n_asst == 0:
        return None

    digest = SessionDigest(
        session_id=session_id,
        project=session_project,
        started_at=started,
        ended_at=ended,
        user_prompts=user_prompts,
        assistant_finals=assistant_finals[-5:],  # keep only the last five
        tools_used=_dedup(tools),
        files_touched=[],  # not extractable from pi transcripts without heuristics
        feedback_signals=_dedup(feedback),
        n_user_turns=n_user,
        n_assistant_turns=n_asst,
        raw_path=path,
    )
    if _is_headless_replay(digest):
        return None
    return digest


def harvest_pi(
    sessions_dir: str,
    *,
    scope: Any = "all",
    invoked_project: str = "",
    since_iso: Optional[str] = None,
    limit: int = 0,
) -> List[SessionDigest]:
    """Walk ``~/.pi/agent/sessions`` (one subdir per project slug) and return digests.

    Parameters mirror :func:`skillopt_sleep.harvest.harvest`.

    Args:
        sessions_dir: Root directory searched recursively for ``*.jsonl`` files.
        scope: Project scope forwarded to ``_project_matches``. When it equals
            ``"invoked"``, ``invoked_project`` is also passed to
            :func:`digest_pi_session` as its project filter.
        invoked_project: Project path used for scope matching.
        since_iso: When set, digests whose ``ended_at`` compares (as a string)
            lower than this value are dropped. Digests with an empty
            ``ended_at`` are kept.
        limit: Maximum number of digests to return; ``0`` means no limit.

    Returns:
        Digests ordered by transcript file modification time, newest first.
        An empty list when ``sessions_dir`` is not a directory.
    """
    digests: List[SessionDigest] = []
    if not os.path.isdir(sessions_dir):
        return digests

    dated_paths: List[Any] = []
    for root, _dirs, files in os.walk(sessions_dir):
        for fn in files:
            if not fn.endswith(".jsonl"):
                continue
            path = os.path.join(root, fn)
            try:
                mtime = os.path.getmtime(path)
            except OSError:
                # Dangling symlink, or the file vanished since the walk listed
                # it: skip it rather than abort the whole harvest.
                continue
            dated_paths.append((mtime, path))
    # Newest first; the sort is stable, so equal mtimes keep walk order.
    dated_paths.sort(key=lambda pair: pair[0], reverse=True)

    project_hint = invoked_project if scope == "invoked" else ""
    for _mtime, path in dated_paths:
        digest = digest_pi_session(path, project=project_hint)
        if digest is None:
            continue
        if not _project_matches(digest.project or "", scope, invoked_project):
            continue
        if since_iso and digest.ended_at and digest.ended_at < since_iso:
            continue
        digests.append(digest)
        if limit and len(digests) >= limit:
            break
    return digests
Loading