From bbfc7035e9b3b5b1fcab7a833cb39b782824e30c Mon Sep 17 00:00:00 2001 From: vahidlazio Date: Thu, 7 May 2026 11:16:39 +0200 Subject: [PATCH 1/9] feat(cloudflare): add telemetry collection and /metrics endpoint Add Prometheus-compatible telemetry to the Cloudflare resolver, matching the same metric names as the WASM providers so they can share dashboards. - Collect per-flag resolve latency and reason in the fetch handler, deferred to ctx.wait_until to keep the hot path clean - Include telemetry deltas in WriteFlagLogsRequest via checkpoint() - Queue consumer accumulates cross-isolate deltas into a cumulative TelemetrySnapshot persisted in KV - Serve /metrics endpoint reading Prometheus text from KV - Add serde derives to TelemetrySnapshot and accumulate_delta() method for reconstructing flat histograms from compressed BucketSpans - Deployer auto-creates KV namespace (same pattern as queue creation) Co-Authored-By: Claude Opus 4.6 --- Cargo.lock | 2 + confidence-cloudflare-resolver/Cargo.toml | 2 + .../deployer/script.sh | 49 +++++++ confidence-cloudflare-resolver/src/lib.rs | 127 ++++++++++++++++-- confidence-cloudflare-resolver/wrangler.toml | 3 + confidence-resolver/src/telemetry.rs | 37 +++++ 6 files changed, 211 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 13c8002c..4e0701fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -258,10 +258,12 @@ dependencies = [ name = "confidence-cloudflare-resolver" version = "0.9.0" dependencies = [ + "arc-swap", "base64 0.22.1", "bytes", "confidence_resolver", "getrandom 0.3.3", + "js-sys", "once_cell", "prost 0.13.5", "serde", diff --git a/confidence-cloudflare-resolver/Cargo.toml b/confidence-cloudflare-resolver/Cargo.toml index 7a55790a..67a7915d 100644 --- a/confidence-cloudflare-resolver/Cargo.toml +++ b/confidence-cloudflare-resolver/Cargo.toml @@ -28,5 +28,7 @@ worker = { version= "0.6.1", features=['queue'] } base64 = "0.22.1" once_cell = "1.19" prost = "0.13" +arc-swap = "1" +js-sys = "0.3" serde = { version = "1.0.219" } serde_json = "1.0.85" \ No newline at end of file diff --git a/confidence-cloudflare-resolver/deployer/script.sh b/confidence-cloudflare-resolver/deployer/script.sh index 42d18ea8..5e75caee 100755 --- a/confidence-cloudflare-resolver/deployer/script.sh +++ b/confidence-cloudflare-resolver/deployer/script.sh @@ -366,6 +366,55 @@ else echo "⚠️ Could not check queue status (HTTP $QUEUE_STATUS)" fi +# Create KV namespace for /metrics endpoint if it doesn't exist +if [ -n "$WORKER_NAME_PREFIX" ]; then + KV_NAMESPACE_TITLE="${WORKER_NAME_PREFIX}-resolver-metrics" +else + KV_NAMESPACE_TITLE="resolver-metrics" +fi + +echo "🔍 Checking if KV namespace '$KV_NAMESPACE_TITLE' exists..." +KV_LIST=$(curl -sS -w "%{http_code}" \ + -H "Authorization: Bearer ${CLOUDFLARE_API_TOKEN}" \ + "https://api.cloudflare.com/client/v4/accounts/${CLOUDFLARE_ACCOUNT_ID}/storage/kv/namespaces?per_page=100") +KV_LIST_STATUS="${KV_LIST: -3}" +KV_LIST_BODY="${KV_LIST%???}" + +KV_NAMESPACE_ID="" +if [ "$KV_LIST_STATUS" = "200" ]; then + KV_NAMESPACE_ID=$(printf "%s" "$KV_LIST_BODY" | jq -r ".result[] | select(.title == \"${KV_NAMESPACE_TITLE}\") | .id" 2>/dev/null || true) +fi + +if [ -z "$KV_NAMESPACE_ID" ]; then + echo "📦 KV namespace '$KV_NAMESPACE_TITLE' not found, creating..." + KV_CREATE_RESP=$(curl -sS -w "%{http_code}" -X POST \ + -H "Authorization: Bearer ${CLOUDFLARE_API_TOKEN}" \ + -H "Content-Type: application/json" \ + -d "{\"title\": \"${KV_NAMESPACE_TITLE}\"}" \ + "https://api.cloudflare.com/client/v4/accounts/${CLOUDFLARE_ACCOUNT_ID}/storage/kv/namespaces") + KV_CREATE_STATUS="${KV_CREATE_RESP: -3}" + KV_CREATE_BODY="${KV_CREATE_RESP%???}" + if [ "$KV_CREATE_STATUS" = "200" ] || [ "$KV_CREATE_STATUS" = "201" ]; then + KV_NAMESPACE_ID=$(printf "%s" "$KV_CREATE_BODY" | jq -r '.result.id') + echo "✅ KV namespace '$KV_NAMESPACE_TITLE' created (id: $KV_NAMESPACE_ID)" + else + echo "⚠️ Failed to create KV namespace (HTTP $KV_CREATE_STATUS), /metrics will be unavailable" + fi +else + echo "✅ KV namespace '$KV_NAMESPACE_TITLE' already exists (id: $KV_NAMESPACE_ID)" +fi + +# Append KV binding to wrangler.toml if namespace was created +if [ -n "$KV_NAMESPACE_ID" ]; then + cat >> wrangler.toml <> = LazyLock::new(ResolveLogger::new); static ASSIGN_LOGGER: LazyLock = LazyLock::new(AssignLogger::new); +static TELEMETRY: LazyLock = LazyLock::new(Telemetry::new); +static LAST_FLUSHED: LazyLock> = + LazyLock::new(|| ArcSwap::from_pointee(TelemetrySnapshot::default())); use confidence_resolver::Client; use once_cell::sync::Lazy; +use std::cell::RefCell; + +/// Per-request resolve metrics captured in the hot path, recorded in wait_until. +struct ResolveMetrics { + elapsed_us: u32, + reasons: Vec, +} + +thread_local! { + static PENDING_METRICS: RefCell> = const { RefCell::new(Vec::new()) }; +} /// SetResolverStateRequest message from the CDN. /// This matches the protobuf message format returned by the CDN. @@ -142,6 +158,18 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { let router = Router::new(); let response = router + .get_async("/metrics", |_req, ctx| { + async move { + let text = match ctx.env.kv("METRICS_KV") { + Ok(kv) => kv.get("prometheus").text().await.unwrap_or(None), + Err(_) => None, + }; + let body = text.unwrap_or_default(); + let headers = Headers::new(); + headers.set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")?; + Ok(Response::ok(body)?.with_headers(headers)) + } + }) // GET endpoint to expose the current deployment state etag and resolver version .get_async("/v1/state:etag", |_req, _ctx| { let allowed_origin = allowed_origin_env.clone(); @@ -181,6 +209,7 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { .evaluation_context .clone() .unwrap_or_default(); + let start = js_sys::Date::now(); match state.get_resolver::( &resolver_request.client_secret, evaluation_context, @@ -193,23 +222,56 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { ); match resolver.resolve_flags(process_request) { Ok(process_response) => { + let elapsed_us = ((js_sys::Date::now() - start) * 1000.0) as u32; match process_response.into_resolved() { Some((response, _writes)) => { + let reasons: Vec = response + .resolved_flags + .iter() + .map(|f| f.reason()) + .collect(); + PENDING_METRICS.with(|m| { + m.borrow_mut().push(ResolveMetrics { elapsed_us, reasons }); + }); Response::from_json(&response)? .with_cors_headers(&allowed_origin) } - None => Response::error( - "Unexpected suspended response", - 500, - )? - .with_cors_headers(&allowed_origin), + None => { + PENDING_METRICS.with(|m| { + m.borrow_mut().push(ResolveMetrics { + elapsed_us, + reasons: vec![ResolveReason::Error], + }); + }); + Response::error( + "Unexpected suspended response", + 500, + )? + .with_cors_headers(&allowed_origin) + } } } - Err(msg) => Response::error(msg, 500)? - .with_cors_headers(&allowed_origin), + Err(msg) => { + let elapsed_us = ((js_sys::Date::now() - start) * 1000.0) as u32; + PENDING_METRICS.with(|m| { + m.borrow_mut().push(ResolveMetrics { + elapsed_us, + reasons: vec![ResolveReason::Error], + }); + }); + Response::error(msg, 500)? + .with_cors_headers(&allowed_origin) + } } } Err(msg) => { + let elapsed_us = ((js_sys::Date::now() - start) * 1000.0) as u32; + PENDING_METRICS.with(|m| { + m.borrow_mut().push(ResolveMetrics { + elapsed_us, + reasons: vec![ResolveReason::Error], + }); + }); Response::error(msg, 500)?.with_cors_headers(&allowed_origin) } } @@ -250,8 +312,18 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { .run(req, env) .await; - // Use ctx.waitUntil to run logging after response is returned + // Use ctx.waitUntil to run logging and telemetry after response is returned ctx.wait_until(async move { + // Record pending resolve metrics into the telemetry counters + PENDING_METRICS.with(|m| { + for metrics in m.borrow_mut().drain(..) { + TELEMETRY.record_latency_us(metrics.elapsed_us); + for reason in metrics.reasons { + TELEMETRY.mark_resolve(reason); + } + } + }); + let aggregated: confidence_resolver::proto::confidence::flags::resolver::v1::WriteFlagLogsRequest = checkpoint(); if let Ok(converted) = serde_json::to_string(&aggregated) { @@ -284,6 +356,12 @@ pub async fn consume_flag_logs_queue( client_resolve_info: v.client_resolve_info, }) .collect(); + + // Accumulate telemetry deltas into KV-backed cumulative snapshot for /metrics + if let Ok(kv) = env.kv("METRICS_KV") { + let _ = update_prometheus_kv(&kv, &logs).await; + } + let req = flag_logger::aggregate_batch(logs); send_flags_logs(CONFIDENCE_CLIENT_SECRET.get().unwrap().as_str(), req).await?; } @@ -293,10 +371,41 @@ pub async fn consume_flag_logs_queue( fn checkpoint() -> WriteFlagLogsRequest { let mut req = RESOLVE_LOGGER.checkpoint(); + req.telemetry_data = Some(TELEMETRY.delta_snapshot(&LAST_FLUSHED)); ASSIGN_LOGGER.checkpoint_fill(&mut req); req } +/// Accumulate telemetry deltas from all isolates into a cumulative +/// `TelemetrySnapshot` stored in KV, then write its Prometheus text +/// representation for the /metrics endpoint. +async fn update_prometheus_kv(kv: &kv::KvStore, logs: &[WriteFlagLogsRequest]) { + let mut cumulative = match kv.get("snapshot").text().await { + Ok(Some(text)) => serde_json::from_str::(&text).unwrap_or_default(), + _ => TelemetrySnapshot::default(), + }; + + for log in logs { + if let Some(td) = &log.telemetry_data { + cumulative.accumulate_delta(td); + } + } + + let prom_text = cumulative.to_prometheus( + "cf-resolver", + &confidence_resolver::telemetry::PrometheusConfig::default(), + ); + + let _ = kv + .put("snapshot", serde_json::to_string(&cumulative).unwrap_or_default()) + .map(|b| b.execute()) + .ok(); + let _ = kv + .put("prometheus", prom_text) + .map(|b| b.execute()) + .ok(); +} + async fn send_flags_logs(client_secret: &str, message: WriteFlagLogsRequest) -> Result { let resolve_url = "https://resolver.confidence.dev/v1/clientFlagLogs:write"; let mut init = RequestInit::new(); diff --git a/confidence-cloudflare-resolver/wrangler.toml b/confidence-cloudflare-resolver/wrangler.toml index 47660cce..30bd64ac 100644 --- a/confidence-cloudflare-resolver/wrangler.toml +++ b/confidence-cloudflare-resolver/wrangler.toml @@ -14,5 +14,8 @@ max_batch_timeout = 10 # seconds queue = "flag-logs-queue" binding = "flag_logs_queue" +# KV namespace for /metrics endpoint is created and injected by the deployer. +# See deployer/script.sh for auto-creation of the METRICS_KV binding. + [vars] CONFIDENCE_CLIENT_SECRET = "SECRET" diff --git a/confidence-resolver/src/telemetry.rs b/confidence-resolver/src/telemetry.rs index 4d9e658c..6db00599 100644 --- a/confidence-resolver/src/telemetry.rs +++ b/confidence-resolver/src/telemetry.rs @@ -106,6 +106,7 @@ impl Histogram { /// Used for delta computation between flushes and as the future intermediate /// representation for Prometheus text format serialization. #[derive(Clone, Default)] +#[cfg_attr(feature = "json", derive(serde::Serialize, serde::Deserialize))] pub struct TelemetrySnapshot { pub latency: HistogramSnapshot, pub resolve_rates: Vec, @@ -113,6 +114,7 @@ pub struct TelemetrySnapshot { } #[derive(Clone, Default)] +#[cfg_attr(feature = "json", derive(serde::Serialize, serde::Deserialize))] pub struct HistogramSnapshot { pub sum: u64, pub count: u64, @@ -142,6 +144,41 @@ impl Default for PrometheusConfig { } impl TelemetrySnapshot { + /// Accumulate a `TelemetryData` delta into this cumulative snapshot. + /// + /// Expands compressed `BucketSpan`s back into the flat bucket array and + /// adds all counters. Gauge fields (memory_bytes) are replaced with the + /// latest value. + pub fn accumulate_delta(&mut self, td: &pb::TelemetryData) { + if let Some(latency) = &td.resolve_latency { + self.latency.sum = self.latency.sum.wrapping_add(latency.sum as u64); + self.latency.count = self.latency.count.wrapping_add(latency.count as u64); + + // Expand BucketSpans into flat bucket array + for span in &latency.buckets { + for (i, &count) in span.counts.iter().enumerate() { + let idx = span.offset as usize + i; + if idx >= self.latency.buckets.len() { + self.latency.buckets.resize(idx + 1, 0); + } + self.latency.buckets[idx] = self.latency.buckets[idx].wrapping_add(count as u64); + } + } + } + + for rate in &td.resolve_rate { + let idx = rate.reason as usize; + if idx >= self.resolve_rates.len() { + self.resolve_rates.resize(idx + 1, 0); + } + self.resolve_rates[idx] = self.resolve_rates[idx].wrapping_add(rate.count as u64); + } + + if td.memory_bytes > 0 { + self.memory_bytes = td.memory_bytes; + } + } + /// Format the snapshot as Prometheus exposition text. /// /// All values are cumulative counters, matching what Prometheus expects. From d513ce517f9f84079c2ded549b64dac2e35b0342 Mon Sep 17 00:00:00 2001 From: vahidlazio Date: Thu, 7 May 2026 11:19:46 +0200 Subject: [PATCH 2/9] fix: await KV puts, guard malformed BucketSpan offsets, add tests - Await KV put operations in update_prometheus_kv (were fire-and-forget) - Guard against negative/oversized BucketSpan offsets in accumulate_delta - Add race condition comment on KV read-modify-write - Add CORS headers to /metrics endpoint for consistency - Add unit tests for accumulate_delta: basic, negative offset, oversized Co-Authored-By: Claude Opus 4.6 --- confidence-cloudflare-resolver/src/lib.rs | 20 +++-- confidence-resolver/src/telemetry.rs | 102 +++++++++++++++++++++- 2 files changed, 112 insertions(+), 10 deletions(-) diff --git a/confidence-cloudflare-resolver/src/lib.rs b/confidence-cloudflare-resolver/src/lib.rs index 3abf27f7..3da5fa70 100644 --- a/confidence-cloudflare-resolver/src/lib.rs +++ b/confidence-cloudflare-resolver/src/lib.rs @@ -159,6 +159,7 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { let response = router .get_async("/metrics", |_req, ctx| { + let allowed_origin = allowed_origin_env.clone(); async move { let text = match ctx.env.kv("METRICS_KV") { Ok(kv) => kv.get("prometheus").text().await.unwrap_or(None), @@ -167,7 +168,7 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { let body = text.unwrap_or_default(); let headers = Headers::new(); headers.set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")?; - Ok(Response::ok(body)?.with_headers(headers)) + Response::ok(body)?.with_headers(headers).with_cors_headers(&allowed_origin) } }) // GET endpoint to expose the current deployment state etag and resolver version @@ -379,6 +380,9 @@ fn checkpoint() -> WriteFlagLogsRequest { /// Accumulate telemetry deltas from all isolates into a cumulative /// `TelemetrySnapshot` stored in KV, then write its Prometheus text /// representation for the /metrics endpoint. +/// +/// Note: concurrent queue consumer invocations can race on KV read-modify-write. +/// Acceptable for metrics — at worst one batch's deltas are lost, not cumulative state. async fn update_prometheus_kv(kv: &kv::KvStore, logs: &[WriteFlagLogsRequest]) { let mut cumulative = match kv.get("snapshot").text().await { Ok(Some(text)) => serde_json::from_str::(&text).unwrap_or_default(), @@ -396,14 +400,12 @@ async fn update_prometheus_kv(kv: &kv::KvStore, logs: &[WriteFlagLogsRequest]) { &confidence_resolver::telemetry::PrometheusConfig::default(), ); - let _ = kv - .put("snapshot", serde_json::to_string(&cumulative).unwrap_or_default()) - .map(|b| b.execute()) - .ok(); - let _ = kv - .put("prometheus", prom_text) - .map(|b| b.execute()) - .ok(); + if let Ok(builder) = kv.put("snapshot", serde_json::to_string(&cumulative).unwrap_or_default()) { + let _ = builder.execute().await; + } + if let Ok(builder) = kv.put("prometheus", prom_text) { + let _ = builder.execute().await; + } } async fn send_flags_logs(client_secret: &str, message: WriteFlagLogsRequest) -> Result { diff --git a/confidence-resolver/src/telemetry.rs b/confidence-resolver/src/telemetry.rs index 6db00599..93d8e9fe 100644 --- a/confidence-resolver/src/telemetry.rs +++ b/confidence-resolver/src/telemetry.rs @@ -156,8 +156,15 @@ impl TelemetrySnapshot { // Expand BucketSpans into flat bucket array for span in &latency.buckets { + let base = match usize::try_from(span.offset) { + Ok(b) if b < BUCKET_COUNT => b, + _ => continue, // skip malformed span + }; for (i, &count) in span.counts.iter().enumerate() { - let idx = span.offset as usize + i; + let idx = base.saturating_add(i); + if idx >= BUCKET_COUNT { + break; + } if idx >= self.latency.buckets.len() { self.latency.buckets.resize(idx + 1, 0); } @@ -976,4 +983,97 @@ mod tests { ); assert_eq!(default_out, zero_out); } + + #[test] + fn accumulate_delta_basic() { + let mut snap = TelemetrySnapshot::default(); + let td = pb::TelemetryData { + resolve_latency: Some(pb::ResolveLatency { + sum: 500, + count: 2, + buckets: vec![pb::BucketSpan { + offset: 5, + counts: vec![1, 1], + }], + ln_ratio: LN_RATIO, + }), + resolve_rate: vec![pb::ResolveRate { + reason: ResolveReason::Match as i32, + count: 3, + }], + memory_bytes: 4096, + ..Default::default() + }; + + snap.accumulate_delta(&td); + assert_eq!(snap.latency.sum, 500); + assert_eq!(snap.latency.count, 2); + assert_eq!(snap.latency.buckets[5], 1); + assert_eq!(snap.latency.buckets[6], 1); + assert_eq!(snap.resolve_rates[ResolveReason::Match as usize], 3); + assert_eq!(snap.memory_bytes, 4096); + + // Second delta accumulates counters, replaces gauge + snap.accumulate_delta(&td); + assert_eq!(snap.latency.sum, 1000); + assert_eq!(snap.latency.count, 4); + assert_eq!(snap.latency.buckets[5], 2); + assert_eq!(snap.resolve_rates[ResolveReason::Match as usize], 6); + assert_eq!(snap.memory_bytes, 4096); + } + + #[test] + fn accumulate_delta_negative_offset_skipped() { + let mut snap = TelemetrySnapshot::default(); + let td = pb::TelemetryData { + resolve_latency: Some(pb::ResolveLatency { + sum: 100, + count: 1, + buckets: vec![ + pb::BucketSpan { + offset: -1, + counts: vec![99], + }, + pb::BucketSpan { + offset: 3, + counts: vec![1], + }, + ], + ln_ratio: LN_RATIO, + }), + ..Default::default() + }; + + snap.accumulate_delta(&td); + // Negative offset span skipped, valid span applied + assert_eq!(snap.latency.count, 1); + assert_eq!(snap.latency.buckets.get(3).copied().unwrap_or(0), 1); + // Bucket from negative offset should not exist + let total: u64 = snap.latency.buckets.iter().sum(); + assert_eq!(total, 1); + } + + #[test] + fn accumulate_delta_oversized_offset_skipped() { + let mut snap = TelemetrySnapshot::default(); + let td = pb::TelemetryData { + resolve_latency: Some(pb::ResolveLatency { + sum: 100, + count: 1, + buckets: vec![pb::BucketSpan { + offset: BUCKET_COUNT as i32 + 10, + counts: vec![1], + }], + ln_ratio: LN_RATIO, + }), + ..Default::default() + }; + + snap.accumulate_delta(&td); + // Oversized offset span skipped, sum/count still accumulated + assert_eq!(snap.latency.count, 1); + assert_eq!(snap.latency.sum, 100); + let total: u64 = snap.latency.buckets.iter().sum(); + assert_eq!(total, 0); + } } From 88bfac781609431851f74c77fa524cbe49af5e24 Mon Sep 17 00:00:00 2001 From: vahidlazio Date: Thu, 7 May 2026 11:36:57 +0200 Subject: [PATCH 3/9] chore: sync WASM module for Go provider Co-Authored-By: Claude Opus 4.6 --- .../assets/confidence_resolver.wasm | Bin 483141 -> 483141 bytes 1 file changed, 0 insertions(+), 0 deletions(-) diff --git a/openfeature-provider/go/confidence/internal/local_resolver/assets/confidence_resolver.wasm b/openfeature-provider/go/confidence/internal/local_resolver/assets/confidence_resolver.wasm index 20fa6527047600d0fd88674f4d38d43134f61de8..e09bdb0d2a474e83aa4722c29ec502c78c5c502b 100755 GIT binary patch delta 33 pcmX@QPxk0O*@hOz7N!>F7M2#)Eo^fn8SC2TO0sRAE6KjX6#(1w3-$m2 delta 33 pcmX@QPxk0O*@hOz7N!>F7M2#)Eo^fn8H3vAO0sRAE6KjX6#&`~3(5ch From d62457b1544fddfbdad0727d640e1bf24234a84b Mon Sep 17 00:00:00 2001 From: vahidlazio Date: Thu, 7 May 2026 11:44:07 +0200 Subject: [PATCH 4/9] fix: satisfy strict clippy lints in accumulate_delta Allow indexing_slicing and arithmetic_side_effects on the method since bounds are checked before every index. Use saturating_add for resize. Re-sync Go WASM module. Co-Authored-By: Claude Opus 4.6 --- confidence-resolver/src/telemetry.rs | 7 +++++-- .../assets/confidence_resolver.wasm | Bin 483141 -> 483141 bytes 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/confidence-resolver/src/telemetry.rs b/confidence-resolver/src/telemetry.rs index 93d8e9fe..756fcd80 100644 --- a/confidence-resolver/src/telemetry.rs +++ b/confidence-resolver/src/telemetry.rs @@ -149,6 +149,7 @@ impl TelemetrySnapshot { /// Expands compressed `BucketSpan`s back into the flat bucket array and /// adds all counters. Gauge fields (memory_bytes) are replaced with the /// latest value. + #[allow(clippy::indexing_slicing, clippy::arithmetic_side_effects)] pub fn accumulate_delta(&mut self, td: &pb::TelemetryData) { if let Some(latency) = &td.resolve_latency { self.latency.sum = self.latency.sum.wrapping_add(latency.sum as u64); @@ -166,8 +167,9 @@ impl TelemetrySnapshot { break; } if idx >= self.latency.buckets.len() { - self.latency.buckets.resize(idx + 1, 0); + self.latency.buckets.resize(idx.saturating_add(1), 0); } + // Safety: idx < BUCKET_COUNT and we just resized to at least idx+1 self.latency.buckets[idx] = self.latency.buckets[idx].wrapping_add(count as u64); } } @@ -176,8 +178,9 @@ impl TelemetrySnapshot { for rate in &td.resolve_rate { let idx = rate.reason as usize; if idx >= self.resolve_rates.len() { - self.resolve_rates.resize(idx + 1, 0); + self.resolve_rates.resize(idx.saturating_add(1), 0); } + // Safety: we just resized to at least idx+1 self.resolve_rates[idx] = self.resolve_rates[idx].wrapping_add(rate.count as u64); } diff --git a/openfeature-provider/go/confidence/internal/local_resolver/assets/confidence_resolver.wasm b/openfeature-provider/go/confidence/internal/local_resolver/assets/confidence_resolver.wasm index e09bdb0d2a474e83aa4722c29ec502c78c5c502b..68f5d3ea48f3cc653f77d6c451966ea7af86d37e 100755 GIT binary patch delta 33 pcmX@QPxk0O*@hOz7N!>F7M2#)Eo^fn85`T@O0sRAE6KjX6#(263;6&5 delta 33 pcmX@QPxk0O*@hOz7N!>F7M2#)Eo^fn8SC2TO0sRAE6KjX6#(1w3-$m2 From 39cb102a3d9b836cdf5dae01e9736fdb17858a29 Mon Sep 17 00:00:00 2001 From: vahidlazio Date: Thu, 7 May 2026 13:30:11 +0200 Subject: [PATCH 5/9] style: fix rustfmt line length in accumulate_delta Co-Authored-By: Claude Opus 4.6 --- confidence-resolver/src/telemetry.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/confidence-resolver/src/telemetry.rs b/confidence-resolver/src/telemetry.rs index 756fcd80..b74e9e85 100644 --- a/confidence-resolver/src/telemetry.rs +++ b/confidence-resolver/src/telemetry.rs @@ -170,7 +170,8 @@ impl TelemetrySnapshot { self.latency.buckets.resize(idx.saturating_add(1), 0); } // Safety: idx < BUCKET_COUNT and we just resized to at least idx+1 - self.latency.buckets[idx] = self.latency.buckets[idx].wrapping_add(count as u64); + self.latency.buckets[idx] = + self.latency.buckets[idx].wrapping_add(count as u64); } } } From 404ed3fd4f0499d758ccccdfea7bfd543bb4e5af Mon Sep 17 00:00:00 2001 From: vahidlazio Date: Thu, 7 May 2026 13:37:02 +0200 Subject: [PATCH 6/9] chore: re-sync WASM module for Go provider after fmt fix Co-Authored-By: Claude Opus 4.6 --- .../assets/confidence_resolver.wasm | Bin 483141 -> 483141 bytes 1 file changed, 0 insertions(+), 0 deletions(-) diff --git a/openfeature-provider/go/confidence/internal/local_resolver/assets/confidence_resolver.wasm b/openfeature-provider/go/confidence/internal/local_resolver/assets/confidence_resolver.wasm index 68f5d3ea48f3cc653f77d6c451966ea7af86d37e..02ed19fd941693fc9eb710dfbf74c8075e78d918 100755 GIT binary patch delta 33 pcmX@QPxk0O*@hOz7N!>F7M2#)Eo^fn8JpVYO0sRAE6KjX6#(2H3;F;6 delta 33 pcmX@QPxk0O*@hOz7N!>F7M2#)Eo^fn85`T@O0sRAE6KjX6#(263;6&5 From 2a52bfe7deb2703fb9b23045b829be8a31566dcc Mon Sep 17 00:00:00 2001 From: vahidlazio Date: Thu, 7 May 2026 13:54:23 +0200 Subject: [PATCH 7/9] fix: aggregate telemetry deltas in flag_logger::aggregate_batch Previously aggregate_batch discarded all telemetry data except the SDK field from the first message. Now it merges latency histograms, resolve rate counters, and gauge fields across all messages in the batch, so the Confidence backend receives aggregated telemetry matching what the WASM providers send. Co-Authored-By: Claude Opus 4.6 --- confidence-resolver/src/flag_logger.rs | 68 ++++++++++++++++++++++++-- 1 file changed, 64 insertions(+), 4 deletions(-) diff --git a/confidence-resolver/src/flag_logger.rs b/confidence-resolver/src/flag_logger.rs index 307653f9..04acfd5a 100644 --- a/confidence-resolver/src/flag_logger.rs +++ b/confidence-resolver/src/flag_logger.rs @@ -4,6 +4,7 @@ use crate::proto::confidence::flags::admin::v1::flag_resolve_info::{ }; use crate::proto::confidence::flags::admin::v1::{ClientResolveInfo, FlagResolveInfo}; use crate::proto::confidence::flags::resolver::v1::events::FlagAssigned; +use crate::proto::confidence::flags::resolver::v1::telemetry_data::ResolveRate; use crate::proto::confidence::flags::resolver::v1::{TelemetryData, WriteFlagLogsRequest}; use std::collections::{HashMap, HashSet}; @@ -14,12 +15,14 @@ pub fn aggregate_batch(message_batch: Vec) -> WriteFlagLog let mut flag_resolve_map: HashMap = HashMap::new(); let mut flag_assigned: Vec = vec![]; let mut first_sdk: Option = None; + let mut agg_telemetry: Option = None; for flag_logs_message in message_batch { if let Some(td) = &flag_logs_message.telemetry_data { if first_sdk.is_none() && td.sdk.is_some() { first_sdk = td.sdk.clone(); } + agg_telemetry = Some(merge_telemetry(agg_telemetry.take(), td)); } for c in &flag_logs_message.client_resolve_info { @@ -98,10 +101,20 @@ pub fn aggregate_batch(message_batch: Vec) -> WriteFlagLog }) } - let telemetry_data = first_sdk.map(|sdk| TelemetryData { - sdk: Some(sdk), - ..Default::default() - }); + // Attach SDK info to the aggregated telemetry + let telemetry_data = match (agg_telemetry, first_sdk) { + (Some(mut td), sdk) => { + if td.sdk.is_none() { + td.sdk = sdk; + } + Some(td) + } + (None, Some(sdk)) => Some(TelemetryData { + sdk: Some(sdk), + ..Default::default() + }), + (None, None) => None, + }; WriteFlagLogsRequest { telemetry_data, @@ -111,6 +124,53 @@ pub fn aggregate_batch(message_batch: Vec) -> WriteFlagLog } } +/// Merge a telemetry delta into an accumulator. +/// Both are deltas, so counters are summed and gauges take the latest non-zero value. +fn merge_telemetry(acc: Option, delta: &TelemetryData) -> TelemetryData { + let mut acc = acc.unwrap_or_default(); + + // Merge resolve latency + match (&mut acc.resolve_latency, &delta.resolve_latency) { + (Some(a), Some(d)) => { + a.sum = a.sum.wrapping_add(d.sum); + a.count = a.count.wrapping_add(d.count); + a.buckets.extend(d.buckets.iter().cloned()); + if a.ln_ratio == 0.0 { + a.ln_ratio = d.ln_ratio; + } + } + (None, Some(d)) => { + acc.resolve_latency = Some(d.clone()); + } + _ => {} + } + + // Merge resolve rates by reason + for dr in &delta.resolve_rate { + if let Some(ar) = acc.resolve_rate.iter_mut().find(|r| r.reason == dr.reason) { + ar.count = ar.count.wrapping_add(dr.count); + } else { + acc.resolve_rate.push(ResolveRate { + count: dr.count, + reason: dr.reason, + }); + } + } + + // Gauges: take latest non-zero + if let Some(sa) = &delta.state_age { + acc.state_age = Some(sa.clone()); + } + if delta.memory_bytes > 0 { + acc.memory_bytes = delta.memory_bytes; + } + if !delta.resolver_version.is_empty() { + acc.resolver_version = delta.resolver_version.clone(); + } + + acc +} + struct SchemaItem { pub client: String, pub schemas: HashSet, From 86094d631133a1b9c87d3177a8ace47b6c2587b8 Mon Sep 17 00:00:00 2001 From: vahidlazio Date: Fri, 8 May 2026 16:54:42 +0200 Subject: [PATCH 8/9] fix(cloudflare): address PR review feedback - Use performance.now() instead of Date.now() for better timing resolution - Rename METRICS_KV binding to CONFIDENCE_METRICS_KV - Fallback to Date.now() if performance API is unavailable Co-Authored-By: Claude Opus 4.6 (1M context) --- .../deployer/script.sh | 4 ++-- confidence-cloudflare-resolver/src/lib.rs | 22 ++++++++++++++----- confidence-cloudflare-resolver/wrangler.toml | 2 +- 3 files changed, 19 insertions(+), 9 deletions(-) diff --git a/confidence-cloudflare-resolver/deployer/script.sh b/confidence-cloudflare-resolver/deployer/script.sh index 5e75caee..e5d9aaa0 100755 --- a/confidence-cloudflare-resolver/deployer/script.sh +++ b/confidence-cloudflare-resolver/deployer/script.sh @@ -409,10 +409,10 @@ if [ -n "$KV_NAMESPACE_ID" ]; then cat >> wrangler.toml < f64 { + js_sys::Reflect::get(&js_sys::global(), &"performance".into()) + .ok() + .and_then(|p| js_sys::Reflect::get(&p, &"now".into()).ok()) + .and_then(|f| js_sys::Function::from(f).call0(&js_sys::global()).ok()) + .and_then(|v| v.as_f64()) + .unwrap_or_else(|| js_sys::Date::now()) +} + /// Per-request resolve metrics captured in the hot path, recorded in wait_until. struct ResolveMetrics { elapsed_us: u32, @@ -161,7 +171,7 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { .get_async("/metrics", |_req, ctx| { let allowed_origin = allowed_origin_env.clone(); async move { - let text = match ctx.env.kv("METRICS_KV") { + let text = match ctx.env.kv("CONFIDENCE_METRICS_KV") { Ok(kv) => kv.get("prometheus").text().await.unwrap_or(None), Err(_) => None, }; @@ -210,7 +220,7 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { .evaluation_context .clone() .unwrap_or_default(); - let start = js_sys::Date::now(); + let start = performance_now(); match state.get_resolver::( &resolver_request.client_secret, evaluation_context, @@ -223,7 +233,7 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { ); match resolver.resolve_flags(process_request) { Ok(process_response) => { - let elapsed_us = ((js_sys::Date::now() - start) * 1000.0) as u32; + let elapsed_us = ((performance_now() - start) * 1000.0) as u32; match process_response.into_resolved() { Some((response, _writes)) => { let reasons: Vec = response @@ -253,7 +263,7 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { } } Err(msg) => { - let elapsed_us = ((js_sys::Date::now() - start) * 1000.0) as u32; + let elapsed_us = ((performance_now() - start) * 1000.0) as u32; PENDING_METRICS.with(|m| { m.borrow_mut().push(ResolveMetrics { elapsed_us, @@ -266,7 +276,7 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result { } } Err(msg) => { - let elapsed_us = ((js_sys::Date::now() - start) * 1000.0) as u32; + let elapsed_us = ((performance_now() - start) * 1000.0) as u32; PENDING_METRICS.with(|m| { m.borrow_mut().push(ResolveMetrics { elapsed_us, @@ -359,7 +369,7 @@ pub async fn consume_flag_logs_queue( .collect(); // Accumulate telemetry deltas into KV-backed cumulative snapshot for /metrics - if let Ok(kv) = env.kv("METRICS_KV") { + if let Ok(kv) = env.kv("CONFIDENCE_METRICS_KV") { let _ = update_prometheus_kv(&kv, &logs).await; } diff --git a/confidence-cloudflare-resolver/wrangler.toml b/confidence-cloudflare-resolver/wrangler.toml index 30bd64ac..76bbdcaa 100644 --- a/confidence-cloudflare-resolver/wrangler.toml +++ b/confidence-cloudflare-resolver/wrangler.toml @@ -15,7 +15,7 @@ queue = "flag-logs-queue" binding = "flag_logs_queue" # KV namespace for /metrics endpoint is created and injected by the deployer. -# See deployer/script.sh for auto-creation of the METRICS_KV binding. +# See deployer/script.sh for auto-creation of the CONFIDENCE_METRICS_KV binding. [vars] CONFIDENCE_CLIENT_SECRET = "SECRET" From 157ae247358173312b8aeddffb1da181be5a100a Mon Sep 17 00:00:00 2001 From: vahidlazio Date: Fri, 8 May 2026 17:08:26 +0200 Subject: [PATCH 9/9] fix: resolve clippy redundant closure warning --- confidence-cloudflare-resolver/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/confidence-cloudflare-resolver/src/lib.rs b/confidence-cloudflare-resolver/src/lib.rs index 80b28be8..e0597598 100644 --- a/confidence-cloudflare-resolver/src/lib.rs +++ b/confidence-cloudflare-resolver/src/lib.rs @@ -35,7 +35,7 @@ fn performance_now() -> f64 { .and_then(|p| js_sys::Reflect::get(&p, &"now".into()).ok()) .and_then(|f| js_sys::Function::from(f).call0(&js_sys::global()).ok()) .and_then(|v| v.as_f64()) - .unwrap_or_else(|| js_sys::Date::now()) + .unwrap_or_else(js_sys::Date::now) } /// Per-request resolve metrics captured in the hot path, recorded in wait_until.