Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 128 additions & 1 deletion src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ use axum::{
response::{IntoResponse, Response},
};
use lazy_static::lazy_static;
#[cfg(target_os = "linux")]
use prometheus::process_collector::ProcessCollector;
use prometheus::{Encoder, HistogramOpts, HistogramVec, IntCounterVec, IntGauge, Opts, Registry, TextEncoder};
use std::sync::Arc;
use std::time::Instant;

use crate::http_server::AppState;
use rusx::error::SdkError;

// Define comprehensive metrics for REST API monitoring
lazy_static! {
Expand Down Expand Up @@ -47,6 +50,35 @@ lazy_static! {
&["method", "endpoint", "status"]
)
.unwrap();

// Twitter API metrics
pub static ref TWITTER_API_CALLS_TOTAL: IntCounterVec = IntCounterVec::new(
Opts::new("twitter_api_calls_total", "Total number of Twitter API calls"),
&["operation"]
)
.unwrap();
pub static ref TWITTER_API_CALL_DURATION: HistogramVec = HistogramVec::new(
HistogramOpts::new("twitter_api_call_duration_seconds", "Twitter API call duration in seconds")
.buckets(vec![0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0]),
&["operation"]
)
.unwrap();
pub static ref TWITTER_TWEETS_PULLED_TOTAL: IntCounterVec = IntCounterVec::new(
Opts::new("twitter_tweets_pulled_total", "Total number of tweets pulled from Twitter API"),
&["operation"]
)
.unwrap();
pub static ref TWITTER_API_ERRORS_TOTAL: IntCounterVec = IntCounterVec::new(
Opts::new("twitter_api_errors_total", "Total number of Twitter API errors"),
&["operation", "error_type"]
)
.unwrap();
pub static ref TWITTER_TWEETS_PER_CALL: HistogramVec = HistogramVec::new(
HistogramOpts::new("twitter_tweets_per_call", "Number of tweets returned per API call")
.buckets(vec![0.0, 1.0, 5.0, 10.0, 25.0, 50.0, 100.0]),
&["operation"]
)
.unwrap();
}

