Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 151 additions & 5 deletions core/main/src/broker/endpoint_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ pub struct BrokerCleaner {
impl BrokerCleaner {
async fn cleanup_session(&self, appid: &str) -> Result<String, RippleError> {
if let Some(cleaner) = self.cleaner.clone() {
if let Err(e) = cleaner.try_send(appid.to_owned()) {
error!("Couldnt cleanup {} {:?}", appid, e);
if let Err(e) = cleaner.send(appid.to_owned()).await {
error!("Could not clean up {} {:?}", appid, e);
Comment thread
kvfasil marked this conversation as resolved.
Dismissed
return Err(RippleError::SendFailure);
}
return Ok(appid.to_owned());
Comment thread
kvfasil marked this conversation as resolved.
Expand Down Expand Up @@ -1095,14 +1095,52 @@ impl EndpointBrokerState {
}

// Method to cleanup all subscription on App termination
pub async fn cleanup_for_app(&self, app_id: &str) {
pub async fn cleanup_for_app(&self, id: &str) {
let cleaners = { self.cleaner_list.read().unwrap().clone() };

for cleaner in cleaners {
/*
for now, just eat the error - the return type was mainly added to prepate for future refactoring/testability
*/
let _ = cleaner.cleanup_session(app_id).await;
let _ = cleaner.cleanup_session(id).await;
}

// Clean up subscription entries from request_map and extension_request_map
// that belong to this app. These are never removed on disconnect otherwise.
self.cleanup_request_maps(id);
}

/// Remove subscription/event entries from request_map and extension_request_map
/// matching the given identifier (may be app_id, session_id, or connection_id).
/// Without this, subscription entries in request_map (guarded by is_subscription())
/// and event entries in extension_request_map persist forever.
fn cleanup_request_maps(&self, id: &str) {
let removed_ids: Vec<u64> = {
Comment thread
kvfasil marked this conversation as resolved.
let mut request_map = self.request_map.write().unwrap();
let ids_to_remove: Vec<u64> = request_map
.iter()
.filter(|(_, req)| {
req.rpc.ctx.app_id == id
|| req.rpc.ctx.session_id == id
|| req.rpc.ctx.cid.as_deref() == Some(id)
})
.map(|(id, _)| *id)
.collect();
for id in &ids_to_remove {
request_map.remove(id);
}
ids_to_remove
};

if !removed_ids.is_empty() {
let mut extn_map = self.extension_request_map.write().unwrap();
for id in &removed_ids {
extn_map.remove(id);
Comment thread
kvfasil marked this conversation as resolved.
}
debug!(
Comment thread
github-advanced-security[bot] marked this conversation as resolved.
Fixed
"cleanup_request_maps: removed {} request_map and extension_request_map entries",
removed_ids.len()
);
}
}
/// Send a request through the broker and wait for response with a oneshot channel and custom timeout
Expand Down Expand Up @@ -1418,7 +1456,7 @@ impl BrokerOutputForwarder {
)
.await;
} else {
error!(
debug!(
"start_forwarder:{} request not found for {:?}",
line!(),
response
Expand Down Expand Up @@ -2487,6 +2525,114 @@ mod endpoint_broker_tests {
// // assert!(state.get_request(2).is_ok());
// // assert!(state.get_request(1).is_ok());
// }

#[cfg(test)]
mod cleanup_request_maps {
use super::*;
use crate::broker::rules::rules_engine::{Rule, RuleTransform};
use ripple_sdk::api::gateway::rpc_gateway_api::RpcRequest;
use ripple_sdk::Mockable;
use serial_test::serial;

fn make_state() -> EndpointBrokerState {
let (tx, _) = channel(2);
let client = RippleClient::new(ChannelsState::new());
EndpointBrokerState::new(
OpMetricState::default(),
tx,
RuleEngine {
rules: RuleSet::default(),
functions: HashMap::default(),
},
client,
)
}

fn default_rule() -> Rule {
Rule {
alias: "test.method".to_owned(),
transform: RuleTransform::default(),
endpoint: None,
filter: None,
event_handler: None,
sources: None,
}
}

#[serial]
#[tokio::test]
async fn test_cleanup_by_app_id() {
let state = make_state();
let mut req = RpcRequest::mock();
req.ctx.app_id = "epg".to_string();
state.update_request(&req, &default_rule(), None, None, vec![]);

assert_eq!(state.request_map.read().unwrap().len(), 1);
state.cleanup_request_maps("epg");
assert!(state.request_map.read().unwrap().is_empty());
}

#[serial]
#[tokio::test]
async fn test_cleanup_by_session_id() {
let state = make_state();
let mut req = RpcRequest::mock();
req.ctx.session_id = "sess-123".to_string();
req.ctx.app_id = "other".to_string();
state.update_request(&req, &default_rule(), None, None, vec![]);

state.cleanup_request_maps("sess-123");
assert!(state.request_map.read().unwrap().is_empty());
}

#[serial]
#[tokio::test]
async fn test_cleanup_by_cid() {
let state = make_state();
let mut req = RpcRequest::mock();
req.ctx.cid = Some("conn-xyz".to_string());
req.ctx.app_id = "other".to_string();
state.update_request(&req, &default_rule(), None, None, vec![]);

state.cleanup_request_maps("conn-xyz");
assert!(state.request_map.read().unwrap().is_empty());
}

#[serial]
#[tokio::test]
async fn test_cleanup_no_match_is_noop() {
let state = make_state();
let req = RpcRequest::mock();
state.update_request(&req, &default_rule(), None, None, vec![]);

state.cleanup_request_maps("nonexistent");
assert_eq!(state.request_map.read().unwrap().len(), 1);
}

#[serial]
#[tokio::test]
async fn test_cleanup_also_removes_extension_request_map() {
let state = make_state();
let mut req = RpcRequest::mock();
req.ctx.app_id = "epg".to_string();
state.update_request(&req, &default_rule(), None, None, vec![]);
let id = {
let map = state.request_map.read().unwrap();
*map.keys().next().unwrap()
};
{
let mut extn_map = state.extension_request_map.write().unwrap();
extn_map.insert(
id,
ripple_sdk::extn::extn_client_message::ExtnMessage::default(),
);
}

state.cleanup_request_maps("epg");
assert!(state.request_map.read().unwrap().is_empty());
assert!(state.extension_request_map.read().unwrap().is_empty());
}
}
}

#[tokio::test]
Expand Down
47 changes: 46 additions & 1 deletion core/main/src/broker/rules/rules_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
//
// SPDX-License-Identifier: Apache-2.0
//
use jaq_interpret::{Ctx, FilterT, ParseCtx, RcIter, Val};
use jaq_interpret::{Ctx, Filter, FilterT, ParseCtx, RcIter, Val};
use ripple_sdk::api::{
gateway::rpc_gateway_api::RpcRequest, manifest::extn_manifest::ExtnManifest,
};
Expand All @@ -36,6 +36,22 @@ use super::rules_functions::{apply_functions, RulesFunction, RulesImport};
static BASE_PARSE_CTX_INIT: Once = Once::new();
static mut BASE_PARSE_CTX_PTR: Option<Mutex<ParseCtx>> = None;

static FILTER_CACHE_INIT: Once = Once::new();
static mut FILTER_CACHE_PTR: Option<Mutex<HashMap<String, Filter>>> = None;

fn get_filter_cache() -> MutexGuard<'static, HashMap<String, Filter>> {
FILTER_CACHE_INIT.call_once(|| unsafe {
FILTER_CACHE_PTR = Some(Mutex::new(HashMap::new()));
});
unsafe {
FILTER_CACHE_PTR
.as_ref()
.expect("FILTER_CACHE_PTR not initialized")
.lock()
.expect("Failed to lock FILTER_CACHE_PTR")
}
}

#[derive(Debug, Deserialize, Default, Clone)]
pub struct RuleSet {
#[serde(default)]
Expand Down Expand Up @@ -532,6 +548,26 @@ pub fn jq_compile(input: Value, filter: &str, reference: String) -> Result<Value
// which do not include filters in the standard library
// such as `map`, `select` etc.

// Check cache for previously compiled filter
{
let cache = get_filter_cache();
if let Some(cached_filter) = cache.get(filter) {
let f = cached_filter.clone();
drop(cache);
let inputs = RcIter::new(core::iter::empty());
let mut out = f.run((Ctx::new([], &inputs), Val::from(input)));
if let Some(Ok(v)) = out.next() {
info!(
"Ripple Gateway Rule Processing Time (cached): {},{}",
reference,
Utc::now().timestamp_millis() - start
);
return Ok(Value::from(v));
}
return Err(RippleError::ParseError);
}
}

// Parse the filter
let (f, errs) = jaq_parse::parse(filter, jaq_parse::main());
if !errs.is_empty() {
Expand All @@ -550,6 +586,15 @@ pub fn jq_compile(input: Value, filter: &str, reference: String) -> Result<Value
defs.errs.clear(); // Clear errors before returning
return Err(RippleError::RuleError);
}
// Drop defs lock before acquiring cache lock to avoid deadlock
drop(defs);

// Cache the compiled filter
{
let mut cache = get_filter_cache();
cache.insert(filter.to_string(), f.clone());
}
Comment thread
kvfasil marked this conversation as resolved.

let inputs = RcIter::new(core::iter::empty());
// iterator over the output values
let mut out = f.run((Ctx::new([], &inputs), Val::from(input)));
Expand Down
56 changes: 48 additions & 8 deletions core/main/src/broker/thunder_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use ripple_sdk::{
gateway::rpc_gateway_api::{JsonRpcApiResponse, RpcRequest},
observability::log_signal::LogSignal,
},
log::{debug, error, info, trace},
log::{debug, error, info, trace, warn},
tokio::{
self,
sync::{mpsc, Mutex},
Expand Down Expand Up @@ -373,16 +373,56 @@ impl ThunderBroker {
broker_for_cleanup.subscription_map.write().unwrap().remove(&cleanup_request)
};
if let Some(mut cleanup) = value {
let sender = broker_for_cleanup.get_sender();
while let Some(mut v) = cleanup.pop() {
v.rpc = v.rpc.get_unsubscribe();
if (sender.send(v).await).is_err() {
error!("Cleanup Error for {}",&cleanup_request);
debug!(
"BrokerCleaner: unsubscribing {} subscription(s)",
cleanup.len()
);
// Send unregister calls directly to Thunder via the WebSocket.
// We must NOT route through the broker sender + prepare_request(),
// because prepare_request() calls self.unsubscribe() which looks up
// subscription_map — but we already removed entries above.
let binding = ws_tx_wrap.clone();
let mut ws_tx = binding.lock().await;
while let Some(v) = cleanup.pop() {
let (callsign, method) =
ThunderBroker::get_callsign_and_method_from_alias(&v.rule.alias);
if let Some(method) = method {
let unregister = serde_json::json!({
"jsonrpc": "2.0",
"method": format!("{}.unregister", callsign),
"params": {
"event": method,
"id": format!("{}", v.rpc.ctx.call_id)
}
});
Comment thread
kvfasil marked this conversation as resolved.
debug!(
"BrokerCleaner: sending Thunder unregister for {}.{}",
callsign, method
);
if let Err(e) = ws_tx.feed(Message::Text(unregister.to_string())).await {
error!(
"BrokerCleaner: failed to send unregister: {:?}",
e
);
}
} else {
warn!(
"BrokerCleaner: could not extract method from alias '{}', skipping unregister",
v.rule.alias
);
}
}

if let Err(e) = ws_tx.flush().await {
error!(
"BrokerCleaner: failed to flush unregister calls: {:?}",
e
);
}
} else {
debug!(
"BrokerCleaner: no subscriptions found, skipping"
);
}

}
}
}
Expand Down
Loading
Loading