# File name: generate_report.py
import json
from jinja2 import Environment, FileSystemLoader, TemplateNotFound
import datetime
import pathlib
import logging
# --- Configuration ---
# Scan result file that main() opens and parses as JSON.
INPUT_JSON = "scan_analysis.json"
# Template name that main() hands to generate_html_report().
TEMPLATE_FILE = "template.html"
# Output path of the rendered HTML report.
OUTPUT_HTML = "disk_report.html"
# Output path of the Markdown report.
OUTPUT_MD = "disk_analysis_report.md"
# Log file passed to logging.basicConfig() in main() (opened in append mode).
LOG_FILE = "scan.log"
# --- Helper functions ---
def format_size(size_bytes: int, human_readable: bool) -> str:
    """Render a byte count as display text.

    Args:
        size_bytes: Size in bytes. ``None`` is tolerated and yields ``"N/A"``.
        human_readable: When true, scale the value to the largest binary
            unit (KB / MB / GB, powers of 1024) it reaches; otherwise the
            raw byte count is always reported.

    Returns:
        The formatted size, e.g. ``"1.50 KB"`` or ``"512 Bytes"``.
    """
    if size_bytes is None:
        return "N/A"
    if human_readable:
        # Largest unit first. The comparison is inclusive so that a value
        # sitting exactly on a unit boundary (1024 -> "1.00 KB") is scaled
        # instead of being printed in the next smaller unit.
        for unit, factor in (("GB", 1024**3), ("MB", 1024**2), ("KB", 1024)):
            if size_bytes >= factor:
                return f"{size_bytes / factor:.2f} {unit}"
    return f"{size_bytes} Bytes"
def format_duration(seconds: float) -> str:
    """Format a duration in seconds as ``H:MM:SS`` text.

    The fractional part is dropped before formatting; durations of a day
    or more get the ``N day(s), `` prefix produced by ``timedelta``.
    """
    whole_seconds = int(seconds)
    elapsed = datetime.timedelta(seconds=whole_seconds)
    return str(elapsed)
def create_md_file_link(display_text: str, full_path: str) -> str:
    """Build a Markdown link pointing at a local path via a ``file://`` URI.

    Args:
        display_text: Text shown for the link.
        full_path: Filesystem path to link to; it is resolved to an
            absolute path before being converted to a URI.

    Returns:
        ``[display_text](file://...)``, or just ``display_text`` when the
        link cannot be built for any reason (best effort, never raises).
    """
    try:
        resolved = pathlib.Path(full_path).resolve()
        file_uri = resolved.as_uri()
        markdown_link = f"[{display_text}]({file_uri})"
        return markdown_link
    except Exception:
        # Deliberate fallback: a broken link must not abort report writing.
        return display_text
def convert_to_treemap_data(nodes: list) -> list:
    """Translate report nodes into ``name`` / ``value`` / ``path`` chart entries.

    Each input node must provide ``name_display``, ``size`` and ``path``.
    A ``children`` key is emitted only for nodes whose own ``children``
    value is non-empty, and is converted recursively.
    """
    def _to_entry(source: dict) -> dict:
        entry = {
            "name": source['name_display'],
            "value": source['size'],
            "path": source['path'],
        }
        descendants = source.get('children')
        if descendants:
            entry['children'] = convert_to_treemap_data(descendants)
        return entry

    return [_to_entry(source) for source in nodes]
def compact_single_child_paths(node: dict) -> dict:
    """Collapse chains of directories that only contain one sub-directory.

    A directory whose (already compacted) children consist of exactly one
    directory is replaced by that child, renamed to ``"<parent>/<child>"``.
    All other fields of the result come from the child.

    The input tree is left untouched: every node that has to change is
    shallow-copied first, so the caller's data can be reused safely.

    Args:
        node: Tree node with at least ``type`` and ``name``; directories
            may carry a ``children`` list.

    Returns:
        The compacted node. Files and childless nodes are returned as-is.
    """
    children = node.get('children')
    if node['type'] == 'file' or not children:
        return node

    compacted_children = [compact_single_child_paths(child) for child in children]

    if len(compacted_children) == 1 and compacted_children[0]['type'] == 'directory':
        # Copy before renaming: the child may still be the caller's object.
        merged = dict(compacted_children[0])
        merged['name'] = f"{node['name']}/{merged['name']}"
        return merged

    result = dict(node)
    result['children'] = compacted_children
    return result
