From 61526a25b44fffed9c3c35ac8f01d3e6cfa76835 Mon Sep 17 00:00:00 2001 From: Ink-dark Date: Fri, 1 May 2026 17:27:23 +0800 Subject: [PATCH 1/3] =?UTF-8?q?feat:=20=E5=AE=9E=E7=8E=B0=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E7=B1=BB=E5=9E=8B=E6=8E=A8=E6=96=AD=E3=80=81=E5=8F=96?= =?UTF-8?q?=E6=B6=88=E6=B6=88=E6=81=AF=E5=A4=84=E7=90=86=E5=92=8C=E4=BB=AA?= =?UTF-8?q?=E8=A1=A8=E7=9B=98=E7=BB=9F=E8=AE=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在 OneBot 协议中添加消息类型推断功能 - 在 Dispatcher 中增加对已取消消息的跳过处理 - 重构前端仪表盘统计显示,支持多种消息状态 - 完善 Web 前端消息状态显示和样式 --- compass/todolist.md | 23 ++++++++------ src/adapters/onebot/protocol.rs | 14 +++++++++ src/adapters/onebot/ws.rs | 33 ++++++++++++++++---- src/core/dispatcher.rs | 11 +++++++ src/web/templates/app.js | 53 ++++++++++++++++++++++----------- src/web/templates/index.html | 24 ++++++++++----- 6 files changed, 118 insertions(+), 40 deletions(-) diff --git a/compass/todolist.md b/compass/todolist.md index aedf7ff..1fcfd17 100644 --- a/compass/todolist.md +++ b/compass/todolist.md @@ -14,6 +14,8 @@ | 04-30 | Token 认证系统 | 启动生成 UUID → 登录遮罩 → localStorage → API/WS 携带 | | 04-30 | CI 格式修复 × 3 | import order / 行宽 / 链式调用换行 | | 05-01 | 集成测试补全 | 39 个新测试: broadcaster(11) + protocol(21) + auth(7) | +| 05-01 | Phase B: API 补全 | list/stats/PATCH/DELETE 全部完成;status Canceled 拆分修复 | +| 05-01 | Phase C1-C2: 消息流 + Dispatcher | 图片/文件类型扩展、sender 补全、Canceled 跳过、Web 前端同步 | --- @@ -31,21 +33,21 @@ | A4 | 调度器生命周期管理 | 🔴 | main.rs + dispatcher.rs | start / graceful shutdown | | A5 | 集成测试 | 🔴 | tests/dispatcher.rs | 模拟发送成功/失败/超时/最大重试 | -### 🟡 Phase B: API 补全 +### 🟡 Phase B: API 补全 ✅ 已完成 | # | 任务 | 优先级 | 说明 | |:--|------|:---:|------| -| B1 | `GET /api/messages` 列表分页 | 🟡 | 支持 `?limit=&offset=&status=` 筛选 | -| B2 | `PATCH /api/messages/{id}` 取消/重发 | 🟡 | `{"status": "Failed"}` 取消待发送消息 | -| B3 | `GET /api/stats` 统计概览 | 🟡 | 今日消息数/各状态计数/平台分布 | -| B4 | `DELETE /api/messages/{id}` | 🟢 | 删除消息记录 | +| B1 | `GET /api/messages` 列表分页 | ✅ | 支持 `?limit=&offset=&status=` 筛选 | +| B2 | `PATCH /api/messages/{id}` 取消/重发 | ✅ | 支持 Pending/Sending/Sent/Failed/Canceled | +| B3 | `GET /api/stats` 统计概览 | ✅ | 5种状态计数 + total | +| B4 | `DELETE /api/messages/{id}` | ✅ | 删除消息记录 | -### 🟡 Phase C: 消息流完善 +### 🟡 Phase C: 消息流完善 🔴 进行中 | # | 任务 | 优先级 | 说明 | |:--|------|:---:|------| -| C1 | OneBot 图片/文件消息类型扩展 | 🟡 | 当前只处理 Text 类型 → 扩展 Image/File MessageType | -| C2 | 入站消息 sender 信息补全 | 🟡 | group/private conversation_name 自动生成 | +| C1 | OneBot 图片/文件消息类型扩展 | ✅ | 根据消息段自动识别 Image/File/Text 类型 | +| C2 | 入站消息 sender 信息补全 | ✅ | conversation_name 从 nickname/群号自动生成 | | C3 | 多平台支持框架就绪 | 🟢 | adapter 注册机制验证 (QQ 已有,可加 mock 测试微信) | ### 🟢 Phase D: 运维与部署 @@ -102,7 +104,10 @@ └── [ ] GET /api/stats ⏳ Phase C-E: 完善 + 部署 → 目标 2026-05-05 -├── [ ] 图片/文件类型 +├── [✅] 图片/文件类型识别 +├── [✅] sender 信息补全 +├── [✅] Dispatcher Canceled 跳过 +├── [✅] Web 前端状态同步 ├── [ ] Docker 部署 └── [ ] 架构优化 ``` diff --git a/src/adapters/onebot/protocol.rs b/src/adapters/onebot/protocol.rs index 03fc614..3dbac4e 100644 --- a/src/adapters/onebot/protocol.rs +++ b/src/adapters/onebot/protocol.rs @@ -136,6 +136,20 @@ impl MessageSegment { .collect::>() .join("") } + + /// 从消息段数组推断消息类型 + pub fn segments_message_type(segments: &[MessageSegment]) -> &'static str { + for seg in segments { + match seg.seg_type.as_str() { + "image" => return "Image", + "video" => return "Image", + "record" => return "File", + "file" => return "File", + _ => {} + } + } + "Text" + } } // ========== 通知事件 ========== diff --git a/src/adapters/onebot/ws.rs b/src/adapters/onebot/ws.rs index 749ffb0..00aa5f7 100644 --- a/src/adapters/onebot/ws.rs +++ b/src/adapters/onebot/ws.rs @@ -132,17 +132,38 @@ fn handle_onebot_event(broadcaster: &MessageBroadcaster, event: OneBotEvent) { fn handle_message_event(broadcaster: &MessageBroadcaster, event: MessageEvent) { let platform = "qq".to_string(); - let conversation = if let Some(gid) = event.group_id { - format!("group_{}", gid) + let (conversation, conversation_name) = if let Some(gid) = event.group_id { + ( + format!("group_{}", gid), + event + .sender + .as_ref() + .map(|s| s.card.clone().unwrap_or_else(|| s.nickname.clone())) + .or_else(|| Some(format!("群聊 {}", gid))), + ) } else { - format!("private_{}", event.user_id) + ( + format!("private_{}", event.user_id), + event + .sender + .as_ref() + .map(|s| s.nickname.clone()) + .or_else(|| Some(format!("用户 {}", event.user_id))), + ) }; let raw_text = super::protocol::MessageSegment::segments_to_text(&event.message); + let msg_type_str = super::protocol::MessageSegment::segments_message_type(&event.message); + + let message_type = match msg_type_str { + "Image" => MessageType::Image, + "File" => MessageType::File, + _ => MessageType::Text, + }; let message = Message { id: uuid::Uuid::new_v4().to_string(), - message_type: MessageType::Text, + message_type, content: raw_text.clone(), recipient: conversation.clone(), status: MessageStatus::Pending, @@ -168,9 +189,9 @@ fn handle_message_event(broadcaster: &MessageBroadcaster, event: MessageEvent) { id: message.id.clone(), platform: platform.clone(), conversation: conversation.clone(), - conversation_name: None, + conversation_name, content: raw_text, - message_type: "Text".into(), + message_type: msg_type_str.into(), sender: event.sender.map(|s| crate::core::broadcaster::SenderInfo { id: s.user_id.to_string(), name: s.card.unwrap_or(s.nickname), diff --git a/src/core/dispatcher.rs b/src/core/dispatcher.rs index cbb82ae..56e2fe6 100644 --- a/src/core/dispatcher.rs +++ b/src/core/dispatcher.rs @@ -96,6 +96,17 @@ impl MessageDispatcher { } }; + // Skip if the message was canceled between poll and dispatch + if let Ok(Some(current)) = repo.get(&message.id) + && current.status == MessageStatus::Canceled + { + tracing::info!( + message_id = %message.id, + "Skipping canceled message" + ); + return; + } + let mut sent = false; for attempt in 0..=max_retries { diff --git a/src/web/templates/app.js b/src/web/templates/app.js index 491fe92..03c5a9a 100644 --- a/src/web/templates/app.js +++ b/src/web/templates/app.js @@ -164,17 +164,17 @@ function updateWSStatus(connected) { // ========== Dashboard ========== function refreshDashboard() { - try { - j('/api/health').then(function () { - document.getElementById('statToday').textContent = messageCache.length; - document.getElementById('statPlatforms').textContent = '--'; - document.getElementById('statClients').textContent = '--'; - document.getElementById('statPending').textContent = '--'; - renderRecentMessages(); - }); - } catch (e) { - document.getElementById('statToday').textContent = '错误'; - } + j('/api/stats').then(function (stats) { + document.getElementById('statPending').textContent = stats.pending || 0; + document.getElementById('statSending').textContent = stats.sending || 0; + document.getElementById('statSent').textContent = stats.sent || 0; + document.getElementById('statFailed').textContent = stats.failed || 0; + document.getElementById('statCanceled').textContent = stats.canceled || 0; + document.getElementById('statTotal').textContent = stats.total || 0; + }).catch(function () { + document.getElementById('statPending').textContent = 'ERR'; + }); + renderRecentMessages(); } function renderRecentMessages() { @@ -188,9 +188,9 @@ function renderRecentMessages() { var time = new Date(m.created_at * 1000).toLocaleTimeString('zh-CN'); return '' + '' + esc(m.platform) + '' + - '' + esc(m.conversation.substring(0, 20)) + '' + - '' + esc(m.content.substring(0, 50)) + '' + - '已接收' + + '' + esc((m.conversation || '').substring(0, 20)) + '' + + '' + esc((m.content || '').substring(0, 50)) + '' + + '' + statusBadge(m.status || 'Pending') + '' + '' + time + '' + ''; }).join(''); @@ -214,11 +214,11 @@ function renderMessages() { var sender = m.sender ? m.sender.name : '--'; return '' + '' + esc(m.platform) + '' + - '' + esc(m.conversation.substring(0, 24)) + '' + + '' + esc((m.conversation || '').substring(0, 24)) + '' + '' + esc(sender) + '' + - '' + esc(m.content.substring(0, 40)) + '' + + '' + esc((m.content || '').substring(0, 40)) + '' + '' + esc(m.message_type) + '' + - '已接收' + + '' + statusBadge(m.status || 'Pending') + '' + '' + time + '' + ''; }).join(''); @@ -299,6 +299,25 @@ function esc(s) { return String(s).replace(/&/g, '&').replace(//g, '>').replace(/"/g, '"'); } +function statusBadge(status) { + var map = { + Sent: 'success', + Pending: 'warning', + Sending: 'info', + Failed: 'danger', + Canceled: 'danger' + }; + var label = { + Sent: '已发送', + Pending: '待发送', + Sending: '发送中', + Failed: '失败', + Canceled: '已取消' + }; + var cls = map[status] || 'info'; + return '' + (label[status] || status || '未知') + ''; +} + // ========== Boot ========== if (accessToken) { diff --git a/src/web/templates/index.html b/src/web/templates/index.html index c8562e8..8db1e68 100644 --- a/src/web/templates/index.html +++ b/src/web/templates/index.html @@ -69,20 +69,28 @@

