Skip to content
165 changes: 80 additions & 85 deletions compass/todolist.md
Original file line number Diff line number Diff line change
@@ -1,113 +1,108 @@
# ReChat-sender 开发规划与任务清单
# ReChat-provider 开发规划与任务清单

> 最后更新: 2026-04-26
> 最后更新: 2026-05-01

---

## 一、已完成

| # | 任务 | 涉及文件 | 日期 |
|:--|------|---------|:---:|
| ✅ | 修复 REPO 未初始化时 `unwrap()` panic → 安全错误处理 | `src/api/endpoints/messages.rs`, `src/main.rs` | 04-26 |
| ✅ | 修复 `duration_since().unwrap()` → `unwrap_or_default()` + 日志 | `src/api/endpoints/messages.rs`, `src/core/message.rs` | 04-26 |
| ✅ | `Config::load()` 实际调用 — 新增 `--config` CLI 参数 | `src/main.rs` | 04-26 |
| ✅ | `workers` 从配置动态读取(不再硬编码为 1) | `src/main.rs` | 04-26 |
| ✅ | 清理无意义的 `--server` 参数 | `src/main.rs` | 04-26 |
| ✅ | 时间戳以 `i64` 数值存储(不再转字符串) | `src/core/message.rs` | 04-26 |
| ✅ | 移除 `cargo-tarpaulin` dev-dependency | `Cargo.toml` | 04-26 |
| ✅ | 无效枚举值错误类型从 `InvalidColumnType` → `InvalidQuery` | `src/core/message.rs` | 04-26 |
| ✅ | 数据库层时间戳合理性验证(零值/负值检查 + 日志) | `src/core/message.rs` | 04-26 |
| ✅ | API 层 `system_time_to_secs()` 安全转换 + 日志 | `src/api/endpoints/messages.rs` | 04-26 |
| ✅ | 合并远程 `adapter + plugin` 模块到 `main.rs` | `src/main.rs` | 04-26 |
## 一、已完成 (近期)

| 日期 | 任务 | 说明 |
|:---:|------|------|
| 04-30 | Phase 0-4: OneBot v11 适配器 | protocol / WS 端点 / Adapter trait 全部完成 |
| 04-30 | Web UI 重构 | Vanilla JS SPA,侧边栏布局,亮/暗/Auto 三模式主题 |
| 04-30 | 路由合并修复 | 多个 `web::scope("")` 冲突 → `configure()` |
| 04-30 | Token 认证系统 | 启动生成 UUID → 登录遮罩 → localStorage → API/WS 携带 |
| 04-30 | CI 格式修复 × 3 | import order / 行宽 / 链式调用换行 |
| 05-01 | 集成测试补全 | 39 个新测试: broadcaster(11) + protocol(21) + auth(7) |

---

## 二、待修复(代码质量)
## 二、下一步开发计划

| # | 任务 | 优先级 | 说明 |
|:--|------|:---:|------|
| 🔧 | `adapter.rs` — `AdapterStats::Default` 改用 `#[derive(Default)]` | 🟢 | Clippy suggestion,减少手写代码 |
| 🔧 | `adapter.rs` — 为 `AdapterManager` 实现 `Default` trait | 🟢 | Clippy suggestion,遵循 Rust 惯例 |
| 🔧 | `plugin.rs` — `PluginStats::Default` 改用 `#[derive(Default)]` | 🟢 | 同上 |
| 🔧 | `plugin.rs` — 为 `PluginManager` 实现 `Default` trait | 🟢 | 同上 |
| 🔧 | `MessageQueue` 未实例化,`services/queue.rs` 为死代码 | 🟡 | 需要 Redis 环境;需决定是集成还是移除此模块 |
### 🔴 Phase A: 消息发送调度核心 (P0)

---
> 当前 `/ws/client` 的 `send_message` 只调用 `send_to_adapter` 但不处理结果/重试。需要后台调度器。

## 三、待开发(功能特性)
| # | 任务 | 优先级 | 文件 | 说明 |
|:--|------|:---:|------|------|
| A1 | 创建 `core/dispatcher.rs` | 🔴 | 新建 | 后台 tokio task 轮询 Pending 消息 → 匹配 Adapter 发送 → 更新状态 |
| A2 | `SenderConfig` 接入调度器 | 🔴 | dispatcher.rs | max_retries / retry_interval / batch_size / concurrency |
| A3 | `MessageRepository::update_status()` | 🔴 | message.rs | 发送成功后更新为 Sent,失败后 retry_count++ |
| A4 | 调度器生命周期管理 | 🔴 | main.rs + dispatcher.rs | start / graceful shutdown |
| A5 | 集成测试 | 🔴 | tests/dispatcher.rs | 模拟发送成功/失败/超时/最大重试 |

