diff --git a/Cargo.lock b/Cargo.lock
index 2804eee04..cbad3c985 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2972,12 +2972,14 @@ checksum = "e8a5a9a0ff0086c7a148acb942baaabeadf9504d10400b5a05645853729b9cd2"
 
 [[package]]
 name = "indexmap"
-version = "2.12.1"
+version = "2.13.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0ad4bb2b565bca0645f4d68c5c9af97fba094e9791da685bf83cb5f3ce74acf2"
+checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017"
 dependencies = [
  "equivalent",
  "hashbrown 0.16.1",
+ "serde",
+ "serde_core",
 ]
 
 [[package]]
@@ -3724,6 +3726,7 @@ dependencies = [
  "human-size",
  "humantime",
  "humantime-serde",
+ "indexmap",
  "itertools 0.14.0",
  "lazy_static",
  "num_cpus",
diff --git a/Cargo.toml b/Cargo.toml
index 1b7ca39e7..002e3c12b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -164,6 +164,7 @@ tempfile = "3.20.0"
 lazy_static = "1.4.0"
 prost = "0.13.1"
 dashmap = "6.1.0"
+indexmap = { version = "2.13.0", features = ["serde"] }
 
 [build-dependencies]
 cargo_toml = "0.21"
diff --git a/src/cli.rs b/src/cli.rs
index 61b6fa5b7..d5f9abd60 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -442,15 +442,6 @@ pub struct Options {
     )]
     pub max_event_payload_size: usize,
 
-    // collect dataset stats
-    #[arg(
-        long,
-        env = "P_COLLECT_DATASET_STATS",
-        default_value = "false",
-        help = "Enable/Disable collecting dataset stats"
-    )]
-    pub collect_dataset_stats: bool,
-
     // the duration during which local sync should be completed
     #[arg(
         long,
diff --git a/src/handlers/http/health_check.rs b/src/handlers/http/health_check.rs
index 90e0b07a3..65900c221 100644
--- a/src/handlers/http/health_check.rs
+++ b/src/handlers/http/health_check.rs
@@ -65,11 +65,8 @@ pub async fn shutdown() {
     // Perform sync operations
     perform_sync_operations().await;
 
-    // If collect_dataset_stats is enabled, perform sync operations
     // This is to ensure that all stats data is synced before the server shuts down
-    if PARSEABLE.options.collect_dataset_stats {
-        perform_sync_operations().await;
-    }
+    perform_sync_operations().await;
 }
 
 async fn set_shutdown_flag() {
diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs
index de7a55f88..1f4c567d7 100644
--- a/src/handlers/http/modal/query_server.rs
+++ b/src/handlers/http/modal/query_server.rs
@@ -77,7 +77,8 @@ impl ParseableServer for QueryServer {
                     .service(Server::get_alerts_webscope())
                     .service(Server::get_targets_webscope())
                     .service(Self::get_cluster_web_scope())
-                    .service(Server::get_demo_data_webscope()),
+                    .service(Server::get_demo_data_webscope())
+                    .service(Server::get_dataset_stats_webscope()),
             )
             .service(
                 web::scope(&prism_base_path())
diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs
index c288e9d5d..be798a932 100644
--- a/src/handlers/http/modal/server.rs
+++ b/src/handlers/http/modal/server.rs
@@ -37,6 +37,7 @@ use crate::hottier::HotTierManager;
 use crate::metrics;
 use crate::migration;
 use crate::storage;
+use crate::storage::field_stats::get_dataset_stats;
 use crate::sync;
 use crate::sync::sync_start;
 
@@ -100,7 +101,8 @@ impl ParseableServer for Server {
                     .service(Self::get_alerts_webscope())
                     .service(Self::get_targets_webscope())
                     .service(Self::get_metrics_webscope())
-                    .service(Self::get_demo_data_webscope()),
+                    .service(Self::get_demo_data_webscope())
+                    .service(Self::get_dataset_stats_webscope()),
             )
             .service(
                 web::scope(&prism_base_path())
@@ -726,6 +728,12 @@ impl Server {
web::resource("/about").route(web::get().to(about::about).authorize(Action::GetAbout)) } + // get the dataset stats webscope + pub fn get_dataset_stats_webscope() -> Resource { + web::resource("/dataset_stats") + .route(web::post().to(get_dataset_stats).authorize(Action::Query)) + } + // GET "/" ==> Serve the static frontend directory pub fn get_generated() -> ResourceFiles { ResourceFiles::new("/", generate()).resolve_not_found_to_root() diff --git a/src/storage/field_stats.rs b/src/storage/field_stats.rs index 817a66cd3..359f6bdf8 100644 --- a/src/storage/field_stats.rs +++ b/src/storage/field_stats.rs @@ -23,12 +23,20 @@ use crate::event::format::LogSourceEntry; use crate::event::format::json; use crate::handlers::TelemetryType; use crate::handlers::http::ingest::PostError; +use crate::handlers::http::query::Query; +use crate::handlers::http::query::QueryError; +use crate::handlers::http::query::query; use crate::metadata::SchemaVersion; use crate::parseable::PARSEABLE; use crate::query::QUERY_SESSION_STATE; use crate::storage::ObjectStorageError; use crate::storage::StreamType; use crate::utils::json::apply_generic_flattening_for_partition; +use actix_web::HttpRequest; +use actix_web::HttpResponse; +use actix_web::Responder; +use actix_web::body::MessageBody; +use actix_web::web::Json; use arrow_array::Array; use arrow_array::BinaryArray; use arrow_array::BinaryViewArray; @@ -48,7 +56,9 @@ use chrono::Utc; use datafusion::prelude::ParquetReadOptions; use datafusion::prelude::SessionContext; use futures::StreamExt; +use indexmap::IndexMap; use regex::Regex; +use serde::Deserialize; use serde::Serialize; use std::collections::HashMap; use std::collections::HashSet; @@ -456,6 +466,200 @@ fn extract_datetime_from_parquet_path_regex( } } +/// Request for stats, received from API/SQL query. 
+#[derive(Debug, Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct DataSetStatsRequest {
+    /// Name of the stream to get stats for
+    pub dataset_name: String,
+    /// Inclusive start time for the stats query
+    pub start_time: String,
+    /// Exclusive end time for the stats query
+    pub end_time: String,
+    /// List of fields to fetch stats for
+    /// If empty, stats for all fields will be returned
+    #[serde(default)]
+    pub fields: Vec<String>,
+    /// Offset for pagination of distinct values (default: 0)
+    pub offset: Option<u64>,
+    /// Limit for number of distinct values per field (default: 5)
+    pub limit: Option<u64>,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct FieldStats {
+    field_count: i64,
+    distinct_count: i64,
+    distinct_values: IndexMap<String, i64>,
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct QueryRow {
+    field_name: String,
+    field_count: i64,
+    distinct_count: i64,
+    distinct_value: String,
+    distinct_value_count: i64,
+}
+
+/// API handler to get the field stats for a dataset.
+/// If `fields` is empty, stats for all fields will be returned.
+/// If `fields` is provided, stats for only those fields will be returned.
+pub async fn get_dataset_stats(
+    req: HttpRequest,
+    dataset_stats_request: Json<DataSetStatsRequest>,
+) -> Result<impl Responder, QueryError> {
+    let offset = dataset_stats_request.offset.unwrap_or(0);
+    let limit = dataset_stats_request.limit.unwrap_or(5);
+
+    let sql = if dataset_stats_request.fields.is_empty() {
+        build_stats_sql(&dataset_stats_request.dataset_name, None, offset, limit)
+    } else {
+        build_stats_sql(
+            &dataset_stats_request.dataset_name,
+            Some(&dataset_stats_request.fields),
+            offset,
+            limit,
+        )
+    };
+
+    // create query request
+    let query_request = Query {
+        query: sql,
+        start_time: dataset_stats_request.start_time.clone(),
+        end_time: dataset_stats_request.end_time.clone(),
+        filter_tags: None,
+        fields: false,
+        streaming: false,
+        send_null: false,
+    };
+
+    let response = query(req, query_request).await?;
+    let body_bytes = response.into_body().try_into_bytes().map_err(|_| {
+        QueryError::CustomError("error in converting response to bytes".to_string())
+    })?;
+
+    let body_str = std::str::from_utf8(&body_bytes).map_err(|_| {
+        QueryError::CustomError("error in converting response bytes to string".to_string())
+    })?;
+
+    let rows: Vec<QueryRow> =
+        serde_json::from_str(body_str).map_err(|e| QueryError::CustomError(e.to_string()))?;
+
+    let field_stats = transform_query_results(rows)?;
+    let response = HttpResponse::Ok().json(field_stats);
+
+    Ok(response)
+}
+
+pub fn transform_query_results(
+    rows: Vec<QueryRow>,
+) -> Result<HashMap<String, FieldStats>, QueryError> {
+    let mut field_map: HashMap<String, FieldStats> = HashMap::new();
+
+    for row in rows {
+        let field_stats = field_map
+            .entry(row.field_name.clone())
+            .or_insert_with(|| FieldStats {
+                field_count: row.field_count,
+                distinct_count: row.distinct_count,
+                distinct_values: IndexMap::new(),
+            });
+
+        field_stats
+            .distinct_values
+            .insert(row.distinct_value, row.distinct_value_count);
+    }
+
+    // sort each field's distinct_values by distinct_value_count in descending order
+    for field_stats in field_map.values_mut() {
+        field_stats.distinct_values.sort_by(|_k1, v1, _k2, v2| {
+            v2.partial_cmp(v1).unwrap_or(std::cmp::Ordering::Equal) // Descending order
+        });
+    }
+
+    Ok(field_map)
+}
+
+/// Builds the SQL query to get field stats for a dataset.
+/// If `fields` is `None`, stats for all fields will be returned.
+/// If `fields` is `Some`, stats for only those fields will be returned.
+/// `offset` and `limit` control pagination of distinct values per field.
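+/// For example, `offset = 5` and `limit = 5` keep rows with `rn > 5 AND rn <= 10`,
+/// i.e. the 6th through 10th most frequent distinct values of each field.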
+pub fn build_stats_sql(
+    dataset_name: &str,
+    fields: Option<&[String]>,
+    offset: u64,
+    limit: u64,
+) -> String {
+    let fields_filter = if let Some(fields) = fields {
+        if !fields.is_empty() {
+            let quoted_fields: Vec<String> = fields
+                .iter()
+                .map(|f| format!("'{}'", f.replace('\'', "''")))
+                .collect();
+            format!("AND rv.field_name IN ({})", quoted_fields.join(", "))
+        } else {
+            String::default()
+        }
+    } else {
+        String::default()
+    };
+    let dataset_name = dataset_name.replace('"', "\"\"");
+
+    // Calculate the row number range based on offset and limit
+    let rn_start = offset;
+    let rn_end = offset + limit;
+
+    format!(
+        "WITH field_totals AS (
+            SELECT
+                field_stats_field_name,
+                SUM(field_stats_count) as total_field_count
+            FROM (
+                SELECT DISTINCT
+                    p_timestamp,
+                    field_stats_field_name,
+                    field_stats_count
+                FROM {DATASET_STATS_STREAM_NAME}
+                WHERE dataset_name = '{dataset_name}'
+            ) deduped
+            GROUP BY field_stats_field_name
+        ),
+        ranked_values AS (
+            SELECT
+                field_stats_field_name as field_name,
+                field_stats_distinct_stats_distinct_value as distinct_value,
+                SUM(field_stats_distinct_stats_count) as distinct_value_count,
+                ROW_NUMBER() OVER (
+                    PARTITION BY field_stats_field_name
+                    ORDER BY SUM(field_stats_distinct_stats_count) DESC
+                ) as rn
+            FROM {DATASET_STATS_STREAM_NAME}
+            WHERE dataset_name = '{dataset_name}'
+                AND field_stats_distinct_stats_distinct_value IS NOT NULL
+            GROUP BY field_stats_field_name, field_stats_distinct_stats_distinct_value
+        ),
+        field_distinct_counts AS (
+            SELECT
+                field_name,
+                COUNT(*) as distinct_count
+            FROM ranked_values
+            GROUP BY field_name
+        )
+        SELECT
+            rv.field_name,
+            ft.total_field_count as field_count,
+            fdc.distinct_count,
+            rv.distinct_value,
+            rv.distinct_value_count
+        FROM ranked_values rv
+        JOIN field_totals ft ON rv.field_name = ft.field_stats_field_name
+        JOIN field_distinct_counts fdc ON rv.field_name = fdc.field_name
+        WHERE rv.rn > {rn_start} AND rv.rn <= {rn_end}
+        {fields_filter}
+        ORDER BY rv.field_name, rv.distinct_value_count DESC"
+    )
+}
 
 #[cfg(test)]
 mod tests {
     use std::{fs::OpenOptions, sync::Arc};
diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index 3983beb6e..5204b7fca 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -196,7 +196,7 @@ async fn calculate_stats_if_enabled(
     path: &std::path::Path,
     schema: &Arc<Schema>,
 ) {
-    if stream_name != DATASET_STATS_STREAM_NAME && PARSEABLE.options.collect_dataset_stats {
+    if stream_name != DATASET_STATS_STREAM_NAME {
         let max_field_statistics = PARSEABLE.options.max_field_statistics;
         if let Err(err) = calculate_field_stats(stream_name, path, schema, max_field_statistics).await