diff --git a/score/mw/com/example/com-api-example/basic-consumer-producer.rs b/score/mw/com/example/com-api-example/basic-consumer-producer.rs index 1c2d60b39..41817f95c 100644 --- a/score/mw/com/example/com-api-example/basic-consumer-producer.rs +++ b/score/mw/com/example/com-api-example/basic-consumer-producer.rs @@ -189,6 +189,9 @@ fn run_with_runtime(name: &str, runtime: &R) { Ok(_) => println!("Successfully unoffered the service"), Err(e) => eprintln!("Failed to unoffer: {:?}", e), } + + //UnSubscribe the event + let _ = monitor.tire_subscriber.unsubscribe(); println!("=== {name} runtime completed ===\n"); } diff --git a/score/mw/com/impl/rust/com-api/com-api-ffi-lola/bridge_ffi.rs b/score/mw/com/impl/rust/com-api/com-api-ffi-lola/bridge_ffi.rs index 72c25a9dd..b97aea0cd 100644 --- a/score/mw/com/impl/rust/com-api/com-api-ffi-lola/bridge_ffi.rs +++ b/score/mw/com/impl/rust/com-api/com-api-ffi-lola/bridge_ffi.rs @@ -450,6 +450,12 @@ extern "C" { /// # Arguments /// * `handle` - Opaque pointer to FindServiceHandle returned by mw_com_start_find_service fn mw_com_stop_find_service(handle: *mut FindServiceHandle); + + /// Unsubscribe from event to stop receiving samples + /// + /// # Arguments + /// * `event_ptr` - Opaque event pointer + fn mw_com_proxy_event_unsubscribe(event_ptr: *mut ProxyEventBase); } /// Get allocatee pointer from skeleton event of specific type @@ -784,6 +790,20 @@ pub unsafe fn subscribe_to_event(event_ptr: *mut ProxyEventBase, max_sample_coun mw_com_proxy_event_subscribe(event_ptr, max_sample_count) } +/// Unsafe wrapper around mw_com_proxy_event_unsubscribe +/// +/// # Arguments +/// * `event_ptr` - Opaque event pointer +/// +/// # Safety +/// event_ptr must be a valid pointer to a ProxyEventBase previously obtained from get_event_from_proxy(). +/// This function should be called only when no `SamplePtr` is held by the user for this event. +pub unsafe fn unsubscribe_to_event(event_ptr: *mut ProxyEventBase) { + // SAFETY: event_ptr is guaranteed to be valid per the caller's contract. + // The C++ implementation handles unsubscription and buffer cleanup safely. + mw_com_proxy_event_unsubscribe(event_ptr); +} + /// Unsafe wrapper around mw_com_proxy_set_event_receive_handler /// /// # Arguments diff --git a/score/mw/com/impl/rust/com-api/com-api-ffi-lola/registry_bridge_macro.cpp b/score/mw/com/impl/rust/com-api/com-api-ffi-lola/registry_bridge_macro.cpp index 350500e6b..dcdff4ce8 100644 --- a/score/mw/com/impl/rust/com-api/com-api-ffi-lola/registry_bridge_macro.cpp +++ b/score/mw/com/impl/rust/com-api/com-api-ffi-lola/registry_bridge_macro.cpp @@ -138,6 +138,18 @@ bool mw_com_proxy_event_subscribe(ProxyEventBase* event_ptr, uint32_t max_sample return true; } +/// \brief Unsubscribe from a proxy event to release sample buffers +/// \details Must be called only after no `SamplePtr` instances are held on the Rust side +/// \param event_ptr Opaque event pointer (ProxyEventBase*) +void mw_com_proxy_event_unsubscribe(ProxyEventBase* event_ptr) +{ + if (event_ptr == nullptr) + { + return; + } + event_ptr->Unsubscribe(); +} + /// \brief Create proxy instance dynamically /// \details Creates a proxy for the given interface UID using the provided handle. /// \param interface_id UTF-8 string view of interface UID (e.g., "mw_com_IpcBridge") diff --git a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/BUILD b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/BUILD index 1b5ac2a5d..c35184833 100644 --- a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/BUILD +++ b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/BUILD @@ -11,7 +11,7 @@ # SPDX-License-Identifier: Apache-2.0 # ******************************************************************************* -load("@rules_rust//rust:defs.bzl", "rust_library", "rust_test") +load("@rules_rust//rust:defs.bzl", "rust_doc_test", "rust_library", "rust_test") rust_library( name = "com-api-runtime-lola", @@ -40,3 +40,9 @@ rust_test( tags = ["manual"], deps = [":com-api-runtime-lola"], ) + +rust_doc_test( + name = "com-api-runtime-lola-doc-tests", + crate = ":com-api-runtime-lola", + deps = [":com-api-runtime-lola"], +) diff --git a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs index ca4ba4a74..48a979ecb 100644 --- a/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs +++ b/score/mw/com/impl/rust/com-api/com-api-runtime-lola/consumer.rs @@ -318,8 +318,8 @@ impl Subscriber for SubscribableImpl max_num_samples, instance_info, waker_storage: Arc::default(), - _proxy: self.proxy_instance.clone(), async_init_status: AtomicBool::new(false), + _proxy: self.proxy_instance.clone(), _phantom: PhantomData, }) } @@ -419,21 +419,25 @@ where max_num_samples: usize, instance_info: LolaConsumerInfo, waker_storage: Arc, - _proxy: ProxyInstanceManager, async_init_status: AtomicBool, + _proxy: ProxyInstanceManager, _phantom: PhantomData, } impl Drop for SubscriberImpl { fn drop(&mut self) { - //SAFETY: it is safe to clear the event receive handler because event ptr is valid - // which was obtained from valid proxy instance and the callback set for this event stream will be dropped after this, - // so it won't be called after the handler is cleared + // SAFETY: It is safe to unsubscribe from the event because the event pointer is valid and was created during subscription. + // Exculsive access to the event pointer is guaranteed by the ProxyEventManagerGuard. + // Unset the receive handler to release the async callback, and then unsubscribe from the event to clean up resources on the C++ side. + let mut guard = self.event.get_proxy_event(); unsafe { - bridge_ffi_rs::clear_event_receive_handler( - self.event.get_proxy_event().deref_mut(), - T::ID, - ); + if self + .async_init_status + .load(std::sync::atomic::Ordering::Relaxed) + { + bridge_ffi_rs::clear_event_receive_handler(guard.deref_mut(), T::ID); + } + bridge_ffi_rs::unsubscribe_to_event(guard.deref_mut()); } } } @@ -466,6 +470,7 @@ where type Sample<'a> = Sample; fn unsubscribe(self) -> Self::Subscriber { + //Unsubscribe FFI call will be triggered in Drop implementation of SubscriberImpl. SubscribableImpl { identifier: self.event_id, instance_info: self.instance_info.clone(), @@ -992,24 +997,16 @@ mod test { #[test] fn test_discovery_state_data_creation() { // Test that DiscoveryStateData can be created with None values - let state = super::DiscoveryStateData { - find_handle: None, - handles: None, - }; - assert!(state.find_handle.is_none()); + let state = super::DiscoveryStateData { handles: None }; assert!(state.handles.is_none()); } #[test] fn test_discovery_state_data_debug() { // Test that DiscoveryStateData debug formatting works - let state = super::DiscoveryStateData { - find_handle: None, - handles: None, - }; + let state = super::DiscoveryStateData { handles: None }; let debug_string = format!("{:?}", state); assert!(debug_string.contains("DiscoveryStateData")); - assert!(debug_string.contains("find_handle")); assert!(debug_string.contains("handles")); } @@ -1042,3 +1039,32 @@ mod test { let _result = discovery.get_available_instances(); } } + +mod doctest { + + /// Calling `unsubscribe` while a `SampleContainer` holding samples whose lifetime + /// is tied to the subscription is still in scope so it must not compile. + /// + /// `try_receive` fills the container with `S::Sample<'a>` where `'a` is the + /// borrow lifetime of `self`. The borrow checker therefore prevents moving + /// `self` (via `unsubscribe`) while the container and the samples it may hold + /// are still in scope. + /// + /// ``` compile_fail + /// use com_api_concept::{CommData, SampleContainer, Subscription}; + /// use com_api_runtime_lola::LolaRuntimeImpl; + /// + /// fn demonstrate_sample_container_lifetime_borrow(sub: S) + /// where + /// T: CommData + std::fmt::Debug, + /// S: Subscription, + /// { + /// let mut container = SampleContainer::new(2); + /// let _ = sub.try_receive(&mut container, 2); + /// sub.unsubscribe(); // ERROR: cannot move `sub` while `container` borrows it + /// drop(container); + /// } + /// ``` + #[cfg(doctest)] + fn unsubscribe_during_sample_container_is_in_scope() {} +}