Skip to content
5 changes: 4 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,8 @@
],
"vscode-nmake-tools.workspaceBuildDirectories": [
"."
]
],
"chat.tools.terminal.autoApprove": {
"./build-linux.sh": true
}
}
4 changes: 2 additions & 2 deletions proxy_agent/src/common/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ fn log(log_level: LoggerLevel, message: String) {

#[cfg(not(windows))]
pub fn write_serial_console_log(message: String) {
use proxy_agent_shared::misc_helpers;
use proxy_agent_shared::{current_info, misc_helpers};
use std::io::Write;

let message = format!(
"{} {}_{}({}) - {}\n",
misc_helpers::get_date_time_string_with_milliseconds(),
env!("CARGO_PKG_NAME"),
misc_helpers::get_current_version(),
current_info::get_current_exe_version(),
std::process::id(),
message
);
Expand Down
10 changes: 4 additions & 6 deletions proxy_agent/src/key_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1032,19 +1032,17 @@ mod tests {
match fs::remove_dir_all(&temp_test_path) {
Ok(_) => {}
Err(e) => {
print!("Failed to remove_dir_all with error {}.", e);
eprintln!("Failed to remove_dir_all with error {}.", e);
}
}

let cancellation_token = CancellationToken::new();
// start wire_server listener
let ip = "127.0.0.1";
let port = 8081u16;
tokio::spawn(server_mock::start(
ip.to_string(),
port,
cancellation_token.clone(),
));
let port = server_mock::start(ip.to_string(), port, cancellation_token.clone())
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;

// start with disabled secure channel state
Expand Down
4 changes: 2 additions & 2 deletions proxy_agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use common::cli::{Commands, CLI};
use common::constants;
use common::helpers;
use provision::provision_query::ProvisionQuery;
use proxy_agent_shared::misc_helpers;
use proxy_agent_shared::current_info;
use shared_state::SharedState;
use std::{process, time::Duration};

Expand Down Expand Up @@ -48,7 +48,7 @@ async fn main() {
let _time = helpers::get_elapsed_time_in_millisec();

if CLI.version {
println!("{}", misc_helpers::get_current_version());
println!("{}", current_info::get_current_exe_version());
return;
}

Expand Down
13 changes: 13 additions & 0 deletions proxy_agent/src/provision.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,19 @@ pub async fn start_event_threads(event_threads_shared_state: EventThreadsSharedS
.await;
}
});
tokio::spawn({
let event_reader = EventReader::new(
config::get_events_dir(),
event_threads_shared_state.common_state.clone(),
"ProxyAgent".to_string(),
"MicrosoftAzureGuestProxyAgent".to_string(),
);
async move {
event_reader
.start_extension_status_event_processor(true, Some(Duration::from_secs(60)))
.await;
}
});
tokio::spawn({
let event_sender = EventSender::new(event_threads_shared_state.common_state.clone());
async move {
Expand Down
2 changes: 1 addition & 1 deletion proxy_agent/src/proxy/authorization_rules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ mod tests {
match std::fs::remove_dir_all(&temp_test_path) {
Ok(_) => {}
Err(e) => {
print!("Failed to remove_dir_all with error {}.", e);
eprintln!("Failed to remove_dir_all with error {}.", e);
}
}
misc_helpers::try_create_folder(&temp_test_path).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion proxy_agent/src/proxy/proxy_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ impl ProxyServer {

if http_connection_context.should_skip_sig() {
http_connection_context.log(
LoggerLevel::Info,
LoggerLevel::Trace,
format!(
"Skip compute signature for the request for {} {}",
http_connection_context.method, http_connection_context.url
Expand Down
32 changes: 20 additions & 12 deletions proxy_agent/src/proxy_agent_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,17 @@
//! tokio::spawn(proxy_agent_status_task.start());
//! ```

use crate::common::logger;
use crate::common::{constants, logger};
use crate::key_keeper::UNKNOWN_STATE;
use crate::shared_state::agent_status_wrapper::{AgentStatusModule, AgentStatusSharedState};
use crate::shared_state::connection_summary_wrapper::ConnectionSummarySharedState;
use crate::shared_state::key_keeper_wrapper::KeyKeeperSharedState;
use proxy_agent_shared::logger::LoggerLevel;
use proxy_agent_shared::misc_helpers;
use proxy_agent_shared::proxy_agent_aggregate_status::{
GuestProxyAgentAggregateStatus, ModuleState, OverallState, ProxyAgentDetailStatus,
ProxyAgentStatus,
};
use proxy_agent_shared::telemetry::event_logger;
use proxy_agent_shared::telemetry::{event_logger, Extension, OperationStatus};
use proxy_agent_shared::{current_info, misc_helpers};
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -126,7 +125,12 @@ impl ProxyAgentStatusTask {
e
));
}

let agent_status = Extension {
name: constants::PROXY_AGENT_SERVICE_NAME.to_string(),
version: current_info::get_current_exe_version(),
is_internal: true,
extension_type: "Monitoring".to_string(),
};
loop {
#[cfg(not(windows))]
{
Expand All @@ -140,12 +144,16 @@ impl ProxyAgentStatusTask {
Ok(status) => status,
Err(e) => format!("Error serializing proxy agent status: {e}"),
};
event_logger::write_event(
LoggerLevel::Info,
status,
"loop_status",
"proxy_agent_status",
logger::AGENT_LOGGER_KEY,
event_logger::report_extension_status_event(
agent_status.clone(),
OperationStatus {
operation_success: aggregate_status.proxyAgentStatus.status
== OverallState::SUCCESS,
task_name: "loop_status".to_string(),
operation: "report_proxy_agent_status".to_string(),
message: status,
duration: status_report_time.elapsed().as_millis() as i64,
},
);
status_report_time = Instant::now();
}
Expand Down Expand Up @@ -225,7 +233,7 @@ impl ProxyAgentStatusTask {
};

ProxyAgentStatus {
version: misc_helpers::get_current_version(),
version: current_info::get_current_exe_version(),
status,
// monitorStatus is proxy_agent_status itself status
monitorStatus: ProxyAgentDetailStatus {
Expand Down
2 changes: 1 addition & 1 deletion proxy_agent/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub async fn start_service(shared_state: SharedState) {

let start_message = format!(
"============== GuestProxyAgent ({}) is starting on {}({}), elapsed: {}",
proxy_agent_shared::misc_helpers::get_current_version(),
current_info::get_current_exe_version(),
current_info::get_long_os_version(),
current_info::get_cpu_arch(),
helpers::get_elapsed_time_in_millisec()
Expand Down
3 changes: 2 additions & 1 deletion proxy_agent_extension/src/handler_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::logger;
use crate::structs;
use crate::ExtensionCommand;
use once_cell::sync::Lazy;
use proxy_agent_shared::current_info;
use proxy_agent_shared::misc_helpers;
use proxy_agent_shared::version::Version;
use std::fs::{self};
Expand Down Expand Up @@ -41,7 +42,7 @@ pub async fn program_start(command: ExtensionCommand, config_seq_no: String) {

logger::write(format!(
"GuestProxyAgentExtension Version: {}, OS Arch: {}, OS Version: {}",
misc_helpers::get_current_version(),
current_info::get_current_exe_version(),
misc_helpers::get_processor_arch(),
misc_helpers::get_long_os_version()
));
Expand Down
3 changes: 2 additions & 1 deletion proxy_agent_extension/src/service_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::common;
use crate::constants;
use crate::logger;
use crate::structs::*;
use proxy_agent_shared::current_info;
use proxy_agent_shared::logger::LoggerLevel;
use proxy_agent_shared::proxy_agent_aggregate_status::{
self, GuestProxyAgentAggregateStatus, ProxyConnectionSummary,
Expand All @@ -28,7 +29,7 @@ const MAX_STATE_COUNT: u32 = 120;
pub fn run() {
let message = format!(
"============== GuestProxyAgentExtension Enabling Agent, Version: {}, OS Arch: {}, OS Version: {}",
misc_helpers::get_current_version(),
current_info::get_current_exe_version(),
misc_helpers::get_processor_arch(),
misc_helpers::get_long_os_version()
);
Expand Down
3 changes: 2 additions & 1 deletion proxy_agent_setup/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub mod setup;
mod linux;

use clap::Parser;
use proxy_agent_shared::current_info;
use proxy_agent_shared::misc_helpers;
use proxy_agent_shared::service;
use std::process;
Expand All @@ -32,7 +33,7 @@ async fn main() {
let cli = args::Cli::parse();
logger::write(format!(
"\r\n\r\n============== ProxyAgent Setup Tool ({}) is starting with args: {} ==============",
misc_helpers::get_current_version(),
current_info::get_current_exe_version(),
cli
));

Expand Down
11 changes: 5 additions & 6 deletions proxy_agent_shared/src/hyper_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ mod tests {
use crate::{
host_clients::{imds_client::ImdsClient, wire_server_client::WireServerClient},
logger::logger_manager,
server_mock,
};
use tokio_util::sync::CancellationToken;

Expand Down Expand Up @@ -510,13 +511,11 @@ mod tests {
async fn http_request_tests() {
// start mock server
let ip = "127.0.0.1";
let port = 7072u16;
let port = 9072u16;
let cancellation_token = CancellationToken::new();
tokio::spawn(crate::server_mock::start(
ip.to_string(),
port,
cancellation_token.clone(),
));
let port = server_mock::start(ip.to_string(), port, cancellation_token.clone())
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
logger_manager::write_info("server_mock started.".to_string());

Expand Down
71 changes: 47 additions & 24 deletions proxy_agent_shared/src/server_mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,38 +21,61 @@ static EMPTY_GUID: Lazy<String> = Lazy::new(|| "00000000-0000-0000-0000-00000000
static GUID: Lazy<String> = Lazy::new(|| Uuid::new_v4().to_string());
static mut CURRENT_STATE: Lazy<String> = Lazy::new(|| String::from("wireserver"));

pub async fn start(ip: String, port: u16, cancellation_token: CancellationToken) {
/// A mock server to simulate the behavior of the Azure WireServer for testing purposes.
/// It listens for incoming HTTP requests and responds with predefined responses based on the request path and method.
/// The server can be started on a specified IP and port, and it can be gracefully shut down using a cancellation token.
/// If the port is already in use, it will automatically bind to an ephemeral port and log a warning message.
/// Returns the port number that the server is listening on, or an error if the server fails to start.
pub async fn start(ip: String, port: u16, cancellation_token: CancellationToken) -> Result<u16> {
logger_manager::write_info("Mock Server starting...".to_string());
let addr = format!("{ip}:{port}");
let listener = TcpListener::bind(&addr).await.unwrap();
println!("Listening on http://{addr}");

loop {
tokio::select! {
_ = cancellation_token.cancelled() => {
logger_manager::write_warn("cancellation token signal received, stop the listener.".to_string());
return;
let listener = match TcpListener::bind(&addr).await {
Ok(l) => l,
Err(e) => {
// if the specified port is already in use, bind to an ephemeral port instead
if e.kind() == std::io::ErrorKind::AddrInUse {
logger_manager::write_warn(format!(
"Port {port} is already in use, trying to bind to an ephemeral port."
));
TcpListener::bind(format!("{ip}:0")).await?
} else {
return Err(e.into());
}
result = listener.accept() => {
match result {
Ok((stream, _)) =>{
let ip = ip.to_string();
tokio::spawn(async move {
let io = TokioIo::new(stream);
}
};
let local_addr = listener.local_addr().unwrap();
// Update the port if we had to bind to an ephemeral port
let port = local_addr.port();
println!("Mock Server Listening on http://{local_addr}");
tokio::spawn(async move {
loop {
tokio::select! {
_ = cancellation_token.cancelled() => {
logger_manager::write_warn("cancellation token signal received, stop the listener.".to_string());
return;
}
result = listener.accept() => {
match result {
Ok((stream, _)) =>{
let ip = ip.to_string();
let service = service_fn(move |req| handle_request(ip.to_string(), port, req));
if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
println!("Error serving connection: {err:?}");
}
});
},
Err(e) => {
logger_manager::write_err(format!("Failed to accept connection: {e}"));
tokio::spawn(async move {
let io = TokioIo::new(stream);
let ip = ip.to_string();
let service = service_fn(move |req| handle_request(ip.to_string(), port, req));
if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
println!("Error serving connection: {err:?}");
}
});
},
Err(e) => {
logger_manager::write_err(format!("Failed to accept connection: {e}"));
}
}
}
}
}
}
});
Ok(port)
}

async fn handle_request(
Expand Down
Loading
Loading