diff --git a/README.md b/README.md index dd08f4d..7f62d73 100644 --- a/README.md +++ b/README.md @@ -26,13 +26,15 @@ | **多平台消息接入** | ✅ | OneBot v11 (NapCat/QQ),可扩展至微信/Telegram/Discord | | **实时消息推送** | ✅ | 双 WebSocket 通道:平台入站 `/onebot/v11/ws` + 客户端出站 `/ws/client` | | **消息广播中枢** | ✅ | `MessageBroadcaster` 按平台/会话订阅,精准推送 | -| **消息生命周期管理** | ✅ | Pending → Sending → Sent → Failed 全状态流转 | +| **消息生命周期管理** | ✅ | Pending → Sending → Sent → Failed → Canceled 全状态流转 | +| **消息类型识别** | ✅ | Text / Image / File / Video / Audio 五大消息类型 | | **Web 管理界面** | ✅ | Vanilla JS SPA:仪表盘 / 消息流 / 发送消息 / 平台状态 | | **暗色模式** | ✅ | 亮色/暗色/Auto 三模式,localStorage 持久化 | -| **RESTful API** | ✅ | 创建/查询消息,健康检查([完整文档](docs/api/README.md)) | +| **RESTful API** | ✅ | 创建/查询/列表/更新状态/删除消息/统计概览 + 健康检查([完整文档](docs/api/README.md)) | | **CLI 工具** | ✅ | 命令行发送消息 / 查询状态 | | **Adapter 架构** | ✅ | 可插拔的适配器系统,方便接入新平台 | | **Plugin 架构** | ✅ | 消息处理插件系统(过滤/转换/加密) | +| **消息发送调度器** | ✅ | 后台 tokio task 轮询 + 重试 + 并发控制 + 原子认领 | | **SQLite 持久化** | ✅ | 消息数据本地存储,零配置 | | **结构化日志** | ✅ | tracing + 文件/双输出,文件创建失败自动降级 | @@ -51,15 +53,15 @@ ▼ ▼ ▼ ▼ MessageEvent OneBotAdapter Message (DB) MessageBroadcaster │ .send_message() │ (广播中枢) - │ │ │ ▲ - ▼ ▼ │ │ - OneBotAdapter ◀── Message (内部) ◀───────────┘ │ - .send_message() │ - │ │ - ▼ │ - send_group_msg / send_private_msg ──WS──→ NapCat │ - │ - ┌────────────────────────────────────────────────────────────────────────┘ + │ │ │ ▲ ▲ + ▼ ▼ │ │ │ + OneBotAdapter ◀── Message (内部) ◀── MessageDispatcher ◀── 轮询 Pending + .send_message() (调度器 + 重试) + │ + ▼ + send_group_msg / send_private_msg ──WS──→ NapCat + + ┌─────────────────────────────────────────────────────────────────────┘ │ ▼ 保存 DB ──→ broadcast ──→ 所有订阅该平台的 Web UI 实时收到 @@ -83,6 +85,7 @@ src/ │ ├── adapter.rs # Adapter trait + AdapterManager │ ├── broadcaster.rs # 消息广播中枢 (MessageBroadcaster) │ ├── config.rs # 配置管理 +│ ├── dispatcher.rs # 消息发送调度器 (轮询 + 重试 + 原子认领) │ ├── logging.rs # 日志初始化 │ ├── message.rs # 消息仓库 (SQLite) │ └── plugin.rs # Plugin trait + PluginManager @@ -152,11 +155,15 @@ WS 地址: ws://localhost:8080/onebot/v11/ws ### HTTP API -| 方法 | 路径 | 说明 | -| ------ | -------------------- | ---- | -| `POST` | `/api/messages` | 创建消息 | -| `GET` | `/api/messages/{id}` | 查询消息 | -| `GET` | `/api/health` | 健康检查 | +| 方法 | 路径 | 说明 | +| ------ | -------------------- | ------ | +| `POST` | `/api/messages` | 创建消息 | +| `GET` | `/api/messages` | 消息列表(支持 `?offset=&limit=&status=`) | +| `GET` | `/api/messages/{id}` | 查询单条消息 | +| `PATCH` | `/api/messages/{id}` | 更新消息状态 | +| `DELETE` | `/api/messages/{id}` | 删除消息 | +| `GET` | `/api/stats` | 统计概览 | +| `GET` | `/api/health` | 健康检查 | ### WebSocket 指令 @@ -182,7 +189,7 @@ WS 地址: ws://localhost:8080/onebot/v11/ws ## ⌨️ 命令行工具 ```bash -# 发送消息 +# 发送消息 (支持 text / image / file / video / audio) cargo run -- send -t text -r user1 -c "Hello" # 查询状态 @@ -195,7 +202,7 @@ cargo run -- status -i | 页面 | Hash | 功能 | | ---- | ------------ | ------------------- | -| 仪表盘 | `#dashboard` | 统计卡片 + 最近消息 | +| 仪表盘 | `#dashboard` | 6 状态统计卡片 + 最近消息(调用 `/api/stats`) | | 消息流 | `#messages` | WebSocket 实时推送 + 筛选 | | 发送消息 | `#send` | 表单 → 平台发送 | | 平台状态 | `#platforms` | Adapter 连接状态 | @@ -252,12 +259,10 @@ cargo fmt | 优先级 | 功能 | | :-: | ---------------------------- | -| 🔴 | 消息发送调度器 (后台任务发送 + 重试) | -| 🟡 | 消息列表分页 API | -| 🟡 | 更多平台适配 (微信/Telegram/Discord) | +| | 更多平台适配 (微信/Telegram/Discord) | | 🟡 | Redis 消息队列集成 | | 🟢 | Plugin 实现 (加密/格式化/翻译) | -| 🟢 | 认证授权机制 | +| 🟢 | Docker 部署 | *** diff --git a/compass/todolist.md b/compass/todolist.md index 1fcfd17..537f613 100644 --- a/compass/todolist.md +++ b/compass/todolist.md @@ -15,23 +15,26 @@ | 04-30 | CI 格式修复 × 3 | import order / 行宽 / 链式调用换行 | | 05-01 | 集成测试补全 | 39 个新测试: broadcaster(11) + protocol(21) + auth(7) | | 05-01 | Phase B: API 补全 | list/stats/PATCH/DELETE 全部完成;status Canceled 拆分修复 | -| 05-01 | Phase C1-C2: 消息流 + Dispatcher | 图片/文件类型扩展、sender 补全、Canceled 跳过、Web 前端同步 | +| 05-01 | Phase C: 消息流完善 | C1 类型识别扩展 Video/Audio;C2 sender/conversation_name 补全 | +| 05-01 | Dispatcher 增强 | try_claim_message 原子认领消除竞态;Canceled 跳过 | +| 05-01 | Web 前端同步 | 仪表盘 6 卡片调用 /api/stats;statusBadge() 状态徽章 | +| 05-01 | MessageType 扩展 | 新增 Video + Audio 变体,8 文件 match 补全 + 前端下拉框 | --- ## 二、下一步开发计划 -### 🔴 Phase A: 消息发送调度核心 (P0) +### 🔴 Phase A: 消息发送调度核心 (P0) ✅ 已完成 -> 当前 `/ws/client` 的 `send_message` 只调用 `send_to_adapter` 但不处理结果/重试。需要后台调度器。 +> Dispatcher 已实现:后台 tokio task 轮询 + 重试 + 并发控制 + try_claim_message 原子认领。 | # | 任务 | 优先级 | 文件 | 说明 | |:--|------|:---:|------|------| -| A1 | 创建 `core/dispatcher.rs` | 🔴 | 新建 | 后台 tokio task 轮询 Pending 消息 → 匹配 Adapter 发送 → 更新状态 | -| A2 | `SenderConfig` 接入调度器 | 🔴 | dispatcher.rs | max_retries / retry_interval / batch_size / concurrency | -| A3 | `MessageRepository::update_status()` | 🔴 | message.rs | 发送成功后更新为 Sent,失败后 retry_count++ | -| A4 | 调度器生命周期管理 | 🔴 | main.rs + dispatcher.rs | start / graceful shutdown | -| A5 | 集成测试 | 🔴 | tests/dispatcher.rs | 模拟发送成功/失败/超时/最大重试 | +| A1 | 创建 `core/dispatcher.rs` | ✅ | dispatcher.rs | 后台 tokio task 轮询 Pending/Sending 消息 → 匹配 Adapter 发送 → 更新状态 | +| A2 | `SenderConfig` 接入调度器 | ✅ | dispatcher.rs | max_retries / retry_interval / batch_size / concurrency | +| A3 | 状态更新 + 原子认领 | ✅ | message.rs | update_message_status / increment_retry / try_claim_message | +| A4 | 调度器生命周期管理 | ✅ | main.rs + dispatcher.rs | start / graceful shutdown via AtomicBool | +| A5 | 集成测试 | ✅ | tests/dispatcher.rs | 5 场景:发送成功/失败重试/Canceled跳过/原子认领/优雅关闭 | ### 🟡 Phase B: API 补全 ✅ 已完成 @@ -42,12 +45,12 @@ | B3 | `GET /api/stats` 统计概览 | ✅ | 5种状态计数 + total | | B4 | `DELETE /api/messages/{id}` | ✅ | 删除消息记录 | -### 🟡 Phase C: 消息流完善 🔴 进行中 +### 🟡 Phase C: 消息流完善 ✅ 已完成 | # | 任务 | 优先级 | 说明 | |:--|------|:---:|------| -| C1 | OneBot 图片/文件消息类型扩展 | ✅ | 根据消息段自动识别 Image/File/Text 类型 | -| C2 | 入站消息 sender 信息补全 | ✅ | conversation_name 从 nickname/群号自动生成 | +| C1 | OneBot 消息类型扩展 | ✅ | segments_message_type 识别 Image/File/Video/Audio/Text 五种 | +| C2 | 入站消息 sender 信息补全 | ✅ | conversation_name 从 nickname/群号自动生成;message_type 动态化 | | C3 | 多平台支持框架就绪 | 🟢 | adapter 注册机制验证 (QQ 已有,可加 mock 测试微信) | ### 🟢 Phase D: 运维与部署 @@ -89,25 +92,19 @@ ├── OneBot v11 适配器 (入站/出站) ├── Web UI SPA (仪表盘/消息流/发送/平台) ├── Access Token 认证 -├── 44 个集成测试 (broadcaster 11 + protocol 21 + auth 7 + repo 5) +├── MessageType 扩展: Text / Image / File / Video / Audio (5 种) +├── MessageStatus 扩展: Pending / Sending / Sent / Failed / Canceled (5 种) +├── 消息发送调度器: 轮询 + 重试 + 并发控制 + try_claim_message 原子认领 +├── HTTP API 完整: list / PATCH / DELETE / stats + health +├── Web 前端: 6 状态卡片 / stats API 对接 / statusBadge() 徽章 +├── 49 个测试全部通过 (broadcaster 11 + protocol 21 + auth 7 + sender 5 + dispatcher 5) +├── try_claim_message 原子性修复 (跨周期重复认领消除) └── CI 格式合规 -⏳ Phase A: 发送调度器 → 目标 2026-05-02 -├── [ ] dispatcher 后台任务 -├── [ ] 状态流转 Pending→Sending→Sent/Failed -├── [ ] 重试逻辑 (max_retries + retry_interval) -└── [ ] 端到端验证: Web UI 发送 → NapCat → QQ 群 - -⏳ Phase B: API 补全 → 目标 2026-05-03 -├── [ ] GET /api/messages 分页 -├── [ ] PATCH /api/messages/{id} -└── [ ] GET /api/stats - -⏳ Phase C-E: 完善 + 部署 → 目标 2026-05-05 -├── [✅] 图片/文件类型识别 -├── [✅] sender 信息补全 -├── [✅] Dispatcher Canceled 跳过 -├── [✅] Web 前端状态同步 -├── [ ] Docker 部署 -└── [ ] 架构优化 +⏳ Next: 运维部署 + 架构优化 → 目标 2026-05-05 +├── [✅] Dispatcher 集成测试 (5 场景全部通过) +├── [ ] Dockerfile + docker-compose +├── [ ] SQLite WAL 模式 +├── [ ] 配置验证 +└── [ ] 架构优化 (REPO → Data) ``` diff --git a/docs/api/README.md b/docs/api/README.md index 9fa1013..547c4ad 100644 --- a/docs/api/README.md +++ b/docs/api/README.md @@ -1,6 +1,6 @@ # ReChat-sender API 文档 -> 版本: 0.1.0 | 更新: 2026-04-26 +> 版本: 0.2.0 | 更新: 2026-05-01 --- @@ -26,12 +26,13 @@ | 端点 | 方法 | 类型 | 说明 | |------|:---:|:---:|------| | `/api/messages` | POST | HTTP | 创建消息 | +| `/api/messages` | GET | HTTP | 消息列表(支持筛选和分页) | | `/api/messages/{id}` | GET | HTTP | 查询单条消息 | +| `/api/messages/{id}` | PATCH | HTTP | 更新消息状态 | +| `/api/messages/{id}` | DELETE | HTTP | 删除消息 | +| `/api/stats` | GET | HTTP | 统计概览(各状态计数) | | `/api/health` | GET | HTTP | 健康检查 | | `/ws/client` | GET | WebSocket | 客户端实时通道 | -| `/` | GET | HTML | 首页 | -| `/send` | GET | HTML | 发送消息页面 | -| `/status` | GET | HTML | 消息状态查询页面 | ### 数据模型 @@ -40,10 +41,10 @@ | 字段 | 类型 | 说明 | |------|------|------| | `id` | String (UUID v4) | 消息唯一 ID | -| `message_type` | String | `"Text"` / `"Image"` / `"File"` | +| `message_type` | String | `"Text"` / `"Image"` / `"File"` / `"Video"` / `"Audio"` | | `content` | String | 消息内容 | | `recipient` | String | 接收者标识(群号/用户 ID/会话 ID) | -| `status` | String | `"Pending"` / `"Sending"` / `"Sent"` / `"Failed"` | +| `status` | String | `"Pending"` / `"Sending"` / `"Sent"` / `"Failed"` / `"Canceled"` | | `created_at` | u64 | 创建时间 (Unix 秒) | | `updated_at` | u64 | 更新时间 (Unix 秒) | | `retry_count` | u32 | 重试次数 | @@ -78,7 +79,7 @@ | 字段 | 类型 | 必填 | 说明 | |------|------|:---:|------| -| `message_type` | String | ✅ | `"Text"` / `"Image"` / `"File"` | +| `message_type` | String | ✅ | `"Text"` / `"Image"` / `"File"` / `"Video"` / `"Audio"` | | `content` | String | ✅ | 消息内容 | | `recipient` | String | ✅ | 接收者标识 | @@ -113,7 +114,7 @@ curl -X POST http://localhost:8080/api/messages \ | 状态码 | 条件 | 示例 | |:---:|------|------| -| `400` | 无效的 message_type | `{"error": "Invalid message type"}` | +| `400` | 无效的 message_type | `{"error": "Invalid message type. Use Text, Image, File, Video, or Audio"}` | | `500` | 数据库未初始化 | `{"error": "Repository not initialized"}` | | `500` | 数据库写入失败 | `{"error": "..."}` | @@ -181,6 +182,136 @@ curl http://localhost:8080/api/health --- +### 2.4 消息列表 + +> `GET /api/messages` + +分页查询消息列表,支持按状态筛选。 + +**查询参数**: + +| 参数 | 类型 | 必填 | 说明 | +|------|------|:---:|------| +| `offset` | usize | ❌ | 偏移量,默认 0 | +| `limit` | usize | ❌ | 每页条数,默认 50,最大 200 | +| `status` | String | ❌ | 按状态筛选,如 `"Failed"` | + +**请求示例**: + +```bash +# 获取最新 20 条 +curl "http://localhost:8080/api/messages?limit=20" + +# 查看所有失败的消息 +curl "http://localhost:8080/api/messages?status=Failed" +``` + +**成功响应** `200 OK`: + +```json +{ + "messages": [ + { + "id": "a1b2c3d4-...", + "message_type": "Text", + "content": "Hello", + "recipient": "group_123", + "status": "Sent", + "created_at": 1714000000, + "updated_at": 1714000005, + "retry_count": 0 + } + ], + "offset": 0, + "limit": 20 +} +``` + +--- + +### 2.5 更新消息状态 + +> `PATCH /api/messages/{id}` + +更新消息状态。 + +**请求体** (JSON): + +| 字段 | 类型 | 必填 | 说明 | +|------|------|:---:|------| +| `status` | String | ✅ | `"Pending"` / `"Sending"` / `"Sent"` / `"Failed"` / `"Canceled"` | + +**请求示例**: + +```bash +curl -X PATCH http://localhost:8080/api/messages/msg-id \ + -H "Content-Type: application/json" \ + -d '{"status": "Canceled"}' +``` + +**成功响应** `200 OK`:返回更新后的消息对象。 + +**错误响应**: + +| 状态码 | 条件 | 示例 | +|:---:|------|------| +| `400` | 无效的状态值 | `{"error": "Invalid status. Use Pending, Sending, Sent, Failed, or Canceled"}` | + +--- + +### 2.6 删除消息 + +> `DELETE /api/messages/{id}` + +删除指定消息。 + +**请求示例**: + +```bash +curl -X DELETE http://localhost:8080/api/messages/msg-id +``` + +**成功响应** `200 OK`: + +```json +{"deleted": true} +``` + +**错误响应**: + +| 状态码 | 条件 | 示例 | +|:---:|------|------| +| `404` | 消息不存在 | `{"error": "Message not found"}` | + +--- + +### 2.7 统计概览 + +> `GET /api/stats` + +获取各状态消息计数及总数。 + +**请求示例**: + +```bash +curl http://localhost:8080/api/stats +``` + +**成功响应** `200 OK`: + +```json +{ + "pending": 5, + "sending": 2, + "sent": 128, + "failed": 3, + "canceled": 1, + "total": 139 +} +``` + +--- + ## 3. WebSocket API > `GET /ws/client` @@ -261,7 +392,7 @@ ws://:/ws/client | `data.platform` | String | ✅ | 目标平台,如 `"qq"` | | `data.conversation` | String | ✅ | 目标会话 ID(群号/用户 ID) | | `data.content` | String | ✅ | 消息内容 | -| `data.message_type` | String | ❌ | `"Text"`(默认)/ `"Image"` / `"File"` | +| `data.message_type` | String | ❌ | `"Text"`(默认)/ `"Image"` / `"File"` / `"Video"` / `"Audio"` | **成功响应**: @@ -315,7 +446,7 @@ ws://:/ws/client | `data.conversation` | String | 会话 ID | | `data.conversation_name` | String? | 会话名称(可选,群名等) | | `data.content` | String | 消息内容 | -| `data.message_type` | String | `Text` / `Image` / `File` | +| `data.message_type` | String | `Text` / `Image` / `File` / `Video` / `Audio` | | `data.sender` | Object? | 发送者信息(可选) | | `data.sender.id` | String | 发送者 ID | | `data.sender.name` | String | 发送者名称 | @@ -352,8 +483,8 @@ ws://:/ws/client | 路径 | 说明 | |------|------| -| `GET /` | 首页 — 导航链接 | -| `GET /send` | 消息发送表单 — 选择消息类型、填写接收者和内容,提交到 `/api/messages` | -| `GET /status` | 消息状态查询 — 输入消息 ID,调用 `/api/messages/{id}` 显示结果 | - -Web 页面为静态 HTML,内嵌 JavaScript 通过 `fetch` 调用 REST API。适合开发调试和简单管理。 +| `GET /` | SPA 入口 — 内嵌式单页应用 | +| 仪表盘 `#dashboard` | 6 状态统计卡片 + 最近消息 | +| 消息流 `#messages` | WebSocket 实时推送 + 搜索/平台筛选 | +| 发送消息 `#send` | 表单提交 → 选择平台/类型/会话/内容 | +| 平台状态 `#platforms` | Adapter 连接状态卡片 | diff --git a/src/adapters/onebot/adapter.rs b/src/adapters/onebot/adapter.rs index c6506fb..d20667e 100644 --- a/src/adapters/onebot/adapter.rs +++ b/src/adapters/onebot/adapter.rs @@ -149,6 +149,18 @@ fn internal_message_to_segments(message: &Message) -> Vec { crate::models::message::MessageType::Image => { vec![MessageSegment::image(&message.content)] } + crate::models::message::MessageType::Video => { + vec![MessageSegment::text(&format!( + "[Video: {}]", + message.content + ))] + } + crate::models::message::MessageType::Audio => { + vec![MessageSegment::text(&format!( + "[Audio: {}]", + message.content + ))] + } crate::models::message::MessageType::File => { vec![MessageSegment::text(&format!( "[File: {}]", diff --git a/src/adapters/onebot/protocol.rs b/src/adapters/onebot/protocol.rs index a583c42..f81db09 100644 --- a/src/adapters/onebot/protocol.rs +++ b/src/adapters/onebot/protocol.rs @@ -142,8 +142,8 @@ impl MessageSegment { for seg in segments { match seg.seg_type.as_str() { "image" => return "Image", - "video" => return "File", - "record" => return "File", + "video" => return "Video", + "record" => return "Audio", "file" => return "File", _ => {} } diff --git a/src/adapters/onebot/ws.rs b/src/adapters/onebot/ws.rs index 00aa5f7..9cecc0d 100644 --- a/src/adapters/onebot/ws.rs +++ b/src/adapters/onebot/ws.rs @@ -158,6 +158,8 @@ fn handle_message_event(broadcaster: &MessageBroadcaster, event: MessageEvent) { let message_type = match msg_type_str { "Image" => MessageType::Image, "File" => MessageType::File, + "Video" => MessageType::Video, + "Audio" => MessageType::Audio, _ => MessageType::Text, }; diff --git a/src/api/endpoints/messages.rs b/src/api/endpoints/messages.rs index c0b7baf..a490016 100644 --- a/src/api/endpoints/messages.rs +++ b/src/api/endpoints/messages.rs @@ -55,9 +55,11 @@ pub async fn create_message( "Text" => MessageType::Text, "Image" => MessageType::Image, "File" => MessageType::File, + "Video" => MessageType::Video, + "Audio" => MessageType::Audio, _ => { return HttpResponse::BadRequest() - .json(serde_json::json!({"error": "Invalid message type"})); + .json(serde_json::json!({"error": "Invalid message type. Use Text, Image, File, Video, or Audio"})); } }; diff --git a/src/api/endpoints/ws_client.rs b/src/api/endpoints/ws_client.rs index a62b2df..e3189eb 100644 --- a/src/api/endpoints/ws_client.rs +++ b/src/api/endpoints/ws_client.rs @@ -175,6 +175,8 @@ async fn handle_command( let msg_type = match message_type.as_str() { "Image" => MessageType::Image, "File" => MessageType::File, + "Video" => MessageType::Video, + "Audio" => MessageType::Audio, _ => MessageType::Text, }; diff --git a/src/cli/mod.rs b/src/cli/mod.rs index b1e2468..6971fb7 100644 --- a/src/cli/mod.rs +++ b/src/cli/mod.rs @@ -61,8 +61,10 @@ pub fn run(repo: Arc>) { "text" => MessageType::Text, "image" => MessageType::Image, "file" => MessageType::File, + "video" => MessageType::Video, + "audio" => MessageType::Audio, _ => { - println!("Invalid message type. Use text, image, or file."); + println!("Invalid message type. Use text, image, file, video, or audio."); return; } }; diff --git a/src/core/message.rs b/src/core/message.rs index 8103506..e6d1b74 100644 --- a/src/core/message.rs +++ b/src/core/message.rs @@ -14,6 +14,8 @@ fn read_message_row( "Text" => MessageType::Text, "Image" => MessageType::Image, "File" => MessageType::File, + "Video" => MessageType::Video, + "Audio" => MessageType::Audio, _ => return None, }; let status = match status_str.as_str() { @@ -121,6 +123,8 @@ impl MessageRepository { "Text" => crate::models::message::MessageType::Text, "Image" => crate::models::message::MessageType::Image, "File" => crate::models::message::MessageType::File, + "Video" => crate::models::message::MessageType::Video, + "Audio" => crate::models::message::MessageType::Audio, _ => return Err(rusqlite::Error::InvalidQuery), }; @@ -156,7 +160,7 @@ impl MessageRepository { pub fn get_pending_messages(&self, limit: usize) -> Result> { let mut stmt = self.conn.prepare( "SELECT id, message_type, content, recipient, status, created_at, updated_at, retry_count - FROM messages WHERE status = 'Pending' OR status = 'Sending' ORDER BY created_at ASC LIMIT ?", + FROM messages WHERE status = 'Pending' ORDER BY created_at ASC LIMIT ?", )?; let mut rows = stmt.query([&limit])?; let mut messages = Vec::new(); @@ -175,6 +179,8 @@ impl MessageRepository { "Text" => crate::models::message::MessageType::Text, "Image" => crate::models::message::MessageType::Image, "File" => crate::models::message::MessageType::File, + "Video" => crate::models::message::MessageType::Video, + "Audio" => crate::models::message::MessageType::Audio, _ => continue, }; @@ -219,15 +225,15 @@ impl MessageRepository { Ok(()) } - /// Atomically claim a message for sending: only succeeds if status is Pending or Sending. - /// Returns true if claimed, false if the message was already canceled/failed/sent. + /// Atomically claim a message for sending: only succeeds if status is Pending. + /// Returns true if claimed, false if already claimed/canceled/failed/sent. pub fn try_claim_message(&self, id: &str) -> Result { let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_secs() as i64; let affected = self.conn.execute( - "UPDATE messages SET status = 'Sending', updated_at = ?1 WHERE id = ?2 AND status IN ('Pending', 'Sending')", + "UPDATE messages SET status = 'Sending', updated_at = ?1 WHERE id = ?2 AND status = 'Pending'", rusqlite::params![now, id], )?; Ok(affected > 0) diff --git a/src/models/message.rs b/src/models/message.rs index cc0f0d0..71c50e1 100644 --- a/src/models/message.rs +++ b/src/models/message.rs @@ -6,6 +6,8 @@ pub enum MessageType { Text, Image, File, + Video, + Audio, } #[derive(Debug, Serialize, Deserialize, PartialEq)] diff --git a/src/web/templates/index.html b/src/web/templates/index.html index 8db1e68..8e1edb0 100644 --- a/src/web/templates/index.html +++ b/src/web/templates/index.html @@ -138,6 +138,9 @@

