-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
270 lines (231 loc) · 10.7 KB
/
Copy pathmain.py
File metadata and controls
270 lines (231 loc) · 10.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
#!/usr/bin/env python3
"""
GitHub Fork/Star 通知机器人
轮询模式,检测爸爸仓库的fork和star变化并发送Telegram通知
"""
import time
import json
import logging
import fcntl
import os
from datetime import datetime, timezone
from pathlib import Path
from github_client import GitHubClient, GitHubAPIError
from telegram_sender import TelegramSender
from config import POLL_INTERVAL, STATE_FILE, TARGET_USERS, LOG_LEVEL
# 配置日志(使用 config.py 里设置的真实 LOG_LEVEL)
logging.basicConfig(
level=getattr(logging, LOG_LEVEL.upper(), logging.INFO),
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def load_state() -> dict:
"""加载状态文件"""
if STATE_FILE.exists():
try:
with open(STATE_FILE, 'r', encoding='utf-8') as f:
return json.load(f)
except (json.JSONDecodeError, IOError) as e:
logger.warning(f"状态文件读取失败: {e},将创建新状态")
return {
"last_fork_check": {},
"last_star_check": {},
"last_run": None,
"last_successful_poll": None
}
def save_state(state: dict) -> None:
"""
保存状态文件,带文件锁防止并发写入损坏。
先写临时文件再 rename,保证原子性。
"""
tmp_path = STATE_FILE.with_suffix('.tmp')
try:
with open(tmp_path, 'w', encoding='utf-8') as f:
fcntl.flock(f.fileno(), fcntl.LOCK_EX) # 独占锁
json.dump(state, f, ensure_ascii=False, indent=2)
f.flush()
os.fsync(f.fileno()) # 确保落盘
# 锁还在持有着,在 with 块内完成 rename,锁释放时才close
tmp_path.rename(STATE_FILE)
except OSError as e:
# rename 失败(如跨文件系统)是致命错误,直接抛
logger.critical(f"状态文件保存失败(原子替换出错): {e}")
try:
if tmp_path.exists():
tmp_path.unlink()
except Exception:
pass
raise
except Exception as e:
logger.error(f"保存状态文件失败: {e}")
try:
if tmp_path.exists():
tmp_path.unlink()
except Exception:
pass
raise # 让上层知道保存失败了
def format_time(dt_str: str) -> str:
"""格式化时间为易读格式,兼容多种 ISO 8601 变体"""
try:
normalized = dt_str.replace('Z', '+00:00')
dt = datetime.fromisoformat(normalized)
return dt.strftime('%Y-%m-%d %H:%M:%S')
except ValueError:
logger.warning(f"时间格式无法解析(将原样返回): {dt_str}")
return dt_str
def main() -> None:
"""主循环"""
logger.info("GitHub Fork/Star 通知机器人启动")
logger.info(f"监控目标用户: {TARGET_USERS}")
logger.info(f"轮询间隔: {POLL_INTERVAL}秒")
github = GitHubClient()
telegram = TelegramSender()
state = load_state()
consecutive_save_errors = 0
max_consecutive_save_errors = 5
while True:
try:
logger.info("=" * 50)
logger.info("开始新一轮检查...")
all_events = []
round_has_error = False
# 本轮已成功通知的repo,用于推进star状态
notified_star_repos = {} # {username: {repo_name: latest_starred_at}}
for username in TARGET_USERS:
logger.info(f"检查用户: {username}")
repos = github.get_user_repos(username)
if not repos:
logger.warning(f"用户 {username} 没有找到仓库或API调用失败")
continue
logger.info(f"找到 {len(repos)} 个仓库")
# 提前初始化用户名状态(确保 dict 结构存在)
if username not in state['last_fork_check']:
state['last_fork_check'][username] = {}
if username not in state['last_star_check']:
state['last_star_check'][username] = {}
if username not in notified_star_repos:
notified_star_repos[username] = {}
for repo in repos:
full_name = repo['full_name']
repo_name = repo['name']
# ---- 检测新的 fork ----
try:
new_forks, latest_fork_at, fork_err = github.check_new_forks(
full_name, state['last_fork_check'].get(username, {})
)
if fork_err:
round_has_error = True
logger.error(f"Fork检查API异常({full_name}): {fork_err}")
else:
# 更新 fork 状态(fork 不可逆,直接推进)
if latest_fork_at:
state['last_fork_check'][username][repo_name] = latest_fork_at
for fork in new_forks:
all_events.append({
"type": "fork",
"repo": full_name,
"user": fork['owner']['login'],
"user_url": fork['owner']['html_url'],
"time": fork['created_at'],
"fork_owner": username,
"fork_repo": repo_name,
})
logger.info(f"新fork: {full_name} by {fork['owner']['login']}")
except GitHubAPIError as e:
round_has_error = True
logger.error(f"Fork检查API异常({full_name}): {e}")
# ---- 检测新的 star ----
try:
new_stars, star_err = github.check_new_stars(
full_name, state['last_star_check'].get(username, {})
)
if star_err:
round_has_error = True
logger.error(f"Star检查API异常({full_name}): {star_err}")
for star in new_stars:
starred_at = star.get('starred_at')
if not starred_at:
continue
all_events.append({
"type": "star",
"repo": full_name,
"user": star['user']['login'],
"user_url": star['user']['html_url'],
"time": starred_at,
"star_owner": username,
"star_repo": repo_name,
"starred_at": starred_at,
})
logger.info(f"新star: {full_name} by {star['user']['login']}")
except GitHubAPIError as e:
round_has_error = True
logger.error(f"Star检查API异常({full_name}): {e}")
# ---- 发送通知 ----
# fork 通知:发送即成功(fork 不可逆,状态已推进)
# star 通知:成功后才知道要推进哪个 repo 的状态
for event in all_events:
message = format_event_message(event)
if telegram.send(message):
# 发送成功:推进对应 repo 的 star 状态
if event['type'] == 'star':
notified_star_repos[event['star_owner']][event['star_repo']] = event['starred_at']
else:
logger.error(f"通知发送失败: {event['type']} - {event['repo']} by {event['user']}")
# ---- 所有通知发完后:统一推进 star 状态 ----
for username in notified_star_repos:
for repo_name, latest in notified_star_repos[username].items():
state['last_star_check'][username][repo_name] = latest
# ---- 保存状态 ----
if all_events:
logger.info(f"本轮共 {len(all_events)} 个事件,已处理完毕")
else:
if round_has_error:
logger.warning("本轮检查遇到API错误,但没有发现新事件")
else:
logger.info("没有发现新事件")
try:
state['last_run'] = datetime.now(timezone.utc).isoformat()
state['last_successful_poll'] = state['last_run']
save_state(state)
consecutive_save_errors = 0
except Exception as e:
logger.error(f"保存状态失败: {e}")
consecutive_save_errors = min(consecutive_save_errors + 1, max_consecutive_save_errors * 2)
if consecutive_save_errors >= max_consecutive_save_errors:
logger.critical(f"连续 {consecutive_save_errors} 次保存状态失败,可能磁盘有问题!")
logger.info(f"本轮检查完成,等待 {POLL_INTERVAL} 秒...")
except GitHubAPIError as e:
logger.error(f"GitHub API错误(外层捕获): {e}", exc_info=True)
consecutive_save_errors = min(consecutive_save_errors + 1, max_consecutive_save_errors * 2)
except KeyboardInterrupt:
logger.info("收到中断信号,正常退出")
break
except Exception as e:
logger.error(f"发生未知错误: {e}", exc_info=True)
consecutive_save_errors = min(consecutive_save_errors + 1, max_consecutive_save_errors * 2)
time.sleep(POLL_INTERVAL)
def format_event_message(event: dict) -> str:
"""格式化事件消息"""
event_type = event['type']
repo = event['repo']
user = event['user']
user_url = event['user_url']
time_str = format_time(event['time'])
# GitHub 用户名允许 alphanumeric, - 和 _,但 Telegram MarkdownV1 里
# _ 和 * 是特殊字符,直接放会干扰解析,加反斜杠转义
def md_escape(s: str) -> str:
return s.replace('_', r'\_').replace('*', r'\*').replace('`', r'\`')
if event_type == 'fork':
return f"""🍴 新Fork通知
📦 仓库: {md_escape(repo)}
👤 Fork者: [{md_escape(user)}]({user_url})
🕐 时间: {time_str}
🔗 链接: https://github.com/{repo}/network/members"""
else:
return f"""⭐ 新Star通知
📦 仓库: {md_escape(repo)}
👤 收藏者: [{md_escape(user)}]({user_url})
🕐 时间: {time_str}
🔗 链接: https://github.com/{repo}/stargazers"""
if __name__ == "__main__":
main()