-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmitm_local_cache.py
More file actions
100 lines (82 loc) · 3.06 KB
/
mitm_local_cache.py
File metadata and controls
100 lines (82 loc) · 3.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
from __future__ import annotations
import mimetypes
from pathlib import Path
from mitmproxy import ctx
from mitmproxy import http
from url_vault.pathing import cache_lookup_paths
from url_vault.request_log import record_url_miss
class LocalCacheAddon:
def load(self, loader) -> None:
loader.add_option(
name="cache_root",
typespec=str,
default="",
help="Root cache directory created by url-vault.",
)
loader.add_option(
name="miss_log_path",
typespec=str,
default="",
help="YAML file where cache misses are recorded.",
)
loader.add_option(
name="offline_only",
typespec=bool,
default=True,
help="Return 404 on cache miss instead of forwarding upstream.",
)
def request(self, flow: http.HTTPFlow) -> None:
if flow.request.method not in {"GET", "HEAD"}:
return
cache_root_value = ctx.options.cache_root
if not cache_root_value:
return
cache_root = Path(cache_root_value).expanduser()
matched_path = self._find_cached_path(cache_root, flow.request.pretty_url)
if matched_path is not None:
self._serve_file(flow, matched_path)
return
self._record_miss(flow.request.pretty_url, flow.request.method)
if ctx.options.offline_only:
flow.response = http.Response.make(
404,
b"cache miss\n",
{"Content-Type": "text/plain; charset=utf-8"},
)
def _find_cached_path(self, cache_root: Path, url: str) -> Path | None:
try:
candidates = cache_lookup_paths(url)
except ValueError as exc:
ctx.log.warn(f"Skipping unsupported URL {url!r}: {exc}")
return None
for candidate in candidates:
target = cache_root / Path(candidate)
if target.is_file():
return target
return None
def _serve_file(self, flow: http.HTTPFlow, target: Path) -> None:
content_type, _ = mimetypes.guess_type(str(target))
headers = {
"Content-Type": content_type or "application/octet-stream",
"X-Url-Vault-Cache": "hit",
}
body = b"" if flow.request.method == "HEAD" else target.read_bytes()
flow.response = http.Response.make(200, body, headers)
def _record_miss(self, url: str, request_method: str) -> None:
miss_log_value = ctx.options.miss_log_path
if not miss_log_value:
return
miss_log_path = Path(miss_log_value).expanduser()
try:
entry = record_url_miss(
miss_log_path,
url,
request_method=request_method,
)
except ValueError as exc:
ctx.log.warn(f"Failed to record cache miss for {url!r}: {exc}")
return
ctx.log.info(
f"Recorded cache miss for {url} (count={entry.get('count', '?')})"
)
addons = [LocalCacheAddon()]