def prepare_display_names(nodes: list, config: dict) -> list:
    """Attach ``name_full`` and ``name_display`` to every node of a tree.

    When ``config["ENABLE_REPORT_TRUNCATION"]`` is truthy, the last
    ``/``-separated component of a name longer than
    ``REPORT_TRUNCATE_TRIGGER_LEN`` (default 25) is cut to
    ``REPORT_TRUNCATE_DISPLAY_LEN`` (default 22) characters followed by
    ``"..."``; any leading path prefix is kept intact. Otherwise the display
    name equals the full name.

    Both modes work on shallow copies, so the input nodes are never
    modified regardless of the truncation setting.

    Args:
        nodes: List of tree nodes, each with a ``name`` and optional
            ``children``.
        config: Scan configuration dictionary holding the options above.

    Returns:
        A new list of node copies, recursively processed.
    """
    truncate = config.get("ENABLE_REPORT_TRUNCATION", False)
    trigger_len = config.get("REPORT_TRUNCATE_TRIGGER_LEN", 25)
    display_len = config.get("REPORT_TRUNCATE_DISPLAY_LEN", 22)

    prepared = []
    for node in nodes:
        new_node = node.copy()
        full_name = new_node['name']
        display_name = full_name
        if truncate:
            # rpartition yields ('', '', name) when there is no '/', so the
            # prefix and separator vanish for single-component names.
            prefix, separator, last_part = full_name.rpartition('/')
            if len(last_part) > trigger_len:
                display_name = f"{prefix}{separator}{last_part[:display_len]}..."
        new_node['name_full'] = full_name
        new_node['name_display'] = display_name
        if new_node.get('children'):
            new_node['children'] = prepare_display_names(new_node['children'], config)
        prepared.append(new_node)
    return prepared
# --- Data analysis and report generation ---
def analyze_and_prepare_data(data: dict) -> dict:
    """Turn the loaded scan data into the context used by both reports.

    The ``results`` tree is compacted, given display names, flattened into
    a size-sorted summary table (each row with a ``percentage`` of the
    summed top-level sizes), and serialised into JSON strings for charts.

    Args:
        data: Parsed scan file; reads ``results``, ``scan_config``,
            ``scan_metadata`` and ``errors`` (all optional).

    Returns:
        Dict with ``config``, ``metadata``, ``errors``, ``results_tree``,
        ``analysis`` and ``charts_data`` entries.
    """
    logger = logging.getLogger(__name__)
    logger.info("Analyzing data and preparing context for templates...")

    raw_tree = data.get('results', [])
    scan_config = data.get('scan_config', {})

    compacted = [compact_single_child_paths(root) for root in raw_tree]
    report_tree = prepare_display_names(compacted, scan_config)
    total_found_size = sum(root.get('size', 0) for root in compacted)

    def _flatten(tree_nodes):
        # Pre-order walk producing child-less copies of every node.
        rows = []
        for entry in tree_nodes:
            row = entry.copy()
            row.pop('children', None)
            rows.append(row)
            if entry.get('children'):
                rows.extend(_flatten(entry['children']))
        return rows

    summary_rows = []
    # With nothing found there is no meaningful percentage; keep the table empty.
    if total_found_size > 0:
        for row in _flatten(report_tree):
            share = (row.get('size', 0) / total_found_size) * 100
            row['percentage'] = f"{share:.2f}%"
            summary_rows.append(row)
        summary_rows.sort(key=lambda row: row['size'], reverse=True)

    largest = summary_rows[:10]
    # Bar-chart series are emitted smallest-first.
    smallest_first = largest[::-1]
    pie_series = [{"value": row['size'], "name": row['name_display']} for row in largest]
    bar_names = [row['name_display'] for row in smallest_first]
    bar_values = [row['size'] for row in smallest_first]
    hierarchy = convert_to_treemap_data(report_tree)

    logger.info("Data analysis complete.")

    analysis = {
        "total_found_size": total_found_size,
        "summary_table": summary_rows,
    }
    charts_data = {
        "pie_data_json": json.dumps(pie_series),
        "bar_data_names_json": json.dumps(bar_names),
        "bar_data_values_json": json.dumps(bar_values),
        "treemap_data_json": json.dumps(hierarchy),
        "sunburst_data_json": json.dumps(hierarchy),
    }
    return {
        "config": scan_config,
        "metadata": data.get('scan_metadata', {}),
        "errors": data.get('errors', []),
        "results_tree": report_tree,
        "analysis": analysis,
        "charts_data": charts_data,
    }
