From 03a35e392c8a55a471c207ed911abb5e799a44fc Mon Sep 17 00:00:00 2001 From: Ink-dark Date: Fri, 1 May 2026 09:28:12 +0800 Subject: [PATCH 1/7] test: add 39 integration tests - broadcaster (11), onebot protocol (21), auth (7) --- src/core/auth.rs | 6 +- tests/auth.rs | 42 ++++++ tests/broadcaster.rs | 267 +++++++++++++++++++++++++++++++++++++++ tests/onebot_protocol.rs | 242 +++++++++++++++++++++++++++++++++++ 4 files changed, 553 insertions(+), 4 deletions(-) create mode 100644 tests/auth.rs create mode 100644 tests/broadcaster.rs create mode 100644 tests/onebot_protocol.rs diff --git a/src/core/auth.rs b/src/core/auth.rs index ea93ef0..8a4ead4 100644 --- a/src/core/auth.rs +++ b/src/core/auth.rs @@ -1,7 +1,5 @@ -use actix_web::web; - -pub fn validate_token(token: &web::Data, query: &str) -> bool { - token.get_ref() == query +pub fn validate_token(expected: &str, provided: &str) -> bool { + expected == provided } pub fn extract_token_from_query(query: &str) -> Option<&str> { diff --git a/tests/auth.rs b/tests/auth.rs new file mode 100644 index 0000000..b4ecd20 --- /dev/null +++ b/tests/auth.rs @@ -0,0 +1,42 @@ +use rechat_sender::core::auth; + +#[test] +fn test_validate_token_match() { + assert!(auth::validate_token("secret123", "secret123")); +} + +#[test] +fn test_validate_token_mismatch() { + assert!(!auth::validate_token("secret123", "wrong")); +} + +#[test] +fn test_validate_token_empty() { + assert!(!auth::validate_token("secret123", "")); +} + +#[test] +fn test_extract_token_from_query_found() { + assert_eq!( + auth::extract_token_from_query("access_token=abc123&other=1"), + Some("abc123") + ); +} + +#[test] +fn test_extract_token_from_query_only_token() { + assert_eq!( + auth::extract_token_from_query("access_token=xyz"), + Some("xyz") + ); +} + +#[test] +fn test_extract_token_from_query_not_found() { + assert_eq!(auth::extract_token_from_query("other=1&foo=bar"), None); +} + +#[test] +fn test_extract_token_from_query_empty() { + assert_eq!(auth::extract_token_from_query(""), None); +} diff --git a/tests/broadcaster.rs b/tests/broadcaster.rs new file mode 100644 index 0000000..2406557 --- /dev/null +++ b/tests/broadcaster.rs @@ -0,0 +1,267 @@ +use rechat_sender::core::broadcaster::{ + BroadcastMessage, BroadcastMessageData, ClientSession, MessageBroadcaster, +}; +use std::collections::HashSet; +use tokio::sync::mpsc; + +fn make_broadcast_msg(platform: &str, conversation: &str, content: &str) -> BroadcastMessage { + BroadcastMessage { + msg_type: "new_message".into(), + data: BroadcastMessageData { + id: uuid::Uuid::new_v4().to_string(), + platform: platform.into(), + conversation: conversation.into(), + conversation_name: None, + content: content.into(), + message_type: "Text".into(), + sender: None, + created_at: 0, + }, + } +} + +fn make_session(platforms: Vec<&str>, conversations: Vec<&str>) -> (ClientSession, mpsc::UnboundedReceiver) { + let (tx, rx) = mpsc::unbounded_channel(); + let session = ClientSession { + id: uuid::Uuid::new_v4().to_string(), + platforms: platforms.into_iter().map(|s| s.to_string()).collect(), + conversations: conversations.into_iter().map(|s| s.to_string()).collect(), + sender: tx, + }; + (session, rx) +} + +#[test] +fn test_register_and_count() { + let bc = MessageBroadcaster::new(); + assert_eq!(bc.client_count(), 0); + + let (s1, _rx1) = make_session(vec!["qq"], vec![]); + bc.register(s1); + assert_eq!(bc.client_count(), 1); + + let (s2, _rx2) = make_session(vec!["wechat"], vec![]); + bc.register(s2); + assert_eq!(bc.client_count(), 2); +} + +#[test] +fn test_unregister() { + let bc = MessageBroadcaster::new(); + let (s1, _rx1) = make_session(vec!["qq"], vec![]); + let id = s1.id.clone(); + bc.register(s1); + assert_eq!(bc.client_count(), 1); + + bc.unregister(&id); + assert_eq!(bc.client_count(), 0); + + bc.unregister("nonexistent"); + assert_eq!(bc.client_count(), 0); +} + +#[test] +fn test_subscribe_and_receive() { + let bc = MessageBroadcaster::new(); + let mut platforms = HashSet::new(); + platforms.insert("qq".to_string()); + let (tx, mut rx) = mpsc::unbounded_channel(); + let session_id = uuid::Uuid::new_v4().to_string(); + let session = ClientSession { + id: session_id.clone(), + platforms, + conversations: HashSet::new(), + sender: tx, + }; + bc.register(session); + + bc.subscribe(&session_id, vec!["qq".into()], vec!["group_123".into()]); + + let msg = make_broadcast_msg("qq", "group_123", "Hello QQ"); + bc.broadcast_message("qq", "group_123", &msg); + + // The subscribed session should receive the message + let received = rx.try_recv().unwrap(); + assert!(received.contains("Hello QQ")); +} + +#[test] +fn test_broadcast_filters_by_platform() { + let bc = MessageBroadcaster::new(); + let (tx_qq, mut rx_qq) = mpsc::unbounded_channel(); + let (tx_wx, mut rx_wx) = mpsc::unbounded_channel(); + + let session_qq = ClientSession { + id: uuid::Uuid::new_v4().to_string(), + platforms: ["qq".into()].into(), + conversations: HashSet::new(), + sender: tx_qq, + }; + let session_wx = ClientSession { + id: uuid::Uuid::new_v4().to_string(), + platforms: ["wechat".into()].into(), + conversations: HashSet::new(), + sender: tx_wx, + }; + bc.register(session_qq); + bc.register(session_wx); + + let msg = make_broadcast_msg("qq", "group_123", "QQ only"); + bc.broadcast_message("qq", "group_123", &msg); + + assert!(rx_qq.try_recv().is_ok()); // QQ should receive + assert!(rx_wx.try_recv().is_err()); // WeChat should NOT +} + +#[test] +fn test_broadcast_filters_by_conversation() { + let bc = MessageBroadcaster::new(); + let (tx1, mut rx1) = mpsc::unbounded_channel(); + let (tx2, mut rx2) = mpsc::unbounded_channel(); + + let s1 = ClientSession { + id: uuid::Uuid::new_v4().to_string(), + platforms: ["qq".into()].into(), + conversations: ["group_123".into(), "group_456".into()].into(), + sender: tx1, + }; + let s2 = ClientSession { + id: uuid::Uuid::new_v4().to_string(), + platforms: ["qq".into()].into(), + conversations: ["group_789".into()].into(), + sender: tx2, + }; + bc.register(s1); + bc.register(s2); + + let msg = make_broadcast_msg("qq", "group_123", "To group 123"); + bc.broadcast_message("qq", "group_123", &msg); + + assert!(rx1.try_recv().is_ok()); // session 1 subscribed group_123 + assert!(rx2.try_recv().is_err()); // session 2 only has group_789 +} + +#[test] +fn test_broadcast_stale_session_cleanup() { + let bc = MessageBroadcaster::new(); + let (tx, rx) = mpsc::unbounded_channel::(); + let session_id = uuid::Uuid::new_v4().to_string(); + + let session = ClientSession { + id: session_id.clone(), + platforms: ["qq".into()].into(), + conversations: HashSet::new(), + sender: tx, + }; + bc.register(session); + drop(rx); // close receiver — sender.send() will fail + + let msg = make_broadcast_msg("qq", "group_123", "Stale"); + bc.broadcast_message("qq", "group_123", &msg); + + // The stale session should have been cleaned up + assert_eq!(bc.client_count(), 0); +} + +#[test] +fn test_adapter_status_broadcast() { + let bc = MessageBroadcaster::new(); + let (tx, mut rx) = mpsc::unbounded_channel(); + + let session = ClientSession { + id: uuid::Uuid::new_v4().to_string(), + platforms: ["qq".into()].into(), + conversations: HashSet::new(), + sender: tx, + }; + bc.register(session); + + bc.broadcast_adapter_status("qq", "", "Connected"); + + let received = rx.try_recv().unwrap(); + assert!(received.contains("adapter_status")); + assert!(received.contains("Connected")); +} + +#[test] +fn test_adapter_status_respects_conversation_filter() { + let bc = MessageBroadcaster::new(); + let (tx1, mut rx1) = mpsc::unbounded_channel(); + let (tx2, mut rx2) = mpsc::unbounded_channel(); + + let s1 = ClientSession { + id: uuid::Uuid::new_v4().to_string(), + platforms: ["qq".into()].into(), + conversations: ["group_123".into()].into(), + sender: tx1, + }; + let s2 = ClientSession { + id: uuid::Uuid::new_v4().to_string(), + platforms: ["qq".into()].into(), + conversations: HashSet::new(), + sender: tx2, + }; + bc.register(s1); + bc.register(s2); + + // Broadcast with specific conversation — s1 should not get it (filtered out) + bc.broadcast_adapter_status("qq", "group_999", "Connected"); + + // s1 has group_123, not group_999 — should NOT receive + assert!(rx1.try_recv().is_err()); + // s2 has empty conversations — should receive (empty = match all) + assert!(rx2.try_recv().is_ok()); +} + +#[test] +fn test_subscribe_nonexistent_session() { + let bc = MessageBroadcaster::new(); + // Should not panic + bc.subscribe("nonexistent", vec!["qq".into()], vec![]); + bc.unsubscribe("nonexistent", vec!["qq".into()], vec![]); +} + +#[test] +fn test_subscribe_adds_platforms_and_conversations() { + let bc = MessageBroadcaster::new(); + let (tx, mut rx) = mpsc::unbounded_channel(); + let sid = uuid::Uuid::new_v4().to_string(); + + let session = ClientSession { + id: sid.clone(), + platforms: HashSet::new(), + conversations: HashSet::new(), + sender: tx, + }; + bc.register(session); + + bc.subscribe(&sid, vec!["qq".into()], vec!["group_1".into()]); + + // Now should receive messages on qq/group_1 + let msg = make_broadcast_msg("qq", "group_1", "Hello"); + bc.broadcast_message("qq", "group_1", &msg); + assert!(rx.try_recv().is_ok()); +} + +#[test] +fn test_unsubscribe_removes_platform() { + let bc = MessageBroadcaster::new(); + let mut platforms = HashSet::new(); + platforms.insert("qq".to_string()); + let (tx, mut rx) = mpsc::unbounded_channel(); + let sid = uuid::Uuid::new_v4().to_string(); + + let session = ClientSession { + id: sid.clone(), + platforms, + conversations: HashSet::new(), + sender: tx, + }; + bc.register(session); + + bc.unsubscribe(&sid, vec!["qq".into()], vec![]); + + let msg = make_broadcast_msg("qq", "group_1", "Should NOT receive"); + bc.broadcast_message("qq", "group_1", &msg); + assert!(rx.try_recv().is_err()); +} diff --git a/tests/onebot_protocol.rs b/tests/onebot_protocol.rs new file mode 100644 index 0000000..95cf145 --- /dev/null +++ b/tests/onebot_protocol.rs @@ -0,0 +1,242 @@ +use rechat_sender::adapters::onebot::protocol::{ + ActionRequest, ActionResponse, MessageSegment, OneBotEvent, +}; +use rechat_sender::adapters::onebot::protocol as protocol_mod; +use std::collections::HashMap; + +#[test] +fn test_message_segment_text() { + let seg = MessageSegment::text("Hello World"); + assert_eq!(seg.seg_type, "text"); + assert_eq!(seg.data.get("text").unwrap(), "Hello World"); +} + +#[test] +fn test_message_segment_image() { + let seg = MessageSegment::image("https://example.com/pic.jpg"); + assert_eq!(seg.seg_type, "image"); + assert_eq!(seg.data.get("file").unwrap(), "https://example.com/pic.jpg"); +} + +#[test] +fn test_message_segment_at() { + let seg = MessageSegment::at("12345678"); + assert_eq!(seg.seg_type, "at"); + assert_eq!(seg.data.get("qq").unwrap(), "12345678"); +} + +#[test] +fn test_message_segment_reply() { + let seg = MessageSegment::reply(987654321); + assert_eq!(seg.seg_type, "reply"); + assert_eq!(seg.data.get("id").unwrap(), "987654321"); +} + +#[test] +fn test_to_plain_text_text() { + let seg = MessageSegment::text("你好"); + assert_eq!(seg.to_plain_text(), "你好"); +} + +#[test] +fn test_to_plain_text_at() { + let seg = MessageSegment::at("12345"); + assert_eq!(seg.to_plain_text(), "@12345"); +} + +#[test] +fn test_to_plain_text_reply() { + let seg = MessageSegment::reply(123456); + assert_eq!(seg.to_plain_text(), "[Reply]"); +} + +#[test] +fn test_to_plain_text_unknown_type() { + let mut data = HashMap::new(); + data.insert("url".into(), "https://a.b".into()); + let seg = MessageSegment { + seg_type: "unknown_type".into(), + data, + }; + assert_eq!(seg.to_plain_text(), "[unknown_type]"); +} + +#[test] +fn test_segments_to_text() { + let segments = vec![ + MessageSegment::text("Hello "), + MessageSegment::at("12345"), + MessageSegment::text(" welcome"), + ]; + assert_eq!( + MessageSegment::segments_to_text(&segments), + "Hello @12345 welcome" + ); +} + +#[test] +fn test_segments_to_text_empty() { + assert_eq!(MessageSegment::segments_to_text(&[]), ""); +} + +#[test] +fn test_build_send_private_msg() { + let seg = MessageSegment::text("Hi"); + let action = protocol_mod::build_send_msg_action("private", 123456789, &[seg], Some("echo123")); + assert_eq!(action.action, "send_private_msg"); + assert_eq!(action.params["user_id"], 123456789); + assert_eq!(action.echo.as_deref(), Some("echo123")); +} + +#[test] +fn test_build_send_group_msg() { + let seg = MessageSegment::text("Group message"); + let action = protocol_mod::build_send_msg_action("group", 987654321, &[seg], None); + assert_eq!(action.action, "send_group_msg"); + assert_eq!(action.params["group_id"], 987654321); + assert!(action.echo.is_none()); +} + +#[test] +fn test_build_delete_msg() { + let action = protocol_mod::build_delete_msg_action(42); + assert_eq!(action.action, "delete_msg"); + assert_eq!(action.params["message_id"], 42); + assert!(action.echo.is_none()); +} + +#[test] +fn test_action_request_serialization() { + let action = ActionRequest { + action: "send_private_msg".into(), + params: serde_json::json!({"user_id": 123, "message": "Hi"}), + echo: Some("trace1".into()), + }; + let json = serde_json::to_string(&action).unwrap(); + assert!(json.contains("send_private_msg")); + assert!(json.contains("trace1")); + // echo should be present + assert!(json.contains("echo")); +} + +#[test] +fn test_action_request_serialization_no_echo() { + let action = ActionRequest { + action: "get_version".into(), + params: serde_json::json!({}), + echo: None, + }; + let json = serde_json::to_string(&action).unwrap(); + assert!(!json.contains("echo")); +} + +#[test] +fn test_action_response_deserialization_ok() { + let json = r#"{"status":"ok","retcode":0,"data":{},"echo":"trace1"}"#; + let resp: ActionResponse = serde_json::from_str(json).unwrap(); + assert_eq!(resp.status, "ok"); + assert_eq!(resp.retcode, 0); + assert_eq!(resp.echo.as_deref(), Some("trace1")); +} + +#[test] +fn test_action_response_deserialization_no_echo() { + let json = r#"{"status":"failed","retcode":1404,"data":{"msg":"not found"}}"#; + let resp: ActionResponse = serde_json::from_str(json).unwrap(); + assert_eq!(resp.status, "failed"); + assert_eq!(resp.retcode, 1404); + assert!(resp.echo.is_none()); +} + +#[test] +fn test_onebot_event_deserialize_group_message() { + let json = r#"{ + "time": 1715000000, + "self_id": 123456789, + "post_type": "message", + "message_type": "group", + "sub_type": "normal", + "message_id": 789, + "user_id": 111222333, + "group_id": 999888777, + "message": [{"type":"text","data":{"text":"Hello"}}], + "raw_message": "Hello", + "sender": {"user_id":111222333,"nickname":"User","card":"DisplayName","role":"member"} + }"#; + let event: OneBotEvent = serde_json::from_str(json).unwrap(); + match event { + OneBotEvent::Message(msg) => { + assert_eq!(msg.message_type, "group"); + assert_eq!(msg.user_id, 111222333); + assert_eq!(msg.group_id, Some(999888777)); + assert_eq!(msg.message.len(), 1); + assert_eq!(msg.raw_message, "Hello"); + let sender = msg.sender.unwrap(); + assert_eq!(sender.nickname, "User"); + assert_eq!(sender.card.as_deref(), Some("DisplayName")); + } + _ => panic!("Expected Message event"), + } +} + +#[test] +fn test_onebot_event_deserialize_private_message() { + let json = r#"{ + "time": 1715000000, + "self_id": 100000, + "post_type": "message", + "message_type": "private", + "sub_type": "friend", + "message_id": 1, + "user_id": 200000, + "message": [{"type":"text","data":{"text":"PM"}}], + "raw_message": "PM", + "sender": {"user_id":200000,"nickname":"Friend"} + }"#; + let event: OneBotEvent = serde_json::from_str(json).unwrap(); + match event { + OneBotEvent::Message(msg) => { + assert_eq!(msg.message_type, "private"); + assert!(msg.group_id.is_none()); + } + _ => panic!("Expected Message event"), + } +} + +#[test] +fn test_onebot_event_deserialize_notice() { + let json = r#"{ + "time": 1715000000, + "self_id": 123456789, + "post_type": "notice", + "notice_type": "group_increase", + "user_id": 111222333, + "group_id": 999888777 + }"#; + let event: OneBotEvent = serde_json::from_str(json).unwrap(); + match event { + OneBotEvent::Notice(notice) => { + assert_eq!(notice.notice_type, "group_increase"); + assert_eq!(notice.group_id, Some(999888777)); + } + _ => panic!("Expected Notice event"), + } +} + +#[test] +fn test_onebot_event_deserialize_meta() { + let json = r#"{ + "time": 1715000000, + "self_id": 123456789, + "post_type": "meta_event", + "meta_event_type": "heartbeat", + "status": {} + }"#; + let event: OneBotEvent = serde_json::from_str(json).unwrap(); + match event { + OneBotEvent::Meta(meta) => { + assert_eq!(meta.meta_event_type, "heartbeat"); + } + _ => panic!("Expected Meta event"), + } +} From 7df57266508065c7e09fe5d49646bef59c5c1569 Mon Sep 17 00:00:00 2001 From: Ink-dark Date: Fri, 1 May 2026 09:31:31 +0800 Subject: [PATCH 2/7] =?UTF-8?q?style(test):=20=E8=B0=83=E6=95=B4=E6=B5=8B?= =?UTF-8?q?=E8=AF=95=E4=BB=A3=E7=A0=81=E6=A0=BC=E5=BC=8F=E5=92=8C=E5=AF=BC?= =?UTF-8?q?=E5=85=A5=E9=A1=BA=E5=BA=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 统一格式化测试代码中的函数参数换行 - 调整导入语句顺序 - 统一注释格式 --- tests/broadcaster.rs | 9 ++++++--- tests/onebot_protocol.rs | 2 +- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/tests/broadcaster.rs b/tests/broadcaster.rs index 2406557..467f486 100644 --- a/tests/broadcaster.rs +++ b/tests/broadcaster.rs @@ -20,7 +20,10 @@ fn make_broadcast_msg(platform: &str, conversation: &str, content: &str) -> Broa } } -fn make_session(platforms: Vec<&str>, conversations: Vec<&str>) -> (ClientSession, mpsc::UnboundedReceiver) { +fn make_session( + platforms: Vec<&str>, + conversations: Vec<&str>, +) -> (ClientSession, mpsc::UnboundedReceiver) { let (tx, rx) = mpsc::unbounded_channel(); let session = ClientSession { id: uuid::Uuid::new_v4().to_string(), @@ -109,7 +112,7 @@ fn test_broadcast_filters_by_platform() { let msg = make_broadcast_msg("qq", "group_123", "QQ only"); bc.broadcast_message("qq", "group_123", &msg); - assert!(rx_qq.try_recv().is_ok()); // QQ should receive + assert!(rx_qq.try_recv().is_ok()); // QQ should receive assert!(rx_wx.try_recv().is_err()); // WeChat should NOT } @@ -137,7 +140,7 @@ fn test_broadcast_filters_by_conversation() { let msg = make_broadcast_msg("qq", "group_123", "To group 123"); bc.broadcast_message("qq", "group_123", &msg); - assert!(rx1.try_recv().is_ok()); // session 1 subscribed group_123 + assert!(rx1.try_recv().is_ok()); // session 1 subscribed group_123 assert!(rx2.try_recv().is_err()); // session 2 only has group_789 } diff --git a/tests/onebot_protocol.rs b/tests/onebot_protocol.rs index 95cf145..424a729 100644 --- a/tests/onebot_protocol.rs +++ b/tests/onebot_protocol.rs @@ -1,7 +1,7 @@ +use rechat_sender::adapters::onebot::protocol as protocol_mod; use rechat_sender::adapters::onebot::protocol::{ ActionRequest, ActionResponse, MessageSegment, OneBotEvent, }; -use rechat_sender::adapters::onebot::protocol as protocol_mod; use std::collections::HashMap; #[test] From 17625de3559b2af4f1cb8ab912d8cb0edd90bb97 Mon Sep 17 00:00:00 2001 From: Ink-dark Date: Fri, 1 May 2026 14:50:56 +0800 Subject: [PATCH 3/7] docs: update development roadmap with Phase A-E plan --- compass/todolist.md | 165 +++++++++++++++++++++----------------------- 1 file changed, 80 insertions(+), 85 deletions(-) diff --git a/compass/todolist.md b/compass/todolist.md index c171189..aedf7ff 100644 --- a/compass/todolist.md +++ b/compass/todolist.md @@ -1,113 +1,108 @@ -# ReChat-sender 开发规划与任务清单 +# ReChat-provider 开发规划与任务清单 -> 最后更新: 2026-04-26 +> 最后更新: 2026-05-01 --- -## 一、已完成 - -| # | 任务 | 涉及文件 | 日期 | -|:--|------|---------|:---:| -| ✅ | 修复 REPO 未初始化时 `unwrap()` panic → 安全错误处理 | `src/api/endpoints/messages.rs`, `src/main.rs` | 04-26 | -| ✅ | 修复 `duration_since().unwrap()` → `unwrap_or_default()` + 日志 | `src/api/endpoints/messages.rs`, `src/core/message.rs` | 04-26 | -| ✅ | `Config::load()` 实际调用 — 新增 `--config` CLI 参数 | `src/main.rs` | 04-26 | -| ✅ | `workers` 从配置动态读取(不再硬编码为 1) | `src/main.rs` | 04-26 | -| ✅ | 清理无意义的 `--server` 参数 | `src/main.rs` | 04-26 | -| ✅ | 时间戳以 `i64` 数值存储(不再转字符串) | `src/core/message.rs` | 04-26 | -| ✅ | 移除 `cargo-tarpaulin` dev-dependency | `Cargo.toml` | 04-26 | -| ✅ | 无效枚举值错误类型从 `InvalidColumnType` → `InvalidQuery` | `src/core/message.rs` | 04-26 | -| ✅ | 数据库层时间戳合理性验证(零值/负值检查 + 日志) | `src/core/message.rs` | 04-26 | -| ✅ | API 层 `system_time_to_secs()` 安全转换 + 日志 | `src/api/endpoints/messages.rs` | 04-26 | -| ✅ | 合并远程 `adapter + plugin` 模块到 `main.rs` | `src/main.rs` | 04-26 | +## 一、已完成 (近期) + +| 日期 | 任务 | 说明 | +|:---:|------|------| +| 04-30 | Phase 0-4: OneBot v11 适配器 | protocol / WS 端点 / Adapter trait 全部完成 | +| 04-30 | Web UI 重构 | Vanilla JS SPA,侧边栏布局,亮/暗/Auto 三模式主题 | +| 04-30 | 路由合并修复 | 多个 `web::scope("")` 冲突 → `configure()` | +| 04-30 | Token 认证系统 | 启动生成 UUID → 登录遮罩 → localStorage → API/WS 携带 | +| 04-30 | CI 格式修复 × 3 | import order / 行宽 / 链式调用换行 | +| 05-01 | 集成测试补全 | 39 个新测试: broadcaster(11) + protocol(21) + auth(7) | --- -## 二、待修复(代码质量) +## 二、下一步开发计划 -| # | 任务 | 优先级 | 说明 | -|:--|------|:---:|------| -| 🔧 | `adapter.rs` — `AdapterStats::Default` 改用 `#[derive(Default)]` | 🟢 | Clippy suggestion,减少手写代码 | -| 🔧 | `adapter.rs` — 为 `AdapterManager` 实现 `Default` trait | 🟢 | Clippy suggestion,遵循 Rust 惯例 | -| 🔧 | `plugin.rs` — `PluginStats::Default` 改用 `#[derive(Default)]` | 🟢 | 同上 | -| 🔧 | `plugin.rs` — 为 `PluginManager` 实现 `Default` trait | 🟢 | 同上 | -| 🔧 | `MessageQueue` 未实例化,`services/queue.rs` 为死代码 | 🟡 | 需要 Redis 环境;需决定是集成还是移除此模块 | +### 🔴 Phase A: 消息发送调度核心 (P0) ---- +> 当前 `/ws/client` 的 `send_message` 只调用 `send_to_adapter` 但不处理结果/重试。需要后台调度器。 -## 三、待开发(功能特性) +| # | 任务 | 优先级 | 文件 | 说明 | +|:--|------|:---:|------|------| +| A1 | 创建 `core/dispatcher.rs` | 🔴 | 新建 | 后台 tokio task 轮询 Pending 消息 → 匹配 Adapter 发送 → 更新状态 | +| A2 | `SenderConfig` 接入调度器 | 🔴 | dispatcher.rs | max_retries / retry_interval / batch_size / concurrency | +| A3 | `MessageRepository::update_status()` | 🔴 | message.rs | 发送成功后更新为 Sent,失败后 retry_count++ | +| A4 | 调度器生命周期管理 | 🔴 | main.rs + dispatcher.rs | start / graceful shutdown | +| A5 | 集成测试 | 🔴 | tests/dispatcher.rs | 模拟发送成功/失败/超时/最大重试 | -### 🔴 当前进行中:OneBot v11 Adapter(NapCat/QQ 接入) +### 🟡 Phase B: API 补全 -> 详细计划 → [compass/onebot_adapter_plan.md](compass/onebot_adapter_plan.md) +| # | 任务 | 优先级 | 说明 | +|:--|------|:---:|------| +| B1 | `GET /api/messages` 列表分页 | 🟡 | 支持 `?limit=&offset=&status=` 筛选 | +| B2 | `PATCH /api/messages/{id}` 取消/重发 | 🟡 | `{"status": "Failed"}` 取消待发送消息 | +| B3 | `GET /api/stats` 统计概览 | 🟡 | 今日消息数/各状态计数/平台分布 | +| B4 | `DELETE /api/messages/{id}` | 🟢 | 删除消息记录 | -| Phase | 优先级 | 任务 | 状态 | -|:---:|:---:|------|:---:| -| P0 | 🔴 | 消息广播中枢 + 客户端 WS — `core/broadcaster.rs` + `api/endpoints/ws_client.rs` | ✅ | -| P1 | 🔴 | `src/adapters/onebot/protocol.rs` — OneBot 协议数据模型 | ✅ | -| P2 | 🔴 | `src/adapters/onebot/ws.rs` — 平台 WS 端点 `/onebot/v11/ws` | ✅ | -| P3 | 🔴 | `src/adapters/onebot/adapter.rs` — Adapter trait 实现 | ✅ | -| P4 | 🔴 | 集成:lib.rs / api/mod.rs / main.rs 注册路由与初始化 | ✅ | -| P5 | 🟡 | 测试 + 验证(cargo check / clippy / test) | ⏳ | - -> ✅ **Phase 3-4 已完成** — OneBotAdapter 实现 Adapter trait (send_message / name / start / stop),main.rs 注册适配器到 AdapterManager。剩余 P5 测试验证。 - -### 其他待开发 - -| # | 功能 | 优先级 | 涉及模块 | 说明 | -|:--|------|:---:|---------|------| -| 📦 | Redis 消息队列集成 | 🟡 | `services/queue.rs`, `main.rs` | 将 `MessageQueue` 实例化并接入消息发送流程 | -| 📦 | 更多 Adapter 实现 | 🟡 | `adapters/` | WebSocket、HTTP、Telegram、Discord 等 | -| 📦 | Plugin 实现(消息处理插件) | 🟡 | `core/plugin.rs` | 实现具体的 `Plugin` trait:加密插件、格式化插件等 | -| 📦 | 发送重试与调度逻辑 | 🟡 | `core/sender.rs` (新建) | 利用 `SenderConfig`(max_retries/retry_interval/batch_size) 实现后台发送调度 | -| 📦 | 消息状态流转 API | 🔴 | `api/endpoints/messages.rs` | `GET /api/messages` 列表查询、`PATCH` 取消/重发 | -| 📦 | Adapter 状态 API | 🟢 | `api/endpoints/adapters.rs` (新建) | 查看 adapter 状态、启停 adapter | -| 📦 | Plugin 管理 API | 🟢 | `api/endpoints/plugins.rs` (新建) | 查看/启用/禁用 plugin | -| 📦 | Web 界面完善 | 🟢 | `web/mod.rs`, `templates/` | 使用 Tera 模板重写 HTML 页面,添加实时状态 | -| 📦 | 配置验证 | 🟢 | `core/config.rs` | `Config` 加载后验证字段合法性(端口范围、路径等) | +### 🟡 Phase C: 消息流完善 ---- +| # | 任务 | 优先级 | 说明 | +|:--|------|:---:|------| +| C1 | OneBot 图片/文件消息类型扩展 | 🟡 | 当前只处理 Text 类型 → 扩展 Image/File MessageType | +| C2 | 入站消息 sender 信息补全 | 🟡 | group/private conversation_name 自动生成 | +| C3 | 多平台支持框架就绪 | 🟢 | adapter 注册机制验证 (QQ 已有,可加 mock 测试微信) | -## 四、待开发(测试与文档) +### 🟢 Phase D: 运维与部署 | # | 任务 | 优先级 | 说明 | |:--|------|:---:|------| -| 📝 | API 端点集成测试 | 🟡 | 启动测试服务器,测试 `POST /api/messages`、`GET /api/messages/{id}` | -| 📝 | `MessageRepository` 完整测试 | 🟡 | 测试 `get_pending_messages`、重复 save 覆盖、无效 ID | -| 📝 | `MessageQueue` 测试(需要 Redis) | 🟢 | 测试 push/pop/len/is_empty | -| 📝 | 错误场景测试 | 🟡 | REPO 未初始化时 API 返回 500 而非 panic;无效 message type 返回 400 | - ---- +| D1 | Dockerfile | 🟢 | 多阶段构建,静态链接 musl | +| D2 | docker-compose.yml | 🟢 | ReChat + (可选 Redis) | +| D3 | 配置验证 | 🟢 | `Config` 加载后验证 port 范围、path 可写性 | +| D4 | 环境变量覆盖配置 | 🟢 | `RE_CHAT_*` 前缀覆盖任意字段 | -## 五、架构改进 +### 🟢 Phase E: 架构优化 -| # | 建议 | 优先级 | 说明 | +| # | 任务 | 优先级 | 说明 | |:--|------|:---:|------| -| 🏗 | REPO 从 `thread_local!` 改为 Actix `Data` | 🟡 | 更符合 Actix 惯例,便于测试和注入依赖 | -| 🏗 | 提取发送器核心为独立 actor/service | 🟡 | 将消息发送、重试、状态管理封装为独立模块 | -| 🏗 | `rusqlite` 使用 WAL 模式 | 🟢 | 提升并发读取性能:`PRAGMA journal_mode=WAL;` | -| 🏗 | 配置支持环境变量覆盖 | 🟢 | `Config::load()` 后按 `RE_CHAT_*` 环境变量覆盖字段 | +| E1 | REPO `thread_local!` → `Data` | 🟢 | 更符合 Actix 惯例 | +| E2 | SQLite WAL 模式 | 🟢 | `PRAGMA journal_mode=WAL` 提升并发读 | +| E3 | adapter_manager 列表 API | 🟢 | `GET /api/adapters` 返回 JSON | +| E4 | Web 仪表盘实时数据 | 🟢 | 统计卡片从 WS 获取 live 数据 | --- -## 六、依赖与发布 +## 三、优先级总结 -| # | 任务 | 优先级 | 说明 | -|:--|------|:---:|------| -| 📦 | 关注 `redis 0.24.0` / `net2` 未来兼容性 | 🟢 | Rust 未来版本将拒绝,需等待上游更新 | -| 📦 | Windows 服务化部署脚本 | 🟢 | `scripts/` 下添加安装/卸载 Windows Service 脚本 | -| 📦 | 编写 `DEPLOY.md` | 🟢 | 补充部署步骤、配置文件示例、CLI 使用说明 | +| 优先级 | Phase | 概述 | 预计涉及文件数 | +|:---:|:---:|------|:---:| +| 🔴 P0 | A | **消息发送调度器** — 这是让 OneBot 真正能"发消息到 QQ"的最后一块拼图 | 4 文件 | +| 🟡 P1 | B | **API 补全** — 消息列表/状态变更,前端才能展示历史消息 | 3 文件 | +| 🟡 P1 | C | **消息流完善** — 图片/文件/多平台 | 3 文件 | +| 🟢 P2 | D | **运维部署** — Docker / 配置验证 | 3 文件 | +| 🟢 P3 | E | **架构优化** — REPO / WAL / API | 4 文件 | --- -## 优先级图例 - -| 标记 | 含义 | -|:---:|------| -| 🔴 | 高优先级 — 阻塞性、影响核心功能 | -| 🟡 | 中优先级 — 重要但非阻塞 | -| 🟢 | 低优先级 — 改进优化,按需处理 | -| ✅ | 已完成 | -| 🔧 | 待修复 | -| 📦 | 待开发 | -| 📝 | 待测试 | -| 🏗 | 架构建议 | +## 四、关键里程碑 + +``` +✅ 已完成 — 2026-05-01 +├── OneBot v11 适配器 (入站/出站) +├── Web UI SPA (仪表盘/消息流/发送/平台) +├── Access Token 认证 +├── 44 个集成测试 (broadcaster 11 + protocol 21 + auth 7 + repo 5) +└── CI 格式合规 + +⏳ Phase A: 发送调度器 → 目标 2026-05-02 +├── [ ] dispatcher 后台任务 +├── [ ] 状态流转 Pending→Sending→Sent/Failed +├── [ ] 重试逻辑 (max_retries + retry_interval) +└── [ ] 端到端验证: Web UI 发送 → NapCat → QQ 群 + +⏳ Phase B: API 补全 → 目标 2026-05-03 +├── [ ] GET /api/messages 分页 +├── [ ] PATCH /api/messages/{id} +└── [ ] GET /api/stats + +⏳ Phase C-E: 完善 + 部署 → 目标 2026-05-05 +├── [ ] 图片/文件类型 +├── [ ] Docker 部署 +└── [ ] 架构优化 +``` From b6cfdffa35b77391b45d056b3d585ceb65e98dd1 Mon Sep 17 00:00:00 2001 From: Ink-dark Date: Fri, 1 May 2026 15:18:04 +0800 Subject: [PATCH 4/7] feat(dispatcher): add background message dispatch with retry logic and status transitions --- src/core/dispatcher.rs | 181 +++++++++++++++++++++++++++++++++++++++++ src/core/message.rs | 26 +++++- src/core/mod.rs | 1 + src/main.rs | 11 +++ src/models/message.rs | 2 +- tests/dispatcher.rs | 173 +++++++++++++++++++++++++++++++++++++++ 6 files changed, 392 insertions(+), 2 deletions(-) create mode 100644 src/core/dispatcher.rs create mode 100644 tests/dispatcher.rs diff --git a/src/core/dispatcher.rs b/src/core/dispatcher.rs new file mode 100644 index 0000000..4a17e3a --- /dev/null +++ b/src/core/dispatcher.rs @@ -0,0 +1,181 @@ +use std::sync::Arc; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; + +use crate::core::adapter::AdapterManager; +use crate::core::message::MessageRepository; +use crate::models::message::MessageStatus; + +pub struct MessageDispatcher { + adapter_manager: Arc, + db_path: String, + max_retries: u32, + retry_interval: u64, + batch_size: usize, + concurrency: usize, + shutdown: Arc, +} + +impl MessageDispatcher { + pub fn new( + adapter_manager: Arc, + db_path: String, + max_retries: u32, + retry_interval: u64, + batch_size: usize, + concurrency: usize, + ) -> Self { + Self { + adapter_manager, + db_path, + max_retries, + retry_interval, + batch_size, + concurrency, + shutdown: Arc::new(AtomicBool::new(false)), + } + } + + pub fn shutdown_handle(&self) -> Arc { + self.shutdown.clone() + } + + pub fn start(self) { + let retry_interval_secs = self.retry_interval; + let batch_size = self.batch_size; + let max_retries = self.max_retries; + let concurrency = self.concurrency; + let adapter_manager = self.adapter_manager; + let db_path = self.db_path; + let shutdown = self.shutdown; + + tokio::spawn(async move { + let sem = Arc::new(tokio::sync::Semaphore::new(concurrency)); + + loop { + if shutdown.load(Ordering::Relaxed) { + tracing::info!("Message dispatcher shutting down gracefully"); + sem.close(); + break; + } + + let pending = match MessageRepository::new(&db_path) { + Ok(repo) => repo + .get_pending_messages(batch_size) + .unwrap_or_default(), + Err(e) => { + tracing::error!(error = %e, "Dispatcher failed to read pending messages"); + tokio::time::sleep( + tokio::time::Duration::from_secs(retry_interval_secs), + ) + .await; + continue; + } + }; + + if pending.is_empty() { + tokio::time::sleep( + tokio::time::Duration::from_secs(retry_interval_secs), + ) + .await; + continue; + } + + for message in pending { + let sem = sem.clone(); + let am = adapter_manager.clone(); + let db_path = db_path.clone(); + let shutdown = shutdown.clone(); + + tokio::spawn(async move { + let _permit = sem.acquire().await; + + let repo = match MessageRepository::new(&db_path) { + Ok(r) => r, + Err(e) => { + tracing::error!( + error = %e, + "Failed to open DB in dispatcher worker" + ); + return; + } + }; + + let mut sent = false; + + for attempt in 0..=max_retries { + if shutdown.load(Ordering::Relaxed) { + return; + } + + if attempt > 0 { + tracing::info!( + message_id = %message.id, + attempt = attempt, + max_retries = max_retries, + "Retrying message send" + ); + tokio::time::sleep( + tokio::time::Duration::from_secs(retry_interval_secs), + ) + .await; + } + + match am.send_to_adapter(&message.recipient, &message) { + Ok(()) => { + tracing::info!( + message_id = %message.id, + "Message sent successfully" + ); + if let Err(e) = repo.update_message_status( + &message.id, + &MessageStatus::Sent, + ) { + tracing::error!( + error = %e, + message_id = %message.id, + "Failed to update message status to Sent" + ); + } + sent = true; + break; + } + Err(e) => { + tracing::warn!( + message_id = %message.id, + attempt = attempt, + error = %e, + "Failed to send message" + ); + if let Err(e) = repo.increment_retry(&message.id) { + tracing::error!( + error = %e, + "Failed to increment retry count" + ); + } + } + } + } + + if !sent { + tracing::warn!( + message_id = %message.id, + "Message failed after all retries, marking as Failed" + ); + if let Err(e) = repo.update_message_status( + &message.id, + &MessageStatus::Failed, + ) { + tracing::error!( + error = %e, + message_id = %message.id, + "Failed to update message status to Failed" + ); + } + } + }); + } + } + }); + } +} diff --git a/src/core/message.rs b/src/core/message.rs index b09a6ef..a236522 100644 --- a/src/core/message.rs +++ b/src/core/message.rs @@ -122,7 +122,7 @@ impl MessageRepository { pub fn get_pending_messages(&self, limit: usize) -> Result> { let mut stmt = self.conn.prepare( "SELECT id, message_type, content, recipient, status, created_at, updated_at, retry_count - FROM messages WHERE status = 'Pending' OR status = 'Sending' LIMIT ?", + FROM messages WHERE status = 'Pending' OR status = 'Sending' ORDER BY created_at ASC LIMIT ?", )?; let mut rows = stmt.query([&limit])?; let mut messages = Vec::new(); @@ -171,4 +171,28 @@ impl MessageRepository { Ok(messages) } + + pub fn update_message_status(&self, id: &str, status: &MessageStatus) -> Result<()> { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() as i64; + self.conn.execute( + "UPDATE messages SET status = ?1, updated_at = ?2 WHERE id = ?3", + rusqlite::params![format!("{:?}", status), now, id], + )?; + Ok(()) + } + + pub fn increment_retry(&self, id: &str) -> Result<()> { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() as i64; + self.conn.execute( + "UPDATE messages SET retry_count = retry_count + 1, updated_at = ?1 WHERE id = ?2", + rusqlite::params![now, id], + )?; + Ok(()) + } } diff --git a/src/core/mod.rs b/src/core/mod.rs index 2e27494..f6a56fb 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -2,6 +2,7 @@ pub mod adapter; pub mod auth; pub mod broadcaster; pub mod config; +pub mod dispatcher; pub mod logging; pub mod message; pub mod plugin; diff --git a/src/main.rs b/src/main.rs index 09b43a1..57623ba 100644 --- a/src/main.rs +++ b/src/main.rs @@ -71,6 +71,17 @@ async fn main() -> std::io::Result<()> { adapter_manager.start_all(); plugin_manager.initialize_all(); + let sender_cfg = &config.sender; + let dispatcher = core::dispatcher::MessageDispatcher::new( + adapter_manager.clone(), + db_path.clone(), + sender_cfg.max_retries, + sender_cfg.retry_interval, + sender_cfg.batch_size, + sender_cfg.concurrency, + ); + dispatcher.start(); + tracing::info!( host = %config.server.host, port = config.server.port, diff --git a/src/models/message.rs b/src/models/message.rs index b645c80..8fdc615 100644 --- a/src/models/message.rs +++ b/src/models/message.rs @@ -8,7 +8,7 @@ pub enum MessageType { File, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, PartialEq)] pub enum MessageStatus { Pending, Sending, diff --git a/tests/dispatcher.rs b/tests/dispatcher.rs new file mode 100644 index 0000000..556e4be --- /dev/null +++ b/tests/dispatcher.rs @@ -0,0 +1,173 @@ +use rechat_sender::core::adapter::Adapter; +use rechat_sender::core::adapter::AdapterManager; +use rechat_sender::core::dispatcher::MessageDispatcher; +use rechat_sender::core::message::MessageRepository; +use rechat_sender::models::message::{Message, MessageStatus, MessageType}; +use std::sync::atomic::AtomicU32; +use std::sync::atomic::Ordering; +use std::sync::Arc; + +struct MockAdapter { + name: String, + fail_count: AtomicU32, + should_fail: bool, +} + +impl MockAdapter { + fn new(name: &str, should_fail: bool) -> Self { + Self { + name: name.into(), + fail_count: AtomicU32::new(0), + should_fail, + } + } +} + +impl Adapter for MockAdapter { + fn name(&self) -> &str { + &self.name + } + + fn start(&self) -> Result<(), Box> { + Ok(()) + } + + fn stop(&self) -> Result<(), Box> { + Ok(()) + } + + fn send_message( + &self, + _message: &Message, + ) -> Result<(), Box> { + if self.should_fail { + self.fail_count.fetch_add(1, Ordering::SeqCst); + Err(Box::new(std::io::Error::new( + std::io::ErrorKind::Other, + "simulated failure", + ))) + } else { + Ok(()) + } + } + + fn receive_message( + &self, + ) -> Result, Box> { + Ok(None) + } +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_dispatcher_sends_pending_message() { + let temp_db = tempfile::NamedTempFile::new().unwrap(); + let db_path = temp_db.path().to_str().unwrap().to_string(); + + let repo = MessageRepository::new(&db_path).unwrap(); + + let msg = Message::new( + MessageType::Text, + "Test dispatcher".into(), + "test_platform".into(), + ); + repo.save(&msg).unwrap(); + // Close connection so dispatcher can open its own + drop(repo); + + let mut am = AdapterManager::new(); + let mock = Arc::new(MockAdapter::new("test_platform", false)); + am.add_adapter(mock.clone()); + let am = Arc::new(am); + + let dispatcher = MessageDispatcher::new(am, db_path, 0, 1, 1, 1); + let shutdown = dispatcher.shutdown_handle(); + dispatcher.start(); + + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + + shutdown.store(true, Ordering::Relaxed); + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + + let repo2 = + MessageRepository::new(&temp_db.path().to_str().unwrap()).unwrap(); + let updated = repo2.get(&msg.id).unwrap().unwrap(); + assert_eq!(updated.status, MessageStatus::Sent); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_dispatcher_marks_as_failed_after_max_retries() { + let temp_db = tempfile::NamedTempFile::new().unwrap(); + let db_path = temp_db.path().to_str().unwrap().to_string(); + + let repo = MessageRepository::new(&db_path).unwrap(); + + let msg = Message::new( + MessageType::Text, + "Should fail eventually".into(), + "bad_platform".into(), + ); + repo.save(&msg).unwrap(); + drop(repo); + + let mut am = AdapterManager::new(); + let mock = Arc::new(MockAdapter::new("bad_platform", true)); + am.add_adapter(mock.clone()); + let am = Arc::new(am); + + let dispatcher = MessageDispatcher::new(am, db_path, 1, 1, 1, 1); + let shutdown = dispatcher.shutdown_handle(); + dispatcher.start(); + + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + + shutdown.store(true, Ordering::Relaxed); + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + + let repo2 = + MessageRepository::new(&temp_db.path().to_str().unwrap()).unwrap(); + let updated = repo2.get(&msg.id).unwrap().unwrap(); + assert_eq!(updated.status, MessageStatus::Failed); + assert!(updated.retry_count > 0); +} + +#[test] +fn test_update_message_status() { + let temp_db = tempfile::NamedTempFile::new().unwrap(); + let db_path = temp_db.path().to_str().unwrap(); + + let repo = MessageRepository::new(db_path).unwrap(); + + let msg = Message::new( + MessageType::Text, + "Status test".into(), + "user1".into(), + ); + repo.save(&msg).unwrap(); + + repo.update_message_status(&msg.id, &MessageStatus::Sent) + .unwrap(); + + let updated = repo.get(&msg.id).unwrap().unwrap(); + assert_eq!(updated.status, MessageStatus::Sent); +} + +#[test] +fn test_increment_retry() { + let temp_db = tempfile::NamedTempFile::new().unwrap(); + let db_path = temp_db.path().to_str().unwrap(); + + let repo = MessageRepository::new(db_path).unwrap(); + + let msg = Message::new( + MessageType::Text, + "Retry test".into(), + "user1".into(), + ); + repo.save(&msg).unwrap(); + + repo.increment_retry(&msg.id).unwrap(); + repo.increment_retry(&msg.id).unwrap(); + + let updated = repo.get(&msg.id).unwrap().unwrap(); + assert_eq!(updated.retry_count, 2); +} From 78ae0170d61c5f08128674d98f017d8603c6b576 Mon Sep 17 00:00:00 2001 From: Ink-dark Date: Fri, 1 May 2026 15:19:02 +0800 Subject: [PATCH 5/7] =?UTF-8?q?refactor(dispatcher):=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=E6=A0=BC=E5=BC=8F=E5=B9=B6=E7=AE=80=E5=8C=96?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E5=A4=84=E7=90=86=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 统一导入语句顺序 - 简化方法参数格式 - 移除不必要的括号和换行 - 保持代码风格一致性 --- src/core/dispatcher.rs | 35 +++++++++++++---------------------- tests/dispatcher.rs | 29 +++++++---------------------- 2 files changed, 20 insertions(+), 44 deletions(-) diff --git a/src/core/dispatcher.rs b/src/core/dispatcher.rs index 4a17e3a..d49f6f7 100644 --- a/src/core/dispatcher.rs +++ b/src/core/dispatcher.rs @@ -60,24 +60,17 @@ impl MessageDispatcher { } let pending = match MessageRepository::new(&db_path) { - Ok(repo) => repo - .get_pending_messages(batch_size) - .unwrap_or_default(), + Ok(repo) => repo.get_pending_messages(batch_size).unwrap_or_default(), Err(e) => { tracing::error!(error = %e, "Dispatcher failed to read pending messages"); - tokio::time::sleep( - tokio::time::Duration::from_secs(retry_interval_secs), - ) - .await; + tokio::time::sleep(tokio::time::Duration::from_secs(retry_interval_secs)) + .await; continue; } }; if pending.is_empty() { - tokio::time::sleep( - tokio::time::Duration::from_secs(retry_interval_secs), - ) - .await; + tokio::time::sleep(tokio::time::Duration::from_secs(retry_interval_secs)).await; continue; } @@ -115,9 +108,9 @@ impl MessageDispatcher { max_retries = max_retries, "Retrying message send" ); - tokio::time::sleep( - tokio::time::Duration::from_secs(retry_interval_secs), - ) + tokio::time::sleep(tokio::time::Duration::from_secs( + retry_interval_secs, + )) .await; } @@ -127,10 +120,9 @@ impl MessageDispatcher { message_id = %message.id, "Message sent successfully" ); - if let Err(e) = repo.update_message_status( - &message.id, - &MessageStatus::Sent, - ) { + if let Err(e) = repo + .update_message_status(&message.id, &MessageStatus::Sent) + { tracing::error!( error = %e, message_id = %message.id, @@ -162,10 +154,9 @@ impl MessageDispatcher { message_id = %message.id, "Message failed after all retries, marking as Failed" ); - if let Err(e) = repo.update_message_status( - &message.id, - &MessageStatus::Failed, - ) { + if let Err(e) = + repo.update_message_status(&message.id, &MessageStatus::Failed) + { tracing::error!( error = %e, message_id = %message.id, diff --git a/tests/dispatcher.rs b/tests/dispatcher.rs index 556e4be..10482f8 100644 --- a/tests/dispatcher.rs +++ b/tests/dispatcher.rs @@ -3,9 +3,9 @@ use rechat_sender::core::adapter::AdapterManager; use rechat_sender::core::dispatcher::MessageDispatcher; use rechat_sender::core::message::MessageRepository; use rechat_sender::models::message::{Message, MessageStatus, MessageType}; +use std::sync::Arc; use std::sync::atomic::AtomicU32; use std::sync::atomic::Ordering; -use std::sync::Arc; struct MockAdapter { name: String, @@ -36,10 +36,7 @@ impl Adapter for MockAdapter { Ok(()) } - fn send_message( - &self, - _message: &Message, - ) -> Result<(), Box> { + fn send_message(&self, _message: &Message) -> Result<(), Box> { if self.should_fail { self.fail_count.fetch_add(1, Ordering::SeqCst); Err(Box::new(std::io::Error::new( @@ -51,9 +48,7 @@ impl Adapter for MockAdapter { } } - fn receive_message( - &self, - ) -> Result, Box> { + fn receive_message(&self) -> Result, Box> { Ok(None) } } @@ -88,8 +83,7 @@ async fn test_dispatcher_sends_pending_message() { shutdown.store(true, Ordering::Relaxed); tokio::time::sleep(std::time::Duration::from_millis(500)).await; - let repo2 = - MessageRepository::new(&temp_db.path().to_str().unwrap()).unwrap(); + let repo2 = MessageRepository::new(&temp_db.path().to_str().unwrap()).unwrap(); let updated = repo2.get(&msg.id).unwrap().unwrap(); assert_eq!(updated.status, MessageStatus::Sent); } @@ -123,8 +117,7 @@ async fn test_dispatcher_marks_as_failed_after_max_retries() { shutdown.store(true, Ordering::Relaxed); tokio::time::sleep(std::time::Duration::from_millis(500)).await; - let repo2 = - MessageRepository::new(&temp_db.path().to_str().unwrap()).unwrap(); + let repo2 = MessageRepository::new(&temp_db.path().to_str().unwrap()).unwrap(); let updated = repo2.get(&msg.id).unwrap().unwrap(); assert_eq!(updated.status, MessageStatus::Failed); assert!(updated.retry_count > 0); @@ -137,11 +130,7 @@ fn test_update_message_status() { let repo = MessageRepository::new(db_path).unwrap(); - let msg = Message::new( - MessageType::Text, - "Status test".into(), - "user1".into(), - ); + let msg = Message::new(MessageType::Text, "Status test".into(), "user1".into()); repo.save(&msg).unwrap(); repo.update_message_status(&msg.id, &MessageStatus::Sent) @@ -158,11 +147,7 @@ fn test_increment_retry() { let repo = MessageRepository::new(db_path).unwrap(); - let msg = Message::new( - MessageType::Text, - "Retry test".into(), - "user1".into(), - ); + let msg = Message::new(MessageType::Text, "Retry test".into(), "user1".into()); repo.save(&msg).unwrap(); repo.increment_retry(&msg.id).unwrap(); From e99edf522a91df94409e7571a2e109535f746425 Mon Sep 17 00:00:00 2001 From: Ink-dark Date: Fri, 1 May 2026 15:28:00 +0800 Subject: [PATCH 6/7] fix: resolve clippy lints in dispatcher test (io_other_error + needless_borrow) --- tests/dispatcher.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/tests/dispatcher.rs b/tests/dispatcher.rs index 10482f8..2498759 100644 --- a/tests/dispatcher.rs +++ b/tests/dispatcher.rs @@ -39,10 +39,7 @@ impl Adapter for MockAdapter { fn send_message(&self, _message: &Message) -> Result<(), Box> { if self.should_fail { self.fail_count.fetch_add(1, Ordering::SeqCst); - Err(Box::new(std::io::Error::new( - std::io::ErrorKind::Other, - "simulated failure", - ))) + Err(Box::new(std::io::Error::other("simulated failure"))) } else { Ok(()) } @@ -83,7 +80,7 @@ async fn test_dispatcher_sends_pending_message() { shutdown.store(true, Ordering::Relaxed); tokio::time::sleep(std::time::Duration::from_millis(500)).await; - let repo2 = MessageRepository::new(&temp_db.path().to_str().unwrap()).unwrap(); + let repo2 = MessageRepository::new(temp_db.path().to_str().unwrap()).unwrap(); let updated = repo2.get(&msg.id).unwrap().unwrap(); assert_eq!(updated.status, MessageStatus::Sent); } @@ -117,7 +114,7 @@ async fn test_dispatcher_marks_as_failed_after_max_retries() { shutdown.store(true, Ordering::Relaxed); tokio::time::sleep(std::time::Duration::from_millis(500)).await; - let repo2 = MessageRepository::new(&temp_db.path().to_str().unwrap()).unwrap(); + let repo2 = MessageRepository::new(temp_db.path().to_str().unwrap()).unwrap(); let updated = repo2.get(&msg.id).unwrap().unwrap(); assert_eq!(updated.status, MessageStatus::Failed); assert!(updated.retry_count > 0); From 16b8096f1a7166c5c18daac80ad958f9cf712607 Mon Sep 17 00:00:00 2001 From: Ink-dark Date: Fri, 1 May 2026 15:40:55 +0800 Subject: [PATCH 7/7] fix(dispatcher): reuse SQLite connection in poll loop to prevent connection leak --- src/core/dispatcher.rs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/core/dispatcher.rs b/src/core/dispatcher.rs index d49f6f7..cbb82ae 100644 --- a/src/core/dispatcher.rs +++ b/src/core/dispatcher.rs @@ -50,6 +50,14 @@ impl MessageDispatcher { let shutdown = self.shutdown; tokio::spawn(async move { + let poll_repo = match MessageRepository::new(&db_path) { + Ok(r) => r, + Err(e) => { + tracing::error!(error = %e, db_path = %db_path, "Dispatcher failed to open database"); + return; + } + }; + let sem = Arc::new(tokio::sync::Semaphore::new(concurrency)); loop { @@ -59,15 +67,9 @@ impl MessageDispatcher { break; } - let pending = match MessageRepository::new(&db_path) { - Ok(repo) => repo.get_pending_messages(batch_size).unwrap_or_default(), - Err(e) => { - tracing::error!(error = %e, "Dispatcher failed to read pending messages"); - tokio::time::sleep(tokio::time::Duration::from_secs(retry_interval_secs)) - .await; - continue; - } - }; + let pending = poll_repo + .get_pending_messages(batch_size) + .unwrap_or_default(); if pending.is_empty() { tokio::time::sleep(tokio::time::Duration::from_secs(retry_interval_secs)).await;