diff --git a/.github/workflows/run-script.yml b/.github/workflows/run-script.yml
index 365439a89a..d9211edd31 100644
--- a/.github/workflows/run-script.yml
+++ b/.github/workflows/run-script.yml
@@ -42,7 +42,7 @@ jobs:
run: pip install -r requirements.txt
- name: Run log script
- run: python query.py "${{ secrets.QUERY_STR }}" "${{ secrets.PASSPHRASE }}" "${{ secrets.COOKIES }}"
+ run: python query.py -q "${{ secrets.QUERY_STR }}" -p "${{ secrets.PASSPHRASE }}" -c "${{ secrets.COOKIES }}" -m "${{ secrets.MAIL }}" -r "${{ secrets.RECEIVER_LIST }}"
- name: Commit and push log
run: |
@@ -50,7 +50,7 @@ jobs:
git config --local user.email "action@github.com"
git config --local user.name "GitHub Action"
git add .
- git commit -m "Update logs"
- git push
+ git commit -m "Update logs" --amend
+ git push -f
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
index 7792edfc6b..6688a4a78c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,5 @@
__pycache__
logs/
-migrate.py
\ No newline at end of file
+migrate.py
+.test.py
+test/
\ No newline at end of file
diff --git a/README.md b/README.md
index 6572bcbcef..182ab4317f 100644
--- a/README.md
+++ b/README.md
@@ -61,4 +61,18 @@ Dormitricity 是由 Python 构建的爬虫,运行在 GitHub Actions 上。它
```
- `PASSPHRASE`,按上述要求生成的随机字符串
+ - `MAIL`, 用于发送邮件通知的邮箱设置,格式为 `mail_address&mail_pass&smtp_host&force_notify`
+
+ - `mail_address`:发送邮件的邮箱地址
+ - `mail_pass`:发送邮件的邮箱密码
+ - `smtp_host`:SMTP 服务器地址,例如 `smtp.qq.com`
+ - `force_notify`:是否强制发送通知,值为 `1`、`true` 或 `yes` 时表示强制发送通知,否则不强制发送,一般不需要强制发送通知,仅用于测试。
+
+ - `RECEIVER_LIST`,用于指定接收通知的邮箱列表,格式为 `room_name,mail1&mail2;room_name2,mail1&mail2`
+ - `room_name`:宿舍标识符,与查询字符串中的宿舍标识符相同
+ - `mail1&mail2`:接收通知的邮箱地址列表,多个邮箱地址用 `&` 分隔
+ - 不同宿舍的配置用 `;` 分隔
+
+若不需要邮件通知功能,则可以不设置 `MAIL` 和 `RECEIVER_LIST`。
+
## 会因此被开盒吗?
diff --git a/notify.py b/notify.py
new file mode 100644
index 0000000000..33745fe3ec
--- /dev/null
+++ b/notify.py
@@ -0,0 +1,51 @@
+from email.mime.multipart import MIMEMultipart
+from email.mime.text import MIMEText
+from email.mime.image import MIMEImage
+from email.header import Header
+import smtplib
+
+def mail_notification(mail_config, subject, body, image_paths=None):
+ mail_host = mail_config["mail_host"]
+ mail_port = mail_config["mail_port"]
+ mail_user = mail_config["mail_user"]
+ mail_pass = mail_config["mail_pass"]
+ sender = mail_config["sender"]
+ receivers = mail_config["receivers"]
+
+ message = MIMEMultipart("related")
+ message["From"] = Header(sender)
+
+ if isinstance(receivers, list):
+ message['To'] = Header(",".join(receivers), "utf-8")
+ else:
+ message['To'] = Header(receivers, "utf-8")
+ message["Subject"] = Header(subject, "utf-8")
+
+ # Attach the HTML body and inline images
+
+ # Build HTML with
tags referencing Content-IDs
+ html_body = body
+ if image_paths is None:
+ image_paths = []
+ for idx, img_path in enumerate(image_paths):
+ cid = f"image{idx}"
+ # Replace placeholder in body with cid reference if needed
+ html_body = html_body.replace(f"{{img{idx}}}", f"cid:{cid}")
+ with open(img_path, "rb") as img_file:
+ img = MIMEImage(img_file.read())
+ img.add_header("Content-ID", f"<{cid}>")
+ img.add_header("Content-Disposition", "inline", filename=img_path)
+ message.attach(img)
+
+ # Attach the HTML body
+ message.attach(MIMEText(html_body, "html", "utf-8"))
+
+ try:
+ smtp_obj = smtplib.SMTP_SSL(mail_host, mail_port)
+ smtp_obj.login(mail_user, mail_pass)
+ smtp_obj.sendmail(sender, receivers, message.as_string())
+ smtp_obj.quit()
+ return True
+ except smtplib.SMTPException as e:
+ print(f"SMTP error occurred: {e}")
+ return False
diff --git a/plot.py b/plot.py
index d711b05161..05db047d7a 100644
--- a/plot.py
+++ b/plot.py
@@ -197,7 +197,7 @@ def plot_recharge(extended_history: list[tuple[float, dt.datetime, dt.datetime]]
def plot_exhaustion(
history_decharged: list[tuple[float, dt.datetime, dt.datetime]],
history_last: tuple[float, dt.datetime, dt.datetime],
-):
+) -> dt.datetime:
tlast = history_last[1]
for i, (v, tq, tr) in enumerate(history_decharged):
if tlast - tq < estimate_timedelta:
@@ -226,16 +226,6 @@ def plot_exhaustion(
if slope == 0.0 or abs(exhaustion_x) > ts_overflow:
print("low electricity usage")
return None
- exhaustion_x = history_last[1] + warning_timedelta
- exhaustion_y = slope * exhaustion_x.timestamp() + intercept
- plt.text(
- exhaustion_x,
- exhaustion_y + 10,
- f"no exhaustion\nuntil year 3000",
- fontsize=10,
- ha="center",
- )
- return None
print(f"slope={slope}, exhaustion={exhaustion_x}")
begin_x = timestamps[-1]
@@ -288,7 +278,7 @@ def plot_watts(history_decharged: list[tuple[float, dt.datetime, dt.datetime]]):
watts = diffs / [x.total_seconds() for x in widths] * 3.6e6
plt.bar(timestamps, watts, width=widths, align="edge", color="skyblue")
-def plot(cs: csv_storage):
+def plot(cs: csv_storage) -> dt.datetime:
# history = [
# (10.0, dt.datetime(2024, 8, 8, 0, 0, 0), dt.datetime(2024, 8, 8, 23, 59, 59)),
# (8.0, dt.datetime(2024, 8, 9, 0, 0, 0), dt.datetime(2024, 8, 9, 23, 59, 59)),
@@ -303,7 +293,7 @@ def plot(cs: csv_storage):
history = read_csv(cs)
history = filter_recent(history)
decharged, recharges = decharge(history)
- costs = get_cost(decharged)
+ get_cost(decharged)
plt.figure(1, figsize=(10, 6))
exhaust_time = plot_exhaustion(decharged, history[-1])
@@ -338,3 +328,5 @@ def plot(cs: csv_storage):
plt.savefig(f"{cs.filepath}/watts.png", format="png")
# delete figure
plt.clf()
+
+ return exhaust_time
diff --git a/query.py b/query.py
index bb6489587f..3dd88a9712 100644
--- a/query.py
+++ b/query.py
@@ -1,15 +1,27 @@
-import requests, os, json, sys
-from datetime import datetime, timedelta
+import json, sys, argparse
+from datetime import datetime
+from urllib.parse import parse_qs
+import requests
from storage import csv_storage
import plot
-from urllib.parse import parse_qs
+import notify
+
+mail_config = {
+ "mail_host": "smtp.exmail.qq.com",
+ "mail_port": 465,
+ "mail_user": "",
+ "mail_pass": "",
+ "sender": "",
+ "receivers": [""],
+ "mail_notify": False, # whether to send email notification
+ "force_notify": False, # whether to send email notification even if not low power
+}
class bad_query(Exception):
# subclass but acts exactly as the base class
pass
-
class verbose_dict(dict):
def __getitem__(self, key):
try:
@@ -17,7 +29,7 @@ def __getitem__(self, key):
if isinstance(res, dict) and not isinstance(res, verbose_dict):
return verbose_dict(res)
return res
- except KeyError:
+ except KeyError as exc:
available_keys: list[str] = list(self.keys())
available_keys = list( # filter out reserved keys
filter(
@@ -27,26 +39,28 @@ def __getitem__(self, key):
)
raise bad_query(
f"attribute '{key}' not found. available: {sorted(available_keys)}"
- )
-
+ ) from exc
def json_or_exit(res):
try:
return res.json()
- except Exception as e:
+ except ValueError as e:
print(f"error fetching json: {e}")
print(f"url: {res.url}")
print(f"responce: {res.content}")
exit(1)
-def do_query(query_str: str, passphrase: str, cookies: dict):
+def do_query(query_str: str, q_passphrase: str, q_cookies: dict):
# check if such room exists
query_name = query_str.split("@")
- if len(query_name) != 2:
- print("no room name provided")
+ query_str = ""
+ room_name = ""
+ if len(query_name) < 2:
+ print("unexpected query string format")
show_help_exit()
- query_str, room_name = query_name
+ if len(query_name) == 2:
+ query_str, room_name = query_name
if not room_name:
print("empty room name")
show_help_exit()
@@ -59,11 +73,11 @@ def do_query(query_str: str, passphrase: str, cookies: dict):
room_id = dormitory_info[campus][partment]["floors"][floor][room]
except bad_query as e:
print(f"bad query: {e}")
- exit(1)
+ sys.exit(1)
# query the api
- print(f"querying ...", end="", flush=True)
- responce = requests.post(
+ print("querying ...", end="", flush=True)
+ response = requests.post(
"https://app.bupt.edu.cn/buptdf/wap/default/search",
data={
"partmentId": dormitory_info[campus][partment]["id"],
@@ -71,47 +85,88 @@ def do_query(query_str: str, passphrase: str, cookies: dict):
"dromNumber": room_id,
"areaid": str(int(campus != "西土城") + 1),
},
- cookies=cookies,
+ cookies=q_cookies,
+ timeout=10,
)
- print(f" done")
+ print(" done")
- res: dict = json_or_exit(responce)
+ res: dict = json_or_exit(response)
data = res["d"]["data"]
remain = data["surplus"] + data["freeEnd"] # 剩余电量 + 剩余赠送电量
time = datetime.fromisoformat(data["time"])
# append query result to csv
- cs = csv_storage(room_name, passphrase)
+ cs = csv_storage(room_name, q_passphrase)
cs.append(f"{remain}, {time}, {datetime.now()}\n")
print(f"successfully saved to {cs.filename}")
- plot.plot(cs)
-
+ exhaust_time = plot.plot(cs)
+ time_diff = exhaust_time - datetime.now()
+ print(f"exhaust time: {exhaust_time} ({time_diff.days} days, {time_diff.seconds // 3600} hours, {(time_diff.seconds // 60) % 60} minutes)")
+ print(f"total seconds: {time_diff.total_seconds()}")
+
+ if ((time_diff.days < 1 and mail_config["mail_notify"]) or mail_config["force_notify"]):
+ mail_config["receivers"] = receiver_dict.get(room_name, [mail_config["sender"]])
+ ret = notify.mail_notification(
+ mail_config=mail_config,
+ subject=f"宿舍电量预警: {room_name}",
+ body=f"当前电量: {remain}度\n时间: {time}" + "

",
+ image_paths=[f"{cs.filepath}/recent.png", f"{cs.filepath}/watts.png"],
+ )
+ if ret is False:
+ raise RuntimeError("failed to send notification")
def show_help_exit():
- print("usage: query.py [,query_str2,...] ")
- print(
- "example: query.py 西土城.学五楼.3.5-312-节能蓝天@学五-312宿舍,沙河.沙河校区雁北园A楼.1层.A楼102@沙河A102宿舍 example_passphrase UUkey=xxx&eai-sess=yyy"
- )
- exit(1)
-
+ print("usage: dormitricity query -q 'campus.partment.floor.room@room_name' -p passphrase -c cookies [-m mail_address&mail_pass&smtp_host&force_notify] [-r room_name,mail1&mail2;room_name2,mail1&mail2]")
+ print("example: dormitricity query -q '西土城.学五楼.3.5-312-节能蓝天@学五-312宿舍,沙河.沙河校区雁北园A楼.1层.A楼102@沙河A102宿舍' -p 'your_passphrase' -c 'UUKey=value1&eai-sess=value2' -m 'mail_address&mail_pass&smtp_host&1' -r '学五-312宿舍,mail1&mail2;沙河A102宿舍,mail3'")
+ sys.exit(1)
# main logic
-if len(sys.argv) != 4:
- print("invalid arguments.")
- show_help_exit()
+parser = argparse.ArgumentParser(description="Dormitricity Query Tool")
+parser.add_argument("-q", "--query", type=str, required=True,
+ help="Query string in the format 'campus.partment.floor.room@room_name'")
+parser.add_argument("-p", "--passphrase", type=str, required=True,
+ help="Passphrase for the query")
+parser.add_argument("-c", "--cookies", type=str, required=True,
+ help="Cookies in URL-encoded format, e.g., 'UUKey=value1&eai-sess=value2'")
+parser.add_argument("-m", "--mail", type=str, nargs='?', default="",
+ help="Email address for notifications, " \
+ "e.g., 'mail_address&mail_pass&smtp_host&force_notify', (optional)")
+parser.add_argument("-r", "--receivers", type=str, nargs='?', default="",
+ help="Custom receivers for specific rooms "
+ "in the format 'room_name,mail1&mail2;room_name2,mail1&mail2' (optional)")
+args = parser.parse_args()
# load dormitory info
-print(f"loading dormitory info ...", end="", flush=True)
+print("loading dormitory info ...", end="", flush=True)
with open("dormitory_info.json", "rt", encoding="utf-8") as f:
dormitory_info: dict = verbose_dict(json.load(f))
-print(f" done")
+print(" done")
+
+
+passphrase = args.passphrase
+
+cookies = {k: v[0] for k, v in parse_qs(args.cookies).items()}
+
+mail_config["mail_user"] = args.mail.split("&")[0] if args.mail else ""
+mail_config["mail_pass"] = args.mail.split("&")[1] if args.mail else ""
+mail_config["mail_host"] = args.mail.split("&")[2] if args.mail else ""
+mail_config["sender"] = mail_config["mail_user"]
+mail_config["mail_notify"] = True if args.mail else False
+mail_config["force_notify"] = args.mail.split("&")[3] in ["1", "true", "yes"] if args.mail else False
+print(f" FORCED NOTIFY: {mail_config['force_notify']}")
-passphrase = sys.argv[2]
+receiver_dict = {}
+if args.receivers:
+ # room_name1,mail1&mail2;room_name2,mail1&mail2
+ receiver_list = args.receivers.split(";")
+ for name_and_mails in receiver_list:
+ name, mails = name_and_mails.split(",", 1)
+ mails = mails.split("&")
+ receiver_dict[name] = mails
-cookies = {k: v[0] for k, v in parse_qs(sys.argv[3]).items()}
-for qs in sys.argv[1].split(","):
+for qs in args.query.split(","):
do_query(qs, passphrase, cookies)