ReChat 登录

仪表盘

-
今日消息
-
--
+
待发送
+
--
-
活跃平台
-
--
+
发送中
+
--
-
WS 客户端
-
--
+
已发送
+
--
-
待发送
-
--
+
失败
+
--
+
+
+
已取消
+
--
+
+
+
总计
+
--
From 8c64142629a845e81102c324918173d75c66b606 Mon Sep 17 00:00:00 2001 From: Ink-dark Date: Fri, 1 May 2026 17:30:49 +0800 Subject: [PATCH 2/3] =?UTF-8?q?fix(onebot):=20=E4=BF=AE=E6=AD=A3=E8=A7=86?= =?UTF-8?q?=E9=A2=91=E6=B6=88=E6=81=AF=E6=AE=B5=E7=B1=BB=E5=9E=8B=E9=94=99?= =?UTF-8?q?=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 将视频消息段的返回类型从"Image"改为"File",以保持与其他文件类型消息段的一致性 --- src/adapters/onebot/protocol.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/adapters/onebot/protocol.rs b/src/adapters/onebot/protocol.rs index 3dbac4e..a583c42 100644 --- a/src/adapters/onebot/protocol.rs +++ b/src/adapters/onebot/protocol.rs @@ -142,7 +142,7 @@ impl MessageSegment { for seg in segments { match seg.seg_type.as_str() { "image" => return "Image", - "video" => return "Image", + "video" => return "File", "record" => return "File", "file" => return "File", _ => {} From eef16a769eedadebcaa0630b6de35f7f1afdf2a0 Mon Sep 17 00:00:00 2001 From: Ink-dark Date: Fri, 1 May 2026 17:40:38 +0800 Subject: [PATCH 3/3] =?UTF-8?q?feat(=E6=B6=88=E6=81=AF=E5=A4=84=E7=90=86):?= =?UTF-8?q?=20=E6=B7=BB=E5=8A=A0=E5=8E=9F=E5=AD=90=E6=80=A7=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E8=AE=A4=E9=A2=86=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 实现 try_claim_message 方法用于原子性地认领待发送消息,避免竞态条件 在 dispatcher 中使用该方法替代原有的状态检查逻辑 --- src/core/dispatcher.rs | 27 ++++++++++++++++++--------- src/core/message.rs | 14 ++++++++++++++ 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/src/core/dispatcher.rs b/src/core/dispatcher.rs index 56e2fe6..17445e9 100644 --- a/src/core/dispatcher.rs +++ b/src/core/dispatcher.rs @@ -96,15 +96,24 @@ impl MessageDispatcher { } }; - // Skip if the message was canceled between poll and dispatch - if let Ok(Some(current)) = repo.get(&message.id) - && current.status == MessageStatus::Canceled - { - tracing::info!( - message_id = %message.id, - "Skipping canceled message" - ); - return; + // Atomically claim: only proceeds if Pending/Sending, not Canceled + match repo.try_claim_message(&message.id) { + Ok(true) => {} + Ok(false) => { + tracing::info!( + message_id = %message.id, + "Message already claimed or canceled, skipping" + ); + return; + } + Err(e) => { + tracing::error!( + error = %e, + message_id = %message.id, + "Failed to claim message" + ); + return; + } } let mut sent = false; diff --git a/src/core/message.rs b/src/core/message.rs index 0b520d9..8103506 100644 --- a/src/core/message.rs +++ b/src/core/message.rs @@ -219,6 +219,20 @@ impl MessageRepository { Ok(()) } + /// Atomically claim a message for sending: only succeeds if status is Pending or Sending. + /// Returns true if claimed, false if the message was already canceled/failed/sent. + pub fn try_claim_message(&self, id: &str) -> Result { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() as i64; + let affected = self.conn.execute( + "UPDATE messages SET status = 'Sending', updated_at = ?1 WHERE id = ?2 AND status IN ('Pending', 'Sending')", + rusqlite::params![now, id], + )?; + Ok(affected > 0) + } + pub fn increment_retry(&self, id: &str) -> Result<()> { let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH)