Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ fn run_with_runtime<R: Runtime>(name: &str, runtime: &R) {
Ok(_) => println!("Successfully unoffered the service"),
Err(e) => eprintln!("Failed to unoffer: {:?}", e),
}

//UnSubscribe the event
let _ = monitor.tire_subscriber.unsubscribe();
println!("=== {name} runtime completed ===\n");
}

Expand Down
20 changes: 20 additions & 0 deletions score/mw/com/impl/rust/com-api/com-api-ffi-lola/bridge_ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,12 @@ extern "C" {
/// # Arguments
/// * `handle` - Opaque pointer to FindServiceHandle returned by mw_com_start_find_service
fn mw_com_stop_find_service(handle: *mut FindServiceHandle);

/// Unsubscribe from event to stop receiving samples
///
/// # Arguments
/// * `event_ptr` - Opaque event pointer
fn mw_com_proxy_event_unsubscribe(event_ptr: *mut ProxyEventBase);
}

/// Get allocatee pointer from skeleton event of specific type
Expand Down Expand Up @@ -784,6 +790,20 @@ pub unsafe fn subscribe_to_event(event_ptr: *mut ProxyEventBase, max_sample_coun
mw_com_proxy_event_subscribe(event_ptr, max_sample_count)
}

/// Unsafe wrapper around mw_com_proxy_event_unsubscribe
///
/// # Arguments
/// * `event_ptr` - Opaque event pointer
///
/// # Safety
/// event_ptr must be a valid pointer to a ProxyEventBase previously obtained from get_event_from_proxy().
/// This function should be called only when no `SamplePtr` is held by the user for this event.
pub unsafe fn unsubscribe_to_event(event_ptr: *mut ProxyEventBase) {
// SAFETY: event_ptr is guaranteed to be valid per the caller's contract.
// The C++ implementation handles unsubscription and buffer cleanup safely.
mw_com_proxy_event_unsubscribe(event_ptr);
}

/// Unsafe wrapper around mw_com_proxy_set_event_receive_handler
///
/// # Arguments
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,18 @@ bool mw_com_proxy_event_subscribe(ProxyEventBase* event_ptr, uint32_t max_sample
return true;
}

/// \brief Unsubscribe from a proxy event to release sample buffers
/// \details Must be called only after no `SamplePtr` instances are held on the Rust side
/// \param event_ptr Opaque event pointer (ProxyEventBase*)
void mw_com_proxy_event_unsubscribe(ProxyEventBase* event_ptr)
{
if (event_ptr == nullptr)
{
return;
}
event_ptr->Unsubscribe();
}

/// \brief Create proxy instance dynamically
/// \details Creates a proxy for the given interface UID using the provided handle.
/// \param interface_id UTF-8 string view of interface UID (e.g., "mw_com_IpcBridge")
Expand Down
8 changes: 7 additions & 1 deletion score/mw/com/impl/rust/com-api/com-api-runtime-lola/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# SPDX-License-Identifier: Apache-2.0
# *******************************************************************************

load("@rules_rust//rust:defs.bzl", "rust_library", "rust_test")
load("@rules_rust//rust:defs.bzl", "rust_doc_test", "rust_library", "rust_test")

rust_library(
name = "com-api-runtime-lola",
Expand Down Expand Up @@ -40,3 +40,9 @@ rust_test(
tags = ["manual"],
deps = [":com-api-runtime-lola"],
)

rust_doc_test(
name = "com-api-runtime-lola-doc-tests",
crate = ":com-api-runtime-lola",
deps = [":com-api-runtime-lola"],
)
64 changes: 45 additions & 19 deletions score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,8 @@ impl<T: CommData + Debug> Subscriber<T, LolaRuntimeImpl> for SubscribableImpl<T>
max_num_samples,
instance_info,
waker_storage: Arc::default(),
_proxy: self.proxy_instance.clone(),
async_init_status: AtomicBool::new(false),
_proxy: self.proxy_instance.clone(),
_phantom: PhantomData,
})
}
Expand Down Expand Up @@ -419,21 +419,25 @@ where
max_num_samples: usize,
instance_info: LolaConsumerInfo,
waker_storage: Arc<AtomicWaker>,
_proxy: ProxyInstanceManager,
async_init_status: AtomicBool,
_proxy: ProxyInstanceManager,
_phantom: PhantomData<T>,
}

