-
Notifications
You must be signed in to change notification settings - Fork 134
Expand file tree
/
Copy pathproject_utils.py
More file actions
142 lines (105 loc) · 3.75 KB
/
project_utils.py
File metadata and controls
142 lines (105 loc) · 3.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
from __future__ import annotations
import os
import shutil
import subprocess
import sys
from importlib.util import find_spec
from pathlib import Path
from typing import Iterable, Sequence
def repo_root(start: str | Path | None = None) -> Path:
    """Locate the enclosing repository root by searching upward for ``.git``.

    Args:
        start: File or directory to begin the search from. When omitted (or
            otherwise falsy), the search begins at this module's own file.

    Returns:
        The nearest directory, at or above the starting point, that contains
        a ``.git`` entry. If none is found, the directory holding this module.
    """
    origin = Path(start or __file__).resolve()
    # A file cannot contain ".git", so begin from its containing directory.
    search_from = origin.parent if origin.is_file() else origin

    lineage = (search_from, *search_from.parents)
    hits = (folder for folder in lineage if (folder / ".git").exists())
    nearest = next(hits, None)
    if nearest is not None:
        return nearest

    # No repository marker anywhere above: fall back to this module's folder.
    return Path(__file__).resolve().parent
# Repository root, computed once at import time by searching upward from this module's file.
ROOT = repo_root(__file__)
def root_path(*parts: str) -> Path:
    """Build a path located under the repository root.

    Args:
        *parts: Path segments to append to ``ROOT``, in order.

    Returns:
        ``ROOT`` joined with the given segments (``ROOT`` itself when no
        segments are given).
    """
    return Path(ROOT, *parts)
def env_path(name: str, default: str | Path | None = None) -> Path | None:
    """Read a filesystem path from an environment variable.

    Args:
        name: Name of the environment variable to read.
        default: Value to use when the variable is unset or empty.

    Returns:
        The chosen value with ``~`` expanded and resolved to an absolute
        path, or ``None`` when the variable is unset/empty and no default
        was supplied.
    """
    configured = os.environ.get(name)
    # An empty variable is treated the same as an unset one.
    chosen = configured if configured else default
    if chosen is None:
        return None
    return Path(chosen).expanduser().resolve()
def require_path(path: Path, description: str) -> Path:
    """Return *path* unchanged after verifying that it exists.

    Args:
        path: Filesystem location that must be present.
        description: Human-readable label used as the prefix of the error
            message.

    Returns:
        The same ``path`` that was passed in.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
    """
    if path.exists():
        return path
    raise FileNotFoundError(f"{description}不存在: {path}")
def ensure_dir(path: Path) -> Path:
    """Create the directory *path*, including missing parents, if needed.

    An already existing directory is left untouched.

    Args:
        path: Directory to create.

    Returns:
        The same ``path`` that was passed in, so calls can be chained.
    """
    path.mkdir(exist_ok=True, parents=True)
    return path
def reset_dir(path: Path) -> Path:
    """Replace *path* with a fresh, empty directory.

    Any existing directory tree at ``path`` is deleted first; the directory
    (and any missing parents) is then created.

    Args:
        path: Directory to wipe and recreate.

    Returns:
        The same ``path`` that was passed in.
    """
    already_present = path.exists()
    if already_present:
        shutil.rmtree(path)
    path.mkdir(exist_ok=True, parents=True)
    return path
def remove_if_exists(path: Path) -> None:
    """Delete the file or symbolic link at *path*; do nothing if it is absent.

    The removal is attempted directly instead of checking ``path.exists()``
    first. ``exists()`` follows symlinks, so a dangling symlink would be
    reported as missing and left behind; checking first also leaves a window
    in which another process can delete the file before ``unlink`` runs.

    Args:
        path: File or symlink to remove.

    Raises:
        OSError: If ``path`` is present but cannot be unlinked (for example,
            it is a directory or permissions forbid it).
    """
    try:
        path.unlink()
    except FileNotFoundError:
        # Already gone: that is the desired end state.
        pass
def find_ltp_data_dir() -> Path:
    """Find the LTP model directory.

    Locations are tried in this order: the ``LTP_DATA_DIR`` environment
    variable, ``ltp_data_v3.4.0`` in the repository root, the same folder
    under ``models``, and the same folder next to the repository root. A
    location is accepted when it contains a ``cws.model`` file.

    Returns:
        The first accepted directory.

    Raises:
        FileNotFoundError: If none of the locations contains ``cws.model``.
    """
    model_dir_name = "ltp_data_v3.4.0"
    search_order = (
        env_path("LTP_DATA_DIR"),
        root_path(model_dir_name),
        root_path("models", model_dir_name),
        root_path("..", model_dir_name),
    )
    for directory in search_order:
        if not directory:
            # The environment variable was not set.
            continue
        if (directory / "cws.model").exists():
            return directory
    raise FileNotFoundError(
        "未找到 LTP 模型目录。请设置环境变量 LTP_DATA_DIR,并确保目录下包含 cws.model / pos.model / ner.model。"
    )
