diff --git a/README.md b/README.md index 9114c66..7a6c760 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,11 @@ # openclaw-superpowers -**44 ready-to-use skills that make your AI agent autonomous, self-healing, and self-improving.** +**49 ready-to-use skills that make your AI agent autonomous, self-healing, and self-improving.** -[![Skills](https://img.shields.io/badge/skills-44-blue)](#skills-included) +[![Skills](https://img.shields.io/badge/skills-49-blue)](#skills-included) [![Security](https://img.shields.io/badge/security_skills-6-green)](#security--guardrails) -[![Cron](https://img.shields.io/badge/cron_scheduled-12-orange)](#openclaw-native-28-skills) -[![Scripts](https://img.shields.io/badge/companion_scripts-15-purple)](#companion-scripts) +[![Cron](https://img.shields.io/badge/cron_scheduled-15-orange)](#openclaw-native-33-skills) +[![Scripts](https://img.shields.io/badge/companion_scripts-20-purple)](#companion-scripts) [![License: MIT](https://img.shields.io/badge/license-MIT-yellow.svg)](LICENSE) A plug-and-play skill library for [OpenClaw](https://github.com/openclaw/openclaw) — the open-source AI agent runtime. Gives your agent structured thinking, security guardrails, persistent memory, cron scheduling, self-recovery, and the ability to write its own new skills during conversation. @@ -20,12 +20,13 @@ Built for developers who want their AI agent to run autonomously 24/7, not just Most AI agent frameworks give you a chatbot that forgets everything between sessions. OpenClaw is different — it runs persistently, handles multi-hour tasks, and has native cron scheduling. But out of the box, it doesn't know *how* to use those capabilities well. 
-**openclaw-superpowers bridges that gap.** Install once, and your agent immediately knows how to: +**openclaw-superpowers bridges that gap.** Install 49 skills in one command, and your agent immediately knows how to: - **Think before it acts** — brainstorming, planning, and systematic debugging skills prevent the "dive in and break things" failure mode - **Protect itself** — 6 security skills detect prompt injection, block dangerous actions, audit installed code, and scan for leaked credentials -- **Run unattended** — 12 cron-scheduled skills handle memory cleanup, health checks, budget tracking, and community monitoring while you sleep +- **Run unattended** — 15 cron-scheduled skills handle memory cleanup, health checks, budget tracking, and community monitoring while you sleep - **Recover from failures** — self-recovery, loop-breaking, and task handoff skills keep long-running work alive across crashes and restarts +- **Never forget** — DAG-based memory compaction, integrity checking, and context scoring ensure the agent preserves critical information even in month-long conversations - **Improve itself** — the agent can write new skills during normal conversation using `create-skill`, encoding your preferences as permanent behaviors --- @@ -50,7 +51,7 @@ cd ~/.openclaw/extensions/superpowers && ./install.sh openclaw gateway restart ``` -`install.sh` symlinks all 44 skills, creates state directories for stateful skills, and registers cron jobs — everything in one step. That's it. Your agent now has superpowers. +`install.sh` symlinks all 49 skills, creates state directories for stateful skills, and registers cron jobs — everything in one step. That's it. Your agent now has superpowers. --- @@ -78,7 +79,7 @@ Methodology skills that work in any AI agent runtime.
Adapted from [obra/superpowers](https://github.com/obra/superpowers). | `skill-conflict-detector` | Detects name shadowing and description-overlap conflicts between installed skills | `detect.py` | | `skill-portability-checker` | Validates OS/binary dependencies in companion scripts; catches non-portable calls | `check.py` | -### OpenClaw-Native (28 skills) +### OpenClaw-Native (33 skills) Skills that require OpenClaw's persistent runtime — cron scheduling, session state, or long-running execution. These are the skills that make a 24/7 autonomous agent actually work reliably. @@ -112,6 +113,11 @@ Skills that require OpenClaw's persistent runtime — cron scheduling, session s | `config-encryption-auditor` | Scans config directories for plaintext API keys, tokens, and world-readable permissions | Sundays 9am | `audit.py` | | `tool-description-optimizer` | Scores skill descriptions for trigger quality — clarity, specificity, keyword density — and suggests rewrites | — | `optimize.py` | | `mcp-health-checker` | Monitors MCP server connections for health, latency, and availability; detects stale connections | every 6h | `check.py` | +| `memory-dag-compactor` | Builds hierarchical summary DAGs from MEMORY.md with depth-aware prompts (d0 leaf → d3+ durable) | daily 11pm | `compact.py` | +| `large-file-interceptor` | Detects oversized files, generates structural exploration summaries, stores compact references | — | `intercept.py` | +| `context-assembly-scorer` | Scores how well current context represents full conversation; detects blind spots | every 4h | `score.py` | +| `compaction-resilience-guard` | Monitors compaction for failures; enforces normal → aggressive → deterministic fallback chain | — | `guard.py` | +| `memory-integrity-checker` | Validates summary DAGs for orphans, circular refs, token inflation, broken lineage | Sundays 3am | `integrity.py` | ### Community (1 skill) @@ -142,12 +148,12 @@ Six skills form a defense-in-depth security layer for autonomous agents: | Feature | openclaw-superpowers |
obra/superpowers | Custom prompts | |---|---|---|---| -| Skills included | **44** | 8 | 0 | +| Skills included | **49** | 8 | 0 | | Self-modifying (agent writes new skills) | Yes | No | No | -| Cron scheduling | **12 scheduled skills** | No | No | +| Cron scheduling | **15 scheduled skills** | No | No | | Persistent state across sessions | **YAML state schemas** | No | No | | Security guardrails | **6 defense-in-depth skills** | No | No | -| Companion scripts with CLI | **15 scripts** | No | No | +| Companion scripts with CLI | **20 scripts** | No | No | | Memory graph / knowledge graph | Yes | No | No | | MCP server health monitoring | Yes | No | No | | API spend tracking & budget enforcement | Yes | No | No | @@ -169,7 +175,7 @@ Six skills form a defense-in-depth security layer for autonomous agents: │ │ │ ├── SKILL.md │ │ │ └── TEMPLATE.md │ │ └── ... -│ ├── openclaw-native/ # 28 persistent-runtime skills +│ ├── openclaw-native/ # 33 persistent-runtime skills │ │ ├── memory-graph-builder/ │ │ │ ├── SKILL.md # Skill definition + YAML frontmatter │ │ │ ├── STATE_SCHEMA.yaml # State shape (committed, versioned) @@ -192,7 +198,7 @@ Six skills form a defense-in-depth security layer for autonomous agents: Skills marked with a script ship a small executable alongside their `SKILL.md`: -- **15 Python scripts** (`run.py`, `audit.py`, `check.py`, `guard.py`, `bridge.py`, `onboard.py`, `sync.py`, `doctor.py`, `loadout.py`, `governor.py`, `detect.py`, `test.py`, `radar.py`, `graph.py`, `optimize.py`) — run directly to manipulate state, generate reports, or trigger actions. No extra dependencies; `pyyaml` is optional but recommended. +- **20 Python scripts** (`run.py`, `audit.py`, `check.py`, `guard.py`, `bridge.py`, `onboard.py`, `sync.py`, `doctor.py`, `loadout.py`, `governor.py`, `detect.py`, `test.py`, `radar.py`, `graph.py`, `optimize.py`, `compact.py`, `intercept.py`, `score.py`, `integrity.py`) — run directly to manipulate state, generate reports, or trigger actions. 
No extra dependencies; `pyyaml` is optional but recommended. - **`vet.sh`** — Pure bash scanner; runs on any system with grep. - Every script supports `--help` and `--format json`. Dry-run mode available on scripts that make changes. - See the `example-state.yaml` in each skill directory for sample state and a commented walkthrough of cron behaviour. @@ -228,3 +234,4 @@ Skills marked with a script ship a small executable alongside their `SKILL.md`: - **[openclaw/openclaw](https://github.com/openclaw/openclaw)** — the open-source AI agent runtime - **[obra/superpowers](https://github.com/obra/superpowers)** — Jesse Vincent's skills framework; core skills adapted under MIT license +- **[OpenLobster](https://github.com/Neirth/OpenLobster)** — inspiration for memory graph, config encryption auditing, tool-description scoring, and MCP health monitoring diff --git a/skills/openclaw-native/compaction-resilience-guard/SKILL.md b/skills/openclaw-native/compaction-resilience-guard/SKILL.md new file mode 100644 index 0000000..f8ac747 --- /dev/null +++ b/skills/openclaw-native/compaction-resilience-guard/SKILL.md @@ -0,0 +1,94 @@ +--- +name: compaction-resilience-guard +version: "1.0" +category: openclaw-native +description: Monitors memory compaction for failures and enforces a three-level fallback chain — normal, aggressive, deterministic truncation — ensuring compaction always makes forward progress. +stateful: true +--- + +# Compaction Resilience Guard + +## What it does + +Memory compaction can fail silently: the LLM produces empty output, summaries that are *larger* than their input, or garbled text. When this happens, compaction stalls and context overflows. 
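These silent failures are mechanically detectable. A minimal sketch of such checks (the `looks_failed` helper is hypothetical; the length and entropy thresholds mirror the ones this skill documents, and tokens are estimated with the same rough four-characters-per-token heuristic guard.py uses):

```python
import math
from collections import Counter

def estimate_tokens(text: str) -> int:
    # Rough heuristic used throughout this repo: ~4 characters per token
    return len(text) // 4

def looks_failed(summary: str, source: str) -> bool:
    """True if a compaction output shows a silent-failure symptom."""
    if len(summary.strip()) < 10:        # empty output
        return True
    if estimate_tokens(summary) > estimate_tokens(source):
        return True                      # inflation: summary larger than input
    # Garbled output: Shannon entropy of the character distribution
    p = [n / len(summary) for n in Counter(summary).values()]
    return -sum(x * math.log2(x) for x in p) > 5.0

print(looks_failed("ok", "a long original passage about compaction"))  # → True (too short)
```

The full validation in guard.py additionally checks repetition, stale output, and fallback markers.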
+ +Compaction Resilience Guard enforces a three-level escalation chain inspired by [lossless-claw](https://github.com/Martian-Engineering/lossless-claw): + +| Level | Strategy | When used | +|---|---|---| +| L1 — Normal | Standard summarization prompt | First attempt | +| L2 — Aggressive | Low temperature, reduced reasoning, shorter output target | After L1 failure | +| L3 — Deterministic | Pure truncation: keep first N + last N lines, drop middle | After L2 failure | + +This ensures compaction **always makes progress** — even if the LLM is broken. + +## When to invoke + +- After any compaction event — validate the output +- When context usage approaches 90% — compaction may be failing +- When summaries seem unusually long or empty — detect inflation +- As a pre-check before memory-dag-compactor runs + +## How to use + +```bash +python3 guard.py --check # Validate recent compaction outputs +python3 guard.py --check --file PATH # Check a specific summary file +python3 guard.py --simulate "$(cat long-text.txt)" # Run the 3-level chain on sample text +python3 guard.py --report # Show failure/escalation history +python3 guard.py --status # Last check summary +python3 guard.py --check --format json # Machine-readable output +``` + +## Failure detection + +The guard detects these compaction failures: + +| Failure | How detected | Action | +|---|---|---| +| Empty output | Summary length < 10 chars | Escalate to next level | +| Inflation | Summary tokens > input tokens | Escalate to next level | +| Garbled text | Entropy score > 5.0 (random chars) | Escalate to next level | +| Repetition | Same 20+ char phrase repeated 3+ times | Escalate to next level | +| Truncation marker | Contains `[FALLBACK]` or `[TRUNCATED]` | Record as L3 usage | +| Stale | Summary unchanged from previous run | Flag for review | + +## Procedure + +**Step 1 — Check recent compaction outputs** + +```bash +python3 guard.py --check +``` + +Validates all summary nodes in memory-dag-compactor state.
Reports failures by level and whether escalation was needed. + +**Step 2 — Simulate the fallback chain** + +```bash +python3 guard.py --simulate "$(cat long-text.txt)" +``` + +Runs the 3-level chain on sample text to test that each level produces valid output. + +**Step 3 — Review escalation history** + +```bash +python3 guard.py --report +``` + +Shows how often each level was used. High L2/L3 usage indicates the primary summarization prompt needs improvement. + +## State + +Failure counts, escalation history, and per-summary validation results stored in `~/.openclaw/skill-state/compaction-resilience-guard/state.yaml`. + +Fields: `last_check_at`, `level_usage`, `failures`, `check_history`. + +## Notes + +- Read-only monitoring — does not perform compaction itself +- Works alongside memory-dag-compactor as a quality gate +- Deterministic truncation (L3) preserves first 30% and last 20% of input, drops middle +- Entropy is measured using Shannon entropy on character distribution +- High L3 usage (>10% of compactions) suggests a systemic LLM issue diff --git a/skills/openclaw-native/compaction-resilience-guard/STATE_SCHEMA.yaml b/skills/openclaw-native/compaction-resilience-guard/STATE_SCHEMA.yaml new file mode 100644 index 0000000..df559f5 --- /dev/null +++ b/skills/openclaw-native/compaction-resilience-guard/STATE_SCHEMA.yaml @@ -0,0 +1,30 @@ +version: "1.0" +description: Compaction failure tracking, escalation history, and level usage stats. 
+fields: + last_check_at: + type: datetime + level_usage: + type: object + description: How often each fallback level was used + fields: + l1_normal: { type: integer, default: 0 } + l2_aggressive: { type: integer, default: 0 } + l3_deterministic: { type: integer, default: 0 } + failures: + type: list + description: Recent compaction failures detected + items: + summary_id: { type: string } + failure_type: { type: enum, values: [empty, inflation, garbled, repetition, stale] } + level_used: { type: integer, description: "1, 2, or 3" } + input_tokens: { type: integer } + output_tokens: { type: integer } + detected_at: { type: datetime } + check_history: + type: list + description: Rolling log of past checks (last 20) + items: + checked_at: { type: datetime } + summaries_checked: { type: integer } + failures_found: { type: integer } + escalations: { type: integer } diff --git a/skills/openclaw-native/compaction-resilience-guard/example-state.yaml b/skills/openclaw-native/compaction-resilience-guard/example-state.yaml new file mode 100644 index 0000000..df38ba6 --- /dev/null +++ b/skills/openclaw-native/compaction-resilience-guard/example-state.yaml @@ -0,0 +1,65 @@ +# Example runtime state for compaction-resilience-guard +last_check_at: "2026-03-16T23:05:00.000000" +level_usage: + l1_normal: 42 + l2_aggressive: 3 + l3_deterministic: 1 +failures: + - summary_id: s-d0-012 + failure_type: inflation + level_used: 2 + input_tokens: 500 + output_tokens: 620 + detected_at: "2026-03-16T23:04:58.000000" + - summary_id: s-d1-005 + failure_type: repetition + level_used: 3 + input_tokens: 800 + output_tokens: 200 + detected_at: "2026-03-15T23:05:00.000000" +check_history: + - checked_at: "2026-03-16T23:05:00.000000" + summaries_checked: 18 + failures_found: 1 + escalations: 1 + - checked_at: "2026-03-15T23:05:00.000000" + summaries_checked: 15 + failures_found: 1 + escalations: 1 + - checked_at: "2026-03-14T23:05:00.000000" + summaries_checked: 12 + failures_found: 0 + 
escalations: 0 +# ── Walkthrough ────────────────────────────────────────────────────────────── +# python3 guard.py --check +# +# Compaction Resilience Check — 2026-03-16 23:05 +# ────────────────────────────────────────────────── +# Summaries checked: 18 +# Failures found: 1 +# Escalations needed: 1 +# Status: DEGRADED +# +# ! s-d0-012: inflation (entropy=3.2, 620 tok) +# +# python3 guard.py --report +# +# Compaction Resilience Report +# ────────────────────────────────────────────────── +# Total compactions tracked: 46 +# L1 Normal: 42 (91%) +# L2 Aggressive: 3 (7%) +# L3 Deterministic: 1 (2%) +# +# Recent failures: 2 +# s-d0-012: inflation (L2) +# s-d1-005: repetition (L3) +# +# python3 guard.py --simulate "$(cat long-text.txt)" +# +# Fallback Chain Simulation +# ────────────────────────────────────────────────── +# Input: 2500 tokens (10000 chars) +# Level used: L1 (l1_normal) +# Output: 1000 tokens +# Compression: 40% diff --git a/skills/openclaw-native/compaction-resilience-guard/guard.py b/skills/openclaw-native/compaction-resilience-guard/guard.py new file mode 100755 index 0000000..3e926e2 --- /dev/null +++ b/skills/openclaw-native/compaction-resilience-guard/guard.py @@ -0,0 +1,373 @@ +#!/usr/bin/env python3 +""" +Compaction Resilience Guard for openclaw-superpowers. + +Monitors memory compaction for failures and enforces a 3-level +fallback chain: normal → aggressive → deterministic truncation. 
+ +Usage: + python3 guard.py --check + python3 guard.py --check --file + python3 guard.py --simulate + python3 guard.py --report + python3 guard.py --status + python3 guard.py --format json +""" + +import argparse +import json +import math +import os +import re +import sys +from collections import Counter +from datetime import datetime +from pathlib import Path + +try: + import yaml + HAS_YAML = True +except ImportError: + HAS_YAML = False + +OPENCLAW_DIR = Path(os.environ.get("OPENCLAW_HOME", Path.home() / ".openclaw")) +STATE_FILE = OPENCLAW_DIR / "skill-state" / "compaction-resilience-guard" / "state.yaml" +DAG_STATE_FILE = OPENCLAW_DIR / "skill-state" / "memory-dag-compactor" / "state.yaml" +MAX_HISTORY = 20 + +# Thresholds +MIN_SUMMARY_LENGTH = 10 +MAX_ENTROPY = 5.0 +REPETITION_THRESHOLD = 3 +REPETITION_MIN_LENGTH = 20 + + +# ── State helpers ──────────────────────────────────────────────────────────── + +def load_state() -> dict: + if not STATE_FILE.exists(): + return {"level_usage": {"l1_normal": 0, "l2_aggressive": 0, "l3_deterministic": 0}, + "failures": [], "check_history": []} + try: + text = STATE_FILE.read_text() + return (yaml.safe_load(text) or {}) if HAS_YAML else {} + except Exception: + return {"level_usage": {"l1_normal": 0, "l2_aggressive": 0, "l3_deterministic": 0}, + "failures": [], "check_history": []} + + +def save_state(state: dict) -> None: + STATE_FILE.parent.mkdir(parents=True, exist_ok=True) + if HAS_YAML: + with open(STATE_FILE, "w") as f: + yaml.dump(state, f, default_flow_style=False, allow_unicode=True) + + +def load_dag_state() -> dict: + if not DAG_STATE_FILE.exists(): + return {} + try: + text = DAG_STATE_FILE.read_text() + return (yaml.safe_load(text) or {}) if HAS_YAML else {} + except Exception: + return {} + + +def estimate_tokens(text: str) -> int: + return len(text) // 4 + + +# ── Failure detection ──────────────────────────────────────────────────────── + +def shannon_entropy(text: str) -> float: + """Calculate Shannon 
entropy of character distribution.""" + if not text: + return 0.0 + freq = Counter(text) + total = len(text) + entropy = 0.0 + for count in freq.values(): + p = count / total + if p > 0: + entropy -= p * math.log2(p) + return round(entropy, 2) + + +def detect_repetition(text: str) -> bool: + """Detect if the same 20+ char phrase is repeated 3+ times.""" + if len(text) < REPETITION_MIN_LENGTH * REPETITION_THRESHOLD: + return False + # Sliding window of REPETITION_MIN_LENGTH chars + phrases = Counter() + for i in range(len(text) - REPETITION_MIN_LENGTH): + phrase = text[i:i + REPETITION_MIN_LENGTH] + phrases[phrase] += 1 + return any(count >= REPETITION_THRESHOLD for count in phrases.values()) + + +def validate_summary(content: str, input_tokens: int = 0) -> dict: + """Validate a single summary and return failure info if any.""" + failures = [] + + # Check: empty + if len(content.strip()) < MIN_SUMMARY_LENGTH: + failures.append("empty") + + # Check: inflation (summary larger than input) + output_tokens = estimate_tokens(content) + if input_tokens > 0 and output_tokens > input_tokens: + failures.append("inflation") + + # Check: garbled (high entropy = random characters) + entropy = shannon_entropy(content) + if entropy > MAX_ENTROPY: + failures.append("garbled") + + # Check: repetition + if detect_repetition(content): + failures.append("repetition") + + # Check: fallback markers + if "[FALLBACK]" in content or "[TRUNCATED]" in content: + failures.append("truncation_marker") + + return { + "valid": len(failures) == 0, + "failures": failures, + "output_tokens": output_tokens, + "entropy": entropy, + } + + +# ── Three-level fallback chain ─────────────────────────────────────────────── + +def l1_normal(text: str) -> str: + """Level 1: Standard summarization — keep first 40% by lines.""" + lines = text.split("\n") + keep = max(3, len(lines) * 40 // 100) + summary = "\n".join(lines[:keep]) + return summary.strip() + + +def l2_aggressive(text: str) -> str: + """Level 2: 
Aggressive — keep only lines with substance (no blanks, short lines).""" + lines = text.split("\n") + substantial = [l for l in lines if len(l.strip()) > 20] + keep = max(3, len(substantial) * 30 // 100) + summary = "\n".join(substantial[:keep]) + return summary.strip() + + +def l3_deterministic(text: str) -> str: + """Level 3: Deterministic truncation — first 30% + last 20%, drop middle.""" + lines = text.split("\n") + total = len(lines) + if total <= 5: + return text.strip() + head_count = max(2, total * 30 // 100) + tail_count = max(1, total * 20 // 100) + head = lines[:head_count] + tail = lines[-tail_count:] + dropped = total - head_count - tail_count + summary = "\n".join(head) + f"\n[... {dropped} lines truncated ...]\n" + "\n".join(tail) + return summary.strip() + + +def run_fallback_chain(text: str) -> tuple[str, int]: + """Run the 3-level fallback chain, return (result, level_used).""" + input_tokens = estimate_tokens(text) + + # Level 1 + result = l1_normal(text) + check = validate_summary(result, input_tokens) + if check["valid"]: + return result, 1 + + # Level 2 + result = l2_aggressive(text) + check = validate_summary(result, input_tokens) + if check["valid"]: + return result, 2 + + # Level 3 — always succeeds + result = l3_deterministic(text) + return result, 3 + + +# ── Commands ───────────────────────────────────────────────────────────────── + +def cmd_check(state: dict, file_path: str | None, fmt: str) -> None: + now = datetime.now().isoformat() + failures_found = 0 + escalations = 0 + summaries_checked = 0 + + if file_path: + # Check a specific file + path = Path(file_path) + if not path.exists(): + print(f"Error: file '{file_path}' not found.") + sys.exit(1) + content = path.read_text() + check = validate_summary(content) + summaries_checked = 1 + if not check["valid"]: + failures_found = 1 + else: + # Check all DAG summaries + dag = load_dag_state() + nodes = dag.get("dag_nodes") or [] + results = [] + + for node in nodes: + content = 
node.get("content", "") + check = validate_summary(content) + summaries_checked += 1 + if not check["valid"]: + failures_found += 1 + for f_type in check["failures"]: + failure_record = { + "summary_id": node.get("id", "unknown"), + "failure_type": f_type, + "level_used": 1, + "input_tokens": 0, + "output_tokens": check["output_tokens"], + "detected_at": now, + } + existing_failures = state.get("failures") or [] + existing_failures.append(failure_record) + state["failures"] = existing_failures[-50:] # Keep last 50 + escalations += 1 + + results.append({ + "id": node.get("id"), "failures": check["failures"], + "entropy": check["entropy"], "tokens": check["output_tokens"], + }) + + if fmt != "json" and results: + for r in results: + print(f" ! {r['id']}: {', '.join(r['failures'])} " + f"(entropy={r['entropy']}, {r['tokens']} tok)") + + state["last_check_at"] = now + history = state.get("check_history") or [] + history.insert(0, { + "checked_at": now, "summaries_checked": summaries_checked, + "failures_found": failures_found, "escalations": escalations, + }) + state["check_history"] = history[:MAX_HISTORY] + save_state(state) + + if fmt == "json": + print(json.dumps({"summaries_checked": summaries_checked, + "failures_found": failures_found, + "escalations": escalations}, indent=2)) + else: + print(f"\nCompaction Resilience Check — {datetime.now().strftime('%Y-%m-%d %H:%M')}") + print("-" * 50) + print(f" Summaries checked: {summaries_checked}") + print(f" Failures found: {failures_found}") + print(f" Escalations needed: {escalations}") + status = "HEALTHY" if failures_found == 0 else "DEGRADED" + print(f" Status: {status}") + print() + + +def cmd_simulate(state: dict, text: str, fmt: str) -> None: + input_tokens = estimate_tokens(text) + result, level = run_fallback_chain(text) + output_tokens = estimate_tokens(result) + + # Update level usage + usage = state.get("level_usage") or {"l1_normal": 0, "l2_aggressive": 0, "l3_deterministic": 0} + level_key = {1: 
"l1_normal", 2: "l2_aggressive", 3: "l3_deterministic"}[level] + usage[level_key] = usage.get(level_key, 0) + 1 + state["level_usage"] = usage + save_state(state) + + if fmt == "json": + print(json.dumps({"level_used": level, "input_tokens": input_tokens, + "output_tokens": output_tokens, "result": result[:500]}, indent=2)) + else: + print(f"\nFallback Chain Simulation") + print("-" * 50) + print(f" Input: {input_tokens} tokens ({len(text)} chars)") + print(f" Level used: L{level} ({level_key})") + print(f" Output: {output_tokens} tokens") + ratio = round(output_tokens / max(input_tokens, 1) * 100) + print(f" Compression: {ratio}%") + print(f"\n Result preview:") + for line in result.split("\n")[:10]: + print(f" {line}") + if result.count("\n") > 10: + print(f" ... ({result.count(chr(10))-10} more lines)") + print() + + +def cmd_report(state: dict, fmt: str) -> None: + usage = state.get("level_usage") or {} + failures = state.get("failures") or [] + total = sum(usage.values()) + + if fmt == "json": + print(json.dumps({"level_usage": usage, "total_compactions": total, + "recent_failures": failures[-10:]}, indent=2)) + else: + print(f"\nCompaction Resilience Report") + print("-" * 50) + print(f" Total compactions tracked: {total}") + if total > 0: + l1 = usage.get("l1_normal", 0) + l2 = usage.get("l2_aggressive", 0) + l3 = usage.get("l3_deterministic", 0) + print(f" L1 Normal: {l1:>5} ({l1/total*100:.0f}%)") + print(f" L2 Aggressive: {l2:>5} ({l2/total*100:.0f}%)") + print(f" L3 Deterministic: {l3:>5} ({l3/total*100:.0f}%)") + if l3 / total > 0.1: + print(f"\n WARNING: L3 usage > 10% — indicates systemic LLM issue") + print(f"\n Recent failures: {len(failures)}") + for f in failures[-5:]: + print(f" {f.get('summary_id', '?')}: {f.get('failure_type', '?')} " + f"(L{f.get('level_used', '?')})") + print() + + +def cmd_status(state: dict) -> None: + last = state.get("last_check_at", "never") + usage = state.get("level_usage") or {} + total = sum(usage.values()) + l3 = 
usage.get("l3_deterministic", 0) + print(f"\nCompaction Resilience Guard — Last check: {last}") + print(f" {total} compactions tracked | L3 fallbacks: {l3}") + history = state.get("check_history") or [] + if history: + h = history[0] + print(f" Last: {h.get('summaries_checked', 0)} checked, " + f"{h.get('failures_found', 0)} failures") + print() + + +def main(): + parser = argparse.ArgumentParser(description="Compaction Resilience Guard") + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument("--check", action="store_true", help="Validate recent compaction outputs") + group.add_argument("--simulate", type=str, metavar="TEXT", help="Run 3-level chain on sample text") + group.add_argument("--report", action="store_true", help="Show failure/escalation history") + group.add_argument("--status", action="store_true", help="Last check summary") + parser.add_argument("--file", type=str, metavar="PATH", help="Check a specific summary file") + parser.add_argument("--format", choices=["text", "json"], default="text") + args = parser.parse_args() + + state = load_state() + if args.check: + cmd_check(state, args.file, args.format) + elif args.simulate: + cmd_simulate(state, args.simulate, args.format) + elif args.report: + cmd_report(state, args.format) + elif args.status: + cmd_status(state) + + +if __name__ == "__main__": + main() diff --git a/skills/openclaw-native/context-assembly-scorer/SKILL.md b/skills/openclaw-native/context-assembly-scorer/SKILL.md new file mode 100644 index 0000000..0487be6 --- /dev/null +++ b/skills/openclaw-native/context-assembly-scorer/SKILL.md @@ -0,0 +1,94 @@ +--- +name: context-assembly-scorer +version: "1.0" +category: openclaw-native +description: Scores how well the current context represents the full conversation — detects information blind spots, stale summaries, and coverage gaps that cause the agent to forget critical details. 
+stateful: true +cron: "0 */4 * * *" +--- + +# Context Assembly Scorer + +## What it does + +When an agent compacts context, it loses information. But how much? And which information? Context Assembly Scorer answers these questions by measuring **coverage** — the ratio of important topics in the full conversation history that are represented in the current assembled context. + +Inspired by [lossless-claw](https://github.com/Martian-Engineering/lossless-claw)'s context assembly system, which carefully selects which summaries to include in each turn's context to maximize information coverage. + +## When to invoke + +- Automatically every 4 hours (cron) — silent coverage check +- Before starting a task that depends on prior context — verify nothing critical is missing +- After compaction — measure information loss +- When the agent says "I don't remember" — diagnose why + +## Coverage dimensions + +| Dimension | What it measures | Weight | +|---|---|---| +| Topic coverage | % of conversation topics present in current context | 2x | +| Recency bias | Whether recent context is over-represented vs. older important context | 1.5x | +| Entity continuity | Named entities (files, people, APIs) mentioned in history that are missing from context | 2x | +| Decision retention | Architectural decisions and user preferences still accessible | 2x | +| Task continuity | Active/pending tasks that might be lost after compaction | 1.5x | + +## How to use + +```bash +python3 score.py --score # Score current context assembly +python3 score.py --score --verbose # Detailed per-dimension breakdown +python3 score.py --blind-spots # List topics missing from context +python3 score.py --drift # Compare current vs. 
previous scores +python3 score.py --status # Last score summary +python3 score.py --format json # Machine-readable output +``` + +## Procedure + +**Step 1 — Score context coverage** + +```bash +python3 score.py --score +``` + +The scorer reads MEMORY.md (full history) and compares it against what's currently accessible. Outputs a coverage score from 0–100% with a letter grade. + +**Step 2 — Find blind spots** + +```bash +python3 score.py --blind-spots +``` + +Lists specific topics, entities, and decisions that exist in full history but are missing from current context — these are what the agent has effectively "forgotten." + +**Step 3 — Track drift over time** + +```bash +python3 score.py --drift +``` + +Shows how coverage has changed across the last 20 scores. Identify if compaction is progressively losing more information. + +## Grading + +| Grade | Coverage | Meaning | +|---|---|---| +| A | 90–100% | Excellent — minimal information loss | +| B | 75–89% | Good — minor gaps, unlikely to cause issues | +| C | 60–74% | Fair — some important context missing | +| D | 40–59% | Poor — significant blind spots | +| F | 0–39% | Critical — agent is operating with major gaps | + +## State + +Coverage scores and blind spot history stored in `~/.openclaw/skill-state/context-assembly-scorer/state.yaml`. + +Fields: `last_score_at`, `current_score`, `blind_spots`, `score_history`. 
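Assuming the overall score is a weighted mean of the five dimension scores using the weights from the coverage table (an assumption — score.py's exact aggregation is not shown here, and the example state records 72.3 for these same inputs, so the real formula differs slightly), scoring and grading can be sketched as:

```python
# Hypothetical aggregation: weighted mean of the five dimension scores.
# Weights follow the "Coverage dimensions" table; grade cutoffs follow
# the "Grading" table. The exact formula in score.py is an assumption.
WEIGHTS = {
    "topic_coverage": 2.0,
    "recency_bias": 1.5,
    "entity_continuity": 2.0,
    "decision_retention": 2.0,
    "task_continuity": 1.5,
}

def overall_score(dims: dict[str, float]) -> float:
    # Weighted mean, rounded to one decimal place
    return round(sum(dims[k] * w for k, w in WEIGHTS.items()) / sum(WEIGHTS.values()), 1)

def grade(score: float) -> str:
    for cutoff, letter in [(90, "A"), (75, "B"), (60, "C"), (40, "D")]:
        if score >= cutoff:
            return letter
    return "F"

dims = {"topic_coverage": 82.0, "recency_bias": 65.5, "entity_continuity": 68.0,
        "decision_retention": 75.0, "task_continuity": 70.0}
print(overall_score(dims), grade(overall_score(dims)))  # → 72.6 C
```

With the example-state dimension values this weighted mean gives 72.6, close to the recorded 72.3 — either way a grade of C.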
+ +## Notes + +- Read-only — does not modify context or memory +- Topic extraction uses keyword clustering, not LLM calls +- Entity detection uses regex patterns for file paths, URLs, class names, API endpoints +- Decision detection looks for markers: "decided", "chose", "prefer", "always", "never" +- Recency bias is measured as the ratio of recent-vs-old entry representation diff --git a/skills/openclaw-native/context-assembly-scorer/STATE_SCHEMA.yaml b/skills/openclaw-native/context-assembly-scorer/STATE_SCHEMA.yaml new file mode 100644 index 0000000..89a1b18 --- /dev/null +++ b/skills/openclaw-native/context-assembly-scorer/STATE_SCHEMA.yaml @@ -0,0 +1,31 @@ +version: "1.0" +description: Context coverage scores, blind spot tracking, and drift history. +fields: + last_score_at: + type: datetime + current_score: + type: object + fields: + overall: { type: float, description: "0-100 coverage percentage" } + grade: { type: string } + topic_coverage: { type: float } + recency_bias: { type: float } + entity_continuity: { type: float } + decision_retention: { type: float } + task_continuity: { type: float } + blind_spots: + type: list + description: Topics/entities missing from current context + items: + type: { type: enum, values: [topic, entity, decision, task] } + name: { type: string } + importance: { type: enum, values: [critical, high, medium, low] } + last_seen: { type: string, description: "When this was last in context" } + score_history: + type: list + description: Rolling log of past scores (last 20) + items: + scored_at: { type: datetime } + overall: { type: float } + grade: { type: string } + blind_spot_count: { type: integer } diff --git a/skills/openclaw-native/context-assembly-scorer/example-state.yaml b/skills/openclaw-native/context-assembly-scorer/example-state.yaml new file mode 100644 index 0000000..a2d139d --- /dev/null +++ b/skills/openclaw-native/context-assembly-scorer/example-state.yaml @@ -0,0 +1,74 @@ +# Example runtime state for 
context-assembly-scorer +last_score_at: "2026-03-16T16:00:08.000000" +current_score: + overall: 72.6 + grade: C + topic_coverage: 82.0 + recency_bias: 65.5 + entity_continuity: 68.0 + decision_retention: 75.0 + task_continuity: 70.0 +blind_spots: + - type: decision + name: "Decided to use Jaccard similarity threshold of 0.7 for deduplication" + importance: critical + last_seen: "in full memory" + - type: entity + name: "/skills/openclaw-native/heartbeat-governor/governor.py" + importance: high + last_seen: "in full memory" + - type: task + name: "TODO: add --dry-run flag to radar.py before next release" + importance: high + last_seen: "in full memory" + - type: entity + name: "https://github.com/Neirth/OpenLobster" + importance: high + last_seen: "in full memory" +score_history: + - scored_at: "2026-03-16T16:00:08.000000" + overall: 72.6 + grade: C + blind_spot_count: 12 + - scored_at: "2026-03-16T12:00:05.000000" + overall: 85.1 + grade: B + blind_spot_count: 5 + - scored_at: "2026-03-16T08:00:03.000000" + overall: 91.2 + grade: A + blind_spot_count: 2 +# ── Walkthrough ────────────────────────────────────────────────────────────── +# Cron runs every 4 hours: python3 score.py --score --verbose +# +# Context Assembly Score — 2026-03-16 16:00 +# ─────────────────────────────────────────────────────── +# Overall: 72.6% Grade: C +# Topic coverage: 82.0% (2x weight) +# Recency bias: 65.5% (1.5x weight) +# Entity continuity: 68.0% (2x weight) +# Decision retention: 75.0% (2x weight) +# Task continuity: 70.0% (1.5x weight) +# +# Memory stats: +# Topics: 284 unique | Entities: 47 +# Decisions: 12 | Tasks: 8 +# Blind spots: 12 +# +# python3 score.py --blind-spots +# +# Blind Spots — 12 items missing from context +# ─────────────────────────────────────────────────────── +# !! [CRITICAL] decision: Decided to use Jaccard similarity threshold... +# ! [ HIGH] entity: /skills/openclaw-native/heartbeat-governor/... +# ! [ HIGH] task: TODO: add --dry-run flag to radar.py...
+# +# python3 score.py --drift +# +# Coverage Drift — 3 data points +# ─────────────────────────────────────────────────────── +# 2026-03-16T16:00 [=======---] 72.6% (C) 12 blind spots +# 2026-03-16T12:00 [========--] 85.1% (B) 5 blind spots +# 2026-03-16T08:00 [=========-] 91.2% (A) 2 blind spots +# +# Trend: declining (-12.5%) diff --git a/skills/openclaw-native/context-assembly-scorer/score.py b/skills/openclaw-native/context-assembly-scorer/score.py new file mode 100755 index 0000000..0897293 --- /dev/null +++ b/skills/openclaw-native/context-assembly-scorer/score.py @@ -0,0 +1,419 @@ +#!/usr/bin/env python3 +""" +Context Assembly Scorer for openclaw-superpowers. + +Scores how well the current context represents the full conversation. +Detects information blind spots, stale summaries, and coverage gaps. + +Usage: + python3 score.py --score + python3 score.py --score --verbose + python3 score.py --blind-spots + python3 score.py --drift + python3 score.py --status + python3 score.py --format json +""" + +import argparse +import json +import os +import re +import sys +from collections import Counter +from datetime import datetime +from pathlib import Path + +try: + import yaml + HAS_YAML = True +except ImportError: + HAS_YAML = False + +OPENCLAW_DIR = Path(os.environ.get("OPENCLAW_HOME", Path.home() / ".openclaw")) +STATE_FILE = OPENCLAW_DIR / "skill-state" / "context-assembly-scorer" / "state.yaml" +MEMORY_FILE = OPENCLAW_DIR / "workspace" / "MEMORY.md" +CONTEXT_FILE = OPENCLAW_DIR / "workspace" / "CONTEXT.md" +MAX_HISTORY = 20 + +# ── Patterns for entity/decision detection ─────────────────────────────────── + +ENTITY_PATTERNS = [ + re.compile(r'(?:/[\w./-]+\.[\w]+)'), # file paths + re.compile(r'https?://[^\s)]+'), # URLs + re.compile(r'(?:class|def|function|const)\s+(\w+)'), # code definitions + re.compile(r'(?:GET|POST|PUT|DELETE|PATCH)\s+/[\w/-]+'), # API endpoints + re.compile(r'`([^`]{3,40})`'), # inline code refs +] + +DECISION_MARKERS = [ + "decided",
"chose", "chosen", "prefer", "preference", + "always", "never", "must", "should not", "agreed", + "convention", "standard", "rule", "policy", "approach", +] + +TASK_MARKERS = [ + "todo", "TODO", "FIXME", "HACK", "pending", "in progress", + "blocked", "waiting", "next step", "follow up", "need to", +] + + +# ── State helpers ──────────────────────────────────────────────────────────── + +def load_state() -> dict: + if not STATE_FILE.exists(): + return {"blind_spots": [], "score_history": []} + try: + text = STATE_FILE.read_text() + return (yaml.safe_load(text) or {}) if HAS_YAML else {} + except Exception: + return {"blind_spots": [], "score_history": []} + + +def save_state(state: dict) -> None: + STATE_FILE.parent.mkdir(parents=True, exist_ok=True) + if HAS_YAML: + with open(STATE_FILE, "w") as f: + yaml.dump(state, f, default_flow_style=False, allow_unicode=True) + + +# ── Extraction ─────────────────────────────────────────────────────────────── + +def extract_topics(text: str) -> Counter: + """Extract topic keywords from text.""" + # Remove code blocks and URLs + cleaned = re.sub(r'```[\s\S]*?```', '', text) + cleaned = re.sub(r'https?://\S+', '', cleaned) + # Tokenize + words = re.findall(r'[a-z][a-z0-9_-]{2,}', cleaned.lower()) + # Filter stopwords + stopwords = { + "the", "and", "for", "that", "this", "with", "from", "are", "was", + "were", "been", "have", "has", "had", "will", "would", "could", + "should", "not", "but", "its", "also", "can", "into", "when", + "then", "than", "more", "some", "each", "all", "any", "our", + "your", "their", "which", "about", "just", "like", "very", + } + filtered = [w for w in words if w not in stopwords and len(w) > 2] + return Counter(filtered) + + +def extract_entities(text: str) -> set: + """Extract named entities from text.""" + entities = set() + for pattern in ENTITY_PATTERNS: + matches = pattern.findall(text) + for m in matches: + if isinstance(m, tuple): + m = m[0] + if len(m) > 2: + entities.add(m.strip()) + return 
entities + + +def extract_decisions(text: str) -> list[str]: + """Extract decision statements from text.""" + decisions = [] + for line in text.split("\n"): + line_lower = line.lower() + for marker in DECISION_MARKERS: + if marker in line_lower: + decisions.append(line.strip()[:120]) + break + return decisions + + +def extract_tasks(text: str) -> list[str]: + """Extract task/todo references from text.""" + tasks = [] + for line in text.split("\n"): + line_lower = line.lower() + for marker in TASK_MARKERS: + if marker in line_lower: + tasks.append(line.strip()[:120]) + break + return tasks + + +# ── Scoring ────────────────────────────────────────────────────────────────── + +def score_topic_coverage(memory_topics: Counter, context_topics: Counter) -> float: + """Score: what % of important memory topics appear in context.""" + if not memory_topics: + return 100.0 + # Focus on top 50 topics by frequency + top_topics = {t for t, _ in memory_topics.most_common(50)} + if not top_topics: + return 100.0 + covered = sum(1 for t in top_topics if context_topics.get(t, 0) > 0) + return round(covered / len(top_topics) * 100, 1) + + +def score_recency_bias(memory_text: str, context_text: str) -> float: + """Score: is context over-representing recent entries vs. 
older important ones.""" + memory_lines = memory_text.split("\n") + total = len(memory_lines) + if total < 10: + return 100.0 + + # Split memory into thirds: old, mid, recent + third = total // 3 + old_topics = extract_topics("\n".join(memory_lines[:third])) + mid_topics = extract_topics("\n".join(memory_lines[third:2*third])) + recent_topics = extract_topics("\n".join(memory_lines[2*third:])) + ctx_topics = extract_topics(context_text) + + # Score each third's representation + old_covered = sum(1 for t in old_topics if ctx_topics.get(t, 0) > 0) + mid_covered = sum(1 for t in mid_topics if ctx_topics.get(t, 0) > 0) + recent_covered = sum(1 for t in recent_topics if ctx_topics.get(t, 0) > 0) + + old_pct = old_covered / max(len(old_topics), 1) * 100 + mid_pct = mid_covered / max(len(mid_topics), 1) * 100 + recent_pct = recent_covered / max(len(recent_topics), 1) * 100 + + # Penalize if old coverage is much lower than recent + if recent_pct > 0: + balance = (old_pct + mid_pct) / (2 * recent_pct) * 100 + return round(min(100.0, balance), 1) + return 100.0 + + +def score_entity_continuity(memory_entities: set, context_entities: set) -> float: + """Score: named entities in history that are missing from context.""" + if not memory_entities: + return 100.0 + covered = len(memory_entities & context_entities) + return round(covered / len(memory_entities) * 100, 1) + + +def score_decision_retention(memory_decisions: list, context_text: str) -> float: + """Score: are decisions still accessible in context.""" + if not memory_decisions: + return 100.0 + ctx_lower = context_text.lower() + retained = sum(1 for d in memory_decisions + if any(word in ctx_lower for word in d.lower().split()[:5])) + return round(retained / len(memory_decisions) * 100, 1) + + +def score_task_continuity(memory_tasks: list, context_text: str) -> float: + """Score: are active tasks still visible in context.""" + if not memory_tasks: + return 100.0 + ctx_lower = context_text.lower() + retained = sum(1 for 
t in memory_tasks + if any(word in ctx_lower for word in t.lower().split()[:5])) + return round(retained / len(memory_tasks) * 100, 1) + + +def compute_overall(tc, rb, ec, dr, tcont) -> float: + """Weighted overall score.""" + weighted = tc * 2.0 + rb * 1.5 + ec * 2.0 + dr * 2.0 + tcont * 1.5 + total_weight = 2.0 + 1.5 + 2.0 + 2.0 + 1.5 + return round(weighted / total_weight, 1) + + +def get_grade(score: float) -> str: + if score >= 90: + return "A" + elif score >= 75: + return "B" + elif score >= 60: + return "C" + elif score >= 40: + return "D" + return "F" + + +def find_blind_spots(memory_text: str, context_text: str) -> list[dict]: + """Find specific items missing from context.""" + spots = [] + mem_entities = extract_entities(memory_text) + ctx_entities = extract_entities(context_text) + missing_entities = mem_entities - ctx_entities + + for entity in sorted(missing_entities)[:20]: + importance = "high" if "/" in entity or "http" in entity else "medium" + spots.append({ + "type": "entity", + "name": entity[:80], + "importance": importance, + "last_seen": "in full memory", + }) + + mem_decisions = extract_decisions(memory_text) + ctx_lower = context_text.lower() + for d in mem_decisions[:10]: + if not any(word in ctx_lower for word in d.lower().split()[:5]): + spots.append({ + "type": "decision", + "name": d[:80], + "importance": "critical", + "last_seen": "in full memory", + }) + + mem_tasks = extract_tasks(memory_text) + for t in mem_tasks[:10]: + if not any(word in ctx_lower for word in t.lower().split()[:5]): + spots.append({ + "type": "task", + "name": t[:80], + "importance": "high", + "last_seen": "in full memory", + }) + + # Sort by importance + order = {"critical": 0, "high": 1, "medium": 2, "low": 3} + spots.sort(key=lambda s: order.get(s["importance"], 3)) + return spots + + +# ── Commands ───────────────────────────────────────────────────────────────── + +def cmd_score(state: dict, verbose: bool, fmt: str) -> None: + memory_text = 
MEMORY_FILE.read_text() if MEMORY_FILE.exists() else "" + context_text = CONTEXT_FILE.read_text() if CONTEXT_FILE.exists() else memory_text + + if not memory_text: + print("No MEMORY.md found — nothing to score.") + return + + mem_topics = extract_topics(memory_text) + ctx_topics = extract_topics(context_text) + mem_entities = extract_entities(memory_text) + ctx_entities = extract_entities(context_text) + mem_decisions = extract_decisions(memory_text) + mem_tasks = extract_tasks(memory_text) + + tc = score_topic_coverage(mem_topics, ctx_topics) + rb = score_recency_bias(memory_text, context_text) + ec = score_entity_continuity(mem_entities, ctx_entities) + dr = score_decision_retention(mem_decisions, context_text) + tcont = score_task_continuity(mem_tasks, context_text) + overall = compute_overall(tc, rb, ec, dr, tcont) + grade = get_grade(overall) + now = datetime.now().isoformat() + + score_data = { + "overall": overall, + "grade": grade, + "topic_coverage": tc, + "recency_bias": rb, + "entity_continuity": ec, + "decision_retention": dr, + "task_continuity": tcont, + } + state["current_score"] = score_data + state["last_score_at"] = now + + history = state.get("score_history") or [] + blind_spots = find_blind_spots(memory_text, context_text) + history.insert(0, { + "scored_at": now, "overall": overall, + "grade": grade, "blind_spot_count": len(blind_spots), + }) + state["score_history"] = history[:MAX_HISTORY] + state["blind_spots"] = blind_spots + save_state(state) + + if fmt == "json": + print(json.dumps(score_data, indent=2)) + else: + print(f"\nContext Assembly Score — {datetime.now().strftime('%Y-%m-%d %H:%M')}") + print("-" * 55) + print(f" Overall: {overall:>5}% Grade: {grade}") + if verbose: + print(f" Topic coverage: {tc:>5}% (2x weight)") + print(f" Recency bias: {rb:>5}% (1.5x weight)") + print(f" Entity continuity: {ec:>5}% (2x weight)") + print(f" Decision retention: {dr:>5}% (2x weight)") + print(f" Task continuity: {tcont:>5}% (1.5x weight)") + 
print(f"\n Memory stats:") + print(f" Topics: {len(mem_topics)} unique | Entities: {len(mem_entities)}") + print(f" Decisions: {len(mem_decisions)} | Tasks: {len(mem_tasks)}") + print(f" Blind spots: {len(blind_spots)}") + print() + + +def cmd_blind_spots(state: dict, fmt: str) -> None: + memory_text = MEMORY_FILE.read_text() if MEMORY_FILE.exists() else "" + context_text = CONTEXT_FILE.read_text() if CONTEXT_FILE.exists() else memory_text + spots = find_blind_spots(memory_text, context_text) + + if fmt == "json": + print(json.dumps({"blind_spots": spots, "count": len(spots)}, indent=2)) + else: + print(f"\nBlind Spots — {len(spots)} items missing from context") + print("-" * 55) + icons = {"critical": "!!", "high": "!", "medium": "~", "low": "."} + for s in spots[:25]: + icon = icons.get(s["importance"], "?") + print(f" {icon} [{s['importance'].upper():>8}] {s['type']:>8}: {s['name']}") + print() + + +def cmd_drift(state: dict, fmt: str) -> None: + history = state.get("score_history") or [] + if fmt == "json": + print(json.dumps({"score_history": history}, indent=2)) + else: + print(f"\nCoverage Drift — {len(history)} data points") + print("-" * 55) + if not history: + print(" No score history yet.") + else: + for h in history[:15]: + ts = h.get("scored_at", "?")[:16] + overall = h.get("overall", 0) + grade = h.get("grade", "?") + spots = h.get("blind_spot_count", 0) + bar = "=" * int(overall / 10) + "-" * (10 - int(overall / 10)) + print(f" {ts} [{bar}] {overall}% ({grade}) {spots} blind spots") + # Trend + if len(history) >= 2: + latest = history[0].get("overall", 0) + prev = history[1].get("overall", 0) + delta = latest - prev + trend = "improving" if delta > 0 else "declining" if delta < 0 else "stable" + print(f"\n Trend: {trend} ({'+' if delta>0 else ''}{delta}%)") + print() + + +def cmd_status(state: dict) -> None: + last = state.get("last_score_at", "never") + score = state.get("current_score") or {} + print(f"\nContext Assembly Scorer — Last score: 
{last}") + if score: + print(f" Overall: {score.get('overall', 0)}% ({score.get('grade', '?')})") + spots = state.get("blind_spots") or [] + print(f" Blind spots: {len(spots)}") + critical = sum(1 for s in spots if s.get("importance") == "critical") + if critical: + print(f" Critical blind spots: {critical}") + print() + + +def main(): + parser = argparse.ArgumentParser(description="Context Assembly Scorer") + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument("--score", action="store_true", help="Score current context assembly") + group.add_argument("--blind-spots", action="store_true", help="List topics missing from context") + group.add_argument("--drift", action="store_true", help="Compare scores over time") + group.add_argument("--status", action="store_true", help="Last score summary") + parser.add_argument("--verbose", action="store_true", help="Detailed per-dimension breakdown") + parser.add_argument("--format", choices=["text", "json"], default="text") + args = parser.parse_args() + + state = load_state() + if args.score: + cmd_score(state, args.verbose, args.format) + elif args.blind_spots: + cmd_blind_spots(state, args.format) + elif args.drift: + cmd_drift(state, args.format) + elif args.status: + cmd_status(state) + + +if __name__ == "__main__": + main() diff --git a/skills/openclaw-native/large-file-interceptor/SKILL.md b/skills/openclaw-native/large-file-interceptor/SKILL.md new file mode 100644 index 0000000..e479943 --- /dev/null +++ b/skills/openclaw-native/large-file-interceptor/SKILL.md @@ -0,0 +1,109 @@ +--- +name: large-file-interceptor +version: "1.0" +category: openclaw-native +description: Detects oversized files that would blow the context window, generates structural exploration summaries, and stores compact references — preventing a single paste from consuming the entire budget. 
+stateful: true +--- + +# Large File Interceptor + +## What it does + +A single large file paste can consume 60–80% of the context window, leaving no room for actual work. Large File Interceptor detects oversized files, generates a structural summary (schema, columns, imports, key definitions), stores the original externally, and replaces it with a compact reference card. + +Inspired by [lossless-claw](https://github.com/Martian-Engineering/lossless-claw)'s large file interception layer, which automatically extracts files exceeding 25k tokens. + +## When to invoke + +- Before processing any file the agent reads or receives — check size first +- When context budget is running low and large files may be the cause +- After a paste or file read — retroactively scan for oversized content +- Periodically to audit what's consuming the most context budget + +## How to use + +```bash +python3 intercept.py --scan <path> # Scan a file or directory +python3 intercept.py --scan <path> --threshold 10000 # Custom token threshold +python3 intercept.py --summarize <file> # Generate structural summary for a file +python3 intercept.py --list # List all intercepted files +python3 intercept.py --restore <ref-id> # Retrieve original file content +python3 intercept.py --audit # Show context budget impact +python3 intercept.py --status # Last scan summary +python3 intercept.py --format json # Machine-readable output +``` + +## Structural exploration summaries + +The interceptor generates different summaries based on file type: + +| File type | Summary includes | +|---|---| +| JSON/YAML | Top-level schema, key types, array lengths, nested depth | +| CSV/TSV | Column names, row count, sample values, data types per column | +| Python/JS/TS | Imports, class definitions, function signatures, export list | +| Markdown | Heading structure, word count per section, link count | +| Log files | Time range, error count, unique error patterns, frequency | +| Binary/Other | File size, MIME type, magic bytes | + +## Reference card 
format + +When a file is intercepted, the original is stored in `~/.openclaw/lcm-files/` and replaced with: + +``` +[FILE REFERENCE: ref-001] +Original: /path/to/large-file.json +Size: 145,230 bytes (~36,307 tokens) +Type: JSON — API response payload + +Structure: + - Root: object with 3 keys + - "data": array of 1,247 objects + - "metadata": object (pagination, timestamps) + - "errors": empty array + +Key fields in data[]: id, name, email, created_at, status +Sample: {"id": 1, "name": "...", "status": "active"} + +To retrieve full content: python3 intercept.py --restore ref-001 +``` + +## Procedure + +**Step 1 — Scan before processing** + +```bash +python3 intercept.py --scan /path/to/file.json +``` + +If the file exceeds the token threshold (default: 25,000 tokens), it generates a structural summary and stores a reference. + +**Step 2 — Audit context impact** + +```bash +python3 intercept.py --audit +``` + +Shows all files in the current workspace ranked by token impact, with recommendations for which to intercept. + +**Step 3 — Restore when needed** + +```bash +python3 intercept.py --restore ref-001 +``` + +Retrieves the original file content from storage for detailed inspection. + +## State + +Intercepted file registry and reference cards stored in `~/.openclaw/skill-state/large-file-interceptor/state.yaml`. Original files stored in `~/.openclaw/lcm-files/`. + +Fields: `last_scan_at`, `intercepted_files`, `total_tokens_saved`, `scan_history`. + +## Notes + +- Never deletes or modifies original files — intercept creates a copy + reference +- Token threshold is configurable (default: 25,000 ~= 100KB of text) +- Reference cards are typically 200–400 tokens vs. 
25,000+ for the original +- Supports recursive directory scanning with `--scan /path/to/dir` diff --git a/skills/openclaw-native/large-file-interceptor/STATE_SCHEMA.yaml b/skills/openclaw-native/large-file-interceptor/STATE_SCHEMA.yaml new file mode 100644 index 0000000..6c421f9 --- /dev/null +++ b/skills/openclaw-native/large-file-interceptor/STATE_SCHEMA.yaml @@ -0,0 +1,34 @@ +version: "1.0" +description: Registry of intercepted large files, reference cards, and token savings. +fields: + last_scan_at: + type: datetime + token_threshold: + type: integer + default: 25000 + description: Files exceeding this token count are intercepted + intercepted_files: + type: list + description: All intercepted file references + items: + ref_id: { type: string, description: "Reference ID (e.g. ref-001)" } + original_path: { type: string } + stored_path: { type: string, description: "Path in ~/.openclaw/lcm-files/" } + file_type: { type: string, description: "Detected file type" } + original_tokens: { type: integer } + summary_tokens: { type: integer } + tokens_saved: { type: integer } + summary: { type: string, description: "Structural exploration summary" } + intercepted_at: { type: datetime } + total_tokens_saved: + type: integer + description: Cumulative tokens saved by interception + scan_history: + type: list + description: Rolling log of past scans (last 20) + items: + scanned_at: { type: datetime } + path_scanned: { type: string } + files_checked: { type: integer } + files_intercepted: { type: integer } + tokens_saved: { type: integer } diff --git a/skills/openclaw-native/large-file-interceptor/example-state.yaml b/skills/openclaw-native/large-file-interceptor/example-state.yaml new file mode 100644 index 0000000..162404b --- /dev/null +++ b/skills/openclaw-native/large-file-interceptor/example-state.yaml @@ -0,0 +1,66 @@ +# Example runtime state for large-file-interceptor +last_scan_at: "2026-03-16T14:30:05.000000" +token_threshold: 25000 +intercepted_files: + - ref_id: 
ref-001 + original_path: "/Users/you/project/data/api-response.json" + stored_path: "/Users/you/.openclaw/lcm-files/ref-001_a3b2c1d4e5f6.json" + file_type: JSON + original_tokens: 36307 + summary_tokens: 180 + tokens_saved: 36127 + summary: | + Root: object with 3 keys + "data": array of 1247 objects + "metadata": object (5 keys) + "errors": empty array + Item keys: id, name, email, created_at, status + intercepted_at: "2026-03-16T14:30:03.000000" + - ref_id: ref-002 + original_path: "/Users/you/project/logs/server.log" + stored_path: "/Users/you/.openclaw/lcm-files/ref-002_f7e8d9c0b1a2.log" + file_type: Log + original_tokens: 52800 + summary_tokens: 220 + tokens_saved: 52580 + summary: | + Total lines: 8450 + Time range: 2026-03-15T00:00 → 2026-03-16T14:29 + Errors: 23, Warnings: 87 + Unique error patterns: 5 + ConnectionError: host N.N.N.N port N + TimeoutError: request exceeded Nms + ValueError: invalid JSON at line N + intercepted_at: "2026-03-16T14:30:04.000000" +total_tokens_saved: 88707 +scan_history: + - scanned_at: "2026-03-16T14:30:05.000000" + path_scanned: "/Users/you/project" + files_checked: 48 + files_intercepted: 2 + tokens_saved: 88707 +# ── Walkthrough ────────────────────────────────────────────────────────────── +# python3 intercept.py --scan /Users/you/project +# +# Intercepted: api-response.json (36,307 tokens → 180 tokens) +# Reference card: +# [FILE REFERENCE: ref-001] +# Original: /Users/you/project/data/api-response.json +# Size: 145,230 bytes (~36,307 tokens) +# Type: JSON — API response payload +# ... +# +# Intercepted: server.log (52,800 tokens → 220 tokens) +# ... 
+# +# Scan Complete — 48 files checked, 2 intercepted, ~88,707 tokens saved +# +# python3 intercept.py --audit +# +# Context Budget Audit +# ────────────────────────────────────────────── +# Intercepted files: 2 +# Original token cost: ~89,107 +# Summary token cost: ~400 +# Total tokens saved: ~88,707 +# Compression ratio: 99% diff --git a/skills/openclaw-native/large-file-interceptor/intercept.py b/skills/openclaw-native/large-file-interceptor/intercept.py new file mode 100755 index 0000000..d7b231a --- /dev/null +++ b/skills/openclaw-native/large-file-interceptor/intercept.py @@ -0,0 +1,497 @@ +#!/usr/bin/env python3 +""" +Large File Interceptor for openclaw-superpowers. + +Detects oversized files, generates structural summaries, and stores +compact references to prevent context window blowout. + +Usage: + python3 intercept.py --scan <path> + python3 intercept.py --scan <path> --threshold 10000 + python3 intercept.py --summarize <file> + python3 intercept.py --list + python3 intercept.py --restore <ref-id> + python3 intercept.py --audit + python3 intercept.py --status + python3 intercept.py --format json +""" + +import argparse +import csv +import hashlib +import io +import json +import mimetypes +import os +import re +import shutil +import sys +from datetime import datetime +from pathlib import Path + +try: + import yaml + HAS_YAML = True +except ImportError: + HAS_YAML = False + +OPENCLAW_DIR = Path(os.environ.get("OPENCLAW_HOME", Path.home() / ".openclaw")) +STATE_FILE = OPENCLAW_DIR / "skill-state" / "large-file-interceptor" / "state.yaml" +FILE_STORE = OPENCLAW_DIR / "lcm-files" +DEFAULT_THRESHOLD = 25000 # tokens +MAX_HISTORY = 20 + +# File extensions to analyze +TEXT_EXTENSIONS = { + ".json", ".yaml", ".yml", ".csv", ".tsv", ".xml", + ".py", ".js", ".ts", ".jsx", ".tsx", ".go", ".rs", ".java", + ".md", ".txt", ".log", ".conf", ".cfg", ".ini", ".toml", + ".html", ".css", ".sql", ".sh", ".bash", ".zsh", + ".env", ".gitignore", ".dockerfile", +} + + +# ── State helpers 
──────────────────────────────────────────────────────────── + +def load_state() -> dict: + if not STATE_FILE.exists(): + return {"intercepted_files": [], "total_tokens_saved": 0, "scan_history": []} + try: + text = STATE_FILE.read_text() + return (yaml.safe_load(text) or {}) if HAS_YAML else {} + except Exception: + return {"intercepted_files": [], "total_tokens_saved": 0, "scan_history": []} + + +def save_state(state: dict) -> None: + STATE_FILE.parent.mkdir(parents=True, exist_ok=True) + if HAS_YAML: + with open(STATE_FILE, "w") as f: + yaml.dump(state, f, default_flow_style=False, allow_unicode=True) + + +def estimate_tokens(text: str) -> int: + return len(text) // 4 + + +def next_ref_id(state: dict) -> str: + existing = state.get("intercepted_files") or [] + return f"ref-{len(existing)+1:03d}" + + +# ── File type detection and structural analysis ────────────────────────────── + +def detect_file_type(path: Path) -> str: + ext = path.suffix.lower() + type_map = { + ".json": "JSON", ".yaml": "YAML", ".yml": "YAML", + ".csv": "CSV", ".tsv": "TSV", + ".py": "Python", ".js": "JavaScript", ".ts": "TypeScript", + ".jsx": "JSX", ".tsx": "TSX", + ".go": "Go", ".rs": "Rust", ".java": "Java", + ".md": "Markdown", ".txt": "Text", + ".log": "Log", ".xml": "XML", ".html": "HTML", + ".css": "CSS", ".sql": "SQL", + ".sh": "Shell", ".bash": "Shell", ".zsh": "Shell", + } + return type_map.get(ext, "Unknown") + + +def analyze_json(content: str) -> str: + """Structural summary for JSON files.""" + try: + data = json.loads(content) + except json.JSONDecodeError: + return "Invalid JSON — parse error" + + lines = [] + if isinstance(data, dict): + lines.append(f"Root: object with {len(data)} keys") + for key, val in list(data.items())[:10]: + if isinstance(val, list): + item_type = type(val[0]).__name__ if val else "empty" + lines.append(f' "{key}": array of {len(val)} {item_type}s') + elif isinstance(val, dict): + lines.append(f' "{key}": object ({len(val)} keys)') + else: + 
lines.append(f' "{key}": {type(val).__name__} = {str(val)[:50]}') + if len(data) > 10: + lines.append(f" ... +{len(data)-10} more keys") + elif isinstance(data, list): + lines.append(f"Root: array of {len(data)} items") + if data: + item = data[0] + if isinstance(item, dict): + lines.append(f" Item keys: {', '.join(list(item.keys())[:8])}") + sample = json.dumps(item, default=str)[:100] + lines.append(f" Sample: {sample}") + return "\n".join(lines) + + +def analyze_csv(content: str, delimiter: str = ",") -> str: + """Structural summary for CSV/TSV files.""" + lines = [] + reader = csv.reader(io.StringIO(content), delimiter=delimiter) + rows = list(reader) + if not rows: + return "Empty file" + + headers = rows[0] if rows else [] + lines.append(f"Columns ({len(headers)}): {', '.join(headers[:10])}") + if len(headers) > 10: + lines.append(f" ... +{len(headers)-10} more columns") + lines.append(f"Rows: {len(rows)-1} (excluding header)") + + # Sample values + if len(rows) > 1: + sample = rows[1] + for i, (h, v) in enumerate(zip(headers[:5], sample[:5])): + lines.append(f" {h}: {v[:50]}") + return "\n".join(lines) + + +def analyze_python(content: str) -> str: + """Structural summary for Python files.""" + lines = [] + imports = re.findall(r'^(?:import|from)\s+.+', content, re.MULTILINE) + classes = re.findall(r'^class\s+(\w+)', content, re.MULTILINE) + functions = re.findall(r'^def\s+(\w+)\(([^)]*)\)', content, re.MULTILINE) + + if imports: + lines.append(f"Imports ({len(imports)}): {'; '.join(imports[:5])}") + if classes: + lines.append(f"Classes: {', '.join(classes)}") + if functions: + func_sigs = [f"{name}({args[:30]})" for name, args in functions[:10]] + lines.append(f"Functions ({len(functions)}): {', '.join(func_sigs)}") + lines.append(f"Total lines: {content.count(chr(10))+1}") + return "\n".join(lines) + + +def analyze_javascript(content: str) -> str: + """Structural summary for JS/TS files.""" + lines = [] + imports = re.findall(r'^import\s+.+', content, 
re.MULTILINE) + exports = re.findall(r'^export\s+(?:default\s+)?(?:class|function|const|let|var|interface|type)\s+(\w+)', + content, re.MULTILINE) + classes = re.findall(r'^(?:export\s+)?class\s+(\w+)', content, re.MULTILINE) + functions = re.findall(r'^(?:export\s+)?(?:async\s+)?function\s+(\w+)', content, re.MULTILINE) + + if imports: + lines.append(f"Imports ({len(imports)}): {'; '.join(imports[:5])}") + if exports: + lines.append(f"Exports: {', '.join(exports[:10])}") + if classes: + lines.append(f"Classes: {', '.join(classes)}") + if functions: + lines.append(f"Functions: {', '.join(functions[:10])}") + lines.append(f"Total lines: {content.count(chr(10))+1}") + return "\n".join(lines) + + +def analyze_markdown(content: str) -> str: + """Structural summary for Markdown files.""" + lines = [] + headings = re.findall(r'^(#{1,6})\s+(.+)', content, re.MULTILINE) + word_count = len(content.split()) + link_count = len(re.findall(r'\[([^\]]+)\]\([^)]+\)', content)) + + lines.append(f"Word count: {word_count}") + if headings: + lines.append(f"Headings ({len(headings)}):") + for level, text in headings[:10]: + lines.append(f" {' '*(len(level)-1)}{level} {text}") + lines.append(f"Links: {link_count}") + return "\n".join(lines) + + +def analyze_log(content: str) -> str: + """Structural summary for log files.""" + lines_list = content.split("\n") + lines = [f"Total lines: {len(lines_list)}"] + + # Detect time range + timestamps = re.findall(r'\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}', content) + if timestamps: + lines.append(f"Time range: {timestamps[0]} → {timestamps[-1]}") + + # Error patterns + errors = [l for l in lines_list if re.search(r'(?i)error|exception|fatal|panic', l)] + warns = [l for l in lines_list if re.search(r'(?i)warn', l)] + lines.append(f"Errors: {len(errors)}, Warnings: {len(warns)}") + + if errors: + unique_errors = set() + for e in errors[:20]: + pattern = re.sub(r'\d+', 'N', e.strip()[:80]) + unique_errors.add(pattern) + lines.append(f"Unique error 
patterns: {len(unique_errors)}") + for p in list(unique_errors)[:3]: + lines.append(f" {p}") + + return "\n".join(lines) + + +def generate_summary(path: Path, content: str) -> str: + """Generate a structural exploration summary based on file type.""" + file_type = detect_file_type(path) + + analyzers = { + "JSON": analyze_json, + "YAML": lambda c: analyze_json(json.dumps(yaml.safe_load(c))) if HAS_YAML else f"YAML file, {len(c)} chars", + "CSV": lambda c: analyze_csv(c, ","), + "TSV": lambda c: analyze_csv(c, "\t"), + "Python": analyze_python, + "JavaScript": analyze_javascript, + "TypeScript": analyze_javascript, + "JSX": analyze_javascript, + "TSX": analyze_javascript, + "Markdown": analyze_markdown, + "Log": analyze_log, + } + + analyzer = analyzers.get(file_type) + if analyzer: + try: + return analyzer(content) + except Exception as e: + return f"Analysis failed: {str(e)[:100]}" + else: + return f"File type: {file_type}\nSize: {len(content)} chars\nLines: {content.count(chr(10))+1}" + + +def generate_reference_card(ref_id: str, path: Path, content: str, summary: str) -> str: + """Generate a compact reference card for an intercepted file.""" + tokens = estimate_tokens(content) + file_type = detect_file_type(path) + + card = f"""[FILE REFERENCE: {ref_id}] +Original: {path} +Size: {len(content):,} bytes (~{tokens:,} tokens) +Type: {file_type} + +Structure: +{summary} + +To retrieve full content: python3 intercept.py --restore {ref_id}""" + return card + + +# ── Commands ───────────────────────────────────────────────────────────────── + +def cmd_scan(state: dict, scan_path: str, threshold: int, fmt: str) -> None: + path = Path(scan_path).resolve() + now = datetime.now().isoformat() + + if not path.exists(): + print(f"Error: path '{scan_path}' not found.") + sys.exit(1) + + # Collect files to scan + files = [] + if path.is_file(): + files = [path] + else: + for ext in TEXT_EXTENSIONS: + files.extend(path.rglob(f"*{ext}")) + + checked = 0 + intercepted = 0 + 
tokens_saved = 0 + + for fp in files: + try: + content = fp.read_text(errors="replace") + except (PermissionError, OSError): + continue + + checked += 1 + tokens = estimate_tokens(content) + + if tokens <= threshold: + continue + + # Intercept this file + ref_id = next_ref_id(state) + summary = generate_summary(fp, content) + summary_tokens = estimate_tokens(summary + ref_id) + saved = tokens - summary_tokens + + # Store original + FILE_STORE.mkdir(parents=True, exist_ok=True) + file_hash = hashlib.sha256(content.encode()).hexdigest()[:12] + stored_name = f"{ref_id}_{file_hash}{fp.suffix}" + stored_path = FILE_STORE / stored_name + stored_path.write_text(content) + + record = { + "ref_id": ref_id, + "original_path": str(fp), + "stored_path": str(stored_path), + "file_type": detect_file_type(fp), + "original_tokens": tokens, + "summary_tokens": summary_tokens, + "tokens_saved": saved, + "summary": summary, + "intercepted_at": now, + } + + files_list = state.get("intercepted_files") or [] + files_list.append(record) + state["intercepted_files"] = files_list + intercepted += 1 + tokens_saved += saved + + if fmt != "json": + card = generate_reference_card(ref_id, fp, content, summary) + print(f"\n Intercepted: {fp.name} ({tokens:,} tokens → {summary_tokens:,} tokens)") + print(f" Reference card:\n{card}\n") + + state["total_tokens_saved"] = (state.get("total_tokens_saved") or 0) + tokens_saved + state["last_scan_at"] = now + + history = state.get("scan_history") or [] + history.insert(0, { + "scanned_at": now, "path_scanned": str(path), + "files_checked": checked, "files_intercepted": intercepted, + "tokens_saved": tokens_saved, + }) + state["scan_history"] = history[:MAX_HISTORY] + save_state(state) + + if fmt == "json": + print(json.dumps({"files_checked": checked, "files_intercepted": intercepted, + "tokens_saved": tokens_saved}, indent=2)) + else: + print(f"\nScan Complete — {checked} files checked, {intercepted} intercepted, ~{tokens_saved:,} tokens saved") + + 
+def cmd_summarize(path_str: str, fmt: str) -> None: + path = Path(path_str).resolve() + if not path.exists(): + print(f"Error: file '{path_str}' not found.") + sys.exit(1) + + content = path.read_text(errors="replace") + summary = generate_summary(path, content) + tokens = estimate_tokens(content) + summary_tokens = estimate_tokens(summary) + + if fmt == "json": + print(json.dumps({"file": str(path), "type": detect_file_type(path), + "original_tokens": tokens, "summary_tokens": summary_tokens, + "summary": summary}, indent=2)) + else: + print(f"\nStructural Summary: {path.name}") + print(f" Type: {detect_file_type(path)} | {tokens:,} tokens → ~{summary_tokens:,} tokens") + print("-" * 50) + print(summary) + print() + + +def cmd_list(state: dict, fmt: str) -> None: + files = state.get("intercepted_files") or [] + if fmt == "json": + print(json.dumps({"intercepted_files": files, "total": len(files)}, indent=2)) + else: + print(f"\nIntercepted Files — {len(files)} total") + print("-" * 60) + for f in files: + saved = f.get("tokens_saved", 0) + print(f" {f['ref_id']} {f['file_type']:>10} {f['original_tokens']:>8,} tok → " + f"{f['summary_tokens']:>6,} tok (saved {saved:,})") + print(f" {f['original_path']}") + total = state.get("total_tokens_saved", 0) + print(f"\n Total tokens saved: ~{total:,}") + print() + + +def cmd_restore(state: dict, ref_id: str) -> None: + files = state.get("intercepted_files") or [] + target = next((f for f in files if f["ref_id"] == ref_id), None) + if not target: + print(f"Error: reference '{ref_id}' not found.") + sys.exit(1) + + stored = Path(target["stored_path"]) + if stored.exists(): + print(stored.read_text()) + else: + print(f"Error: stored file not found at {stored}") + sys.exit(1) + + +def cmd_audit(state: dict, fmt: str) -> None: + files = state.get("intercepted_files") or [] + total_original = sum(f.get("original_tokens", 0) for f in files) + total_summary = sum(f.get("summary_tokens", 0) for f in files) + total_saved = 
state.get("total_tokens_saved", 0) + + if fmt == "json": + print(json.dumps({"files": len(files), "total_original_tokens": total_original, + "total_summary_tokens": total_summary, "total_saved": total_saved}, indent=2)) + else: + print(f"\nContext Budget Audit") + print("-" * 50) + print(f" Intercepted files: {len(files)}") + print(f" Original token cost: ~{total_original:,}") + print(f" Summary token cost: ~{total_summary:,}") + print(f" Total tokens saved: ~{total_saved:,}") + if total_original > 0: + ratio = total_saved / total_original * 100 + print(f" Compression ratio: {ratio:.0f}%") + print() + + # Top consumers + sorted_files = sorted(files, key=lambda f: f.get("original_tokens", 0), reverse=True) + if sorted_files: + print(" Top context consumers:") + for f in sorted_files[:5]: + print(f" {f['ref_id']} {f['original_tokens']:>8,} tok {f['original_path']}") + print() + + +def cmd_status(state: dict) -> None: + last = state.get("last_scan_at", "never") + files = state.get("intercepted_files") or [] + total_saved = state.get("total_tokens_saved", 0) + print(f"\nLarge File Interceptor — Last scan: {last}") + print(f" {len(files)} files intercepted | ~{total_saved:,} tokens saved") + history = state.get("scan_history") or [] + if history: + h = history[0] + print(f" Last: {h.get('files_checked',0)} checked, " + f"{h.get('files_intercepted',0)} intercepted at {h.get('path_scanned','?')}") + print() + + +def main(): + parser = argparse.ArgumentParser(description="Large File Interceptor") + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument("--scan", type=str, metavar="PATH", help="Scan a file or directory") + group.add_argument("--summarize", type=str, metavar="FILE", help="Generate structural summary") + group.add_argument("--list", action="store_true", help="List all intercepted files") + group.add_argument("--restore", type=str, metavar="REF_ID", help="Retrieve original file") + group.add_argument("--audit", action="store_true", 
help="Show context budget impact") + group.add_argument("--status", action="store_true", help="Last scan summary") + parser.add_argument("--threshold", type=int, default=DEFAULT_THRESHOLD, help="Token threshold (default: 25000)") + parser.add_argument("--format", choices=["text", "json"], default="text") + args = parser.parse_args() + + state = load_state() + if args.scan: + cmd_scan(state, args.scan, args.threshold, args.format) + elif args.summarize: + cmd_summarize(args.summarize, args.format) + elif args.list: + cmd_list(state, args.format) + elif args.restore: + cmd_restore(state, args.restore) + elif args.audit: + cmd_audit(state, args.format) + elif args.status: + cmd_status(state) + + +if __name__ == "__main__": + main() diff --git a/skills/openclaw-native/memory-dag-compactor/SKILL.md b/skills/openclaw-native/memory-dag-compactor/SKILL.md new file mode 100644 index 0000000..b321308 --- /dev/null +++ b/skills/openclaw-native/memory-dag-compactor/SKILL.md @@ -0,0 +1,114 @@ +--- +name: memory-dag-compactor +version: "1.0" +category: openclaw-native +description: Builds hierarchical summary DAGs from MEMORY.md with depth-aware prompts — leaf summaries preserve detail, higher depths condense to durable arcs, preventing information loss during compaction. +stateful: true +cron: "0 23 * * *" +--- + +# Memory DAG Compactor + +## What it does + +Standard memory compaction is lossy — older entries get truncated and details disappear forever. Memory DAG Compactor replaces flat compaction with a **directed acyclic graph (DAG)** of hierarchical summaries inspired by [lossless-claw](https://github.com/Martian-Engineering/lossless-claw)'s Lossless Context Management approach. + +Each depth in the DAG uses a purpose-built prompt tuned for that abstraction level: + +| Depth | Name | What it preserves | Timeline granularity | +|---|---|---|---| +| d0 | Leaf | File operations, timestamps, specific actions, errors | Hours | +| d1 | Condensed | What changed vs. 
previous context, decisions made | Sessions |
+| d2 | Arc | Goal → outcome → carries forward | Days |
+| d3+ | Durable | Long-term context that survives weeks of inactivity | Date ranges |
+
+The raw MEMORY.md entries are never deleted — only organized into a searchable, multi-level summary hierarchy.
+
+## When to invoke
+
+- Automatically nightly at 11pm (cron) — compacts the day's memory entries
+- When MEMORY.md grows beyond a configurable threshold (default: 200 entries)
+- Before a long-running task — ensures memory is compact and searchable
+- When the agent reports "I don't remember" for something that should be in memory
+
+## How to use
+
+```bash
+python3 compact.py --compact                      # Run leaf + condensation passes
+python3 compact.py --compact --depth 0            # Only leaf summaries (d0)
+python3 compact.py --compact --depth 2            # Condense up to d2 arcs
+python3 compact.py --status                       # Show DAG stats and health
+python3 compact.py --tree                         # Print the summary DAG as a tree
+python3 compact.py --search "deployment issue"    # Search across all depths
+python3 compact.py --inspect <id>                 # Show a summary with its children
+python3 compact.py --dissolve <id>                # Reverse a condensation
+python3 compact.py --status --format json         # Machine-readable output
+```
+
+## Procedure
+
+**Step 1 — Run compaction**
+
+```bash
+python3 compact.py --compact
+```
+
+The compactor:
+1. Reads all entries from MEMORY.md
+2. Groups entries into chunks (default: 20 entries per leaf)
+3. Generates d0 leaf summaries preserving operational detail
+4. When leaf count exceeds fanout (default: 5), condenses into d1 summaries
+5. Repeats condensation at each depth until every level has at most `fanout` active nodes or `max_depth` is reached
+6. Writes the summary DAG to state
+
+**Step 2 — Search memory across depths**
+
+```bash
+python3 compact.py --search "API migration"
+```
+
+Searches raw entries and all summary depths. Results are ranked by relevance and depth: d0 summaries retain the most detail, while d3+ summaries give the big picture.
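The ranking described above can be sketched in a few lines. The `relevance` helper below is a hypothetical standalone version of the token-overlap score that `search_dag` computes (the fraction of query tokens found in the text), shown here only to illustrate the heuristic:

```python
# Hypothetical standalone sketch of the token-overlap relevance score
# used to rank search results (not part of compact.py's public API).
def relevance(query: str, content: str) -> float:
    tokens = set(query.lower().split())
    text = content.lower()
    if not tokens:
        return 0.0
    # Fraction of query tokens that appear (as substrings) in the text.
    return sum(1 for t in tokens if t in text) / len(tokens)

print(relevance("API migration", "Planned the api migration to v2"))  # 1.0
print(relevance("API migration", "Refactored the deploy scripts"))    # 0.0
```

Because matching is substring-based and case-insensitive, partial hits score between 0 and 1, which is what lets deeper, more detailed summaries outrank shallow ones at equal relevance.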
+
+**Step 3 — Inspect and repair**
+
+```bash
+python3 compact.py --tree                  # Visualize the full DAG
+python3 compact.py --inspect s-d0-003      # Show summary with lineage
+python3 compact.py --dissolve s-d1-000     # Reverse a bad condensation
+```
+
+## Depth-aware prompt design
+
+### d0 (Leaf) — Operational detail
+Preserves: timestamps, file paths, commands run, error messages, specific values. Drops: conversational filler, repeated attempts, verbose tool output.
+
+### d1 (Condensed) — Session context
+Preserves: what changed vs. previous state, decisions made and why, blockers encountered. Drops: per-file details, exact timestamps, intermediate steps.
+
+### d2 (Arc) — Goal-to-outcome arcs
+Preserves: goal definition, final outcome, what carries forward, open questions. Drops: session-level detail, individual decisions, specific tools used.
+
+### d3+ (Durable) — Long-term context
+Preserves: project identity, architectural decisions, user preferences, recurring patterns. Drops: anything that wouldn't matter after 2 weeks of inactivity.
+
+## Configuration
+
+| Parameter | Default | Description |
+|---|---|---|
+| `chunk_size` | 20 | Entries per leaf summary |
+| `fanout` | 5 | Max children before condensation triggers |
+| `max_depth` | 4 | Maximum DAG depth |
+| `token_budget` | 8000 | Target token count for assembled context |
+
+## State
+
+DAG structure, summary content, and lineage stored in `~/.openclaw/skill-state/memory-dag-compactor/state.yaml`.
+
+Fields: `last_compact_at`, `dag_nodes`, `dag_edges`, `entry_count`, `compact_history`.
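As a worked example of how the configuration parameters (`chunk_size`, `fanout`, `max_depth`) interact, the sketch below — a hypothetical helper, not part of compact.py — estimates the DAG shape for a given entry count, assuming every condensation pass forms full groups:

```python
import math

# Hypothetical helper: estimate active-node counts per depth for a given
# entry count, assuming each condensation pass forms full groups of `fanout`.
def dag_shape(entry_count: int, chunk_size: int = 20,
              fanout: int = 5, max_depth: int = 4) -> dict[int, int]:
    counts = {0: math.ceil(entry_count / chunk_size)}  # d0 leaves
    depth = 0
    while depth < max_depth and counts[depth] > fanout:
        # Each pass condenses groups of `fanout` active nodes into one parent.
        counts[depth + 1] = math.ceil(counts[depth] / fanout)
        depth += 1
    return counts

print(dag_shape(200))  # {0: 10, 1: 2}
```

With the defaults, 200 entries yield 10 leaves, which condense into 2 d1 summaries; since 2 is within `fanout`, no d2 pass triggers. The real compactor skips groups of fewer than 2 nodes, so actual counts can differ slightly at the margins.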
+ +## Notes + +- Never modifies or deletes MEMORY.md — the DAG is an overlay +- Each summary includes a `[Expand for details about: ...]` footer listing what was compressed +- Dissolve reverses a condensation, restoring child summaries to the active set +- Inspired by lossless-claw's DAG-based summarization hierarchy and depth-aware prompt system diff --git a/skills/openclaw-native/memory-dag-compactor/STATE_SCHEMA.yaml b/skills/openclaw-native/memory-dag-compactor/STATE_SCHEMA.yaml new file mode 100644 index 0000000..44a9761 --- /dev/null +++ b/skills/openclaw-native/memory-dag-compactor/STATE_SCHEMA.yaml @@ -0,0 +1,45 @@ +version: "1.0" +description: DAG-based memory summary hierarchy with depth-aware nodes and lineage tracking. +fields: + last_compact_at: + type: datetime + config: + type: object + description: Compaction configuration + fields: + chunk_size: { type: integer, default: 20 } + fanout: { type: integer, default: 5 } + max_depth: { type: integer, default: 4 } + token_budget: { type: integer, default: 8000 } + dag_nodes: + type: list + description: All summary nodes in the DAG + items: + id: { type: string, description: "Unique summary ID (e.g. 
s-001)" } + depth: { type: integer, description: "0=leaf, 1+=condensed" } + content: { type: string, description: "Summary text" } + expand_footer: { type: string, description: "What was compressed away" } + token_count: { type: integer } + created_at: { type: datetime } + source_type: { type: enum, values: [entries, summaries] } + source_range: { type: string, description: "Entry range or child summary IDs" } + is_active: { type: boolean, description: "Part of current assembled context" } + dag_edges: + type: list + description: Parent-child relationships in the DAG + items: + parent_id: { type: string } + child_id: { type: string } + entry_count: + type: integer + description: Total MEMORY.md entries tracked + compact_history: + type: list + description: Rolling log of compaction runs (last 20) + items: + compacted_at: { type: datetime } + entries_processed: { type: integer } + leaves_created: { type: integer } + condensations: { type: integer } + max_depth_reached: { type: integer } + total_nodes: { type: integer } diff --git a/skills/openclaw-native/memory-dag-compactor/compact.py b/skills/openclaw-native/memory-dag-compactor/compact.py new file mode 100755 index 0000000..73b4142 --- /dev/null +++ b/skills/openclaw-native/memory-dag-compactor/compact.py @@ -0,0 +1,637 @@ +#!/usr/bin/env python3 +""" +Memory DAG Compactor for openclaw-superpowers. + +Builds hierarchical summary DAGs from MEMORY.md with depth-aware +prompts. Leaf summaries preserve detail; higher depths condense +to durable arcs. 
+
+Usage:
+    python3 compact.py --compact
+    python3 compact.py --compact --depth 2
+    python3 compact.py --tree
+    python3 compact.py --search "query"
+    python3 compact.py --inspect <id>
+    python3 compact.py --dissolve <id>
+    python3 compact.py --status
+    python3 compact.py --status --format json
+"""
+
+import argparse
+import hashlib
+import json
+import os
+import re
+import sys
+from datetime import datetime
+from pathlib import Path
+
+try:
+    import yaml
+    HAS_YAML = True
+except ImportError:
+    HAS_YAML = False
+
+OPENCLAW_DIR = Path(os.environ.get("OPENCLAW_HOME", Path.home() / ".openclaw"))
+STATE_FILE = OPENCLAW_DIR / "skill-state" / "memory-dag-compactor" / "state.yaml"
+MEMORY_FILE = OPENCLAW_DIR / "workspace" / "MEMORY.md"
+MAX_HISTORY = 20
+
+# Default config
+DEFAULT_CONFIG = {
+    "chunk_size": 20,
+    "fanout": 5,
+    "max_depth": 4,
+    "token_budget": 8000,
+}
+
+# ── Depth-aware prompt templates ─────────────────────────────────────────────
+
+DEPTH_PROMPTS = {
+    0: {
+        "name": "Leaf (d0)",
+        "instruction": (
+            "Summarize these memory entries preserving operational detail.\n"
+            "KEEP: timestamps, file paths, commands run, error messages, "
+            "specific values, tool outputs, decisions made.\n"
+            "DROP: conversational filler, repeated failed attempts (keep final "
+            "outcome), verbose intermediate steps.\n"
+            "Timeline granularity: hours.\n"
+            "End with: [Expand for details about: ]"
+        ),
+    },
+    1: {
+        "name": "Condensed (d1)",
+        "instruction": (
+            "Condense these summaries into a session-level overview.\n"
+            "KEEP: what changed vs.
previous state, decisions made and why, " + "blockers encountered, tools/APIs used.\n" + "DROP: per-file details, exact timestamps, intermediate steps, " + "individual error messages.\n" + "Timeline granularity: sessions.\n" + "End with: [Expand for details about: ]" + ), + }, + 2: { + "name": "Arc (d2)", + "instruction": ( + "Condense these summaries into goal-to-outcome arcs.\n" + "KEEP: goal definition, final outcome, what carries forward, " + "open questions, architectural decisions.\n" + "DROP: session-level detail, individual decisions, specific " + "tools used, intermediate blockers.\n" + "Timeline granularity: days.\n" + "End with: [Expand for details about: ]" + ), + }, + 3: { + "name": "Durable (d3+)", + "instruction": ( + "Condense these summaries into durable long-term context.\n" + "KEEP: project identity, architectural decisions, user preferences, " + "recurring patterns, key relationships.\n" + "DROP: anything that wouldn't matter after 2 weeks of inactivity.\n" + "Timeline granularity: date ranges.\n" + "End with: [Expand for details about: ]" + ), + }, +} + + +# ── State helpers ──────────────────────────────────────────────────────────── + +def load_state() -> dict: + if not STATE_FILE.exists(): + return { + "config": DEFAULT_CONFIG.copy(), + "dag_nodes": [], + "dag_edges": [], + "entry_count": 0, + "compact_history": [], + } + try: + text = STATE_FILE.read_text() + return (yaml.safe_load(text) or {}) if HAS_YAML else {} + except Exception: + return {"config": DEFAULT_CONFIG.copy(), "dag_nodes": [], "dag_edges": [], + "entry_count": 0, "compact_history": []} + + +def save_state(state: dict) -> None: + STATE_FILE.parent.mkdir(parents=True, exist_ok=True) + if HAS_YAML: + with open(STATE_FILE, "w") as f: + yaml.dump(state, f, default_flow_style=False, allow_unicode=True) + + +# ── Memory parsing ─────────────────────────────────────────────────────────── + +def parse_memory(memory_path: Path) -> list[dict]: + """Parse MEMORY.md into individual 
entries.""" + if not memory_path.exists(): + return [] + text = memory_path.read_text() + entries = [] + current = [] + current_header = "" + idx = 0 + + for line in text.split("\n"): + # Detect entry boundaries: lines starting with - or ## or timestamps + is_boundary = ( + line.startswith("- ") or + line.startswith("## ") or + re.match(r'^\d{4}-\d{2}-\d{2}', line.strip()) + ) + if is_boundary and current: + entries.append({ + "id": f"e-{idx:04d}", + "header": current_header, + "content": "\n".join(current).strip(), + "line_count": len(current), + }) + idx += 1 + current = [line] + current_header = line.strip()[:80] + else: + current.append(line) + if not current_header and line.strip(): + current_header = line.strip()[:80] + + if current: + entries.append({ + "id": f"e-{idx:04d}", + "header": current_header, + "content": "\n".join(current).strip(), + "line_count": len(current), + }) + return entries + + +def estimate_tokens(text: str) -> int: + """Rough token estimate: ~4 chars per token.""" + return len(text) // 4 + + +# ── DAG operations ─────────────────────────────────────────────────────────── + +def gen_summary_id(depth: int, index: int) -> str: + return f"s-d{depth}-{index:03d}" + + +def get_depth_prompt(depth: int) -> dict: + if depth >= 3: + return DEPTH_PROMPTS[3] + return DEPTH_PROMPTS.get(depth, DEPTH_PROMPTS[0]) + + +def generate_leaf_summary(entries: list[dict], depth: int = 0) -> str: + """Generate a deterministic summary from entries using depth-aware rules.""" + prompt = get_depth_prompt(depth) + lines = [] + + if depth == 0: + # Leaf: preserve operational detail + for e in entries: + content = e["content"] + # Keep first 3 lines of each entry for detail + entry_lines = content.split("\n")[:3] + lines.append(" | ".join(l.strip() for l in entry_lines if l.strip())) + topics = extract_topics(entries) + summary = "; ".join(lines[:10]) + if len(lines) > 10: + summary += f" ... 
(+{len(lines)-10} more entries)" + summary += f"\n[Expand for details about: {', '.join(topics[:5])}]" + + elif depth == 1: + # Condensed: focus on changes and decisions + for e in entries: + content = e.get("content", "") + first_line = content.split("\n")[0].strip() + lines.append(first_line) + topics = extract_topics(entries) + summary = "Session: " + "; ".join(lines[:8]) + if len(lines) > 8: + summary += f" ... (+{len(lines)-8} more)" + summary += f"\n[Expand for details about: {', '.join(topics[:5])}]" + + elif depth == 2: + # Arc: goal → outcome + all_text = " ".join(e.get("content", "") for e in entries) + topics = extract_topics(entries) + summary = f"Arc ({len(entries)} summaries): {all_text[:200].strip()}" + summary += f"\n[Expand for details about: {', '.join(topics[:5])}]" + + else: + # Durable: long-term context only + all_text = " ".join(e.get("content", "") for e in entries) + topics = extract_topics(entries) + summary = f"Durable context: {all_text[:150].strip()}" + summary += f"\n[Expand for details about: {', '.join(topics[:5])}]" + + return summary + + +def extract_topics(entries: list[dict]) -> list[str]: + """Extract key topics from a set of entries.""" + # Simple keyword extraction: find capitalized words, paths, and technical terms + all_text = " ".join(e.get("content", e.get("header", "")) for e in entries) + words = re.findall(r'[A-Z][a-z]+(?:[A-Z][a-z]+)*|[a-z]+(?:[-_][a-z]+)+|/[\w/.-]+', all_text) + # Deduplicate preserving order + seen = set() + topics = [] + for w in words: + low = w.lower() + if low not in seen and len(w) > 3: + seen.add(low) + topics.append(w) + return topics[:10] + + +def compact_entries(state: dict, max_depth: int | None = None) -> dict: + """Run leaf + condensation passes on MEMORY.md entries.""" + config = state.get("config") or DEFAULT_CONFIG + chunk_size = config.get("chunk_size", 20) + fanout = config.get("fanout", 5) + depth_limit = max_depth if max_depth is not None else config.get("max_depth", 4) + + entries 
= parse_memory(MEMORY_FILE) + if not entries: + return {"entries_processed": 0, "leaves_created": 0, + "condensations": 0, "max_depth_reached": 0} + + nodes = state.get("dag_nodes") or [] + edges = state.get("dag_edges") or [] + + # Track existing entry coverage + existing_entry_ids = set() + for node in nodes: + if node.get("source_type") == "entries": + sr = node.get("source_range", "") + for eid in sr.split(","): + existing_entry_ids.add(eid.strip()) + + # Find unprocessed entries + new_entries = [e for e in entries if e["id"] not in existing_entry_ids] + + if not new_entries: + return {"entries_processed": 0, "leaves_created": 0, + "condensations": 0, "max_depth_reached": 0} + + # Step 1: Create leaf summaries (d0) + leaves_created = 0 + leaf_idx = len([n for n in nodes if n.get("depth") == 0]) + + for i in range(0, len(new_entries), chunk_size): + chunk = new_entries[i:i + chunk_size] + summary_text = generate_leaf_summary(chunk, depth=0) + sid = gen_summary_id(0, leaf_idx) + + topics = extract_topics(chunk) + node = { + "id": sid, + "depth": 0, + "content": summary_text, + "expand_footer": f"Expand for details about: {', '.join(topics[:5])}", + "token_count": estimate_tokens(summary_text), + "created_at": datetime.now().isoformat(), + "source_type": "entries", + "source_range": ", ".join(e["id"] for e in chunk), + "is_active": True, + } + nodes.append(node) + leaf_idx += 1 + leaves_created += 1 + + # Step 2: Condensation passes (d1, d2, d3+) + condensations = 0 + max_depth_reached = 0 + + for depth in range(1, depth_limit + 1): + # Find active nodes at depth-1 + parent_depth = depth - 1 + active_at_depth = [n for n in nodes if n.get("depth") == parent_depth and n.get("is_active")] + + if len(active_at_depth) <= fanout: + break + + # Condense in groups of fanout + condense_idx = len([n for n in nodes if n.get("depth") == depth]) + + for i in range(0, len(active_at_depth), fanout): + group = active_at_depth[i:i + fanout] + if len(group) < 2: + continue + + 
summary_text = generate_leaf_summary(group, depth=depth)
+            sid = gen_summary_id(depth, condense_idx)
+            topics = extract_topics(group)
+
+            node = {
+                "id": sid,
+                "depth": depth,
+                "content": summary_text,
+                # Match the leaf-node format (the schema declares this a string).
+                "expand_footer": f"Expand for details about: {', '.join(topics[:5])}",
+                "token_count": estimate_tokens(summary_text),
+                "created_at": datetime.now().isoformat(),
+                "source_type": "summaries",
+                "source_range": ", ".join(g["id"] for g in group),
+                "is_active": True,
+            }
+            nodes.append(node)
+
+            # Deactivate children and create edges
+            for g in group:
+                g["is_active"] = False
+                edges.append({"parent_id": sid, "child_id": g["id"]})
+
+            condense_idx += 1
+            condensations += 1
+            max_depth_reached = max(max_depth_reached, depth)
+
+    state["dag_nodes"] = nodes
+    state["dag_edges"] = edges
+    state["entry_count"] = len(entries)
+
+    return {
+        "entries_processed": len(new_entries),
+        "leaves_created": leaves_created,
+        "condensations": condensations,
+        "max_depth_reached": max_depth_reached,
+        "total_nodes": len(nodes),
+    }
+
+
+def dissolve_node(state: dict, node_id: str) -> bool:
+    """Reverse a condensation — reactivate children, remove parent."""
+    nodes = state.get("dag_nodes") or []
+    edges = state.get("dag_edges") or []
+
+    target = None
+    for n in nodes:
+        if n["id"] == node_id:
+            target = n
+            break
+
+    if not target:
+        return False
+
+    if target.get("depth", 0) == 0:
+        print(f"Error: cannot dissolve leaf node {node_id}")
+        return False
+
+    # Find and reactivate children
+    child_ids = [e["child_id"] for e in edges if e["parent_id"] == node_id]
+    for n in nodes:
+        if n["id"] in child_ids:
+            n["is_active"] = True
+
+    # Remove parent node and edges
+    state["dag_nodes"] = [n for n in nodes if n["id"] != node_id]
+    state["dag_edges"] = [e for e in edges if e["parent_id"] != node_id]
+
+    return True
+
+
+# ── Search ───────────────────────────────────────────────────────────────────
+
+def search_dag(state: dict, query: str) -> list[dict]:
+    """Search across all DAG nodes and raw entries."""
+    results = []
+    query_lower =
query.lower() + tokens = set(query_lower.split()) + + # Search DAG nodes + for node in (state.get("dag_nodes") or []): + content = node.get("content", "").lower() + if query_lower in content or any(t in content for t in tokens): + match_count = sum(1 for t in tokens if t in content) + results.append({ + "type": "summary", + "id": node["id"], + "depth": node.get("depth", 0), + "content": node["content"][:200], + "relevance": match_count / len(tokens) if tokens else 0, + "is_active": node.get("is_active", False), + }) + + # Search raw entries + entries = parse_memory(MEMORY_FILE) + for entry in entries: + content = entry.get("content", "").lower() + if query_lower in content or any(t in content for t in tokens): + match_count = sum(1 for t in tokens if t in content) + results.append({ + "type": "entry", + "id": entry["id"], + "depth": -1, + "content": entry["content"][:200], + "relevance": match_count / len(tokens) if tokens else 0, + "is_active": True, + }) + + results.sort(key=lambda r: (-r["relevance"], r["depth"])) + return results + + +# ── Commands ───────────────────────────────────────────────────────────────── + +def cmd_compact(state: dict, max_depth: int | None, fmt: str) -> None: + result = compact_entries(state, max_depth) + now = datetime.now().isoformat() + state["last_compact_at"] = now + + history = state.get("compact_history") or [] + history.insert(0, {"compacted_at": now, **result}) + state["compact_history"] = history[:MAX_HISTORY] + save_state(state) + + if fmt == "json": + print(json.dumps(result, indent=2)) + else: + print(f"\nMemory DAG Compaction — {datetime.now().strftime('%Y-%m-%d %H:%M')}") + print("-" * 50) + print(f" Entries processed: {result['entries_processed']}") + print(f" Leaves created: {result['leaves_created']}") + print(f" Condensations: {result['condensations']}") + print(f" Max depth reached: {result['max_depth_reached']}") + print(f" Total DAG nodes: {result.get('total_nodes', len(state.get('dag_nodes', [])))}") + print() + 
+ if result["entries_processed"] == 0: + print(" No new entries to compact.") + else: + print(" DAG updated successfully.") + print() + + +def cmd_tree(state: dict, fmt: str) -> None: + nodes = state.get("dag_nodes") or [] + edges = state.get("dag_edges") or [] + + if fmt == "json": + print(json.dumps({"nodes": len(nodes), "edges": len(edges), + "dag": [{"id": n["id"], "depth": n["depth"], + "active": n.get("is_active", False), + "tokens": n.get("token_count", 0)} + for n in nodes]}, indent=2)) + return + + print(f"\nMemory DAG Tree — {len(nodes)} nodes, {len(edges)} edges") + print("-" * 50) + + # Build parent map + children_of = {} + for e in edges: + children_of.setdefault(e["parent_id"], []).append(e["child_id"]) + + # Find roots (nodes with no parent) + child_ids = {e["child_id"] for e in edges} + roots = [n for n in nodes if n["id"] not in child_ids and n.get("is_active")] + + def print_node(node, indent=0): + prefix = " " * indent + active = "+" if node.get("is_active") else "-" + depth_label = f"d{node.get('depth', 0)}" + content_preview = node.get("content", "")[:60].replace("\n", " ") + print(f"{prefix}{active} [{depth_label}] {node['id']} ({node.get('token_count', 0)} tok)") + print(f"{prefix} \"{content_preview}...\"") + for child_id in children_of.get(node["id"], []): + child = next((n for n in nodes if n["id"] == child_id), None) + if child: + print_node(child, indent + 1) + + if not roots: + # Show all leaf nodes if no hierarchy yet + for n in sorted(nodes, key=lambda x: x["id"]): + print_node(n) + else: + for root in sorted(roots, key=lambda x: x["id"]): + print_node(root) + print() + + +def cmd_search(state: dict, query: str, fmt: str) -> None: + results = search_dag(state, query) + + if fmt == "json": + print(json.dumps({"query": query, "results": results[:20]}, indent=2)) + else: + print(f"\nSearch: \"{query}\" — {len(results)} results") + print("-" * 50) + for r in results[:15]: + depth_label = f"d{r['depth']}" if r["depth"] >= 0 else "raw" + 
active = "+" if r["is_active"] else "-" + print(f" {active} [{depth_label}] {r['id']} (relevance: {r['relevance']:.1%})") + print(f" \"{r['content'][:100]}...\"") + print() + + +def cmd_inspect(state: dict, node_id: str, fmt: str) -> None: + nodes = state.get("dag_nodes") or [] + edges = state.get("dag_edges") or [] + + target = next((n for n in nodes if n["id"] == node_id), None) + if not target: + print(f"Error: node '{node_id}' not found.") + sys.exit(1) + + children = [e["child_id"] for e in edges if e["parent_id"] == node_id] + parents = [e["parent_id"] for e in edges if e["child_id"] == node_id] + + if fmt == "json": + print(json.dumps({"node": target, "children": children, "parents": parents}, indent=2)) + else: + print(f"\nInspect: {node_id}") + print("-" * 50) + print(f" Depth: d{target.get('depth', 0)}") + print(f" Active: {target.get('is_active', False)}") + print(f" Tokens: {target.get('token_count', 0)}") + print(f" Created: {target.get('created_at', '?')}") + print(f" Source: {target.get('source_type', '?')}: {target.get('source_range', '?')}") + print(f" Parents: {', '.join(parents) if parents else 'none (root)'}") + print(f" Children: {', '.join(children) if children else 'none (leaf)'}") + print(f"\n Content:") + for line in target.get("content", "").split("\n"): + print(f" {line}") + print() + + +def cmd_dissolve(state: dict, node_id: str, fmt: str) -> None: + success = dissolve_node(state, node_id) + if success: + save_state(state) + if fmt == "json": + print(json.dumps({"dissolved": node_id, "success": True})) + else: + print(f"\n Dissolved {node_id} — children reactivated.") + else: + if fmt == "json": + print(json.dumps({"dissolved": node_id, "success": False})) + else: + print(f"\n Failed to dissolve {node_id}.") + sys.exit(1) + + +def cmd_status(state: dict) -> None: + nodes = state.get("dag_nodes") or [] + last = state.get("last_compact_at", "never") + entry_count = state.get("entry_count", 0) + + active = [n for n in nodes if 
n.get("is_active")] + depth_dist = {} + for n in nodes: + d = n.get("depth", 0) + depth_dist[d] = depth_dist.get(d, 0) + 1 + + total_tokens = sum(n.get("token_count", 0) for n in active) + + print(f"\nMemory DAG Compactor — Last compact: {last}") + print("-" * 50) + print(f" Entries tracked: {entry_count}") + print(f" Total DAG nodes: {len(nodes)}") + print(f" Active nodes: {len(active)}") + print(f" Active tokens: ~{total_tokens}") + print(f" Depth distribution:") + for d in sorted(depth_dist.keys()): + print(f" d{d}: {depth_dist[d]} nodes") + print() + + history = state.get("compact_history") or [] + if history: + h = history[0] + print(f" Last run: {h.get('entries_processed', 0)} entries → " + f"{h.get('leaves_created', 0)} leaves, " + f"{h.get('condensations', 0)} condensations") + print() + + +def main(): + parser = argparse.ArgumentParser(description="Memory DAG Compactor") + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument("--compact", action="store_true", help="Run leaf + condensation passes") + group.add_argument("--tree", action="store_true", help="Print summary DAG as a tree") + group.add_argument("--search", type=str, metavar="QUERY", help="Search across all depths") + group.add_argument("--inspect", type=str, metavar="ID", help="Inspect a summary node") + group.add_argument("--dissolve", type=str, metavar="ID", help="Reverse a condensation") + group.add_argument("--status", action="store_true", help="Show DAG stats and health") + parser.add_argument("--depth", type=int, metavar="N", help="Max depth for compaction") + parser.add_argument("--format", choices=["text", "json"], default="text") + args = parser.parse_args() + + state = load_state() + if args.compact: + cmd_compact(state, args.depth, args.format) + elif args.tree: + cmd_tree(state, args.format) + elif args.search: + cmd_search(state, args.search, args.format) + elif args.inspect: + cmd_inspect(state, args.inspect, args.format) + elif args.dissolve: + 
cmd_dissolve(state, args.dissolve, args.format) + elif args.status: + cmd_status(state) + + +if __name__ == "__main__": + main() diff --git a/skills/openclaw-native/memory-dag-compactor/example-state.yaml b/skills/openclaw-native/memory-dag-compactor/example-state.yaml new file mode 100644 index 0000000..a0f6832 --- /dev/null +++ b/skills/openclaw-native/memory-dag-compactor/example-state.yaml @@ -0,0 +1,90 @@ +# Example runtime state for memory-dag-compactor +last_compact_at: "2026-03-16T23:00:12.445000" +config: + chunk_size: 20 + fanout: 5 + max_depth: 4 + token_budget: 8000 +dag_nodes: + - id: s-d0-000 + depth: 0 + content: | + Built memory-graph-builder skill with graph.py companion script; + pushed to branch skill/memory-graph-builder; PR #28 merged. + Added node/edge extraction from MEMORY.md, Jaccard dedup at 0.7. + [Expand for details about: memory-graph-builder, graph.py, Jaccard, PR] + expand_footer: "memory-graph-builder, graph.py, Jaccard, PR" + token_count: 52 + created_at: "2026-03-16T23:00:10.000000" + source_type: entries + source_range: "e-0000, e-0001, e-0002, e-0003, e-0004" + is_active: false + - id: s-d0-001 + depth: 0 + content: | + Built config-encryption-auditor with audit.py; scans for 8 API key + patterns + 3 token patterns; PR #29 merged. Also built + tool-description-optimizer with 5-dimension scoring; PR #30 merged. + [Expand for details about: config-encryption-auditor, audit.py, optimizer] + expand_footer: "config-encryption-auditor, audit.py, optimizer" + token_count: 58 + created_at: "2026-03-16T23:00:11.000000" + source_type: entries + source_range: "e-0005, e-0006, e-0007, e-0008, e-0009" + is_active: false + - id: s-d1-000 + depth: 1 + content: | + Session: Built 4 OpenLobster-inspired skills (memory-graph-builder, + config-encryption-auditor, tool-description-optimizer, mcp-health-checker). + All merged via PRs #28-#31. README updated to 44 skills. 
+ [Expand for details about: OpenLobster, PRs, README, skills] + expand_footer: "OpenLobster, PRs, README, skills" + token_count: 44 + created_at: "2026-03-16T23:00:12.000000" + source_type: summaries + source_range: "s-d0-000, s-d0-001" + is_active: true +dag_edges: + - parent_id: s-d1-000 + child_id: s-d0-000 + - parent_id: s-d1-000 + child_id: s-d0-001 +entry_count: 10 +compact_history: + - compacted_at: "2026-03-16T23:00:12.445000" + entries_processed: 10 + leaves_created: 2 + condensations: 1 + max_depth_reached: 1 + total_nodes: 3 +# ── Walkthrough ────────────────────────────────────────────────────────────── +# Cron runs nightly at 11pm: python3 compact.py --compact +# +# Memory DAG Compaction — 2026-03-16 23:00 +# ────────────────────────────────────────────────── +# Entries processed: 10 +# Leaves created: 2 +# Condensations: 1 +# Max depth reached: 1 +# Total DAG nodes: 3 +# +# python3 compact.py --tree +# +# Memory DAG Tree — 3 nodes, 2 edges +# ────────────────────────────────────────────────── +# + [d1] s-d1-000 (44 tok) +# "Session: Built 4 OpenLobster-inspired skills..." +# - [d0] s-d0-000 (52 tok) +# "Built memory-graph-builder skill with graph.py..." +# - [d0] s-d0-001 (58 tok) +# "Built config-encryption-auditor with audit.py..." +# +# python3 compact.py --search "encryption" +# +# Search: "encryption" — 2 results +# ────────────────────────────────────────────────── +# - [d0] s-d0-001 (relevance: 100%) +# "Built config-encryption-auditor with audit.py..." +# + [d1] s-d1-000 (relevance: 50%) +# "Session: Built 4 OpenLobster-inspired skills..." 
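The token savings the walkthrough above implies can be computed directly from the node list. A minimal sketch (not part of the skill — token counts copied from the example state; the real compactor reads them from `state.yaml`):

```python
# Node IDs and token counts from the example state above: the single
# active d1 summary replaces two inactive d0 leaves in the live context.
nodes = [
    {"id": "s-d0-000", "token_count": 52, "is_active": False},
    {"id": "s-d0-001", "token_count": 58, "is_active": False},
    {"id": "s-d1-000", "token_count": 44, "is_active": True},
]

active = sum(n["token_count"] for n in nodes if n["is_active"])
total = sum(n["token_count"] for n in nodes)
print(f"active context: {active} tok of {total} tok stored")
# → active context: 44 tok of 154 tok stored
```

The inactive leaves stay in the DAG, so a `--search` or `--dissolve` can always recover the detail the summary dropped.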
diff --git a/skills/openclaw-native/memory-integrity-checker/SKILL.md b/skills/openclaw-native/memory-integrity-checker/SKILL.md new file mode 100644 index 0000000..31fa0c9 --- /dev/null +++ b/skills/openclaw-native/memory-integrity-checker/SKILL.md @@ -0,0 +1,93 @@ +--- +name: memory-integrity-checker +version: "1.0" +category: openclaw-native +description: Validates memory summary DAGs for structural integrity — detects orphan nodes, circular references, token inflation, broken lineage, and stale summaries that corrupt the agent's memory. +stateful: true +cron: "0 3 * * 0" +--- + +# Memory Integrity Checker + +## What it does + +As memory DAGs grow through compaction, they can develop structural problems: orphan nodes with no parent, circular reference loops, summaries that inflated instead of compressing, broken lineage chains, and stale nodes that should have been dissolved. These problems silently corrupt the agent's memory. + +Memory Integrity Checker runs 8 structural checks on the DAG, generates a repair plan, and optionally auto-fixes safe issues. + +Inspired by [lossless-claw](https://github.com/Martian-Engineering/lossless-claw)'s DAG integrity checking system, which detects and repairs corrupted summaries. 
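The circular-reference problem mentioned above is the nastiest of these, since a loop makes any recursive DAG traversal hang. A minimal sketch with a hypothetical three-node loop in the compactor's edge format (the shipped `integrity.py` walks edges depth-first with visited/stack sets; this sketch carries the current path as a tuple, which is equivalent for small DAGs):

```python
# Hypothetical looped edge list: a -> b -> c -> a.
edges = [
    {"parent_id": "a", "child_id": "b"},
    {"parent_id": "b", "child_id": "c"},
    {"parent_id": "c", "child_id": "a"},  # closes the loop
]

children = {}
for e in edges:
    children.setdefault(e["parent_id"], []).append(e["child_id"])

def has_cycle(node, path=()):
    if node in path:  # node reappears on its own ancestor path
        return True
    return any(has_cycle(c, path + (node,)) for c in children.get(node, []))

print(has_cycle("a"))  # → True
```

A clean DAG returns False from every start node, which is why CIRCULAR_REF findings are CRITICAL and never auto-fixed.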
+
+## When to invoke
+
+- Automatically on Sundays at 3am (cron) — weekly structural audit
+- After a crash or unexpected shutdown — check for corruption
+- When the agent's memory seems inconsistent — diagnose structural issues
+- Before a major compaction or prune operation — ensure clean starting state
+
+## Integrity checks (8 total)
+
+| Check | What it detects | Severity |
+|---|---|---|
+| ORPHAN_NODE | Node with no parent and not a root | HIGH |
+| CIRCULAR_REF | Circular parent-child loops in the DAG | CRITICAL |
+| TOKEN_INFLATION | Summary has more tokens than its combined children | HIGH |
+| BROKEN_LINEAGE | Edge references a node ID that doesn't exist | CRITICAL |
+| STALE_ACTIVE | Active node older than 30 days with no children | MEDIUM |
+| EMPTY_NODE | Node with empty or whitespace-only content | HIGH |
+| DUPLICATE_EDGE | Same parent-child edge appears multiple times | LOW |
+| DEPTH_MISMATCH | Node's depth doesn't match its position in the DAG | MEDIUM |
+
+## How to use
+
+```bash
+python3 integrity.py --check                      # Run all 8 integrity checks
+python3 integrity.py --check --fix                # Auto-fix safe issues
+python3 integrity.py --check --only ORPHAN_NODE   # Run a specific check
+python3 integrity.py --repair-plan                # Generate repair plan without fixing
+python3 integrity.py --status                     # Last check summary
+python3 integrity.py --check --format json        # Machine-readable output
+```
+
+## Procedure
+
+**Step 1 — Run integrity checks**
+
+```bash
+python3 integrity.py --check
+```
+
+Runs all 8 checks and reports findings by severity.
+
+**Step 2 — Review repair plan**
+
+```bash
+python3 integrity.py --repair-plan
+```
+
+For each finding, shows what the auto-fix would do:
+- ORPHAN_NODE → deactivate (reattaching to a parent is left for manual review)
+- DUPLICATE_EDGE → remove duplicates
+- EMPTY_NODE → deactivate
+- STALE_ACTIVE → deactivate
+
+**Step 3 — Apply safe fixes**
+
+```bash
+python3 integrity.py --check --fix
+```
+
+Auto-fixes LOW and MEDIUM severity issues.
HIGH and CRITICAL require manual review. + +## State + +Check results, finding history, and repair actions stored in `~/.openclaw/skill-state/memory-integrity-checker/state.yaml`. + +Fields: `last_check_at`, `findings`, `check_history`, `repairs_applied`. + +## Notes + +- Reads from memory-dag-compactor's state file — does not maintain its own DAG +- Auto-fix only applies to LOW and MEDIUM severity issues +- CRITICAL issues (circular refs, broken lineage) require manual intervention +- Circular reference detection uses DFS with visited-set tracking +- Token inflation check compares parent tokens vs. sum of children tokens diff --git a/skills/openclaw-native/memory-integrity-checker/STATE_SCHEMA.yaml b/skills/openclaw-native/memory-integrity-checker/STATE_SCHEMA.yaml new file mode 100644 index 0000000..c18b7d9 --- /dev/null +++ b/skills/openclaw-native/memory-integrity-checker/STATE_SCHEMA.yaml @@ -0,0 +1,33 @@ +version: "1.0" +description: DAG integrity check results, findings, and repair history. +fields: + last_check_at: + type: datetime + findings: + type: list + description: Integrity issues found in the most recent check + items: + check: { type: string, description: "Check name (e.g. 
ORPHAN_NODE)" }
+      severity: { type: enum, values: [CRITICAL, HIGH, MEDIUM, LOW] }
+      node_id: { type: string }
+      detail: { type: string }
+      auto_fixable: { type: boolean }
+  check_history:
+    type: list
+    description: Rolling log of past checks (last 20)
+    items:
+      checked_at: { type: datetime }
+      nodes_checked: { type: integer }
+      findings: { type: integer }
+      critical: { type: integer }
+      high: { type: integer }
+      medium: { type: integer }
+      low: { type: integer }
+  repairs_applied:
+    type: list
+    description: History of auto-fix actions taken
+    items:
+      repaired_at: { type: datetime }
+      check: { type: string }
+      node_id: { type: string }
+      action: { type: string }
diff --git a/skills/openclaw-native/memory-integrity-checker/example-state.yaml b/skills/openclaw-native/memory-integrity-checker/example-state.yaml
new file mode 100644
index 0000000..f2ac1b0
--- /dev/null
+++ b/skills/openclaw-native/memory-integrity-checker/example-state.yaml
@@ -0,0 +1,71 @@
+# Example runtime state for memory-integrity-checker
+last_check_at: "2026-03-16T03:00:12.000000"
+findings:
+  - check: ORPHAN_NODE
+    severity: HIGH
+    node_id: s-d1-003
+    detail: "Depth 1 node has no parent — should be connected to a d2 parent"
+    auto_fixable: true
+  - check: STALE_ACTIVE
+    severity: MEDIUM
+    node_id: s-d0-002
+    detail: "Active node is 45 days old with no children"
+    auto_fixable: true
+  - check: DUPLICATE_EDGE
+    severity: LOW
+    node_id: "s-d2-001->s-d1-000"
+    detail: "Duplicate edge in DAG"
+    auto_fixable: true
+check_history:
+  - checked_at: "2026-03-16T03:00:12.000000"
+    nodes_checked: 24
+    findings: 3
+    critical: 0
+    high: 1
+    medium: 1
+    low: 1
+  - checked_at: "2026-03-09T03:00:10.000000"
+    nodes_checked: 18
+    findings: 0
+    critical: 0
+    high: 0
+    medium: 0
+    low: 0
+repairs_applied:
+  - repaired_at: "2026-03-16T03:00:12.000000"
+    check: DUPLICATE_EDGE
+    node_id: "s-d2-001->s-d1-000"
+    action: "Removed duplicate edges"
+  - repaired_at: "2026-03-16T03:00:12.000000"
+    check: STALE_ACTIVE
+
node_id: s-d0-002 + action: "Deactivated stale node s-d0-002" +# ── Walkthrough ────────────────────────────────────────────────────────────── +# Cron runs Sundays at 3am: python3 integrity.py --check --fix +# +# Memory Integrity Check — 2026-03-16 03:00 +# ─────────────────────────────────────────────────────── +# Nodes checked: 24 | Edges: 18 +# Findings: 3 (0 critical, 1 high, 1 medium, 1 low) +# +# ! [ HIGH] ORPHAN_NODE: s-d1-003 +# Depth 1 node has no parent [auto-fixable] +# ~ [ MEDIUM] STALE_ACTIVE: s-d0-002 +# Active node is 45 days old with no children [auto-fixable] +# . [ LOW] DUPLICATE_EDGE: s-d2-001->s-d1-000 +# Duplicate edge in DAG [auto-fixable] +# +# Repairs applied: 2 +# + Removed duplicate edges +# + Deactivated stale node s-d0-002 +# +# Status: DEGRADED +# +# python3 integrity.py --repair-plan +# +# Repair Plan — 3 findings +# ─────────────────────────────────────────────────────── +# Auto-fixable (3): +# ORPHAN_NODE on s-d1-003: Depth 1 node has no parent +# STALE_ACTIVE on s-d0-002: Active node is 45 days old +# DUPLICATE_EDGE on s-d2-001->s-d1-000: Duplicate edge diff --git a/skills/openclaw-native/memory-integrity-checker/integrity.py b/skills/openclaw-native/memory-integrity-checker/integrity.py new file mode 100755 index 0000000..5c48083 --- /dev/null +++ b/skills/openclaw-native/memory-integrity-checker/integrity.py @@ -0,0 +1,489 @@ +#!/usr/bin/env python3 +""" +Memory Integrity Checker for openclaw-superpowers. + +Validates memory summary DAGs for structural integrity — orphan nodes, +circular references, token inflation, broken lineage, stale summaries. 
+ +Usage: + python3 integrity.py --check + python3 integrity.py --check --fix + python3 integrity.py --check --only ORPHAN_NODE + python3 integrity.py --repair-plan + python3 integrity.py --status + python3 integrity.py --format json +""" + +import argparse +import json +import os +import sys +from datetime import datetime, timedelta +from pathlib import Path + +try: + import yaml + HAS_YAML = True +except ImportError: + HAS_YAML = False + +OPENCLAW_DIR = Path(os.environ.get("OPENCLAW_HOME", Path.home() / ".openclaw")) +STATE_FILE = OPENCLAW_DIR / "skill-state" / "memory-integrity-checker" / "state.yaml" +DAG_STATE_FILE = OPENCLAW_DIR / "skill-state" / "memory-dag-compactor" / "state.yaml" +MAX_HISTORY = 20 +STALE_DAYS = 30 + + +# ── State helpers ──────────────────────────────────────────────────────────── + +def load_state() -> dict: + if not STATE_FILE.exists(): + return {"findings": [], "check_history": [], "repairs_applied": []} + try: + text = STATE_FILE.read_text() + return (yaml.safe_load(text) or {}) if HAS_YAML else {} + except Exception: + return {"findings": [], "check_history": [], "repairs_applied": []} + + +def save_state(state: dict) -> None: + STATE_FILE.parent.mkdir(parents=True, exist_ok=True) + if HAS_YAML: + with open(STATE_FILE, "w") as f: + yaml.dump(state, f, default_flow_style=False, allow_unicode=True) + + +def load_dag_state() -> dict: + if not DAG_STATE_FILE.exists(): + return {} + try: + text = DAG_STATE_FILE.read_text() + return (yaml.safe_load(text) or {}) if HAS_YAML else {} + except Exception: + return {} + + +def save_dag_state(dag: dict) -> None: + if HAS_YAML: + with open(DAG_STATE_FILE, "w") as f: + yaml.dump(dag, f, default_flow_style=False, allow_unicode=True) + + +# ── Integrity checks ──────────────────────────────────────────────────────── + +def check_orphan_nodes(nodes: list, edges: list) -> list[dict]: + """Find nodes with no parent that aren't root nodes (depth > 0).""" + findings = [] + child_ids = {e["child_id"] for e 
in edges}
+
+    for node in nodes:
+        nid = node.get("id", "")
+        depth = node.get("depth", 0)
+        # Root nodes (not a child of anything) at depth 0 are fine
+        if nid not in child_ids and depth > 0:
+            findings.append({
+                "check": "ORPHAN_NODE",
+                "severity": "HIGH",
+                "node_id": nid,
+                "detail": f"Depth {depth} node has no parent — should be connected to a d{depth+1} parent",
+                "auto_fixable": True,
+            })
+    return findings
+
+
+def check_circular_refs(nodes: list, edges: list) -> list[dict]:
+    """Detect circular parent-child loops using DFS."""
+    findings = []
+    children_of = {}
+    for e in edges:
+        children_of.setdefault(e["parent_id"], []).append(e["child_id"])
+
+    def has_cycle(start: str, visited: set, stack: set) -> bool:
+        visited.add(start)
+        stack.add(start)
+        for child in children_of.get(start, []):
+            if child in stack:
+                return True
+            if child not in visited and has_cycle(child, visited, stack):
+                return True
+        stack.discard(start)
+        return False
+
+    visited = set()
+    for node in nodes:
+        nid = node.get("id", "")
+        if nid not in visited:
+            if has_cycle(nid, visited, set()):
+                findings.append({
+                    "check": "CIRCULAR_REF",
+                    "severity": "CRITICAL",
+                    "node_id": nid,
+                    "detail": f"Circular reference detected in DAG involving node {nid}",
+                    "auto_fixable": False,
+                })
+    return findings
+
+
+def check_token_inflation(nodes: list, edges: list) -> list[dict]:
+    """Find summaries with more tokens than their combined children."""
+    findings = []
+    children_of = {}
+    for e in edges:
+        children_of.setdefault(e["parent_id"], []).append(e["child_id"])
+
+    node_map = {n.get("id"): n for n in nodes}
+
+    for node in nodes:
+        nid = node.get("id", "")
+        parent_tokens = node.get("token_count", 0)
+        child_ids = children_of.get(nid, [])
+
+        if not child_ids or parent_tokens == 0:
+            continue
+
+        children_tokens = sum(
+            node_map.get(cid, {}).get("token_count", 0)
+            for cid in child_ids
+        )
+
+        if children_tokens > 0 and parent_tokens >
children_tokens: + ratio = round(parent_tokens / children_tokens, 1) + findings.append({ + "check": "TOKEN_INFLATION", + "severity": "HIGH", + "node_id": nid, + "detail": f"Parent ({parent_tokens} tok) > children ({children_tokens} tok) — {ratio}x inflation", + "auto_fixable": False, + }) + return findings + + +def check_broken_lineage(nodes: list, edges: list) -> list[dict]: + """Find edges referencing non-existent node IDs.""" + findings = [] + node_ids = {n.get("id") for n in nodes} + + for edge in edges: + if edge["parent_id"] not in node_ids: + findings.append({ + "check": "BROKEN_LINEAGE", + "severity": "CRITICAL", + "node_id": edge["parent_id"], + "detail": f"Edge references non-existent parent: {edge['parent_id']}", + "auto_fixable": True, + }) + if edge["child_id"] not in node_ids: + findings.append({ + "check": "BROKEN_LINEAGE", + "severity": "CRITICAL", + "node_id": edge["child_id"], + "detail": f"Edge references non-existent child: {edge['child_id']}", + "auto_fixable": True, + }) + return findings + + +def check_stale_active(nodes: list, edges: list) -> list[dict]: + """Find active nodes older than STALE_DAYS with no children.""" + findings = [] + parent_ids = {e["parent_id"] for e in edges} + cutoff = datetime.now() - timedelta(days=STALE_DAYS) + + for node in nodes: + if not node.get("is_active"): + continue + nid = node.get("id", "") + created = node.get("created_at", "") + if nid in parent_ids: + continue # Has children, not stale + + try: + created_dt = datetime.fromisoformat(created) + if created_dt < cutoff: + age_days = (datetime.now() - created_dt).days + findings.append({ + "check": "STALE_ACTIVE", + "severity": "MEDIUM", + "node_id": nid, + "detail": f"Active node is {age_days} days old with no children", + "auto_fixable": True, + }) + except (ValueError, TypeError): + pass + return findings + + +def check_empty_nodes(nodes: list) -> list[dict]: + """Find nodes with empty or whitespace-only content.""" + findings = [] + for node in nodes: + 
content = node.get("content", "") + if len(content.strip()) < 10: + findings.append({ + "check": "EMPTY_NODE", + "severity": "HIGH", + "node_id": node.get("id", "unknown"), + "detail": f"Node has empty or near-empty content ({len(content.strip())} chars)", + "auto_fixable": True, + }) + return findings + + +def check_duplicate_edges(edges: list) -> list[dict]: + """Find duplicate parent-child edges.""" + findings = [] + seen = set() + for edge in edges: + key = (edge["parent_id"], edge["child_id"]) + if key in seen: + findings.append({ + "check": "DUPLICATE_EDGE", + "severity": "LOW", + "node_id": f"{edge['parent_id']}->{edge['child_id']}", + "detail": "Duplicate edge in DAG", + "auto_fixable": True, + }) + seen.add(key) + return findings + + +def check_depth_mismatch(nodes: list, edges: list) -> list[dict]: + """Check that node depth matches its actual position in the DAG.""" + findings = [] + children_of = {} + for e in edges: + children_of.setdefault(e["parent_id"], []).append(e["child_id"]) + + node_map = {n.get("id"): n for n in nodes} + + for node in nodes: + nid = node.get("id", "") + depth = node.get("depth", 0) + child_ids = children_of.get(nid, []) + + for cid in child_ids: + child = node_map.get(cid, {}) + child_depth = child.get("depth", 0) + if child_depth != depth - 1: + findings.append({ + "check": "DEPTH_MISMATCH", + "severity": "MEDIUM", + "node_id": nid, + "detail": f"Parent d{depth} has child d{child_depth} — expected d{depth-1}", + "auto_fixable": False, + }) + return findings + + +ALL_CHECKS = { + "ORPHAN_NODE": check_orphan_nodes, + "CIRCULAR_REF": check_circular_refs, + "TOKEN_INFLATION": check_token_inflation, + "BROKEN_LINEAGE": check_broken_lineage, + "STALE_ACTIVE": check_stale_active, + "EMPTY_NODE": lambda nodes, edges: check_empty_nodes(nodes), + "DUPLICATE_EDGE": lambda nodes, edges: check_duplicate_edges(edges), + "DEPTH_MISMATCH": check_depth_mismatch, +} + + +# ── Auto-fix 
───────────────────────────────────────────────────────────────── + +def apply_fix(dag: dict, finding: dict) -> str | None: + """Apply a safe auto-fix for a finding. Returns action description or None.""" + check = finding["check"] + nid = finding["node_id"] + nodes = dag.get("dag_nodes") or [] + edges = dag.get("dag_edges") or [] + + if check == "ORPHAN_NODE": + for n in nodes: + if n.get("id") == nid: + n["is_active"] = False + return f"Deactivated orphan node {nid}" + + elif check == "EMPTY_NODE": + for n in nodes: + if n.get("id") == nid: + n["is_active"] = False + return f"Deactivated empty node {nid}" + + elif check == "STALE_ACTIVE": + for n in nodes: + if n.get("id") == nid: + n["is_active"] = False + return f"Deactivated stale node {nid}" + + elif check == "DUPLICATE_EDGE": + seen = set() + new_edges = [] + for e in edges: + key = (e["parent_id"], e["child_id"]) + if key not in seen: + seen.add(key) + new_edges.append(e) + dag["dag_edges"] = new_edges + return f"Removed duplicate edges" + + elif check == "BROKEN_LINEAGE": + node_ids = {n.get("id") for n in nodes} + dag["dag_edges"] = [e for e in edges + if e["parent_id"] in node_ids and e["child_id"] in node_ids] + return f"Removed edges referencing non-existent nodes" + + return None + + +# ── Commands ───────────────────────────────────────────────────────────────── + +def cmd_check(state: dict, fix: bool, only: str | None, fmt: str) -> None: + dag = load_dag_state() + nodes = dag.get("dag_nodes") or [] + edges = dag.get("dag_edges") or [] + now = datetime.now().isoformat() + + if not nodes: + print("No DAG nodes found — memory-dag-compactor may not have run yet.") + return + + all_findings = [] + checks_to_run = {only: ALL_CHECKS[only]} if only and only in ALL_CHECKS else ALL_CHECKS + + for name, check_fn in checks_to_run.items(): + findings = check_fn(nodes, edges) + all_findings.extend(findings) + + # Count by severity + counts = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0} + for f in 
all_findings: + counts[f["severity"]] = counts.get(f["severity"], 0) + 1 + + # Apply fixes if requested + repairs = [] + if fix: + fixable = [f for f in all_findings if f.get("auto_fixable") and f["severity"] in ("LOW", "MEDIUM")] + for finding in fixable: + action = apply_fix(dag, finding) + if action: + repairs.append({ + "repaired_at": now, + "check": finding["check"], + "node_id": finding["node_id"], + "action": action, + }) + if repairs: + save_dag_state(dag) + existing_repairs = state.get("repairs_applied") or [] + existing_repairs.extend(repairs) + state["repairs_applied"] = existing_repairs[-50:] + + state["last_check_at"] = now + state["findings"] = all_findings + history = state.get("check_history") or [] + history.insert(0, { + "checked_at": now, "nodes_checked": len(nodes), + "findings": len(all_findings), + "critical": counts["CRITICAL"], "high": counts["HIGH"], + "medium": counts["MEDIUM"], "low": counts["LOW"], + }) + state["check_history"] = history[:MAX_HISTORY] + save_state(state) + + if fmt == "json": + print(json.dumps({"nodes_checked": len(nodes), "findings": all_findings, + "counts": counts, "repairs": repairs}, indent=2)) + else: + print(f"\nMemory Integrity Check — {datetime.now().strftime('%Y-%m-%d %H:%M')}") + print("-" * 55) + print(f" Nodes checked: {len(nodes)} | Edges: {len(edges)}") + print(f" Findings: {len(all_findings)} " + f"({counts['CRITICAL']} critical, {counts['HIGH']} high, " + f"{counts['MEDIUM']} medium, {counts['LOW']} low)") + print() + + severity_icons = {"CRITICAL": "!!", "HIGH": "!", "MEDIUM": "~", "LOW": "."} + for f in all_findings: + icon = severity_icons.get(f["severity"], "?") + fix_mark = " [auto-fixable]" if f.get("auto_fixable") else "" + print(f" {icon} [{f['severity']:>8}] {f['check']}: {f['node_id']}") + print(f" {f['detail']}{fix_mark}") + + if repairs: + print(f"\n Repairs applied: {len(repairs)}") + for r in repairs: + print(f" + {r['action']}") + + status = "HEALTHY" if not all_findings else "DEGRADED" 
if not counts["CRITICAL"] else "CRITICAL" + print(f"\n Status: {status}") + print() + + if counts["CRITICAL"] > 0: + sys.exit(1) + + +def cmd_repair_plan(state: dict, fmt: str) -> None: + dag = load_dag_state() + nodes = dag.get("dag_nodes") or [] + edges = dag.get("dag_edges") or [] + + all_findings = [] + for check_fn in ALL_CHECKS.values(): + all_findings.extend(check_fn(nodes, edges)) + + fixable = [f for f in all_findings if f.get("auto_fixable")] + manual = [f for f in all_findings if not f.get("auto_fixable")] + + if fmt == "json": + print(json.dumps({"auto_fixable": fixable, "manual_required": manual}, indent=2)) + else: + print(f"\nRepair Plan — {len(all_findings)} findings") + print("-" * 55) + if fixable: + print(f"\n Auto-fixable ({len(fixable)}):") + for f in fixable: + print(f" {f['check']} on {f['node_id']}: {f['detail']}") + if manual: + print(f"\n Manual review required ({len(manual)}):") + for f in manual: + print(f" [{f['severity']}] {f['check']} on {f['node_id']}: {f['detail']}") + if not all_findings: + print(" No issues found — DAG is healthy.") + print() + + +def cmd_status(state: dict) -> None: + last = state.get("last_check_at", "never") + findings = state.get("findings") or [] + critical = sum(1 for f in findings if f.get("severity") == "CRITICAL") + print(f"\nMemory Integrity Checker — Last check: {last}") + print(f" {len(findings)} findings | {critical} critical") + repairs = state.get("repairs_applied") or [] + if repairs: + print(f" {len(repairs)} repairs applied total") + print() + + +def main(): + parser = argparse.ArgumentParser(description="Memory Integrity Checker") + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument("--check", action="store_true", help="Run all integrity checks") + group.add_argument("--repair-plan", action="store_true", help="Generate repair plan") + group.add_argument("--status", action="store_true", help="Last check summary") + parser.add_argument("--fix", action="store_true", 
help="Auto-fix safe issues") + parser.add_argument("--only", type=str, metavar="CHECK", + choices=list(ALL_CHECKS.keys()), help="Run a specific check") + parser.add_argument("--format", choices=["text", "json"], default="text") + args = parser.parse_args() + + state = load_state() + if args.check: + cmd_check(state, args.fix, args.only, args.format) + elif args.repair_plan: + cmd_repair_plan(state, args.format) + elif args.status: + cmd_status(state) + + +if __name__ == "__main__": + main()
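The TOKEN_INFLATION check above reduces to a single comparison: a summary node should be smaller than the children it replaced. A standalone sketch with made-up token counts (node IDs and numbers are illustrative, not from any real state file):

```python
# A parent summary that grew past its children instead of compressing them.
node_map = {
    "parent": {"token_count": 120},
    "kid-a": {"token_count": 40},
    "kid-b": {"token_count": 50},
}
children_of = {"parent": ["kid-a", "kid-b"]}

parent_tok = node_map["parent"]["token_count"]
child_tok = sum(node_map[c]["token_count"] for c in children_of["parent"])
inflated = parent_tok > child_tok  # 120 > 90 → flagged
print(inflated, round(parent_tok / child_tok, 1))  # → True 1.3
```

An inflated summary is worse than useless — it costs more context than the raw material it was meant to condense, which is why the fix (re-summarizing or dissolving the node) is left to manual review.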