### 🔴 当前进行中:OneBot v11 Adapter(NapCat/QQ 接入)
### 🟡 Phase B: API 补全

> 详细计划 → [compass/onebot_adapter_plan.md](compass/onebot_adapter_plan.md)
| # | 任务 | 优先级 | 说明 |
|:--|------|:---:|------|
| B1 | `GET /api/messages` 列表分页 | 🟡 | 支持 `?limit=&offset=&status=` 筛选 |
| B2 | `PATCH /api/messages/{id}` 取消/重发 | 🟡 | `{"status": "Failed"}` 取消待发送消息 |
| B3 | `GET /api/stats` 统计概览 | 🟡 | 今日消息数/各状态计数/平台分布 |
| B4 | `DELETE /api/messages/{id}` | 🟢 | 删除消息记录 |

| Phase | 优先级 | 任务 | 状态 |
|:---:|:---:|------|:---:|
| P0 | 🔴 | 消息广播中枢 + 客户端 WS — `core/broadcaster.rs` + `api/endpoints/ws_client.rs` | ✅ |
| P1 | 🔴 | `src/adapters/onebot/protocol.rs` — OneBot 协议数据模型 | ✅ |
| P2 | 🔴 | `src/adapters/onebot/ws.rs` — 平台 WS 端点 `/onebot/v11/ws` | ✅ |
| P3 | 🔴 | `src/adapters/onebot/adapter.rs` — Adapter trait 实现 | ✅ |
| P4 | 🔴 | 集成:lib.rs / api/mod.rs / main.rs 注册路由与初始化 | ✅ |
| P5 | 🟡 | 测试 + 验证(cargo check / clippy / test) | ⏳ |

> ✅ **Phase 3-4 已完成** — OneBotAdapter 实现 Adapter trait (send_message / name / start / stop),main.rs 注册适配器到 AdapterManager。剩余 P5 测试验证。

### 其他待开发

| # | 功能 | 优先级 | 涉及模块 | 说明 |
|:--|------|:---:|---------|------|
| 📦 | Redis 消息队列集成 | 🟡 | `services/queue.rs`, `main.rs` | 将 `MessageQueue` 实例化并接入消息发送流程 |
| 📦 | 更多 Adapter 实现 | 🟡 | `adapters/` | WebSocket、HTTP、Telegram、Discord 等 |
| 📦 | Plugin 实现(消息处理插件) | 🟡 | `core/plugin.rs` | 实现具体的 `Plugin` trait:加密插件、格式化插件等 |
| 📦 | 发送重试与调度逻辑 | 🟡 | `core/sender.rs` (新建) | 利用 `SenderConfig`(max_retries/retry_interval/batch_size) 实现后台发送调度 |
| 📦 | 消息状态流转 API | 🔴 | `api/endpoints/messages.rs` | `GET /api/messages` 列表查询、`PATCH` 取消/重发 |
| 📦 | Adapter 状态 API | 🟢 | `api/endpoints/adapters.rs` (新建) | 查看 adapter 状态、启停 adapter |
| 📦 | Plugin 管理 API | 🟢 | `api/endpoints/plugins.rs` (新建) | 查看/启用/禁用 plugin |
| 📦 | Web 界面完善 | 🟢 | `web/mod.rs`, `templates/` | 使用 Tera 模板重写 HTML 页面,添加实时状态 |
| 📦 | 配置验证 | 🟢 | `core/config.rs` | `Config` 加载后验证字段合法性(端口范围、路径等) |
### 🟡 Phase C: 消息流完善

---
| # | 任务 | 优先级 | 说明 |
|:--|------|:---:|------|
| C1 | OneBot 图片/文件消息类型扩展 | 🟡 | 当前只处理 Text 类型 → 扩展 Image/File MessageType |
| C2 | 入站消息 sender 信息补全 | 🟡 | group/private conversation_name 自动生成 |
| C3 | 多平台支持框架就绪 | 🟢 | adapter 注册机制验证 (QQ 已有,可加 mock 测试微信) |

## 四、待开发(测试与文档)
### 🟢 Phase D: 运维与部署

