diff --git a/core/main/src/broker/endpoint_broker.rs b/core/main/src/broker/endpoint_broker.rs index b7e8bbb54..ae2c85d16 100644 --- a/core/main/src/broker/endpoint_broker.rs +++ b/core/main/src/broker/endpoint_broker.rs @@ -89,8 +89,8 @@ pub struct BrokerCleaner { impl BrokerCleaner { async fn cleanup_session(&self, appid: &str) -> Result { if let Some(cleaner) = self.cleaner.clone() { - if let Err(e) = cleaner.try_send(appid.to_owned()) { - error!("Couldnt cleanup {} {:?}", appid, e); + if let Err(e) = cleaner.send(appid.to_owned()).await { + error!("Could not clean up {} {:?}", appid, e); return Err(RippleError::SendFailure); } return Ok(appid.to_owned()); @@ -1095,14 +1095,52 @@ impl EndpointBrokerState { } // Method to cleanup all subscription on App termination - pub async fn cleanup_for_app(&self, app_id: &str) { + pub async fn cleanup_for_app(&self, id: &str) { let cleaners = { self.cleaner_list.read().unwrap().clone() }; for cleaner in cleaners { /* for now, just eat the error - the return type was mainly added to prepate for future refactoring/testability */ - let _ = cleaner.cleanup_session(app_id).await; + let _ = cleaner.cleanup_session(id).await; + } + + // Clean up subscription entries from request_map and extension_request_map + // that belong to this app. These are never removed on disconnect otherwise. + self.cleanup_request_maps(id); + } + + /// Remove subscription/event entries from request_map and extension_request_map + /// matching the given identifier (may be app_id, session_id, or connection_id). + /// Without this, subscription entries in request_map (guarded by is_subscription()) + /// and event entries in extension_request_map persist forever. + fn cleanup_request_maps(&self, id: &str) { + let removed_ids: Vec = { + let mut request_map = self.request_map.write().unwrap(); + let ids_to_remove: Vec = request_map + .iter() + .filter(|(_, req)| { + req.rpc.ctx.app_id == id + || req.rpc.ctx.session_id == id + || req.rpc.ctx.cid.as_deref() == Some(id) + }) + .map(|(id, _)| *id) + .collect(); + for id in &ids_to_remove { + request_map.remove(id); + } + ids_to_remove + }; + + if !removed_ids.is_empty() { + let mut extn_map = self.extension_request_map.write().unwrap(); + for id in &removed_ids { + extn_map.remove(id); + } + debug!( + "cleanup_request_maps: removed {} request_map and extension_request_map entries", + removed_ids.len() + ); } } /// Send a request through the broker and wait for response with a oneshot channel and custom timeout @@ -1418,7 +1456,7 @@ impl BrokerOutputForwarder { ) .await; } else { - error!( + debug!( "start_forwarder:{} request not found for {:?}", line!(), response @@ -2487,6 +2525,114 @@ mod endpoint_broker_tests { // // assert!(state.get_request(2).is_ok()); // // assert!(state.get_request(1).is_ok()); // } + + #[cfg(test)] + mod cleanup_request_maps { + use super::*; + use crate::broker::rules::rules_engine::{Rule, RuleTransform}; + use ripple_sdk::api::gateway::rpc_gateway_api::RpcRequest; + use ripple_sdk::Mockable; + use serial_test::serial; + + fn make_state() -> EndpointBrokerState { + let (tx, _) = channel(2); + let client = RippleClient::new(ChannelsState::new()); + EndpointBrokerState::new( + OpMetricState::default(), + tx, + RuleEngine { + rules: RuleSet::default(), + functions: HashMap::default(), + }, + client, + ) + } + + fn default_rule() -> Rule { + Rule { + alias: "test.method".to_owned(), + transform: RuleTransform::default(), + endpoint: None, + filter: None, + event_handler: None, + sources: None, + } + } + + #[serial] + #[tokio::test] + async fn test_cleanup_by_app_id() { + let state = make_state(); + let mut req = RpcRequest::mock(); + req.ctx.app_id = "epg".to_string(); + state.update_request(&req, &default_rule(), None, None, vec![]); + + assert_eq!(state.request_map.read().unwrap().len(), 1); + state.cleanup_request_maps("epg"); + assert!(state.request_map.read().unwrap().is_empty()); + } + + #[serial] + #[tokio::test] + async fn test_cleanup_by_session_id() { + let state = make_state(); + let mut req = RpcRequest::mock(); + req.ctx.session_id = "sess-123".to_string(); + req.ctx.app_id = "other".to_string(); + state.update_request(&req, &default_rule(), None, None, vec![]); + + state.cleanup_request_maps("sess-123"); + assert!(state.request_map.read().unwrap().is_empty()); + } + + #[serial] + #[tokio::test] + async fn test_cleanup_by_cid() { + let state = make_state(); + let mut req = RpcRequest::mock(); + req.ctx.cid = Some("conn-xyz".to_string()); + req.ctx.app_id = "other".to_string(); + state.update_request(&req, &default_rule(), None, None, vec![]); + + state.cleanup_request_maps("conn-xyz"); + assert!(state.request_map.read().unwrap().is_empty()); + } + + #[serial] + #[tokio::test] + async fn test_cleanup_no_match_is_noop() { + let state = make_state(); + let req = RpcRequest::mock(); + state.update_request(&req, &default_rule(), None, None, vec![]); + + state.cleanup_request_maps("nonexistent"); + assert_eq!(state.request_map.read().unwrap().len(), 1); + } + + #[serial] + #[tokio::test] + async fn test_cleanup_also_removes_extension_request_map() { + let state = make_state(); + let mut req = RpcRequest::mock(); + req.ctx.app_id = "epg".to_string(); + state.update_request(&req, &default_rule(), None, None, vec![]); + let id = { + let map = state.request_map.read().unwrap(); + *map.keys().next().unwrap() + }; + { + let mut extn_map = state.extension_request_map.write().unwrap(); + extn_map.insert( + id, + ripple_sdk::extn::extn_client_message::ExtnMessage::default(), + ); + } + + state.cleanup_request_maps("epg"); + assert!(state.request_map.read().unwrap().is_empty()); + assert!(state.extension_request_map.read().unwrap().is_empty()); + } + } } #[tokio::test] diff --git a/core/main/src/broker/rules/rules_engine.rs b/core/main/src/broker/rules/rules_engine.rs index 654b2163e..24be1a24c 100644 --- a/core/main/src/broker/rules/rules_engine.rs +++ b/core/main/src/broker/rules/rules_engine.rs @@ -14,7 +14,7 @@ // // SPDX-License-Identifier: Apache-2.0 // -use jaq_interpret::{Ctx, FilterT, ParseCtx, RcIter, Val}; +use jaq_interpret::{Ctx, Filter, FilterT, ParseCtx, RcIter, Val}; use ripple_sdk::api::{ gateway::rpc_gateway_api::RpcRequest, manifest::extn_manifest::ExtnManifest, }; @@ -36,6 +36,22 @@ use super::rules_functions::{apply_functions, RulesFunction, RulesImport}; static BASE_PARSE_CTX_INIT: Once = Once::new(); static mut BASE_PARSE_CTX_PTR: Option> = None; +static FILTER_CACHE_INIT: Once = Once::new(); +static mut FILTER_CACHE_PTR: Option>> = None; + +fn get_filter_cache() -> MutexGuard<'static, HashMap> { + FILTER_CACHE_INIT.call_once(|| unsafe { + FILTER_CACHE_PTR = Some(Mutex::new(HashMap::new())); + }); + unsafe { + FILTER_CACHE_PTR + .as_ref() + .expect("FILTER_CACHE_PTR not initialized") + .lock() + .expect("Failed to lock FILTER_CACHE_PTR") + } +} + #[derive(Debug, Deserialize, Default, Clone)] pub struct RuleSet { #[serde(default)] @@ -532,6 +548,26 @@ pub fn jq_compile(input: Value, filter: &str, reference: String) -> Result Result { + info!( + "Cleanup: app disconnect - removing event listeners, broker subs, session" + ); + // Clean event listeners by session_id AppEvents::remove_session(&self.state.platform_state, session_id.clone()); + // Also clean event listeners by connection_id (cid) in case session_id != cid + AppEvents::cleanup_by_connection_id(&self.state.platform_state, &cid); ProviderBroker::unregister_session(&self.state.platform_state, cid.clone()) .await; self.state @@ -147,7 +154,44 @@ impl FireboltGateway { .endpoint_state .cleanup_for_app(&cid) .await; + // Also cleanup broker subscriptions by session_id (subscription_map uses session_id as key) + self.state + .platform_state + .endpoint_state + .cleanup_for_app(&session_id) + .await; + // Resolve app_id from session state BEFORE clearing the session, + // because ThunderEventProcessor stores listeners by app_id (e.g. "epg"), + // not by cid (a UUID). + let app_id_for_cleanup = self + .state + .platform_state + .session_state + .get_app_id(cid.clone()); + // Clear session from session_map by connection_id (the key used during registration) self.state.platform_state.session_state.clear_session(&cid); + // Send cleanup to ThunderEventProcessor (extension side) to remove + // event_map and last_event entries for this app + if let Some(app_id) = app_id_for_cleanup { + let cleanup_request = DeviceEventRequest { + event: DeviceEvent::Cleanup, + subscribe: false, + callback_type: DeviceEventCallback::FireboltAppEvent(app_id.clone()), + }; + if let Err(e) = self + .state + .platform_state + .get_client() + .send_extn_request_transient(cleanup_request) + { + warn!( + "Failed to send ThunderEventProcessor cleanup for app_id={}: {:?}", + app_id, e + ); + } + } else { + debug!("Could not resolve app_id, skipping ThunderEventProcessor cleanup"); + } } HandleRpc { request } => self.handle(request, None).await, HandleRpcForExtn { msg } => { diff --git a/core/main/src/firebolt/firebolt_ws.rs b/core/main/src/firebolt/firebolt_ws.rs index 121814665..2a5f9c090 100644 --- a/core/main/src/firebolt/firebolt_ws.rs +++ b/core/main/src/firebolt/firebolt_ws.rs @@ -438,7 +438,10 @@ impl FireboltWs { } } } - debug!("SESSION DEBUG Unregistering {}", connection_id); + debug!( + "Session disconnect: unregistering connection_id={} app_id={}", + connection_id, identity.app_id + ); let msg = FireboltGatewayCommand::UnregisterSession { session_id: identity.session_id.clone(), cid: connection_id, diff --git a/core/main/src/service/apps/app_events.rs b/core/main/src/service/apps/app_events.rs index fca4033ff..0158aaa8c 100644 --- a/core/main/src/service/apps/app_events.rs +++ b/core/main/src/service/apps/app_events.rs @@ -440,8 +440,39 @@ impl AppEvents { if let Some(event_listener) = ctx_map.get_mut(&context) { AppEvents::remove_session_from_events(event_listener, &session_id); } + // Remove empty context entries to free heap + if ctx_map.get(&context).map_or(false, |v| v.is_empty()) { + ctx_map.remove(&context); + } + } + } + // Remove empty event entries to prevent map bloat + if listeners.get(&event_name).map_or(false, |m| m.is_empty()) { + listeners.remove(&event_name); + } + } + } + + /// Cleanup all event listeners matching a given connection_id. + /// This ensures cleanup works even when session_id != connection_id. + pub fn cleanup_by_connection_id(state: &PlatformState, connection_id: &str) { + let mut listeners = state.app_events_state.listeners.write().unwrap(); + let all_events = listeners.keys().cloned().collect::>(); + for event_name in all_events { + if let Some(ctx_map) = listeners.get_mut(&event_name) { + let all_contexts = ctx_map.keys().cloned().collect::>>(); + for context in all_contexts { + if let Some(event_listener) = ctx_map.get_mut(&context) { + event_listener.retain(|l| l.call_ctx.cid.as_deref() != Some(connection_id)); + } + if ctx_map.get(&context).map_or(false, |v| v.is_empty()) { + ctx_map.remove(&context); + } } } + if listeners.get(&event_name).map_or(false, |m| m.is_empty()) { + listeners.remove(&event_name); + } } } } @@ -449,6 +480,7 @@ impl AppEvents { pub mod tests { use crate::state::session_state::Session; use ripple_sdk::tokio; + use ripple_sdk::uuid::Uuid; use ripple_tdk::utils::test_utils::Mockable; use super::*; @@ -482,4 +514,125 @@ pub mod tests { AppEvents::get_listeners(&platform_state.app_events_state, "test_event", None); assert!(listeners.len() == 1); } + + /// Helper: add a listener with a specific CallContext to a given event/context. + fn add_listener_direct( + state: &PlatformState, + event: &str, + ctx: CallContext, + context: Option, + ) { + let mut listeners = state.app_events_state.listeners.write().unwrap(); + let vec = AppEvents::get_or_create_listener_vec(&mut listeners, event.to_string(), context); + vec.push(EventListener { + call_ctx: ctx, + session_tx: None, + decorator: None, + }); + } + + fn make_ctx(session_id: &str, app_id: &str, cid: Option<&str>) -> CallContext { + CallContext::new( + session_id.to_string(), + Uuid::new_v4().to_string(), + app_id.to_string(), + 1, + ripple_sdk::api::gateway::rpc_gateway_api::ApiProtocol::Extn, + "method".to_string(), + cid.map(|s| s.to_string()), + false, + ) + } + + #[tokio::test] + async fn test_remove_session_prunes_empty_entries() { + let state = PlatformState::mock(); + let session_id = "sess-1".to_string(); + let ctx = make_ctx(&session_id, "app1", None); + + // Register session so clear_session doesn't panic + let session = Session::new("app1".to_string(), None); + state.session_state.add_session(session_id.clone(), session); + + add_listener_direct(&state, "evt1", ctx.clone(), None); + + // Verify listener exists + assert_eq!(state.app_events_state.listeners.read().unwrap().len(), 1); + + // remove_session should remove the listener AND prune empty maps + AppEvents::remove_session(&state, session_id); + assert!(state.app_events_state.listeners.read().unwrap().is_empty()); + } + + #[tokio::test] + async fn test_remove_session_keeps_other_sessions() { + let state = PlatformState::mock(); + let ctx1 = make_ctx("sess-1", "app1", None); + let ctx2 = make_ctx("sess-2", "app2", None); + + let s1 = Session::new("app1".to_string(), None); + state.session_state.add_session("sess-1".to_string(), s1); + let s2 = Session::new("app2".to_string(), None); + state.session_state.add_session("sess-2".to_string(), s2); + + add_listener_direct(&state, "evt1", ctx1.clone(), None); + add_listener_direct(&state, "evt1", ctx2.clone(), None); + + AppEvents::remove_session(&state, "sess-1".to_string()); + + // evt1 should still exist with 1 listener + let listeners = state.app_events_state.listeners.read().unwrap(); + assert_eq!(listeners.len(), 1); + assert_eq!(listeners.get("evt1").unwrap().get(&None).unwrap().len(), 1); + } + + #[tokio::test] + async fn test_cleanup_by_connection_id_removes_matching() { + let state = PlatformState::mock(); + let cid = "conn-abc"; + let ctx = make_ctx("sess-1", "app1", Some(cid)); + + add_listener_direct(&state, "evt1", ctx.clone(), None); + add_listener_direct(&state, "evt2", ctx.clone(), Some("ctx-a".to_string())); + + assert_eq!(state.app_events_state.listeners.read().unwrap().len(), 2); + + AppEvents::cleanup_by_connection_id(&state, cid); + + // Both events should be fully pruned + assert!(state.app_events_state.listeners.read().unwrap().is_empty()); + } + + #[tokio::test] + async fn test_cleanup_by_connection_id_keeps_other_cids() { + let state = PlatformState::mock(); + let ctx1 = make_ctx("s1", "app1", Some("conn-1")); + let ctx2 = make_ctx("s2", "app2", Some("conn-2")); + + add_listener_direct(&state, "evt1", ctx1, None); + add_listener_direct(&state, "evt1", ctx2, None); + + AppEvents::cleanup_by_connection_id(&state, "conn-1"); + + let listeners = state.app_events_state.listeners.read().unwrap(); + assert_eq!(listeners.get("evt1").unwrap().get(&None).unwrap().len(), 1); + assert_eq!( + listeners.get("evt1").unwrap().get(&None).unwrap()[0] + .call_ctx + .cid + .as_deref(), + Some("conn-2") + ); + } + + #[tokio::test] + async fn test_cleanup_by_connection_id_no_match_is_noop() { + let state = PlatformState::mock(); + let ctx = make_ctx("s1", "app1", Some("conn-1")); + add_listener_direct(&state, "evt1", ctx, None); + + AppEvents::cleanup_by_connection_id(&state, "nonexistent"); + + assert_eq!(state.app_events_state.listeners.read().unwrap().len(), 1); + } } diff --git a/core/sdk/src/api/device/device_events.rs b/core/sdk/src/api/device/device_events.rs index 63e72bf22..52eb57e02 100644 --- a/core/sdk/src/api/device/device_events.rs +++ b/core/sdk/src/api/device/device_events.rs @@ -39,6 +39,9 @@ pub const VOICE_GUIDANCE_SPEED_CHANGED: &str = "voiceguidance.onSpeedChanged"; pub enum DeviceEvent { InputChanged, AudioChanged, + /// Signals that an app has disconnected and its event subscriptions should be cleaned up. + /// The callback_type.get_id() carries the app_id to clean. + Cleanup, } impl FromStr for DeviceEvent { @@ -48,6 +51,7 @@ impl FromStr for DeviceEvent { match s { "device.onHdcpChanged" => Ok(Self::InputChanged), "device.onAudioChanged" => Ok(Self::AudioChanged), + "device.cleanup" => Ok(Self::Cleanup), _ => Err(()), } } @@ -91,6 +95,7 @@ impl ExtnPayloadProvider for DeviceEventRequest { match self.event { DeviceEvent::InputChanged => RippleContract::DeviceEvents(EventAdjective::Input), DeviceEvent::AudioChanged => RippleContract::DeviceEvents(EventAdjective::Audio), + DeviceEvent::Cleanup => RippleContract::DeviceEvents(EventAdjective::Input), } } @@ -107,12 +112,40 @@ mod tests { #[rstest(input, expected, case("device.onHdcpChanged", Ok(DeviceEvent::InputChanged)), + case("device.cleanup", Ok(DeviceEvent::Cleanup)), case("invalid_event", Err(())), )] fn test_from_str(input: &str, expected: Result) { assert_eq!(DeviceEvent::from_str(input), expected); } + #[test] + fn test_cleanup_event_request_payload_roundtrip() { + let request = DeviceEventRequest { + event: DeviceEvent::Cleanup, + subscribe: false, + callback_type: DeviceEventCallback::FireboltAppEvent("epg".to_string()), + }; + let payload = request.get_extn_payload(); + let recovered = DeviceEventRequest::get_from_payload(payload).unwrap(); + assert_eq!(recovered.event, DeviceEvent::Cleanup); + assert_eq!(recovered.callback_type.get_id(), "epg"); + } + + #[test] + fn test_cleanup_event_contract() { + let request = DeviceEventRequest { + event: DeviceEvent::Cleanup, + subscribe: true, + callback_type: DeviceEventCallback::FireboltAppEvent("app1".to_string()), + }; + // Cleanup routes through the same contract as Input events + assert_eq!( + request.get_contract(), + RippleContract::DeviceEvents(EventAdjective::Input) + ); + } + #[rstest] #[case( DeviceEventCallback::FireboltAppEvent("app_event".to_string()), diff --git a/core/sdk/src/extn/client/extn_client.rs b/core/sdk/src/extn/client/extn_client.rs index c672e4941..d0e10ff09 100644 --- a/core/sdk/src/extn/client/extn_client.rs +++ b/core/sdk/src/extn/client/extn_client.rs @@ -472,7 +472,7 @@ impl ExtnClient { } }); } else { - error!("No response processor for {:?}", msg); + debug!("No response processor for {:?}", msg); } } diff --git a/device/thunder_ripple_sdk/src/events/thunder_event_processor.rs b/device/thunder_ripple_sdk/src/events/thunder_event_processor.rs index 59ebdee5d..9c68c75a2 100644 --- a/device/thunder_ripple_sdk/src/events/thunder_event_processor.rs +++ b/device/thunder_ripple_sdk/src/events/thunder_event_processor.rs @@ -93,6 +93,10 @@ impl ThunderEventMessage { value.clone(), ))) } + DeviceEvent::Cleanup => { + // Cleanup is not a real Thunder event, skip + return None; + } } } else { debug!( @@ -316,6 +320,38 @@ impl ThunderEventProcessor { let mut back_off_map = self.back_off.write().unwrap(); back_off_map.remove(event_name) } + + /// Remove all event listeners for a given app_id (session cleanup on disconnect). + /// Returns the list of event names that no longer have any listeners and were removed. + pub fn cleanup_by_app_id(&self, app_id: &str) -> Vec { + let removed_events = { + let mut event_map = self.event_map.write().unwrap(); + let mut removed = Vec::new(); + let event_names: Vec = event_map.keys().cloned().collect(); + for event_name in event_names { + if let Some(handler) = event_map.get_mut(&event_name) { + handler.listeners.retain(|id| id != app_id); + if handler.listeners.is_empty() { + event_map.remove(&event_name); + removed.push(event_name.clone()); + } + } + } + removed + }; // event_map lock dropped here + // Clean last_event entries for removed events to free heap + if !removed_events.is_empty() { + let mut last_event_map = self.last_event.write().unwrap(); + for event_name in &removed_events { + last_event_map.remove(event_name); + } + } + debug!( + "cleanup_by_app_id: app_id={}, removed_events={:?}", + app_id, removed_events + ); + removed_events + } } impl Default for ThunderEventProcessor { @@ -323,3 +359,105 @@ impl Default for ThunderEventProcessor { Self::new() } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::client::device_operator::DeviceSubscribeRequest; + + fn make_handler(listeners: Vec<&str>) -> ThunderEventHandler { + ThunderEventHandler { + request: DeviceSubscribeRequest { + module: "TestModule".to_string(), + event_name: "testEvent".to_string(), + params: None, + sub_id: None, + }, + handle: |_, _, _| {}, + is_valid: |_| true, + listeners: listeners.into_iter().map(|s| s.to_string()).collect(), + id: "test_id".to_string(), + callback_type: DeviceEventCallback::FireboltAppEvent("test".to_string()), + } + } + + #[test] + fn test_cleanup_by_app_id_removes_sole_listener() { + let processor = ThunderEventProcessor::new(); + { + let mut map = processor.event_map.write().unwrap(); + map.insert("event.a".to_string(), make_handler(vec!["epg"])); + } + // Also add a last_event entry + { + let mut last = processor.last_event.write().unwrap(); + last.insert("event.a".to_string(), serde_json::json!({"foo": 1})); + } + + let removed = processor.cleanup_by_app_id("epg"); + + assert_eq!(removed, vec!["event.a".to_string()]); + assert!(processor.event_map.read().unwrap().is_empty()); + assert!(processor.last_event.read().unwrap().is_empty()); + } + + #[test] + fn test_cleanup_by_app_id_keeps_other_listeners() { + let processor = ThunderEventProcessor::new(); + { + let mut map = processor.event_map.write().unwrap(); + map.insert("event.a".to_string(), make_handler(vec!["epg", "settings"])); + } + + let removed = processor.cleanup_by_app_id("epg"); + + assert!(removed.is_empty()); + let map = processor.event_map.read().unwrap(); + let handler = map.get("event.a").unwrap(); + assert_eq!(handler.listeners, vec!["settings".to_string()]); + } + + #[test] + fn test_cleanup_by_app_id_multiple_events() { + let processor = ThunderEventProcessor::new(); + { + let mut map = processor.event_map.write().unwrap(); + map.insert("event.a".to_string(), make_handler(vec!["epg"])); + map.insert("event.b".to_string(), make_handler(vec!["epg", "other"])); + map.insert("event.c".to_string(), make_handler(vec!["other"])); + } + { + let mut last = processor.last_event.write().unwrap(); + last.insert("event.a".to_string(), serde_json::json!(1)); + last.insert("event.b".to_string(), serde_json::json!(2)); + } + + let removed = processor.cleanup_by_app_id("epg"); + + assert_eq!(removed, vec!["event.a".to_string()]); + let map = processor.event_map.read().unwrap(); + assert_eq!(map.len(), 2); // event.b and event.c remain + assert_eq!( + map.get("event.b").unwrap().listeners, + vec!["other".to_string()] + ); + // last_event for event.a should be cleaned, event.b kept + let last = processor.last_event.read().unwrap(); + assert!(!last.contains_key("event.a")); + assert!(last.contains_key("event.b")); + } + + #[test] + fn test_cleanup_by_app_id_no_match() { + let processor = ThunderEventProcessor::new(); + { + let mut map = processor.event_map.write().unwrap(); + map.insert("event.a".to_string(), make_handler(vec!["epg"])); + } + + let removed = processor.cleanup_by_app_id("nonexistent"); + + assert!(removed.is_empty()); + assert_eq!(processor.event_map.read().unwrap().len(), 1); + } +} diff --git a/device/thunder_ripple_sdk/src/processors/thunder_events.rs b/device/thunder_ripple_sdk/src/processors/thunder_events.rs index 43a62329b..f0128bc6b 100644 --- a/device/thunder_ripple_sdk/src/processors/thunder_events.rs +++ b/device/thunder_ripple_sdk/src/processors/thunder_events.rs @@ -16,7 +16,7 @@ // use ripple_sdk::{ - api::session::EventAdjective, framework::ripple_contract::RippleContract, + api::session::EventAdjective, framework::ripple_contract::RippleContract, log::info, utils::error::RippleError, }; @@ -106,6 +106,19 @@ impl ExtnRequestProcessor for ThunderOpenEventsProcessor { id.clone(), HDCPEventHandler::provide(id, callback_type), )), + DeviceEvent::Cleanup => { + // Clean up all event subscriptions for this app + let removed_events = state.event_processor.cleanup_by_app_id(&id); + if !removed_events.is_empty() { + info!( + "ThunderEventProcessor cleanup: removed {} event(s) for app {}: {:?}", + removed_events.len(), + id, + removed_events + ); + } + return true; + } } { v.await; Self::ack(state.get_client(), msg).await.is_ok()