diff --git a/src/metrics.rs b/src/metrics.rs index c8bb113..b6c5e3d 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -5,11 +5,14 @@ use axum::{ response::{IntoResponse, Response}, }; use lazy_static::lazy_static; +#[cfg(target_os = "linux")] +use prometheus::process_collector::ProcessCollector; use prometheus::{Encoder, HistogramOpts, HistogramVec, IntCounterVec, IntGauge, Opts, Registry, TextEncoder}; use std::sync::Arc; use std::time::Instant; use crate::http_server::AppState; +use rusx::error::SdkError; // Define comprehensive metrics for REST API monitoring lazy_static! { @@ -47,6 +50,35 @@ lazy_static! { &["method", "endpoint", "status"] ) .unwrap(); + + // Twitter API metrics + pub static ref TWITTER_API_CALLS_TOTAL: IntCounterVec = IntCounterVec::new( + Opts::new("twitter_api_calls_total", "Total number of Twitter API calls"), + &["operation"] + ) + .unwrap(); + pub static ref TWITTER_API_CALL_DURATION: HistogramVec = HistogramVec::new( + HistogramOpts::new("twitter_api_call_duration_seconds", "Twitter API call duration in seconds") + .buckets(vec![0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0]), + &["operation"] + ) + .unwrap(); + pub static ref TWITTER_TWEETS_PULLED_TOTAL: IntCounterVec = IntCounterVec::new( + Opts::new("twitter_tweets_pulled_total", "Total number of tweets pulled from Twitter API"), + &["operation"] + ) + .unwrap(); + pub static ref TWITTER_API_ERRORS_TOTAL: IntCounterVec = IntCounterVec::new( + Opts::new("twitter_api_errors_total", "Total number of Twitter API errors"), + &["operation", "error_type"] + ) + .unwrap(); + pub static ref TWITTER_TWEETS_PER_CALL: HistogramVec = HistogramVec::new( + HistogramOpts::new("twitter_tweets_per_call", "Number of tweets returned per API call") + .buckets(vec![0.0, 1.0, 5.0, 10.0, 25.0, 50.0, 100.0]), + &["operation"] + ) + .unwrap(); } #[derive(Debug, Clone)] @@ -64,7 +96,14 @@ impl Metrics { pub fn new() -> Self { let registry = Registry::new(); - // Register all metrics + // Register OS/machine metrics collector (Linux only) + #[cfg(target_os = "linux")] + { + let process_collector = ProcessCollector::for_self(); + registry.register(Box::new(process_collector)).unwrap(); + } + + // Register all custom HTTP metrics registry.register(Box::new(HTTP_REQUESTS_TOTAL.clone())).unwrap(); registry.register(Box::new(HTTP_REQUEST_DURATION.clone())).unwrap(); registry.register(Box::new(HTTP_REQUESTS_IN_FLIGHT.clone())).unwrap(); @@ -72,6 +111,15 @@ impl Metrics { registry.register(Box::new(HTTP_RESPONSE_SIZE_BYTES.clone())).unwrap(); registry.register(Box::new(HTTP_ERRORS_TOTAL.clone())).unwrap(); + // Register Twitter API metrics + registry.register(Box::new(TWITTER_API_CALLS_TOTAL.clone())).unwrap(); + registry.register(Box::new(TWITTER_API_CALL_DURATION.clone())).unwrap(); + registry + .register(Box::new(TWITTER_TWEETS_PULLED_TOTAL.clone())) + .unwrap(); + registry.register(Box::new(TWITTER_API_ERRORS_TOTAL.clone())).unwrap(); + registry.register(Box::new(TWITTER_TWEETS_PER_CALL.clone())).unwrap(); + Self { registry: Arc::new(registry), } @@ -169,6 +217,85 @@ pub async fn track_metrics(req: Request, next: Next) -> Response { response } +fn extract_sdk_error_type(err: &SdkError) -> &'static str { + match err { + SdkError::Api { status, .. } => { + // Categorize based on HTTP status code + // Match specific status codes first, then ranges + match *status { + 400 => "bad_request", + 401 => "unauthorized", + 403 => "forbidden", + 404 => "not_found", + 429 => "rate_limit", + 500..=599 => "server_error", + 402 | 405..=428 | 430..=499 => "client_error", + _ => "twitter_api_error", + } + } + _ => "sdk_error", + } +} + +/// Track Twitter API call metrics with error type detection +/// +/// This function should be called around Twitter API calls to track: +/// - API call duration +/// - Number of tweets pulled +/// - Errors with specific error types extracted from SdkError +/// +/// # Arguments +/// * `operation` - The type of operation (e.g., "search_recent", "tweets_get_many") +/// * `f` - The async function that makes the Twitter API call +/// +/// # Returns +/// The result of the API call, with metrics automatically tracked +pub async fn track_twitter_api_call(operation: &str, f: F) -> Result +where + F: std::future::Future>, +{ + let start = Instant::now(); + + // Track API call attempt + TWITTER_API_CALLS_TOTAL.with_label_values(&[operation]).inc(); + + let result = f.await; + + // Record duration + let duration = start.elapsed().as_secs_f64(); + TWITTER_API_CALL_DURATION + .with_label_values(&[operation]) + .observe(duration); + + // Track errors with specific error types + if let Err(ref err) = result { + let error_type = extract_sdk_error_type(err); + TWITTER_API_ERRORS_TOTAL + .with_label_values(&[operation, error_type]) + .inc(); + } + + result +} + +/// Track tweets pulled from a Twitter API call +/// +/// Call this after successfully getting tweets from the API to track: +/// - Total tweets pulled +/// - Tweets per call distribution +/// +/// # Arguments +/// * `operation` - The type of operation (e.g., "search_recent", "tweets_get_many") +/// * `tweet_count` - Number of tweets returned by the API call +pub fn track_tweets_pulled(operation: &str, tweet_count: usize) { + TWITTER_TWEETS_PULLED_TOTAL + .with_label_values(&[operation]) + .inc_by(tweet_count as u64); + TWITTER_TWEETS_PER_CALL + .with_label_values(&[operation]) + .observe(tweet_count as f64); +} + pub async fn metrics_handler(State(state): State) -> impl IntoResponse { let encoder = TextEncoder::new(); let metric_families = state.metrics.registry.gather(); diff --git a/src/services/raid_leaderboard_service.rs b/src/services/raid_leaderboard_service.rs index b09d558..1b0db37 100644 --- a/src/services/raid_leaderboard_service.rs +++ b/src/services/raid_leaderboard_service.rs @@ -10,6 +10,7 @@ use rusx::{ use crate::{ db_persistence::DbPersistence, + metrics::{track_tweets_pulled, track_twitter_api_call}, models::raid_submission::{RaidSubmission, UpdateRaidSubmissionStats}, services::alert_service::AlertService, AppError, AppResult, Config, @@ -105,20 +106,26 @@ impl RaidLeaderboardService { for query in queries { rate_limiter.tick().await; - let response = self - .twitter_gateway - .tweets() - .get_many(query, Some(params.clone())) - .await?; + // Track Twitter API call with metrics + let response = track_twitter_api_call("tweets_get_many", async { + self.twitter_gateway + .tweets() + .get_many(query, Some(params.clone())) + .await + }) + .await?; let Some(tweets) = &response.data else { tracing::info!("No tweets found!."); continue; }; - // Track Twitter API usage - let tweets_pulled = tweets.len() as i32; - self.alert_service.track_and_alert_usage(tweets_pulled).await?; + // Track Twitter API usage (for alerting) + let tweets_pulled = tweets.len(); + self.alert_service.track_and_alert_usage(tweets_pulled as i32).await?; + + // Track metrics for tweets pulled + track_tweets_pulled("tweets_get_many", tweets_pulled); // EXTRACT: Collect all referenced IDs from the fetched tweets // We use a HashSet immediately to remove duplicates before sending to DB diff --git a/src/services/tweet_synchronizer_service.rs b/src/services/tweet_synchronizer_service.rs index 16063e7..3d0f085 100644 --- a/src/services/tweet_synchronizer_service.rs +++ b/src/services/tweet_synchronizer_service.rs @@ -11,6 +11,7 @@ use rusx::{ use crate::{ db_persistence::DbPersistence, + metrics::{track_tweets_pulled, track_twitter_api_call}, models::{relevant_tweet::NewTweetPayload, tweet_author::NewAuthorPayload}, services::{alert_service::AlertService, telegram_service::TelegramService}, utils::x_url::build_x_status_url, @@ -204,14 +205,21 @@ impl TweetSynchronizerService { tracing::info!("No history found, performing full 7-day fetch."); } - let response = self.twitter_gateway.search().recent(params).await?; + // Track Twitter API call with metrics + let response = track_twitter_api_call("search_recent", async { + self.twitter_gateway.search().recent(params).await + }) + .await?; let tweet_authors = self.process_tweet_authors(&response).await?; let relevant_tweets = self.process_relevant_tweets(&response).await?; - // Track Twitter API usage - let tweets_pulled = relevant_tweets.len() as i32; - self.alert_service.track_and_alert_usage(tweets_pulled).await?; + // Track Twitter API usage (for alerting) + let tweets_pulled = relevant_tweets.len(); + self.alert_service.track_and_alert_usage(tweets_pulled as i32).await?; + + // Track metrics for tweets pulled + track_tweets_pulled("search_recent", tweets_pulled); self.process_sending_raid_targets(&tweet_authors, &relevant_tweets) .await?;