Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions confidence-cloudflare-resolver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,7 @@ worker = { version= "0.6.1", features=['queue'] }
base64 = "0.22.1"
once_cell = "1.19"
prost = "0.13"
arc-swap = "1"
js-sys = "0.3"
serde = { version = "1.0.219" }
serde_json = "1.0.85"
49 changes: 49 additions & 0 deletions confidence-cloudflare-resolver/deployer/script.sh
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,55 @@ else
echo "⚠️ Could not check queue status (HTTP $QUEUE_STATUS)"
fi

# --- KV namespace for the /metrics endpoint --------------------------------
# Ensure a KV namespace exists (create it if missing) and append its binding
# to wrangler.toml so the worker can serve cumulative metrics.

# Namespace title is prefixed per-deployment when WORKER_NAME_PREFIX is set.
if [ -n "$WORKER_NAME_PREFIX" ]; then
  KV_NAMESPACE_TITLE="${WORKER_NAME_PREFIX}-resolver-metrics"
else
  KV_NAMESPACE_TITLE="resolver-metrics"
fi

echo "🔍 Checking if KV namespace '$KV_NAMESPACE_TITLE' exists..."
# -w "%{http_code}" appends the HTTP status to the body; split it off below.
KV_LIST=$(curl -sS -w "%{http_code}" \
  -H "Authorization: Bearer ${CLOUDFLARE_API_TOKEN}" \
  "https://api.cloudflare.com/client/v4/accounts/${CLOUDFLARE_ACCOUNT_ID}/storage/kv/namespaces?per_page=100")
KV_LIST_STATUS="${KV_LIST: -3}"
KV_LIST_BODY="${KV_LIST%???}"

KV_NAMESPACE_ID=""
if [ "$KV_LIST_STATUS" = "200" ]; then
  # head -n 1 guards against duplicate titles producing a multi-line result,
  # which would otherwise corrupt the wrangler.toml binding appended below.
  KV_NAMESPACE_ID=$(printf "%s" "$KV_LIST_BODY" \
    | jq -r ".result[] | select(.title == \"${KV_NAMESPACE_TITLE}\") | .id" 2>/dev/null \
    | head -n 1 || true)
fi

if [ -z "$KV_NAMESPACE_ID" ]; then
  echo "📦 KV namespace '$KV_NAMESPACE_TITLE' not found, creating..."
  KV_CREATE_RESP=$(curl -sS -w "%{http_code}" -X POST \
    -H "Authorization: Bearer ${CLOUDFLARE_API_TOKEN}" \
    -H "Content-Type: application/json" \
    -d "{\"title\": \"${KV_NAMESPACE_TITLE}\"}" \
    "https://api.cloudflare.com/client/v4/accounts/${CLOUDFLARE_ACCOUNT_ID}/storage/kv/namespaces")
  KV_CREATE_STATUS="${KV_CREATE_RESP: -3}"
  KV_CREATE_BODY="${KV_CREATE_RESP%???}"
  if [ "$KV_CREATE_STATUS" = "200" ] || [ "$KV_CREATE_STATUS" = "201" ]; then
    # `// empty` keeps the variable blank (instead of the string "null")
    # if the success response unexpectedly lacks result.id.
    KV_NAMESPACE_ID=$(printf "%s" "$KV_CREATE_BODY" | jq -r '.result.id // empty')
    echo "✅ KV namespace '$KV_NAMESPACE_TITLE' created (id: $KV_NAMESPACE_ID)"
  else
    echo "⚠️ Failed to create KV namespace (HTTP $KV_CREATE_STATUS), /metrics will be unavailable"
  fi
else
  echo "✅ KV namespace '$KV_NAMESPACE_TITLE' already exists (id: $KV_NAMESPACE_ID)"
fi

# Append KV binding to wrangler.toml if the namespace was found or created.
if [ -n "$KV_NAMESPACE_ID" ]; then
  cat >> wrangler.toml <<EOF

