From 0b65496999157c222c2aa87a90e0d5f1b534f00c Mon Sep 17 00:00:00 2001 From: Zhidong Peng Date: Wed, 4 Feb 2026 18:15:18 +0000 Subject: [PATCH 01/12] Support report extension status event and send out --- proxy_agent_shared/src/telemetry.rs | 80 ++++++++++ .../src/telemetry/event_logger.rs | 151 ++++++++++++++++-- .../src/telemetry/event_reader.rs | 91 +++++++++++ .../src/telemetry/telemetry_event.rs | 118 +++++++++++++- 4 files changed, 430 insertions(+), 10 deletions(-) diff --git a/proxy_agent_shared/src/telemetry.rs b/proxy_agent_shared/src/telemetry.rs index 29918cf1..684e6b1a 100644 --- a/proxy_agent_shared/src/telemetry.rs +++ b/proxy_agent_shared/src/telemetry.rs @@ -13,6 +13,10 @@ pub const GENERIC_EVENT_FILE_SEARCH_PATTERN: &str = r"^[0-9]+\.json$"; pub fn new_generic_event_file_name() -> String { format!("{}.json", misc_helpers::get_date_time_unix_nano()) } +pub const EXTENSION_EVENT_FILE_SEARCH_PATTERN: &str = r"^extension_[0-9]+\.json$"; +pub fn new_extension_event_file_name() -> String { + format!("extension_{}.json", misc_helpers::get_date_time_unix_nano()) +} /// Represents a telemetry event for TelemetryGenericLogsEvent #[derive(Serialize, Deserialize)] @@ -43,6 +47,53 @@ impl Event { } } +#[derive(Serialize, Deserialize, Clone)] +pub struct Extension { + pub name: String, + pub version: String, + pub is_internal: bool, + pub extension_type: String, +} + +#[derive(Serialize, Deserialize, Clone)] +pub struct OperationStatus { + pub operation_success: bool, + pub operation: String, + pub task_name: String, + pub message: String, + pub duration: i64, +} + +#[derive(Serialize, Deserialize, Clone)] +pub struct ExtensionStatusEvent { + pub extension: Extension, + pub operation_status: OperationStatus, + + pub event_pid: String, + pub event_tid: String, + pub time_stamp: String, +} + +impl ExtensionStatusEvent { + /// Create a new ExtensionStatusEvent + /// Rust does not recommend using too many arguments in a function, + /// so we use structs to group related arguments together. + /// # Arguments + /// * `extension` - The extension information + /// * `operation_status` - The operation status information + /// # Returns + /// A new instance of `ExtensionStatusEvent` + pub fn new(extension: Extension, operation_status: OperationStatus) -> Self { + ExtensionStatusEvent { + extension, + operation_status, + event_pid: std::process::id().to_string(), + event_tid: misc_helpers::get_thread_identity(), + time_stamp: misc_helpers::get_date_time_string_with_milliseconds(), + } + } +} + #[cfg(test)] mod tests { #[test] @@ -58,4 +109,33 @@ mod tests { assert_eq!(event.TaskName, "test task name".to_string()); assert_eq!(event.OperationId, "test operation id".to_string()); } + + #[test] + fn test_extension_status_event_new() { + let extension = super::Extension { + name: "test extension".to_string(), + version: "1.0.0".to_string(), + is_internal: true, + extension_type: "test type".to_string(), + }; + let operation_status = super::OperationStatus { + operation_success: true, + task_name: "test task".to_string(), + operation: "test operation".to_string(), + message: "test message".to_string(), + duration: 100, + }; + let event = super::ExtensionStatusEvent::new(extension.clone(), operation_status.clone()); + assert_eq!(event.extension.name, extension.name); + assert_eq!(event.extension.version, extension.version); + assert_eq!(event.extension.is_internal, extension.is_internal); + assert_eq!(event.extension.extension_type, extension.extension_type); + assert_eq!( + event.operation_status.operation_success, + operation_status.operation_success + ); + assert_eq!(event.operation_status.operation, operation_status.operation); + assert_eq!(event.operation_status.message, operation_status.message); + assert_eq!(event.operation_status.duration, operation_status.duration); + } } diff --git a/proxy_agent_shared/src/telemetry/event_logger.rs b/proxy_agent_shared/src/telemetry/event_logger.rs index 6ea58cd3..96bda630 100644 --- a/proxy_agent_shared/src/telemetry/event_logger.rs +++ b/proxy_agent_shared/src/telemetry/event_logger.rs @@ -12,11 +12,13 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; -pub const MAX_MESSAGE_LENGTH: usize = 1024 * 4; // 4KB - +const MAX_MESSAGE_LENGTH: usize = 1024 * 4; // 4KB static EVENT_QUEUE: Lazy> = Lazy::new(|| ConcurrentQueue::::bounded(1000)); static SHUT_DOWN: Lazy> = Lazy::new(|| Arc::new(AtomicBool::new(false))); +/// Store the event directory path, so that other modules can access it if needed. +static EVENTS_DIR: tokio::sync::OnceCell = tokio::sync::OnceCell::const_new(); +const MAX_EXTENSION_EVENT_FILE_COUNT: usize = 1000; pub async fn start( event_dir: PathBuf, @@ -37,6 +39,12 @@ pub async fn start( set_status_fn(message.to_string()); } + if EVENTS_DIR.set(event_dir.clone()).is_err() { + let message = "Event directory is already set, cannot set it again."; + set_status_fn(message.to_string()); + logger_manager::write_log(Level::Warn, message.to_string()); + } + let shutdown = SHUT_DOWN.clone(); if interval == Duration::default() { interval = Duration::from_secs(60); @@ -93,8 +101,7 @@ pub async fn start( } let mut file_path = event_dir.to_path_buf(); - - file_path.push(format!("{}.json", misc_helpers::get_date_time_unix_nano())); + file_path.push(crate::telemetry::new_generic_event_file_name()); match misc_helpers::json_write_to_file(&events, &file_path) { Ok(()) => { logger_manager::write_log( @@ -162,6 +169,59 @@ pub fn write_event_only(level: Level, message: String, method_name: &str, module }; } +pub fn report_extension_status_event( + extension: crate::telemetry::Extension, + operation_status: crate::telemetry::OperationStatus, +) { + let event_dir = match EVENTS_DIR.get() { + Some(dir) => dir.clone(), + None => { + logger_manager::write_log( + Level::Warn, + "Event directory is not set, cannot report extension status event.".to_string(), + ); + return; + } + }; + + // Check the event file counts, + // if it exceeds the max file number, drop the new events + match misc_helpers::search_files( + &event_dir, + crate::telemetry::EXTENSION_EVENT_FILE_SEARCH_PATTERN, + ) { + Ok(files) => { + if files.len() >= MAX_EXTENSION_EVENT_FILE_COUNT { + logger_manager::write_log(Level::Warn, format!( + "Event files exceed the max file count {}, drop and skip the write to disk.", + MAX_EXTENSION_EVENT_FILE_COUNT + )); + return; + } + } + Err(e) => { + logger_manager::write_log( + Level::Warn, + format!("Failed to get event files with error: {e}"), + ); + } + } + + let event = crate::telemetry::ExtensionStatusEvent::new(extension, operation_status); + let mut file_path = event_dir.to_path_buf(); + file_path.push(crate::telemetry::new_extension_event_file_name()); + if let Err(e) = misc_helpers::json_write_to_file(&event, &file_path) { + logger_manager::write_log( + Level::Warn, + format!( + "Failed to write extension status event to the file {} with error: {}", + file_path.display(), + e + ), + ); + } +} + #[cfg(test)] mod tests { use crate::misc_helpers; @@ -169,16 +229,41 @@ mod tests { use std::fs; use std::time::Duration; + const TEST_EVENTS_DIR: &str = "test_events_dir"; + const TEST_LOGGER_KEY: &str = "test_logger_key"; + #[tokio::test] async fn event_logger_test() { let mut temp_test_path = env::temp_dir(); - let logger_key = "event_logger_test"; - temp_test_path.push(logger_key); + temp_test_path.push(TEST_EVENTS_DIR); // clean up and ignore the clean up errors _ = fs::remove_dir_all(&temp_test_path); let mut events_dir: std::path::PathBuf = temp_test_path.to_path_buf(); events_dir.push("Events"); + // When EVENTS_DIR is not set, report_extension_status_event should return early + // This test verifies the function handles the case gracefully + // Note: Since EVENTS_DIR is a static OnceCell, if other tests set it first, + // this test will still pass but will write to that directory instead + + let extension = crate::telemetry::Extension { + name: "test_extension".to_string(), + version: "1.0.0".to_string(), + is_internal: false, + extension_type: "test_type".to_string(), + }; + let operation_status = crate::telemetry::OperationStatus { + operation_success: false, + operation: "test_operation".to_string(), + task_name: "test_task".to_string(), + message: "error message".to_string(), + duration: 50, + }; + + // This should not panic even if EVENTS_DIR is not set + super::report_extension_status_event(extension, operation_status); + + // Start the event logger loop and set the EVENTS_DIR let cloned_events_dir = events_dir.to_path_buf(); tokio::spawn(async { super::start(cloned_events_dir, Duration::from_millis(100), 3, |_| { @@ -190,7 +275,7 @@ mod tests { }); // write some events to the queue and flush to disk - write_events(logger_key).await; + write_events(TEST_LOGGER_KEY).await; let files = misc_helpers::get_files(&events_dir).unwrap(); let file_count = files.len(); @@ -201,7 +286,7 @@ mod tests { // write some events to the queue and flush to disk 3 times for _ in [0; 3] { - write_events(logger_key).await; + write_events(TEST_LOGGER_KEY).await; } let files = misc_helpers::get_files(&events_dir).unwrap(); @@ -216,7 +301,7 @@ mod tests { // wait for stop signal responded tokio::time::sleep(Duration::from_millis(500)).await; - write_events(logger_key).await; + write_events(TEST_LOGGER_KEY).await; let files = misc_helpers::get_files(&events_dir).unwrap(); assert_eq!( @@ -225,6 +310,54 @@ mod tests { "No more files could write to event folder after stop()" ); + // Create test extension and operation status + let extension = crate::telemetry::Extension { + name: "test_extension".to_string(), + version: "1.0.0".to_string(), + is_internal: true, + extension_type: "test_type".to_string(), + }; + let operation_status = crate::telemetry::OperationStatus { + operation_success: true, + operation: "test_operation".to_string(), + task_name: "test_task".to_string(), + message: "test_message".to_string(), + duration: 100, + }; + + // Call report_extension_status_event + super::report_extension_status_event(extension.clone(), operation_status.clone()); + + // Wait for the file to be written + tokio::time::sleep(Duration::from_millis(100)).await; + + // Verify extension event file was created + let files = misc_helpers::search_files( + &events_dir, + crate::telemetry::EXTENSION_EVENT_FILE_SEARCH_PATTERN, + ) + .unwrap(); + assert!( + !files.is_empty(), + "Extension status event file should be created" + ); + + // Read and verify the event content + let event: crate::telemetry::ExtensionStatusEvent = + misc_helpers::json_read_from_file(&files[0]).unwrap(); + assert_eq!(event.extension.name, extension.name); + assert_eq!(event.extension.version, extension.version); + assert_eq!(event.extension.is_internal, extension.is_internal); + assert_eq!(event.extension.extension_type, extension.extension_type); + assert_eq!( + event.operation_status.operation_success, + operation_status.operation_success + ); + assert_eq!(event.operation_status.operation, operation_status.operation); + assert_eq!(event.operation_status.task_name, operation_status.task_name); + assert_eq!(event.operation_status.message, operation_status.message); + assert_eq!(event.operation_status.duration, operation_status.duration); + _ = fs::remove_dir_all(&temp_test_path); } diff --git a/proxy_agent_shared/src/telemetry/event_reader.rs b/proxy_agent_shared/src/telemetry/event_reader.rs index 33a861bc..4ee990a1 100644 --- a/proxy_agent_shared/src/telemetry/event_reader.rs +++ b/proxy_agent_shared/src/telemetry/event_reader.rs @@ -5,10 +5,12 @@ //! //! The telemetry event files are written by the event_logger module. use crate::common_state::CommonState; +use crate::current_info; use crate::logger::logger_manager; use crate::misc_helpers; use crate::telemetry::event_sender; use crate::telemetry::telemetry_event::TelemetryEvent; +use crate::telemetry::telemetry_event::TelemetryExtensionEventsEvent; use crate::telemetry::telemetry_event::TelemetryGenericLogsEvent; use crate::telemetry::Event; use std::fs::remove_file; @@ -244,6 +246,95 @@ impl EventReader { } } } + + pub async fn start_extension_status_event_processor( + &self, + delay_start: bool, + interval: Option, + ) { + if delay_start { + // delay start the event_reader task to give additional CPU cycles to more important threads + tokio::time::sleep(Duration::from_secs(60)).await; + } + + logger_manager::write_info( + "telemetry extension status event reader task started.".to_string(), + ); + let interval = interval.unwrap_or(Duration::from_secs(60)); + let cancellation_token = self.common_state.get_cancellation_token(); + tokio::select! { + _ = self.loop_extension_status_event_processor(interval ) => {} + _ = cancellation_token.cancelled() => { + logger_manager::write_warn("cancellation token signal received, stop the telemetry extension status event reader task.".to_string()); + } + } + } + + async fn loop_extension_status_event_processor(&self, interval: Duration) { + loop { + self.process_extension_status_events().await; + tokio::time::sleep(interval).await; + } + } + + async fn process_extension_status_events(&self) -> usize { + let mut event_count: usize = 0; + // get all extension status event filenames in the directory + match misc_helpers::search_files( + &self.dir_path, + crate::telemetry::EXTENSION_EVENT_FILE_SEARCH_PATTERN, + ) { + Ok(files) => { + let file_count = files.len(); + for file in files { + event_count += self.process_one_extension_status_event_file(file).await; + } + logger_manager::write_info( format!( + "Telemetry event reader sent {event_count} extension status events from {file_count} files" + )); + } + Err(e) => { + logger_manager::write_warn(format!( + "Extension Status Event Files not found in directory {}: {}", + self.dir_path.display(), + e + )); + } + } + event_count + } + + async fn process_one_extension_status_event_file(&self, file: PathBuf) -> usize { + let mut num_events_logged = 0; + + match misc_helpers::json_read_from_file::(&file) { + Ok(event) => { + num_events_logged += 1; + let telemetry_event = TelemetryExtensionEventsEvent::from_extension_status_event( + &event, + self.execution_mode.clone(), + current_info::get_current_exe_version(), + ); + let telemetry_event = TelemetryEvent::ExtensionEvent(telemetry_event); + event_sender::enqueue_event(telemetry_event); + if let Err(e) = self.common_state.notify_telemetry_event().await { + logger_manager::write_warn(format!( + "Failed to notify telemetry event with error: {e}" + )); + } + } + Err(e) => { + logger_manager::write_warn(format!( + "EventReader:: Failed to read extension status event from file {}: {}", + file.display(), + e + )); + } + } + + Self::clean_file(file); + num_events_logged + } } #[cfg(test)] diff --git a/proxy_agent_shared/src/telemetry/telemetry_event.rs b/proxy_agent_shared/src/telemetry/telemetry_event.rs index a1e216d6..2b31c70b 100644 --- a/proxy_agent_shared/src/telemetry/telemetry_event.rs +++ b/proxy_agent_shared/src/telemetry/telemetry_event.rs @@ -3,12 +3,13 @@ //! This module contains the logic to generate the telemetry data to be send to wire server. -use crate::telemetry::Event; +use crate::telemetry::{Event, ExtensionStatusEvent}; use crate::{current_info, misc_helpers}; use once_cell::sync::Lazy; use serde_derive::{Deserialize, Serialize}; const METRICS_PROVIDER_ID: &str = "FFF0196F-EE4C-4EAF-9AA5-776F622DEB4F"; +const STATUS_PROVIDER_ID: &str = "69B669B9-4AF8-4C50-BDC4-6006FA76E975"; /// VmMetaData contains the metadata of the VM. /// The metadata is used to identify the VM and the image origin. @@ -158,6 +159,9 @@ impl TelemetryProvider { TelemetryEvent::GenericLogsEvent(event) => { xml.push_str(&event.to_xml_event(vm_data)); } + TelemetryEvent::ExtensionEvent(event) => { + xml.push_str(&event.to_xml_event(vm_data)); + } } } @@ -212,10 +216,17 @@ impl TelemetryData { return; } } + TelemetryEvent::ExtensionEvent(_) => { + if provider.id == STATUS_PROVIDER_ID { + provider.add_event(event); + return; + } + } } } let mut p = TelemetryProvider::new(match &event { TelemetryEvent::GenericLogsEvent(_) => METRICS_PROVIDER_ID.to_string(), + TelemetryEvent::ExtensionEvent(_) => STATUS_PROVIDER_ID.to_string(), }); p.add_event(event); self.providers.push(p); @@ -231,6 +242,11 @@ impl TelemetryData { return provider.remove_event(last_event); } } + TelemetryEvent::ExtensionEvent(_) => { + if provider.id == STATUS_PROVIDER_ID { + return provider.remove_event(last_event); + } + } } } None @@ -246,18 +262,21 @@ impl TelemetryData { #[derive(PartialEq, Eq, Hash, Clone)] pub enum TelemetryEvent { GenericLogsEvent(TelemetryGenericLogsEvent), + ExtensionEvent(TelemetryExtensionEventsEvent), } impl TelemetryEvent { pub fn get_provider_id(&self) -> String { match self { TelemetryEvent::GenericLogsEvent(_) => TelemetryGenericLogsEvent::get_provider_id(), + TelemetryEvent::ExtensionEvent(_) => TelemetryExtensionEventsEvent::get_provider_id(), } } pub fn to_xml_event(&self, vm_data: &TelemetryEventVMData) -> String { match self { TelemetryEvent::GenericLogsEvent(event) => event.to_xml_event(vm_data), + TelemetryEvent::ExtensionEvent(event) => event.to_xml_event(vm_data), } } } @@ -344,6 +363,103 @@ impl TelemetryGenericLogsEvent { } } +#[derive(PartialEq, Eq, Hash, Clone)] +pub struct TelemetryExtensionEventsEvent { + event_pid: u64, + event_tid: u64, + ga_version: String, + task_name: String, + opcode_name: String, + execution_mode: String, + + extension_type: String, + is_internal: bool, + name: String, + version: String, + operation: String, + operation_success: bool, + message: String, + duration: u64, +} + +impl TelemetryExtensionEventsEvent { + pub fn from_extension_status_event( + event: &ExtensionStatusEvent, + execution_mode: String, + ga_version: String, + ) -> Self { + TelemetryExtensionEventsEvent { + ga_version, + execution_mode, + event_pid: event.event_pid.parse::().unwrap_or(0), + event_tid: event.event_tid.parse::().unwrap_or(0), + opcode_name: event.time_stamp.to_string(), + extension_type: event.extension.extension_type.to_string(), + is_internal: event.extension.is_internal, + name: event.extension.name.to_string(), + version: event.extension.version.to_string(), + operation: event.operation_status.operation.to_string(), + task_name: event.operation_status.task_name.to_string(), + operation_success: event.operation_status.operation_success, + message: event.operation_status.message.to_string(), + duration: event.operation_status.duration as u64, + } + } + + pub fn get_provider_id() -> String { + STATUS_PROVIDER_ID.to_string() + } + + fn to_xml_event(&self, vm_data: &TelemetryEventVMData) -> String { + let mut xml: String = String::new(); + // Event ID 1 is for Extension Events + xml.push_str("", + misc_helpers::xml_escape(self.extension_type.to_string()) + )); + xml.push_str(&format!( + "", + if self.is_internal { "True" } else { "False" } + )); + xml.push_str(&format!( + "", + misc_helpers::xml_escape(self.name.to_string()) + )); + xml.push_str(&format!( + "", + misc_helpers::xml_escape(self.version.to_string()) + )); + xml.push_str(&format!( + "", + misc_helpers::xml_escape(self.operation.to_string()) + )); + xml.push_str(&format!( + "", + if self.operation_success { + "True" + } else { + "False" + } + )); + xml.push_str(&format!( + "", + misc_helpers::xml_escape(self.message.to_string()) + )); + xml.push_str(&format!( + "", + self.duration + )); + + xml.push_str("]]>"); + xml + } +} + static CURRENT_KEYWORD_NAME: Lazy = Lazy::new(|| KeywordName::new(current_info::get_cpu_arch()).to_json()); From c54e0e45ca6387b99212d5ff0ce8eb199790d411 Mon Sep 17 00:00:00 2001 From: Zhidong Peng Date: Wed, 4 Feb 2026 18:47:18 +0000 Subject: [PATCH 02/12] add more tests --- .../src/telemetry/event_reader.rs | 317 ++++++++++++++++++ .../src/telemetry/event_sender.rs | 198 ++++++++++- .../src/telemetry/telemetry_event.rs | 204 +++++++++++ 3 files changed, 713 insertions(+), 6 deletions(-) diff --git a/proxy_agent_shared/src/telemetry/event_reader.rs b/proxy_agent_shared/src/telemetry/event_reader.rs index 4ee990a1..8002dfae 100644 --- a/proxy_agent_shared/src/telemetry/event_reader.rs +++ b/proxy_agent_shared/src/telemetry/event_reader.rs @@ -450,4 +450,321 @@ mod tests { common_state.cancel_cancellation_token(); _ = fs::remove_dir_all(&temp_dir); } + + #[tokio::test] + async fn test_extension_status_event_processor() { + let mut temp_dir = env::temp_dir(); + temp_dir.push("test_extension_status_event_processor"); + + _ = fs::remove_dir_all(&temp_dir); + let mut events_dir = temp_dir.to_path_buf(); + events_dir.push("Events"); + misc_helpers::try_create_folder(&events_dir).unwrap(); + + let cancellation_token = CancellationToken::new(); + let common_state = CommonState::start_new(cancellation_token.clone()); + let event_reader = EventReader::new( + events_dir.clone(), + common_state.clone(), + "Test".to_string(), + "test_extension_status_event_processor".to_string(), + ); + + // Create test extension status event files + let extension = crate::telemetry::Extension { + name: "test_extension".to_string(), + version: "1.0.0".to_string(), + is_internal: true, + extension_type: "test_type".to_string(), + }; + let operation_status = crate::telemetry::OperationStatus { + operation_success: true, + operation: "test_operation".to_string(), + task_name: "test_task".to_string(), + message: "test_message".to_string(), + duration: 100, + }; + let event = crate::telemetry::ExtensionStatusEvent::new( + extension.clone(), + operation_status.clone(), + ); + + // Write extension event files with proper naming pattern + let mut file_path = events_dir.to_path_buf(); + file_path.push(crate::telemetry::new_extension_event_file_name()); + misc_helpers::json_write_to_file(&event, &file_path).unwrap(); + + tokio::time::sleep(Duration::from_millis(1)).await; + + let mut file_path2 = events_dir.to_path_buf(); + file_path2.push(crate::telemetry::new_extension_event_file_name()); + misc_helpers::json_write_to_file(&event, &file_path2).unwrap(); + + // Verify files were created + let files = misc_helpers::search_files( + &events_dir, + crate::telemetry::EXTENSION_EVENT_FILE_SEARCH_PATTERN, + ) + .unwrap(); + assert_eq!(2, files.len(), "Should have 2 extension event files"); + + // Process extension status events directly (without starting the loop) + let events_processed = event_reader.process_extension_status_events().await; + assert_eq!( + 2, events_processed, + "Should have processed 2 extension status events" + ); + + // Verify files were cleaned up after processing + let files = misc_helpers::search_files( + &events_dir, + crate::telemetry::EXTENSION_EVENT_FILE_SEARCH_PATTERN, + ) + .unwrap(); + assert!( + files.is_empty(), + "Extension event files should be cleaned up after processing" + ); + + // Test with non-matching file names (should not be processed) + let mut non_matching_file = events_dir.to_path_buf(); + non_matching_file.push("not_extension_event.json"); + misc_helpers::json_write_to_file(&event, &non_matching_file).unwrap(); + + let events_processed = event_reader.process_extension_status_events().await; + assert_eq!( + 0, events_processed, + "Should not process files with non-matching names" + ); + + // Non-matching file should still exist + assert!( + non_matching_file.exists(), + "Non-matching file should not be cleaned up" + ); + + // Test start_extension_status_event_processor with cancellation + // Write another event file + let mut file_path3 = events_dir.to_path_buf(); + file_path3.push(crate::telemetry::new_extension_event_file_name()); + misc_helpers::json_write_to_file(&event, &file_path3).unwrap(); + + // Start the processor in a separate task + let event_reader_for_task = EventReader::new( + events_dir.clone(), + common_state.clone(), + "Test".to_string(), + "test_extension_status_event_processor".to_string(), + ); + let handle = tokio::spawn(async move { + event_reader_for_task + .start_extension_status_event_processor(false, Some(Duration::from_millis(50))) + .await; + }); + + // Wait for processing + tokio::time::sleep(Duration::from_millis(100)).await; + + // Cancel the token to stop the processor + common_state.cancel_cancellation_token(); + + // Wait for the task to complete + let result = tokio::time::timeout(Duration::from_secs(2), handle).await; + assert!( + result.is_ok(), + "Extension status event processor should stop when cancelled" + ); + + // Verify the file was processed + let files = misc_helpers::search_files( + &events_dir, + crate::telemetry::EXTENSION_EVENT_FILE_SEARCH_PATTERN, + ) + .unwrap(); + assert!( + files.is_empty(), + "Extension event file should be processed before cancellation" + ); + + _ = fs::remove_dir_all(&temp_dir); + } + + #[tokio::test] + async fn test_mixed_event_files() { + let mut temp_dir = env::temp_dir(); + temp_dir.push("test_mixed_event_files"); + + _ = fs::remove_dir_all(&temp_dir); + let mut events_dir = temp_dir.to_path_buf(); + events_dir.push("Events"); + misc_helpers::try_create_folder(&events_dir).unwrap(); + + let cancellation_token = CancellationToken::new(); + let common_state = CommonState::start_new(cancellation_token.clone()); + let event_reader = EventReader::new( + events_dir.clone(), + common_state.clone(), + "Test".to_string(), + "test_mixed_event_files".to_string(), + ); + + // Create generic event files (numeric names like 1234567890.json) + let message = "Test message for mixed events"; + let mut generic_events: Vec = Vec::new(); + for _ in 0..5 { + generic_events.push(Event::new( + "Informational".to_string(), + message.to_string(), + "test_mixed_event_files".to_string(), + "test_mixed_event_files".to_string(), + )); + } + + // Write 2 generic event files + let mut generic_file1 = events_dir.to_path_buf(); + generic_file1.push(crate::telemetry::new_generic_event_file_name()); + misc_helpers::json_write_to_file(&generic_events, &generic_file1).unwrap(); + + tokio::time::sleep(Duration::from_millis(1)).await; + + let mut generic_file2 = events_dir.to_path_buf(); + generic_file2.push(crate::telemetry::new_generic_event_file_name()); + misc_helpers::json_write_to_file(&generic_events, &generic_file2).unwrap(); + + // Create extension status event files + let extension = crate::telemetry::Extension { + name: "test_extension".to_string(), + version: "1.0.0".to_string(), + is_internal: true, + extension_type: "test_type".to_string(), + }; + let operation_status = crate::telemetry::OperationStatus { + operation_success: true, + operation: "test_operation".to_string(), + task_name: "test_task".to_string(), + message: "test_message".to_string(), + duration: 100, + }; + let extension_event = crate::telemetry::ExtensionStatusEvent::new( + extension.clone(), + operation_status.clone(), + ); + + // Write 3 extension event files + for _ in 0..3 { + let mut ext_file = events_dir.to_path_buf(); + ext_file.push(crate::telemetry::new_extension_event_file_name()); + misc_helpers::json_write_to_file(&extension_event, &ext_file).unwrap(); + tokio::time::sleep(Duration::from_millis(1)).await; + } + + // Verify all files were created + let generic_files = misc_helpers::search_files( + &events_dir, + crate::telemetry::GENERIC_EVENT_FILE_SEARCH_PATTERN, + ) + .unwrap(); + assert_eq!(2, generic_files.len(), "Should have 2 generic event files"); + + let extension_files = misc_helpers::search_files( + &events_dir, + crate::telemetry::EXTENSION_EVENT_FILE_SEARCH_PATTERN, + ) + .unwrap(); + assert_eq!( + 3, + extension_files.len(), + "Should have 3 extension event files" + ); + + // Process generic events using process_once + let generic_events_processed = event_reader.process_once().await; + assert_eq!( + 10, generic_events_processed, + "Should have processed 10 generic events (5 events x 2 files)" + ); + + // Verify only generic files were cleaned up + let generic_files = misc_helpers::search_files( + &events_dir, + crate::telemetry::GENERIC_EVENT_FILE_SEARCH_PATTERN, + ) + .unwrap(); + assert!( + generic_files.is_empty(), + "Generic event files should be cleaned up" + ); + + let extension_files = misc_helpers::search_files( + &events_dir, + crate::telemetry::EXTENSION_EVENT_FILE_SEARCH_PATTERN, + ) + .unwrap(); + assert_eq!( + 3, + extension_files.len(), + "Extension event files should still exist" + ); + + // Process extension events using process_extension_status_events + let extension_events_processed = event_reader.process_extension_status_events().await; + assert_eq!( + 3, extension_events_processed, + "Should have processed 3 extension status events" + ); + + // Verify extension files were cleaned up + let extension_files = misc_helpers::search_files( + &events_dir, + crate::telemetry::EXTENSION_EVENT_FILE_SEARCH_PATTERN, + ) + .unwrap(); + assert!( + extension_files.is_empty(), + "Extension event files should be cleaned up" + ); + + // Test that both processors ignore each other's files + // Write one of each type again + let mut generic_file = events_dir.to_path_buf(); + generic_file.push(crate::telemetry::new_generic_event_file_name()); + misc_helpers::json_write_to_file(&generic_events, &generic_file).unwrap(); + + let mut ext_file = events_dir.to_path_buf(); + ext_file.push(crate::telemetry::new_extension_event_file_name()); + misc_helpers::json_write_to_file(&extension_event, &ext_file).unwrap(); + + // Process extension events - should only process extension file + let extension_events_processed = event_reader.process_extension_status_events().await; + assert_eq!( + 1, extension_events_processed, + "Should only process extension event file" + ); + + // Generic file should still exist + let generic_files = misc_helpers::search_files( + &events_dir, + crate::telemetry::GENERIC_EVENT_FILE_SEARCH_PATTERN, + ) + .unwrap(); + assert_eq!( + 1, + generic_files.len(), + "Generic event file should still exist after extension processing" + ); + + // Process generic events - should only process generic file + let generic_events_processed = event_reader.process_once().await; + assert_eq!( + 5, generic_events_processed, + "Should only process generic event file" + ); + + // All files should be cleaned up now + let all_files = misc_helpers::get_files(&events_dir).unwrap(); + assert!(all_files.is_empty(), "All event files should be cleaned up"); + + common_state.cancel_cancellation_token(); + _ = fs::remove_dir_all(&temp_dir); + } } diff --git a/proxy_agent_shared/src/telemetry/event_sender.rs b/proxy_agent_shared/src/telemetry/event_sender.rs index 430a793e..9485943e 100644 --- a/proxy_agent_shared/src/telemetry/event_sender.rs +++ b/proxy_agent_shared/src/telemetry/event_sender.rs @@ -221,8 +221,10 @@ pub(crate) fn enqueue_event(event: TelemetryEvent) { mod tests { use super::*; use crate::host_clients::wire_server_client::WireServerClient; - use crate::telemetry::telemetry_event::{TelemetryGenericLogsEvent, VmMetaData}; - use crate::telemetry::Event; + use crate::telemetry::telemetry_event::{ + TelemetryExtensionEventsEvent, TelemetryGenericLogsEvent, VmMetaData, + }; + use crate::telemetry::{Event, ExtensionStatusEvent}; use tokio_util::sync::CancellationToken; fn create_test_vm_meta_data() -> VmMetaData { @@ -253,6 +255,39 @@ mod tests { )) } + fn create_test_extension_event() -> TelemetryEvent { + let extension = crate::telemetry::Extension { + name: "test_extension".to_string(), + version: "1.0.0".to_string(), + is_internal: true, + extension_type: "test_type".to_string(), + }; + let operation_status = crate::telemetry::OperationStatus { + operation_success: true, + operation: "install".to_string(), + task_name: "test_task".to_string(), + message: "Installation successful".to_string(), + duration: 500, + }; + let extension_status_event = ExtensionStatusEvent::new(extension, operation_status); + let telemetry_event = TelemetryExtensionEventsEvent::from_extension_status_event( + &extension_status_event, + "production".to_string(), + "1.0.0".to_string(), + ); + TelemetryEvent::ExtensionEvent(telemetry_event) + } + + #[tokio::test] + async fn test_event_sender_new() { + let cancellation_token = CancellationToken::new(); + let common_state = CommonState::start_new(cancellation_token); + let event_sender = EventSender::new(common_state); + + // Verify EventSender was created (common_state is private, so we just check it doesn't panic) + assert!(std::mem::size_of_val(&event_sender) > 0); + } + #[tokio::test] async fn test_common_state_vm_meta_data() { let cancellation_token = CancellationToken::new(); @@ -346,6 +381,135 @@ mod tests { assert!(xml.contains("")); } + #[test] + fn test_extension_event_xml_format() { + let vm_meta_data = create_test_vm_meta_data(); + let vm_data = TelemetryEventVMData::new_from_vm_meta_data(&vm_meta_data); + + // Test extension event XML + let event = create_test_extension_event(); + let event_xml = event.to_xml_event(&vm_data); + assert!(event_xml.contains("")); + assert!(event_xml.contains("")); + assert!(event_xml.contains("ExtensionType")); + assert!(event_xml.contains("test_type")); + assert!(event_xml.contains("Name")); + assert!(event_xml.contains("test_extension")); + + // Test provider ID for extension events + assert_eq!( + event.get_provider_id(), + "69B669B9-4AF8-4C50-BDC4-6006FA76E975" + ); + + // Test TelemetryData with extension event + let mut telemetry_data = TelemetryData::new_with_vm_data(vm_data); + telemetry_data.add_event(event); + let xml = telemetry_data.to_xml(); + assert!(xml.contains("")); + } + + #[test] + fn test_mixed_events_xml_format() { + let vm_meta_data = create_test_vm_meta_data(); + let vm_data = TelemetryEventVMData::new_from_vm_meta_data(&vm_meta_data); + + let mut telemetry_data = TelemetryData::new_with_vm_data(vm_data); + + // Add generic logs event + let generic_event = create_test_event("Test generic message"); + telemetry_data.add_event(generic_event); + + // Add extension event + let extension_event = create_test_extension_event(); + telemetry_data.add_event(extension_event); + + assert_eq!(telemetry_data.event_count(), 2); + + let xml = telemetry_data.to_xml(); + + // Verify both providers are present + assert!(xml.contains("")); + assert!(xml.contains("")); + assert!(xml.contains("")); // Generic logs event + assert!(xml.contains("")); // Extension event + } + + #[test] + fn test_queue_with_extension_events() { + // Create a local bounded queue for testing + let test_queue: ConcurrentQueue = ConcurrentQueue::bounded(10); + + // Add generic and extension events + let generic_event = create_test_event("Generic message"); + let extension_event = create_test_extension_event(); + + assert!(test_queue.push(generic_event.clone()).is_ok()); + assert!(test_queue.push(extension_event.clone()).is_ok()); + + assert_eq!(test_queue.len(), 2); + + // Verify FIFO order and event types + let popped1 = test_queue.pop(); + assert!(popped1.is_ok()); + assert_eq!( + popped1.unwrap().get_provider_id(), + "FFF0196F-EE4C-4EAF-9AA5-776F622DEB4F" + ); + + let popped2 = test_queue.pop(); + assert!(popped2.is_ok()); + assert_eq!( + popped2.unwrap().get_provider_id(), + "69B669B9-4AF8-4C50-BDC4-6006FA76E975" + ); + + assert!(test_queue.is_empty()); + } + + #[tokio::test] + async fn test_update_vm_meta_data_with_mock_server() { + let ip = "127.0.0.1"; + let port = 7072u16; + + let cancellation_token = CancellationToken::new(); + let common_state = CommonState::start_new(cancellation_token.clone()); + let event_sender = EventSender::new(common_state.clone()); + + // Start mock server + tokio::spawn(crate::server_mock::start( + ip.to_string(), + port, + cancellation_token.clone(), + )); + tokio::time::sleep(Duration::from_millis(100)).await; + + let wire_server_client = WireServerClient::new(ip, port); + let imds_client = ImdsClient::new(ip, port); + + // Initially vm_meta_data should be None + let vm_meta_data = common_state.get_vm_meta_data().await.unwrap(); + assert!(vm_meta_data.is_none()); + + // Update vm_meta_data + let result = event_sender + .update_vm_meta_data(&wire_server_client, &imds_client) + .await; + assert!(result.is_ok(), "update_vm_meta_data should succeed"); + + // Verify vm_meta_data was set + let vm_meta_data = common_state.get_vm_meta_data().await.unwrap(); + assert!(vm_meta_data.is_some(), "vm_meta_data should be set"); + + let vm_data = vm_meta_data.unwrap(); + // Values come from mock server responses + assert!(!vm_data.container_id.is_empty()); + assert!(!vm_data.role_name.is_empty()); + + cancellation_token.cancel(); + } + /// Consolidated test for all TELEMETRY_EVENT_QUEUE and wire server operations. /// This test must run in a single test function because the global static queue /// cannot be reopened once closed. The test covers: @@ -394,7 +558,22 @@ mod tests { event_sender.process_event_queue(None, None).await; assert!(TELEMETRY_EVENT_QUEUE.is_empty()); - // ===== Part 3: Test enqueue and process with mock server ===== + // ===== Part 3: Test enqueue mixed events (generic and extension) ===== + let generic_event = create_test_event("Generic event for queue"); + let extension_event = create_test_extension_event(); + + enqueue_event(generic_event); + enqueue_event(extension_event); + assert_eq!( + TELEMETRY_EVENT_QUEUE.len(), + 2, + "Queue should have 2 mixed events" + ); + + // Clear for next test + while TELEMETRY_EVENT_QUEUE.pop().is_ok() {} + + // ===== Part 4: Test enqueue and process with mock server ===== // Enqueue events let event_a = create_test_event("Test event A for processing"); let event_b = create_test_event("Test event B for processing"); @@ -455,7 +634,7 @@ mod tests { "Queue should not be closed after processing" ); - // ===== Part 4: Test send_data_to_wire_server ===== + // ===== Part 5: Test send_data_to_wire_server ===== let wire_server_client = WireServerClient::new(ip, port); let vm_meta_data = create_test_vm_meta_data(); let vm_data = TelemetryEventVMData::new_from_vm_meta_data(&vm_meta_data); @@ -466,13 +645,20 @@ mod tests { EventSender::send_data_to_wire_server(empty_data, &wire_server_client).await; // Test sending data with events - let mut telemetry_data = TelemetryData::new_with_vm_data(vm_data); + let mut telemetry_data = TelemetryData::new_with_vm_data(vm_data.clone()); telemetry_data.add_event(create_test_event("Test event 1")); telemetry_data.add_event(create_test_event("Test event 2")); assert_eq!(telemetry_data.event_count(), 2); EventSender::send_data_to_wire_server(telemetry_data, &wire_server_client).await; - // ===== Part 5: Test EventSender lifecycle (cancellation) ===== + // Test sending data with mixed events + let mut mixed_data = TelemetryData::new_with_vm_data(vm_data); + mixed_data.add_event(create_test_event("Generic event")); + mixed_data.add_event(create_test_extension_event()); + assert_eq!(mixed_data.event_count(), 2); + EventSender::send_data_to_wire_server(mixed_data, &wire_server_client).await; + + // ===== Part 6: Test EventSender lifecycle (cancellation) ===== // This MUST be last as it closes the queue permanently // Cancel the token - this will close the queue, stop the event sender task and stop mock server diff --git a/proxy_agent_shared/src/telemetry/telemetry_event.rs b/proxy_agent_shared/src/telemetry/telemetry_event.rs index 2b31c70b..45168ec7 100644 --- a/proxy_agent_shared/src/telemetry/telemetry_event.rs +++ b/proxy_agent_shared/src/telemetry/telemetry_event.rs @@ -708,4 +708,208 @@ mod tests { assert!(json.contains("CpuArchitecture")); assert!(json.contains("x86_64")); } + + fn create_test_extension_status_event() -> ExtensionStatusEvent { + let extension = crate::telemetry::Extension { + name: "test_extension".to_string(), + version: "2.0.0".to_string(), + is_internal: true, + extension_type: "test_type".to_string(), + }; + let operation_status = crate::telemetry::OperationStatus { + operation_success: true, + operation: "install".to_string(), + task_name: "test_task".to_string(), + message: "Installation successful".to_string(), + duration: 500, + }; + ExtensionStatusEvent::new(extension, operation_status) + } + + /// Tests TelemetryExtensionEventsEvent creation and XML generation + #[test] + fn test_telemetry_extension_events_event() { + let extension_status_event = create_test_extension_status_event(); + let telemetry_event = TelemetryExtensionEventsEvent::from_extension_status_event( + &extension_status_event, + "production".to_string(), + "1.0.0".to_string(), + ); + + // Verify field mappings + assert_eq!(telemetry_event.ga_version, "1.0.0"); + assert_eq!(telemetry_event.execution_mode, "production"); + assert_eq!(telemetry_event.extension_type, "test_type"); + assert!(telemetry_event.is_internal); + assert_eq!(telemetry_event.name, "test_extension"); + assert_eq!(telemetry_event.version, "2.0.0"); + assert_eq!(telemetry_event.operation, "install"); + assert_eq!(telemetry_event.task_name, "test_task"); + assert!(telemetry_event.operation_success); + assert_eq!(telemetry_event.message, "Installation successful"); + assert_eq!(telemetry_event.duration, 500); + + // Verify provider ID + assert_eq!( + TelemetryExtensionEventsEvent::get_provider_id(), + STATUS_PROVIDER_ID + ); + + // Test XML generation + let vm_data = create_test_vm_data(); + let xml = telemetry_event.to_xml_event(&vm_data); + assert!(xml.contains("")); + assert!(xml.contains("")); + assert!(xml.contains("ExtensionType")); + assert!(xml.contains("test_type")); + assert!(xml.contains("IsInternal")); + assert!(xml.contains("True")); // is_internal = true + assert!(xml.contains("Name")); + assert!(xml.contains("test_extension")); + assert!(xml.contains("Version")); + assert!(xml.contains("2.0.0")); + assert!(xml.contains("Operation")); + assert!(xml.contains("install")); + assert!(xml.contains("OperationSuccess")); + assert!(xml.contains("Message")); + assert!(xml.contains("Installation successful")); + assert!(xml.contains("Duration")); + assert!(xml.contains("500")); + } + + /// Tests TelemetryExtensionEventsEvent with operation failure + #[test] + fn test_telemetry_extension_events_event_failure() { + let extension = crate::telemetry::Extension { + name: "failed_extension".to_string(), + version: "1.0.0".to_string(), + is_internal: false, + extension_type: "external_type".to_string(), + }; + let operation_status = crate::telemetry::OperationStatus { + operation_success: false, + operation: "enable".to_string(), + task_name: "enable_task".to_string(), + message: "Enable failed with error".to_string(), + duration: 100, + }; + let extension_status_event = ExtensionStatusEvent::new(extension, operation_status); + + let telemetry_event = TelemetryExtensionEventsEvent::from_extension_status_event( + &extension_status_event, + "test".to_string(), + "2.0.0".to_string(), + ); + + assert!(!telemetry_event.is_internal); + assert!(!telemetry_event.operation_success); + assert_eq!(telemetry_event.name, "failed_extension"); + assert_eq!(telemetry_event.operation, "enable"); + + // Test XML with False values + let vm_data = create_test_vm_data(); + let xml = telemetry_event.to_xml_event(&vm_data); + assert!(xml.contains("IsInternal")); + assert!(xml.contains("\"False\"")); // is_internal = false + assert!(xml.contains("OperationSuccess")); + } + + /// Tests TelemetryEvent enum with ExtensionEvent variant + #[test] + fn test_telemetry_event_extension_variant() { + let extension_status_event = create_test_extension_status_event(); + let telemetry_event = TelemetryExtensionEventsEvent::from_extension_status_event( + &extension_status_event, + "production".to_string(), + "1.0.0".to_string(), + ); + let event = TelemetryEvent::ExtensionEvent(telemetry_event); + + // Test provider ID through enum + assert_eq!(event.get_provider_id(), STATUS_PROVIDER_ID); + + // Test XML generation through enum + let vm_data = create_test_vm_data(); + let xml = event.to_xml_event(&vm_data); + assert!(xml.contains("")); + assert!(xml.contains("ExtensionType")); + } + + /// Tests TelemetryData with mixed event types (GenericLogs and Extension events) + #[test] + fn test_telemetry_data_mixed_events() { + let vm_data = create_test_vm_data(); + let mut telemetry_data = TelemetryData::new_with_vm_data(vm_data); + + // Add generic logs event + let generic_event = create_test_telemetry_event("Generic log message"); + telemetry_data.add_event(generic_event); + assert_eq!(telemetry_data.event_count(), 1); + + // Add extension event + let extension_status_event = create_test_extension_status_event(); + let extension_telemetry_event = TelemetryExtensionEventsEvent::from_extension_status_event( + &extension_status_event, + "production".to_string(), + "1.0.0".to_string(), + ); + let extension_event = TelemetryEvent::ExtensionEvent(extension_telemetry_event); + telemetry_data.add_event(extension_event.clone()); + assert_eq!(telemetry_data.event_count(), 2); + + // Add another extension event + let extension_status_event2 = create_test_extension_status_event(); + let extension_telemetry_event2 = TelemetryExtensionEventsEvent::from_extension_status_event( + &extension_status_event2, + "production".to_string(), + "1.0.0".to_string(), + ); + let extension_event2 = TelemetryEvent::ExtensionEvent(extension_telemetry_event2); + telemetry_data.add_event(extension_event2); + assert_eq!(telemetry_data.event_count(), 3); + + // Verify XML contains both provider types + let xml = telemetry_data.to_xml(); + assert!(xml.contains(&format!("", METRICS_PROVIDER_ID))); + assert!(xml.contains(&format!("", STATUS_PROVIDER_ID))); + assert!(xml.contains("")); // Generic logs event + assert!(xml.contains("")); // Extension event + + // Remove extension event + let removed = telemetry_data.remove_last_event(extension_event); + assert!(removed.is_some()); + assert_eq!(telemetry_data.event_count(), 2); + } + + /// Tests TelemetryProvider with extension events + #[test] + fn test_telemetry_provider_with_extension_events() { + let mut provider = TelemetryProvider::new(STATUS_PROVIDER_ID.to_string()); + assert_eq!(provider.id, STATUS_PROVIDER_ID); + assert_eq!(provider.event_count(), 0); + + // Add extension events + let extension_status_event = create_test_extension_status_event(); + let telemetry_event = TelemetryExtensionEventsEvent::from_extension_status_event( + &extension_status_event, + "production".to_string(), + "1.0.0".to_string(), + ); + let event = TelemetryEvent::ExtensionEvent(telemetry_event); + provider.add_event(event.clone()); + assert_eq!(provider.event_count(), 1); + + // Test XML generation + let vm_data = create_test_vm_data(); + let xml = provider.to_xml(&vm_data); + assert!(xml.starts_with(&format!("", STATUS_PROVIDER_ID))); + assert!(xml.ends_with("")); + assert!(xml.contains("")); + + // Remove event + let removed = provider.remove_event(event); + assert!(removed.is_some()); + assert_eq!(provider.event_count(), 0); + } } From bfe2f015b18caf9640715958620502f12e6ba55c Mon Sep 17 00:00:00 2001 From: Zhidong Peng Date: Wed, 4 Feb 2026 18:54:52 +0000 Subject: [PATCH 03/12] remove linux-tools-generic --- .github/workflows/codeql.yml | 1 - .github/workflows/reusable-build.yml | 2 -- 2 files changed, 3 deletions(-) diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml index 7dce6f8f..22dfbf71 100644 --- a/.github/workflows/codeql.yml +++ b/.github/workflows/codeql.yml @@ -75,7 +75,6 @@ jobs: build-essential \ linux-tools-$(uname -r) \ linux-tools-common \ - linux-tools-generic \ rpm \ musl-tools \ diff --git a/.github/workflows/reusable-build.yml b/.github/workflows/reusable-build.yml index 4d44266a..f5fe69b1 100644 --- a/.github/workflows/reusable-build.yml +++ b/.github/workflows/reusable-build.yml @@ -292,7 +292,6 @@ jobs: build-essential \ linux-tools-$(uname -r) \ linux-tools-common \ - linux-tools-generic \ rpm \ musl-tools \ libssl-dev \ @@ -464,7 +463,6 @@ jobs: build-essential \ linux-tools-$(uname -r) \ linux-tools-common \ - linux-tools-generic \ rpm \ musl-tools \ gcc-aarch64-linux-gnu \ From af3b3c8acf8ad105fd28ada1076a8c110b6b72aa Mon Sep 17 00:00:00 2001 From: Zhidong Peng Date: Thu, 5 Feb 2026 17:09:46 +0000 Subject: [PATCH 04/12] add send success log --- .vscode/settings.json | 5 ++++- proxy_agent_shared/src/telemetry/event_sender.rs | 7 ++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 9bf0838b..3e71cf20 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -32,5 +32,8 @@ ], "vscode-nmake-tools.workspaceBuildDirectories": [ "." - ] + ], + "chat.tools.terminal.autoApprove": { + "./build-linux.sh": true + } } \ No newline at end of file diff --git a/proxy_agent_shared/src/telemetry/event_sender.rs b/proxy_agent_shared/src/telemetry/event_sender.rs index 9485943e..64be2ce4 100644 --- a/proxy_agent_shared/src/telemetry/event_sender.rs +++ b/proxy_agent_shared/src/telemetry/event_sender.rs @@ -7,7 +7,7 @@ use std::time::Duration; use crate::common_state::{self, CommonState}; use crate::host_clients::imds_client::ImdsClient; use crate::host_clients::wire_server_client::WireServerClient; -use crate::logger::logger_manager; +use crate::logger::{logger_manager, LoggerLevel}; use crate::result::Result; use crate::telemetry::telemetry_event::{ TelemetryData, TelemetryEvent, TelemetryEventVMData, VmMetaData, @@ -191,12 +191,17 @@ impl EventSender { return; } + let event_count = telemetry_data.event_count(); for _ in [0; 5] { match wire_server_client .send_telemetry_data(telemetry_data.to_xml()) .await { Ok(()) => { + logger_manager::write_log( + LoggerLevel::Trace, + format!("Successfully sent {event_count} telemetry events to wire server."), + ); break; } Err(e) => { From 564b6e03c621471ec983560791e163c99c22cf46 Mon Sep 17 00:00:00 2001 From: "Zhidong Peng (HE/HIM)" Date: Thu, 5 Feb 2026 11:26:18 -0800 Subject: [PATCH 05/12] Update mock server to auto bind an available local tcp port --- proxy_agent/src/key_keeper.rs | 10 ++- proxy_agent/src/proxy/authorization_rules.rs | 2 +- proxy_agent_shared/src/hyper_client.rs | 11 ++- proxy_agent_shared/src/server_mock.rs | 69 ++++++++++++------- .../src/telemetry/event_sender.rs | 49 ++++++------- 5 files changed, 76 insertions(+), 65 deletions(-) diff --git a/proxy_agent/src/key_keeper.rs b/proxy_agent/src/key_keeper.rs index 8c76fb44..40b92c65 100644 --- a/proxy_agent/src/key_keeper.rs +++ b/proxy_agent/src/key_keeper.rs @@ -1032,7 +1032,7 @@ mod tests { match fs::remove_dir_all(&temp_test_path) { Ok(_) => {} Err(e) => { - print!("Failed to remove_dir_all with error {}.", e); + eprintln!("Failed to remove_dir_all with error {}.", e); } } @@ -1040,11 +1040,9 @@ mod tests { // start wire_server listener let ip = "127.0.0.1"; let port = 8081u16; - tokio::spawn(server_mock::start( - ip.to_string(), - port, - cancellation_token.clone(), - )); + let port = server_mock::start(ip.to_string(), port, cancellation_token.clone()) + .await + .unwrap(); tokio::time::sleep(Duration::from_millis(100)).await; // start with disabled secure channel state diff --git a/proxy_agent/src/proxy/authorization_rules.rs b/proxy_agent/src/proxy/authorization_rules.rs index 27ef84fa..59261293 100644 --- a/proxy_agent/src/proxy/authorization_rules.rs +++ b/proxy_agent/src/proxy/authorization_rules.rs @@ -544,7 +544,7 @@ mod tests { match std::fs::remove_dir_all(&temp_test_path) { Ok(_) => {} Err(e) => { - print!("Failed to remove_dir_all with error {}.", e); + eprintln!("Failed to remove_dir_all with error {}.", e); } } misc_helpers::try_create_folder(&temp_test_path).unwrap(); diff --git a/proxy_agent_shared/src/hyper_client.rs b/proxy_agent_shared/src/hyper_client.rs index d4e0d89e..52ed8452 100644 --- a/proxy_agent_shared/src/hyper_client.rs +++ b/proxy_agent_shared/src/hyper_client.rs @@ -471,6 +471,7 @@ mod tests { use crate::{ host_clients::{imds_client::ImdsClient, wire_server_client::WireServerClient}, logger::logger_manager, + server_mock, }; use tokio_util::sync::CancellationToken; @@ -510,13 +511,11 @@ mod tests { async fn http_request_tests() { // start mock server let ip = "127.0.0.1"; - let port = 7072u16; + let port = 9072u16; let cancellation_token = CancellationToken::new(); - tokio::spawn(crate::server_mock::start( - ip.to_string(), - port, - cancellation_token.clone(), - )); + let port = server_mock::start(ip.to_string(), port, cancellation_token.clone()) + .await + .unwrap(); tokio::time::sleep(std::time::Duration::from_millis(100)).await; logger_manager::write_info("server_mock started.".to_string()); diff --git a/proxy_agent_shared/src/server_mock.rs b/proxy_agent_shared/src/server_mock.rs index 757c19a1..fb28d9f9 100644 --- a/proxy_agent_shared/src/server_mock.rs +++ b/proxy_agent_shared/src/server_mock.rs @@ -21,38 +21,59 @@ static EMPTY_GUID: Lazy = Lazy::new(|| "00000000-0000-0000-0000-00000000 static GUID: Lazy = Lazy::new(|| Uuid::new_v4().to_string()); static mut CURRENT_STATE: Lazy = Lazy::new(|| String::from("wireserver")); -pub async fn start(ip: String, port: u16, cancellation_token: CancellationToken) { +/// A mock server to simulate the behavior of the Azure WireServer for testing purposes. +/// It listens for incoming HTTP requests and responds with predefined responses based on the request path and method. +/// The server can be started on a specified IP and port, and it can be gracefully shut down using a cancellation token. +/// If the port is already in use, it will automatically bind to an ephemeral port and log a warning message. +/// Returns the port number that the server is listening on, or an error if the server fails to start. +pub async fn start(ip: String, port: u16, cancellation_token: CancellationToken) -> Result { logger_manager::write_info("Mock Server starting...".to_string()); let addr = format!("{ip}:{port}"); - let listener = TcpListener::bind(&addr).await.unwrap(); - println!("Listening on http://{addr}"); - - loop { - tokio::select! { - _ = cancellation_token.cancelled() => { - logger_manager::write_warn("cancellation token signal received, stop the listener.".to_string()); - return; + let listener = match TcpListener::bind(&addr).await { + Ok(l) => l, + Err(e) => { + // if the specified port is already in use, bind to an ephemeral port instead + if e.kind() == std::io::ErrorKind::AddrInUse { + logger_manager::write_warn(format!( + "Port {port} is already in use, trying to bind to an ephemeral port." + )); + TcpListener::bind(format!("{ip}:0")).await? + } else { + return Err(e.into()); } - result = listener.accept() => { - match result { - Ok((stream, _)) =>{ - let ip = ip.to_string(); - tokio::spawn(async move { - let io = TokioIo::new(stream); + } + }; + let local_addr = listener.local_addr().unwrap(); + println!("Mock Server Listening on http://{local_addr}"); + tokio::spawn(async move { + loop { + tokio::select! { + _ = cancellation_token.cancelled() => { + logger_manager::write_warn("cancellation token signal received, stop the listener.".to_string()); + return; + } + result = listener.accept() => { + match result { + Ok((stream, _)) =>{ let ip = ip.to_string(); - let service = service_fn(move |req| handle_request(ip.to_string(), port, req)); - if let Err(err) = http1::Builder::new().serve_connection(io, service).await { - println!("Error serving connection: {err:?}"); - } - }); - }, - Err(e) => { - logger_manager::write_err(format!("Failed to accept connection: {e}")); + tokio::spawn(async move { + let io = TokioIo::new(stream); + let ip = ip.to_string(); + let service = service_fn(move |req| handle_request(ip.to_string(), port, req)); + if let Err(err) = http1::Builder::new().serve_connection(io, service).await { + println!("Error serving connection: {err:?}"); + } + }); + }, + Err(e) => { + logger_manager::write_err(format!("Failed to accept connection: {e}")); + } } } } } - } + }); + Ok(local_addr.port()) } async fn handle_request( diff --git a/proxy_agent_shared/src/telemetry/event_sender.rs b/proxy_agent_shared/src/telemetry/event_sender.rs index 64be2ce4..8d2e790f 100644 --- a/proxy_agent_shared/src/telemetry/event_sender.rs +++ b/proxy_agent_shared/src/telemetry/event_sender.rs @@ -226,6 +226,7 @@ pub(crate) fn enqueue_event(event: TelemetryEvent) { mod tests { use super::*; use crate::host_clients::wire_server_client::WireServerClient; + use crate::server_mock; use crate::telemetry::telemetry_event::{ TelemetryExtensionEventsEvent, TelemetryGenericLogsEvent, VmMetaData, }; @@ -476,18 +477,15 @@ mod tests { #[tokio::test] async fn test_update_vm_meta_data_with_mock_server() { let ip = "127.0.0.1"; - let port = 7072u16; + let port = 9073u16; let cancellation_token = CancellationToken::new(); let common_state = CommonState::start_new(cancellation_token.clone()); let event_sender = EventSender::new(common_state.clone()); - // Start mock server - tokio::spawn(crate::server_mock::start( - ip.to_string(), - port, - cancellation_token.clone(), - )); + let port = server_mock::start(ip.to_string(), port, cancellation_token.clone()) + .await + .unwrap(); tokio::time::sleep(Duration::from_millis(100)).await; let wire_server_client = WireServerClient::new(ip, port); @@ -531,7 +529,7 @@ mod tests { // Mock server details let ip = "127.0.0.1"; - let port = 7071u16; + let port = 9071u16; // Create EventSender let cancellation_token = CancellationToken::new(); @@ -593,34 +591,29 @@ mod tests { "Queue should have 3 events after enqueue" ); - // Start the event sender in a separate task - let handle = tokio::spawn(async move { - event_sender.start(Some(ip), Some(port)).await; - }); - - // Give it a moment to start event sender task - tokio::time::sleep(Duration::from_millis(50)).await; - - // Notify to process events - process_common_state.notify_telemetry_event().await.unwrap(); - - // Give it a moment to process the events while the VM data is still not set as Mock server not started yet - tokio::time::sleep(Duration::from_millis(100)).await; + // Process events - VM data cannot be retrieved yet as mock server not started, but should still attempt to process and log warnings + event_sender.process_event_queue(Some(ip), Some(port)).await; assert_eq!( TELEMETRY_EVENT_QUEUE.len(), 3, - "Queue should have 3 events after notify_telemetry_event but without VM data" + "Queue should have 3 events after process_event_queue but without VM data" ); // Start mock server to respond to goalstate and shared config requests - tokio::spawn(crate::server_mock::start( - ip.to_string(), - port, - cancellation_token.clone(), - )); + let port = server_mock::start(ip.to_string(), port, cancellation_token.clone()) + .await + .unwrap(); tokio::time::sleep(Duration::from_millis(100)).await; - // Notify again to process events now that VM data can be retrieved + // Start the event sender in a separate task + let handle = tokio::spawn(async move { + event_sender.start(Some(ip), Some(port)).await; + }); + + // Give it a moment to start event sender task + tokio::time::sleep(Duration::from_millis(50)).await; + + // Notify to process events now that VM data can be retrieved process_common_state.notify_telemetry_event().await.unwrap(); // Give it a moment to process the events (needs enough time for HTTP requests) From e9acb9a0ec63c34c462561d46cd42976651b767a Mon Sep 17 00:00:00 2001 From: Zhidong Peng Date: Thu, 5 Feb 2026 22:11:55 +0000 Subject: [PATCH 06/12] Add missing fields in fn to_xml_event --- .../src/telemetry/event_sender.rs | 1 + .../src/telemetry/telemetry_event.rs | 51 ++++++++++++++++++- 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/proxy_agent_shared/src/telemetry/event_sender.rs b/proxy_agent_shared/src/telemetry/event_sender.rs index 8d2e790f..fccbd5ca 100644 --- a/proxy_agent_shared/src/telemetry/event_sender.rs +++ b/proxy_agent_shared/src/telemetry/event_sender.rs @@ -647,6 +647,7 @@ mod tests { telemetry_data.add_event(create_test_event("Test event 1")); telemetry_data.add_event(create_test_event("Test event 2")); assert_eq!(telemetry_data.event_count(), 2); + println!("{}", telemetry_data.to_xml()); EventSender::send_data_to_wire_server(telemetry_data, &wire_server_client).await; // Test sending data with mixed events diff --git a/proxy_agent_shared/src/telemetry/telemetry_event.rs b/proxy_agent_shared/src/telemetry/telemetry_event.rs index 45168ec7..e7251346 100644 --- a/proxy_agent_shared/src/telemetry/telemetry_event.rs +++ b/proxy_agent_shared/src/telemetry/telemetry_event.rs @@ -337,6 +337,32 @@ impl TelemetryGenericLogsEvent { xml.push_str(&vm_data.to_xml_params()); + xml.push_str(&format!( + "", + self.event_pid + )); + xml.push_str(&format!( + "", + self.event_tid + )); + xml.push_str(&format!( + "", + misc_helpers::xml_escape(self.ga_version.to_string()) + )); + xml.push_str(&format!( + "", + misc_helpers::xml_escape(self.execution_mode.to_string()) + )); + xml.push_str(&format!( + "", + misc_helpers::xml_escape(self.task_name.to_string()) + )); + xml.push_str(&format!( + "", + misc_helpers::xml_escape(self.opcode_name.to_string()) + )); + + xml.push_str(&format!( "", misc_helpers::xml_escape(self.event_name.to_string()) @@ -417,7 +443,30 @@ impl TelemetryExtensionEventsEvent { xml.push_str(&vm_data.to_xml_params()); - // ... Additional parameters similar to TelemetryGenericLogsEvent + xml.push_str(&format!( + "", + self.event_pid + )); + xml.push_str(&format!( + "", + self.event_tid + )); + xml.push_str(&format!( + "", + misc_helpers::xml_escape(self.ga_version.to_string()) + )); + xml.push_str(&format!( + "", + misc_helpers::xml_escape(self.execution_mode.to_string()) + )); + xml.push_str(&format!( + "", + misc_helpers::xml_escape(self.task_name.to_string()) + )); + xml.push_str(&format!( + "", + misc_helpers::xml_escape(self.opcode_name.to_string()) + )); xml.push_str(&format!( "", misc_helpers::xml_escape(self.extension_type.to_string()) From 8d8fb8b02ed98a99ac0c79084cae0ff682e8cccd Mon Sep 17 00:00:00 2001 From: Zhidong Peng Date: Sat, 7 Feb 2026 01:27:33 +0000 Subject: [PATCH 07/12] replace misc_helpers::get_current_version() with current_info::get_current_exe_version() --- proxy_agent/src/common/logger.rs | 4 +-- proxy_agent/src/main.rs | 4 +-- proxy_agent/src/provision.rs | 13 ++++++++ proxy_agent/src/proxy_agent_status.rs | 33 ++++++++++++------- proxy_agent/src/service.rs | 2 +- proxy_agent_extension/src/handler_main.rs | 2 +- proxy_agent_extension/src/service_main.rs | 2 +- proxy_agent_setup/src/main.rs | 2 +- .../src/telemetry/telemetry_event.rs | 1 - 9 files changed, 42 insertions(+), 21 deletions(-) diff --git a/proxy_agent/src/common/logger.rs b/proxy_agent/src/common/logger.rs index 437a3f69..a83c390b 100644 --- a/proxy_agent/src/common/logger.rs +++ b/proxy_agent/src/common/logger.rs @@ -43,14 +43,14 @@ fn log(log_level: LoggerLevel, message: String) { #[cfg(not(windows))] pub fn write_serial_console_log(message: String) { - use proxy_agent_shared::misc_helpers; + use proxy_agent_shared::{current_info, misc_helpers}; use std::io::Write; let message = format!( "{} {}_{}({}) - {}\n", misc_helpers::get_date_time_string_with_milliseconds(), env!("CARGO_PKG_NAME"), - misc_helpers::get_current_version(), + current_info::get_current_exe_version(), std::process::id(), message ); diff --git a/proxy_agent/src/main.rs b/proxy_agent/src/main.rs index 7a73720c..5f090fd4 100644 --- a/proxy_agent/src/main.rs +++ b/proxy_agent/src/main.rs @@ -15,7 +15,7 @@ use common::cli::{Commands, CLI}; use common::constants; use common::helpers; use provision::provision_query::ProvisionQuery; -use proxy_agent_shared::misc_helpers; +use proxy_agent_shared::current_info; use shared_state::SharedState; use std::{process, time::Duration}; @@ -48,7 +48,7 @@ async fn main() { let _time = helpers::get_elapsed_time_in_millisec(); if CLI.version { - println!("{}", misc_helpers::get_current_version()); + println!("{}", current_info::get_current_exe_version()); return; } diff --git a/proxy_agent/src/provision.rs b/proxy_agent/src/provision.rs index 5b2fa519..4dc27281 100644 --- a/proxy_agent/src/provision.rs +++ b/proxy_agent/src/provision.rs @@ -381,6 +381,19 @@ pub async fn start_event_threads(event_threads_shared_state: EventThreadsSharedS .await; } }); + tokio::spawn({ + let event_reader = EventReader::new( + config::get_events_dir(), + event_threads_shared_state.common_state.clone(), + "ProxyAgent".to_string(), + "MicrosoftAzureGuestProxyAgent".to_string(), + ); + async move { + event_reader + .start_extension_status_event_processor(true, Some(Duration::from_secs(60))) + .await; + } + }); tokio::spawn({ let event_sender = EventSender::new(event_threads_shared_state.common_state.clone()); async move { diff --git a/proxy_agent/src/proxy_agent_status.rs b/proxy_agent/src/proxy_agent_status.rs index 289cf3da..17b9b26a 100644 --- a/proxy_agent/src/proxy_agent_status.rs +++ b/proxy_agent/src/proxy_agent_status.rs @@ -28,18 +28,17 @@ //! tokio::spawn(proxy_agent_status_task.start()); //! ``` -use crate::common::logger; +use crate::common::{constants, logger}; use crate::key_keeper::UNKNOWN_STATE; use crate::shared_state::agent_status_wrapper::{AgentStatusModule, AgentStatusSharedState}; use crate::shared_state::connection_summary_wrapper::ConnectionSummarySharedState; use crate::shared_state::key_keeper_wrapper::KeyKeeperSharedState; -use proxy_agent_shared::logger::LoggerLevel; -use proxy_agent_shared::misc_helpers; use proxy_agent_shared::proxy_agent_aggregate_status::{ GuestProxyAgentAggregateStatus, ModuleState, OverallState, ProxyAgentDetailStatus, ProxyAgentStatus, }; -use proxy_agent_shared::telemetry::event_logger; +use proxy_agent_shared::telemetry::{event_logger, Extension, OperationStatus}; +use proxy_agent_shared::{current_info, misc_helpers}; use std::collections::HashMap; use std::path::PathBuf; use std::time::{Duration, Instant}; @@ -126,7 +125,12 @@ impl ProxyAgentStatusTask { e )); } - + let agent_status = Extension { + name: constants::PROXY_AGENT_SERVICE_NAME.to_string(), + version: current_info::get_current_exe_version(), + is_internal: true, + extension_type: "Monitoring".to_string(), + }; loop { #[cfg(not(windows))] { @@ -140,12 +144,17 @@ impl ProxyAgentStatusTask { Ok(status) => status, Err(e) => format!("Error serializing proxy agent status: {e}"), }; - event_logger::write_event( - LoggerLevel::Info, - status, - "loop_status", - "proxy_agent_status", - logger::AGENT_LOGGER_KEY, + // private build tests report_extension_status_event + event_logger::report_extension_status_event( + agent_status.clone(), + OperationStatus { + operation_success: aggregate_status.proxyAgentStatus.status + == OverallState::SUCCESS, + task_name: "loop_status".to_string(), + operation: "report_proxy_agent_status".to_string(), + message: status, + duration: status_report_time.elapsed().as_millis() as i64, + }, ); status_report_time = Instant::now(); } @@ -225,7 +234,7 @@ impl ProxyAgentStatusTask { }; ProxyAgentStatus { - version: misc_helpers::get_current_version(), + version: current_info::get_current_exe_version(), status, // monitorStatus is proxy_agent_status itself status monitorStatus: ProxyAgentDetailStatus { diff --git a/proxy_agent/src/service.rs b/proxy_agent/src/service.rs index 500acbc9..db0f2459 100644 --- a/proxy_agent/src/service.rs +++ b/proxy_agent/src/service.rs @@ -46,7 +46,7 @@ pub async fn start_service(shared_state: SharedState) { let start_message = format!( "============== GuestProxyAgent ({}) is starting on {}({}), elapsed: {}", - proxy_agent_shared::misc_helpers::get_current_version(), + current_info::get_current_exe_version(), current_info::get_long_os_version(), current_info::get_cpu_arch(), helpers::get_elapsed_time_in_millisec() diff --git a/proxy_agent_extension/src/handler_main.rs b/proxy_agent_extension/src/handler_main.rs index 41920f4d..b650b06a 100644 --- a/proxy_agent_extension/src/handler_main.rs +++ b/proxy_agent_extension/src/handler_main.rs @@ -41,7 +41,7 @@ pub async fn program_start(command: ExtensionCommand, config_seq_no: String) { logger::write(format!( "GuestProxyAgentExtension Version: {}, OS Arch: {}, OS Version: {}", - misc_helpers::get_current_version(), + current_info::get_current_exe_version(), misc_helpers::get_processor_arch(), misc_helpers::get_long_os_version() )); diff --git a/proxy_agent_extension/src/service_main.rs b/proxy_agent_extension/src/service_main.rs index aeeac69a..de2936d6 100644 --- a/proxy_agent_extension/src/service_main.rs +++ b/proxy_agent_extension/src/service_main.rs @@ -28,7 +28,7 @@ const MAX_STATE_COUNT: u32 = 120; pub fn run() { let message = format!( "============== GuestProxyAgentExtension Enabling Agent, Version: {}, OS Arch: {}, OS Version: {}", - misc_helpers::get_current_version(), + current_info::get_current_exe_version(), misc_helpers::get_processor_arch(), misc_helpers::get_long_os_version() ); diff --git a/proxy_agent_setup/src/main.rs b/proxy_agent_setup/src/main.rs index 1e14ba0b..89dce71e 100644 --- a/proxy_agent_setup/src/main.rs +++ b/proxy_agent_setup/src/main.rs @@ -32,7 +32,7 @@ async fn main() { let cli = args::Cli::parse(); logger::write(format!( "\r\n\r\n============== ProxyAgent Setup Tool ({}) is starting with args: {} ==============", - misc_helpers::get_current_version(), + current_info::get_current_exe_version(), cli )); diff --git a/proxy_agent_shared/src/telemetry/telemetry_event.rs b/proxy_agent_shared/src/telemetry/telemetry_event.rs index e7251346..b50972da 100644 --- a/proxy_agent_shared/src/telemetry/telemetry_event.rs +++ b/proxy_agent_shared/src/telemetry/telemetry_event.rs @@ -361,7 +361,6 @@ impl TelemetryGenericLogsEvent { "", misc_helpers::xml_escape(self.opcode_name.to_string()) )); - xml.push_str(&format!( "", From 597c39e8ba2e1c5b1269bbda37ed78b28d1dcd78 Mon Sep 17 00:00:00 2001 From: Zhidong Peng Date: Sat, 7 Feb 2026 01:30:03 +0000 Subject: [PATCH 08/12] fix --- proxy_agent_extension/src/handler_main.rs | 1 + proxy_agent_setup/src/main.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/proxy_agent_extension/src/handler_main.rs b/proxy_agent_extension/src/handler_main.rs index b650b06a..94220412 100644 --- a/proxy_agent_extension/src/handler_main.rs +++ b/proxy_agent_extension/src/handler_main.rs @@ -6,6 +6,7 @@ use crate::logger; use crate::structs; use crate::ExtensionCommand; use once_cell::sync::Lazy; +use proxy_agent_shared::current_info; use proxy_agent_shared::misc_helpers; use proxy_agent_shared::version::Version; use std::fs::{self}; diff --git a/proxy_agent_setup/src/main.rs b/proxy_agent_setup/src/main.rs index 89dce71e..4a90862e 100644 --- a/proxy_agent_setup/src/main.rs +++ b/proxy_agent_setup/src/main.rs @@ -13,6 +13,7 @@ pub mod setup; mod linux; use clap::Parser; +use proxy_agent_shared::current_info; use proxy_agent_shared::misc_helpers; use proxy_agent_shared::service; use std::process; From ad0959a9902133a2ad08ea277cfe7df669b51bf3 Mon Sep 17 00:00:00 2001 From: Zhidong Peng Date: Sat, 7 Feb 2026 01:41:29 +0000 Subject: [PATCH 09/12] fix mock_server error if had to bind to an ephemeral port --- proxy_agent_extension/src/service_main.rs | 1 + proxy_agent_shared/src/server_mock.rs | 4 +++- .../src/telemetry/event_sender.rs | 22 ++++++------------- 3 files changed, 11 insertions(+), 16 deletions(-) diff --git a/proxy_agent_extension/src/service_main.rs b/proxy_agent_extension/src/service_main.rs index de2936d6..3739a9ba 100644 --- a/proxy_agent_extension/src/service_main.rs +++ b/proxy_agent_extension/src/service_main.rs @@ -4,6 +4,7 @@ use crate::common; use crate::constants; use crate::logger; use crate::structs::*; +use proxy_agent_shared::current_info; use proxy_agent_shared::logger::LoggerLevel; use proxy_agent_shared::proxy_agent_aggregate_status::{ self, GuestProxyAgentAggregateStatus, ProxyConnectionSummary, diff --git a/proxy_agent_shared/src/server_mock.rs b/proxy_agent_shared/src/server_mock.rs index fb28d9f9..0472cf93 100644 --- a/proxy_agent_shared/src/server_mock.rs +++ b/proxy_agent_shared/src/server_mock.rs @@ -44,6 +44,8 @@ pub async fn start(ip: String, port: u16, cancellation_token: CancellationToken) } }; let local_addr = listener.local_addr().unwrap(); + // Update the port if we had to bind to an ephemeral port + let port = local_addr.port(); println!("Mock Server Listening on http://{local_addr}"); tokio::spawn(async move { loop { @@ -73,7 +75,7 @@ pub async fn start(ip: String, port: u16, cancellation_token: CancellationToken) } } }); - Ok(local_addr.port()) + Ok(port) } async fn handle_request( diff --git a/proxy_agent_shared/src/telemetry/event_sender.rs b/proxy_agent_shared/src/telemetry/event_sender.rs index fccbd5ca..3acba8ff 100644 --- a/proxy_agent_shared/src/telemetry/event_sender.rs +++ b/proxy_agent_shared/src/telemetry/event_sender.rs @@ -577,6 +577,12 @@ mod tests { while TELEMETRY_EVENT_QUEUE.pop().is_ok() {} // ===== Part 4: Test enqueue and process with mock server ===== + // Start mock server FIRST to respond to goalstate and shared config requests + let port = server_mock::start(ip.to_string(), port, cancellation_token.clone()) + .await + .unwrap(); + tokio::time::sleep(Duration::from_millis(100)).await; + // Enqueue events let event_a = create_test_event("Test event A for processing"); let event_b = create_test_event("Test event B for processing"); @@ -591,20 +597,6 @@ mod tests { "Queue should have 3 events after enqueue" ); - // Process events - VM data cannot be retrieved yet as mock server not started, but should still attempt to process and log warnings - event_sender.process_event_queue(Some(ip), Some(port)).await; - assert_eq!( - TELEMETRY_EVENT_QUEUE.len(), - 3, - "Queue should have 3 events after process_event_queue but without VM data" - ); - - // Start mock server to respond to goalstate and shared config requests - let port = server_mock::start(ip.to_string(), port, cancellation_token.clone()) - .await - .unwrap(); - tokio::time::sleep(Duration::from_millis(100)).await; - // Start the event sender in a separate task let handle = tokio::spawn(async move { event_sender.start(Some(ip), Some(port)).await; @@ -616,7 +608,7 @@ mod tests { // Notify to process events now that VM data can be retrieved process_common_state.notify_telemetry_event().await.unwrap(); - // Give it a moment to process the events (needs enough time for HTTP requests) + // Give it a moment to process the events (needs enough time for multiple HTTP requests) tokio::time::sleep(Duration::from_millis(500)).await; // Verify queue is empty after processing From f62ded442076e767fcecf89e4613e4ec41f62853 Mon Sep 17 00:00:00 2001 From: Zhidong Peng Date: Sat, 7 Feb 2026 03:30:26 +0000 Subject: [PATCH 10/12] fix --- proxy_agent_shared/src/telemetry/telemetry_event.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/proxy_agent_shared/src/telemetry/telemetry_event.rs b/proxy_agent_shared/src/telemetry/telemetry_event.rs index b50972da..c8ed89ef 100644 --- a/proxy_agent_shared/src/telemetry/telemetry_event.rs +++ b/proxy_agent_shared/src/telemetry/telemetry_event.rs @@ -346,7 +346,7 @@ impl TelemetryGenericLogsEvent { self.event_tid )); xml.push_str(&format!( - "", + "", misc_helpers::xml_escape(self.ga_version.to_string()) )); xml.push_str(&format!( @@ -451,7 +451,7 @@ impl TelemetryExtensionEventsEvent { self.event_tid )); xml.push_str(&format!( - "", + "", misc_helpers::xml_escape(self.ga_version.to_string()) )); xml.push_str(&format!( @@ -923,6 +923,7 @@ mod tests { assert!(xml.contains(&format!("", STATUS_PROVIDER_ID))); assert!(xml.contains("")); // Generic logs event assert!(xml.contains("")); // Extension event + println!("{xml}"); // Remove extension event let removed = telemetry_data.remove_last_event(extension_event); From 518302bef7b2c45642e693dadc3cac7c00b95029 Mon Sep 17 00:00:00 2001 From: Zhidong Peng Date: Mon, 9 Feb 2026 19:52:39 +0000 Subject: [PATCH 11/12] update --- proxy_agent/src/proxy/proxy_server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proxy_agent/src/proxy/proxy_server.rs b/proxy_agent/src/proxy/proxy_server.rs index 524ddc1f..666642a9 100644 --- a/proxy_agent/src/proxy/proxy_server.rs +++ b/proxy_agent/src/proxy/proxy_server.rs @@ -564,7 +564,7 @@ impl ProxyServer { if http_connection_context.should_skip_sig() { http_connection_context.log( - LoggerLevel::Info, + LoggerLevel::Trace, format!( "Skip compute signature for the request for {} {}", http_connection_context.method, http_connection_context.url From 473d66a22bccfdac6bc19dd4be20a2b36d222131 Mon Sep 17 00:00:00 2001 From: Zhidong Peng Date: Mon, 9 Feb 2026 23:28:25 +0000 Subject: [PATCH 12/12] resolve comment --- proxy_agent/src/proxy_agent_status.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/proxy_agent/src/proxy_agent_status.rs b/proxy_agent/src/proxy_agent_status.rs index 17b9b26a..51a944ba 100644 --- a/proxy_agent/src/proxy_agent_status.rs +++ b/proxy_agent/src/proxy_agent_status.rs @@ -144,7 +144,6 @@ impl ProxyAgentStatusTask { Ok(status) => status, Err(e) => format!("Error serializing proxy agent status: {e}"), }; - // private build tests report_extension_status_event event_logger::report_extension_status_event( agent_status.clone(), OperationStatus {