-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlock_scan.py
More file actions
237 lines (197 loc) · 8.04 KB
/
Copy pathlock_scan.py
File metadata and controls
237 lines (197 loc) · 8.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
r"""
lock_scan.py -- Read-only overview of all active project locks
Lists all active (non-expired) LOCK*.txt files across all roots configured in
lock_roots.json: path, scope, owner, created, time remaining until expiry.
Legacy TEST.txt/TESTS.txt are listed as active (no expiry format).
Read-only by default: without --write-cache nothing is written. With
--write-cache, only the derived LOCK-CACHE.md artefact(s) are written
(LOCK*.txt files themselves are never modified).
Usage:
python lock_scan.py
python lock_scan.py --json
python lock_scan.py --write-cache
python lock_scan.py --roots-file <path>
--write-cache writes cache(s) as defined in lock_roots.json ("caches" key).
Each cache entry:
{ "name": "system-wide", "path": "/path/to/LOCK-CACHE.md" }
{ "name": "my-workspace", "path": "/path/to/workspace/LOCK-CACHE.md",
"filter_prefix": "/path/to/workspace" } <- optional prefix filter
Canonical spec: LOCK-SYSTEM.md (same directory).
Format/expiry logic: lock_utils.py.
"""
from __future__ import annotations
import argparse
import json
from datetime import datetime, timedelta
from pathlib import Path
import lock_utils
DEFAULT_ROOTS_FILE = Path(__file__).resolve().parent / "lock_roots.json"
# System-wide cache is written next to this script (all roots, no filter).
SYSTEM_CACHE_PATH = Path(__file__).resolve().parent / "LOCK-CACHE.md"
def load_config(roots_file: Path) -> dict:
with open(roots_file, "r", encoding="utf-8") as f:
return json.load(f)
def iter_lock_dirs(config: dict):
"""Generator over all directories to check across all roots,
respecting depth limits and skip-lists from the configuration.
Yields Path objects (directories) in which LOCK*.txt files are searched."""
default_depth = int(config.get("default_max_depth", 4))
shallow_depth = int(config.get("shallow_depth", 2))
skip_dirs = {d.lower() for d in config.get("skip_dirs", [])}
for entry in config.get("roots", []):
root = Path(entry["path"])
if not root.exists() or not root.is_dir():
continue
max_depth = shallow_depth if entry.get("shallow") else default_depth
yield from _walk(root, root, max_depth, skip_dirs)
def _walk(current: Path, root: Path, max_depth: int, skip_dirs: set):
yield current
depth = len(current.relative_to(root).parts)
if depth >= max_depth:
return
try:
children = list(current.iterdir())
except OSError:
return
for child in children:
if not child.is_dir():
continue
if child.name.lower() in skip_dirs:
continue
yield from _walk(child, root, max_depth, skip_dirs)
def _format_remaining(delta: timedelta) -> str:
secs = int(delta.total_seconds())
if secs < 0:
return "expired"
h, rem = divmod(secs, 3600)
m, _ = divmod(rem, 60)
return f"{h}h{m:02d}m"
def collect_locks(config: dict, now: datetime | None = None) -> list[dict]:
now = now or datetime.now()
seen: set[Path] = set()
out: list[dict] = []
for d in iter_lock_dirs(config):
if d in seen:
continue
seen.add(d)
for name, scope, is_legacy in lock_utils.active_locks(d, now):
lock_path = d / name
created, expires, source = lock_utils.lock_created_and_expiry(lock_path)
data = lock_utils.parse_lock_file(lock_path)
if is_legacy:
remaining = "legacy"
else:
remaining = _format_remaining((created + expires) - now)
out.append({
"path": str(lock_path),
"scope": scope,
"legacy": is_legacy,
"owner": data.get("owner", ""),
"created": created.isoformat(timespec="minutes"),
"created_source": source,
"expires_after": str(expires),
"remaining": remaining,
})
out.sort(key=lambda r: r["path"])
return out
def _md_escape(value: str) -> str:
"""Escape pipes in table cells, strip line breaks."""
return value.replace("|", "\\|").replace("\n", " ").replace("\r", " ").strip()
def render_cache(locks: list[dict], scanned_at: datetime, title: str) -> str:
"""Render a LOCK-CACHE.md (auto-generated) from the lock list."""
lines = [
"<!-- AUTO-GENERATED by lock_scan.py --write-cache. DO NOT edit manually. "
"The authoritative source is the LOCK*.txt files themselves. -->",
"",
f"# {title}",
"",
f"As of: {scanned_at.isoformat(timespec='seconds')}",
"",
f"Active locks: {len(locks)}",
"",
"| Path | scope | owner | created | remaining |",
"|---|---|---|---|---|",
]
for r in locks:
path = r["path"] + (" (legacy)" if r["legacy"] else "")
owner = r["owner"] or "?"
lines.append(
f"| {_md_escape(path)} | {_md_escape(r['scope'])} | {_md_escape(owner)} "
f"| {_md_escape(r['created'])} | {_md_escape(r['remaining'])} |"
)
if not locks:
lines.append("| _(no active locks)_ | | | | |")
return "\n".join(lines) + "\n"
def write_caches(locks: list[dict], scanned_at: datetime, config: dict) -> list[tuple[Path, int]]:
"""Write cache file(s) as defined by the 'caches' key in lock_roots.json.
Falls back to a single system-wide cache next to this script if not configured.
Each cache entry may have an optional 'filter_prefix' to restrict which locks appear.
Returns list of (path, count) for each written cache."""
results: list[tuple[Path, int]] = []
cache_defs: list[dict] = config.get("caches", [])
if not cache_defs:
# Default: one system-wide cache next to this script.
SYSTEM_CACHE_PATH.write_text(
render_cache(locks, scanned_at, "LOCK-CACHE (all roots)"),
encoding="utf-8",
)
results.append((SYSTEM_CACHE_PATH, len(locks)))
return results
for entry in cache_defs:
cache_path = Path(entry["path"])
title = entry.get("name", cache_path.name)
prefix = entry.get("filter_prefix")
if prefix:
prefix_lower = str(prefix).lower()
filtered = [r for r in locks if r["path"].lower().startswith(prefix_lower)]
else:
filtered = locks
cache_path.parent.mkdir(parents=True, exist_ok=True)
cache_path.write_text(
render_cache(filtered, scanned_at, f"LOCK-CACHE — {title}"),
encoding="utf-8",
)
results.append((cache_path, len(filtered)))
return results
def main() -> int:
parser = argparse.ArgumentParser(
description="List all active project locks (LOCK*.txt) across all configured roots (read-only)."
)
parser.add_argument("--json", action="store_true", help="Output as JSON.")
parser.add_argument(
"--write-cache",
action="store_true",
help="Write LOCK-CACHE.md as configured in lock_roots.json ('caches' key).",
)
parser.add_argument(
"--roots-file",
default=str(DEFAULT_ROOTS_FILE),
help="Path to lock_roots.json.",
)
args = parser.parse_args()
config = load_config(Path(args.roots_file))
scanned_at = datetime.now()
locks = collect_locks(config, scanned_at)
if args.write_cache:
written = write_caches(locks, scanned_at, config)
for path, count in written:
print(f"lock_scan --write-cache: {path} ({count} active lock(s))")
return 0
if args.json:
print(json.dumps(locks, ensure_ascii=False, indent=2))
return 0
if not locks:
print("lock_scan: no active locks found.")
return 0
print(f"lock_scan: {len(locks)} active lock(s):")
for r in locks:
legacy = " [LEGACY]" if r["legacy"] else ""
owner = r["owner"] or "?"
print(f" {r['path']}{legacy}")
print(
f" scope={r['scope']} owner={owner} created={r['created']} "
f"({r['created_source']}) remaining={r['remaining']}"
)
return 0
if __name__ == "__main__":
raise SystemExit(main())