[[kv_namespaces]]
binding = "CONFIDENCE_METRICS_KV"
id = "$KV_NAMESPACE_ID"
EOF
  echo "✅ Added CONFIDENCE_METRICS_KV binding to wrangler.toml"
fi

# Update worker name and queue name in wrangler.toml if using prefix
if [ -n "$WORKER_NAME_PREFIX" ]; then
sed -i.tmp "s/^name = .*/name = \"$WORKER_NAME\"/" wrangler.toml
Expand Down
139 changes: 130 additions & 9 deletions confidence-cloudflare-resolver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ use confidence_resolver::{
assign_logger::AssignLogger,
flag_logger,
proto::{confidence, google::Struct},
telemetry::{Telemetry, TelemetrySnapshot},
FlagToApply, Host, ResolvedValue, ResolverState,
};
use worker::*;

use arc_swap::ArcSwap;
use base64::engine::general_purpose::STANDARD;
use base64::Engine;
use bytes::Bytes;
Expand All @@ -14,13 +16,37 @@ use serde_json::from_slice;
use serde_json::json;

use confidence::flags::resolver::v1::{ApplyFlagsRequest, ApplyFlagsResponse, ResolveFlagsRequest};
use confidence_resolver::proto::confidence::flags::resolver::v1::ResolveProcessRequest;
use confidence_resolver::proto::confidence::flags::resolver::v1::{ResolveProcessRequest, ResolveReason};

// Per-isolate loggers for resolve/assign events, drained via checkpoint().
static RESOLVE_LOGGER: LazyLock<ResolveLogger<H>> = LazyLock::new(ResolveLogger::new);
static ASSIGN_LOGGER: LazyLock<AssignLogger> = LazyLock::new(AssignLogger::new);
// Cumulative telemetry counters for this isolate.
static TELEMETRY: LazyLock<Telemetry> = LazyLock::new(Telemetry::new);
// Snapshot of TELEMETRY as of the last flush; passed to delta_snapshot in
// checkpoint() — presumably so only counts accrued since the previous flush
// are shipped (name suggests delta semantics; confirm in Telemetry).
static LAST_FLUSHED: LazyLock<ArcSwap<TelemetrySnapshot>> =
    LazyLock::new(|| ArcSwap::from_pointee(TelemetrySnapshot::default()));

use confidence_resolver::Client;
use once_cell::sync::Lazy;
use std::cell::RefCell;

/// High-resolution timestamp in milliseconds via `performance.now()`,
/// falling back to `Date.now()` when the Performance API is unavailable.
///
/// The two clocks have different epochs (time origin vs Unix epoch), but
/// callers only subtract two readings, so elapsed-time math works either way.
fn performance_now() -> f64 {
    let global = js_sys::global();
    js_sys::Reflect::get(&global, &"performance".into())
        .ok()
        .and_then(|perf| {
            let now_fn = js_sys::Reflect::get(&perf, &"now".into()).ok()?;
            // Invoke with `this` bound to the performance object. Calling
            // `performance.now` with the global as `this` (as the previous
            // code did) throws an "illegal invocation" TypeError in V8,
            // silently forcing the Date.now() fallback on every call.
            js_sys::Function::from(now_fn).call0(&perf).ok()
        })
        .and_then(|v| v.as_f64())
        .unwrap_or_else(js_sys::Date::now)
}

/// Per-request resolve metrics captured in the hot path, recorded in wait_until.
struct ResolveMetrics {
    // Wall-clock resolve duration in microseconds, derived from performance_now().
    elapsed_us: u32,
    // One reason per resolved flag on success; a single Error entry on failure paths.
    reasons: Vec<ResolveReason>,
}

thread_local! {
    // Metrics queued while handling a request and drained inside ctx.wait_until,
    // keeping telemetry recording off the response's critical path.
    // NOTE(review): correctness relies on the request handler and its wait_until
    // closure sharing the same isolate/thread — confirm for the Workers runtime.
    static PENDING_METRICS: RefCell<Vec<ResolveMetrics>> = const { RefCell::new(Vec::new()) };
}

