diff --git a/src/prism/home/mod.rs b/src/prism/home/mod.rs index c99cd86dd..611d4c84d 100644 --- a/src/prism/home/mod.rs +++ b/src/prism/home/mod.rs @@ -15,28 +15,24 @@ * along with this program. If not, see . * */ - -use std::collections::HashMap; - use actix_web::http::StatusCode; use actix_web::http::header::ContentType; -use chrono::Utc; use itertools::Itertools; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use tracing::error; use crate::{ - alerts::{ALERTS, AlertError, alert_structs::AlertsSummary, get_alerts_summary}, + alerts::{ALERTS, AlertError, AlertState}, correlation::{CORRELATIONS, CorrelationError}, event::format::{LogSource, LogSourceEntry}, - handlers::{ - TelemetryType, - http::{cluster::fetch_daily_stats, logstream::error::StreamError}, - }, + handlers::{TelemetryType, http::logstream::error::StreamError}, metastore::MetastoreError, parseable::PARSEABLE, - rbac::{Users, map::SessionKey, role::Action}, - stats::Stats, + rbac::{ + Users, + map::{SessionKey, users}, + role::Action, + }, storage::{ObjectStorageError, ObjectStoreFormat, StreamType}, users::{dashboards::DASHBOARDS, filters::FILTERS}, }; @@ -52,14 +48,6 @@ type StreamMetadataResponse = Result< PrismHomeError, >; -#[derive(Debug, Serialize, Default)] -pub struct DatedStats { - date: String, - events: u64, - ingestion: u64, - storage: u64, -} - #[derive(Debug, Serialize)] #[serde(rename_all = "camelCase")] pub struct DataSet { @@ -70,13 +58,21 @@ pub struct DataSet { dataset_format: LogSource, } +#[derive(Debug, Serialize, Deserialize, Default)] +#[serde(rename_all = "camelCase")] +pub struct Checklist { + pub data_ingested: bool, + pub keystone_created: bool, + pub alert_created: bool, + pub user_added: bool, +} + #[derive(Debug, Serialize)] #[serde(rename_all = "camelCase")] pub struct HomeResponse { - pub alerts_summary: AlertsSummary, - pub stats_details: Vec, pub datasets: Vec, - pub top_five_ingestion: HashMap, + pub checklist: Checklist, + pub triggered_alerts_count: u64, } #[derive(Debug, Serialize)] @@ -105,22 +101,17 @@ pub async fn generate_home_response( include_internal: bool, ) -> Result { // Execute these operations concurrently - let (stream_titles_result, alerts_summary_result) = - tokio::join!(get_stream_titles(key), get_alerts_summary(key)); - - let stream_titles = stream_titles_result?; - let alerts_summary = alerts_summary_result?; - - // Generate dates for date-wise stats - let mut dates = (0..7) - .map(|i| { - Utc::now() - .checked_sub_signed(chrono::Duration::days(i)) - .expect("Date conversion failed") + let all_streams = PARSEABLE.metastore.list_streams().await?; + + let stream_titles: Vec = all_streams + .iter() + .filter(|logstream| { + Users.authorize(key.clone(), Action::ListStream, Some(logstream), None) + == crate::rbac::Response::Authorized }) - .map(|date| date.format("%Y-%m-%d").to_string()) + .cloned() + .sorted() .collect_vec(); - dates.reverse(); // Process stream metadata concurrently let stream_metadata_futures = stream_titles @@ -129,7 +120,6 @@ pub async fn generate_home_response( let stream_metadata_results: Vec = futures::future::join_all(stream_metadata_futures).await; - let mut stream_wise_stream_json: HashMap> = HashMap::new(); let mut datasets = Vec::new(); for result in stream_metadata_results { @@ -143,7 +133,7 @@ pub async fn generate_home_response( { continue; } - stream_wise_stream_json.insert(stream.clone(), metadata); + datasets.push(DataSet { title: stream, dataset_type, @@ -158,66 +148,44 @@ pub async fn generate_home_response( } } - let top_five_ingestion = get_top_5_streams_by_ingestion(&stream_wise_stream_json); - - // Process stats for all dates concurrently - let stats_futures = dates - .iter() - .map(|date| stats_for_date(date.clone(), stream_wise_stream_json.clone())); - let stats_results: Vec> = - futures::future::join_all(stats_futures).await; - - let mut stream_details = Vec::new(); - - for result in stats_results { - match result { - Ok(dated_stats) => { - stream_details.push(dated_stats); - } - Err(e) => { - error!("Failed to process stats for date: {:?}", e); - // Continue with other dates instead of failing entirely - } + // Generate checklist and count triggered alerts + let data_ingested = !all_streams.is_empty(); + let user_count = users().len(); + let user_added = user_count > 1; // more than just the default admin user + + // Calculate triggered alerts count + let (alert_created, triggered_alerts_count) = { + let guard = ALERTS.read().await; + if let Some(alerts) = guard.as_ref() { + let user_alerts = alerts.list_alerts_for_user(key.clone(), vec![]).await?; + let total_alerts = !user_alerts.is_empty(); + + // Count alerts currently in triggered state + let triggered_count = user_alerts + .iter() + .filter(|alert| alert.state == AlertState::Triggered) + .count() as u64; + + (total_alerts, triggered_count) + } else { + (false, 0) } - } + }; + + let checklist = Checklist { + data_ingested, + keystone_created: false, // Enterprise will override + alert_created, + user_added, + }; Ok(HomeResponse { - stats_details: stream_details, datasets, - alerts_summary, - top_five_ingestion, + checklist, + triggered_alerts_count, }) } -fn get_top_5_streams_by_ingestion( - stream_wise_stream_json: &HashMap>, -) -> HashMap { - let mut result: Vec<_> = stream_wise_stream_json - .iter() - .map(|(stream_name, formats)| { - let total_stats = formats.iter().fold( - Stats { - events: 0, - ingestion: 0, - storage: 0, - }, - |mut acc, osf| { - let current = &osf.stats.current_stats; - acc.events += current.events; - acc.ingestion += current.ingestion; - acc.storage += current.storage; - acc - }, - ); - (stream_name.clone(), total_stats) - }) - .collect(); - - result.sort_by_key(|(_, stats)| std::cmp::Reverse(stats.ingestion)); - result.truncate(5); - result.into_iter().collect() -} - async fn get_stream_metadata( stream: String, ) -> Result< @@ -272,50 +240,6 @@ async fn get_stream_metadata( )) } -async fn stats_for_date( - date: String, - stream_wise_meta: HashMap>, -) -> Result { - // Initialize result structure - let mut details = DatedStats { - date: date.clone(), - ..Default::default() - }; - - // Process each stream concurrently - let stream_stats_futures = stream_wise_meta - .values() - .map(|meta| get_stream_stats_for_date(date.clone(), meta)); - - let stream_stats_results = futures::future::join_all(stream_stats_futures).await; - - // Aggregate results - for result in stream_stats_results { - match result { - Ok((events, ingestion, storage)) => { - details.events += events; - details.ingestion += ingestion; - details.storage += storage; - } - Err(e) => { - error!("Failed to get stats for stream: {:?}", e); - // Continue with other streams - } - } - } - - Ok(details) -} - -async fn get_stream_stats_for_date( - date: String, - meta: &[ObjectStoreFormat], -) -> Result<(u64, u64, u64), PrismHomeError> { - let stats = fetch_daily_stats(&date, meta)?; - - Ok((stats.events, stats.ingestion, stats.storage)) -} - pub async fn generate_home_search_response( key: &SessionKey, query_value: &str,