diff --git a/src/adapters/onebot/adapter.rs b/src/adapters/onebot/adapter.rs index 467b43f..c6506fb 100644 --- a/src/adapters/onebot/adapter.rs +++ b/src/adapters/onebot/adapter.rs @@ -43,45 +43,95 @@ impl Adapter for OneBotAdapter { fn send_message(&self, message: &Message) -> Result<(), Box> { let segments = internal_message_to_segments(message); - // Determine if group or private based on conversation ID format let (message_type, target_id) = if message.recipient.starts_with("group_") { let gid: i64 = message .recipient .strip_prefix("group_") .and_then(|s| s.parse().ok()) - .unwrap_or(0); + .ok_or_else(|| { + let err = format!( + "Invalid group ID format in recipient: {}", + message.recipient + ); + tracing::error!( + message_id = %message.id, + recipient = %message.recipient, + "{}", + err + ); + err + })?; ("group", gid) } else if message.recipient.starts_with("private_") { let uid: i64 = message .recipient .strip_prefix("private_") .and_then(|s| s.parse().ok()) - .unwrap_or(0); + .ok_or_else(|| { + let err = format!( + "Invalid private ID format in recipient: {}", + message.recipient + ); + tracing::error!( + message_id = %message.id, + recipient = %message.recipient, + "{}", + err + ); + err + })?; ("private", uid) } else { - // Try to parse as numeric; default to group - let id: i64 = message.recipient.parse().ok().unwrap_or(0); + let id: i64 = message.recipient.parse().ok().ok_or_else(|| { + let err = format!("Unrecognized recipient format: {}", message.recipient); + tracing::error!( + message_id = %message.id, + recipient = %message.recipient, + "{}", + err + ); + err + })?; ("group", id) }; let action = protocol::build_send_msg_action(message_type, target_id, &segments, Some(&message.id)); - let sender = self - .sender - .lock() - .map_err(|e| format!("OneBot sender mutex poisoned: {}", e))?; + let sender = self.sender.lock().map_err(|e| { + let err = format!("OneBot sender mutex poisoned: {}", e); + tracing::error!( + message_id = %message.id, + adapter = %self.name, + "{}", + err + ); + err + })?; - sender - .as_ref() - .ok_or("OneBot sender not initialized")? - .send(action) - .map_err(|e| format!("Failed to send OneBot action: {}", e))?; + let sender_ref = sender.as_ref().ok_or_else(|| { + let err = format!("OneBot sender not initialized for adapter '{}'", self.name); + tracing::error!(message_id = %message.id, adapter = %self.name, "{}", err); + err + })?; + + sender_ref.send(action).map_err(|e| { + let err = format!("Failed to send OneBot action: {}", e); + tracing::error!( + message_id = %message.id, + adapter = %self.name, + conversation = %message.recipient, + "{}", + err + ); + err + })?; tracing::info!( message_id = %message.id, conversation = %message.recipient, - "OneBot message sent" + adapter = %self.name, + "OneBot message sent successfully" ); Ok(()) } diff --git a/src/adapters/onebot/ws.rs b/src/adapters/onebot/ws.rs index 46dc8e6..749ffb0 100644 --- a/src/adapters/onebot/ws.rs +++ b/src/adapters/onebot/ws.rs @@ -35,7 +35,10 @@ pub async fn onebot_ws( let (action_tx, mut action_rx) = mpsc::unbounded_channel::(); // Store the sender so the adapter can use it later { - let mut sender = onebot_sender.lock().unwrap(); + let mut sender = onebot_sender.lock().map_err(|e| { + tracing::error!("Failed to acquire OneBot sender lock: {}", e); + actix_web::error::ErrorInternalServerError("Internal server error") + })?; *sender = Some(action_tx); } diff --git a/src/api/endpoints/messages.rs b/src/api/endpoints/messages.rs index c1b9d80..c0b7baf 100644 --- a/src/api/endpoints/messages.rs +++ b/src/api/endpoints/messages.rs @@ -120,3 +120,137 @@ pub async fn get_message(id: web::Path) -> impl Responder { pub async fn health_check() -> impl Responder { HttpResponse::Ok().json(serde_json::json!({"status": "ok"})) } + +#[derive(Debug, Serialize, Deserialize)] +pub struct MessageListQuery { + pub offset: Option, + pub limit: Option, + pub status: Option, +} + +pub async fn list_messages(query: web::Query) -> impl Responder { + let offset = query.offset.unwrap_or(0); + let limit = query.limit.unwrap_or(50).min(200); + let status = query.status.as_deref(); + + let result = crate::REPO.with(|repo| { + repo.borrow() + .as_ref() + .map(|r| r.list(status, offset, limit)) + }); + match result { + Some(Ok(messages)) => { + let items: Vec = + messages.into_iter().map(MessageResponse::from).collect(); + HttpResponse::Ok().json(serde_json::json!({ + "messages": items, + "offset": offset, + "limit": limit, + })) + } + Some(Err(e)) => { + tracing::error!(error = %e, "Failed to list messages"); + HttpResponse::InternalServerError().json(serde_json::json!({"error": e.to_string()})) + } + None => { + tracing::error!("Repository not initialized during list call"); + HttpResponse::InternalServerError() + .json(serde_json::json!({"error": "Repository not initialized"})) + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct PatchMessageRequest { + pub status: String, +} + +pub async fn update_message( + id: web::Path, + body: web::Json, +) -> impl Responder { + let new_status = match body.status.as_str() { + "Pending" => crate::models::message::MessageStatus::Pending, + "Sending" => crate::models::message::MessageStatus::Sending, + "Sent" => crate::models::message::MessageStatus::Sent, + "Canceled" => crate::models::message::MessageStatus::Canceled, + "Failed" => crate::models::message::MessageStatus::Failed, + _ => { + return HttpResponse::BadRequest() + .json(serde_json::json!({"error": "Invalid status. Use Pending, Sending, Sent, Failed, or Canceled"})); + } + }; + + let result = crate::REPO.with(|repo| { + repo.borrow() + .as_ref() + .map(|r| r.update_message_status(&id, &new_status)) + }); + match result { + Some(Ok(())) => { + let msg_result = crate::REPO.with(|repo| repo.borrow().as_ref().map(|r| r.get(&id))); + match msg_result { + Some(Ok(Some(msg))) => HttpResponse::Ok().json(MessageResponse::from(msg)), + _ => HttpResponse::Ok().json(serde_json::json!({"status": "updated"})), + } + } + Some(Err(e)) => { + tracing::error!(error = %e, message_id = %id.into_inner(), "Failed to update message"); + HttpResponse::InternalServerError().json(serde_json::json!({"error": e.to_string()})) + } + None => { + tracing::error!("Repository not initialized during update call"); + HttpResponse::InternalServerError() + .json(serde_json::json!({"error": "Repository not initialized"})) + } + } +} + +pub async fn delete_message(id: web::Path) -> impl Responder { + let result = crate::REPO.with(|repo| repo.borrow().as_ref().map(|r| r.delete(&id))); + match result { + Some(Ok(true)) => HttpResponse::Ok().json(serde_json::json!({"deleted": true})), + Some(Ok(false)) => { + HttpResponse::NotFound().json(serde_json::json!({"error": "Message not found"})) + } + Some(Err(e)) => { + tracing::error!(error = %e, message_id = %id.into_inner(), "Failed to delete message"); + HttpResponse::InternalServerError().json(serde_json::json!({"error": e.to_string()})) + } + None => { + tracing::error!("Repository not initialized during delete call"); + HttpResponse::InternalServerError() + .json(serde_json::json!({"error": "Repository not initialized"})) + } + } +} + +pub async fn get_stats() -> impl Responder { + let result = crate::REPO.with(|repo| { + let r = repo.borrow(); + let repo_ref = r.as_ref()?; + let pending = repo_ref.count_by_status("Pending").unwrap_or(0); + let sending = repo_ref.count_by_status("Sending").unwrap_or(0); + let sent = repo_ref.count_by_status("Sent").unwrap_or(0); + let failed = repo_ref.count_by_status("Failed").unwrap_or(0); + let canceled = repo_ref.count_by_status("Canceled").unwrap_or(0); + Some((pending, sending, sent, failed, canceled)) + }); + match result { + Some((pending, sending, sent, failed, canceled)) => { + HttpResponse::Ok().json(serde_json::json!({ + "pending": pending, + "sending": sending, + "sent": sent, + "failed": failed, + "canceled": canceled, + "total": pending + sending + sent + failed + canceled, + })) + } + None => { + tracing::error!("Repository not initialized during stats call"); + HttpResponse::InternalServerError() + .json(serde_json::json!({"error": "Repository not initialized"})) + } + } +} diff --git a/src/api/endpoints/ws_client.rs b/src/api/endpoints/ws_client.rs index 806acf4..a62b2df 100644 --- a/src/api/endpoints/ws_client.rs +++ b/src/api/endpoints/ws_client.rs @@ -198,7 +198,20 @@ async fn handle_command( } if let Err(e) = adapter_manager.send_to_adapter(&platform, &message) { - tracing::warn!(platform = %platform, error = %e, "No adapter found for platform"); + tracing::error!( + platform = %platform, + message_id = %message.id, + error = %e, + "Failed to send message via adapter" + ); + crate::REPO.with(|repo| { + if let Some(r) = repo.borrow().as_ref() { + let _ = r.update_message_status( + &message.id, + &crate::models::message::MessageStatus::Failed, + ); + } + }); } let now = message diff --git a/src/api/mod.rs b/src/api/mod.rs index a5fc8c5..e989efc 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -8,9 +8,19 @@ pub fn config(cfg: &mut web::ServiceConfig) { cfg.service( web::scope("/api") - .service(web::resource("/messages").route(web::post().to(messages::create_message))) - .service(web::resource("/messages/{id}").route(web::get().to(messages::get_message))) + .service( + web::resource("/messages") + .route(web::get().to(messages::list_messages)) + .route(web::post().to(messages::create_message)), + ) + .service( + web::resource("/messages/{id}") + .route(web::get().to(messages::get_message)) + .route(web::patch().to(messages::update_message)) + .route(web::delete().to(messages::delete_message)), + ) .service(web::resource("/health").route(web::get().to(messages::health_check))) + .service(web::resource("/stats").route(web::get().to(messages::get_stats))) .route("/auth/verify", web::post().to(auth_verify)), ) .route("/ws/client", web::get().to(ws_client::ws_client)) diff --git a/src/core/adapter.rs b/src/core/adapter.rs index e8739ed..7bcd1d2 100644 --- a/src/core/adapter.rs +++ b/src/core/adapter.rs @@ -100,7 +100,18 @@ impl AdapterManager { ) -> Vec>> { self.adapters .iter() - .map(|adapter| adapter.send_message(message)) + .map(|adapter| { + let result = adapter.send_message(message); + if let Err(ref e) = result { + tracing::error!( + adapter = %adapter.name(), + message_id = %message.id, + error = %e, + "Failed to broadcast message to adapter" + ); + } + result + }) .collect() } } diff --git a/src/core/message.rs b/src/core/message.rs index a236522..0b520d9 100644 --- a/src/core/message.rs +++ b/src/core/message.rs @@ -1,10 +1,43 @@ -use crate::models::message::{Message, MessageStatus}; +use crate::models::message::{Message, MessageStatus, MessageType}; use rusqlite::{Connection, Result}; pub struct MessageRepository { conn: Connection, } +fn read_message_row( + row: (String, String, String, String, String, i64, i64, u32), +) -> Option { + let (id, mt, content, recipient, status_str, created_at_secs, updated_at_secs, retry_count) = + row; + let message_type = match mt.as_str() { + "Text" => MessageType::Text, + "Image" => MessageType::Image, + "File" => MessageType::File, + _ => return None, + }; + let status = match status_str.as_str() { + "Pending" => MessageStatus::Pending, + "Sending" => MessageStatus::Sending, + "Sent" => MessageStatus::Sent, + "Failed" => MessageStatus::Failed, + "Canceled" => MessageStatus::Canceled, + _ => return None, + }; + let created_at = std::time::UNIX_EPOCH + std::time::Duration::from_secs(created_at_secs as u64); + let updated_at = std::time::UNIX_EPOCH + std::time::Duration::from_secs(updated_at_secs as u64); + Some(Message { + id, + message_type, + content, + recipient, + status, + created_at, + updated_at, + retry_count, + }) +} + impl MessageRepository { pub fn new(path: &str) -> Result { let conn = Connection::open(path)?; @@ -96,6 +129,7 @@ impl MessageRepository { "Sending" => MessageStatus::Sending, "Sent" => MessageStatus::Sent, "Failed" => MessageStatus::Failed, + "Canceled" => MessageStatus::Canceled, _ => return Err(rusqlite::Error::InvalidQuery), }; @@ -149,6 +183,7 @@ impl MessageRepository { "Sending" => MessageStatus::Sending, "Sent" => MessageStatus::Sent, "Failed" => MessageStatus::Failed, + "Canceled" => MessageStatus::Canceled, _ => continue, }; @@ -195,4 +230,68 @@ impl MessageRepository { )?; Ok(()) } + + pub fn list(&self, status: Option<&str>, offset: usize, limit: usize) -> Result> { + let mut messages = Vec::new(); + if let Some(s) = status { + let mut stmt = self.conn.prepare( + "SELECT id, message_type, content, recipient, status, created_at, updated_at, retry_count FROM messages WHERE status = ?1 ORDER BY created_at DESC LIMIT ?2 OFFSET ?3", + )?; + let rows = stmt.query_map(rusqlite::params![s, limit, offset], |row| { + Ok(( + row.get::<_, String>(0)?, + row.get::<_, String>(1)?, + row.get::<_, String>(2)?, + row.get::<_, String>(3)?, + row.get::<_, String>(4)?, + row.get::<_, i64>(5)?, + row.get::<_, i64>(6)?, + row.get::<_, u32>(7)?, + )) + })?; + for row in rows { + if let Some(msg) = read_message_row(row?) { + messages.push(msg); + } + } + } else { + let mut stmt = self.conn.prepare( + "SELECT id, message_type, content, recipient, status, created_at, updated_at, retry_count FROM messages ORDER BY created_at DESC LIMIT ?1 OFFSET ?2", + )?; + let rows = stmt.query_map(rusqlite::params![limit, offset], |row| { + Ok(( + row.get::<_, String>(0)?, + row.get::<_, String>(1)?, + row.get::<_, String>(2)?, + row.get::<_, String>(3)?, + row.get::<_, String>(4)?, + row.get::<_, i64>(5)?, + row.get::<_, i64>(6)?, + row.get::<_, u32>(7)?, + )) + })?; + for row in rows { + if let Some(msg) = read_message_row(row?) { + messages.push(msg); + } + } + } + Ok(messages) + } + + pub fn count_by_status(&self, status: &str) -> Result { + let count: i64 = self.conn.query_row( + "SELECT COUNT(*) FROM messages WHERE status = ?1", + rusqlite::params![status], + |row| row.get(0), + )?; + Ok(count as usize) + } + + pub fn delete(&self, id: &str) -> Result { + let affected = self + .conn + .execute("DELETE FROM messages WHERE id = ?1", rusqlite::params![id])?; + Ok(affected > 0) + } } diff --git a/src/models/message.rs b/src/models/message.rs index 8fdc615..cc0f0d0 100644 --- a/src/models/message.rs +++ b/src/models/message.rs @@ -14,6 +14,7 @@ pub enum MessageStatus { Sending, Sent, Failed, + Canceled, } #[derive(Debug, Serialize, Deserialize)]