diff --git a/Cargo.lock b/Cargo.lock index 4a3a8ff4..9430bbc7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -567,8 +567,10 @@ dependencies = [ "anyhow", "clap", "compute-pcrs-lib", + "env_logger", "k8s-openapi 0.28.0", "kube 4.0.0", + "log", "serde_json", "tokio", "trusted-cluster-operator-lib", diff --git a/compute-pcrs/Cargo.toml b/compute-pcrs/Cargo.toml index df9501db..3024bd78 100644 --- a/compute-pcrs/Cargo.toml +++ b/compute-pcrs/Cargo.toml @@ -15,7 +15,9 @@ anyhow.workspace = true clap.workspace = true trusted-cluster-operator-lib = { path = "../lib" } compute-pcrs-lib.workspace = true +env_logger.workspace = true k8s-openapi.workspace = true kube.workspace = true +log.workspace = true serde_json.workspace = true tokio.workspace = true diff --git a/compute-pcrs/src/main.rs b/compute-pcrs/src/main.rs index 0c1bfacf..34f9c056 100644 --- a/compute-pcrs/src/main.rs +++ b/compute-pcrs/src/main.rs @@ -6,11 +6,13 @@ use anyhow::{Context, Result, anyhow}; use clap::Parser; use compute_pcrs_lib::*; +use env_logger::Env; use k8s_openapi::{api::core::v1::ConfigMap, jiff::Timestamp}; use kube::{Api, Client}; +use log::info; use std::{fs::File, io::Read}; -use trusted_cluster_operator_lib::{conditions::INSTALLED_REASON, reference_values::*, *}; +use trusted_cluster_operator_lib::reference_values::*; #[derive(Parser)] #[command(version, about)] @@ -25,6 +27,7 @@ struct Args { #[tokio::main] async fn main() -> Result<()> { + env_logger::Builder::from_env(Env::default().default_filter_or("info")).init(); let args = Args::parse(); let kernels = format!("{IMAGE_VOLUME_MOUNTPOINT}/usr/lib/modules"); @@ -53,30 +56,38 @@ async fn main() -> Result<()> { ]; let client = Client::try_default().await?; - let config_maps: Api = Api::default_namespaced(client.clone()); - - let mut image_pcrs_map = config_maps.get(PCR_CONFIG_MAP).await?; - let image_pcrs_data = image_pcrs_map - .data - .context("Image PCRs map existed, but had no data")?; - let image_pcrs_str = image_pcrs_data - .get(PCR_CONFIG_FILE) - .context("Image PCRs data existed, but had no file")?; - let mut image_pcrs: ImagePcrs = serde_json::from_str(image_pcrs_str)?; + let config_maps: Api = Api::default_namespaced(client); let image_pcr = ImagePcr { first_seen: Timestamp::now(), reference: args.image, pcrs, }; - image_pcrs.0.insert(args.resource_name.clone(), image_pcr); - update_image_pcrs!(config_maps, image_pcrs_map, image_pcrs); - - let approved_images: Api = Api::default_namespaced(client); - let image = approved_images.get(&args.resource_name).await?; - let committed = committed_condition(INSTALLED_REASON, image.metadata.generation, &None); - let conditions = Some(vec![committed]); - let status = ApprovedImageStatus { conditions }; - update_status!(approved_images, &args.resource_name, status)?; + // If we see this causing performance problems, consider NoSQL + loop { + let mut image_pcrs_map = config_maps.get(PCR_CONFIG_MAP).await?; + let ctx = "Image PCRs map existed, but had no data"; + let image_pcrs_data = image_pcrs_map.data.context(ctx)?; + let ctx = "Image PCRs data existed, but had no file"; + let image_pcrs_str = image_pcrs_data.get(PCR_CONFIG_FILE).context(ctx)?; + let mut image_pcrs: ImagePcrs = serde_json::from_str(image_pcrs_str)?; + let resource_name = args.resource_name.clone(); + image_pcrs.0.insert(resource_name, image_pcr.clone()); + let image_pcrs_json = serde_json::to_string(&image_pcrs)?; + let map = (PCR_CONFIG_FILE.to_string(), image_pcrs_json); + let data = std::collections::BTreeMap::from([map]); + image_pcrs_map.data = Some(data); + match config_maps + .replace(PCR_CONFIG_MAP, &Default::default(), &image_pcrs_map) + .await + { + Ok(_) => break, + Err(kube::Error::Api(ae)) if ae.code == 409 => { + info!("ConfigMap update conflict, retrying"); + continue; + } + Err(e) => return Err(e.into()), + } + } Ok(()) } diff --git a/lib/src/lib.rs b/lib/src/lib.rs index 8861b6ce..b0595b7c 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -10,7 +10,6 @@ pub mod reference_values; mod kopium; #[allow(clippy::all)] mod vendor_kopium; -use k8s_openapi::jiff::Timestamp; pub use kopium::approvedimages::*; pub use kopium::attestationkeys::*; pub use kopium::ingresses as openshift_ingresses; @@ -26,88 +25,9 @@ pub use vendor_kopium::virtualmachineinstances; pub use vendor_kopium::virtualmachines; use anyhow::{Context, Result, anyhow}; -use conditions::*; -use k8s_openapi::apimachinery::pkg::apis::meta::v1::{Condition, OwnerReference, Time}; +use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference; use kube::{Api, Client, Resource}; -#[macro_export] -macro_rules! update_status { - ($api:ident, $name:expr, $status:expr) => {{ - let patch = kube::api::Patch::Merge(serde_json::json!({"status": $status})); - $api.patch_status($name, &Default::default(), &patch).await - .map_err(Into::::into) - }} -} - -pub fn condition_status(status: bool) -> String { - match status { - true => "True".to_string(), - false => "False".to_string(), - } -} - -pub trait Conditions { - fn conditions(&self) -> &Option>; -} - -impl Conditions for TrustedExecutionClusterStatus { - fn conditions(&self) -> &Option> { - &self.conditions - } -} - -impl Conditions for AttestationKeyStatus { - fn conditions(&self) -> &Option> { - &self.conditions - } -} - -impl Conditions for ApprovedImageStatus { - fn conditions(&self) -> &Option> { - &self.conditions - } -} - -pub fn transition_time( - existing_status: &Option, - type_: &str, - new_status: &str, -) -> Time { - let get = |s: &S| s.conditions().clone(); - let conditions = existing_status.as_ref().and_then(get); - let find = |c: &Condition| type_ == c.type_ && new_status == c.status; - let existing = conditions.and_then(|cs| cs.into_iter().find(find)); - let time = existing.map(|c| c.last_transition_time); - time.unwrap_or(Time(Timestamp::now())) -} - -pub fn committed_condition( - reason: &str, - generation: Option, - existing_status: &Option, -) -> Condition { - let status = condition_status(reason == COMMITTED_REASON); - let type_ = COMMITTED_CONDITION; - Condition { - type_: type_.to_string(), - reason: reason.to_string(), - message: match reason { - NOT_COMMITTED_REASON_COMPUTING => "Computation is ongoing. Check jobs for progress.", - NOT_COMMITTED_REASON_NO_DIGEST => { - "Image did not specify a digest. \ - Only images with a digest are supported to avoid ambiguity." - } - NOT_COMMITTED_REASON_PENDING => "Pod is pending, check pods for details", - NOT_COMMITTED_REASON_FAILED => "Computation failed, check operator log for details", - _ => "", - } - .to_string(), - last_transition_time: transition_time(existing_status, type_, &status), - status, - observed_generation: generation, - } -} - /// Generate an OwnerReference for any Kubernetes resource pub fn generate_owner_reference>( object: &T, diff --git a/lib/src/reference_values.rs b/lib/src/reference_values.rs index f262d37d..1eaeeb81 100644 --- a/lib/src/reference_values.rs +++ b/lib/src/reference_values.rs @@ -12,7 +12,7 @@ pub const PCR_CONFIG_MAP: &str = "image-pcrs"; pub const PCR_CONFIG_FILE: &str = "image-pcrs.json"; pub const IMAGE_VOLUME_MOUNTPOINT: &str = "/image"; -#[derive(Deserialize, Serialize)] +#[derive(Clone, Deserialize, Serialize)] pub struct ImagePcr { pub first_seen: Timestamp, pub pcrs: Vec, @@ -21,15 +21,3 @@ pub struct ImagePcr { #[derive(Default, Deserialize, Serialize)] pub struct ImagePcrs(pub BTreeMap); - -#[macro_export] -macro_rules! update_image_pcrs { - ($api:ident, $map:ident, $pcrs:ident) => { - let image_pcrs_json = serde_json::to_string(&$pcrs)?; - let map = (PCR_CONFIG_FILE.to_string(), image_pcrs_json.to_string()); - let data = std::collections::BTreeMap::from([map]); - $map.data = Some(data); - $api.replace(PCR_CONFIG_MAP, &Default::default(), &$map) - .await? - }; -} diff --git a/operator/src/attestation_key_register.rs b/operator/src/attestation_key_register.rs index 332e5124..c9bdd817 100644 --- a/operator/src/attestation_key_register.rs +++ b/operator/src/attestation_key_register.rs @@ -31,14 +31,12 @@ use std::{collections::BTreeMap, sync::Arc, time::Duration}; use trusted_cluster_operator_lib::conditions::ATTESTATION_KEY_MACHINE_APPROVE; use trusted_cluster_operator_lib::endpoints::*; -use trusted_cluster_operator_lib::{AttestationKey, AttestationKeyStatus, Machine, update_status}; +use trusted_cluster_operator_lib::{AttestationKey, AttestationKeyStatus, Machine}; use crate::conditions::attestation_key_approved_condition; use crate::trustee; -use operator::{ - ControllerError, TLS_DIR, controller_error_policy, create_or_info_if_exists, read_certificate, - upsert_condition, -}; +use operator::{ControllerError, TLS_DIR, controller_error_policy}; +use operator::{create_or_info_if_exists, read_certificate, update_status, upsert_condition}; /// Shared context for the three attestation-key controllers. /// Stores give local cache access to avoid repeated API-server reads. @@ -243,7 +241,7 @@ async fn approve_ak(ak: &AttestationKey, machine: &Machine, ctx: &AkContextData) if changed { let status = AttestationKeyStatus { conditions }; - update_status!(aks, &name, status)?; + update_status(&aks, &name, status).await?; info!("Approved attestation key {name}"); } diff --git a/operator/src/conditions.rs b/operator/src/conditions.rs index 817d92c1..2dd4d7a9 100644 --- a/operator/src/conditions.rs +++ b/operator/src/conditions.rs @@ -3,8 +3,9 @@ // SPDX-License-Identifier: MIT use k8s_openapi::apimachinery::pkg::apis::meta::v1::Condition; +use operator::{condition_status, transition_time}; +use trusted_cluster_operator_lib::conditions::*; use trusted_cluster_operator_lib::{AttestationKeyStatus, TrustedExecutionClusterStatus}; -use trusted_cluster_operator_lib::{condition_status, conditions::*, transition_time}; pub fn known_trustee_address_condition( known: bool, diff --git a/operator/src/lib.rs b/operator/src/lib.rs index 5cc6caa1..ded6b80a 100644 --- a/operator/src/lib.rs +++ b/operator/src/lib.rs @@ -14,13 +14,17 @@ use k8s_openapi::api::core::v1::{Secret, SecretVolumeSource, Volume, VolumeMount use k8s_openapi::apimachinery::pkg::apis::meta::v1::{Condition, Time}; use k8s_openapi::jiff::Timestamp; use kube::Resource; +use kube::api::Patch; use kube::runtime::reflector::{self, Store}; use kube::runtime::watcher::watcher; use kube::{Api, Client, runtime::controller::Action}; use log::{info, warn}; +use serde::Serialize; use std::fmt::{Debug, Display}; use std::{sync::Arc, time::Duration}; use tokio::time::timeout; +use trusted_cluster_operator_lib::{ApprovedImageStatus, AttestationKeyStatus}; +use trusted_cluster_operator_lib::{TrustedExecutionClusterStatus, conditions::*}; // Re-export common functions from the lib pub use trusted_cluster_operator_lib::generate_owner_reference; @@ -31,6 +35,84 @@ pub enum ControllerError { Anyhow(#[from] anyhow::Error), } +pub async fn update_status(api: &Api, name: &str, status: S) -> Result<()> +where + K: Resource + serde::de::DeserializeOwned + Clone + Debug, +{ + let patch = Patch::Merge(serde_json::json!({"status": status})); + api.patch_status(name, &Default::default(), &patch).await?; + Ok(()) +} + +pub fn condition_status(status: bool) -> String { + match status { + true => "True".to_string(), + false => "False".to_string(), + } +} + +pub trait Conditions { + fn conditions(&self) -> &Option>; +} + +impl Conditions for TrustedExecutionClusterStatus { + fn conditions(&self) -> &Option> { + &self.conditions + } +} + +impl Conditions for AttestationKeyStatus { + fn conditions(&self) -> &Option> { + &self.conditions + } +} + +impl Conditions for ApprovedImageStatus { + fn conditions(&self) -> &Option> { + &self.conditions + } +} + +pub fn transition_time( + existing_status: &Option, + type_: &str, + new_status: &str, +) -> Time { + let get = |s: &S| s.conditions().clone(); + let conditions = existing_status.as_ref().and_then(get); + let find = |c: &Condition| type_ == c.type_ && new_status == c.status; + let existing = conditions.and_then(|cs| cs.into_iter().find(find)); + let time = existing.map(|c| c.last_transition_time); + time.unwrap_or(Time(Timestamp::now())) +} + +pub fn committed_condition( + reason: &str, + generation: Option, + existing_status: &Option, +) -> Condition { + let status = condition_status(reason == COMMITTED_REASON); + let type_ = COMMITTED_CONDITION; + Condition { + type_: type_.to_string(), + reason: reason.to_string(), + message: match reason { + NOT_COMMITTED_REASON_COMPUTING => "Computation is ongoing. Check jobs for progress.", + NOT_COMMITTED_REASON_NO_DIGEST => { + "Image did not specify a digest. \ + Only images with a digest are supported to avoid ambiguity." + } + NOT_COMMITTED_REASON_PENDING => "Pod is pending, check pods for details", + NOT_COMMITTED_REASON_FAILED => "Computation failed, check operator log for details", + _ => "", + } + .to_string(), + last_transition_time: transition_time(existing_status, type_, &status), + status, + observed_generation: generation, + } +} + pub fn controller_error_policy(_obj: Arc, error: &E, _ctx: Arc) -> Action { log::error!("{error}"); Action::requeue(Duration::from_secs(60)) diff --git a/operator/src/main.rs b/operator/src/main.rs index d6fc9944..4c4d5caa 100644 --- a/operator/src/main.rs +++ b/operator/src/main.rs @@ -17,9 +17,9 @@ use kube::runtime::watcher; use kube::{Api, Client}; use log::{error, info, warn}; -use operator::{generate_owner_reference, upsert_condition}; +use operator::{generate_owner_reference, update_status, upsert_condition}; use trusted_cluster_operator_lib::{TrustedExecutionCluster, TrustedExecutionClusterStatus}; -use trusted_cluster_operator_lib::{conditions::*, images::*, update_status}; +use trusted_cluster_operator_lib::{conditions::*, images::*}; mod attestation_key_register; mod conditions; @@ -96,8 +96,9 @@ async fn reconcile( let uninstall_condition = installed_condition(uninstalling_reason, generation, existing_status); let changed = upsert_condition(&mut conditions, uninstall_condition); + let status = TrustedExecutionClusterStatus { conditions }; if changed { - update_status!(clusters, name, TrustedExecutionClusterStatus { conditions })?; + update_status(&clusters, name, status).await?; } return Ok(Action::await_change()); } @@ -115,8 +116,9 @@ async fn reconcile( let non_unique_condition = installed_condition(NOT_INSTALLED_REASON_NON_UNIQUE, generation, existing_status); let changed = upsert_condition(&mut conditions, non_unique_condition); + let status = TrustedExecutionClusterStatus { conditions }; if changed { - update_status!(clusters, name, TrustedExecutionClusterStatus { conditions })?; + update_status(&clusters, name, status).await?; } return Ok(Action::requeue(Duration::from_secs(60))); } @@ -129,7 +131,7 @@ async fn reconcile( let status = TrustedExecutionClusterStatus { conditions: conditions.clone(), }; - update_status!(clusters, name, status)?; + update_status(&clusters, name, status).await?; } install_trustee_configuration(kube_client.clone(), &cluster).await?; @@ -141,7 +143,7 @@ async fn reconcile( let changed = upsert_condition(&mut conditions, installed_condition); if changed { let status = TrustedExecutionClusterStatus { conditions }; - update_status!(clusters, name, status)?; + update_status(&clusters, name, status).await?; } Ok(Action::await_change()) } diff --git a/operator/src/reference_values.rs b/operator/src/reference_values.rs index f202c533..23c7407d 100644 --- a/operator/src/reference_values.rs +++ b/operator/src/reference_values.rs @@ -32,8 +32,8 @@ use std::{collections::BTreeMap, sync::Arc, time::Duration}; use crate::COMPONENT_VERSION; use crate::trustee::{self, get_image_pcrs}; -use operator::{ControllerError, upsert_condition}; -use operator::{controller_error_policy, controller_info, create_or_info_if_exists}; +use operator::{ControllerError, committed_condition, upsert_condition}; +use operator::{controller_error_policy, controller_info, create_or_info_if_exists, update_status}; use trusted_cluster_operator_lib::{conditions::*, reference_values::*, *}; const JOB_LABEL_KEY: &str = "kind"; @@ -125,6 +125,19 @@ async fn job_reconcile(job: Arc, client: Arc) -> Result = Api::default_namespaced(kube_client.clone()); + let try_image = approved_images.get(resource_name).await; + let image = try_image.map_err(Into::::into)?; + let committed = committed_condition(INSTALLED_REASON, image.metadata.generation, &None); + let conditions = Some(vec![committed]); + let image_status = ApprovedImageStatus { conditions }; + update_status(&approved_images, resource_name, image_status).await?; + let jobs: Api = Api::default_namespaced(kube_client.clone()); // Foreground deletion: Delete the pod too let delete = jobs.delete(name, &DeleteParams::foreground()).await; @@ -159,6 +172,7 @@ fn get_job_name(boot_image: &str) -> Result { Ok(trimmed) } +// If jobs running in a suboptimal order becomes a problem, consider work queues async fn compute_fresh_pcrs(client: Client, image: &ApprovedImage) -> anyhow::Result<()> { let job_name = get_job_name(&image.spec.image)?; let env = "RELATED_IMAGE_COMPUTE_PCRS"; @@ -305,7 +319,8 @@ async fn image_add_reconcile( let changed = upsert_condition(&mut conditions, committed); if changed { let images: Api = Api::default_namespaced(client); - update_status!(images, &name, ApprovedImageStatus { conditions }) + update_status(&images, name, ApprovedImageStatus { conditions }) + .await .map_err(|e| finalizer::Error::::ApplyFailed(e.into()))?; } Ok(action) @@ -355,6 +370,19 @@ async fn is_pending(client: &Client, resource_name: &str) -> Result { .is_some_and(|phase| phase == "Pending")) } +async fn update_image_pcrs( + api: &Api, + map: &mut ConfigMap, + pcrs: &ImagePcrs, +) -> Result<()> { + let image_pcrs_json = serde_json::to_string(pcrs)?; + let entry = (PCR_CONFIG_FILE.to_string(), image_pcrs_json); + map.data = Some(BTreeMap::from([entry])); + api.replace(PCR_CONFIG_MAP, &Default::default(), map) + .await?; + Ok(()) +} + pub async fn handle_new_image(client: Client, image: &ApprovedImage) -> Result<&'static str> { let resource_name = image.metadata.name.as_ref().unwrap(); let boot_image = image.spec.image.as_ref(); @@ -404,7 +432,7 @@ pub async fn handle_new_image(client: Client, image: &ApprovedImage) -> Result<& reference: boot_image.to_string(), }; image_pcrs.0.insert(resource_name.to_string(), image_pcr); - update_image_pcrs!(config_maps, image_pcrs_map, image_pcrs); + update_image_pcrs(&config_maps, &mut image_pcrs_map, &image_pcrs).await?; trustee::update_reference_values(client) .await .map(|_| COMMITTED_REASON) @@ -417,7 +445,7 @@ pub async fn disallow_image(client: Client, resource_name: &str) -> Result<()> { if image_pcrs.0.remove(resource_name).is_none() { info!("Image {resource_name} was to be disallowed, but already was not allowed"); } - update_image_pcrs!(config_maps, image_pcrs_map, image_pcrs); + update_image_pcrs(&config_maps, &mut image_pcrs_map, &image_pcrs).await?; trustee::update_reference_values(client).await } @@ -427,7 +455,7 @@ mod tests { use crate::test_utils::*; use http::{Method, Request, StatusCode}; use k8s_openapi::api::batch::v1::JobStatus; - use k8s_openapi::apimachinery::pkg::apis::meta::v1::Time; + use k8s_openapi::apimachinery::pkg::apis::meta::v1::{OwnerReference, Time}; use kube::api::ObjectList; use kube::client::Body; use trusted_cluster_operator_test_utils::mock_client::*; @@ -472,6 +500,10 @@ mod tests { Job { metadata: ObjectMeta { name: Some("test".to_string()), + owner_references: Some(vec![OwnerReference { + name: "test".to_string(), + ..Default::default() + }]), ..Default::default() }, status: Some(JobStatus { @@ -485,18 +517,20 @@ mod tests { #[tokio::test] async fn test_job_reconcile_success() { let clos = async |req: Request<_>, ctr| match (ctr, req.method()) { - (0, &Method::DELETE) => Ok(serde_json::to_string(&Job::default()).unwrap()), - (1, &Method::GET) => { + (0, &Method::GET) => Ok(serde_json::to_string(&dummy_image()).unwrap()), + (1, &Method::PATCH) => Ok(serde_json::to_string(&dummy_image()).unwrap()), + (2, &Method::DELETE) => Ok(serde_json::to_string(&Job::default()).unwrap()), + (3, &Method::GET) => { assert!(req.uri().path().contains(PCR_CONFIG_MAP)); Ok(serde_json::to_string(&dummy_pcrs_map()).unwrap()) } - (2, &Method::GET) | (3, &Method::PUT) => { + (4, &Method::GET) | (5, &Method::PUT) => { assert!(req.uri().path().contains(trustee::TRUSTEE_DATA_MAP)); Ok(serde_json::to_string(&dummy_trustee_map()).unwrap()) } _ => panic!("unexpected API interaction: {req:?}, counter {ctr}"), }; - count_check!(4, clos, |client| { + count_check!(6, clos, |client| { let job = Arc::new(dummy_job()); let result = job_reconcile(job, Arc::new(client)).await.unwrap(); assert_eq!(result, Action::await_change()); @@ -617,4 +651,34 @@ mod tests { assert!(image_remove_reconcile(client, image, cluster).await.is_ok()); }); } + + #[tokio::test] + async fn test_update_image_pcrs_success() { + let clos = async |req: Request<_>, _| match req.method() { + &Method::PUT => Ok(serde_json::to_string(&dummy_pcrs_map()).unwrap()), + _ => panic!("unexpected API interaction: {req:?}"), + }; + count_check!(1, clos, |client| { + let config_maps: Api = Api::default_namespaced(client); + let mut map = dummy_pcrs_map(); + let pcrs = dummy_pcrs(); + let result = update_image_pcrs(&config_maps, &mut map, &pcrs).await; + assert!(result.is_ok()); + }); + } + + #[tokio::test] + async fn test_update_image_pcrs_error() { + let clos = async |req: Request<_>, _| match req.method() { + &Method::PUT => Err(StatusCode::INTERNAL_SERVER_ERROR), + _ => panic!("unexpected API interaction: {req:?}"), + }; + count_check!(1, clos, |client| { + let config_maps: Api = Api::default_namespaced(client); + let mut map = dummy_pcrs_map(); + let pcrs = dummy_pcrs(); + let result = update_image_pcrs(&config_maps, &mut map, &pcrs).await; + assert!(result.is_err()); + }); + } }