-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathAnalyze.py
More file actions
1051 lines (956 loc) · 38.5 KB
/
Analyze.py
File metadata and controls
1051 lines (956 loc) · 38.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import argparse
import asyncio
from pathlib import Path
from typing import Optional
from core.orchestrator import AnalysisOrchestrator
from core.context import AnalysisConfig
from services.language_detector import LanguageDetector
from utils.case import resolve_case, discover_cve_assets
from utils.logger import setup_logging, get_logger, print_user_success, print_user_error, print_user_warning, print_user_info
from config import list_available_providers, get_llm_config_by_provider, LLMRole, list_siliconflow_models, get_llm_config
from tools.codeql_compose import CodeQLComposeTool
from services.llm_service import MultiAgentAnalyzer
from agents.unified_source_analysis_agent import UnifiedSourceAnalysisAgent
from utils.project_importer import import_project, ProjectImportResult
# 初始化日志系统
setup_logging(level="INFO")
logger = get_logger(__name__)
async def run_case_analysis(
case_id: str,
refresh_intel: bool = False,
stream: bool = False,
output_file: Optional[str] = None,
provider: Optional[str] = None,
think_model: Optional[str] = None,
chat_model: Optional[str] = None,
api_key: Optional[str] = None,
base_url: Optional[str] = None,
enable_error_tidy: bool = False,
language: Optional[str] = None,
enable_source_sink_fallback: bool = False,
enable_sink_source_verification: bool = False,
verification_retry_max: int = 3,
verification_timeout: int = 30,
) -> None:
"""
运行完整的案例分析
Args:
case_id: 案例ID,如 "CVE-2021-21985"
refresh_intel: 是否强制刷新情报数据
stream: 是否显示AI思考过程
output_file: 自定义输出文件名
provider: 模型提供商名称(deepseek/siliconflow/zhipu/kimi/gemini)
think_model: 推理模型名称(覆盖默认模型)
chat_model: 对话模型名称(覆盖默认模型)
api_key: API Key(覆盖环境变量)
base_url: Base URL(覆盖环境变量)
enable_error_tidy: 是否启用错误整理功能
language: 强制指定编程语言(跳过自动检测)
"""
# 创建配置
config = AnalysisConfig(
show_thinking=stream,
refresh_intel=refresh_intel,
output_file=output_file, # None 表示使用时间戳目录
llm_provider=provider,
think_model=think_model,
chat_model=chat_model,
api_key=api_key,
base_url=base_url,
enable_error_tidy=enable_error_tidy,
enable_source_sink_fallback=enable_source_sink_fallback,
enable_sink_source_verification=enable_sink_source_verification,
verification_retry_max=verification_retry_max,
verification_timeout=verification_timeout,
)
# 显示模型提供商信息
try:
# 使用配置中的参数获取模型配置
chat_config = get_llm_config(
LLMRole.CHAT,
provider_name=provider,
model_name=chat_model,
api_key=api_key,
base_url=base_url
)
think_config = get_llm_config(
LLMRole.THINK,
provider_name=provider,
model_name=think_model,
api_key=api_key,
base_url=base_url
)
provider_display = provider or chat_config.provider or "环境变量配置"
print_user_info(f"🤖 使用模型提供商: {provider_display}")
print_user_info(f" 💭 推理模型: {think_config.model}")
print_user_info(f" 💬 对话模型: {chat_config.model}")
if base_url:
print_user_info(f" 🔗 Base URL: {base_url}")
logger.info(f"使用模型提供商: {provider_display}, 推理模型: {think_config.model}, 对话模型: {chat_config.model}, Base URL: {chat_config.base_url}")
except Exception as e:
print_user_error(f"❌ 无法配置模型: {e}")
logger.error(f"模型配置失败: {e}", exc_info=True)
return
# 创建并运行编排器
orchestrator = AnalysisOrchestrator(config)
result = await orchestrator.analyze_case(case_id, language=language)
# 显示结果摘要
if result.success:
print_user_success(f"\n🎉 分析成功完成!")
print_user_info(f"📋 案例ID: {result.case_id}")
print_user_info(f"💻 编程语言: {result.language}")
if result.execution_time:
print_user_info(f"⏱️ 执行时间: {result.execution_time:.2f}秒")
if result.is_complete():
print_user_success("✅ 所有分析步骤都成功完成")
else:
print_user_warning("⚠️ 部分分析步骤未完成")
if config.output_file:
print_user_info(f"📄 分析报告已保存到: {config.output_file}")
else:
logger.info(f"分析结果已保存到时间戳目录")
else:
print_user_error(f"\n❌ 分析失败: {result.error_message}")
logger.error(f"案例分析失败: {result.error_message}")
async def run_md_direct_codeql(
md_file_path: str,
stream: bool = False,
provider: Optional[str] = None,
think_model: Optional[str] = None,
chat_model: Optional[str] = None,
api_key: Optional[str] = None,
base_url: Optional[str] = None,
database_path: Optional[str] = None,
language: str = "java",
enable_error_tidy: bool = False,
enable_source_sink_fallback: bool = False,
fallback_empty_retry_max: int = 5,
exec_mode: str = "analyze",
prev_ql_file: Optional[str] = None,
) -> None:
"""
从MD文件直接生成CodeQL查询
Args:
md_file_path: MD文件路径
stream: 是否显示AI思考过程
provider: 模型提供商名称
think_model: 推理模型名称
chat_model: 对话模型名称
api_key: API Key
base_url: Base URL
database_path: CodeQL数据库路径
language: 编程语言
enable_error_tidy: 是否启用错误整理功能
prev_ql_file: 上一次的QL语句文件路径(仅用于fallback_only模式测试)
"""
# 验证MD文件存在性
md_path = Path(md_file_path)
if not md_path.exists():
print_user_error(f"❌ MD文件不存在: {md_file_path}")
logger.error(f"MD文件不存在: {md_file_path}")
return
if not md_path.suffix.lower() == '.md':
print_user_error(f"❌ 文件格式错误,需要.md文件: {md_file_path}")
logger.error(f"文件格式错误: {md_file_path}")
return
# 读取MD文件内容
try:
md_content = md_path.read_text(encoding='utf-8')
print_user_info(f"📄 成功读取MD文件: {md_file_path}")
logger.info(f"读取MD文件: {md_file_path}, 内容长度: {len(md_content)}")
except Exception as e:
print_user_error(f"❌ 读取MD文件失败: {e}")
logger.error(f"读取MD文件失败: {e}", exc_info=True)
return
prev_ql = None
if prev_ql_file:
prev_ql_path = Path(prev_ql_file)
if not prev_ql_path.exists():
print_user_error(f"❌ 指定的QL文件不存在: {prev_ql_file}")
return
try:
prev_ql = prev_ql_path.read_text(encoding='utf-8')
print_user_info(f"📄 成功读取上一次QL文件: {prev_ql_file}")
except Exception as e:
print_user_error(f"❌ 读取上一次QL文件失败: {e}")
return
# 如果没有指定数据库路径,尝试从当前目录查找
if not database_path:
current_dir = Path.cwd()
for db_dir in current_dir.glob("*.db"):
database_path = str(db_dir)
break
if not database_path:
print_user_error("❌ 未找到CodeQL数据库,请使用 --database-path 参数指定")
logger.error("未找到CodeQL数据库")
return
# 显示模型配置信息
try:
chat_config = get_llm_config(
LLMRole.CHAT,
provider_name=provider,
model_name=chat_model,
api_key=api_key,
base_url=base_url
)
think_config = get_llm_config(
LLMRole.THINK,
provider_name=provider,
model_name=think_model,
api_key=api_key,
base_url=base_url
)
provider_display = provider or chat_config.provider or "环境变量配置"
print_user_info(f"🤖 使用模型提供商: {provider_display}")
print_user_info(f" 💭 推理模型: {think_config.model}")
print_user_info(f" 💬 对话模型: {chat_config.model}")
if base_url:
print_user_info(f" 🔗 Base URL: {base_url}")
except Exception as e:
print_user_error(f"❌ 无法配置模型: {e}")
logger.error(f"模型配置失败: {e}", exc_info=True)
return
# 创建MultiAgentAnalyzer
try:
# 使用聊天模型配置作为主要配置
analyzer = MultiAgentAnalyzer(config=chat_config)
print_user_info(f"🔧 成功创建AI分析器")
except Exception as e:
print_user_error(f"❌ 创建AI分析器失败: {e}")
logger.error(f"创建AI分析器失败: {e}", exc_info=True)
return
# 创建CodeQLComposeTool
try:
codeql_tool = CodeQLComposeTool(
analyzer=analyzer,
database_path=database_path,
language=language,
max_rounds=5,
enable_error_tidy=enable_error_tidy,
enable_source_sink_fallback=enable_source_sink_fallback,
fallback_empty_retry_max=fallback_empty_retry_max,
)
print_user_info(f"🛠️ 成功创建CodeQL生成工具")
print_user_info(f" 📁 数据库路径: {database_path}")
print_user_info(f" 💻 编程语言: {language}")
except Exception as e:
print_user_error(f"❌ 创建CodeQL工具失败: {e}")
logger.error(f"创建CodeQL工具失败: {e}", exc_info=True)
return
# 执行CodeQL生成
try:
print_user_info(f"🚀 开始从MD文件生成CodeQL查询...")
# 使用MD文件内容作为需求描述
requirement = f"根据以下漏洞描述生成CodeQL查询:\n\n{md_content}"
result = await codeql_tool._arun(
requirement=requirement,
exec_mode=exec_mode,
show_thinking=stream,
cve_analysis_report=requirement if exec_mode == "fallback_only" else None,
source_analysis_report=None,
sink_analysis_report=None,
prev_ql=prev_ql,
)
# 输出结果
print_user_success(f"\n🎉 CodeQL生成完成!")
print_user_info(f"📋 查询结果:")
print(result)
logger.info(f"CodeQL生成成功完成")
except Exception as e:
print_user_error(f"❌ CodeQL生成失败: {e}")
logger.error(f"CodeQL生成失败: {e}", exc_info=True)
return
# Debug功能
async def run_md_source_analysis(
md_file_path: str,
src_path: str,
stream: bool = False,
provider: Optional[str] = None,
think_model: Optional[str] = None,
chat_model: Optional[str] = None,
api_key: Optional[str] = None,
base_url: Optional[str] = None,
language: Optional[str] = None,
output_file: Optional[str] = None
) -> None:
"""
从MD文件和源代码路径生成source点分析报告
Args:
md_file_path: MD文件路径
src_path: 源代码路径
stream: 是否显示AI思考过程
provider: 模型提供商名称
think_model: 推理模型名称
chat_model: 对话模型名称
api_key: API Key
base_url: Base URL
language: 编程语言(可选,不指定则自动检测)
output_file: 输出文件路径
"""
# 验证MD文件存在性
md_path = Path(md_file_path)
if not md_path.exists():
print_user_error(f"❌ MD文件不存在: {md_file_path}")
logger.error(f"MD文件不存在: {md_file_path}")
return
if not md_path.suffix.lower() == '.md':
print_user_error(f"❌ 文件格式错误,需要.md文件: {md_file_path}")
logger.error(f"文件格式错误: {md_file_path}")
return
# 验证源代码路径存在性
src_path_obj = Path(src_path)
if not src_path_obj.exists():
print_user_error(f"❌ 源代码路径不存在: {src_path}")
logger.error(f"源代码路径不存在: {src_path}")
return
# 读取MD文件内容
try:
md_content = md_path.read_text(encoding='utf-8')
print_user_info(f"📄 成功读取MD文件: {md_file_path}")
logger.info(f"读取MD文件: {md_file_path}, 内容长度: {len(md_content)}")
except Exception as e:
print_user_error(f"❌ 读取MD文件失败: {e}")
logger.error(f"读取MD文件失败: {e}", exc_info=True)
return
# 自动检测编程语言(如果未指定)
if not language:
try:
detector = LanguageDetector()
# 创建临时路径对象用于检测
temp_case_paths = type('CasePaths', (), {
'root': src_path_obj,
'source_dir': src_path_obj
})()
language = detector.detect_language(temp_case_paths)
print_user_info(f"🔍 自动检测编程语言: {language}")
except Exception as e:
print_user_warning(f"⚠️ 无法自动检测语言,使用默认语言 'java': {e}")
language = "java"
# 显示模型配置信息
try:
chat_config = get_llm_config(
LLMRole.CHAT,
provider_name=provider,
model_name=chat_model,
api_key=api_key,
base_url=base_url
)
think_config = get_llm_config(
LLMRole.THINK,
provider_name=provider,
model_name=think_model,
api_key=api_key,
base_url=base_url
)
provider_display = provider or chat_config.provider or "环境变量配置"
print_user_info(f"🤖 使用模型提供商: {provider_display}")
print_user_info(f" 💭 推理模型: {think_config.model}")
print_user_info(f" 💬 对话模型: {chat_config.model}")
if base_url:
print_user_info(f" 🔗 Base URL: {base_url}")
except Exception as e:
print_user_error(f"❌ 无法配置模型: {e}")
logger.error(f"模型配置失败: {e}", exc_info=True)
return
# 创建MultiAgentAnalyzer
try:
analyzer = MultiAgentAnalyzer(config=chat_config)
print_user_info(f"🔧 成功创建AI分析器")
except Exception as e:
print_user_error(f"❌ 创建AI分析器失败: {e}")
logger.error(f"创建AI分析器失败: {e}", exc_info=True)
return
# 创建UnifiedSourceAnalysisAgent
try:
source_agent = UnifiedSourceAnalysisAgent(
analyzer=analyzer,
source_root=src_path,
database_path=None # 不需要CodeQL数据库
)
print_user_info(f"🔍 成功创建Source点分析工具")
print_user_info(f" 📁 源代码路径: {src_path}")
print_user_info(f" 💻 编程语言: {language}")
except Exception as e:
print_user_error(f"❌ 创建Source分析工具失败: {e}")
logger.error(f"创建Source分析工具失败: {e}", exc_info=True)
return
# 执行Source点分析
try:
print_user_info(f"🚀 开始从MD文件生成Source点分析报告...")
# 使用MD文件内容作为sink分析结果(模拟)
sink_analysis = f"根据以下漏洞描述分析Source点:\n\n{md_content}"
result = await source_agent.analyze_sources(
language=language,
sink_analysis=sink_analysis,
show_thinking=stream
)
if result.success:
print_user_success(f"\n🎉 Source点分析完成!")
report_content = f"""# Source点分析报告
## 分析配置
- **MD文件**: {md_file_path}
- **源代码路径**: {src_path}
- **编程语言**: {language}
- **分析时间**: {__import__('datetime').datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
## 漏洞描述
{md_content}
## Source点分析结果
{result.content}
---
*报告由 PureAutoCodeQL 自动生成*
"""
# 保存到文件
if output_file:
output_path = Path(output_file)
else:
timestamp = __import__('datetime').datetime.now().strftime('%Y%m%d_%H%M%S')
output_path = Path(f"source_analysis_report_{timestamp}.md")
try:
output_path.write_text(report_content, encoding='utf-8')
print_user_info(f"📄 分析报告已保存到: {output_path}")
except Exception as e:
print_user_error(f"❌ 保存报告失败: {e}")
logger.error(f"保存报告失败: {e}", exc_info=True)
# 即使保存失败,也显示结果到控制台
print_user_info(f"📋 分析结果:")
print(report_content)
else:
print_user_error(f"\n❌ Source点分析失败: {result.error}")
logger.error(f"Source点分析失败: {result.error}")
except Exception as e:
print_user_error(f"❌ Source点分析失败: {e}")
logger.error(f"Source点分析失败: {e}", exc_info=True)
return
def list_available_cases() -> None:
"""列出所有可用的案例"""
projects_dir = Path("projects")
if not projects_dir.exists():
print_user_error("projects目录不存在")
logger.error("projects目录不存在")
return
# 排除特殊文件夹:模板目录和镜像目录
excluded_dirs = {"case-template", "python_kb"}
case_dirs = [
d for d in projects_dir.iterdir()
if d.is_dir() and d.name not in excluded_dirs
]
if not case_dirs:
print_user_error("没有找到任何案例")
logger.warning("没有找到任何案例")
return
print("📁 可用的案例:")
for case_dir in sorted(case_dirs):
try:
case_paths = resolve_case(case_dir.name)
cve_assets = discover_cve_assets(case_paths)
print(f" 📂 {case_dir.name} -> {cve_assets.cve_id}")
except Exception as e:
print_user_warning(f" 📂 {case_dir.name} -> (解析失败: {e})")
logger.warning(f"解析案例 {case_dir.name} 失败: {e}", exc_info=True)
async def validate_case(case_id: str) -> bool:
"""验证案例是否有效"""
try:
case_paths = resolve_case(case_id)
cve_assets = discover_cve_assets(case_paths)
print_user_success(f"✅ 案例 {case_id} 验证通过")
print_user_info(f" 📁 根目录: {case_paths.root}")
print_user_info(f" 🎯 CVE ID: {cve_assets.cve_id}")
print_user_info(f" 📄 JSON文件: {cve_assets.json_path}")
if cve_assets.diff_path:
file_type = "Diff" if cve_assets.diff_path.suffix == ".diff" else "Patch"
print_user_info(f" 🔄 {file_type}文件: {cve_assets.diff_path}")
else:
print_user_warning(f" ⚠️ 没有Diff/Patch文件")
# 检测语言
detector = LanguageDetector()
language = detector.detect_language(case_paths)
print_user_info(f" 💻 检测语言: {language}")
logger.info(f"案例 {case_id} 验证通过,语言: {language}")
return True
except Exception as e:
print_user_error(f"❌ 案例 {case_id} 验证失败: {e}")
logger.error(f"案例 {case_id} 验证失败: {e}", exc_info=True)
return False
def run_project_import(
source_path: str,
case_id: Optional[str] = None,
overwrite: bool = False,
language: Optional[str] = None,
skip_codeql: bool = False,
build_command: Optional[str] = None,
build_script: Optional[str] = None,
build_workdir: Optional[str] = None,
) -> None:
"""导入外部CVE项目目录"""
print_user_info(f"🚚 开始导入目录: {source_path}")
if case_id:
print_user_info(f"🎯 指定案例ID: {case_id}")
if overwrite:
print_user_warning("⚠️ 已启用覆盖模式,若同名案例存在将被替换")
if skip_codeql:
print_user_warning("⏭️ 将跳过 CodeQL 数据库创建")
if build_command:
print_user_info(f"⚙️ 构建命令: {build_command}")
if build_script:
print_user_info(f"📜 构建脚本: {build_script}")
if build_workdir:
print_user_info(f"📂 构建目录: {build_workdir}")
try:
result: ProjectImportResult = import_project(
source_path=source_path,
case_id=case_id,
overwrite=overwrite,
language=language,
create_codeql_db=not skip_codeql,
build_command=build_command,
build_script=build_script,
build_workdir=build_workdir,
)
except FileNotFoundError as exc:
print_user_error(f"❌ 输入路径不存在: {exc}")
logger.error(f"导入失败: {exc}")
return
except ValueError as exc:
print_user_error(f"❌ 参数错误: {exc}")
logger.error(f"导入失败: {exc}")
return
except Exception as exc: # pylint: disable=broad-except
print_user_error(f"❌ 导入失败: {exc}")
logger.exception("导入失败")
return
print_user_success("✅ 导入完成")
print_user_info(f" 📁 案例ID: {result.case_id}")
print_user_info(f" 📂 目标路径: {result.target_path}")
if result.language:
print_user_info(f" 💻 语言: {result.language}")
if result.metadata_files:
print_user_info(f" 🗂️ 元数据: {', '.join(result.metadata_files)}")
if not skip_codeql:
if result.codeql_created:
print_user_success(" 🧱 CodeQL 数据库创建成功")
else:
print_user_warning(" ⚠️ CodeQL 数据库未创建")
if result.codeql_error:
print_user_error(f" 原因: {result.codeql_error}")
if result.build_command:
print_user_info(f" ⚙️ 实际构建命令: {result.build_command}")
if result.build_workdir:
print_user_info(f" 📂 构建工作目录: {result.build_workdir}")
print_user_info("📣 现在可以使用 --case 运行分析或调用 API 继续操作")
def _detect_case_directory_input(case_value: str) -> Optional[Path]:
"""
判断 --case 参数是否提供了目录路径(而非案例ID)。
仅在字符串包含路径分隔符或为绝对路径且目录存在时返回 Path。
"""
if not case_value:
return None
contains_path_sep = ("\\" in case_value) or ("/" in case_value)
candidate = Path(case_value).expanduser()
if not candidate.exists() or not candidate.is_dir():
return None
if candidate.is_absolute() or contains_path_sep:
return candidate.resolve()
return None
def _auto_import_case_directory(case_dir: Path) -> ProjectImportResult:
"""
将外部目录自动导入到 projects/ 并返回结果。
默认启用覆盖与 CodeQL 自动建库。
"""
print_user_info(f"📦 检测到外部CVE目录: {case_dir}")
print_user_info("🔄 正在自动导入并创建CodeQL数据库...")
result = import_project(
source_path=str(case_dir),
overwrite=True,
create_codeql_db=True,
)
print_user_success(f"✅ 自动导入完成,案例ID: {result.case_id}")
if result.metadata_files:
print_user_info(f" 🗂️ 元数据: {', '.join(result.metadata_files)}")
if result.codeql_created:
print_user_success(" 🧱 CodeQL 数据库已创建")
else:
print_user_warning(" ⚠️ 未能自动创建 CodeQL 数据库,可稍后手动重试")
if result.codeql_error:
print_user_error(f" 原因: {result.codeql_error}")
return result
def parse_arguments() -> argparse.Namespace:
"""解析命令行参数"""
parser = argparse.ArgumentParser(
description="PureAutoCodeQL - 基于AI的自动化漏洞分析工具",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
使用示例:
%(prog)s --case CVE-2021-21985 # 分析已导入的案例
%(prog)s --case "C:\\Targets\\java\\CVE-2023-51444" # Java项目:自动导入+建库+分析
%(prog)s --case CVE-2021-21985 --stream # 显示AI思考过程
%(prog)s --import-project "C:\\Targets\\CVE-2023-51444" --import-language java # 仅导入Java项目
%(prog)s --md-file vulnerability.md # 从MD文件直接生成CodeQL
%(prog)s --md-file vulnerability.md --src-path /path/to/source # 从MD文件生成source点分析报告
%(prog)s --md-file vulnerability.md --provider deepseek # 指定模型提供商
%(prog)s --list # 列出所有可用案例
%(prog)s --validate CVE-2021-21985 # 验证案例有效性
"""
)
# 主要参数组
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument(
"--case",
type=str,
help="分析单个案例 (例如: CVE-2021-21985)"
)
group.add_argument(
"--md-file",
type=str,
metavar="FILE",
help="从指定的MD文件直接生成CodeQL查询 (例如: vulnerability.md)"
)
group.add_argument(
"--list",
action="store_true",
help="列出所有可用的案例"
)
group.add_argument(
"--validate",
type=str,
metavar="CASE_ID",
help="验证指定案例是否有效"
)
group.add_argument(
"--list-providers",
action="store_true",
help="列出所有可用的模型提供商及其状态"
)
group.add_argument(
"--list-models",
action="store_true",
help="列出硅基流动的所有可用模型"
)
group.add_argument(
"--import-project",
type=str,
metavar="PATH",
help="导入外部CVE目录 (例如: C:\\Targets\\CVE-2025-54381)"
)
# 可选参数
parser.add_argument(
"--refresh-intel",
action="store_true",
help="强制刷新情报数据(不使用缓存)"
)
parser.add_argument(
"--no-stream",
dest="stream",
action="store_false",
help="禁用AI思考过程显示"
)
parser.add_argument(
"--output",
type=str,
metavar="FILE",
help="指定输出文件名 (默认: output.md)"
)
parser.add_argument(
"--provider",
type=str,
metavar="PROVIDER",
help="指定模型提供商 (deepseek/siliconflow/zhipu/kimi/gemini 或 keys.toml 中定义的自定义提供商),覆盖环境变量 LLM_PROVIDER"
)
parser.add_argument(
"--model",
type=str,
metavar="MODEL",
help="指定模型名称(同时用于推理和对话),覆盖默认模型和环境变量"
)
parser.add_argument(
"--think-model",
type=str,
metavar="MODEL",
dest="think_model",
help="指定推理模型名称,覆盖默认模型和环境变量"
)
parser.add_argument(
"--chat-model",
type=str,
metavar="MODEL",
dest="chat_model",
help="指定对话模型名称,覆盖默认模型和环境变量"
)
parser.add_argument(
"--api-key",
type=str,
metavar="KEY",
dest="api_key",
help="指定API Key,覆盖环境变量"
)
parser.add_argument(
"--base-url",
type=str,
metavar="URL",
dest="base_url",
help="指定Base URL,覆盖环境变量(例如: https://api.siliconflow.cn/v1)"
)
parser.add_argument(
"--database-path",
type=str,
metavar="PATH",
dest="database_path",
help="指定CodeQL数据库路径(用于--md-file模式)"
)
parser.add_argument(
"--language",
type=str,
metavar="LANG",
dest="language",
default=None,
help="指定编程语言(默认: java 或自动检测)"
)
parser.add_argument(
"--src-path",
type=str,
metavar="PATH",
dest="src_path",
help="指定源代码路径(用于--md-file模式生成source点分析报告)"
)
parser.add_argument(
"--enable-error-tidy",
action="store_true",
help="启用错误整理功能(实验性)"
)
parser.add_argument(
"--enable-source-sink-fallback",
action="store_true",
help="启用 Source-Sink 回退查询(所有常规 CodeQL 重试失败后)",
)
parser.add_argument(
"--source-sink-only",
action="store_true",
help="仅使用 Source-Sink 回退代理生成不含中间路径的查询(测试模式,仅用于 --md-file)",
)
parser.add_argument(
"--enable-sink-source-verification",
action="store_true",
help="启用 Sink/Source 验证功能(实验性)",
)
parser.add_argument(
"--verification-retry-max",
type=int,
metavar="N",
dest="verification_retry_max",
default=3,
help="Sink/Source 验证失败时的最大重试次数(默认: 3)",
)
parser.add_argument(
"--verification-timeout",
type=int,
metavar="SECONDS",
dest="verification_timeout",
default=30,
help="单个验证查询的超时时间(秒,默认: 30)",
)
parser.add_argument(
"--import-case-id",
type=str,
dest="import_case_id",
help="搭配 --import-project 使用,自定义案例ID"
)
parser.add_argument(
"--import-overwrite",
action="store_true",
dest="import_overwrite",
help="搭配 --import-project 使用,若案例已存在则覆盖"
)
parser.add_argument(
"--import-language",
type=str,
dest="import_language",
help="搭配 --import-project 使用,指定语言:python/java/cpp(默认自动检测)"
)
parser.add_argument(
"--import-skip-codeql",
action="store_true",
dest="import_skip_codeql",
help="搭配 --import-project 使用,跳过 CodeQL 数据库创建"
)
parser.add_argument(
"--import-build-command",
type=str,
dest="import_build_command",
help="搭配 --import-project 使用,指定 C/C++ 构建命令(Java/Python项目不需要)"
)
parser.add_argument(
"--import-build-script",
type=str,
dest="import_build_script",
help="搭配 --import-project 使用,指定 C/C++ 构建脚本路径(Java/Python项目不需要)"
)
parser.add_argument(
"--import-build-dir",
type=str,
dest="import_build_workdir",
help="搭配 --import-project 使用,设置 C/C++ 构建命令工作目录(Java/Python项目不需要)"
)
parser.add_argument(
"--prev-ql-file",
type=str,
metavar="FILE",
dest="prev_ql_file",
help="指定上一次生成的 CodeQL 查询文件(用于测试 Source-Sink Fallback)"
)
parser.set_defaults(stream=True)
return parser.parse_args()
def list_providers() -> None:
"""列出所有可用的模型提供商"""
providers = list_available_providers()
print("\n" + "=" * 80)
print("📋 可用的模型提供商:")
print("=" * 80)
for provider in providers:
print(f"\n🔹 {provider['display_name']} ({provider['name']})")
print(f" 状态: {provider['status']}")
print(f" 推理模型: {provider['think_model']}")
print(f" 对话模型: {provider['chat_model']}")
print(f" Base URL: {provider['base_url']}")
if not provider['has_api_key']:
print(f" ⚠️ 需要设置 API Key 环境变量")
print("\n" + "=" * 80)
print("💡 使用方式:")
print(" python Analyze.py --case CVE-2021-21985 --provider deepseek")
print(" export LLM_PROVIDER=deepseek # 或通过环境变量设置")
print("=" * 80 + "\n")
async def main() -> None:
"""主函数"""
args = parse_arguments()
try:
if args.list_providers:
list_providers()
elif args.list_models:
list_siliconflow_models()
elif args.list:
list_available_cases()
elif args.validate:
await validate_case(args.validate)
elif args.import_project:
run_project_import(
source_path=args.import_project,
case_id=args.import_case_id,
overwrite=args.import_overwrite,
language=args.import_language,
skip_codeql=args.import_skip_codeql,
build_command=args.import_build_command,
build_script=args.import_build_script,
build_workdir=args.import_build_workdir,
)
elif args.case:
effective_case_id = args.case
auto_case_dir = _detect_case_directory_input(args.case)
if auto_case_dir:
try:
auto_import_result = _auto_import_case_directory(auto_case_dir)
effective_case_id = auto_import_result.case_id
except FileNotFoundError as exc:
print_user_error(f"❌ 自动导入失败: 输入目录不存在 - {exc}")
logger.error("自动导入失败: %s", exc)
return
except Exception as exc: # pylint: disable=broad-except
print_user_error(f"❌ 自动导入失败: {exc}")
logger.exception("自动导入失败")
return
print(f"🚀 PureAutoCodeQL 启动")
print(f"🎯 分析案例: {effective_case_id}")
print(f"💭 AI思考过程: {'开启' if args.stream else '关闭'}")
print(f"🔄 刷新情报: {'是' if args.refresh_intel else '否'}")
print(f"📄 输出文件: {args.output or 'output.md'}")
if args.provider:
print(f"🤖 模型提供商: {args.provider} (命令行指定)")
print("-" * 50)
# 如果指定了 --model,同时应用到 think_model 和 chat_model
think_model = args.think_model or args.model
chat_model = args.chat_model or args.model
await run_case_analysis(
case_id=effective_case_id,
refresh_intel=args.refresh_intel,
stream=args.stream,
output_file=args.output,
provider=args.provider,
think_model=think_model,
chat_model=chat_model,
api_key=args.api_key,
base_url=args.base_url,
enable_error_tidy=args.enable_error_tidy,
language=args.language,
enable_source_sink_fallback=args.enable_source_sink_fallback,
enable_sink_source_verification=args.enable_sink_source_verification,
verification_retry_max=args.verification_retry_max,
verification_timeout=args.verification_timeout,
)
elif args.md_file:
print(f"🚀 PureAutoCodeQL 启动")
print(f"📄 MD文件: {args.md_file}")
print(f"💭 AI思考过程: {'开启' if args.stream else '关闭'}")
# 检查是否同时指定了 --src-path,如果是则使用source分析模式
if args.src_path:
print(f"📁 源代码路径: {args.src_path}")
print(f"🔍 运行模式: Source点分析报告生成")
if args.language:
print(f"💻 编程语言: {args.language}")
if args.provider:
print(f"🤖 模型提供商: {args.provider} (命令行指定)")
if args.output:
print(f"📄 输出文件: {args.output}")
print("-" * 50)
# 如果指定了 --model,同时应用到 think_model 和 chat_model
think_model = args.think_model or args.model
chat_model = args.chat_model or args.model
await run_md_source_analysis(
md_file_path=args.md_file,
src_path=args.src_path,
stream=args.stream,
provider=args.provider,
think_model=think_model,
chat_model=chat_model,
api_key=args.api_key,