/// SetResolverStateRequest message from the CDN.
/// This matches the protobuf message format returned by the CDN.
Expand Down Expand Up @@ -142,6 +168,19 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result<Response> {
let router = Router::new();

let response = router
.get_async("/metrics", |_req, ctx| {
Copy link
Copy Markdown
Contributor

@andreas-karlsson andreas-karlsson May 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use this parser to check that we can parse the output of this.

Edit: No need to do this since it's the same serializer that we test in go.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also we could consider supporting the same options on this endpoint like we do in local:

But that could also wait for a followup PR. Maybe even better.

let allowed_origin = allowed_origin_env.clone();
async move {
let text = match ctx.env.kv("CONFIDENCE_METRICS_KV") {
Ok(kv) => kv.get("prometheus").text().await.unwrap_or(None),
Err(_) => None,
};
let body = text.unwrap_or_default();
let headers = Headers::new();
headers.set("Content-Type", "text/plain; version=0.0.4; charset=utf-8")?;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there might be some special Content-Type we should use for Prometheus.

Response::ok(body)?.with_headers(headers).with_cors_headers(&allowed_origin)
}
})
// GET endpoint to expose the current deployment state etag and resolver version
.get_async("/v1/state:etag", |_req, _ctx| {
let allowed_origin = allowed_origin_env.clone();
Expand Down Expand Up @@ -181,6 +220,7 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result<Response> {
.evaluation_context
.clone()
.unwrap_or_default();
let start = performance_now();
match state.get_resolver::<H>(
&resolver_request.client_secret,
evaluation_context,
Expand All @@ -193,23 +233,56 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result<Response> {
);
match resolver.resolve_flags(process_request) {
Ok(process_response) => {
let elapsed_us = ((performance_now() - start) * 1000.0) as u32;
match process_response.into_resolved() {
Some((response, _writes)) => {
let reasons: Vec<ResolveReason> = response
.resolved_flags
.iter()
.map(|f| f.reason())
.collect();
PENDING_METRICS.with(|m| {
m.borrow_mut().push(ResolveMetrics { elapsed_us, reasons });
});
Response::from_json(&response)?
.with_cors_headers(&allowed_origin)
}
None => Response::error(
"Unexpected suspended response",
500,
)?
.with_cors_headers(&allowed_origin),
None => {
PENDING_METRICS.with(|m| {
m.borrow_mut().push(ResolveMetrics {
elapsed_us,
reasons: vec![ResolveReason::Error],
});
});
Response::error(
"Unexpected suspended response",
500,
)?
.with_cors_headers(&allowed_origin)
}
}
}
Err(msg) => Response::error(msg, 500)?
.with_cors_headers(&allowed_origin),
Err(msg) => {
let elapsed_us = ((performance_now() - start) * 1000.0) as u32;
PENDING_METRICS.with(|m| {
m.borrow_mut().push(ResolveMetrics {
elapsed_us,
reasons: vec![ResolveReason::Error],
});
});
Response::error(msg, 500)?
.with_cors_headers(&allowed_origin)
}
}
}
Err(msg) => {
let elapsed_us = ((performance_now() - start) * 1000.0) as u32;
PENDING_METRICS.with(|m| {
m.borrow_mut().push(ResolveMetrics {
elapsed_us,
reasons: vec![ResolveReason::Error],
});
});
Response::error(msg, 500)?.with_cors_headers(&allowed_origin)
}
}
Expand Down Expand Up @@ -250,8 +323,18 @@ pub async fn main(req: Request, env: Env, ctx: Context) -> Result<Response> {
.run(req, env)
.await;

// Use ctx.waitUntil to run logging after response is returned
// Use ctx.waitUntil to run logging and telemetry after response is returned
ctx.wait_until(async move {
// Record pending resolve metrics into the telemetry counters
PENDING_METRICS.with(|m| {
for metrics in m.borrow_mut().drain(..) {
TELEMETRY.record_latency_us(metrics.elapsed_us);
for reason in metrics.reasons {
TELEMETRY.mark_resolve(reason);
}
}
});

let aggregated: confidence_resolver::proto::confidence::flags::resolver::v1::WriteFlagLogsRequest
= checkpoint();
if let Ok(converted) = serde_json::to_string(&aggregated) {
Expand Down Expand Up @@ -284,6 +367,12 @@ pub async fn consume_flag_logs_queue(
client_resolve_info: v.client_resolve_info,
})
.collect();

// Accumulate telemetry deltas into KV-backed cumulative snapshot for /metrics
if let Ok(kv) = env.kv("CONFIDENCE_METRICS_KV") {
let _ = update_prometheus_kv(&kv, &logs).await;
}

let req = flag_logger::aggregate_batch(logs);
send_flags_logs(CONFIDENCE_CLIENT_SECRET.get().unwrap().as_str(), req).await?;
}
Expand All @@ -293,10 +382,42 @@ pub async fn consume_flag_logs_queue(

fn checkpoint() -> WriteFlagLogsRequest {
let mut req = RESOLVE_LOGGER.checkpoint();
req.telemetry_data = Some(TELEMETRY.delta_snapshot(&LAST_FLUSHED));
ASSIGN_LOGGER.checkpoint_fill(&mut req);
req
}

/// Accumulate telemetry deltas from all isolates into a cumulative
/// `TelemetrySnapshot` stored in KV, then write its Prometheus text
/// representation for the /metrics endpoint.
///
/// Note: concurrent queue consumer invocations can race on KV read-modify-write.
/// Acceptable for metrics — at worst one batch's deltas are lost, not cumulative state.
async fn update_prometheus_kv(kv: &kv::KvStore, logs: &[WriteFlagLogsRequest]) {
    // Load the previously persisted cumulative snapshot; fall back to an empty
    // snapshot when the key is missing or its JSON is unreadable.
    let mut cumulative = match kv.get("snapshot").text().await {
        Ok(Some(text)) => serde_json::from_str::<TelemetrySnapshot>(&text).unwrap_or_default(),
        _ => TelemetrySnapshot::default(),
    };

    // Fold every batch's telemetry delta into the running totals.
    for log in logs {
        if let Some(td) = &log.telemetry_data {
            cumulative.accumulate_delta(td);
        }
    }

    // Render the text served verbatim by GET /metrics.
    let prom_text = cumulative.to_prometheus(
        "cf-resolver",
        &confidence_resolver::telemetry::PrometheusConfig::default(),
    );

    // Persist the cumulative snapshot — but only when serialization succeeds.
    // The previous code wrote an empty string on failure (unwrap_or_default),
    // which would fail to parse on the next read and silently reset all
    // cumulative metrics. Skipping the write keeps the last good snapshot.
    if let Ok(json) = serde_json::to_string(&cumulative) {
        if let Ok(builder) = kv.put("snapshot", json) {
            let _ = builder.execute().await;
        }
    }
    if let Ok(builder) = kv.put("prometheus", prom_text) {
        let _ = builder.execute().await;
    }
}

async fn send_flags_logs(client_secret: &str, message: WriteFlagLogsRequest) -> Result<Response> {
let resolve_url = "https://resolver.confidence.dev/v1/clientFlagLogs:write";
let mut init = RequestInit::new();
Expand Down
3 changes: 3 additions & 0 deletions confidence-cloudflare-resolver/wrangler.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,8 @@ max_batch_timeout = 10 # seconds
queue = "flag-logs-queue"
binding = "flag_logs_queue"

# KV namespace for /metrics endpoint is created and injected by the deployer.
# See deployer/script.sh for auto-creation of the CONFIDENCE_METRICS_KV binding.

[vars]
CONFIDENCE_CLIENT_SECRET = "SECRET"
Loading
Loading