diff --git a/crates/aionui-api-types/src/channel.rs b/crates/aionui-api-types/src/channel.rs index 0260533df..1cee49903 100644 --- a/crates/aionui-api-types/src/channel.rs +++ b/crates/aionui-api-types/src/channel.rs @@ -45,6 +45,8 @@ pub struct TestPluginExtraConfig { pub app_id: Option, #[serde(default)] pub app_secret: Option, + #[serde(flatten)] + pub extra: std::collections::HashMap, } // --------------------------------------------------------------------------- diff --git a/crates/aionui-app/Cargo.toml b/crates/aionui-app/Cargo.toml index 1781fef6c..4d722070d 100644 --- a/crates/aionui-app/Cargo.toml +++ b/crates/aionui-app/Cargo.toml @@ -8,11 +8,12 @@ name = "aioncore" path = "src/main.rs" [features] -default = ["telegram", "lark", "dingtalk", "weixin"] +default = ["telegram", "lark", "dingtalk", "weixin", "mattermost"] telegram = ["aionui-channel/telegram"] lark = ["aionui-channel/lark"] dingtalk = ["aionui-channel/dingtalk"] weixin = ["aionui-channel/weixin"] +mattermost = ["aionui-channel/mattermost"] [dependencies] aionui-common.workspace = true diff --git a/crates/aionui-app/tests/channel_e2e.rs b/crates/aionui-app/tests/channel_e2e.rs index 642bf8c0b..537d8fbb0 100644 --- a/crates/aionui-app/tests/channel_e2e.rs +++ b/crates/aionui-app/tests/channel_e2e.rs @@ -28,11 +28,20 @@ async fn get_plugins_empty() { let json = body_json(resp).await; assert!(json["success"].as_bool().unwrap()); let data = json["data"].as_array().unwrap(); - assert_eq!(data.len(), 7); + assert_eq!(data.len(), 8); let types: std::collections::HashSet<_> = data.iter().filter_map(|item| item["type"].as_str()).collect(); assert_eq!( types, - std::collections::HashSet::from(["telegram", "lark", "dingtalk", "slack", "discord", "weixin", "wecom",]) + std::collections::HashSet::from([ + "telegram", + "lark", + "dingtalk", + "slack", + "discord", + "weixin", + "wecom", + "mattermost", + ]) ); assert!(data.iter().all(|item| item["enabled"] == false)); } @@ -548,7 +557,7 @@ async fn enable_disable_plugin_lifecycle() { assert_eq!(resp.status(), StatusCode::OK); let json = body_json(resp).await; let plugins = json["data"].as_array().unwrap(); - assert_eq!(plugins.len(), 7); + assert_eq!(plugins.len(), 8); let telegram = plugins .iter() .find(|plugin| plugin["plugin_id"] == "telegram") @@ -575,7 +584,7 @@ async fn enable_disable_plugin_lifecycle() { let resp = app.oneshot(req).await.unwrap(); let json = body_json(resp).await; let plugins = json["data"].as_array().unwrap(); - assert_eq!(plugins.len(), 7); + assert_eq!(plugins.len(), 8); let telegram = plugins .iter() .find(|plugin| plugin["plugin_id"] == "telegram") diff --git a/crates/aionui-channel/Cargo.toml b/crates/aionui-channel/Cargo.toml index a0de2d46d..7c2be7e8b 100644 --- a/crates/aionui-channel/Cargo.toml +++ b/crates/aionui-channel/Cargo.toml @@ -9,6 +9,7 @@ telegram = ["dep:reqwest"] lark = ["dep:reqwest", "dep:tokio-tungstenite", "dep:futures-util", "dep:prost", "dep:rustls", "dep:rustls-native-certs"] dingtalk = ["dep:reqwest", "dep:tokio-tungstenite", "dep:futures-util", "dep:rustls", "dep:rustls-native-certs"] weixin = ["dep:reqwest", "dep:futures-util", "dep:base64", "dep:uuid"] +mattermost = ["dep:reqwest", "dep:tokio-tungstenite", "dep:futures-util", "dep:rustls", "dep:rustls-native-certs"] [dependencies] aionui-common.workspace = true diff --git a/crates/aionui-channel/src/formatter.rs b/crates/aionui-channel/src/formatter.rs index b1bf039d1..ace631e52 100644 --- a/crates/aionui-channel/src/formatter.rs +++ b/crates/aionui-channel/src/formatter.rs @@ -14,6 +14,7 @@ pub fn format_text_for_platform(text: &str, platform: PluginType) -> String { match platform { PluginType::Telegram => markdown_to_telegram_html(text), PluginType::Lark | PluginType::Dingtalk => html_to_markdown(text), + PluginType::Mattermost => strip_tags_loop(text), PluginType::Weixin => strip_html(text), _ => escape_html(text), } diff --git a/crates/aionui-channel/src/manager.rs b/crates/aionui-channel/src/manager.rs index c673c76c1..616e516ee 100644 --- a/crates/aionui-channel/src/manager.rs +++ b/crates/aionui-channel/src/manager.rs @@ -550,6 +550,7 @@ impl ChannelManager { PluginType::Lark => "Lark Bot".into(), PluginType::Dingtalk => "DingTalk Bot".into(), PluginType::Weixin => "WeChat Bot".into(), + PluginType::Mattermost => "Mattermost".into(), PluginType::Slack => "Slack Bot".into(), PluginType::Discord => "Discord Bot".into(), } @@ -1440,6 +1441,7 @@ mod tests { assert_eq!(mgr.default_plugin_name(PluginType::Lark), "Lark Bot"); assert_eq!(mgr.default_plugin_name(PluginType::Dingtalk), "DingTalk Bot"); assert_eq!(mgr.default_plugin_name(PluginType::Weixin), "WeChat Bot"); + assert_eq!(mgr.default_plugin_name(PluginType::Mattermost), "Mattermost"); assert_eq!(mgr.default_plugin_name(PluginType::Slack), "Slack Bot"); assert_eq!(mgr.default_plugin_name(PluginType::Discord), "Discord Bot"); } diff --git a/crates/aionui-channel/src/message_service.rs b/crates/aionui-channel/src/message_service.rs index f76147875..6ab1cb8ec 100644 --- a/crates/aionui-channel/src/message_service.rs +++ b/crates/aionui-channel/src/message_service.rs @@ -338,7 +338,7 @@ fn platform_to_source(platform: PluginType) -> ConversationSource { PluginType::Dingtalk => ConversationSource::Dingtalk, PluginType::Weixin => ConversationSource::Weixin, // Reserved variants default to Aionui - PluginType::Slack | PluginType::Discord => ConversationSource::Aionui, + PluginType::Mattermost | PluginType::Slack | PluginType::Discord => ConversationSource::Aionui, } } @@ -377,6 +377,7 @@ fn channel_conversation_name( PluginType::Lark => "lark", PluginType::Dingtalk => "ding", PluginType::Weixin => "wx", + PluginType::Mattermost => "mm", PluginType::Slack => "slack", PluginType::Discord => "discord", }; @@ -430,6 +431,7 @@ mod tests { #[test] fn platform_to_source_reserved_defaults_to_aionui() { + assert_eq!(platform_to_source(PluginType::Mattermost), ConversationSource::Aionui); assert_eq!(platform_to_source(PluginType::Slack), ConversationSource::Aionui); assert_eq!(platform_to_source(PluginType::Discord), ConversationSource::Aionui); } diff --git a/crates/aionui-channel/src/plugins/mattermost/api.rs b/crates/aionui-channel/src/plugins/mattermost/api.rs new file mode 100644 index 000000000..c8d112ae8 --- /dev/null +++ b/crates/aionui-channel/src/plugins/mattermost/api.rs @@ -0,0 +1,111 @@ +use reqwest::{Client, StatusCode}; + +use crate::error::ChannelError; + +use super::types::{CreatePostRequest, MattermostUser, PatchPostRequest}; + +#[derive(Clone)] +pub(crate) struct MattermostApi { + client: Client, + server_url: String, + access_token: String, +} + +impl MattermostApi { + pub fn new(client: Client, server_url: String, access_token: String) -> Self { + Self { + client, + server_url, + access_token, + } + } + + pub async fn get_me(&self) -> Result { + let url = format!("{}/api/v4/users/me", self.server_url); + let response = self + .client + .get(url) + .bearer_auth(&self.access_token) + .send() + .await + .map_err(|e| ChannelError::ConnectionFailed(format!("Mattermost user request failed: {e}")))?; + + let status = response.status(); + if !status.is_success() { + return Err(mattermost_status_error("Mattermost user request failed", status)); + } + + response + .json() + .await + .map_err(|e| ChannelError::PlatformApi(format!("Mattermost user response parse failed: {e}"))) + } + + pub async fn create_post(&self, req: &CreatePostRequest) -> Result { + let url = format!("{}/api/v4/posts", self.server_url); + let response = self + .client + .post(url) + .bearer_auth(&self.access_token) + .json(req) + .send() + .await + .map_err(|e| ChannelError::MessageSendFailed(format!("Mattermost post request failed: {e}")))?; + + let status = response.status(); + if !status.is_success() { + return Err(ChannelError::MessageSendFailed(format!( + "Mattermost post request failed with status {status}" + ))); + } + + let value: serde_json::Value = response + .json() + .await + .map_err(|e| ChannelError::MessageSendFailed(format!("Mattermost post response parse failed: {e}")))?; + value["id"] + .as_str() + .map(ToOwned::to_owned) + .ok_or_else(|| ChannelError::MessageSendFailed("Mattermost post response missing id".into())) + } + + pub async fn patch_post(&self, post_id: &str, req: &PatchPostRequest) -> Result<(), ChannelError> { + let url = format!("{}/api/v4/posts/{post_id}/patch", self.server_url); + let response = self + .client + .put(url) + .bearer_auth(&self.access_token) + .json(req) + .send() + .await + .map_err(|e| ChannelError::MessageSendFailed(format!("Mattermost post patch request failed: {e}")))?; + + let status = response.status(); + if !status.is_success() { + return Err(ChannelError::MessageSendFailed(format!( + "Mattermost post patch request failed with status {status}" + ))); + } + + Ok(()) + } + + pub fn websocket_url(&self) -> String { + let ws_base = if let Some(rest) = self.server_url.strip_prefix("https://") { + format!("wss://{rest}") + } else if let Some(rest) = self.server_url.strip_prefix("http://") { + format!("ws://{rest}") + } else { + self.server_url.clone() + }; + format!("{ws_base}/api/v4/websocket") + } + + pub fn access_token(&self) -> &str { + &self.access_token + } +} + +fn mattermost_status_error(context: &str, status: StatusCode) -> ChannelError { + ChannelError::ConnectionFailed(format!("{context} with status {status}")) +} diff --git a/crates/aionui-channel/src/plugins/mattermost/mod.rs b/crates/aionui-channel/src/plugins/mattermost/mod.rs new file mode 100644 index 000000000..aea6b5c93 --- /dev/null +++ b/crates/aionui-channel/src/plugins/mattermost/mod.rs @@ -0,0 +1,5 @@ +mod api; +mod plugin; +mod types; + +pub use plugin::MattermostPlugin; diff --git a/crates/aionui-channel/src/plugins/mattermost/plugin.rs b/crates/aionui-channel/src/plugins/mattermost/plugin.rs new file mode 100644 index 000000000..b1c0f7442 --- /dev/null +++ b/crates/aionui-channel/src/plugins/mattermost/plugin.rs @@ -0,0 +1,586 @@ +use std::sync::Arc; +use std::time::Duration; + +use futures_util::{SinkExt, StreamExt}; +use reqwest::Client; +use tokio::sync::{mpsc, watch}; +use tokio::task::JoinHandle; +use tokio_tungstenite::tungstenite::Message; +use tracing::{debug, info, warn}; + +use crate::error::ChannelError; +use crate::plugin::{ChannelPlugin, PluginCallbacks}; +use crate::types::{ + BotInfo, PluginConfig, PluginStatus, PluginType, UnifiedIncomingMessage, UnifiedOutgoingMessage, UnifiedUser, +}; + +use super::api::MattermostApi; +use super::types::{ + CreatePostRequest, MattermostConfig, MattermostPost, MattermostUser, PatchPostRequest, parse_posted_event, + post_to_content, +}; + +const MAX_RECONNECT_DELAY: Duration = Duration::from_secs(30); +const WS_CONNECT_TIMEOUT: Duration = Duration::from_secs(30); + +pub struct MattermostPlugin { + status: PluginStatus, + bot_info: Option, + last_error: Option, + api: Option>, + config: Option, + ws_handle: Option>, + shutdown_tx: Option>, +} + +impl Default for MattermostPlugin { + fn default() -> Self { + Self { + status: PluginStatus::Created, + bot_info: None, + last_error: None, + api: None, + config: None, + ws_handle: None, + shutdown_tx: None, + } + } +} + +impl MattermostPlugin { + pub fn new() -> Self { + Self::default() + } +} + +#[async_trait::async_trait] +impl ChannelPlugin for MattermostPlugin { + async fn initialize(&mut self, config: PluginConfig, callbacks: PluginCallbacks) -> Result<(), ChannelError> { + self.status = PluginStatus::Initializing; + + let parsed_config = MattermostConfig::from_plugin_config(&config).inspect_err(|e| { + self.status = PluginStatus::Error; + self.last_error = Some(e.to_string()); + })?; + + let client = Client::builder() + .timeout(Duration::from_secs(30)) + .build() + .map_err(|e| { + self.status = PluginStatus::Error; + self.last_error = Some(format!("HTTP client init failed: {e}")); + ChannelError::ConnectionFailed(format!("HTTP client init failed: {e}")) + })?; + let api = Arc::new(MattermostApi::new( + client, + parsed_config.server_url.clone(), + parsed_config.access_token.clone(), + )); + + let me = api.get_me().await.map_err(|e| { + self.status = PluginStatus::Error; + self.last_error = Some(format!("Credential validation failed: {e}")); + e + })?; + + self.bot_info = Some(BotInfo { + id: me.id.clone(), + username: me.username.clone(), + display_name: me.display_name(), + }); + + info!( + user_id = %me.id, + username = me.username.as_deref().unwrap_or(""), + "Mattermost identity loaded" + ); + + let (shutdown_tx, shutdown_rx) = watch::channel(false); + self.shutdown_tx = Some(shutdown_tx); + self.ws_handle = Some(tokio::spawn(ws_loop( + Arc::clone(&api), + parsed_config.clone(), + me, + callbacks.message_tx, + shutdown_rx, + ))); + + self.api = Some(api); + self.config = Some(parsed_config); + self.status = PluginStatus::Ready; + info!("Mattermost plugin initialized"); + Ok(()) + } + + async fn start(&mut self) -> Result<(), ChannelError> { + self.status = PluginStatus::Starting; + self.status = PluginStatus::Running; + info!("Mattermost plugin started"); + Ok(()) + } + + async fn stop(&mut self) -> Result<(), ChannelError> { + self.status = PluginStatus::Stopping; + + if let Some(tx) = self.shutdown_tx.take() { + let _ = tx.send(true); + } + + if let Some(handle) = self.ws_handle.take() { + let _ = tokio::time::timeout(Duration::from_secs(5), handle).await; + } + + self.api = None; + self.config = None; + self.status = PluginStatus::Stopped; + info!("Mattermost plugin stopped"); + Ok(()) + } + + async fn send_message(&self, chat_id: &str, message: UnifiedOutgoingMessage) -> Result { + let api = self + .api + .as_ref() + .ok_or_else(|| ChannelError::PlatformApi("Plugin not initialized".into()))?; + let config = self + .config + .as_ref() + .ok_or_else(|| ChannelError::PlatformApi("Plugin not initialized".into()))?; + + let text = message.text.unwrap_or_default(); + let root_id = config + .reply_in_thread + .then(|| message.reply_to_message_id.filter(|id| !id.is_empty())) + .flatten(); + let req = CreatePostRequest { + channel_id: chat_id.to_owned(), + message: text, + root_id, + }; + + api.create_post(&req).await + } + + async fn edit_message( + &self, + _chat_id: &str, + message_id: &str, + message: UnifiedOutgoingMessage, + ) -> Result<(), ChannelError> { + let api = self + .api + .as_ref() + .ok_or_else(|| ChannelError::PlatformApi("Plugin not initialized".into()))?; + let req = PatchPostRequest { + message: message.text.unwrap_or_default(), + }; + api.patch_post(message_id, &req).await + } + + fn active_user_count(&self) -> usize { + 0 + } + + fn bot_info(&self) -> Option<&BotInfo> { + self.bot_info.as_ref() + } + + fn plugin_type(&self) -> PluginType { + PluginType::Mattermost + } + + fn status(&self) -> PluginStatus { + self.status + } + + fn last_error(&self) -> Option<&str> { + self.last_error.as_deref() + } +} + +async fn ws_loop( + api: Arc, + config: MattermostConfig, + me: MattermostUser, + message_tx: mpsc::Sender, + mut shutdown_rx: watch::Receiver, +) { + let mut attempts = 0u32; + + loop { + if *shutdown_rx.borrow() { + break; + } + + match connect_once(&api, &config, &me, &message_tx, &mut shutdown_rx).await { + Ok(()) => { + attempts = 0; + } + Err(e) => { + attempts = attempts.saturating_add(1); + let delay = reconnect_delay(attempts); + warn!(error = %e, delay_ms = delay.as_millis(), "Mattermost WebSocket reconnect scheduled"); + tokio::select! { + _ = tokio::time::sleep(delay) => {} + _ = shutdown_rx.changed() => { + if *shutdown_rx.borrow() { + break; + } + } + } + } + } + } + + debug!("Mattermost WebSocket loop stopped"); +} + +async fn connect_once( + api: &MattermostApi, + config: &MattermostConfig, + me: &MattermostUser, + message_tx: &mpsc::Sender, + shutdown_rx: &mut watch::Receiver, +) -> Result<(), ChannelError> { + use tokio_tungstenite::connect_async_tls_with_config; + + let ws_url = api.websocket_url(); + info!(url = %ws_url, "Mattermost WebSocket connecting"); + + let connector = build_ws_tls_connector()?; + let (mut ws, _) = tokio::time::timeout( + WS_CONNECT_TIMEOUT, + connect_async_tls_with_config(&ws_url, None, false, Some(connector)), + ) + .await + .map_err(|_| { + ChannelError::ConnectionFailed(format!( + "Mattermost WebSocket connect timed out after {}s", + WS_CONNECT_TIMEOUT.as_secs() + )) + })? + .map_err(|e| ChannelError::ConnectionFailed(format!("Mattermost WebSocket connect failed: {e}")))?; + + let auth = serde_json::json!({ + "seq": 1, + "action": "authentication_challenge", + "data": { + "token": api.access_token(), + }, + }); + tokio::time::timeout(WS_CONNECT_TIMEOUT, ws.send(Message::Text(auth.to_string().into()))) + .await + .map_err(|_| { + ChannelError::ConnectionFailed(format!( + "Mattermost WebSocket auth send timed out after {}s", + WS_CONNECT_TIMEOUT.as_secs() + )) + })? + .map_err(|e| ChannelError::ConnectionFailed(format!("Mattermost WebSocket auth send failed: {e}")))?; + info!("Mattermost WebSocket connected"); + + loop { + tokio::select! { + _ = shutdown_rx.changed() => { + if *shutdown_rx.borrow() { + let _ = ws.close(None).await; + return Ok(()); + } + } + next = ws.next() => { + let Some(message) = next else { + return Err(ChannelError::ConnectionFailed("Mattermost WebSocket closed".into())); + }; + let message = message.map_err(|e| ChannelError::ConnectionFailed(format!("Mattermost WebSocket read failed: {e}")))?; + match message { + Message::Text(text) => handle_ws_text(&text, config, me, message_tx).await, + Message::Ping(payload) => { + let _ = ws.send(Message::Pong(payload)).await; + } + Message::Close(_) => { + return Err(ChannelError::ConnectionFailed("Mattermost WebSocket closed".into())); + } + _ => {} + } + } + } + } +} + +/// Build a TLS connector for WebSocket connections. +/// +/// WebSocket upgrade requires HTTP/1.1. Some Mattermost deployments sit behind +/// reverse proxies that negotiate h2 by ALPN unless the client pins HTTP/1.1. +fn build_ws_tls_connector() -> Result { + use std::sync::Arc; + use tokio_tungstenite::Connector; + + let certs = rustls_native_certs::load_native_certs(); + let mut root_store = rustls::RootCertStore::empty(); + root_store.add_parsable_certificates(certs.certs); + + let provider = rustls::crypto::CryptoProvider::get_default() + .cloned() + .unwrap_or_else(|| Arc::new(rustls::crypto::ring::default_provider())); + + let mut config = rustls::ClientConfig::builder_with_provider(provider) + .with_safe_default_protocol_versions() + .map_err(|e| ChannelError::ConnectionFailed(format!("TLS config error: {e}")))? + .with_root_certificates(root_store) + .with_no_client_auth(); + config.alpn_protocols = vec![b"http/1.1".to_vec()]; + + Ok(Connector::Rustls(Arc::new(config))) +} + +async fn handle_ws_text( + text: &str, + config: &MattermostConfig, + me: &MattermostUser, + message_tx: &mpsc::Sender, +) { + let value: serde_json::Value = match serde_json::from_str(text) { + Ok(value) => value, + Err(e) => { + warn!(error = %e, "Mattermost WebSocket message parse failed"); + return; + } + }; + + let Some(post) = parse_posted_event(&value) else { + return; + }; + info!( + post_id = %post.id, + channel_id = %post.channel_id, + user_id = %post.user_id, + "Mattermost post event received" + ); + if let Some(msg) = post_to_unified(&post, config, me) + && let Err(e) = message_tx.send(msg).await + { + warn!(error = %e, "Mattermost incoming message dispatch failed"); + } +} + +pub(crate) fn post_to_unified( + post: &MattermostPost, + config: &MattermostConfig, + me: &MattermostUser, +) -> Option { + if post.id.is_empty() || post.channel_id.is_empty() || post.user_id.is_empty() || post.message.trim().is_empty() { + return None; + } + if !config.channel_allowed(&post.channel_id) { + return None; + } + if config.ignore_self_messages && post.user_id == me.id { + return None; + } + if post.r#type == "system_join_channel" || post.r#type == "system_leave_channel" { + return None; + } + + Some(UnifiedIncomingMessage { + id: post.id.clone(), + platform: PluginType::Mattermost, + chat_id: post.channel_id.clone(), + user: UnifiedUser { + id: post.user_id.clone(), + username: None, + display_name: post.user_id.clone(), + avatar_url: None, + }, + content: post_to_content(post), + timestamp: post.create_at / 1000, + reply_to_message_id: post.root_id.clone().filter(|id| !id.is_empty()), + action: None, + raw: Some(serde_json::json!({ + "post_id": post.id, + "channel_id": post.channel_id, + "root_id": post.root_id, + "props": post.props, + })), + }) +} + +fn reconnect_delay(attempts: u32) -> Duration { + let secs = 2u64.saturating_pow(attempts.min(4)).min(MAX_RECONNECT_DELAY.as_secs()); + Duration::from_secs(secs) +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use serde_json::Value; + + use super::*; + + fn config() -> MattermostConfig { + MattermostConfig { + server_url: "https://mm.example".into(), + access_token: "token".into(), + allowed_channel_ids: HashSet::new(), + reply_in_thread: true, + ignore_self_messages: true, + } + } + + fn me() -> MattermostUser { + MattermostUser { + id: "me".into(), + username: Some("bot".into()), + nickname: None, + } + } + + fn post(user_id: &str, channel_id: &str) -> MattermostPost { + MattermostPost { + id: "p1".into(), + channel_id: channel_id.into(), + user_id: user_id.into(), + message: "hello".into(), + create_at: 1_000, + root_id: Some("root1".into()), + r#type: String::new(), + props: Some(Value::Object(serde_json::Map::new())), + } + } + + #[test] + fn post_maps_to_unified_message() { + let msg = post_to_unified(&post("u1", "c1"), &config(), &me()).unwrap(); + assert_eq!(msg.platform, PluginType::Mattermost); + assert_eq!(msg.chat_id, "c1"); + assert_eq!(msg.reply_to_message_id.as_deref(), Some("root1")); + assert_eq!(msg.content.text, "hello"); + } + + #[test] + fn post_ignores_self_message() { + assert!(post_to_unified(&post("me", "c1"), &config(), &me()).is_none()); + } + + #[test] + fn post_ignores_disallowed_channel() { + let mut cfg = config(); + cfg.allowed_channel_ids = HashSet::from(["allowed".into()]); + assert!(post_to_unified(&post("u1", "blocked"), &cfg, &me()).is_none()); + } + + #[test] + fn plugin_initial_state() { + let plugin = MattermostPlugin::new(); + assert_eq!(plugin.status(), PluginStatus::Created); + assert_eq!(plugin.plugin_type(), PluginType::Mattermost); + } + + #[test] + fn create_post_payload_omits_empty_root() { + let req = CreatePostRequest { + channel_id: "c1".into(), + message: "hello".into(), + root_id: None, + }; + let value = serde_json::to_value(req).unwrap(); + assert_eq!(value["channel_id"], "c1"); + assert!(value.get("root_id").is_none()); + } + + #[test] + fn post_raw_excludes_credentials() { + let msg = post_to_unified(&post("u1", "c1"), &config(), &me()).unwrap(); + let raw = msg.raw.unwrap(); + assert!(raw.get("access_token").is_none()); + assert!(raw.get("token").is_none()); + } + + #[test] + fn user_display_name_prefers_nickname_then_username() { + let user = MattermostUser { + id: "u1".into(), + username: Some("username".into()), + nickname: Some("nickname".into()), + }; + assert_eq!(user.display_name(), "nickname"); + } + + #[test] + fn mattermost_plugin_is_send_sync() { + fn assert_send_sync() {} + assert_send_sync::(); + } + + #[test] + fn reconnect_delay_is_bounded() { + assert!(reconnect_delay(100) <= MAX_RECONNECT_DELAY); + } + + #[test] + fn empty_post_is_ignored() { + let mut p = post("u1", "c1"); + p.message = " ".into(); + assert!(post_to_unified(&p, &config(), &me()).is_none()); + } + + #[test] + fn system_post_is_ignored() { + let mut p = post("u1", "c1"); + p.r#type = "system_join_channel".into(); + assert!(post_to_unified(&p, &config(), &me()).is_none()); + } + + #[test] + fn no_allowed_channels_allows_all() { + let cfg = config(); + assert!(cfg.channel_allowed("any")); + } + + #[test] + fn allowed_channels_are_checked() { + let mut cfg = config(); + cfg.allowed_channel_ids = HashSet::from(["c1".into()]); + assert!(cfg.channel_allowed("c1")); + assert!(!cfg.channel_allowed("c2")); + } + + #[test] + fn display_name_falls_back_to_id() { + let user = MattermostUser { + id: "u1".into(), + username: None, + nickname: None, + }; + assert_eq!(user.display_name(), "u1"); + } + + #[test] + fn create_post_payload_includes_root() { + let req = CreatePostRequest { + channel_id: "c1".into(), + message: "hello".into(), + root_id: Some("p1".into()), + }; + let value = serde_json::to_value(req).unwrap(); + assert_eq!(value["root_id"], "p1"); + } + + #[test] + fn patch_post_payload_updates_message_only() { + let req = PatchPostRequest { + message: "updated".into(), + }; + let value = serde_json::to_value(req).unwrap(); + assert_eq!(value["message"], "updated"); + assert!(value.get("channel_id").is_none()); + assert!(value.get("root_id").is_none()); + } + + #[test] + fn mattermost_plugin_as_trait_object() { + let plugin = MattermostPlugin::new(); + let boxed: Box = Box::new(plugin); + assert_eq!(boxed.plugin_type(), PluginType::Mattermost); + } +} diff --git a/crates/aionui-channel/src/plugins/mattermost/types.rs b/crates/aionui-channel/src/plugins/mattermost/types.rs new file mode 100644 index 000000000..3282ce09a --- /dev/null +++ b/crates/aionui-channel/src/plugins/mattermost/types.rs @@ -0,0 +1,248 @@ +use std::collections::HashSet; + +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +use crate::error::ChannelError; +use crate::types::{PluginConfig, UnifiedMessageContent}; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct MattermostConfig { + pub server_url: String, + pub access_token: String, + pub allowed_channel_ids: HashSet, + pub reply_in_thread: bool, + pub ignore_self_messages: bool, +} + +impl MattermostConfig { + pub fn from_plugin_config(config: &PluginConfig) -> Result { + let access_token = first_non_empty([ + config.credentials.extra.get("accessToken"), + config.credentials.extra.get("access_token"), + ]) + .or_else(|| non_empty_string(config.credentials.token.as_deref())) + .or_else(|| non_empty_string(config.credentials.bot_token.as_deref())) + .ok_or_else(|| ChannelError::InvalidConfig("Missing Mattermost access token".into()))?; + + let config_extra = config.config.as_ref().map(|c| &c.extra); + let server_url = config_extra + .and_then(|extra| first_non_empty([extra.get("serverUrl"), extra.get("server_url")])) + .ok_or_else(|| ChannelError::InvalidConfig("Missing Mattermost server URL".into()))?; + + let allowed_channel_ids = config_extra + .and_then(|extra| first_non_empty([extra.get("allowedChannelIds"), extra.get("allowed_channel_ids")])) + .map(|value| { + value + .split(',') + .map(str::trim) + .filter(|s| !s.is_empty()) + .map(ToOwned::to_owned) + .collect() + }) + .unwrap_or_default(); + + let reply_in_thread = config_extra + .and_then(|extra| first_bool([extra.get("replyInThread"), extra.get("reply_in_thread")])) + .unwrap_or(true); + let ignore_self_messages = config_extra + .and_then(|extra| first_bool([extra.get("ignoreSelfMessages"), extra.get("ignore_self_messages")])) + .unwrap_or(true); + + Ok(Self { + server_url: normalize_server_url(&server_url)?, + access_token, + allowed_channel_ids, + reply_in_thread, + ignore_self_messages, + }) + } + + pub fn channel_allowed(&self, channel_id: &str) -> bool { + self.allowed_channel_ids.is_empty() || self.allowed_channel_ids.contains(channel_id) + } +} + +#[derive(Debug, Clone, Deserialize)] +pub(crate) struct MattermostUser { + pub id: String, + pub username: Option, + pub nickname: Option, +} + +impl MattermostUser { + pub fn display_name(&self) -> String { + self.nickname + .as_deref() + .filter(|s| !s.is_empty()) + .or(self.username.as_deref().filter(|s| !s.is_empty())) + .map(ToOwned::to_owned) + .unwrap_or_else(|| self.id.clone()) + } +} + +#[derive(Debug, Clone, Serialize)] +pub(crate) struct CreatePostRequest { + pub channel_id: String, + pub message: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub root_id: Option, +} + +#[derive(Debug, Clone, Serialize)] +pub(crate) struct PatchPostRequest { + pub message: String, +} + +#[derive(Debug, Clone, Deserialize)] +pub(crate) struct MattermostPost { + pub id: String, + pub channel_id: String, + pub user_id: String, + #[serde(default)] + pub message: String, + #[serde(default)] + pub create_at: i64, + #[serde(default)] + pub root_id: Option, + #[serde(rename = "type", default)] + pub r#type: String, + #[serde(default)] + pub props: Option, +} + +#[derive(Debug, Clone, Deserialize)] +pub(crate) struct MattermostWsEvent { + pub event: String, + #[serde(default)] + pub data: Value, +} + +pub(crate) fn parse_posted_event(value: &Value) -> Option { + let event: MattermostWsEvent = serde_json::from_value(value.clone()).ok()?; + if event.event != "posted" { + return None; + } + let post_value = event.data.get("post")?; + match post_value { + Value::String(s) => serde_json::from_str(s).ok(), + Value::Object(_) => serde_json::from_value(post_value.clone()).ok(), + _ => None, + } +} + +pub(crate) fn post_to_content(post: &MattermostPost) -> UnifiedMessageContent { + UnifiedMessageContent { + content_type: crate::types::MessageContentType::Text, + text: post.message.clone(), + attachments: None, + } +} + +fn normalize_server_url(raw: &str) -> Result { + let trimmed = raw.trim().trim_end_matches('/'); + if trimmed.is_empty() { + return Err(ChannelError::InvalidConfig("Missing Mattermost server URL".into())); + } + if !trimmed.starts_with("http://") && !trimmed.starts_with("https://") { + return Err(ChannelError::InvalidConfig( + "Mattermost server URL must start with http:// or https://".into(), + )); + } + Ok(trimmed.to_owned()) +} + +fn first_non_empty(values: [Option<&Value>; N]) -> Option { + values.into_iter().find_map(|value| match value? { + Value::String(s) if !s.trim().is_empty() => Some(s.trim().to_owned()), + _ => None, + }) +} + +fn first_bool(values: [Option<&Value>; N]) -> Option { + values.into_iter().find_map(|value| match value? { + Value::Bool(v) => Some(*v), + Value::String(s) => match s.trim().to_ascii_lowercase().as_str() { + "true" | "1" | "yes" => Some(true), + "false" | "0" | "no" => Some(false), + _ => None, + }, + _ => None, + }) +} + +fn non_empty_string(value: Option<&str>) -> Option { + value.map(str::trim).filter(|s| !s.is_empty()).map(ToOwned::to_owned) +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use crate::types::{PluginConfigOptions, PluginCredentials}; + + use super::*; + + fn config(credentials: HashMap, options: HashMap) -> PluginConfig { + PluginConfig { + credentials: PluginCredentials { + token: None, + app_id: None, + app_secret: None, + encrypt_key: None, + verification_token: None, + client_id: None, + client_secret: None, + account_id: None, + bot_token: None, + extra: credentials, + }, + config: Some(PluginConfigOptions { + mode: None, + webhook_url: None, + rate_limit: None, + require_mention: None, + extra: options, + }), + } + } + + #[test] + fn config_accepts_camel_case_fields() { + let parsed = MattermostConfig::from_plugin_config(&config( + HashMap::from([("accessToken".into(), Value::String("tok".into()))]), + HashMap::from([ + ("serverUrl".into(), Value::String("https://mm.example/".into())), + ("allowedChannelIds".into(), Value::String("c1, c2".into())), + ("replyInThread".into(), Value::Bool(false)), + ]), + )) + .unwrap(); + + assert_eq!(parsed.server_url, "https://mm.example"); + assert_eq!(parsed.access_token, "tok"); + assert!(parsed.channel_allowed("c1")); + assert!(parsed.channel_allowed("c2")); + assert!(!parsed.channel_allowed("c3")); + assert!(!parsed.reply_in_thread); + } + + #[test] + fn config_requires_token_and_server_url() { + let err = MattermostConfig::from_plugin_config(&config(HashMap::new(), HashMap::new())).unwrap_err(); + assert!(err.to_string().contains("access token")); + } + + #[test] + fn posted_event_parses_string_post() { + let raw = serde_json::json!({ + "event": "posted", + "data": { + "post": "{\"id\":\"p1\",\"channel_id\":\"c1\",\"user_id\":\"u1\",\"message\":\"hello\",\"create_at\":1000}" + } + }); + let post = parse_posted_event(&raw).unwrap(); + assert_eq!(post.id, "p1"); + assert_eq!(post.message, "hello"); + } +} diff --git a/crates/aionui-channel/src/plugins/mod.rs b/crates/aionui-channel/src/plugins/mod.rs index d8fadc656..c9def5881 100644 --- a/crates/aionui-channel/src/plugins/mod.rs +++ b/crates/aionui-channel/src/plugins/mod.rs @@ -10,6 +10,9 @@ pub mod dingtalk; #[cfg(feature = "weixin")] pub mod weixin; +#[cfg(feature = "mattermost")] +pub mod mattermost; + use crate::plugin::ChannelPlugin; use crate::types::PluginType; @@ -30,6 +33,9 @@ pub fn create_plugin(plugin_type: PluginType) -> Option> #[cfg(feature = "weixin")] PluginType::Weixin => Some(Box::new(weixin::WeixinPlugin::new())), + #[cfg(feature = "mattermost")] + PluginType::Mattermost => Some(Box::new(mattermost::MattermostPlugin::new())), + #[allow(unreachable_patterns)] _ => None, } diff --git a/crates/aionui-channel/src/routes.rs b/crates/aionui-channel/src/routes.rs index ebde507d3..6598eb1a9 100644 --- a/crates/aionui-channel/src/routes.rs +++ b/crates/aionui-channel/src/routes.rs @@ -131,10 +131,11 @@ async fn get_plugin_status( .map(|plugin| (plugin.id.clone(), plugin)) .collect(); - let builtin_names: [(&str, &str); 7] = [ + let builtin_names: [(&str, &str); 8] = [ ("telegram", "Telegram"), ("lark", "Lark"), ("dingtalk", "DingTalk"), + ("mattermost", "Mattermost"), ("slack", "Slack"), ("discord", "Discord"), ("weixin", "WeChat"), @@ -651,6 +652,27 @@ fn build_test_config(req: &TestPluginRequest) -> PluginConfig { credentials.account_id = extra.app_id.clone(); } } + "mattermost" => { + credentials + .extra + .insert("accessToken".to_string(), serde_json::Value::String(req.token.clone())); + let mut extra_map = HashMap::new(); + if let Some(ref extra) = req.extra_config { + for (key, value) in &extra.extra { + extra_map.insert(key.clone(), value.clone()); + } + } + return PluginConfig { + credentials, + config: Some(PluginConfigOptions { + mode: None, + webhook_url: None, + rate_limit: None, + require_mention: None, + extra: extra_map, + }), + }; + } _ => { // Default: use token field (Telegram) credentials.token = Some(req.token.clone()); @@ -690,6 +712,9 @@ fn build_extension_test_config( if let Some(app_secret) = &extra.app_secret { map.insert("appSecret".to_string(), serde_json::Value::String(app_secret.clone())); } + for (key, value) in &extra.extra { + map.insert(key.clone(), value.clone()); + } } build_extension_config(plugin, &serde_json::Value::Object(map)) } @@ -887,6 +912,7 @@ mod tests { extra_config: Some(TestPluginExtraConfig { app_id: Some("cli_abc".into()), app_secret: Some("secret".into()), + extra: HashMap::new(), }), }; let config = build_test_config(&req); @@ -903,6 +929,7 @@ mod tests { extra_config: Some(TestPluginExtraConfig { app_id: None, app_secret: Some("client_secret_456".into()), + extra: HashMap::new(), }), }; let config = build_test_config(&req); @@ -918,10 +945,40 @@ mod tests { extra_config: Some(TestPluginExtraConfig { app_id: Some("account_abc".into()), app_secret: None, + extra: HashMap::new(), }), }; let config = build_test_config(&req); assert_eq!(config.credentials.bot_token.as_deref(), Some("bot_token_xyz")); assert_eq!(config.credentials.account_id.as_deref(), Some("account_abc")); } + + #[test] + fn build_test_config_mattermost() { + let req = TestPluginRequest { + plugin_id: "mattermost".into(), + token: "token".into(), + extra_config: Some(TestPluginExtraConfig { + app_id: None, + app_secret: None, + extra: HashMap::from([( + "serverUrl".into(), + serde_json::Value::String("https://mm.example".into()), + )]), + }), + }; + let config = build_test_config(&req); + assert_eq!( + config.credentials.extra.get("accessToken").and_then(|v| v.as_str()), + Some("token") + ); + assert_eq!( + config + .config + .as_ref() + .and_then(|c| c.extra.get("serverUrl")) + .and_then(|v| v.as_str()), + Some("https://mm.example") + ); + } } diff --git a/crates/aionui-channel/src/types.rs b/crates/aionui-channel/src/types.rs index 8c3d3e22c..7a2c81cac 100644 --- a/crates/aionui-channel/src/types.rs +++ b/crates/aionui-channel/src/types.rs @@ -18,6 +18,7 @@ pub enum PluginType { Lark, Dingtalk, Weixin, + Mattermost, /// Reserved variant for future Slack integration. Slack, /// Reserved variant for future Discord integration. @@ -31,6 +32,7 @@ impl fmt::Display for PluginType { Self::Lark => write!(f, "lark"), Self::Dingtalk => write!(f, "dingtalk"), Self::Weixin => write!(f, "weixin"), + Self::Mattermost => write!(f, "mattermost"), Self::Slack => write!(f, "slack"), Self::Discord => write!(f, "discord"), } @@ -45,6 +47,7 @@ impl PluginType { "lark" => Some(Self::Lark), "dingtalk" => Some(Self::Dingtalk), "weixin" => Some(Self::Weixin), + "mattermost" => Some(Self::Mattermost), "slack" => Some(Self::Slack), "discord" => Some(Self::Discord), _ => None, @@ -490,6 +493,7 @@ mod tests { (PluginType::Lark, "\"lark\""), (PluginType::Dingtalk, "\"dingtalk\""), (PluginType::Weixin, "\"weixin\""), + (PluginType::Mattermost, "\"mattermost\""), (PluginType::Slack, "\"slack\""), (PluginType::Discord, "\"discord\""), ]; @@ -507,6 +511,7 @@ mod tests { assert_eq!(PluginType::Lark.to_string(), "lark"); assert_eq!(PluginType::Dingtalk.to_string(), "dingtalk"); assert_eq!(PluginType::Weixin.to_string(), "weixin"); + assert_eq!(PluginType::Mattermost.to_string(), "mattermost"); assert_eq!(PluginType::Slack.to_string(), "slack"); assert_eq!(PluginType::Discord.to_string(), "discord"); } @@ -515,6 +520,7 @@ mod tests { fn plugin_type_from_str_opt() { assert_eq!(PluginType::from_str_opt("telegram"), Some(PluginType::Telegram)); assert_eq!(PluginType::from_str_opt("lark"), Some(PluginType::Lark)); + assert_eq!(PluginType::from_str_opt("mattermost"), Some(PluginType::Mattermost)); assert_eq!(PluginType::from_str_opt("unknown"), None); } diff --git a/docs/mattermost-channel-design.md b/docs/mattermost-channel-design.md new file mode 100644 index 000000000..d9319e26e --- /dev/null +++ b/docs/mattermost-channel-design.md @@ -0,0 +1,181 @@ +# Mattermost Channel Design + +## Goal + +Add Mattermost as a built-in channel plugin named `mattermost`. + +The plugin follows the existing `aionui-channel` architecture used by Telegram, +Lark, DingTalk, and Weixin. It is not an extension runtime plugin and uses only +the `mattermost` platform name. + +## Non-Goals + +- Do not implement a generic JavaScript extension channel runtime. +- Do not add a separate bot-specific plugin type. +- Do not distinguish bot tokens from user personal access tokens in the channel + manager API. Mattermost accepts both as bearer tokens for the REST and + WebSocket APIs when the token has sufficient permissions. +- Do not add new database tables or migrations. + +## Existing Channel Pattern + +Current built-in channels use the same lifecycle: + +1. `PluginType` contains a fixed platform variant. +2. `plugins/mod.rs` creates a platform-specific Rust plugin from `PluginType`. +3. `ChannelManager::enable_plugin` stores encrypted config, initializes the + plugin, starts it, and marks it as running. +4. The plugin receives platform-specific events, converts them to + `UnifiedIncomingMessage`, and sends them through `PluginCallbacks.message_tx`. +5. `ChannelOrchestrator` dispatches the message to pairing, actions, sessions, + and the selected agent/model configuration for that platform. +6. Replies are sent through `ChannelManager` using the plugin id string. + +Mattermost should use the same lifecycle. + +## Plugin Type and Feature + +Add: + +- `PluginType::Mattermost` +- `mattermost` feature in `crates/aionui-channel/Cargo.toml` +- `crates/aionui-channel/src/plugins/mattermost/` +- Mattermost entry in the built-in plugin status list exposed by + `GET /api/channel/plugins` +- Mattermost support in channel settings sync validation +- Mattermost support in conversation naming and source mapping + +The plugin id and serialized platform value are `mattermost`. + +## Configuration + +The Mattermost plugin uses the shared `PluginConfig` structure: + +Credentials: + +- `accessToken` or `access_token`: required bearer token + +Options: + +- `serverUrl` or `server_url`: required base URL, for example + `https://mattermost.example.com` +- `allowedChannelIds` or `allowed_channel_ids`: optional comma-separated channel + id allow-list +- `replyInThread` or `reply_in_thread`: optional boolean, default `true` +- `ignoreSelfMessages` or `ignore_self_messages`: optional boolean, default + `true` + +Sensitive values must remain inside the encrypted plugin config and must not be +included in logs or API status responses. + +## Mattermost API Use + +The plugin uses Mattermost API v4. + +Startup and test: + +- `GET /api/v4/users/me` validates the token and loads the current user id. + +Incoming messages: + +- Connect to `/api/v4/websocket`. +- Send an authentication message with the access token after the socket opens. +- Handle `posted` events. +- Parse the event `data.post` JSON payload. +- Ignore deleted, empty, or unsupported posts. +- Ignore self messages when `ignoreSelfMessages` is enabled. +- Apply `allowedChannelIds` when configured. + +Outgoing messages: + +- `POST /api/v4/posts` +- `channel_id` is the incoming chat id. +- `message` is the outgoing text. +- If `replyInThread` is enabled, set `root_id` to the incoming root id or post id + when replying from a channel session. + +Mattermost accepts markdown-like plain text in post messages. The channel +formatter should not HTML-escape normal responses for Mattermost. + +## Message Mapping + +Incoming Mattermost posts map to `UnifiedIncomingMessage`: + +- `id`: Mattermost post id +- `platform`: `PluginType::Mattermost` +- `chat_id`: Mattermost channel id +- `user.id`: Mattermost user id +- `user.display_name`: username when present, otherwise user id +- `content.type`: `text` +- `content.text`: post message +- `timestamp`: `create_at / 1000` +- `reply_to_message_id`: root id when present +- `raw`: sanitized event/post metadata without credentials + +Mattermost should map to `ConversationSource::Aionui` unless a dedicated source +variant exists in `aionui_common`. This matches reserved Slack/Discord behavior. +Conversation names should use the short prefix `mm`. + +## UI Contract + +AionUi should expose Mattermost as the built-in channel id `mattermost`. + +The existing extension contribution can be removed or hidden once the built-in +channel is available. Agent/model settings should persist under: + +- `assistant.mattermost.agent` +- `assistant.mattermost.defaultModel` + +## Observability + +Add low-volume structured logs for lifecycle and hard-to-observe failures: + +- plugin initialized +- REST identity loaded +- WebSocket connected/authenticated +- WebSocket reconnect scheduled +- plugin stopped +- invalid/malformed inbound post handled safely + +Logs must not include tokens, raw request headers, raw credentials, or full +message bodies. + +## Tests + +Add tests at the same level as existing channel plugins: + +- `PluginType::Mattermost` parse/display/serde behavior +- factory creates Mattermost plugin behind the `mattermost` feature +- config parsing accepts camelCase and snake_case keys +- missing token/server URL fails validation +- incoming `posted` event maps to `UnifiedIncomingMessage` +- self messages and disallowed channels are ignored +- outgoing post request payload includes `channel_id`, `message`, and optional + `root_id` + +Run affected checks first: + +```bash +cargo fmt --all -- --check +cargo clippy -p aionui-channel --features mattermost -- -D warnings +cargo test -p aionui-channel --features mattermost +``` + +Before pushing, use `just push`. + +## Consistency Checklist + +| Area | Existing channels | Mattermost design | +| --- | --- | --- | +| Plugin identity | Fixed `PluginType` variant | `PluginType::Mattermost` | +| Build gating | Cargo feature per platform | `mattermost` feature | +| Implementation language | Rust plugin module | Rust plugin module | +| Config storage | `PluginConfig`, encrypted credentials | Same | +| Manager lifecycle | `enable_plugin` initializes and starts | Same | +| Startup restore | `restore_plugins` by `PluginType` | Same | +| Inbound flow | Platform event to `UnifiedIncomingMessage` | Same | +| Outbound flow | `send_message` / `edit_message` | Same | +| Agent/model keys | `assistant.{platform}.*` | `assistant.mattermost.*` | +| Settings sync | `PluginType::from_str_opt(platform)` | `mattermost` parses successfully | +| Formatter | Platform-specific reply formatting | Plain text / markdown-like output | +| Logs | lifecycle and warnings without secrets | Same |