Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,371 changes: 43 additions & 1,328 deletions api/workspaces.py

Large diffs are not rendered by default.

Empty file added services/__init__.py
Empty file.
117 changes: 117 additions & 0 deletions services/cli_tabs.py
Comment thread
timon0305 marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
from __future__ import annotations

from datetime import datetime

from flask import current_app, jsonify

from utils.cli_chat_reader import list_cli_projects, messages_to_bubbles, traverse_blobs
from utils.exclusion_rules import build_searchable_text, is_excluded_by_rules
from utils.workspace_path import get_cli_chats_path


def _get_cli_workspace_tabs(workspace_id: str):
"""Return tabs for a Cursor CLI project (workspace_id starts with "cli:")."""
try:
project_id = workspace_id[4:]
cli_projects = list_cli_projects(get_cli_chats_path())
project = next(
(
cp for cp in cli_projects
if isinstance(cp, dict) and cp.get("project_id") == project_id
),
None,
)
if project is None:
return jsonify({"error": "CLI project not found"}), 404

rules = current_app.config.get("EXCLUSION_RULES") or []
ws_name = project.get("workspace_name") or project_id[:12]
sessions = project.get("sessions") or []
if not isinstance(sessions, list):
sessions = []
tabs = []

for session in sessions:
if not isinstance(session, dict):
continue
session_id = session.get("session_id")
if not session_id:
continue
meta = session.get("meta") or {}
created_ms: int = meta.get("createdAt") or int(datetime.now().timestamp() * 1000)
session_name = meta.get("name") or f"Session {session_id[:8]}"

try:
messages = traverse_blobs(session["db_path"])
except Exception as e:
print(f"CLI: could not read session {session_id}: {e}")
continue

try:
bubbles = messages_to_bubbles(messages, created_ms)
except Exception as e:
print(f"CLI: could not convert session {session_id} to bubbles: {e}")
continue
if not bubbles:
continue

# Derive title from first user bubble when name is generic
title = session_name
if not title or title.startswith("New Agent"):
for b in bubbles:
if b["type"] == "user" and b.get("text"):
first_lines = [ln for ln in b["text"].split("\n") if ln.strip()]
if first_lines:
title = first_lines[0][:100]
if len(title) == 100:
title += "..."
break

searchable = build_searchable_text(project_name=ws_name, chat_title=title)
if is_excluded_by_rules(rules, searchable):
continue

# Aggregate metadata
total_tool_calls = 0
tool_breakdown: dict = {}
for b in bubbles:
tcs = (b.get("metadata") or {}).get("toolCalls") or []
total_tool_calls += len(tcs)
for tc in tcs:
tn = tc.get("name", "unknown")
tool_breakdown[tn] = tool_breakdown.get(tn, 0) + 1

tab_meta: dict | None = None
if total_tool_calls or tool_breakdown:
tab_meta = {"totalToolCalls": total_tool_calls or None}
if tool_breakdown:
tab_meta["toolBreakdown"] = tool_breakdown

tab = {
"id": session_id,
"title": title,
"timestamp": created_ms,
"bubbles": [
{
"type": b["type"],
"text": b.get("text", ""),
"timestamp": b.get("timestamp", created_ms),
**({"metadata": b["metadata"]} if b.get("metadata") else {}),
}
for b in bubbles
],
"source": "cli",
}
if tab_meta:
tab_meta_clean = {k: v for k, v in tab_meta.items() if v is not None}
if tab_meta_clean:
tab["metadata"] = tab_meta_clean

tabs.append(tab)

tabs.sort(key=lambda t: t.get("timestamp") or 0, reverse=True)
return jsonify({"tabs": tabs})

except Exception as e:
print(f"Failed to get CLI workspace tabs: {e}")
return jsonify({"error": "Failed to get CLI workspace tabs"}), 500
104 changes: 104 additions & 0 deletions services/workspace_db.py
Comment thread
timon0305 marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
from __future__ import annotations

import json
import os
import sqlite3
from contextlib import closing, contextmanager
from pathlib import Path

from utils.path_helpers import get_workspace_folder_paths
from utils.workspace_descriptor import _read_json_file


def _collect_workspace_entries(workspace_path: str) -> list[dict]:
"""Scan workspace directory and return entries with workspace.json."""
entries = []
try:
for name in os.listdir(workspace_path):
full = os.path.join(workspace_path, name)
if os.path.isdir(full):
wj = os.path.join(full, "workspace.json")
if os.path.isfile(wj):
entries.append({"name": name, "workspaceJsonPath": wj})
except OSError:
# workspace_path missing / not readable / not a directory — return what
# we have so far. OSError covers FileNotFoundError, PermissionError,
# and NotADirectoryError.
pass
return entries


def _collect_invalid_workspace_ids(workspace_entries: list[dict]) -> set[str]:
"""Workspace IDs whose descriptors have no resolvable folder paths."""
invalid: set[str] = set()
for entry in workspace_entries:
try:
wd = _read_json_file(entry["workspaceJsonPath"])
folders = get_workspace_folder_paths(wd)
if not folders:
invalid.add(entry["name"])
except (OSError, ValueError, KeyError, TypeError):
# OSError: workspace.json unreadable. ValueError covers
# json.JSONDecodeError. KeyError / TypeError: malformed entry
# dict. Any of these mean we can't resolve folders → mark invalid,
# matching the pre-narrowing behaviour.
invalid.add(entry["name"])
return invalid


def _build_composer_id_to_workspace_id(workspace_path: str, workspace_entries: list) -> dict:
"""Build mapping: composerId -> workspaceId from per-workspace state.vscdb."""
mapping: dict = {}
for entry in workspace_entries:
db_path = os.path.join(workspace_path, entry["name"], "state.vscdb")
if not os.path.isfile(db_path):
continue
# closing() guarantees .close() on scope exit (issue #17).
# Path.as_uri() percent-encodes reserved chars; ``f"file:{path}"``
# breaks sqlite URI parsing on paths with spaces, ``#``, etc.
db_uri = Path(db_path).resolve().as_uri() + "?mode=ro"
row: tuple | None = None
try:
with closing(sqlite3.connect(db_uri, uri=True)) as conn:
row = conn.execute(
"SELECT value FROM ItemTable WHERE [key] = 'composer.composerData'"
).fetchone()
except sqlite3.Error:
continue
if not (row and row[0]):
continue
try:
data = json.loads(row[0])
except (json.JSONDecodeError, ValueError):
continue
all_composers = data.get("allComposers") if isinstance(data, dict) else None
if not isinstance(all_composers, list):
continue
for c in all_composers:
if not isinstance(c, dict):
continue
cid = c.get("composerId")
if cid:
mapping[cid] = entry["name"]
return mapping


@contextmanager
def _open_global_db(workspace_path: str):
"""Yield (conn, path) for the global-storage SQLite db (read-only); (None, path) if the file is missing."""
global_db_path = os.path.join(workspace_path, "..", "globalStorage", "state.vscdb")
global_db_path = os.path.normpath(global_db_path)
if not os.path.isfile(global_db_path):
yield None, global_db_path
return
db_uri = Path(global_db_path).resolve().as_uri() + "?mode=ro"
try:
conn = sqlite3.connect(db_uri, uri=True)
except sqlite3.Error:
yield None, global_db_path
return
conn.row_factory = sqlite3.Row
try:
yield conn, global_db_path
finally:
conn.close()
Comment thread
timon0305 marked this conversation as resolved.
Loading
Loading