impl<T: CommData + Debug> Drop for SubscriberImpl<T> {
fn drop(&mut self) {
//SAFETY: it is safe to clear the event receive handler because event ptr is valid
// which was obtained from valid proxy instance and the callback set for this event stream will be dropped after this,
// so it won't be called after the handler is cleared
// SAFETY: It is safe to unsubscribe from the event because the event pointer is valid and was created during subscription.
// Exculsive access to the event pointer is guaranteed by the ProxyEventManagerGuard.
// Unset the receive handler to release the async callback, and then unsubscribe from the event to clean up resources on the C++ side.
let mut guard = self.event.get_proxy_event();
unsafe {
bridge_ffi_rs::clear_event_receive_handler(
self.event.get_proxy_event().deref_mut(),
T::ID,
);
if self
.async_init_status
.load(std::sync::atomic::Ordering::Relaxed)
{
bridge_ffi_rs::clear_event_receive_handler(guard.deref_mut(), T::ID);
}
bridge_ffi_rs::unsubscribe_to_event(guard.deref_mut());
}
}
}
Expand Down Expand Up @@ -466,6 +470,7 @@ where
type Sample<'a> = Sample<T>;

fn unsubscribe(self) -> Self::Subscriber {
//Unsubscribe FFI call will be triggered in Drop implementation of SubscriberImpl.
SubscribableImpl {
identifier: self.event_id,
instance_info: self.instance_info.clone(),
Expand Down Expand Up @@ -992,24 +997,16 @@ mod test {
#[test]
fn test_discovery_state_data_creation() {
// Test that DiscoveryStateData can be created with None values
let state = super::DiscoveryStateData {
find_handle: None,
handles: None,
};
assert!(state.find_handle.is_none());
let state = super::DiscoveryStateData { handles: None };
Copy link
Copy Markdown
Contributor Author

@bharatGoswami8 bharatGoswami8 Mar 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is just cleanup from last PR.

assert!(state.handles.is_none());
}

#[test]
fn test_discovery_state_data_debug() {
// Test that DiscoveryStateData debug formatting works
let state = super::DiscoveryStateData {
find_handle: None,
handles: None,
};
let state = super::DiscoveryStateData { handles: None };
let debug_string = format!("{:?}", state);
assert!(debug_string.contains("DiscoveryStateData"));
assert!(debug_string.contains("find_handle"));
assert!(debug_string.contains("handles"));
}

Expand Down Expand Up @@ -1042,3 +1039,32 @@ mod test {
let _result = discovery.get_available_instances();
}
}

mod doctest {

/// Calling `unsubscribe` while a `SampleContainer` holding samples whose lifetime
/// is tied to the subscription is still in scope so it must not compile.
///
/// `try_receive` fills the container with `S::Sample<'a>` where `'a` is the
/// borrow lifetime of `self`. The borrow checker therefore prevents moving
/// `self` (via `unsubscribe`) while the container and the samples it may hold
/// are still in scope.
///
/// ``` compile_fail
/// use com_api_concept::{CommData, SampleContainer, Subscription};
/// use com_api_runtime_lola::LolaRuntimeImpl;
///
/// fn demonstrate_sample_container_lifetime_borrow<T, S>(sub: S)
/// where
/// T: CommData + std::fmt::Debug,
/// S: Subscription<T, LolaRuntimeImpl>,
/// {
/// let mut container = SampleContainer::new(2);
/// let _ = sub.try_receive(&mut container, 2);
/// sub.unsubscribe(); // ERROR: cannot move `sub` while `container` borrows it
/// drop(container);
/// }
/// ```
#[cfg(doctest)]
fn unsubscribe_during_sample_container_is_in_scope() {}
}
Loading