#[derive(Debug, Clone)]
Expand All @@ -64,14 +96,30 @@ impl Metrics {
pub fn new() -> Self {
let registry = Registry::new();

// Register all metrics
// Register OS/machine metrics collector (Linux only)
#[cfg(target_os = "linux")]
{
let process_collector = ProcessCollector::for_self();
registry.register(Box::new(process_collector)).unwrap();
}

// Register all custom HTTP metrics
registry.register(Box::new(HTTP_REQUESTS_TOTAL.clone())).unwrap();
registry.register(Box::new(HTTP_REQUEST_DURATION.clone())).unwrap();
registry.register(Box::new(HTTP_REQUESTS_IN_FLIGHT.clone())).unwrap();
registry.register(Box::new(HTTP_REQUEST_SIZE_BYTES.clone())).unwrap();
registry.register(Box::new(HTTP_RESPONSE_SIZE_BYTES.clone())).unwrap();
registry.register(Box::new(HTTP_ERRORS_TOTAL.clone())).unwrap();

// Register Twitter API metrics
registry.register(Box::new(TWITTER_API_CALLS_TOTAL.clone())).unwrap();
registry.register(Box::new(TWITTER_API_CALL_DURATION.clone())).unwrap();
registry
.register(Box::new(TWITTER_TWEETS_PULLED_TOTAL.clone()))
.unwrap();
registry.register(Box::new(TWITTER_API_ERRORS_TOTAL.clone())).unwrap();
registry.register(Box::new(TWITTER_TWEETS_PER_CALL.clone())).unwrap();

Self {
registry: Arc::new(registry),
}
Expand Down Expand Up @@ -169,6 +217,85 @@ pub async fn track_metrics(req: Request, next: Next) -> Response {
response
}

fn extract_sdk_error_type(err: &SdkError) -> &'static str {
match err {
SdkError::Api { status, .. } => {
// Categorize based on HTTP status code
// Match specific status codes first, then ranges
match *status {
400 => "bad_request",
401 => "unauthorized",
403 => "forbidden",
404 => "not_found",
429 => "rate_limit",
500..=599 => "server_error",
402 | 405..=428 | 430..=499 => "client_error",
_ => "twitter_api_error",
}
}
_ => "sdk_error",
}
}

/// Track Twitter API call metrics with error type detection
///
/// This function should be called around Twitter API calls to track:
/// - API call duration
/// - Number of tweets pulled
/// - Errors with specific error types extracted from SdkError
///
/// # Arguments
/// * `operation` - The type of operation (e.g., "search_recent", "tweets_get_many")
/// * `f` - The async function that makes the Twitter API call
///
/// # Returns
/// The result of the API call, with metrics automatically tracked
pub async fn track_twitter_api_call<T, F>(operation: &str, f: F) -> Result<T, SdkError>
where
F: std::future::Future<Output = Result<T, SdkError>>,
{
let start = Instant::now();

// Track API call attempt
TWITTER_API_CALLS_TOTAL.with_label_values(&[operation]).inc();

let result = f.await;

// Record duration
let duration = start.elapsed().as_secs_f64();
TWITTER_API_CALL_DURATION
.with_label_values(&[operation])
.observe(duration);

// Track errors with specific error types
if let Err(ref err) = result {
let error_type = extract_sdk_error_type(err);
TWITTER_API_ERRORS_TOTAL
.with_label_values(&[operation, error_type])
.inc();
}

result
}

/// Track tweets pulled from a Twitter API call
///
/// Call this after successfully getting tweets from the API to track:
/// - Total tweets pulled
/// - Tweets per call distribution
///
/// # Arguments
/// * `operation` - The type of operation (e.g., "search_recent", "tweets_get_many")
/// * `tweet_count` - Number of tweets returned by the API call
pub fn track_tweets_pulled(operation: &str, tweet_count: usize) {
TWITTER_TWEETS_PULLED_TOTAL
.with_label_values(&[operation])
.inc_by(tweet_count as u64);
TWITTER_TWEETS_PER_CALL
.with_label_values(&[operation])
.observe(tweet_count as f64);
}

pub async fn metrics_handler(State(state): State<AppState>) -> impl IntoResponse {
let encoder = TextEncoder::new();
let metric_families = state.metrics.registry.gather();
Expand Down
23 changes: 15 additions & 8 deletions src/services/raid_leaderboard_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use rusx::{

use crate::{
db_persistence::DbPersistence,
metrics::{track_tweets_pulled, track_twitter_api_call},
models::raid_submission::{RaidSubmission, UpdateRaidSubmissionStats},
services::alert_service::AlertService,
AppError, AppResult, Config,
Expand Down Expand Up @@ -105,20 +106,26 @@ impl RaidLeaderboardService {
for query in queries {
rate_limiter.tick().await;

let response = self
.twitter_gateway
.tweets()
.get_many(query, Some(params.clone()))
.await?;
// Track Twitter API call with metrics
let response = track_twitter_api_call("tweets_get_many", async {
self.twitter_gateway
.tweets()
.get_many(query, Some(params.clone()))
.await
})
.await?;

let Some(tweets) = &response.data else {
tracing::info!("No tweets found!.");
continue;
};

// Track Twitter API usage
let tweets_pulled = tweets.len() as i32;
self.alert_service.track_and_alert_usage(tweets_pulled).await?;
// Track Twitter API usage (for alerting)
let tweets_pulled = tweets.len();
self.alert_service.track_and_alert_usage(tweets_pulled as i32).await?;

// Track metrics for tweets pulled
track_tweets_pulled("tweets_get_many", tweets_pulled);

// EXTRACT: Collect all referenced IDs from the fetched tweets
// We use a HashSet immediately to remove duplicates before sending to DB
Expand Down
16 changes: 12 additions & 4 deletions src/services/tweet_synchronizer_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use rusx::{

use crate::{
db_persistence::DbPersistence,
metrics::{track_tweets_pulled, track_twitter_api_call},
models::{relevant_tweet::NewTweetPayload, tweet_author::NewAuthorPayload},
services::{alert_service::AlertService, telegram_service::TelegramService},
utils::x_url::build_x_status_url,
Expand Down Expand Up @@ -204,14 +205,21 @@ impl TweetSynchronizerService {
tracing::info!("No history found, performing full 7-day fetch.");
}

let response = self.twitter_gateway.search().recent(params).await?;
// Track Twitter API call with metrics
let response = track_twitter_api_call("search_recent", async {
self.twitter_gateway.search().recent(params).await
})
.await?;

let tweet_authors = self.process_tweet_authors(&response).await?;
let relevant_tweets = self.process_relevant_tweets(&response).await?;

// Track Twitter API usage
let tweets_pulled = relevant_tweets.len() as i32;
self.alert_service.track_and_alert_usage(tweets_pulled).await?;
// Track Twitter API usage (for alerting)
let tweets_pulled = relevant_tweets.len();
self.alert_service.track_and_alert_usage(tweets_pulled as i32).await?;

// Track metrics for tweets pulled
track_tweets_pulled("search_recent", tweets_pulled);

self.process_sending_raid_targets(&tweet_authors, &relevant_tweets)
.await?;
Expand Down