def generate_markdown_report(context: dict, output_file: str):
    """Write the Markdown version of the disk-usage report.

    The file contains a header (scan root, duration, thresholds), a ranked
    summary table, a nested bullet list of the result tree sorted by size,
    and - when present - a list of scan errors.

    Args:
        context: Report context; reads ``config``, ``metadata``,
            ``analysis``, ``results_tree`` and ``errors``.
        output_file: Destination path, overwritten as UTF-8 text.

    Raises:
        KeyError: If a required key (e.g. ``metadata['scan_root']``) is
            missing. I/O failures are logged and printed, not raised.
    """
    log = logging.getLogger(__name__)
    log.info(f"Generating Markdown report -> {output_file}")
    config = context['config']
    metadata = context['metadata']
    analysis = context['analysis']

    def md_format_size(size):
        return format_size(size, config.get('USE_HUMAN_READABLE_SIZE', True))

    def largest_first(nodes):
        return sorted(nodes, key=lambda x: x.get('size', 0), reverse=True)

    try:
        with open(output_file, 'w', encoding='utf-8') as f:
            # --- Header ---
            root_link = create_md_file_link(f"`{metadata['scan_root']}`", metadata['scan_root'])
            f.write("# 磁盘空间分析报告 (Markdown)\n\n")
            f.write(f"- **扫描根目录:** {root_link}\n")
            f.write(f"- **扫描总耗时:** {format_duration(metadata['scan_duration_seconds'])}\n")
            f.write(f"- **扫描配置:** 记录下限 > {config.get('RECORD_THRESHOLD_GB', 'N/A')} GB, 深入扫描下限 > {config.get('DEEP_SCAN_THRESHOLD_GB', 'N/A')} GB\n\n")

            # --- Summary table ---
            f.write("## 扫描结果总览\n\n")
            f.write(f"共发现 **{len(analysis['summary_table'])}** 个大项, 总计 **{md_format_size(analysis['total_found_size'])}**.\n\n")
            f.write("| 排名 | 名称 | 类型 | 大小 | 占比 |\n|:---:|:---|:---|:---:|:---:|\n")
            for i, item in enumerate(analysis['summary_table']):
                name_link = create_md_file_link(f"**{item['name_display']}**", item['path'])
                item_type = '目录' if item['type'] == 'directory' else '文件'
                f.write(f"| {i+1} | {name_link} | {item_type} | `{md_format_size(item['size'])}` | `{item['percentage']}` |\n")

            # --- Detailed tree ---
            f.write("\n---\n## 详细文件列表\n\n")

            def write_md_node(node, indent_level):
                # Two spaces per level: a child bullet must be indented to
                # the parent's content column ("- " is two characters wide)
                # to be rendered as a nested list rather than a sibling.
                indent = "  " * indent_level
                icon = "📁" if node['type'] == 'directory' else "📄"
                name_link = create_md_file_link(f"**{node['name_display']}**", node['path'])
                f.write(f"{indent}- {icon} {name_link} ({md_format_size(node['size'])})\n")
                if node.get('children'):
                    for child in largest_first(node['children']):
                        write_md_node(child, indent_level + 1)

            for item in largest_first(context['results_tree']):
                write_md_node(item, 0)

            # --- Scan problems ---
            if context['errors']:
                f.write("\n\n---\n## 扫描问题摘要\n\n")
                for err in context['errors']:
                    path_link = create_md_file_link(f"`{err['path']}`", err['path'])
                    # Same two-space rule so the path nests under its message.
                    f.write(f"- `{err['message']}`\n  - 路径: {path_link}\n")
        print(f"✅ Markdown 报告已保存到: {output_file}")
    except IOError as e:
        log.error(f"Failed to write Markdown file {output_file}: {e}", exc_info=True)
        print(f"❌ 写入Markdown文件时出错: {e}")
