diff --git a/compass/todolist.md b/compass/todolist.md index aedf7ff..1fcfd17 100644 --- a/compass/todolist.md +++ b/compass/todolist.md @@ -14,6 +14,8 @@ | 04-30 | Token 认证系统 | 启动生成 UUID → 登录遮罩 → localStorage → API/WS 携带 | | 04-30 | CI 格式修复 × 3 | import order / 行宽 / 链式调用换行 | | 05-01 | 集成测试补全 | 39 个新测试: broadcaster(11) + protocol(21) + auth(7) | +| 05-01 | Phase B: API 补全 | list/stats/PATCH/DELETE 全部完成;status Canceled 拆分修复 | +| 05-01 | Phase C1-C2: 消息流 + Dispatcher | 图片/文件类型扩展、sender 补全、Canceled 跳过、Web 前端同步 | --- @@ -31,21 +33,21 @@ | A4 | 调度器生命周期管理 | 🔴 | main.rs + dispatcher.rs | start / graceful shutdown | | A5 | 集成测试 | 🔴 | tests/dispatcher.rs | 模拟发送成功/失败/超时/最大重试 | -### 🟡 Phase B: API 补全 +### 🟡 Phase B: API 补全 ✅ 已完成 | # | 任务 | 优先级 | 说明 | |:--|------|:---:|------| -| B1 | `GET /api/messages` 列表分页 | 🟡 | 支持 `?limit=&offset=&status=` 筛选 | -| B2 | `PATCH /api/messages/{id}` 取消/重发 | 🟡 | `{"status": "Failed"}` 取消待发送消息 | -| B3 | `GET /api/stats` 统计概览 | 🟡 | 今日消息数/各状态计数/平台分布 | -| B4 | `DELETE /api/messages/{id}` | 🟢 | 删除消息记录 | +| B1 | `GET /api/messages` 列表分页 | ✅ | 支持 `?limit=&offset=&status=` 筛选 | +| B2 | `PATCH /api/messages/{id}` 取消/重发 | ✅ | 支持 Pending/Sending/Sent/Failed/Canceled | +| B3 | `GET /api/stats` 统计概览 | ✅ | 5种状态计数 + total | +| B4 | `DELETE /api/messages/{id}` | ✅ | 删除消息记录 | -### 🟡 Phase C: 消息流完善 +### 🟡 Phase C: 消息流完善 🔴 进行中 | # | 任务 | 优先级 | 说明 | |:--|------|:---:|------| -| C1 | OneBot 图片/文件消息类型扩展 | 🟡 | 当前只处理 Text 类型 → 扩展 Image/File MessageType | -| C2 | 入站消息 sender 信息补全 | 🟡 | group/private conversation_name 自动生成 | +| C1 | OneBot 图片/文件消息类型扩展 | ✅ | 根据消息段自动识别 Image/File/Text 类型 | +| C2 | 入站消息 sender 信息补全 | ✅ | conversation_name 从 nickname/群号自动生成 | | C3 | 多平台支持框架就绪 | 🟢 | adapter 注册机制验证 (QQ 已有,可加 mock 测试微信) | ### 🟢 Phase D: 运维与部署 @@ -102,7 +104,10 @@ └── [ ] GET /api/stats ⏳ Phase C-E: 完善 + 部署 → 目标 2026-05-05 -├── [ ] 图片/文件类型 +├── [✅] 图片/文件类型识别 +├── [✅] sender 信息补全 +├── [✅] Dispatcher Canceled 跳过 +├── [✅] Web 前端状态同步 ├── [ ] Docker 部署 └── [ ] 架构优化 ``` diff --git a/src/adapters/onebot/protocol.rs b/src/adapters/onebot/protocol.rs index 03fc614..a583c42 100644 --- a/src/adapters/onebot/protocol.rs +++ b/src/adapters/onebot/protocol.rs @@ -136,6 +136,20 @@ impl MessageSegment { .collect::>() .join("") } + + /// 从消息段数组推断消息类型 + pub fn segments_message_type(segments: &[MessageSegment]) -> &'static str { + for seg in segments { + match seg.seg_type.as_str() { + "image" => return "Image", + "video" => return "File", + "record" => return "File", + "file" => return "File", + _ => {} + } + } + "Text" + } } // ========== 通知事件 ========== diff --git a/src/adapters/onebot/ws.rs b/src/adapters/onebot/ws.rs index 749ffb0..00aa5f7 100644 --- a/src/adapters/onebot/ws.rs +++ b/src/adapters/onebot/ws.rs @@ -132,17 +132,38 @@ fn handle_onebot_event(broadcaster: &MessageBroadcaster, event: OneBotEvent) { fn handle_message_event(broadcaster: &MessageBroadcaster, event: MessageEvent) { let platform = "qq".to_string(); - let conversation = if let Some(gid) = event.group_id { - format!("group_{}", gid) + let (conversation, conversation_name) = if let Some(gid) = event.group_id { + ( + format!("group_{}", gid), + event + .sender + .as_ref() + .map(|s| s.card.clone().unwrap_or_else(|| s.nickname.clone())) + .or_else(|| Some(format!("群聊 {}", gid))), + ) } else { - format!("private_{}", event.user_id) + ( + format!("private_{}", event.user_id), + event + .sender + .as_ref() + .map(|s| s.nickname.clone()) + .or_else(|| Some(format!("用户 {}", event.user_id))), + ) }; let raw_text = super::protocol::MessageSegment::segments_to_text(&event.message); + let msg_type_str = super::protocol::MessageSegment::segments_message_type(&event.message); + + let message_type = match msg_type_str { + "Image" => MessageType::Image, + "File" => MessageType::File, + _ => MessageType::Text, + }; let message = Message { id: uuid::Uuid::new_v4().to_string(), - message_type: MessageType::Text, + message_type, content: raw_text.clone(), recipient: conversation.clone(), status: MessageStatus::Pending, @@ -168,9 +189,9 @@ fn handle_message_event(broadcaster: &MessageBroadcaster, event: MessageEvent) { id: message.id.clone(), platform: platform.clone(), conversation: conversation.clone(), - conversation_name: None, + conversation_name, content: raw_text, - message_type: "Text".into(), + message_type: msg_type_str.into(), sender: event.sender.map(|s| crate::core::broadcaster::SenderInfo { id: s.user_id.to_string(), name: s.card.unwrap_or(s.nickname), diff --git a/src/core/dispatcher.rs b/src/core/dispatcher.rs index cbb82ae..17445e9 100644 --- a/src/core/dispatcher.rs +++ b/src/core/dispatcher.rs @@ -96,6 +96,26 @@ impl MessageDispatcher { } }; + // Atomically claim: only proceeds if Pending/Sending, not Canceled + match repo.try_claim_message(&message.id) { + Ok(true) => {} + Ok(false) => { + tracing::info!( + message_id = %message.id, + "Message already claimed or canceled, skipping" + ); + return; + } + Err(e) => { + tracing::error!( + error = %e, + message_id = %message.id, + "Failed to claim message" + ); + return; + } + } + let mut sent = false; for attempt in 0..=max_retries { diff --git a/src/core/message.rs b/src/core/message.rs index 0b520d9..8103506 100644 --- a/src/core/message.rs +++ b/src/core/message.rs @@ -219,6 +219,20 @@ impl MessageRepository { Ok(()) } + /// Atomically claim a message for sending: only succeeds if status is Pending or Sending. + /// Returns true if claimed, false if the message was already canceled/failed/sent. + pub fn try_claim_message(&self, id: &str) -> Result { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() as i64; + let affected = self.conn.execute( + "UPDATE messages SET status = 'Sending', updated_at = ?1 WHERE id = ?2 AND status IN ('Pending', 'Sending')", + rusqlite::params![now, id], + )?; + Ok(affected > 0) + } + pub fn increment_retry(&self, id: &str) -> Result<()> { let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) diff --git a/src/web/templates/app.js b/src/web/templates/app.js index 491fe92..03c5a9a 100644 --- a/src/web/templates/app.js +++ b/src/web/templates/app.js @@ -164,17 +164,17 @@ function updateWSStatus(connected) { // ========== Dashboard ========== function refreshDashboard() { - try { - j('/api/health').then(function () { - document.getElementById('statToday').textContent = messageCache.length; - document.getElementById('statPlatforms').textContent = '--'; - document.getElementById('statClients').textContent = '--'; - document.getElementById('statPending').textContent = '--'; - renderRecentMessages(); - }); - } catch (e) { - document.getElementById('statToday').textContent = '错误'; - } + j('/api/stats').then(function (stats) { + document.getElementById('statPending').textContent = stats.pending || 0; + document.getElementById('statSending').textContent = stats.sending || 0; + document.getElementById('statSent').textContent = stats.sent || 0; + document.getElementById('statFailed').textContent = stats.failed || 0; + document.getElementById('statCanceled').textContent = stats.canceled || 0; + document.getElementById('statTotal').textContent = stats.total || 0; + }).catch(function () { + document.getElementById('statPending').textContent = 'ERR'; + }); + renderRecentMessages(); } function renderRecentMessages() { @@ -188,9 +188,9 @@ function renderRecentMessages() { var time = new Date(m.created_at * 1000).toLocaleTimeString('zh-CN'); return '' + '' + esc(m.platform) + '' + - '' + esc(m.conversation.substring(0, 20)) + '' + - '' + esc(m.content.substring(0, 50)) + '' + - '已接收' + + '' + esc((m.conversation || '').substring(0, 20)) + '' + + '' + esc((m.content || '').substring(0, 50)) + '' + + '' + statusBadge(m.status || 'Pending') + '' + '' + time + '' + ''; }).join(''); @@ -214,11 +214,11 @@ function renderMessages() { var sender = m.sender ? m.sender.name : '--'; return '' + '' + esc(m.platform) + '' + - '' + esc(m.conversation.substring(0, 24)) + '' + + '' + esc((m.conversation || '').substring(0, 24)) + '' + '' + esc(sender) + '' + - '' + esc(m.content.substring(0, 40)) + '' + + '' + esc((m.content || '').substring(0, 40)) + '' + '' + esc(m.message_type) + '' + - '已接收' + + '' + statusBadge(m.status || 'Pending') + '' + '' + time + '' + ''; }).join(''); @@ -299,6 +299,25 @@ function esc(s) { return String(s).replace(/&/g, '&').replace(//g, '>').replace(/"/g, '"'); } +function statusBadge(status) { + var map = { + Sent: 'success', + Pending: 'warning', + Sending: 'info', + Failed: 'danger', + Canceled: 'danger' + }; + var label = { + Sent: '已发送', + Pending: '待发送', + Sending: '发送中', + Failed: '失败', + Canceled: '已取消' + }; + var cls = map[status] || 'info'; + return '' + (label[status] || status || '未知') + ''; +} + // ========== Boot ========== if (accessToken) { diff --git a/src/web/templates/index.html b/src/web/templates/index.html index c8562e8..8db1e68 100644 --- a/src/web/templates/index.html +++ b/src/web/templates/index.html @@ -69,20 +69,28 @@

ReChat 登录

仪表盘

-
今日消息
-
--
+
待发送
+
--
-
活跃平台
-
--
+
发送中
+
--
-
WS 客户端
-
--
+
已发送
+
--
-
待发送
-
--
+
失败
+
--
+
+
+
已取消
+
--
+
+
+
总计
+
--