diff --git a/Cargo.lock b/Cargo.lock index 13c8002c..4e0701fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -258,10 +258,12 @@ dependencies = [ name = "confidence-cloudflare-resolver" version = "0.9.0" dependencies = [ + "arc-swap", "base64 0.22.1", "bytes", "confidence_resolver", "getrandom 0.3.3", + "js-sys", "once_cell", "prost 0.13.5", "serde", diff --git a/confidence-cloudflare-resolver/Cargo.toml b/confidence-cloudflare-resolver/Cargo.toml index 7a55790a..67a7915d 100644 --- a/confidence-cloudflare-resolver/Cargo.toml +++ b/confidence-cloudflare-resolver/Cargo.toml @@ -28,5 +28,7 @@ worker = { version= "0.6.1", features=['queue'] } base64 = "0.22.1" once_cell = "1.19" prost = "0.13" +arc-swap = "1" +js-sys = "0.3" serde = { version = "1.0.219" } serde_json = "1.0.85" \ No newline at end of file diff --git a/confidence-cloudflare-resolver/deployer/script.sh b/confidence-cloudflare-resolver/deployer/script.sh index 42d18ea8..e5d9aaa0 100755 --- a/confidence-cloudflare-resolver/deployer/script.sh +++ b/confidence-cloudflare-resolver/deployer/script.sh @@ -366,6 +366,55 @@ else echo "⚠️ Could not check queue status (HTTP $QUEUE_STATUS)" fi +# Create KV namespace for /metrics endpoint if it doesn't exist +if [ -n "$WORKER_NAME_PREFIX" ]; then + KV_NAMESPACE_TITLE="${WORKER_NAME_PREFIX}-resolver-metrics" +else + KV_NAMESPACE_TITLE="resolver-metrics" +fi + +echo "🔍 Checking if KV namespace '$KV_NAMESPACE_TITLE' exists..." +KV_LIST=$(curl -sS -w "%{http_code}" \ + -H "Authorization: Bearer ${CLOUDFLARE_API_TOKEN}" \ + "https://api.cloudflare.com/client/v4/accounts/${CLOUDFLARE_ACCOUNT_ID}/storage/kv/namespaces?per_page=100") +KV_LIST_STATUS="${KV_LIST: -3}" +KV_LIST_BODY="${KV_LIST%???}" + +KV_NAMESPACE_ID="" +if [ "$KV_LIST_STATUS" = "200" ]; then + KV_NAMESPACE_ID=$(printf "%s" "$KV_LIST_BODY" | jq -r ".result[] | select(.title == \"${KV_NAMESPACE_TITLE}\") | .id" 2>/dev/null || true) +fi + +if [ -z "$KV_NAMESPACE_ID" ]; then + echo "📦 KV namespace '$KV_NAMESPACE_TITLE' not found, creating..." + KV_CREATE_RESP=$(curl -sS -w "%{http_code}" -X POST \ + -H "Authorization: Bearer ${CLOUDFLARE_API_TOKEN}" \ + -H "Content-Type: application/json" \ + -d "{\"title\": \"${KV_NAMESPACE_TITLE}\"}" \ + "https://api.cloudflare.com/client/v4/accounts/${CLOUDFLARE_ACCOUNT_ID}/storage/kv/namespaces") + KV_CREATE_STATUS="${KV_CREATE_RESP: -3}" + KV_CREATE_BODY="${KV_CREATE_RESP%???}" + if [ "$KV_CREATE_STATUS" = "200" ] || [ "$KV_CREATE_STATUS" = "201" ]; then + KV_NAMESPACE_ID=$(printf "%s" "$KV_CREATE_BODY" | jq -r '.result.id') + echo "✅ KV namespace '$KV_NAMESPACE_TITLE' created (id: $KV_NAMESPACE_ID)" + else + echo "⚠️ Failed to create KV namespace (HTTP $KV_CREATE_STATUS), /metrics will be unavailable" + fi +else + echo "✅ KV namespace '$KV_NAMESPACE_TITLE' already exists (id: $KV_NAMESPACE_ID)" +fi + +# Append KV binding to wrangler.toml if namespace was created +if [ -n "$KV_NAMESPACE_ID" ]; then + cat >> wrangler.toml <> = LazyLock::new(ResolveLogger::new); static ASSIGN_LOGGER: LazyLock = LazyLock::new(AssignLogger::new); +static TELEMETRY: LazyLock = LazyLock::new(Telemetry::new); +static LAST_FLUSHED: LazyLock> = + LazyLock::new(|| ArcSwap::from_pointee(TelemetrySnapshot::default())); use confidence_resolver::Client; use once_cell::sync::Lazy; +use std::cell::RefCell; + +/// High-resolution timestamp in milliseconds via `performance.now()`. +fn performance_now() -> f64 { + js_sys::Reflect::get(&js_sys::global(), &"performance".into()) + .ok() + .and_then(|p| js_sys::Reflect::get(&p, &"now".into()).ok()) + .and_then(|f| js_sys::Function::from(f).call0(&js_sys::global()).ok()) + .and_then(|v| v.as_f64()) + .unwrap_or_else(js_sys::Date::now) +} + +/// Per-request resolve metrics captured in the hot path, recorded in wait_until. +struct ResolveMetrics { + elapsed_us: u32, + reasons: Vec, +} + +thread_local! { + static PENDING_METRICS: RefCell> = const { RefCell::new(Vec::new()) }; +} /// SetResolverStateRequest message from the CDN. /// This matches the protobuf message format returned by the CDN. @@ -142,6 +168,19 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { let router = Router::new(); let response = router + .get_async("/metrics", |_req, ctx| { + let allowed_origin = allowed_origin_env.clone(); + async move { + let text = match ctx.env.kv("CONFIDENCE_METRICS_KV") { + Ok(kv) => kv.get("prometheus").text().await.unwrap_or(None), + Err(_) => None, + }; + let body = text.unwrap_or_default(); + let headers = Headers::new(); + headers.set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")?; + Response::ok(body)?.with_headers(headers).with_cors_headers(&allowed_origin) + } + }) // GET endpoint to expose the current deployment state etag and resolver version .get_async("/v1/state:etag", |_req, _ctx| { let allowed_origin = allowed_origin_env.clone(); @@ -181,6 +220,7 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { .evaluation_context .clone() .unwrap_or_default(); + let start = performance_now(); match state.get_resolver::( &resolver_request.client_secret, evaluation_context, @@ -193,23 +233,56 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { ); match resolver.resolve_flags(process_request) { Ok(process_response) => { + let elapsed_us = ((performance_now() - start) * 1000.0) as u32; match process_response.into_resolved() { Some((response, _writes)) => { + let reasons: Vec = response + .resolved_flags + .iter() + .map(|f| f.reason()) + .collect(); + PENDING_METRICS.with(|m| { + m.borrow_mut().push(ResolveMetrics { elapsed_us, reasons }); + }); Response::from_json(&response)? .with_cors_headers(&allowed_origin) } - None => Response::error( - "Unexpected suspended response", - 500, - )? - .with_cors_headers(&allowed_origin), + None => { + PENDING_METRICS.with(|m| { + m.borrow_mut().push(ResolveMetrics { + elapsed_us, + reasons: vec![ResolveReason::Error], + }); + }); + Response::error( + "Unexpected suspended response", + 500, + )? + .with_cors_headers(&allowed_origin) + } } } - Err(msg) => Response::error(msg, 500)? - .with_cors_headers(&allowed_origin), + Err(msg) => { + let elapsed_us = ((performance_now() - start) * 1000.0) as u32; + PENDING_METRICS.with(|m| { + m.borrow_mut().push(ResolveMetrics { + elapsed_us, + reasons: vec![ResolveReason::Error], + }); + }); + Response::error(msg, 500)? + .with_cors_headers(&allowed_origin) + } } } Err(msg) => { + let elapsed_us = ((performance_now() - start) * 1000.0) as u32; + PENDING_METRICS.with(|m| { + m.borrow_mut().push(ResolveMetrics { + elapsed_us, + reasons: vec![ResolveReason::Error], + }); + }); Response::error(msg, 500)?.with_cors_headers(&allowed_origin) } } @@ -250,8 +323,18 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { .run(req, env) .await; - // Use ctx.waitUntil to run logging after response is returned + // Use ctx.waitUntil to run logging and telemetry after response is returned ctx.wait_until(async move { + // Record pending resolve metrics into the telemetry counters + PENDING_METRICS.with(|m| { + for metrics in m.borrow_mut().drain(..) { + TELEMETRY.record_latency_us(metrics.elapsed_us); + for reason in metrics.reasons { + TELEMETRY.mark_resolve(reason); + } + } + }); + let aggregated: confidence_resolver::proto::confidence::flags::resolver::v1::WriteFlagLogsRequest = checkpoint(); if let Ok(converted) = serde_json::to_string(&aggregated) { @@ -284,6 +367,12 @@ pub async fn consume_flag_logs_queue( client_resolve_info: v.client_resolve_info, }) .collect(); + + // Accumulate telemetry deltas into KV-backed cumulative snapshot for /metrics + if let Ok(kv) = env.kv("CONFIDENCE_METRICS_KV") { + let _ = update_prometheus_kv(&kv, &logs).await; + } + let req = flag_logger::aggregate_batch(logs); send_flags_logs(CONFIDENCE_CLIENT_SECRET.get().unwrap().as_str(), req).await?; } @@ -293,10 +382,42 @@ pub async fn consume_flag_logs_queue( fn checkpoint() -> WriteFlagLogsRequest { let mut req = RESOLVE_LOGGER.checkpoint(); + req.telemetry_data = Some(TELEMETRY.delta_snapshot(&LAST_FLUSHED)); ASSIGN_LOGGER.checkpoint_fill(&mut req); req } +/// Accumulate telemetry deltas from all isolates into a cumulative +/// `TelemetrySnapshot` stored in KV, then write its Prometheus text +/// representation for the /metrics endpoint. +/// +/// Note: concurrent queue consumer invocations can race on KV read-modify-write. +/// Acceptable for metrics — at worst one batch's deltas are lost, not cumulative state. +async fn update_prometheus_kv(kv: &kv::KvStore, logs: &[WriteFlagLogsRequest]) { + let mut cumulative = match kv.get("snapshot").text().await { + Ok(Some(text)) => serde_json::from_str::(&text).unwrap_or_default(), + _ => TelemetrySnapshot::default(), + }; + + for log in logs { + if let Some(td) = &log.telemetry_data { + cumulative.accumulate_delta(td); + } + } + + let prom_text = cumulative.to_prometheus( + "cf-resolver", + &confidence_resolver::telemetry::PrometheusConfig::default(), + ); + + if let Ok(builder) = kv.put("snapshot", serde_json::to_string(&cumulative).unwrap_or_default()) { + let _ = builder.execute().await; + } + if let Ok(builder) = kv.put("prometheus", prom_text) { + let _ = builder.execute().await; + } +} + async fn send_flags_logs(client_secret: &str, message: WriteFlagLogsRequest) -> Result { let resolve_url = "https://resolver.confidence.dev/v1/clientFlagLogs:write"; let mut init = RequestInit::new(); diff --git a/confidence-cloudflare-resolver/wrangler.toml b/confidence-cloudflare-resolver/wrangler.toml index 47660cce..76bbdcaa 100644 --- a/confidence-cloudflare-resolver/wrangler.toml +++ b/confidence-cloudflare-resolver/wrangler.toml @@ -14,5 +14,8 @@ max_batch_timeout = 10 # seconds queue = "flag-logs-queue" binding = "flag_logs_queue" +# KV namespace for /metrics endpoint is created and injected by the deployer. +# See deployer/script.sh for auto-creation of the CONFIDENCE_METRICS_KV binding. + [vars] CONFIDENCE_CLIENT_SECRET = "SECRET" diff --git a/confidence-resolver/src/flag_logger.rs b/confidence-resolver/src/flag_logger.rs index 307653f9..04acfd5a 100644 --- a/confidence-resolver/src/flag_logger.rs +++ b/confidence-resolver/src/flag_logger.rs @@ -4,6 +4,7 @@ use crate::proto::confidence::flags::admin::v1::flag_resolve_info::{ }; use crate::proto::confidence::flags::admin::v1::{ClientResolveInfo, FlagResolveInfo}; use crate::proto::confidence::flags::resolver::v1::events::FlagAssigned; +use crate::proto::confidence::flags::resolver::v1::telemetry_data::ResolveRate; use crate::proto::confidence::flags::resolver::v1::{TelemetryData, WriteFlagLogsRequest}; use std::collections::{HashMap, HashSet}; @@ -14,12 +15,14 @@ pub fn aggregate_batch(message_batch: Vec) -> WriteFlagLog let mut flag_resolve_map: HashMap = HashMap::new(); let mut flag_assigned: Vec = vec![]; let mut first_sdk: Option = None; + let mut agg_telemetry: Option = None; for flag_logs_message in message_batch { if let Some(td) = &flag_logs_message.telemetry_data { if first_sdk.is_none() && td.sdk.is_some() { first_sdk = td.sdk.clone(); } + agg_telemetry = Some(merge_telemetry(agg_telemetry.take(), td)); } for c in &flag_logs_message.client_resolve_info { @@ -98,10 +101,20 @@ pub fn aggregate_batch(message_batch: Vec) -> WriteFlagLog }) } - let telemetry_data = first_sdk.map(|sdk| TelemetryData { - sdk: Some(sdk), - ..Default::default() - }); + // Attach SDK info to the aggregated telemetry + let telemetry_data = match (agg_telemetry, first_sdk) { + (Some(mut td), sdk) => { + if td.sdk.is_none() { + td.sdk = sdk; + } + Some(td) + } + (None, Some(sdk)) => Some(TelemetryData { + sdk: Some(sdk), + ..Default::default() + }), + (None, None) => None, + }; WriteFlagLogsRequest { telemetry_data, @@ -111,6 +124,53 @@ pub fn aggregate_batch(message_batch: Vec) -> WriteFlagLog } } +/// Merge a telemetry delta into an accumulator. +/// Both are deltas, so counters are summed and gauges take the latest non-zero value. +fn merge_telemetry(acc: Option, delta: &TelemetryData) -> TelemetryData { + let mut acc = acc.unwrap_or_default(); + + // Merge resolve latency + match (&mut acc.resolve_latency, &delta.resolve_latency) { + (Some(a), Some(d)) => { + a.sum = a.sum.wrapping_add(d.sum); + a.count = a.count.wrapping_add(d.count); + a.buckets.extend(d.buckets.iter().cloned()); + if a.ln_ratio == 0.0 { + a.ln_ratio = d.ln_ratio; + } + } + (None, Some(d)) => { + acc.resolve_latency = Some(d.clone()); + } + _ => {} + } + + // Merge resolve rates by reason + for dr in &delta.resolve_rate { + if let Some(ar) = acc.resolve_rate.iter_mut().find(|r| r.reason == dr.reason) { + ar.count = ar.count.wrapping_add(dr.count); + } else { + acc.resolve_rate.push(ResolveRate { + count: dr.count, + reason: dr.reason, + }); + } + } + + // Gauges: take latest non-zero + if let Some(sa) = &delta.state_age { + acc.state_age = Some(sa.clone()); + } + if delta.memory_bytes > 0 { + acc.memory_bytes = delta.memory_bytes; + } + if !delta.resolver_version.is_empty() { + acc.resolver_version = delta.resolver_version.clone(); + } + + acc +} + struct SchemaItem { pub client: String, pub schemas: HashSet, diff --git a/confidence-resolver/src/telemetry.rs b/confidence-resolver/src/telemetry.rs index 4d9e658c..b74e9e85 100644 --- a/confidence-resolver/src/telemetry.rs +++ b/confidence-resolver/src/telemetry.rs @@ -106,6 +106,7 @@ impl Histogram { /// Used for delta computation between flushes and as the future intermediate /// representation for Prometheus text format serialization. #[derive(Clone, Default)] +#[cfg_attr(feature = "json", derive(serde::Serialize, serde::Deserialize))] pub struct TelemetrySnapshot { pub latency: HistogramSnapshot, pub resolve_rates: Vec, @@ -113,6 +114,7 @@ pub struct TelemetrySnapshot { } #[derive(Clone, Default)] +#[cfg_attr(feature = "json", derive(serde::Serialize, serde::Deserialize))] pub struct HistogramSnapshot { pub sum: u64, pub count: u64, @@ -142,6 +144,52 @@ impl Default for PrometheusConfig { } impl TelemetrySnapshot { + /// Accumulate a `TelemetryData` delta into this cumulative snapshot. + /// + /// Expands compressed `BucketSpan`s back into the flat bucket array and + /// adds all counters. Gauge fields (memory_bytes) are replaced with the + /// latest value. + #[allow(clippy::indexing_slicing, clippy::arithmetic_side_effects)] + pub fn accumulate_delta(&mut self, td: &pb::TelemetryData) { + if let Some(latency) = &td.resolve_latency { + self.latency.sum = self.latency.sum.wrapping_add(latency.sum as u64); + self.latency.count = self.latency.count.wrapping_add(latency.count as u64); + + // Expand BucketSpans into flat bucket array + for span in &latency.buckets { + let base = match usize::try_from(span.offset) { + Ok(b) if b < BUCKET_COUNT => b, + _ => continue, // skip malformed span + }; + for (i, &count) in span.counts.iter().enumerate() { + let idx = base.saturating_add(i); + if idx >= BUCKET_COUNT { + break; + } + if idx >= self.latency.buckets.len() { + self.latency.buckets.resize(idx.saturating_add(1), 0); + } + // Safety: idx < BUCKET_COUNT and we just resized to at least idx+1 + self.latency.buckets[idx] = + self.latency.buckets[idx].wrapping_add(count as u64); + } + } + } + + for rate in &td.resolve_rate { + let idx = rate.reason as usize; + if idx >= self.resolve_rates.len() { + self.resolve_rates.resize(idx.saturating_add(1), 0); + } + // Safety: we just resized to at least idx+1 + self.resolve_rates[idx] = self.resolve_rates[idx].wrapping_add(rate.count as u64); + } + + if td.memory_bytes > 0 { + self.memory_bytes = td.memory_bytes; + } + } + /// Format the snapshot as Prometheus exposition text. /// /// All values are cumulative counters, matching what Prometheus expects. @@ -939,4 +987,97 @@ mod tests { ); assert_eq!(default_out, zero_out); } + + #[test] + fn accumulate_delta_basic() { + let mut snap = TelemetrySnapshot::default(); + let td = pb::TelemetryData { + resolve_latency: Some(pb::ResolveLatency { + sum: 500, + count: 2, + buckets: vec![pb::BucketSpan { + offset: 5, + counts: vec![1, 1], + }], + ln_ratio: LN_RATIO, + }), + resolve_rate: vec![pb::ResolveRate { + reason: ResolveReason::Match as i32, + count: 3, + }], + memory_bytes: 4096, + ..Default::default() + }; + + snap.accumulate_delta(&td); + assert_eq!(snap.latency.sum, 500); + assert_eq!(snap.latency.count, 2); + assert_eq!(snap.latency.buckets[5], 1); + assert_eq!(snap.latency.buckets[6], 1); + assert_eq!(snap.resolve_rates[ResolveReason::Match as usize], 3); + assert_eq!(snap.memory_bytes, 4096); + + // Second delta accumulates counters, replaces gauge + snap.accumulate_delta(&td); + assert_eq!(snap.latency.sum, 1000); + assert_eq!(snap.latency.count, 4); + assert_eq!(snap.latency.buckets[5], 2); + assert_eq!(snap.resolve_rates[ResolveReason::Match as usize], 6); + assert_eq!(snap.memory_bytes, 4096); + } + + #[test] + fn accumulate_delta_negative_offset_skipped() { + let mut snap = TelemetrySnapshot::default(); + let td = pb::TelemetryData { + resolve_latency: Some(pb::ResolveLatency { + sum: 100, + count: 1, + buckets: vec![ + pb::BucketSpan { + offset: -1, + counts: vec![99], + }, + pb::BucketSpan { + offset: 3, + counts: vec![1], + }, + ], + ln_ratio: LN_RATIO, + }), + ..Default::default() + }; + + snap.accumulate_delta(&td); + // Negative offset span skipped, valid span applied + assert_eq!(snap.latency.count, 1); + assert_eq!(snap.latency.buckets.get(3).copied().unwrap_or(0), 1); + // Bucket from negative offset should not exist + let total: u64 = snap.latency.buckets.iter().sum(); + assert_eq!(total, 1); + } + + #[test] + fn accumulate_delta_oversized_offset_skipped() { + let mut snap = TelemetrySnapshot::default(); + let td = pb::TelemetryData { + resolve_latency: Some(pb::ResolveLatency { + sum: 100, + count: 1, + buckets: vec![pb::BucketSpan { + offset: BUCKET_COUNT as i32 + 10, + counts: vec![1], + }], + ln_ratio: LN_RATIO, + }), + ..Default::default() + }; + + snap.accumulate_delta(&td); + // Oversized offset span skipped, sum/count still accumulated + assert_eq!(snap.latency.count, 1); + assert_eq!(snap.latency.sum, 100); + let total: u64 = snap.latency.buckets.iter().sum(); + assert_eq!(total, 0); + } } diff --git a/openfeature-provider/go/confidence/internal/local_resolver/assets/confidence_resolver.wasm b/openfeature-provider/go/confidence/internal/local_resolver/assets/confidence_resolver.wasm index 20fa6527..02ed19fd 100755 Binary files a/openfeature-provider/go/confidence/internal/local_resolver/assets/confidence_resolver.wasm and b/openfeature-provider/go/confidence/internal/local_resolver/assets/confidence_resolver.wasm differ