From 1b53582d4d4f21d581966524307a3a74bfd2f11f Mon Sep 17 00:00:00 2001 From: kvfasil Date: Mon, 27 Apr 2026 11:33:44 -0400 Subject: [PATCH 01/13] RDKEMW-17528: Fix memory leak from missing session/event cleanup on WebSocket disconnect *Sessions were registered by connection_id but cleanup used session_id,causing remove_session() to be a no-op. *Event listeners and empty map entries accumulated indefinitely across disconnects. Changes: - app_events.rs: Add cleanup_by_connection_id(), prune empty entries in remove_session() - firebolt_gateway.rs: Clean up by both session_id and connection_id on UnregisterSession - thunder_event_processor.rs: Add cleanup_by_app_id() for extension-side event map cleanup - firebolt_ws.rs: Elevate disconnect log from debug! to info! with full context --- core/main/src/firebolt/firebolt_gateway.rs | 14 ++++++++ core/main/src/firebolt/firebolt_ws.rs | 5 ++- core/main/src/service/apps/app_events.rs | 33 +++++++++++++++++++ .../src/events/thunder_event_processor.rs | 29 ++++++++++++++++ 4 files changed, 80 insertions(+), 1 deletion(-) diff --git a/core/main/src/firebolt/firebolt_gateway.rs b/core/main/src/firebolt/firebolt_gateway.rs index 8de591038..45a644a14 100644 --- a/core/main/src/firebolt/firebolt_gateway.rs +++ b/core/main/src/firebolt/firebolt_gateway.rs @@ -139,7 +139,14 @@ impl FireboltGateway { .add_session(session_id, session); } UnregisterSession { session_id, cid } => { + info!( + "Cleanup: session_id={} cid={} - removing event listeners, broker subs, session", + session_id, cid + ); + // Clean event listeners by session_id AppEvents::remove_session(&self.state.platform_state, session_id.clone()); + // Also clean event listeners by connection_id (cid) in case session_id != cid + AppEvents::cleanup_by_connection_id(&self.state.platform_state, &cid); ProviderBroker::unregister_session(&self.state.platform_state, cid.clone()) .await; self.state @@ -147,6 +154,13 @@ impl FireboltGateway { .endpoint_state .cleanup_for_app(&cid) .await; + // Also cleanup broker subscriptions by session_id (subscription_map uses session_id as key) + self.state + .platform_state + .endpoint_state + .cleanup_for_app(&session_id) + .await; + // Clear session from session_map by connection_id (the key used during registration) self.state.platform_state.session_state.clear_session(&cid); } HandleRpc { request } => self.handle(request, None).await, diff --git a/core/main/src/firebolt/firebolt_ws.rs b/core/main/src/firebolt/firebolt_ws.rs index 121814665..ed2aa4605 100644 --- a/core/main/src/firebolt/firebolt_ws.rs +++ b/core/main/src/firebolt/firebolt_ws.rs @@ -438,7 +438,10 @@ impl FireboltWs { } } } - debug!("SESSION DEBUG Unregistering {}", connection_id); + info!( + "Session disconnect: unregistering connection_id={} session_id={} app_id={}", + connection_id, identity.session_id, identity.app_id + ); let msg = FireboltGatewayCommand::UnregisterSession { session_id: identity.session_id.clone(), cid: connection_id, diff --git a/core/main/src/service/apps/app_events.rs b/core/main/src/service/apps/app_events.rs index fca4033ff..5cb910b5f 100644 --- a/core/main/src/service/apps/app_events.rs +++ b/core/main/src/service/apps/app_events.rs @@ -440,8 +440,41 @@ impl AppEvents { if let Some(event_listener) = ctx_map.get_mut(&context) { AppEvents::remove_session_from_events(event_listener, &session_id); } + // Remove empty context entries to free heap + if ctx_map.get(&context).map_or(false, |v| v.is_empty()) { + ctx_map.remove(&context); + } } } + // Remove empty event entries to prevent map bloat + if listeners.get(&event_name).map_or(false, |m| m.is_empty()) { + listeners.remove(&event_name); + } + } + } + + /// Cleanup all event listeners matching a given connection_id. + /// This ensures cleanup works even when session_id != connection_id. + pub fn cleanup_by_connection_id(state: &PlatformState, connection_id: &str) { + let mut listeners = state.app_events_state.listeners.write().unwrap(); + let all_events = listeners.keys().cloned().collect::>(); + for event_name in all_events { + if let Some(ctx_map) = listeners.get_mut(&event_name) { + let all_contexts = ctx_map.keys().cloned().collect::>>(); + for context in all_contexts { + if let Some(event_listener) = ctx_map.get_mut(&context) { + event_listener.retain(|l| { + l.call_ctx.cid.as_deref() != Some(connection_id) + }); + } + if ctx_map.get(&context).map_or(false, |v| v.is_empty()) { + ctx_map.remove(&context); + } + } + } + if listeners.get(&event_name).map_or(false, |m| m.is_empty()) { + listeners.remove(&event_name); + } } } } diff --git a/device/thunder_ripple_sdk/src/events/thunder_event_processor.rs b/device/thunder_ripple_sdk/src/events/thunder_event_processor.rs index 59ebdee5d..61895b83a 100644 --- a/device/thunder_ripple_sdk/src/events/thunder_event_processor.rs +++ b/device/thunder_ripple_sdk/src/events/thunder_event_processor.rs @@ -316,6 +316,35 @@ impl ThunderEventProcessor { let mut back_off_map = self.back_off.write().unwrap(); back_off_map.remove(event_name) } + + /// Remove all event listeners for a given app_id (session cleanup on disconnect). + /// Returns the list of event names that no longer have any listeners and were removed. + pub fn cleanup_by_app_id(&self, app_id: &str) -> Vec { + let mut event_map = self.event_map.write().unwrap(); + let mut removed_events = Vec::new(); + let event_names: Vec = event_map.keys().cloned().collect(); + for event_name in event_names { + if let Some(handler) = event_map.get_mut(&event_name) { + handler.listeners.retain(|id| id != app_id); + if handler.listeners.is_empty() { + event_map.remove(&event_name); + removed_events.push(event_name.clone()); + } + } + } + // Clean last_event entries for removed events to free heap + if !removed_events.is_empty() { + let mut last_event_map = self.last_event.write().unwrap(); + for event_name in &removed_events { + last_event_map.remove(event_name); + } + } + debug!( + "cleanup_by_app_id: app_id={}, removed_events={:?}", + app_id, removed_events + ); + removed_events + } } impl Default for ThunderEventProcessor { From 9370e5dfab3e156715a60024dce88bb5d7913187 Mon Sep 17 00:00:00 2001 From: kvfasil Date: Mon, 27 Apr 2026 12:39:53 -0400 Subject: [PATCH 02/13] -Fix memory leaks in request_map, extension_request_map, and ThunderEventProcessor Subscription entries. -In request_map and extension_request_map were never removed on disconnect. -ThunderEventProcessor.cleanup_by_app_id() existed but was never called. -Added DeviceEvent::Cleanup to wire ThunderEventProcessor cleanup across the gateway-extension boundary. -Added cleanup_request_maps_for_app() to prune broker maps on disconnect. --- core/main/src/broker/endpoint_broker.rs | 39 +++++++++++++++++++ core/main/src/firebolt/firebolt_gateway.rs | 19 +++++++++ core/sdk/src/api/device/device_events.rs | 5 +++ .../src/events/thunder_event_processor.rs | 4 ++ .../src/processors/thunder_events.rs | 14 +++++++ 5 files changed, 81 insertions(+) diff --git a/core/main/src/broker/endpoint_broker.rs b/core/main/src/broker/endpoint_broker.rs index b7e8bbb54..90b76f2af 100644 --- a/core/main/src/broker/endpoint_broker.rs +++ b/core/main/src/broker/endpoint_broker.rs @@ -1104,6 +1104,45 @@ impl EndpointBrokerState { */ let _ = cleaner.cleanup_session(app_id).await; } + + // Clean up subscription entries from request_map and extension_request_map + // that belong to this app. These are never removed on disconnect otherwise. + self.cleanup_request_maps_for_app(app_id); + } + + /// Remove subscription/event entries from request_map and extension_request_map + /// for the given app_id (which may be a session_id or connection_id). + /// Without this, subscription entries in request_map (guarded by is_subscription()) + /// and event entries in extension_request_map persist forever. + fn cleanup_request_maps_for_app(&self, app_id: &str) { + let removed_ids: Vec = { + let mut request_map = self.request_map.write().unwrap(); + let ids_to_remove: Vec = request_map + .iter() + .filter(|(_, req)| { + req.rpc.ctx.app_id == app_id + || req.rpc.ctx.session_id == app_id + || req.rpc.ctx.cid.as_deref() == Some(app_id) + }) + .map(|(id, _)| *id) + .collect(); + for id in &ids_to_remove { + request_map.remove(id); + } + ids_to_remove + }; + + if !removed_ids.is_empty() { + let mut extn_map = self.extension_request_map.write().unwrap(); + for id in &removed_ids { + extn_map.remove(id); + } + info!( + "cleanup_request_maps_for_app: removed {} request_map and extension_request_map entries for {}", + removed_ids.len(), + app_id + ); + } } /// Send a request through the broker and wait for response with a oneshot channel and custom timeout pub async fn send_with_response_timeout( diff --git a/core/main/src/firebolt/firebolt_gateway.rs b/core/main/src/firebolt/firebolt_gateway.rs index 45a644a14..a27c11463 100644 --- a/core/main/src/firebolt/firebolt_gateway.rs +++ b/core/main/src/firebolt/firebolt_gateway.rs @@ -18,6 +18,7 @@ use jsonrpsee::{core::server::rpc_module::Methods, types::TwoPointZero}; use ripple_sdk::{ api::{ + device::device_events::{DeviceEvent, DeviceEventCallback, DeviceEventRequest}, firebolt::{ fb_capabilities::JSON_RPC_STANDARD_ERROR_INVALID_PARAMS, fb_openrpc::FireboltOpenRpcMethod, @@ -162,6 +163,24 @@ impl FireboltGateway { .await; // Clear session from session_map by connection_id (the key used during registration) self.state.platform_state.session_state.clear_session(&cid); + // Send cleanup to ThunderEventProcessor (extension side) to remove + // event_map and last_event entries for this app + let cleanup_request = DeviceEventRequest { + event: DeviceEvent::Cleanup, + subscribe: false, + callback_type: DeviceEventCallback::FireboltAppEvent(cid.clone()), + }; + if let Err(e) = self + .state + .platform_state + .get_client() + .send_extn_request_transient(cleanup_request) + { + warn!( + "Failed to send ThunderEventProcessor cleanup for {}: {:?}", + cid, e + ); + } } HandleRpc { request } => self.handle(request, None).await, HandleRpcForExtn { msg } => { diff --git a/core/sdk/src/api/device/device_events.rs b/core/sdk/src/api/device/device_events.rs index 63e72bf22..8c53a0e97 100644 --- a/core/sdk/src/api/device/device_events.rs +++ b/core/sdk/src/api/device/device_events.rs @@ -39,6 +39,9 @@ pub const VOICE_GUIDANCE_SPEED_CHANGED: &str = "voiceguidance.onSpeedChanged"; pub enum DeviceEvent { InputChanged, AudioChanged, + /// Signals that an app has disconnected and its event subscriptions should be cleaned up. + /// The callback_type.get_id() carries the app_id to clean. + Cleanup, } impl FromStr for DeviceEvent { @@ -48,6 +51,7 @@ impl FromStr for DeviceEvent { match s { "device.onHdcpChanged" => Ok(Self::InputChanged), "device.onAudioChanged" => Ok(Self::AudioChanged), + "device.cleanup" => Ok(Self::Cleanup), _ => Err(()), } } @@ -91,6 +95,7 @@ impl ExtnPayloadProvider for DeviceEventRequest { match self.event { DeviceEvent::InputChanged => RippleContract::DeviceEvents(EventAdjective::Input), DeviceEvent::AudioChanged => RippleContract::DeviceEvents(EventAdjective::Audio), + DeviceEvent::Cleanup => RippleContract::DeviceEvents(EventAdjective::Input), } } diff --git a/device/thunder_ripple_sdk/src/events/thunder_event_processor.rs b/device/thunder_ripple_sdk/src/events/thunder_event_processor.rs index 61895b83a..e674d46e6 100644 --- a/device/thunder_ripple_sdk/src/events/thunder_event_processor.rs +++ b/device/thunder_ripple_sdk/src/events/thunder_event_processor.rs @@ -93,6 +93,10 @@ impl ThunderEventMessage { value.clone(), ))) } + DeviceEvent::Cleanup => { + // Cleanup is not a real Thunder event, skip + return None; + } } } else { debug!( diff --git a/device/thunder_ripple_sdk/src/processors/thunder_events.rs b/device/thunder_ripple_sdk/src/processors/thunder_events.rs index 43a62329b..2ff2873e6 100644 --- a/device/thunder_ripple_sdk/src/processors/thunder_events.rs +++ b/device/thunder_ripple_sdk/src/processors/thunder_events.rs @@ -17,6 +17,7 @@ use ripple_sdk::{ api::session::EventAdjective, framework::ripple_contract::RippleContract, + log::info, utils::error::RippleError, }; @@ -106,6 +107,19 @@ impl ExtnRequestProcessor for ThunderOpenEventsProcessor { id.clone(), HDCPEventHandler::provide(id, callback_type), )), + DeviceEvent::Cleanup => { + // Clean up all event subscriptions for this app + let removed_events = state.event_processor.cleanup_by_app_id(&id); + if !removed_events.is_empty() { + info!( + "ThunderEventProcessor cleanup: removed {} event(s) for app {}: {:?}", + removed_events.len(), + id, + removed_events + ); + } + return Self::ack(state.get_client(), msg).await.is_ok(); + } } { v.await; Self::ack(state.get_client(), msg).await.is_ok() From 44ef13d1453701ba8c938f4c79fc6222dc5b589f Mon Sep 17 00:00:00 2001 From: kvfasil Date: Mon, 27 Apr 2026 12:45:00 -0400 Subject: [PATCH 03/13] -fixed format check. --- core/main/src/service/apps/app_events.rs | 4 +--- device/thunder_ripple_sdk/src/processors/thunder_events.rs | 3 +-- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/core/main/src/service/apps/app_events.rs b/core/main/src/service/apps/app_events.rs index 5cb910b5f..c3079eed8 100644 --- a/core/main/src/service/apps/app_events.rs +++ b/core/main/src/service/apps/app_events.rs @@ -463,9 +463,7 @@ impl AppEvents { let all_contexts = ctx_map.keys().cloned().collect::>>(); for context in all_contexts { if let Some(event_listener) = ctx_map.get_mut(&context) { - event_listener.retain(|l| { - l.call_ctx.cid.as_deref() != Some(connection_id) - }); + event_listener.retain(|l| l.call_ctx.cid.as_deref() != Some(connection_id)); } if ctx_map.get(&context).map_or(false, |v| v.is_empty()) { ctx_map.remove(&context); diff --git a/device/thunder_ripple_sdk/src/processors/thunder_events.rs b/device/thunder_ripple_sdk/src/processors/thunder_events.rs index 2ff2873e6..0f79b4382 100644 --- a/device/thunder_ripple_sdk/src/processors/thunder_events.rs +++ b/device/thunder_ripple_sdk/src/processors/thunder_events.rs @@ -16,8 +16,7 @@ // use ripple_sdk::{ - api::session::EventAdjective, framework::ripple_contract::RippleContract, - log::info, + api::session::EventAdjective, framework::ripple_contract::RippleContract, log::info, utils::error::RippleError, }; From e1ad9565b54b8862c12ea8a1dc84256926418729 Mon Sep 17 00:00:00 2001 From: kvfasil Date: Mon, 27 Apr 2026 16:19:36 -0400 Subject: [PATCH 04/13] -Added unit testcases. --- core/main/src/broker/endpoint_broker.rs | 108 +++++++++++++++ core/main/src/firebolt/firebolt_gateway.rs | 41 ++++-- core/main/src/service/apps/app_events.rs | 123 ++++++++++++++++++ core/sdk/src/api/device/device_events.rs | 28 ++++ .../src/events/thunder_event_processor.rs | 102 +++++++++++++++ 5 files changed, 389 insertions(+), 13 deletions(-) diff --git a/core/main/src/broker/endpoint_broker.rs b/core/main/src/broker/endpoint_broker.rs index 90b76f2af..9e20e37af 100644 --- a/core/main/src/broker/endpoint_broker.rs +++ b/core/main/src/broker/endpoint_broker.rs @@ -2526,6 +2526,114 @@ mod endpoint_broker_tests { // // assert!(state.get_request(2).is_ok()); // // assert!(state.get_request(1).is_ok()); // } + + #[cfg(test)] + mod cleanup_request_maps { + use super::*; + use crate::broker::rules::rules_engine::{Rule, RuleTransform}; + use ripple_sdk::api::gateway::rpc_gateway_api::RpcRequest; + use ripple_sdk::Mockable; + use serial_test::serial; + + fn make_state() -> EndpointBrokerState { + let (tx, _) = channel(2); + let client = RippleClient::new(ChannelsState::new()); + EndpointBrokerState::new( + OpMetricState::default(), + tx, + RuleEngine { + rules: RuleSet::default(), + functions: HashMap::default(), + }, + client, + ) + } + + fn default_rule() -> Rule { + Rule { + alias: "test.method".to_owned(), + transform: RuleTransform::default(), + endpoint: None, + filter: None, + event_handler: None, + sources: None, + } + } + + #[serial] + #[tokio::test] + async fn test_cleanup_by_app_id() { + let state = make_state(); + let mut req = RpcRequest::mock(); + req.ctx.app_id = "epg".to_string(); + state.update_request(&req, &default_rule(), None, None, vec![]); + + assert_eq!(state.request_map.read().unwrap().len(), 1); + state.cleanup_request_maps_for_app("epg"); + assert!(state.request_map.read().unwrap().is_empty()); + } + + #[serial] + #[tokio::test] + async fn test_cleanup_by_session_id() { + let state = make_state(); + let mut req = RpcRequest::mock(); + req.ctx.session_id = "sess-123".to_string(); + req.ctx.app_id = "other".to_string(); + state.update_request(&req, &default_rule(), None, None, vec![]); + + state.cleanup_request_maps_for_app("sess-123"); + assert!(state.request_map.read().unwrap().is_empty()); + } + + #[serial] + #[tokio::test] + async fn test_cleanup_by_cid() { + let state = make_state(); + let mut req = RpcRequest::mock(); + req.ctx.cid = Some("conn-xyz".to_string()); + req.ctx.app_id = "other".to_string(); + state.update_request(&req, &default_rule(), None, None, vec![]); + + state.cleanup_request_maps_for_app("conn-xyz"); + assert!(state.request_map.read().unwrap().is_empty()); + } + + #[serial] + #[tokio::test] + async fn test_cleanup_no_match_is_noop() { + let state = make_state(); + let req = RpcRequest::mock(); + state.update_request(&req, &default_rule(), None, None, vec![]); + + state.cleanup_request_maps_for_app("nonexistent"); + assert_eq!(state.request_map.read().unwrap().len(), 1); + } + + #[serial] + #[tokio::test] + async fn test_cleanup_also_removes_extension_request_map() { + let state = make_state(); + let mut req = RpcRequest::mock(); + req.ctx.app_id = "epg".to_string(); + state.update_request(&req, &default_rule(), None, None, vec![]); + let id = { + let map = state.request_map.read().unwrap(); + *map.keys().next().unwrap() + }; + { + let mut extn_map = state.extension_request_map.write().unwrap(); + extn_map.insert( + id, + ripple_sdk::extn::extn_client_message::ExtnMessage::default(), + ); + } + + state.cleanup_request_maps_for_app("epg"); + assert!(state.request_map.read().unwrap().is_empty()); + assert!(state.extension_request_map.read().unwrap().is_empty()); + } + } } #[tokio::test] diff --git a/core/main/src/firebolt/firebolt_gateway.rs b/core/main/src/firebolt/firebolt_gateway.rs index a27c11463..5bcae3d6d 100644 --- a/core/main/src/firebolt/firebolt_gateway.rs +++ b/core/main/src/firebolt/firebolt_gateway.rs @@ -161,24 +161,39 @@ impl FireboltGateway { .endpoint_state .cleanup_for_app(&session_id) .await; + // Resolve app_id from session state BEFORE clearing the session, + // because ThunderEventProcessor stores listeners by app_id (e.g. "epg"), + // not by cid (a UUID). + let app_id_for_cleanup = self + .state + .platform_state + .session_state + .get_app_id(cid.clone()); // Clear session from session_map by connection_id (the key used during registration) self.state.platform_state.session_state.clear_session(&cid); // Send cleanup to ThunderEventProcessor (extension side) to remove // event_map and last_event entries for this app - let cleanup_request = DeviceEventRequest { - event: DeviceEvent::Cleanup, - subscribe: false, - callback_type: DeviceEventCallback::FireboltAppEvent(cid.clone()), - }; - if let Err(e) = self - .state - .platform_state - .get_client() - .send_extn_request_transient(cleanup_request) - { + if let Some(app_id) = app_id_for_cleanup { + let cleanup_request = DeviceEventRequest { + event: DeviceEvent::Cleanup, + subscribe: false, + callback_type: DeviceEventCallback::FireboltAppEvent(app_id.clone()), + }; + if let Err(e) = self + .state + .platform_state + .get_client() + .send_extn_request_transient(cleanup_request) + { + warn!( + "Failed to send ThunderEventProcessor cleanup for app_id={} (cid={}): {:?}", + app_id, cid, e + ); + } + } else { warn!( - "Failed to send ThunderEventProcessor cleanup for {}: {:?}", - cid, e + "Could not resolve app_id for cid={}, skipping ThunderEventProcessor cleanup", + cid ); } } diff --git a/core/main/src/service/apps/app_events.rs b/core/main/src/service/apps/app_events.rs index c3079eed8..2d501c468 100644 --- a/core/main/src/service/apps/app_events.rs +++ b/core/main/src/service/apps/app_events.rs @@ -480,6 +480,7 @@ impl AppEvents { pub mod tests { use crate::state::session_state::Session; use ripple_sdk::tokio; + use ripple_sdk::uuid::Uuid; use ripple_tdk::utils::test_utils::Mockable; use super::*; @@ -513,4 +514,126 @@ pub mod tests { AppEvents::get_listeners(&platform_state.app_events_state, "test_event", None); assert!(listeners.len() == 1); } + + /// Helper: add a listener with a specific CallContext to a given event/context. + fn add_listener_direct( + state: &PlatformState, + event: &str, + ctx: CallContext, + context: Option, + ) { + let mut listeners = state.app_events_state.listeners.write().unwrap(); + let vec = AppEvents::get_or_create_listener_vec( + &mut listeners, + event.to_string(), + context, + ); + vec.push(EventListener { + call_ctx: ctx, + session_tx: None, + decorator: None, + }); + } + + fn make_ctx(session_id: &str, app_id: &str, cid: Option<&str>) -> CallContext { + CallContext::new( + session_id.to_string(), + Uuid::new_v4().to_string(), + app_id.to_string(), + 1, + ripple_sdk::api::gateway::rpc_gateway_api::ApiProtocol::Extn, + "method".to_string(), + cid.map(|s| s.to_string()), + false, + ) + } + + #[tokio::test] + async fn test_remove_session_prunes_empty_entries() { + let state = PlatformState::mock(); + let session_id = "sess-1".to_string(); + let ctx = make_ctx(&session_id, "app1", None); + + // Register session so clear_session doesn't panic + let session = Session::new("app1".to_string(), None); + state.session_state.add_session(session_id.clone(), session); + + add_listener_direct(&state, "evt1", ctx.clone(), None); + + // Verify listener exists + assert_eq!(state.app_events_state.listeners.read().unwrap().len(), 1); + + // remove_session should remove the listener AND prune empty maps + AppEvents::remove_session(&state, session_id); + assert!(state.app_events_state.listeners.read().unwrap().is_empty()); + } + + #[tokio::test] + async fn test_remove_session_keeps_other_sessions() { + let state = PlatformState::mock(); + let ctx1 = make_ctx("sess-1", "app1", None); + let ctx2 = make_ctx("sess-2", "app2", None); + + let s1 = Session::new("app1".to_string(), None); + state.session_state.add_session("sess-1".to_string(), s1); + let s2 = Session::new("app2".to_string(), None); + state.session_state.add_session("sess-2".to_string(), s2); + + add_listener_direct(&state, "evt1", ctx1.clone(), None); + add_listener_direct(&state, "evt1", ctx2.clone(), None); + + AppEvents::remove_session(&state, "sess-1".to_string()); + + // evt1 should still exist with 1 listener + let listeners = state.app_events_state.listeners.read().unwrap(); + assert_eq!(listeners.len(), 1); + assert_eq!(listeners.get("evt1").unwrap().get(&None).unwrap().len(), 1); + } + + #[tokio::test] + async fn test_cleanup_by_connection_id_removes_matching() { + let state = PlatformState::mock(); + let cid = "conn-abc"; + let ctx = make_ctx("sess-1", "app1", Some(cid)); + + add_listener_direct(&state, "evt1", ctx.clone(), None); + add_listener_direct(&state, "evt2", ctx.clone(), Some("ctx-a".to_string())); + + assert_eq!(state.app_events_state.listeners.read().unwrap().len(), 2); + + AppEvents::cleanup_by_connection_id(&state, cid); + + // Both events should be fully pruned + assert!(state.app_events_state.listeners.read().unwrap().is_empty()); + } + + #[tokio::test] + async fn test_cleanup_by_connection_id_keeps_other_cids() { + let state = PlatformState::mock(); + let ctx1 = make_ctx("s1", "app1", Some("conn-1")); + let ctx2 = make_ctx("s2", "app2", Some("conn-2")); + + add_listener_direct(&state, "evt1", ctx1, None); + add_listener_direct(&state, "evt1", ctx2, None); + + AppEvents::cleanup_by_connection_id(&state, "conn-1"); + + let listeners = state.app_events_state.listeners.read().unwrap(); + assert_eq!(listeners.get("evt1").unwrap().get(&None).unwrap().len(), 1); + assert_eq!( + listeners.get("evt1").unwrap().get(&None).unwrap()[0].call_ctx.cid.as_deref(), + Some("conn-2") + ); + } + + #[tokio::test] + async fn test_cleanup_by_connection_id_no_match_is_noop() { + let state = PlatformState::mock(); + let ctx = make_ctx("s1", "app1", Some("conn-1")); + add_listener_direct(&state, "evt1", ctx, None); + + AppEvents::cleanup_by_connection_id(&state, "nonexistent"); + + assert_eq!(state.app_events_state.listeners.read().unwrap().len(), 1); + } } diff --git a/core/sdk/src/api/device/device_events.rs b/core/sdk/src/api/device/device_events.rs index 8c53a0e97..52eb57e02 100644 --- a/core/sdk/src/api/device/device_events.rs +++ b/core/sdk/src/api/device/device_events.rs @@ -112,12 +112,40 @@ mod tests { #[rstest(input, expected, case("device.onHdcpChanged", Ok(DeviceEvent::InputChanged)), + case("device.cleanup", Ok(DeviceEvent::Cleanup)), case("invalid_event", Err(())), )] fn test_from_str(input: &str, expected: Result) { assert_eq!(DeviceEvent::from_str(input), expected); } + #[test] + fn test_cleanup_event_request_payload_roundtrip() { + let request = DeviceEventRequest { + event: DeviceEvent::Cleanup, + subscribe: false, + callback_type: DeviceEventCallback::FireboltAppEvent("epg".to_string()), + }; + let payload = request.get_extn_payload(); + let recovered = DeviceEventRequest::get_from_payload(payload).unwrap(); + assert_eq!(recovered.event, DeviceEvent::Cleanup); + assert_eq!(recovered.callback_type.get_id(), "epg"); + } + + #[test] + fn test_cleanup_event_contract() { + let request = DeviceEventRequest { + event: DeviceEvent::Cleanup, + subscribe: true, + callback_type: DeviceEventCallback::FireboltAppEvent("app1".to_string()), + }; + // Cleanup routes through the same contract as Input events + assert_eq!( + request.get_contract(), + RippleContract::DeviceEvents(EventAdjective::Input) + ); + } + #[rstest] #[case( DeviceEventCallback::FireboltAppEvent("app_event".to_string()), diff --git a/device/thunder_ripple_sdk/src/events/thunder_event_processor.rs b/device/thunder_ripple_sdk/src/events/thunder_event_processor.rs index e674d46e6..635a4e709 100644 --- a/device/thunder_ripple_sdk/src/events/thunder_event_processor.rs +++ b/device/thunder_ripple_sdk/src/events/thunder_event_processor.rs @@ -356,3 +356,105 @@ impl Default for ThunderEventProcessor { Self::new() } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::client::device_operator::DeviceSubscribeRequest; + + fn make_handler(listeners: Vec<&str>) -> ThunderEventHandler { + ThunderEventHandler { + request: DeviceSubscribeRequest { + module: "TestModule".to_string(), + event_name: "testEvent".to_string(), + params: None, + sub_id: None, + }, + handle: |_, _, _| {}, + is_valid: |_| true, + listeners: listeners.into_iter().map(|s| s.to_string()).collect(), + id: "test_id".to_string(), + callback_type: DeviceEventCallback::FireboltAppEvent("test".to_string()), + } + } + + #[test] + fn test_cleanup_by_app_id_removes_sole_listener() { + let processor = ThunderEventProcessor::new(); + { + let mut map = processor.event_map.write().unwrap(); + map.insert("event.a".to_string(), make_handler(vec!["epg"])); + } + // Also add a last_event entry + { + let mut last = processor.last_event.write().unwrap(); + last.insert("event.a".to_string(), serde_json::json!({"foo": 1})); + } + + let removed = processor.cleanup_by_app_id("epg"); + + assert_eq!(removed, vec!["event.a".to_string()]); + assert!(processor.event_map.read().unwrap().is_empty()); + assert!(processor.last_event.read().unwrap().is_empty()); + } + + #[test] + fn test_cleanup_by_app_id_keeps_other_listeners() { + let processor = ThunderEventProcessor::new(); + { + let mut map = processor.event_map.write().unwrap(); + map.insert("event.a".to_string(), make_handler(vec!["epg", "settings"])); + } + + let removed = processor.cleanup_by_app_id("epg"); + + assert!(removed.is_empty()); + let map = processor.event_map.read().unwrap(); + let handler = map.get("event.a").unwrap(); + assert_eq!(handler.listeners, vec!["settings".to_string()]); + } + + #[test] + fn test_cleanup_by_app_id_multiple_events() { + let processor = ThunderEventProcessor::new(); + { + let mut map = processor.event_map.write().unwrap(); + map.insert("event.a".to_string(), make_handler(vec!["epg"])); + map.insert("event.b".to_string(), make_handler(vec!["epg", "other"])); + map.insert("event.c".to_string(), make_handler(vec!["other"])); + } + { + let mut last = processor.last_event.write().unwrap(); + last.insert("event.a".to_string(), serde_json::json!(1)); + last.insert("event.b".to_string(), serde_json::json!(2)); + } + + let removed = processor.cleanup_by_app_id("epg"); + + assert_eq!(removed, vec!["event.a".to_string()]); + let map = processor.event_map.read().unwrap(); + assert_eq!(map.len(), 2); // event.b and event.c remain + assert_eq!( + map.get("event.b").unwrap().listeners, + vec!["other".to_string()] + ); + // last_event for event.a should be cleaned, event.b kept + let last = processor.last_event.read().unwrap(); + assert!(!last.contains_key("event.a")); + assert!(last.contains_key("event.b")); + } + + #[test] + fn test_cleanup_by_app_id_no_match() { + let processor = ThunderEventProcessor::new(); + { + let mut map = processor.event_map.write().unwrap(); + map.insert("event.a".to_string(), make_handler(vec!["epg"])); + } + + let removed = processor.cleanup_by_app_id("nonexistent"); + + assert!(removed.is_empty()); + assert_eq!(processor.event_map.read().unwrap().len(), 1); + } +} From 45fff70233794fbbbbafceb9caf4679ba1036473 Mon Sep 17 00:00:00 2001 From: kvfasil Date: Mon, 27 Apr 2026 16:56:24 -0400 Subject: [PATCH 05/13] RDKEMW-17528: Fix Thunder unsubscribe not firing on session disconnect. --- core/main/src/broker/thunder_broker.rs | 60 ++++++++++++++++++++++---- 1 file changed, 51 insertions(+), 9 deletions(-) diff --git a/core/main/src/broker/thunder_broker.rs b/core/main/src/broker/thunder_broker.rs index 081f8f3ab..90501f0c5 100644 --- a/core/main/src/broker/thunder_broker.rs +++ b/core/main/src/broker/thunder_broker.rs @@ -30,7 +30,7 @@ use ripple_sdk::{ gateway::rpc_gateway_api::{JsonRpcApiResponse, RpcRequest}, observability::log_signal::LogSignal, }, - log::{debug, error, info, trace}, + log::{debug, error, info, trace, warn}, tokio::{ self, sync::{mpsc, Mutex}, @@ -198,7 +198,7 @@ impl ThunderBroker { ) -> Self { let endpoint = request.endpoint.clone(); let (broker_request_tx, mut broker_request_rx) = mpsc::channel(BROKER_CHANNEL_BUFFER_SIZE); - let (c_tx, mut c_tr) = mpsc::channel(2); + let (c_tx, mut c_tr) = mpsc::channel(BROKER_CHANNEL_BUFFER_SIZE); let broker_sender = BrokerSender { sender: broker_request_tx, }; @@ -373,16 +373,58 @@ impl ThunderBroker { broker_for_cleanup.subscription_map.write().unwrap().remove(&cleanup_request) }; if let Some(mut cleanup) = value { - let sender = broker_for_cleanup.get_sender(); - while let Some(mut v) = cleanup.pop() { - v.rpc = v.rpc.get_unsubscribe(); - if (sender.send(v).await).is_err() { - error!("Cleanup Error for {}",&cleanup_request); + info!( + "BrokerCleaner: unsubscribing {} subscription(s) for session {}", + cleanup.len(), cleanup_request + ); + // Send unregister calls directly to Thunder via the WebSocket. + // We must NOT route through the broker sender + prepare_request(), + // because prepare_request() calls self.unsubscribe() which looks up + // subscription_map — but we already removed entries above. + let binding = ws_tx_wrap.clone(); + let mut ws_tx = binding.lock().await; + while let Some(v) = cleanup.pop() { + let (callsign, method) = + ThunderBroker::get_callsign_and_method_from_alias(&v.rule.alias); + if let Some(method) = method { + let unregister = serde_json::json!({ + "jsonrpc": "2.0", + "id": v.rpc.ctx.call_id, + "method": format!("{}.unregister", callsign), + "params": { + "event": method, + "id": format!("{}", v.rpc.ctx.call_id) + } + }); + info!( + "BrokerCleaner: sending Thunder unregister for {}.{} (call_id={})", + callsign, method, v.rpc.ctx.call_id + ); + if let Err(e) = ws_tx.feed(Message::Text(unregister.to_string())).await { + error!( + "BrokerCleaner: failed to send unregister for {}: {:?}", + cleanup_request, e + ); + } + } else { + warn!( + "BrokerCleaner: could not extract method from alias '{}', skipping unregister", + v.rule.alias + ); } } - + if let Err(e) = ws_tx.flush().await { + error!( + "BrokerCleaner: failed to flush unregister calls for {}: {:?}", + cleanup_request, e + ); + } + } else { + debug!( + "BrokerCleaner: no subscriptions found for key '{}', skipping", + cleanup_request + ); } - } } } From 834bfc4d3f8f0d1aee2c8e6c6ad62dbf99ac5530 Mon Sep 17 00:00:00 2001 From: kvfasil Date: Mon, 27 Apr 2026 22:21:06 -0400 Subject: [PATCH 06/13] - Fix BrokerCleaner dropping cleanup messages under burst disconnect load Change try_send to send().await in BrokerCleaner::cleanup_session() to wait for channel capacity instead of silently dropping cleanup messages when the channel is full, which prevented Thunder .unregister calls from being sent. --- core/main/src/broker/endpoint_broker.rs | 2 +- core/main/src/broker/thunder_broker.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/main/src/broker/endpoint_broker.rs b/core/main/src/broker/endpoint_broker.rs index 9e20e37af..072a57d2f 100644 --- a/core/main/src/broker/endpoint_broker.rs +++ b/core/main/src/broker/endpoint_broker.rs @@ -89,7 +89,7 @@ pub struct BrokerCleaner { impl BrokerCleaner { async fn cleanup_session(&self, appid: &str) -> Result { if let Some(cleaner) = self.cleaner.clone() { - if let Err(e) = cleaner.try_send(appid.to_owned()) { + if let Err(e) = cleaner.send(appid.to_owned()).await { error!("Couldnt cleanup {} {:?}", appid, e); return Err(RippleError::SendFailure); } diff --git a/core/main/src/broker/thunder_broker.rs b/core/main/src/broker/thunder_broker.rs index 90501f0c5..e13c6e609 100644 --- a/core/main/src/broker/thunder_broker.rs +++ b/core/main/src/broker/thunder_broker.rs @@ -198,7 +198,7 @@ impl ThunderBroker { ) -> Self { let endpoint = request.endpoint.clone(); let (broker_request_tx, mut broker_request_rx) = mpsc::channel(BROKER_CHANNEL_BUFFER_SIZE); - let (c_tx, mut c_tr) = mpsc::channel(BROKER_CHANNEL_BUFFER_SIZE); + let (c_tx, mut c_tr) = mpsc::channel(2); let broker_sender = BrokerSender { sender: broker_request_tx, }; From e9d824a51cf31ccf4fa6daf00b8a7a5ab4b195ec Mon Sep 17 00:00:00 2001 From: kvfasil Date: Tue, 28 Apr 2026 16:19:56 -0400 Subject: [PATCH 07/13] Skip redundant event_handler processing for duplicate Thunder events ID in the forwarder loop and skip event_handler processing when payload is unchanged. Adds 5 unit tests for the dedup logic. --- core/main/src/broker/endpoint_broker.rs | 94 +++++++++++++++++++++++++ 1 file changed, 94 insertions(+) diff --git a/core/main/src/broker/endpoint_broker.rs b/core/main/src/broker/endpoint_broker.rs index 072a57d2f..0ffaa1d07 100644 --- a/core/main/src/broker/endpoint_broker.rs +++ b/core/main/src/broker/endpoint_broker.rs @@ -1362,6 +1362,11 @@ impl BrokerOutputForwarder { let event_utility_clone = event_utility.clone(); tokio::spawn(async move { + // Cache last event data per subscription id to skip redundant event_handler + // processing when the Thunder event payload is unchanged (e.g. + // onDisplayConnectionChanged fires every ~5s with identical data). + let mut last_event_handler_data: HashMap = HashMap::new(); + while let Some(output) = rx.recv().await { let output_c = output.clone(); let mut response = output.data.clone(); @@ -1392,6 +1397,24 @@ impl BrokerOutputForwarder { let apply_response_needed = if let Some(result) = response.result.clone() { if is_event { + // Skip event_handler processing if the event data is unchanged. + // This prevents redundant internal requests (e.g. device.hdr, + // device.hdcp) when Thunder fires the same event repeatedly + // (onDisplayConnectionChanged every ~5s), avoiding heap growth + // from repeated allocations. + if broker_request.rule.event_handler.is_some() { + if let Some(last) = last_event_handler_data.get(&id) { + if *last == result { + debug!( + "Skipping duplicate event_handler for id={} method={}", + id, rpc_request.ctx.method + ); + continue; + } + } + last_event_handler_data.insert(id, result.clone()); + } + LogSignal::new( "handle_event_output".to_string(), "processing event".to_string(), @@ -3475,6 +3498,77 @@ mod endpoint_broker_tests { assert!(cleaner.cleanup_session("test_app").await.is_err()); } } + #[cfg(test)] + mod event_handler_dedup { + use serde_json::{json, Value}; + use std::collections::HashMap; + + /// Simulates the dedup logic in `start_forwarder`: returns true if the + /// event should be skipped (duplicate), false if it should be processed. + fn should_skip( + cache: &mut HashMap, + id: u64, + has_event_handler: bool, + result: &Value, + ) -> bool { + if !has_event_handler { + return false; + } + if let Some(last) = cache.get(&id) { + if *last == *result { + return true; + } + } + cache.insert(id, result.clone()); + false + } + + #[test] + fn test_first_event_is_not_skipped() { + let mut cache = HashMap::new(); + let data = json!({"connected": true}); + assert!(!should_skip(&mut cache, 20, true, &data)); + } + + #[test] + fn test_duplicate_event_is_skipped() { + let mut cache = HashMap::new(); + let data = json!({"connected": true}); + assert!(!should_skip(&mut cache, 20, true, &data)); + assert!(should_skip(&mut cache, 20, true, &data)); + assert!(should_skip(&mut cache, 20, true, &data)); + } + + #[test] + fn test_changed_event_is_not_skipped() { + let mut cache = HashMap::new(); + let data1 = json!({"connected": true}); + let data2 = json!({"connected": false}); + assert!(!should_skip(&mut cache, 20, true, &data1)); + assert!(should_skip(&mut cache, 20, true, &data1)); + assert!(!should_skip(&mut cache, 20, true, &data2)); + assert!(should_skip(&mut cache, 20, true, &data2)); + } + + #[test] + fn test_different_ids_tracked_independently() { + let mut cache = HashMap::new(); + let data = json!({"connected": true}); + assert!(!should_skip(&mut cache, 20, true, &data)); + assert!(!should_skip(&mut cache, 22, true, &data)); + assert!(should_skip(&mut cache, 20, true, &data)); + assert!(should_skip(&mut cache, 22, true, &data)); + } + + #[test] + fn test_no_event_handler_never_skips() { + let mut cache = HashMap::new(); + let data = json!({"connected": true}); + assert!(!should_skip(&mut cache, 20, false, &data)); + assert!(!should_skip(&mut cache, 20, false, &data)); + } + } + #[cfg(test)] mod workflow { // fn test_workflow() { From 520d40d7f3824735d023be1cbdbc679bfe3f7ec3 Mon Sep 17 00:00:00 2001 From: kvfasil Date: Tue, 28 Apr 2026 18:14:07 -0400 Subject: [PATCH 08/13] Revert "Skip redundant event_handler processing for duplicate Thunder events" This reverts commit e9d824a51cf31ccf4fa6daf00b8a7a5ab4b195ec. --- core/main/src/broker/endpoint_broker.rs | 94 ------------------------- 1 file changed, 94 deletions(-) diff --git a/core/main/src/broker/endpoint_broker.rs b/core/main/src/broker/endpoint_broker.rs index 0ffaa1d07..072a57d2f 100644 --- a/core/main/src/broker/endpoint_broker.rs +++ b/core/main/src/broker/endpoint_broker.rs @@ -1362,11 +1362,6 @@ impl BrokerOutputForwarder { let event_utility_clone = event_utility.clone(); tokio::spawn(async move { - // Cache last event data per subscription id to skip redundant event_handler - // processing when the Thunder event payload is unchanged (e.g. - // onDisplayConnectionChanged fires every ~5s with identical data). - let mut last_event_handler_data: HashMap = HashMap::new(); - while let Some(output) = rx.recv().await { let output_c = output.clone(); let mut response = output.data.clone(); @@ -1397,24 +1392,6 @@ impl BrokerOutputForwarder { let apply_response_needed = if let Some(result) = response.result.clone() { if is_event { - // Skip event_handler processing if the event data is unchanged. - // This prevents redundant internal requests (e.g. device.hdr, - // device.hdcp) when Thunder fires the same event repeatedly - // (onDisplayConnectionChanged every ~5s), avoiding heap growth - // from repeated allocations. - if broker_request.rule.event_handler.is_some() { - if let Some(last) = last_event_handler_data.get(&id) { - if *last == result { - debug!( - "Skipping duplicate event_handler for id={} method={}", - id, rpc_request.ctx.method - ); - continue; - } - } - last_event_handler_data.insert(id, result.clone()); - } - LogSignal::new( "handle_event_output".to_string(), "processing event".to_string(), @@ -3498,77 +3475,6 @@ mod endpoint_broker_tests { assert!(cleaner.cleanup_session("test_app").await.is_err()); } } - #[cfg(test)] - mod event_handler_dedup { - use serde_json::{json, Value}; - use std::collections::HashMap; - - /// Simulates the dedup logic in `start_forwarder`: returns true if the - /// event should be skipped (duplicate), false if it should be processed. - fn should_skip( - cache: &mut HashMap, - id: u64, - has_event_handler: bool, - result: &Value, - ) -> bool { - if !has_event_handler { - return false; - } - if let Some(last) = cache.get(&id) { - if *last == *result { - return true; - } - } - cache.insert(id, result.clone()); - false - } - - #[test] - fn test_first_event_is_not_skipped() { - let mut cache = HashMap::new(); - let data = json!({"connected": true}); - assert!(!should_skip(&mut cache, 20, true, &data)); - } - - #[test] - fn test_duplicate_event_is_skipped() { - let mut cache = HashMap::new(); - let data = json!({"connected": true}); - assert!(!should_skip(&mut cache, 20, true, &data)); - assert!(should_skip(&mut cache, 20, true, &data)); - assert!(should_skip(&mut cache, 20, true, &data)); - } - - #[test] - fn test_changed_event_is_not_skipped() { - let mut cache = HashMap::new(); - let data1 = json!({"connected": true}); - let data2 = json!({"connected": false}); - assert!(!should_skip(&mut cache, 20, true, &data1)); - assert!(should_skip(&mut cache, 20, true, &data1)); - assert!(!should_skip(&mut cache, 20, true, &data2)); - assert!(should_skip(&mut cache, 20, true, &data2)); - } - - #[test] - fn test_different_ids_tracked_independently() { - let mut cache = HashMap::new(); - let data = json!({"connected": true}); - assert!(!should_skip(&mut cache, 20, true, &data)); - assert!(!should_skip(&mut cache, 22, true, &data)); - assert!(should_skip(&mut cache, 20, true, &data)); - assert!(should_skip(&mut cache, 22, true, &data)); - } - - #[test] - fn test_no_event_handler_never_skips() { - let mut cache = HashMap::new(); - let data = json!({"connected": true}); - assert!(!should_skip(&mut cache, 20, false, &data)); - assert!(!should_skip(&mut cache, 20, false, &data)); - } - } - #[cfg(test)] mod workflow { // fn test_workflow() { From a096181b7a03b72c10f565d076c52777fe56d86e Mon Sep 17 00:00:00 2001 From: kvfasil Date: Tue, 28 Apr 2026 20:58:07 -0400 Subject: [PATCH 09/13] -Cache compiled jq filters to prevent Rc reference cycle heap leak Each call to jq_compile() created Rc reference cycles in jaq-interpret that were never freed, leaking ~120 heap nodes (~24 KB) per event_handler cycle. fix: Cache compiled Filter objects in a static HashMap so each unique filter string is parsed and compiled only once. Subsequent calls clone the cached filter (cheap Rc refcount increment) instead of recompiling. --- core/main/src/broker/rules/rules_engine.rs | 47 +++++++++++++++++++++- 1 file changed, 46 insertions(+), 1 deletion(-) diff --git a/core/main/src/broker/rules/rules_engine.rs b/core/main/src/broker/rules/rules_engine.rs index 654b2163e..24be1a24c 100644 --- a/core/main/src/broker/rules/rules_engine.rs +++ b/core/main/src/broker/rules/rules_engine.rs @@ -14,7 +14,7 @@ // // SPDX-License-Identifier: Apache-2.0 // -use jaq_interpret::{Ctx, FilterT, ParseCtx, RcIter, Val}; +use jaq_interpret::{Ctx, Filter, FilterT, ParseCtx, RcIter, Val}; use ripple_sdk::api::{ gateway::rpc_gateway_api::RpcRequest, manifest::extn_manifest::ExtnManifest, }; @@ -36,6 +36,22 @@ use super::rules_functions::{apply_functions, RulesFunction, RulesImport}; static BASE_PARSE_CTX_INIT: Once = Once::new(); static mut BASE_PARSE_CTX_PTR: Option> = None; +static FILTER_CACHE_INIT: Once = Once::new(); +static mut FILTER_CACHE_PTR: Option>> = None; + +fn get_filter_cache() -> MutexGuard<'static, HashMap> { + FILTER_CACHE_INIT.call_once(|| unsafe { + FILTER_CACHE_PTR = Some(Mutex::new(HashMap::new())); + }); + unsafe { + FILTER_CACHE_PTR + .as_ref() + .expect("FILTER_CACHE_PTR not initialized") + .lock() + .expect("Failed to lock FILTER_CACHE_PTR") + } +} + #[derive(Debug, Deserialize, Default, Clone)] pub struct RuleSet { #[serde(default)] @@ -532,6 +548,26 @@ pub fn jq_compile(input: Value, filter: &str, reference: String) -> Result Result Date: Wed, 29 Apr 2026 02:21:39 -0400 Subject: [PATCH 10/13] -Updated log levels. --- core/main/src/broker/endpoint_broker.rs | 6 ++--- core/main/src/broker/thunder_broker.rs | 24 +++++++++---------- core/main/src/firebolt/firebolt_gateway.rs | 14 ++++------- core/main/src/service/apps/app_events.rs | 11 ++++----- core/sdk/src/extn/client/extn_client.rs | 2 +- .../src/events/thunder_event_processor.rs | 5 +--- .../src/processors/thunder_events.rs | 2 +- 7 files changed, 27 insertions(+), 37 deletions(-) diff --git a/core/main/src/broker/endpoint_broker.rs b/core/main/src/broker/endpoint_broker.rs index 072a57d2f..fa6f3e3ff 100644 --- a/core/main/src/broker/endpoint_broker.rs +++ b/core/main/src/broker/endpoint_broker.rs @@ -1137,8 +1137,8 @@ impl EndpointBrokerState { for id in &removed_ids { extn_map.remove(id); } - info!( - "cleanup_request_maps_for_app: removed {} request_map and extension_request_map entries for {}", + debug!( + "cleanup_request_maps_for_app: removed {} request_map and extension_request_map entries for app_id={}", removed_ids.len(), app_id ); @@ -1457,7 +1457,7 @@ impl BrokerOutputForwarder { ) .await; } else { - error!( + debug!( "start_forwarder:{} request not found for {:?}", line!(), response diff --git a/core/main/src/broker/thunder_broker.rs b/core/main/src/broker/thunder_broker.rs index e13c6e609..34903c17d 100644 --- a/core/main/src/broker/thunder_broker.rs +++ b/core/main/src/broker/thunder_broker.rs @@ -373,9 +373,9 @@ impl ThunderBroker { broker_for_cleanup.subscription_map.write().unwrap().remove(&cleanup_request) }; if let Some(mut cleanup) = value { - info!( - "BrokerCleaner: unsubscribing {} subscription(s) for session {}", - cleanup.len(), cleanup_request + debug!( + "BrokerCleaner: unsubscribing {} subscription(s)", + cleanup.len() ); // Send unregister calls directly to Thunder via the WebSocket. // We must NOT route through the broker sender + prepare_request(), @@ -389,21 +389,20 @@ impl ThunderBroker { if let Some(method) = method { let unregister = serde_json::json!({ "jsonrpc": "2.0", - "id": v.rpc.ctx.call_id, "method": format!("{}.unregister", callsign), "params": { "event": method, "id": format!("{}", v.rpc.ctx.call_id) } }); - info!( - "BrokerCleaner: sending Thunder unregister for {}.{} (call_id={})", - callsign, method, v.rpc.ctx.call_id + debug!( + "BrokerCleaner: sending Thunder unregister for {}.{}", + callsign, method ); if let Err(e) = ws_tx.feed(Message::Text(unregister.to_string())).await { error!( - "BrokerCleaner: failed to send unregister for {}: {:?}", - cleanup_request, e + "BrokerCleaner: failed to send unregister: {:?}", + e ); } } else { @@ -415,14 +414,13 @@ impl ThunderBroker { } if let Err(e) = ws_tx.flush().await { error!( - "BrokerCleaner: failed to flush unregister calls for {}: {:?}", - cleanup_request, e + "BrokerCleaner: failed to flush unregister calls: {:?}", + e ); } } else { debug!( - "BrokerCleaner: no subscriptions found for key '{}', skipping", - cleanup_request + "BrokerCleaner: no subscriptions found, skipping" ); } } diff --git a/core/main/src/firebolt/firebolt_gateway.rs b/core/main/src/firebolt/firebolt_gateway.rs index 5bcae3d6d..4470cd34a 100644 --- a/core/main/src/firebolt/firebolt_gateway.rs +++ b/core/main/src/firebolt/firebolt_gateway.rs @@ -33,7 +33,7 @@ use ripple_sdk::{ }, chrono::Utc, extn::extn_client_message::ExtnMessage, - log::{error, info, trace, warn}, + log::{debug, error, info, trace, warn}, serde_json::{self, Value}, service::service_message::{JsonRpcMessage as JsonRpcServiceMessage, ServiceMessage}, tokio::{self, runtime::Handle, sync::mpsc::Sender}, @@ -141,8 +141,7 @@ impl FireboltGateway { } UnregisterSession { session_id, cid } => { info!( - "Cleanup: session_id={} cid={} - removing event listeners, broker subs, session", - session_id, cid + "Cleanup: app disconnect - removing event listeners, broker subs, session" ); // Clean event listeners by session_id AppEvents::remove_session(&self.state.platform_state, session_id.clone()); @@ -186,15 +185,12 @@ impl FireboltGateway { .send_extn_request_transient(cleanup_request) { warn!( - "Failed to send ThunderEventProcessor cleanup for app_id={} (cid={}): {:?}", - app_id, cid, e + "Failed to send ThunderEventProcessor cleanup for app_id={}: {:?}", + app_id, e ); } } else { - warn!( - "Could not resolve app_id for cid={}, skipping ThunderEventProcessor cleanup", - cid - ); + debug!("Could not resolve app_id, skipping ThunderEventProcessor cleanup"); } } HandleRpc { request } => self.handle(request, None).await, diff --git a/core/main/src/service/apps/app_events.rs b/core/main/src/service/apps/app_events.rs index 2d501c468..0158aaa8c 100644 --- a/core/main/src/service/apps/app_events.rs +++ b/core/main/src/service/apps/app_events.rs @@ -523,11 +523,7 @@ pub mod tests { context: Option, ) { let mut listeners = state.app_events_state.listeners.write().unwrap(); - let vec = AppEvents::get_or_create_listener_vec( - &mut listeners, - event.to_string(), - context, - ); + let vec = AppEvents::get_or_create_listener_vec(&mut listeners, event.to_string(), context); vec.push(EventListener { call_ctx: ctx, session_tx: None, @@ -621,7 +617,10 @@ pub mod tests { let listeners = state.app_events_state.listeners.read().unwrap(); assert_eq!(listeners.get("evt1").unwrap().get(&None).unwrap().len(), 1); assert_eq!( - listeners.get("evt1").unwrap().get(&None).unwrap()[0].call_ctx.cid.as_deref(), + listeners.get("evt1").unwrap().get(&None).unwrap()[0] + .call_ctx + .cid + .as_deref(), Some("conn-2") ); } diff --git a/core/sdk/src/extn/client/extn_client.rs b/core/sdk/src/extn/client/extn_client.rs index c672e4941..d0e10ff09 100644 --- a/core/sdk/src/extn/client/extn_client.rs +++ b/core/sdk/src/extn/client/extn_client.rs @@ -472,7 +472,7 @@ impl ExtnClient { } }); } else { - error!("No response processor for {:?}", msg); + debug!("No response processor for {:?}", msg); } } diff --git a/device/thunder_ripple_sdk/src/events/thunder_event_processor.rs b/device/thunder_ripple_sdk/src/events/thunder_event_processor.rs index 635a4e709..fe2bd9f61 100644 --- a/device/thunder_ripple_sdk/src/events/thunder_event_processor.rs +++ b/device/thunder_ripple_sdk/src/events/thunder_event_processor.rs @@ -343,10 +343,7 @@ impl ThunderEventProcessor { last_event_map.remove(event_name); } } - debug!( - "cleanup_by_app_id: app_id={}, removed_events={:?}", - app_id, removed_events - ); + debug!("cleanup_by_app_id: removed_events={:?}", removed_events); removed_events } } diff --git a/device/thunder_ripple_sdk/src/processors/thunder_events.rs b/device/thunder_ripple_sdk/src/processors/thunder_events.rs index 0f79b4382..f0128bc6b 100644 --- a/device/thunder_ripple_sdk/src/processors/thunder_events.rs +++ b/device/thunder_ripple_sdk/src/processors/thunder_events.rs @@ -117,7 +117,7 @@ impl ExtnRequestProcessor for ThunderOpenEventsProcessor { removed_events ); } - return Self::ack(state.get_client(), msg).await.is_ok(); + return true; } } { v.await; From 907f6a89323567e77737250a9613f943d351a13c Mon Sep 17 00:00:00 2001 From: kvfasil Date: Wed, 29 Apr 2026 02:41:23 -0400 Subject: [PATCH 11/13] Apply suggestions from code review Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- core/main/src/broker/endpoint_broker.rs | 7 +++---- core/main/src/firebolt/firebolt_ws.rs | 6 +++--- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/core/main/src/broker/endpoint_broker.rs b/core/main/src/broker/endpoint_broker.rs index fa6f3e3ff..acf561d88 100644 --- a/core/main/src/broker/endpoint_broker.rs +++ b/core/main/src/broker/endpoint_broker.rs @@ -90,7 +90,7 @@ impl BrokerCleaner { async fn cleanup_session(&self, appid: &str) -> Result { if let Some(cleaner) = self.cleaner.clone() { if let Err(e) = cleaner.send(appid.to_owned()).await { - error!("Couldnt cleanup {} {:?}", appid, e); + error!("Could not clean up {} {:?}", appid, e); return Err(RippleError::SendFailure); } return Ok(appid.to_owned()); @@ -1138,9 +1138,8 @@ impl EndpointBrokerState { extn_map.remove(id); } debug!( - "cleanup_request_maps_for_app: removed {} request_map and extension_request_map entries for app_id={}", - removed_ids.len(), - app_id + "cleanup_request_maps_for_app: removed {} request_map and extension_request_map entries", + removed_ids.len() ); } } diff --git a/core/main/src/firebolt/firebolt_ws.rs b/core/main/src/firebolt/firebolt_ws.rs index ed2aa4605..2a5f9c090 100644 --- a/core/main/src/firebolt/firebolt_ws.rs +++ b/core/main/src/firebolt/firebolt_ws.rs @@ -438,9 +438,9 @@ impl FireboltWs { } } } - info!( - "Session disconnect: unregistering connection_id={} session_id={} app_id={}", - connection_id, identity.session_id, identity.app_id + debug!( + "Session disconnect: unregistering connection_id={} app_id={}", + connection_id, identity.app_id ); let msg = FireboltGatewayCommand::UnregisterSession { session_id: identity.session_id.clone(), From 7bc6c057bf549429ba8a3ba98b47006fc9cde79a Mon Sep 17 00:00:00 2001 From: kvfasil Date: Wed, 29 Apr 2026 03:37:00 -0400 Subject: [PATCH 12/13] Rename cleanup_request_maps parameter and fix lock ordering - Rename cleanup_request_maps_for_app to cleanup_request_maps and parameter app_id to id to clarify it may be app_id, session_id, or connection_id - Drop event_map write lock before acquiring last_event write lock in cleanup_by_app_id to prevent potential deadlocks --- core/main/src/broker/endpoint_broker.rs | 31 ++++++++++--------- .../src/events/thunder_event_processor.rs | 30 +++++++++++------- 2 files changed, 34 insertions(+), 27 deletions(-) diff --git a/core/main/src/broker/endpoint_broker.rs b/core/main/src/broker/endpoint_broker.rs index acf561d88..1e035850b 100644 --- a/core/main/src/broker/endpoint_broker.rs +++ b/core/main/src/broker/endpoint_broker.rs @@ -1095,34 +1095,34 @@ impl EndpointBrokerState { } // Method to cleanup all subscription on App termination - pub async fn cleanup_for_app(&self, app_id: &str) { + pub async fn cleanup_for_app(&self, id: &str) { let cleaners = { self.cleaner_list.read().unwrap().clone() }; for cleaner in cleaners { /* for now, just eat the error - the return type was mainly added to prepate for future refactoring/testability */ - let _ = cleaner.cleanup_session(app_id).await; + let _ = cleaner.cleanup_session(id).await; } // Clean up subscription entries from request_map and extension_request_map // that belong to this app. These are never removed on disconnect otherwise. - self.cleanup_request_maps_for_app(app_id); + self.cleanup_request_maps(id); } /// Remove subscription/event entries from request_map and extension_request_map - /// for the given app_id (which may be a session_id or connection_id). + /// matching the given identifier (may be app_id, session_id, or connection_id). /// Without this, subscription entries in request_map (guarded by is_subscription()) /// and event entries in extension_request_map persist forever. - fn cleanup_request_maps_for_app(&self, app_id: &str) { + fn cleanup_request_maps(&self, id: &str) { let removed_ids: Vec = { let mut request_map = self.request_map.write().unwrap(); let ids_to_remove: Vec = request_map .iter() .filter(|(_, req)| { - req.rpc.ctx.app_id == app_id - || req.rpc.ctx.session_id == app_id - || req.rpc.ctx.cid.as_deref() == Some(app_id) + req.rpc.ctx.app_id == id + || req.rpc.ctx.session_id == id + || req.rpc.ctx.cid.as_deref() == Some(id) }) .map(|(id, _)| *id) .collect(); @@ -1138,8 +1138,9 @@ impl EndpointBrokerState { extn_map.remove(id); } debug!( - "cleanup_request_maps_for_app: removed {} request_map and extension_request_map entries", - removed_ids.len() + "cleanup_request_maps: removed {} request_map and extension_request_map entries for app_id={}", + removed_ids.len(), + id ); } } @@ -2568,7 +2569,7 @@ mod endpoint_broker_tests { state.update_request(&req, &default_rule(), None, None, vec![]); assert_eq!(state.request_map.read().unwrap().len(), 1); - state.cleanup_request_maps_for_app("epg"); + state.cleanup_request_maps("epg"); assert!(state.request_map.read().unwrap().is_empty()); } @@ -2581,7 +2582,7 @@ mod endpoint_broker_tests { req.ctx.app_id = "other".to_string(); state.update_request(&req, &default_rule(), None, None, vec![]); - state.cleanup_request_maps_for_app("sess-123"); + state.cleanup_request_maps("sess-123"); assert!(state.request_map.read().unwrap().is_empty()); } @@ -2594,7 +2595,7 @@ mod endpoint_broker_tests { req.ctx.app_id = "other".to_string(); state.update_request(&req, &default_rule(), None, None, vec![]); - state.cleanup_request_maps_for_app("conn-xyz"); + state.cleanup_request_maps("conn-xyz"); assert!(state.request_map.read().unwrap().is_empty()); } @@ -2605,7 +2606,7 @@ mod endpoint_broker_tests { let req = RpcRequest::mock(); state.update_request(&req, &default_rule(), None, None, vec![]); - state.cleanup_request_maps_for_app("nonexistent"); + state.cleanup_request_maps("nonexistent"); assert_eq!(state.request_map.read().unwrap().len(), 1); } @@ -2628,7 +2629,7 @@ mod endpoint_broker_tests { ); } - state.cleanup_request_maps_for_app("epg"); + state.cleanup_request_maps("epg"); assert!(state.request_map.read().unwrap().is_empty()); assert!(state.extension_request_map.read().unwrap().is_empty()); } diff --git a/device/thunder_ripple_sdk/src/events/thunder_event_processor.rs b/device/thunder_ripple_sdk/src/events/thunder_event_processor.rs index fe2bd9f61..9c68c75a2 100644 --- a/device/thunder_ripple_sdk/src/events/thunder_event_processor.rs +++ b/device/thunder_ripple_sdk/src/events/thunder_event_processor.rs @@ -324,26 +324,32 @@ impl ThunderEventProcessor { /// Remove all event listeners for a given app_id (session cleanup on disconnect). /// Returns the list of event names that no longer have any listeners and were removed. pub fn cleanup_by_app_id(&self, app_id: &str) -> Vec { - let mut event_map = self.event_map.write().unwrap(); - let mut removed_events = Vec::new(); - let event_names: Vec = event_map.keys().cloned().collect(); - for event_name in event_names { - if let Some(handler) = event_map.get_mut(&event_name) { - handler.listeners.retain(|id| id != app_id); - if handler.listeners.is_empty() { - event_map.remove(&event_name); - removed_events.push(event_name.clone()); + let removed_events = { + let mut event_map = self.event_map.write().unwrap(); + let mut removed = Vec::new(); + let event_names: Vec = event_map.keys().cloned().collect(); + for event_name in event_names { + if let Some(handler) = event_map.get_mut(&event_name) { + handler.listeners.retain(|id| id != app_id); + if handler.listeners.is_empty() { + event_map.remove(&event_name); + removed.push(event_name.clone()); + } } } - } - // Clean last_event entries for removed events to free heap + removed + }; // event_map lock dropped here + // Clean last_event entries for removed events to free heap if !removed_events.is_empty() { let mut last_event_map = self.last_event.write().unwrap(); for event_name in &removed_events { last_event_map.remove(event_name); } } - debug!("cleanup_by_app_id: removed_events={:?}", removed_events); + debug!( + "cleanup_by_app_id: app_id={}, removed_events={:?}", + app_id, removed_events + ); removed_events } } From db3110455f23df329c27fea22baf386c1dfd04eb Mon Sep 17 00:00:00 2001 From: kvfasil Date: Wed, 29 Apr 2026 04:17:26 -0400 Subject: [PATCH 13/13] -fix for logging Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com> --- core/main/src/broker/endpoint_broker.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/main/src/broker/endpoint_broker.rs b/core/main/src/broker/endpoint_broker.rs index 1e035850b..ae2c85d16 100644 --- a/core/main/src/broker/endpoint_broker.rs +++ b/core/main/src/broker/endpoint_broker.rs @@ -1138,9 +1138,8 @@ impl EndpointBrokerState { extn_map.remove(id); } debug!( - "cleanup_request_maps: removed {} request_map and extension_request_map entries for app_id={}", - removed_ids.len(), - id + "cleanup_request_maps: removed {} request_map and extension_request_map entries", + removed_ids.len() ); } }