发送消息

diff --git a/tests/dispatcher.rs b/tests/dispatcher.rs index 2498759..f09ffc6 100644 --- a/tests/dispatcher.rs +++ b/tests/dispatcher.rs @@ -1,26 +1,33 @@ -use rechat_sender::core::adapter::Adapter; -use rechat_sender::core::adapter::AdapterManager; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; + +use rechat_sender::core::adapter::{Adapter, AdapterManager}; use rechat_sender::core::dispatcher::MessageDispatcher; use rechat_sender::core::message::MessageRepository; use rechat_sender::models::message::{Message, MessageStatus, MessageType}; -use std::sync::Arc; -use std::sync::atomic::AtomicU32; -use std::sync::atomic::Ordering; struct MockAdapter { name: String, - fail_count: AtomicU32, - should_fail: bool, + should_fail: AtomicBool, + call_count: AtomicU32, } impl MockAdapter { - fn new(name: &str, should_fail: bool) -> Self { + fn new(name: &str) -> Self { Self { - name: name.into(), - fail_count: AtomicU32::new(0), - should_fail, + name: name.to_string(), + should_fail: AtomicBool::new(false), + call_count: AtomicU32::new(0), } } + + fn set_should_fail(&self, fail: bool) { + self.should_fail.store(fail, Ordering::SeqCst); + } + + fn call_count(&self) -> u32 { + self.call_count.load(Ordering::SeqCst) + } } impl Adapter for MockAdapter { @@ -37,9 +44,9 @@ impl Adapter for MockAdapter { } fn send_message(&self, _message: &Message) -> Result<(), Box> { - if self.should_fail { - self.fail_count.fetch_add(1, Ordering::SeqCst); - Err(Box::new(std::io::Error::other("simulated failure"))) + self.call_count.fetch_add(1, Ordering::SeqCst); + if self.should_fail.load(Ordering::SeqCst) { + Err(Box::new(std::io::Error::other("mock send failure"))) } else { Ok(()) } @@ -50,106 +57,234 @@ impl Adapter for MockAdapter { } } +fn make_pending_message(recipient: &str, content: &str) -> Message { + let mut msg = Message::new( + MessageType::Text, + content.to_string(), + recipient.to_string(), + ); + // Override created_at to ensure ordering in get_pending_messages + msg.created_at = std::time::UNIX_EPOCH; + msg +} + +fn setup_repo(db_path: &str) -> MessageRepository { + MessageRepository::new(db_path).expect("Failed to create test repo") +} + +async fn wait_for_status( + repo: &MessageRepository, + msg_id: &str, + expected: MessageStatus, + timeout_secs: u64, +) -> bool { + let start = std::time::Instant::now(); + loop { + if let Ok(Some(msg)) = repo.get(msg_id) + && msg.status == expected + { + return true; + } + if start.elapsed().as_secs() >= timeout_secs { + return false; + } + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } +} + #[tokio::test(flavor = "multi_thread")] async fn test_dispatcher_sends_pending_message() { - let temp_db = tempfile::NamedTempFile::new().unwrap(); - let db_path = temp_db.path().to_str().unwrap().to_string(); + let tmp = tempfile::NamedTempFile::new().unwrap(); + let db_path = tmp.path().to_str().unwrap().to_string(); - let repo = MessageRepository::new(&db_path).unwrap(); + let repo = setup_repo(&db_path); + let msg = make_pending_message("mock_backend", "hello world"); + repo.save(&msg).unwrap(); + assert_eq!( + repo.get(&msg.id).unwrap().unwrap().status, + MessageStatus::Pending + ); - let msg = Message::new( - MessageType::Text, - "Test dispatcher".into(), - "test_platform".into(), + let mock = Arc::new(MockAdapter::new("mock_backend")); + let mut manager = AdapterManager::new(); + manager.add_adapter(mock.clone()); + let manager = Arc::new(manager); + + let dispatcher = MessageDispatcher::new( + manager, + db_path.clone(), + 0, // max_retries + 1, // retry_interval (1s, but 0 retries = no wait needed) + 10, + 5, ); + let shutdown = dispatcher.shutdown_handle(); + dispatcher.start(); + + let ok = wait_for_status(&repo, &msg.id, MessageStatus::Sent, 10).await; + shutdown.store(true, Ordering::Relaxed); + + assert!(ok, "Message should transition to Sent"); + assert!(mock.call_count() >= 1, "Adapter should have been called"); + let final_msg = repo.get(&msg.id).unwrap().unwrap(); + assert_eq!(final_msg.status, MessageStatus::Sent); + assert_eq!(final_msg.retry_count, 0); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_dispatcher_skips_canceled_message() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + let db_path = tmp.path().to_str().unwrap().to_string(); + + let repo = setup_repo(&db_path); + let mut msg = make_pending_message("mock_backend", "should be canceled"); + msg.status = MessageStatus::Canceled; repo.save(&msg).unwrap(); - // Close connection so dispatcher can open its own - drop(repo); - let mut am = AdapterManager::new(); - let mock = Arc::new(MockAdapter::new("test_platform", false)); - am.add_adapter(mock.clone()); - let am = Arc::new(am); + let mock = Arc::new(MockAdapter::new("mock_backend")); + let mut manager = AdapterManager::new(); + manager.add_adapter(mock.clone()); + let manager = Arc::new(manager); - let dispatcher = MessageDispatcher::new(am, db_path, 0, 1, 1, 1); + let dispatcher = MessageDispatcher::new(manager, db_path.clone(), 0, 1, 10, 5); let shutdown = dispatcher.shutdown_handle(); dispatcher.start(); - tokio::time::sleep(std::time::Duration::from_secs(2)).await; - + // Wait briefly — canceled messages should not be attempted + tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; shutdown.store(true, Ordering::Relaxed); - tokio::time::sleep(std::time::Duration::from_millis(500)).await; - let repo2 = MessageRepository::new(temp_db.path().to_str().unwrap()).unwrap(); - let updated = repo2.get(&msg.id).unwrap().unwrap(); - assert_eq!(updated.status, MessageStatus::Sent); + assert_eq!( + mock.call_count(), + 0, + "Mock adapter should NOT be called for canceled messages" + ); + let final_msg = repo.get(&msg.id).unwrap().unwrap(); + assert_eq!( + final_msg.status, + MessageStatus::Canceled, + "Canceled message should stay Canceled" + ); } #[tokio::test(flavor = "multi_thread")] -async fn test_dispatcher_marks_as_failed_after_max_retries() { - let temp_db = tempfile::NamedTempFile::new().unwrap(); - let db_path = temp_db.path().to_str().unwrap().to_string(); +async fn test_dispatcher_marks_failed_after_retries() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + let db_path = tmp.path().to_str().unwrap().to_string(); - let repo = MessageRepository::new(&db_path).unwrap(); - - let msg = Message::new( - MessageType::Text, - "Should fail eventually".into(), - "bad_platform".into(), - ); + let repo = setup_repo(&db_path); + let msg = make_pending_message("mock_backend", "will fail"); repo.save(&msg).unwrap(); - drop(repo); - - let mut am = AdapterManager::new(); - let mock = Arc::new(MockAdapter::new("bad_platform", true)); - am.add_adapter(mock.clone()); - let am = Arc::new(am); - let dispatcher = MessageDispatcher::new(am, db_path, 1, 1, 1, 1); + let mock = Arc::new(MockAdapter::new("mock_backend")); + mock.set_should_fail(true); + let mut manager = AdapterManager::new(); + manager.add_adapter(mock.clone()); + let manager = Arc::new(manager); + + let max_retries: u32 = 2; + let dispatcher = MessageDispatcher::new( + manager, + db_path.clone(), + max_retries, + 1, // retry every 1s + 10, + 5, + ); let shutdown = dispatcher.shutdown_handle(); dispatcher.start(); - tokio::time::sleep(std::time::Duration::from_secs(5)).await; - + let ok = wait_for_status(&repo, &msg.id, MessageStatus::Failed, 15).await; shutdown.store(true, Ordering::Relaxed); - tokio::time::sleep(std::time::Duration::from_millis(500)).await; - let repo2 = MessageRepository::new(temp_db.path().to_str().unwrap()).unwrap(); - let updated = repo2.get(&msg.id).unwrap().unwrap(); - assert_eq!(updated.status, MessageStatus::Failed); - assert!(updated.retry_count > 0); + assert!(ok, "Message should transition to Failed after retries"); + // Called once per attempt: attempt 0, 1, 2 = 3 calls (max_retries + 1) + assert_eq!( + mock.call_count(), + max_retries + 1, + "Adapter should be called max_retries+1 times" + ); + let final_msg = repo.get(&msg.id).unwrap().unwrap(); + assert_eq!(final_msg.status, MessageStatus::Failed); + assert_eq!( + final_msg.retry_count, + max_retries + 1, + "retry_count should be max_retries+1" + ); } -#[test] -fn test_update_message_status() { - let temp_db = tempfile::NamedTempFile::new().unwrap(); - let db_path = temp_db.path().to_str().unwrap(); - - let repo = MessageRepository::new(db_path).unwrap(); +#[tokio::test(flavor = "multi_thread")] +async fn test_dispatcher_try_claim_atomicity() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + let db_path = tmp.path().to_str().unwrap().to_string(); - let msg = Message::new(MessageType::Text, "Status test".into(), "user1".into()); + let repo = setup_repo(&db_path); + let msg = make_pending_message("mock_backend", "claim me"); repo.save(&msg).unwrap(); - repo.update_message_status(&msg.id, &MessageStatus::Sent) + // First claim should succeed + assert!( + repo.try_claim_message(&msg.id).unwrap(), + "First claim should succeed" + ); + let claimed = repo.get(&msg.id).unwrap().unwrap(); + assert_eq!(claimed.status, MessageStatus::Sending); + + // Cancel via normal update + repo.update_message_status(&msg.id, &MessageStatus::Canceled) .unwrap(); - let updated = repo.get(&msg.id).unwrap().unwrap(); - assert_eq!(updated.status, MessageStatus::Sent); + // try_claim should fail for canceled message + assert!( + !repo.try_claim_message(&msg.id).unwrap(), + "Claim should fail for canceled message" + ); + + // Another claim on an already Sent message + let msg2 = make_pending_message("mock_backend", "already sent"); + repo.save(&msg2).unwrap(); + repo.update_message_status(&msg2.id, &MessageStatus::Sent) + .unwrap(); + assert!( + !repo.try_claim_message(&msg2.id).unwrap(), + "Claim should fail for already sent message" + ); } -#[test] -fn test_increment_retry() { - let temp_db = tempfile::NamedTempFile::new().unwrap(); - let db_path = temp_db.path().to_str().unwrap(); +#[tokio::test(flavor = "multi_thread")] +async fn test_dispatcher_shutdown_stops_processing() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + let db_path = tmp.path().to_str().unwrap().to_string(); + + let repo = setup_repo(&db_path); - let repo = MessageRepository::new(db_path).unwrap(); + let mock = Arc::new(MockAdapter::new("mock_backend")); + let mut manager = AdapterManager::new(); + manager.add_adapter(mock.clone()); + let manager = Arc::new(manager); - let msg = Message::new(MessageType::Text, "Retry test".into(), "user1".into()); + let dispatcher = MessageDispatcher::new(manager, db_path.clone(), 0, 1, 10, 5); + let shutdown = dispatcher.shutdown_handle(); + shutdown.store(true, Ordering::Relaxed); + dispatcher.start(); + + // After shutdown, save a pending message and wait + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + + let msg = make_pending_message("mock_backend", "after shutdown"); repo.save(&msg).unwrap(); - repo.increment_retry(&msg.id).unwrap(); - repo.increment_retry(&msg.id).unwrap(); + tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; - let updated = repo.get(&msg.id).unwrap().unwrap(); - assert_eq!(updated.retry_count, 2); + assert_eq!( + mock.call_count(), + 0, + "No messages should be sent after shutdown" + ); + let final_msg = repo.get(&msg.id).unwrap().unwrap(); + assert_eq!( + final_msg.status, + MessageStatus::Pending, + "Message saved after shutdown should stay Pending" + ); }