def generate_html_report(context: dict, template_file: str, output_file: str):
    """Render the interactive HTML report from a Jinja2 template.

    The template is looked up in the current working directory first and,
    as a fallback, in the directory containing this script - which is where
    the "template not found" message tells the user to put it.

    Registers the ``format_size`` and ``format_duration`` template filters.
    Rendering and write failures are logged and printed, not raised.

    Args:
        context: Template context; ``context['config']`` supplies the
            ``USE_HUMAN_READABLE_SIZE`` option for the size filter.
        template_file: Template name relative to the search directories.
        output_file: Destination path, overwritten as UTF-8 text.
    """
    log = logging.getLogger(__name__)
    log.info(f"Rendering HTML report using template '{template_file}' -> {output_file}")

    # Working directory keeps priority so existing setups behave the same.
    search_paths = ['.']
    script_file = globals().get('__file__')  # absent in some embedded/interactive runs
    if script_file:
        search_paths.append(str(pathlib.Path(script_file).resolve().parent))

    # NOTE(review): autoescaping is left off as before; enabling it would
    # need the template checked first since it receives pre-built JSON.
    env = Environment(loader=FileSystemLoader(search_paths))
    env.filters['format_size'] = lambda s: format_size(s, context['config'].get('USE_HUMAN_READABLE_SIZE', True))
    env.filters['format_duration'] = format_duration

    try:
        template = env.get_template(template_file)
        html_content = template.render(context)
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(html_content)
        print(f"✅ HTML 报告已保存到: {output_file}")
    except TemplateNotFound:
        err_msg = f"HTML模板文件 '{template_file}' 未找到. 请确保它与脚本在同一目录下."
        log.error(err_msg)
        print(f"❌ 错误: {err_msg}")
    except Exception as e:
        # Top-level boundary for this report: record the traceback and go on.
        err_msg = f"渲染或保存 HTML 报告时出错: {e}"
        log.error(err_msg, exc_info=True)
        print(f"❌ {err_msg}")
def main():
    """Load the scan JSON and produce the Markdown and HTML reports.

    Logging goes to ``LOG_FILE`` in append mode. If the input file is
    missing or is not valid JSON, the problem is logged and printed and the
    function returns without generating anything.
    """
    logging.basicConfig(
        filename=LOG_FILE,
        filemode='a',
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - [ReportGenerator] - %(message)s',
    )
    logger = logging.getLogger(__name__)
    logger.info("--- Report Generation Started ---")
    print(f"🔄 正在读取扫描数据文件: {INPUT_JSON}")

    try:
        logger.info(f"Attempting to open and read {INPUT_JSON}")
        with open(INPUT_JSON, 'r', encoding='utf-8') as json_file:
            scan_data = json.load(json_file)
    except FileNotFoundError:
        message = f"数据文件 '{INPUT_JSON}' 未找到. 请先运行 'scan_disk.py'."
        logger.error(message)
        print(f"❌ 错误: {message}")
        return
    except json.JSONDecodeError as decode_error:
        message = f"解析JSON文件 '{INPUT_JSON}' 失败. 文件可能已损坏. 错误: {decode_error}"
        logger.error(message, exc_info=True)
        print(f"❌ 错误: {message}")
        return
    else:
        logger.info("Successfully loaded JSON data.")

    report_context = analyze_and_prepare_data(scan_data)
    generate_markdown_report(report_context, OUTPUT_MD)
    generate_html_report(report_context, TEMPLATE_FILE, OUTPUT_HTML)

    logger.info("--- Report Generation Finished ---")
    print(f"\n🎉 全部报告生成完毕!\n - 传统报告: {OUTPUT_MD}\n - 交互式网页报告: {OUTPUT_HTML}")
# Run the report generator only when executed as a script, not on import.
if __name__ == "__main__":
    main()