Skip to content
80 changes: 65 additions & 15 deletions src/adapters/onebot/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,45 +43,95 @@ impl Adapter for OneBotAdapter {
fn send_message(&self, message: &Message) -> Result<(), Box<dyn std::error::Error>> {
let segments = internal_message_to_segments(message);

// Determine if group or private based on conversation ID format
let (message_type, target_id) = if message.recipient.starts_with("group_") {
let gid: i64 = message
.recipient
.strip_prefix("group_")
.and_then(|s| s.parse().ok())
.unwrap_or(0);
.ok_or_else(|| {
let err = format!(
"Invalid group ID format in recipient: {}",
message.recipient
);
tracing::error!(
message_id = %message.id,
recipient = %message.recipient,
"{}",
err
);
err
})?;
("group", gid)
} else if message.recipient.starts_with("private_") {
let uid: i64 = message
.recipient
.strip_prefix("private_")
.and_then(|s| s.parse().ok())
.unwrap_or(0);
.ok_or_else(|| {
let err = format!(
"Invalid private ID format in recipient: {}",
message.recipient
);
tracing::error!(
message_id = %message.id,
recipient = %message.recipient,
"{}",
err
);
err
})?;
("private", uid)
} else {
// Try to parse as numeric; default to group
let id: i64 = message.recipient.parse().ok().unwrap_or(0);
let id: i64 = message.recipient.parse().ok().ok_or_else(|| {
let err = format!("Unrecognized recipient format: {}", message.recipient);
tracing::error!(
message_id = %message.id,
recipient = %message.recipient,
"{}",
err
);
err
})?;
("group", id)
};

let action =
protocol::build_send_msg_action(message_type, target_id, &segments, Some(&message.id));

let sender = self
.sender
.lock()
.map_err(|e| format!("OneBot sender mutex poisoned: {}", e))?;
let sender = self.sender.lock().map_err(|e| {
let err = format!("OneBot sender mutex poisoned: {}", e);
tracing::error!(
message_id = %message.id,
adapter = %self.name,
"{}",
err
);
err
})?;

sender
.as_ref()
.ok_or("OneBot sender not initialized")?
.send(action)
.map_err(|e| format!("Failed to send OneBot action: {}", e))?;
let sender_ref = sender.as_ref().ok_or_else(|| {
let err = format!("OneBot sender not initialized for adapter '{}'", self.name);
tracing::error!(message_id = %message.id, adapter = %self.name, "{}", err);
err
})?;

sender_ref.send(action).map_err(|e| {
let err = format!("Failed to send OneBot action: {}", e);
tracing::error!(
message_id = %message.id,
adapter = %self.name,
conversation = %message.recipient,
"{}",
err
);
err
})?;

tracing::info!(
message_id = %message.id,
conversation = %message.recipient,
"OneBot message sent"
adapter = %self.name,
"OneBot message sent successfully"
);
Ok(())
}
Expand Down
5 changes: 4 additions & 1 deletion src/adapters/onebot/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ pub async fn onebot_ws(
let (action_tx, mut action_rx) = mpsc::unbounded_channel::<ActionRequest>();
// Store the sender so the adapter can use it later
{
let mut sender = onebot_sender.lock().unwrap();
let mut sender = onebot_sender.lock().map_err(|e| {
tracing::error!("Failed to acquire OneBot sender lock: {}", e);
actix_web::error::ErrorInternalServerError("Internal server error")
})?;
*sender = Some(action_tx);
}

Expand Down
134 changes: 134 additions & 0 deletions src/api/endpoints/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,137 @@ pub async fn get_message(id: web::Path<String>) -> impl Responder {
pub async fn health_check() -> impl Responder {
HttpResponse::Ok().json(serde_json::json!({"status": "ok"}))
}

#[derive(Debug, Serialize, Deserialize)]
pub struct MessageListQuery {
pub offset: Option<usize>,
pub limit: Option<usize>,
pub status: Option<String>,
}

pub async fn list_messages(query: web::Query<MessageListQuery>) -> impl Responder {
let offset = query.offset.unwrap_or(0);
let limit = query.limit.unwrap_or(50).min(200);
let status = query.status.as_deref();

let result = crate::REPO.with(|repo| {
repo.borrow()
.as_ref()
.map(|r| r.list(status, offset, limit))
});
match result {
Some(Ok(messages)) => {
let items: Vec<MessageResponse> =
messages.into_iter().map(MessageResponse::from).collect();
HttpResponse::Ok().json(serde_json::json!({
"messages": items,
"offset": offset,
"limit": limit,
}))
}
Some(Err(e)) => {
tracing::error!(error = %e, "Failed to list messages");
HttpResponse::InternalServerError().json(serde_json::json!({"error": e.to_string()}))
}
None => {
tracing::error!("Repository not initialized during list call");
HttpResponse::InternalServerError()
.json(serde_json::json!({"error": "Repository not initialized"}))
}
}
}

#[derive(Debug, Serialize, Deserialize)]
pub struct PatchMessageRequest {
pub status: String,
}

pub async fn update_message(
id: web::Path<String>,
body: web::Json<PatchMessageRequest>,
) -> impl Responder {
let new_status = match body.status.as_str() {
"Pending" => crate::models::message::MessageStatus::Pending,
"Sending" => crate::models::message::MessageStatus::Sending,
"Sent" => crate::models::message::MessageStatus::Sent,
"Canceled" => crate::models::message::MessageStatus::Canceled,
"Failed" => crate::models::message::MessageStatus::Failed,
_ => {
return HttpResponse::BadRequest()
.json(serde_json::json!({"error": "Invalid status. Use Pending, Sending, Sent, Failed, or Canceled"}));
}
};

let result = crate::REPO.with(|repo| {
repo.borrow()
.as_ref()
.map(|r| r.update_message_status(&id, &new_status))
});
match result {
Some(Ok(())) => {
let msg_result = crate::REPO.with(|repo| repo.borrow().as_ref().map(|r| r.get(&id)));
match msg_result {
Some(Ok(Some(msg))) => HttpResponse::Ok().json(MessageResponse::from(msg)),
_ => HttpResponse::Ok().json(serde_json::json!({"status": "updated"})),
}
}
Some(Err(e)) => {
tracing::error!(error = %e, message_id = %id.into_inner(), "Failed to update message");
HttpResponse::InternalServerError().json(serde_json::json!({"error": e.to_string()}))
}
None => {
tracing::error!("Repository not initialized during update call");
HttpResponse::InternalServerError()
.json(serde_json::json!({"error": "Repository not initialized"}))
}
}
}

pub async fn delete_message(id: web::Path<String>) -> impl Responder {
let result = crate::REPO.with(|repo| repo.borrow().as_ref().map(|r| r.delete(&id)));
match result {
Some(Ok(true)) => HttpResponse::Ok().json(serde_json::json!({"deleted": true})),
Some(Ok(false)) => {
HttpResponse::NotFound().json(serde_json::json!({"error": "Message not found"}))
}
Some(Err(e)) => {
tracing::error!(error = %e, message_id = %id.into_inner(), "Failed to delete message");
HttpResponse::InternalServerError().json(serde_json::json!({"error": e.to_string()}))
}
None => {
tracing::error!("Repository not initialized during delete call");
HttpResponse::InternalServerError()
.json(serde_json::json!({"error": "Repository not initialized"}))
}
}
}

pub async fn get_stats() -> impl Responder {
let result = crate::REPO.with(|repo| {
let r = repo.borrow();
let repo_ref = r.as_ref()?;
let pending = repo_ref.count_by_status("Pending").unwrap_or(0);
let sending = repo_ref.count_by_status("Sending").unwrap_or(0);
let sent = repo_ref.count_by_status("Sent").unwrap_or(0);
let failed = repo_ref.count_by_status("Failed").unwrap_or(0);
let canceled = repo_ref.count_by_status("Canceled").unwrap_or(0);
Some((pending, sending, sent, failed, canceled))
});
match result {
Some((pending, sending, sent, failed, canceled)) => {
HttpResponse::Ok().json(serde_json::json!({
"pending": pending,
"sending": sending,
"sent": sent,
"failed": failed,
"canceled": canceled,
"total": pending + sending + sent + failed + canceled,
}))
}
None => {
tracing::error!("Repository not initialized during stats call");
HttpResponse::InternalServerError()
.json(serde_json::json!({"error": "Repository not initialized"}))
}
}
}
15 changes: 14 additions & 1 deletion src/api/endpoints/ws_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,20 @@ async fn handle_command(
}

if let Err(e) = adapter_manager.send_to_adapter(&platform, &message) {
tracing::warn!(platform = %platform, error = %e, "No adapter found for platform");
tracing::error!(
platform = %platform,
message_id = %message.id,
error = %e,
"Failed to send message via adapter"
);
crate::REPO.with(|repo| {
if let Some(r) = repo.borrow().as_ref() {
let _ = r.update_message_status(
&message.id,
&crate::models::message::MessageStatus::Failed,
);
}
});
}

let now = message
Expand Down
14 changes: 12 additions & 2 deletions src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,19 @@ pub fn config(cfg: &mut web::ServiceConfig) {

cfg.service(
web::scope("/api")
.service(web::resource("/messages").route(web::post().to(messages::create_message)))
.service(web::resource("/messages/{id}").route(web::get().to(messages::get_message)))
.service(
web::resource("/messages")
.route(web::get().to(messages::list_messages))
.route(web::post().to(messages::create_message)),
)
.service(
web::resource("/messages/{id}")
.route(web::get().to(messages::get_message))
.route(web::patch().to(messages::update_message))
.route(web::delete().to(messages::delete_message)),
)
.service(web::resource("/health").route(web::get().to(messages::health_check)))
.service(web::resource("/stats").route(web::get().to(messages::get_stats)))
.route("/auth/verify", web::post().to(auth_verify)),
)
.route("/ws/client", web::get().to(ws_client::ws_client))
Expand Down
13 changes: 12 additions & 1 deletion src/core/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,18 @@ impl AdapterManager {
) -> Vec<Result<(), Box<dyn std::error::Error>>> {
self.adapters
.iter()
.map(|adapter| adapter.send_message(message))
.map(|adapter| {
let result = adapter.send_message(message);
if let Err(ref e) = result {
tracing::error!(
adapter = %adapter.name(),
message_id = %message.id,
error = %e,
"Failed to broadcast message to adapter"
);
}
result
})
.collect()
}
}
Loading
Loading