diff --git a/src/prism/home/mod.rs b/src/prism/home/mod.rs
index c99cd86dd..611d4c84d 100644
--- a/src/prism/home/mod.rs
+++ b/src/prism/home/mod.rs
@@ -15,28 +15,24 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
-
-use std::collections::HashMap;
-
use actix_web::http::StatusCode;
use actix_web::http::header::ContentType;
-use chrono::Utc;
use itertools::Itertools;
-use serde::Serialize;
+use serde::{Deserialize, Serialize};
use tracing::error;
use crate::{
- alerts::{ALERTS, AlertError, alert_structs::AlertsSummary, get_alerts_summary},
+ alerts::{ALERTS, AlertError, AlertState},
correlation::{CORRELATIONS, CorrelationError},
event::format::{LogSource, LogSourceEntry},
- handlers::{
- TelemetryType,
- http::{cluster::fetch_daily_stats, logstream::error::StreamError},
- },
+ handlers::{TelemetryType, http::logstream::error::StreamError},
metastore::MetastoreError,
parseable::PARSEABLE,
- rbac::{Users, map::SessionKey, role::Action},
- stats::Stats,
+ rbac::{
+ Users,
+ map::{SessionKey, users},
+ role::Action,
+ },
storage::{ObjectStorageError, ObjectStoreFormat, StreamType},
users::{dashboards::DASHBOARDS, filters::FILTERS},
};
@@ -52,14 +48,6 @@ type StreamMetadataResponse = Result<
PrismHomeError,
>;
-#[derive(Debug, Serialize, Default)]
-pub struct DatedStats {
- date: String,
- events: u64,
- ingestion: u64,
- storage: u64,
-}
-
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct DataSet {
@@ -70,13 +58,21 @@ pub struct DataSet {
dataset_format: LogSource,
}
+#[derive(Debug, Serialize, Deserialize, Default)]
+#[serde(rename_all = "camelCase")]
+pub struct Checklist {
+ pub data_ingested: bool,
+ pub keystone_created: bool,
+ pub alert_created: bool,
+ pub user_added: bool,
+}
+
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct HomeResponse {
- pub alerts_summary: AlertsSummary,
-    pub stats_details: Vec<DatedStats>,
pub datasets: Vec<DataSet>,
-    pub top_five_ingestion: HashMap<String, Stats>,
+ pub checklist: Checklist,
+ pub triggered_alerts_count: u64,
}
#[derive(Debug, Serialize)]
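The new Checklist derives both Serialize and Deserialize with camelCase renaming, so the wire shape is fixed by the field declaration order. A minimal round-trip sketch, assuming the struct above is in scope and serde_json is available (the test module and its name are illustrative, not part of this patch):

    #[cfg(test)]
    mod checklist_shape {
        use super::Checklist;

        #[test]
        fn serializes_camel_case() {
            // rename_all = "camelCase" maps data_ingested -> dataIngested, etc.
            let json = serde_json::to_string(&Checklist {
                data_ingested: true,
                keystone_created: false,
                alert_created: true,
                user_added: false,
            })
            .unwrap();
            assert_eq!(
                json,
                r#"{"dataIngested":true,"keystoneCreated":false,"alertCreated":true,"userAdded":false}"#
            );
        }
    }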
@@ -105,22 +101,17 @@ pub async fn generate_home_response(
include_internal: bool,
) -> Result<HomeResponse, PrismHomeError> {
-    // Execute these operations concurrently
- let (stream_titles_result, alerts_summary_result) =
- tokio::join!(get_stream_titles(key), get_alerts_summary(key));
-
- let stream_titles = stream_titles_result?;
- let alerts_summary = alerts_summary_result?;
-
- // Generate dates for date-wise stats
- let mut dates = (0..7)
- .map(|i| {
- Utc::now()
- .checked_sub_signed(chrono::Duration::days(i))
- .expect("Date conversion failed")
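+    // Fetch every stream from the metastore; the RBAC filter below narrows it to what this session may list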
+ let all_streams = PARSEABLE.metastore.list_streams().await?;
+
+    let stream_titles: Vec<String> = all_streams
+ .iter()
+ .filter(|logstream| {
+ Users.authorize(key.clone(), Action::ListStream, Some(logstream), None)
+ == crate::rbac::Response::Authorized
})
- .map(|date| date.format("%Y-%m-%d").to_string())
+ .cloned()
+ .sorted()
.collect_vec();
- dates.reverse();
// Process stream metadata concurrently
let stream_metadata_futures = stream_titles
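The RBAC gate in the hunk above reads as a standalone predicate over the full stream list. A hedged sketch (the helper name is hypothetical; Users.authorize, Action::ListStream, and rbac::Response::Authorized are the exact calls the hunk makes, and .sorted() comes from the itertools import at the top of the file):

    // Keep only the streams this session may list, sorted by name.
    fn authorized_streams(key: &SessionKey, all_streams: &[String]) -> Vec<String> {
        all_streams
            .iter()
            .filter(|stream| {
                Users.authorize(key.clone(), Action::ListStream, Some(stream), None)
                    == crate::rbac::Response::Authorized
            })
            .cloned()
            .sorted()
            .collect()
    }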
@@ -129,7 +120,6 @@ pub async fn generate_home_response(
let stream_metadata_results: Vec<StreamMetadataResponse> =
futures::future::join_all(stream_metadata_futures).await;
-    let mut stream_wise_stream_json: HashMap<String, Vec<ObjectStoreFormat>> = HashMap::new();
let mut datasets = Vec::new();
for result in stream_metadata_results {
@@ -143,7 +133,7 @@ pub async fn generate_home_response(
{
continue;
}
- stream_wise_stream_json.insert(stream.clone(), metadata);
+
datasets.push(DataSet {
title: stream,
dataset_type,
@@ -158,66 +148,44 @@ pub async fn generate_home_response(
}
}
- let top_five_ingestion = get_top_5_streams_by_ingestion(&stream_wise_stream_json);
-
- // Process stats for all dates concurrently
- let stats_futures = dates
- .iter()
- .map(|date| stats_for_date(date.clone(), stream_wise_stream_json.clone()));
-    let stats_results: Vec<Result<DatedStats, PrismHomeError>> =
- futures::future::join_all(stats_futures).await;
-
- let mut stream_details = Vec::new();
-
- for result in stats_results {
- match result {
- Ok(dated_stats) => {
- stream_details.push(dated_stats);
- }
- Err(e) => {
- error!("Failed to process stats for date: {:?}", e);
- // Continue with other dates instead of failing entirely
- }
+ // Generate checklist and count triggered alerts
+ let data_ingested = !all_streams.is_empty();
+ let user_count = users().len();
+ let user_added = user_count > 1; // more than just the default admin user
+
+ // Calculate triggered alerts count
+ let (alert_created, triggered_alerts_count) = {
+ let guard = ALERTS.read().await;
+ if let Some(alerts) = guard.as_ref() {
+ let user_alerts = alerts.list_alerts_for_user(key.clone(), vec![]).await?;
+            let alert_created = !user_alerts.is_empty();
+
+ // Count alerts currently in triggered state
+ let triggered_count = user_alerts
+ .iter()
+ .filter(|alert| alert.state == AlertState::Triggered)
+ .count() as u64;
+
+            (alert_created, triggered_count)
+ } else {
+ (false, 0)
}
- }
+ };
+
+ let checklist = Checklist {
+ data_ingested,
+ keystone_created: false, // Enterprise will override
+ alert_created,
+ user_added,
+ };
Ok(HomeResponse {
- stats_details: stream_details,
datasets,
- alerts_summary,
- top_five_ingestion,
+ checklist,
+ triggered_alerts_count,
})
}
-fn get_top_5_streams_by_ingestion(
-    stream_wise_stream_json: &HashMap<String, Vec<ObjectStoreFormat>>,
-) -> HashMap<String, Stats> {
- let mut result: Vec<_> = stream_wise_stream_json
- .iter()
- .map(|(stream_name, formats)| {
- let total_stats = formats.iter().fold(
- Stats {
- events: 0,
- ingestion: 0,
- storage: 0,
- },
- |mut acc, osf| {
- let current = &osf.stats.current_stats;
- acc.events += current.events;
- acc.ingestion += current.ingestion;
- acc.storage += current.storage;
- acc
- },
- );
- (stream_name.clone(), total_stats)
- })
- .collect();
-
- result.sort_by_key(|(_, stats)| std::cmp::Reverse(stats.ingestion));
- result.truncate(5);
- result.into_iter().collect()
-}
-
async fn get_stream_metadata(
stream: String,
) -> Result<
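The checklist block above folds two concerns into one pass over the user's alerts. As a hedged sketch, the triggered count on its own would look like this (helper name hypothetical; this assumes list_alerts_for_user returns Result<_, AlertError>, which the ? in the hunk suggests via PrismHomeError's From conversion):

    // Count the caller's alerts currently in the Triggered state; returns 0
    // when the ALERTS registry has not been initialized yet.
    async fn triggered_alerts_for(key: &SessionKey) -> Result<u64, AlertError> {
        let guard = ALERTS.read().await;
        let Some(alerts) = guard.as_ref() else {
            return Ok(0);
        };
        let user_alerts = alerts.list_alerts_for_user(key.clone(), vec![]).await?;
        Ok(user_alerts
            .iter()
            .filter(|alert| alert.state == AlertState::Triggered)
            .count() as u64)
    }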
@@ -272,50 +240,6 @@ async fn get_stream_metadata(
))
}
-async fn stats_for_date(
- date: String,
-    stream_wise_meta: HashMap<String, Vec<ObjectStoreFormat>>,
-) -> Result<DatedStats, PrismHomeError> {
- // Initialize result structure
- let mut details = DatedStats {
- date: date.clone(),
- ..Default::default()
- };
-
- // Process each stream concurrently
- let stream_stats_futures = stream_wise_meta
- .values()
- .map(|meta| get_stream_stats_for_date(date.clone(), meta));
-
- let stream_stats_results = futures::future::join_all(stream_stats_futures).await;
-
- // Aggregate results
- for result in stream_stats_results {
- match result {
- Ok((events, ingestion, storage)) => {
- details.events += events;
- details.ingestion += ingestion;
- details.storage += storage;
- }
- Err(e) => {
- error!("Failed to get stats for stream: {:?}", e);
- // Continue with other streams
- }
- }
- }
-
- Ok(details)
-}
-
-async fn get_stream_stats_for_date(
- date: String,
- meta: &[ObjectStoreFormat],
-) -> Result<(u64, u64, u64), PrismHomeError> {
- let stats = fetch_daily_stats(&date, meta)?;
-
- Ok((stats.events, stats.ingestion, stats.storage))
-}
-
pub async fn generate_home_search_response(
key: &SessionKey,
query_value: &str,