| # | 任务 | 优先级 | 说明 |
|:--|------|:---:|------|
| 📝 | API 端点集成测试 | 🟡 | 启动测试服务器,测试 `POST /api/messages`、`GET /api/messages/{id}` |
| 📝 | `MessageRepository` 完整测试 | 🟡 | 测试 `get_pending_messages`、重复 save 覆盖、无效 ID |
| 📝 | `MessageQueue` 测试(需要 Redis) | 🟢 | 测试 push/pop/len/is_empty |
| 📝 | 错误场景测试 | 🟡 | REPO 未初始化时 API 返回 500 而非 panic;无效 message type 返回 400 |

---
| D1 | Dockerfile | 🟢 | 多阶段构建,静态链接 musl |
| D2 | docker-compose.yml | 🟢 | ReChat + (可选 Redis) |
| D3 | 配置验证 | 🟢 | `Config` 加载后验证 port 范围、path 可写性 |
| D4 | 环境变量覆盖配置 | 🟢 | `RE_CHAT_*` 前缀覆盖任意字段 |

## 五、架构改进
### 🟢 Phase E: 架构优化

| # | 建议 | 优先级 | 说明 |
| # | 任务 | 优先级 | 说明 |
|:--|------|:---:|------|
| 🏗 | REPO `thread_local!` 改为 Actix `Data<T>` | 🟡 | 更符合 Actix 惯例,便于测试和注入依赖 |
| 🏗 | 提取发送器核心为独立 actor/service | 🟡 | 将消息发送、重试、状态管理封装为独立模块 |
| 🏗 | `rusqlite` 使用 WAL 模式 | 🟢 | 提升并发读取性能:`PRAGMA journal_mode=WAL;` |
| 🏗 | 配置支持环境变量覆盖 | 🟢 | `Config::load()` 后按 `RE_CHAT_*` 环境变量覆盖字段 |
| E1 | REPO `thread_local!` `Data<T>` | 🟢 | 更符合 Actix 惯例 |
| E2 | SQLite WAL 模式 | 🟢 | `PRAGMA journal_mode=WAL` 提升并发读 |
| E3 | adapter_manager 列表 API | 🟢 | `GET /api/adapters` 返回 JSON |
| E4 | Web 仪表盘实时数据 | 🟢 | 统计卡片从 WS 获取 live 数据 |

---

## 六、依赖与发布
## 三、优先级总结

| # | 任务 | 优先级 | 说明 |
|:--|------|:---:|------|
| 📦 | 关注 `redis 0.24.0` / `net2` 未来兼容性 | 🟢 | Rust 未来版本将拒绝,需等待上游更新 |
| 📦 | Windows 服务化部署脚本 | 🟢 | `scripts/` 下添加安装/卸载 Windows Service 脚本 |
| 📦 | 编写 `DEPLOY.md` | 🟢 | 补充部署步骤、配置文件示例、CLI 使用说明 |
| 优先级 | Phase | 概述 | 预计涉及文件数 |
|:---:|:---:|------|:---:|
| 🔴 P0 | A | **消息发送调度器** — 这是让 OneBot 真正能"发消息到 QQ"的最后一块拼图 | 4 文件 |
| 🟡 P1 | B | **API 补全** — 消息列表/状态变更,前端才能展示历史消息 | 3 文件 |
| 🟡 P1 | C | **消息流完善** — 图片/文件/多平台 | 3 文件 |
| 🟢 P2 | D | **运维部署** — Docker / 配置验证 | 3 文件 |
| 🟢 P3 | E | **架构优化** — REPO / WAL / API | 4 文件 |

---

## 优先级图例

| 标记 | 含义 |
|:---:|------|
| 🔴 | 高优先级 — 阻塞性、影响核心功能 |
| 🟡 | 中优先级 — 重要但非阻塞 |
| 🟢 | 低优先级 — 改进优化,按需处理 |
| ✅ | 已完成 |
| 🔧 | 待修复 |
| 📦 | 待开发 |
| 📝 | 待测试 |
| 🏗 | 架构建议 |
## 四、关键里程碑

```
✅ 已完成 — 2026-05-01
├── OneBot v11 适配器 (入站/出站)
├── Web UI SPA (仪表盘/消息流/发送/平台)
├── Access Token 认证
├── 44 个集成测试 (broadcaster 11 + protocol 21 + auth 7 + repo 5)
└── CI 格式合规

⏳ Phase A: 发送调度器 → 目标 2026-05-02
├── [ ] dispatcher 后台任务
├── [ ] 状态流转 Pending→Sending→Sent/Failed
├── [ ] 重试逻辑 (max_retries + retry_interval)
└── [ ] 端到端验证: Web UI 发送 → NapCat → QQ 群

⏳ Phase B: API 补全 → 目标 2026-05-03
├── [ ] GET /api/messages 分页
├── [ ] PATCH /api/messages/{id}
└── [ ] GET /api/stats

⏳ Phase C-E: 完善 + 部署 → 目标 2026-05-05
├── [ ] 图片/文件类型
├── [ ] Docker 部署
└── [ ] 架构优化
```
6 changes: 2 additions & 4 deletions src/core/auth.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use actix_web::web;