def find_executable(env_name: str, names: Sequence[str], extra_candidates: Iterable[Path] = ()) -> Path | None:
    """Locate an executable from an override, the ``PATH``, or fallback paths.

    Lookup order:
      1. The path stored in the environment variable ``env_name``, if it
         exists on disk.
      2. The first entry of ``names`` that ``shutil.which`` can find.
      3. The first entry of ``extra_candidates`` that exists on disk.

    Args:
        env_name: Environment variable that may hold an explicit path.
        names: Command names to look up on the ``PATH``.
        extra_candidates: Additional explicit paths to try last.

    Returns:
        The path to the executable, or ``None`` if nothing was found.
    """
    override = env_path(env_name)
    if override and override.exists():
        return override

    # Lazily probe each name and stop at the first one found on PATH.
    on_path = next(filter(None, map(shutil.which, names)), None)
    if on_path:
        return Path(on_path)

    return next((fallback for fallback in extra_candidates if fallback.exists()), None)
def find_crf_test() -> Path | None:
    """Locate the ``crf_test`` executable.

    The ``CRF_TEST_BIN`` environment variable and the ``PATH`` are consulted
    via ``find_executable``. On Windows (``os.name == "nt"``) the
    ``crf_test.exe`` path inside the repository is offered as a last resort.

    Returns:
        The path to the executable, or ``None`` if it cannot be found.
    """
    bundled: list[Path] = (
        [root_path("03基于CRF的事件要素抽取", "实验", "05", "crf_test.exe")]
        if os.name == "nt"
        else []
    )
    return find_executable("CRF_TEST_BIN", ["crf_test"], bundled)
def run_command(
    cmd: Sequence[str],
    cwd: Path | None = None,
    description: str | None = None,
    capture_output: bool = False,
) -> subprocess.CompletedProcess[str]:
    """Run an external command and convert failures into ``RuntimeError``.

    The command is run without a shell, in text mode, with ``check=True``.

    Args:
        cmd: Program and arguments. Path-like arguments are accepted.
        cwd: Working directory for the child process, or ``None`` to inherit
            the current one.
        description: Label used in error messages instead of the raw command.
        capture_output: When true, stdout/stderr are captured on the returned
            object (and stderr is appended to the failure message).

    Returns:
        The ``CompletedProcess`` of the successful run.

    Raises:
        RuntimeError: If the working directory does not exist, the executable
            cannot be found, or the command exits with a non-zero status. The
            original exception is chained as ``__cause__``.
    """
    argv = list(cmd)
    try:
        return subprocess.run(
            argv,
            cwd=str(cwd) if cwd else None,
            check=True,
            text=True,
            capture_output=capture_output,
        )
    except FileNotFoundError as exc:
        # A missing working directory raises FileNotFoundError too; report it
        # as such instead of blaming the executable.
        if cwd and not Path(cwd).is_dir():
            raise RuntimeError(f"工作目录不存在: {cwd}") from exc
        detail = description or argv[0]
        raise RuntimeError(f"未找到可执行命令: {detail}") from exc
    except subprocess.CalledProcessError as exc:
        # str() each part so Path arguments cannot make the error report
        # itself fail with a TypeError.
        detail = description or " ".join(str(part) for part in argv)
        stderr = exc.stderr.strip() if exc.stderr else ""
        message = f"命令执行失败: {detail}"
        if stderr:
            message += f"\n{stderr}"
        raise RuntimeError(message) from exc
def print_step(message: str) -> None:
    """Print *message* to standard output with an ``[INFO]`` prefix."""
    line = "[INFO] {}".format(message)
    print(line)
def print_warning(message: str) -> None:
    """Print *message* to standard output with a ``[WARN]`` prefix."""
    line = "[WARN] {}".format(message)
    print(line)
def print_success(message: str) -> None:
    """Print *message* to standard output with an ``[OK]`` prefix."""
    line = "[OK] {}".format(message)
    print(line)
def python_executable() -> str:
    """Return ``sys.executable``, the path of the running Python interpreter."""
    interpreter = sys.executable
    return interpreter
def python_module_available(module_name: str) -> bool:
    """Report whether *module_name* can be located for import.

    ``find_spec`` raises rather than returning ``None`` in some situations,
    which a boolean probe should not propagate:

    * For a dotted name it imports the parent package first and raises
      ``ModuleNotFoundError`` when that parent is missing.
    * It raises ``ValueError`` when the module is already loaded but has no
      ``__spec__``.

    Args:
        module_name: Absolute module name, optionally dotted.

    Returns:
        ``True`` if the module can be found (or is already loaded), else
        ``False``.
    """
    try:
        return find_spec(module_name) is not None
    except ImportError:
        # The parent package is missing or failed to import.
        return False
    except ValueError:
        # Raised for a loaded module without __spec__; it is usable if it is
        # present in sys.modules.
        return module_name in sys.modules