7 changes: 5 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -164,6 +164,7 @@ tempfile = "3.20.0"
lazy_static = "1.4.0"
prost = "0.13.1"
dashmap = "6.1.0"
indexmap = { version = "2.13.0", features = ["serde"] }

[build-dependencies]
cargo_toml = "0.21"
9 changes: 0 additions & 9 deletions src/cli.rs
@@ -442,15 +442,6 @@ pub struct Options {
)]
pub max_event_payload_size: usize,

// collect dataset stats
#[arg(
long,
env = "P_COLLECT_DATASET_STATS",
default_value = "false",
help = "Enable/Disable collecting dataset stats"
)]
pub collect_dataset_stats: bool,

// the duration during which local sync should be completed
#[arg(
long,
5 changes: 1 addition & 4 deletions src/handlers/http/health_check.rs
@@ -65,11 +65,8 @@ pub async fn shutdown() {
// Perform sync operations
perform_sync_operations().await;

// If collect_dataset_stats is enabled, perform sync operations
// This is to ensure that all stats data is synced before the server shuts down
if PARSEABLE.options.collect_dataset_stats {
perform_sync_operations().await;
}
perform_sync_operations().await;
}

async fn set_shutdown_flag() {
3 changes: 2 additions & 1 deletion src/handlers/http/modal/query_server.rs
@@ -77,7 +77,8 @@ impl ParseableServer for QueryServer {
.service(Server::get_alerts_webscope())
.service(Server::get_targets_webscope())
.service(Self::get_cluster_web_scope())
.service(Server::get_demo_data_webscope()),
.service(Server::get_demo_data_webscope())
.service(Server::get_dataset_stats_webscope()),
)
.service(
web::scope(&prism_base_path())
10 changes: 9 additions & 1 deletion src/handlers/http/modal/server.rs
@@ -37,6 +37,7 @@ use crate::hottier::HotTierManager;
use crate::metrics;
use crate::migration;
use crate::storage;
use crate::storage::field_stats::get_dataset_stats;
use crate::sync;
use crate::sync::sync_start;

@@ -100,7 +101,8 @@ impl ParseableServer for Server {
.service(Self::get_alerts_webscope())
.service(Self::get_targets_webscope())
.service(Self::get_metrics_webscope())
.service(Self::get_demo_data_webscope()),
.service(Self::get_demo_data_webscope())
.service(Self::get_dataset_stats_webscope()),
)
.service(
web::scope(&prism_base_path())
Expand Down Expand Up @@ -726,6 +728,12 @@ impl Server {
web::resource("/about").route(web::get().to(about::about).authorize(Action::GetAbout))
}

// POST "/dataset_stats" ==> Get field stats for a dataset
pub fn get_dataset_stats_webscope() -> Resource {
web::resource("/dataset_stats")
.route(web::post().to(get_dataset_stats).authorize(Action::Query))
}

// GET "/" ==> Serve the static frontend directory
pub fn get_generated() -> ResourceFiles {
ResourceFiles::new("/", generate()).resolve_not_found_to_root()
204 changes: 204 additions & 0 deletions src/storage/field_stats.rs
@@ -23,12 +23,20 @@ use crate::event::format::LogSourceEntry;
use crate::event::format::json;
use crate::handlers::TelemetryType;
use crate::handlers::http::ingest::PostError;
use crate::handlers::http::query::Query;
use crate::handlers::http::query::QueryError;
use crate::handlers::http::query::query;
use crate::metadata::SchemaVersion;
use crate::parseable::PARSEABLE;
use crate::query::QUERY_SESSION_STATE;
use crate::storage::ObjectStorageError;
use crate::storage::StreamType;
use crate::utils::json::apply_generic_flattening_for_partition;
use actix_web::HttpRequest;
use actix_web::HttpResponse;
use actix_web::Responder;
use actix_web::body::MessageBody;
use actix_web::web::Json;
use arrow_array::Array;
use arrow_array::BinaryArray;
use arrow_array::BinaryViewArray;
@@ -48,7 +56,9 @@ use chrono::Utc;
use datafusion::prelude::ParquetReadOptions;
use datafusion::prelude::SessionContext;
use futures::StreamExt;
use indexmap::IndexMap;
use regex::Regex;
use serde::Deserialize;
use serde::Serialize;
use std::collections::HashMap;
use std::collections::HashSet;
@@ -456,6 +466,200 @@ fn extract_datetime_from_parquet_path_regex(
}
}

/// Request for stats, received via the API or a SQL query.
#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct DataSetStatsRequest {
/// Name of the stream to get stats for
pub dataset_name: String,
/// Inclusive start time for the stats query
pub start_time: String,
/// Exclusive end time for the stats query
pub end_time: String,
/// List of fields to fetch stats for
/// If empty, stats for all fields will be returned
#[serde(default)]
pub fields: Vec<String>,
/// Offset for pagination of distinct values (default: 0)
pub offset: Option<u64>,
/// Limit for number of distinct values per field (default: 5)
pub limit: Option<u64>,
}
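
// For reference, a hedged, illustrative request body for this struct.
// `rename_all = "camelCase"` makes the JSON keys camelCase, `fields`
// defaults to an empty list, and omitted `offset`/`limit` deserialize
// to `None` (the handler then falls back to 0 and 5):
//
//   {
//     "datasetName": "app_logs",
//     "startTime": "2024-01-01T00:00:00Z",
//     "endTime": "2024-01-02T00:00:00Z",
//     "fields": ["status"],
//     "offset": 0,
//     "limit": 5
//   }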

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FieldStats {
field_count: i64,
distinct_count: i64,
distinct_values: IndexMap<String, i64>,
}

#[derive(Serialize, Deserialize)]
pub struct QueryRow {
field_name: String,
field_count: i64,
distinct_count: i64,
distinct_value: String,
distinct_value_count: i64,
}

/// API handler to get the field stats for a dataset.
/// If `fields` is empty, stats for all fields are returned;
/// otherwise stats are returned only for the requested fields.
pub async fn get_dataset_stats(
req: HttpRequest,
dataset_stats_request: Json<DataSetStatsRequest>,
) -> Result<impl Responder, QueryError> {
let offset = dataset_stats_request.offset.unwrap_or(0);
let limit = dataset_stats_request.limit.unwrap_or(5);

let sql = if dataset_stats_request.fields.is_empty() {
build_stats_sql(&dataset_stats_request.dataset_name, None, offset, limit)
} else {
build_stats_sql(
&dataset_stats_request.dataset_name,
Some(&dataset_stats_request.fields),
offset,
limit,
)
};

// create query request
let query_request = Query {
query: sql,
start_time: dataset_stats_request.start_time.clone(),
end_time: dataset_stats_request.end_time.clone(),
filter_tags: None,
fields: false,
streaming: false,
send_null: false,
};

let response = query(req, query_request).await?;
let body_bytes = response.into_body().try_into_bytes().map_err(|_| {
QueryError::CustomError("error in converting response to bytes".to_string())
})?;

let body_str = std::str::from_utf8(&body_bytes).map_err(|_| {
QueryError::CustomError("error in converting response bytes to string".to_string())
})?;

let rows: Vec<QueryRow> =
serde_json::from_str(body_str).map_err(|e| QueryError::CustomError(e.to_string()))?;

let field_stats = transform_query_results(rows)?;
let response = HttpResponse::Ok().json(field_stats);

Ok(response)
}
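
// The response body is the serialized HashMap<String, FieldStats> from
// transform_query_results, e.g. (illustrative values):
//
//   {"status":{"field_count":1200,"distinct_count":2,
//              "distinct_values":{"200":900,"404":150}}}
//
// Outer key order is unspecified (it is a HashMap); each inner
// distinct_values map keeps the count-descending order established
// in transform_query_results.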

pub fn transform_query_results(
rows: Vec<QueryRow>,
) -> Result<HashMap<String, FieldStats>, QueryError> {
let mut field_map: HashMap<String, FieldStats> = HashMap::new();

for row in rows {
let field_stats = field_map
.entry(row.field_name.clone())
.or_insert_with(|| FieldStats {
field_count: row.field_count,
distinct_count: row.distinct_count,
distinct_values: IndexMap::new(),
});

field_stats
.distinct_values
.insert(row.distinct_value, row.distinct_value_count);
}

// Sort each field's distinct_values by count in descending order
for field_stats in field_map.values_mut() {
field_stats
.distinct_values
.sort_by(|_k1, v1, _k2, v2| v2.cmp(v1));
}

Ok(field_map)
}
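
// A minimal sketch of the collapse performed above, assuming two query
// rows for a single field (values illustrative):
//
//   let rows = vec![
//       QueryRow { field_name: "status".into(), field_count: 1200,
//                  distinct_count: 2, distinct_value: "404".into(),
//                  distinct_value_count: 150 },
//       QueryRow { field_name: "status".into(), field_count: 1200,
//                  distinct_count: 2, distinct_value: "200".into(),
//                  distinct_value_count: 900 },
//   ];
//   let stats = transform_query_results(rows)?;
//
// Both rows fold into a single FieldStats entry, and the sort pass
// reorders by count, so stats["status"].distinct_values iterates as
// ("200", 900) then ("404", 150) even though "404" was inserted first.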

/// Builds the SQL query to get field stats for a dataset.
/// If `fields` is `None`, stats for all fields will be returned.
/// If `fields` is `Some`, stats for those fields will be returned.
/// `offset` and `limit` control pagination of distinct values per field.
pub fn build_stats_sql(
dataset_name: &str,
fields: Option<&[String]>,
offset: u64,
limit: u64,
) -> String {
let fields_filter = if let Some(fields) = fields {
if !fields.is_empty() {
let quoted_fields: Vec<String> = fields
.iter()
.map(|f| format!("'{}'", f.replace('\'', "''")))
.collect();
format!("AND rv.field_name IN ({})", quoted_fields.join(", "))
} else {
String::default()
}
} else {
String::default()
};
// Escape single quotes: the name is interpolated into single-quoted SQL literals below
let dataset_name = dataset_name.replace('\'', "''");

// Calculate the row number range based on offset and limit
let rn_start = offset;
let rn_end = offset + limit;

format!(
"WITH field_totals AS (
SELECT
field_stats_field_name,
SUM(field_stats_count) as total_field_count
FROM (
SELECT DISTINCT
p_timestamp,
field_stats_field_name,
field_stats_count
FROM {DATASET_STATS_STREAM_NAME}
WHERE dataset_name = '{dataset_name}'
) deduped
GROUP BY field_stats_field_name
),
ranked_values AS (
SELECT
field_stats_field_name as field_name,
field_stats_distinct_stats_distinct_value as distinct_value,
SUM(field_stats_distinct_stats_count) as distinct_value_count,
ROW_NUMBER() OVER (
PARTITION BY field_stats_field_name
ORDER BY SUM(field_stats_distinct_stats_count) DESC
) as rn
FROM {DATASET_STATS_STREAM_NAME}
WHERE dataset_name = '{dataset_name}'
AND field_stats_distinct_stats_distinct_value IS NOT NULL
GROUP BY field_stats_field_name, field_stats_distinct_stats_distinct_value
),
field_distinct_counts AS (
SELECT
field_name,
COUNT(*) as distinct_count
FROM ranked_values
GROUP BY field_name
)
SELECT
rv.field_name,
ft.total_field_count as field_count,
fdc.distinct_count,
rv.distinct_value,
rv.distinct_value_count
FROM ranked_values rv
JOIN field_totals ft ON rv.field_name = ft.field_stats_field_name
JOIN field_distinct_counts fdc ON rv.field_name = fdc.field_name
WHERE rv.rn > {rn_start} AND rv.rn <= {rn_end}
{fields_filter}
ORDER BY rv.field_name, rv.distinct_value_count DESC"
)
}
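
// For example, the defaults offset = 0 and limit = 5 keep rows with
// rn in (0, 5], i.e. the top five distinct values per field; a second
// page would pass offset = 5 for rn in (5, 10]. A hedged usage sketch
// (stream and field names illustrative):
//
//   let fields = vec!["status".to_string()];
//   let sql = build_stats_sql("app_logs", Some(&fields), 0, 5);
//   assert!(sql.contains("rv.rn > 0 AND rv.rn <= 5"));
//   assert!(sql.contains("AND rv.field_name IN ('status')"));
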
#[cfg(test)]
mod tests {
use std::{fs::OpenOptions, sync::Arc};
2 changes: 1 addition & 1 deletion src/storage/object_storage.rs
@@ -196,7 +196,7 @@ async fn calculate_stats_if_enabled(
path: &std::path::Path,
schema: &Arc<Schema>,
) {
if stream_name != DATASET_STATS_STREAM_NAME && PARSEABLE.options.collect_dataset_stats {
if stream_name != DATASET_STATS_STREAM_NAME {
let max_field_statistics = PARSEABLE.options.max_field_statistics;
if let Err(err) =
calculate_field_stats(stream_name, path, schema, max_field_statistics).await