pub fn validate_token(token: &web::Data<String>, query: &str) -> bool {
token.get_ref() == query
pub fn validate_token(expected: &str, provided: &str) -> bool {
expected == provided
}

pub fn extract_token_from_query(query: &str) -> Option<&str> {
Expand Down
174 changes: 174 additions & 0 deletions src/core/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;

use crate::core::adapter::AdapterManager;
use crate::core::message::MessageRepository;
use crate::models::message::MessageStatus;

pub struct MessageDispatcher {
adapter_manager: Arc<AdapterManager>,
db_path: String,
max_retries: u32,
retry_interval: u64,
batch_size: usize,
concurrency: usize,
shutdown: Arc<AtomicBool>,
}

impl MessageDispatcher {
pub fn new(
adapter_manager: Arc<AdapterManager>,
db_path: String,
max_retries: u32,
retry_interval: u64,
batch_size: usize,
concurrency: usize,
) -> Self {
Self {
adapter_manager,
db_path,
max_retries,
retry_interval,
batch_size,
concurrency,
shutdown: Arc::new(AtomicBool::new(false)),
}
}

pub fn shutdown_handle(&self) -> Arc<AtomicBool> {
self.shutdown.clone()
}

pub fn start(self) {
let retry_interval_secs = self.retry_interval;
let batch_size = self.batch_size;
let max_retries = self.max_retries;
let concurrency = self.concurrency;
let adapter_manager = self.adapter_manager;
let db_path = self.db_path;
let shutdown = self.shutdown;

tokio::spawn(async move {
let poll_repo = match MessageRepository::new(&db_path) {
Ok(r) => r,
Err(e) => {
tracing::error!(error = %e, db_path = %db_path, "Dispatcher failed to open database");
return;
}
};

let sem = Arc::new(tokio::sync::Semaphore::new(concurrency));

loop {
if shutdown.load(Ordering::Relaxed) {
tracing::info!("Message dispatcher shutting down gracefully");
sem.close();
break;
}

let pending = poll_repo
.get_pending_messages(batch_size)
.unwrap_or_default();

if pending.is_empty() {
tokio::time::sleep(tokio::time::Duration::from_secs(retry_interval_secs)).await;
continue;
}

for message in pending {
let sem = sem.clone();
let am = adapter_manager.clone();
let db_path = db_path.clone();
let shutdown = shutdown.clone();

tokio::spawn(async move {
let _permit = sem.acquire().await;

let repo = match MessageRepository::new(&db_path) {
Ok(r) => r,
Err(e) => {
tracing::error!(
error = %e,
"Failed to open DB in dispatcher worker"
);
return;
}
};

let mut sent = false;

for attempt in 0..=max_retries {
if shutdown.load(Ordering::Relaxed) {
return;
}

if attempt > 0 {
tracing::info!(
message_id = %message.id,
attempt = attempt,
max_retries = max_retries,
"Retrying message send"
);
tokio::time::sleep(tokio::time::Duration::from_secs(
retry_interval_secs,
))
.await;
}

match am.send_to_adapter(&message.recipient, &message) {
Ok(()) => {
tracing::info!(
message_id = %message.id,
"Message sent successfully"
);
if let Err(e) = repo
.update_message_status(&message.id, &MessageStatus::Sent)
{
tracing::error!(
error = %e,
message_id = %message.id,
"Failed to update message status to Sent"
);
}
sent = true;
break;
}
Err(e) => {
tracing::warn!(
message_id = %message.id,
attempt = attempt,
error = %e,
"Failed to send message"
);
if let Err(e) = repo.increment_retry(&message.id) {
tracing::error!(
error = %e,
"Failed to increment retry count"
);
}
}
}
}

if !sent {
tracing::warn!(
message_id = %message.id,
"Message failed after all retries, marking as Failed"
);
if let Err(e) =
repo.update_message_status(&message.id, &MessageStatus::Failed)
{
tracing::error!(
error = %e,
message_id = %message.id,
"Failed to update message status to Failed"
);
}
}
});
}
}
});
}
}
Loading
Loading