From 414841c7009078f230ebdef1938216aa98a6d62b Mon Sep 17 00:00:00 2001 From: VaitaR Date: Sat, 13 Dec 2025 10:12:44 +0300 Subject: [PATCH 1/3] Update Telegram event extraction prompt and improve session handling - Updated version and description in telegram.yaml for event extraction related to TON/crypto channels. - Added critical scope filters for event extraction, including security incidents, competitor updates, and regulatory changes. - Enhanced general extraction rules and title slot extraction guidelines. - Modified docker-compose.yml to allow customization of Telegram session file path via environment variable. - Updated telegram_qr_auth.py to use the customizable session file path and ensure the directory exists. - Adjusted SQL queries in both Postgres and SQLite repositories to include 'extracted_at' in date range checks for event retrieval. --- app.py | 1422 ++++++++++++++++----------- config/prompts/telegram.yaml | 119 ++- docker-compose.yml | 1 + scripts/telegram_qr_auth.py | 6 +- src/adapters/postgres_repository.py | 6 +- src/adapters/sqlite_repository.py | 6 +- 6 files changed, 953 insertions(+), 607 deletions(-) diff --git a/app.py b/app.py index b6f4f6a..5f2fb27 100644 --- a/app.py +++ b/app.py @@ -1,602 +1,918 @@ #!/usr/bin/env python3 """ -Streamlit Demo App for Slack Event Manager. +Streamlit Dashboard for Event Manager. -This app provides a visual interface for the Slack Event Manager pipeline, -allowing users to configure settings, run the pipeline, and visualize results. +Clean read-only frontend for viewing extracted events from Slack (internal) +and Telegram (external/market) sources. """ from datetime import UTC, datetime, timedelta -from typing import Any, Final -from uuid import uuid4 +from typing import Final import pandas as pd import plotly.express as px +import plotly.graph_objects as go import streamlit as st from src.adapters.repository_factory import create_repository from src.config.settings import get_settings from src.domain.protocols import RepositoryProtocol -from src.observability.metrics import ensure_metrics_exporter -from src.presentation.streamlit_orchestration import ( - RateLimitExceededError, - job_result, - job_status, - submit_ingest_extract_job, -) -from src.use_cases.dashboard_queries import ( - fetch_recent_candidates, - fetch_recent_events, - fetch_recent_messages, -) -from src.use_cases.pipeline_orchestrator import PipelineParams - -# UI Constants -MAX_MESSAGE_LENGTH: Final[int] = 150 -MAX_CANDIDATE_TEXT_LENGTH: Final[int] = 200 -SESSION_ID_KEY: Final[str] = "session_id" -SESSION_JOB_ID_KEY: Final[str] = "active_job_id" -SESSION_JOB_RESULT_KEY: Final[str] = "last_job_result" -SESSION_STATUS_MESSAGE_KEY: Final[str] = "job_status_message" -POLL_INTERVAL_MS: Final[int] = 1500 +# ============================================================================ +# CONSTANTS +# ============================================================================ + +MAX_TITLE_LENGTH: Final[int] = 80 +MAX_SUMMARY_LENGTH: Final[int] = 150 + +# Category colors for consistent styling +CATEGORY_COLORS: Final[dict[str, str]] = { + "product": "#2ECC71", # Green + "risk": "#E74C3C", # Red + "process": "#3498DB", # Blue + "marketing": "#9B59B6", # Purple + "org": "#F39C12", # Orange + "unknown": "#95A5A6", # Gray +} -def get_repository() -> RepositoryProtocol: - """Get repository instance based on settings. - - Returns: - Repository instance (SQLite or PostgreSQL) - """ - settings = get_settings() - return create_repository(settings) - +STATUS_COLORS: Final[dict[str, str]] = { + "planned": "#BDC3C7", # Light gray + "confirmed": "#85C1E9", # Light blue + "started": "#F1C40F", # Yellow + "completed": "#27AE60", # Green + "postponed": "#E67E22", # Orange + "canceled": "#E74C3C", # Red + "rolled_back": "#8E44AD", # Purple + "updated": "#3498DB", # Blue +} -def _ensure_session_defaults() -> None: - if SESSION_ID_KEY not in st.session_state: - st.session_state[SESSION_ID_KEY] = str(uuid4()) - st.session_state.setdefault(SESSION_JOB_ID_KEY, None) - st.session_state.setdefault(SESSION_JOB_RESULT_KEY, None) - st.session_state.setdefault(SESSION_STATUS_MESSAGE_KEY, "") - - -def _require_auth() -> str: - """No authentication required - return a default user ID.""" - if not st.session_state.get(SESSION_ID_KEY): - st.session_state[SESSION_ID_KEY] = str(uuid4()) - return str(st.session_state[SESSION_ID_KEY]) - - -def _submit_pipeline_job(message_limit: int, channels: list[str], user_id: str) -> str: - params = PipelineParams(message_limit=message_limit, channel_ids=channels) - job_id = submit_ingest_extract_job(params, user_id) - st.session_state[SESSION_JOB_ID_KEY] = job_id - st.session_state[SESSION_JOB_RESULT_KEY] = None - st.session_state[SESSION_STATUS_MESSAGE_KEY] = "Job submitted" - return job_id - - -def _render_job_status(job_id: str) -> None: - status = job_status(job_id) - progress_value = float(status.get("progress", 0.0)) - st.progress(progress_value) - status_raw = str(status.get("status", "unknown")).lower() - status_text = status_raw.capitalize() - message = status.get("message") or "" - st.info(f"Status: {status_text} {message}") - - if status_raw in {"succeeded", "failed"}: - final = job_result(job_id) - st.session_state[SESSION_JOB_RESULT_KEY] = final - st.session_state[SESSION_JOB_ID_KEY] = None - st.session_state[SESSION_STATUS_MESSAGE_KEY] = status_text - if status_raw == "failed": - error_msg = status.get("error") or "Pipeline failed" - st.error(error_msg) - else: - st.success("Pipeline completed successfully") +# ============================================================================ +# DATA ACCESS +# ============================================================================ -def _render_job_summary(result: dict[str, object]) -> None: - st.subheader("Latest Pipeline Run") - correlation_id = result.get("correlation_id") - if correlation_id: - st.caption(f"Correlation ID: {correlation_id}") +@st.cache_resource +def get_repository() -> RepositoryProtocol: + """Get cached repository instance.""" + settings = get_settings() + return create_repository(settings) - ingest = result.get("ingest", {}) - extract = result.get("extract", {}) - dedup = result.get("dedup", {}) - col1, col2, col3 = st.columns(3) +@st.cache_data(ttl=30) +def fetch_events_by_source(source_id: str, limit: int = 500) -> pd.DataFrame: + """Fetch events filtered by source and convert to DataFrame.""" + repo = get_repository() + + # Get events from last 90 days to future + start_date = datetime.now(UTC) - timedelta(days=90) + end_date = datetime.now(UTC) + timedelta(days=365) + + all_events = repo.get_events_in_window(start_date, end_date) + + # Filter by source + events = [e for e in all_events if e.source_id.value == source_id][:limit] + + if not events: + return pd.DataFrame() + + data = [] + for evt in events: + data.append({ + "event_id": str(evt.event_id)[:8], + "title": evt.title[:MAX_TITLE_LENGTH] + "..." if len(evt.title) > MAX_TITLE_LENGTH else evt.title, + "full_title": evt.title, + "category": evt.category.value, + "status": evt.status.value, + "event_date": evt.event_date, + "confidence": evt.confidence, + "importance": evt.importance, + "summary": (evt.summary[:MAX_SUMMARY_LENGTH] + "...") if evt.summary and len(evt.summary) > MAX_SUMMARY_LENGTH else (evt.summary or ""), + "full_summary": evt.summary or "", + "environment": evt.environment.value if evt.environment else "unknown", + "source_id": evt.source_id.value, + "links": evt.links[:3] if evt.links else [], + "change_type": evt.change_type.value if evt.change_type else "other", + }) + + df = pd.DataFrame(data) + + # Ensure event_date is datetime + if "event_date" in df.columns: + df["event_date"] = pd.to_datetime(df["event_date"], utc=True) + + return df + + +# ============================================================================ +# FILTER COMPONENTS +# ============================================================================ + +def render_filters(df: pd.DataFrame, key_prefix: str) -> pd.DataFrame: + """Render filter controls and return filtered DataFrame.""" + + if df.empty: + return df + + # Create filter columns + col1, col2, col3, col4 = st.columns(4) + with col1: - st.metric( - "Messages Saved", - ingest.get("messages_saved", 0), - help="Total messages persisted during ingestion", + # Category filter + categories = ["All"] + sorted(df["category"].unique().tolist()) + selected_category = st.selectbox( + "πŸ“‚ Category", + options=categories, + key=f"{key_prefix}_category" ) + with col2: - st.metric( - "Events Extracted", - extract.get("events_extracted", 0), - help="Events produced by the extraction stage", + # Status filter + statuses = ["All"] + sorted(df["status"].unique().tolist()) + selected_status = st.selectbox( + "πŸ“Š Status", + options=statuses, + key=f"{key_prefix}_status" ) + with col3: - st.metric( - "Final Events", - dedup.get("total_events", 0), - help="Events remaining after deduplication", - ) - - -# Page configuration -st.set_page_config( - page_title="Slack Event Manager", - page_icon="πŸ“…", - layout="wide", - initial_sidebar_state="expanded", -) - -# Custom CSS for better styling -st.markdown( - """ - -""", - unsafe_allow_html=True, -) - - -def main(): - """Main application function.""" - - ensure_metrics_exporter() - _ensure_session_defaults() - user_id = _require_auth() - - # Header - st.markdown( - '

πŸ“… Slack Event Manager

', unsafe_allow_html=True - ) - st.markdown( - "Visual interface for processing Slack messages and extracting structured events." - ) - - # Sidebar configuration - with st.sidebar: - st.header("βš™οΈ Configuration") - - # Load settings - settings = get_settings() - - # Basic settings - message_limit = st.slider( - "Message Limit", - min_value=5, + # Importance filter + min_importance = st.slider( + "⭐ Min Importance", + min_value=0, max_value=100, - value=20, - help="Number of recent messages to fetch from each channel", - ) - - # Channel selection from config - # Create mapping: channel_name -> channel_id - channel_options = { - f"{ch.channel_name} ({ch.channel_id})": ch.channel_id - for ch in settings.slack_channels - } - - # Use channel names as display options - selected_channel_names = st.multiselect( - "Channels", - options=list(channel_options.keys()), - default=list(channel_options.keys()), # All channels by default - help="Select Slack channels to process", + value=0, + key=f"{key_prefix}_importance" ) - - # Convert back to channel IDs - channels = [channel_options[name] for name in selected_channel_names] - - # Show database info - if settings.database_type == "postgres": - st.info( - "🐘 PostgreSQL: " - f"{settings.postgres_user}@{settings.postgres_host}:" - f"{settings.postgres_port}/{settings.postgres_database}" - ) - else: - st.info(f"πŸ“ SQLite: {settings.db_path}") - - # Run pipeline button - run_pipeline = st.button( - "πŸš€ Run Pipeline", type="primary", use_container_width=True + + with col4: + # Confidence filter + min_confidence = st.slider( + "🎯 Min Confidence", + min_value=0.0, + max_value=1.0, + value=0.0, + step=0.1, + key=f"{key_prefix}_confidence" ) - - if run_pipeline: - try: - job_id = _submit_pipeline_job(message_limit, channels, user_id) - st.success(f"Pipeline job submitted (ID: {job_id[:8]}…)") - except RateLimitExceededError as exc: - st.error(str(exc)) - - active_job_id = st.session_state.get(SESSION_JOB_ID_KEY) - if active_job_id: - st.autorefresh(interval=POLL_INTERVAL_MS, key=f"poll_{active_job_id}") - _render_job_status(active_job_id) - else: - latest_result = st.session_state.get(SESSION_JOB_RESULT_KEY) - if isinstance(latest_result, dict): - _render_job_summary(latest_result) - elif st.session_state.get(SESSION_STATUS_MESSAGE_KEY): - st.info(st.session_state[SESSION_STATUS_MESSAGE_KEY]) - - show_database_inspection() - - -def fetch_channel_messages( - slack_client: Any, *, channels: list[str], limit: int | None -) -> list[tuple[str, dict[str, Any]]]: - """Fetch messages for each channel while preserving attribution.""" - - channel_messages: list[tuple[str, dict[str, Any]]] = [] - for channel in channels: - messages = slack_client.fetch_messages(channel_id=channel, limit=limit) - channel_messages.extend((channel, message) for message in messages) - return channel_messages - - -def show_pipeline_results(): - """Show detailed results from the pipeline.""" - - st.header("πŸ“Š Pipeline Results") - - # Database inspection using repository - try: - # Create tabs for different views - tab1, tab2, tab3, tab4 = st.tabs( - ["πŸ“¨ Messages", "🎯 Candidates", "πŸ“ Events", "πŸ“ˆ Timeline"] + + # Second row: search and date range + col5, col6 = st.columns([2, 2]) + + with col5: + search_query = st.text_input( + "πŸ” Search in titles", + placeholder="Type to search...", + key=f"{key_prefix}_search" ) - - with tab1: - show_messages_table() - - with tab2: - show_candidates_table() - - with tab3: - show_events_table() - - with tab4: - show_gantt_chart() - - except Exception as e: - st.error(f"Error reading database: {str(e)}") - - -def show_database_inspection(): - """Show database inspection when pipeline hasn't been run.""" - - st.header("πŸ” Database Inspection") - - try: - # Use repository for inspection - show_pipeline_results() - except Exception as e: - st.error(f"Error reading database: {str(e)}") - - -def show_messages_table(): - """Display messages table.""" - - st.subheader("πŸ“¨ Raw Messages") - - try: - repo = get_repository() - settings = get_settings() - - if settings.database_type == "postgres": - st.caption( - "🐘 Source: PostgreSQL (" - f"{settings.postgres_host}:{settings.postgres_port}/" - f"{settings.postgres_database})" + + with col6: + # Date range filter + if df["event_date"].notna().any(): + min_date = df["event_date"].min().date() + max_date = df["event_date"].max().date() + + date_range = st.date_input( + "πŸ“… Date Range", + value=(min_date, max_date), + min_value=min_date, + max_value=max_date, + key=f"{key_prefix}_date_range" ) else: - st.caption(f"πŸ“ Source: SQLite ({settings.db_path})") - - messages = fetch_recent_messages(repository=repo, limit=100) - - if not messages: - st.info("No messages found.") - return - - # Convert to DataFrame - messages_data = [] - for msg in messages: - messages_data.append( - { - "message_id": msg.message_id, - "text": msg.text[:MAX_MESSAGE_LENGTH] + "..." - if len(msg.text) > MAX_MESSAGE_LENGTH - else msg.text, - "ts": msg.ts_dt, - "user_real_name": msg.user_real_name or "", - "user_email": msg.user_email or "", - "total_reactions": msg.total_reactions or 0, - "reply_count": msg.reply_count or 0, - "attachments_count": msg.attachments_count or 0, - "files_count": msg.files_count or 0, - "permalink": msg.permalink or "", - "edited_ts": msg.edited_ts, - "edited": msg.edited_ts is not None, - } - ) - - messages_df = pd.DataFrame(messages_data) + date_range = None + + # Apply filters + filtered_df = df.copy() + + if selected_category != "All": + filtered_df = filtered_df[filtered_df["category"] == selected_category] + + if selected_status != "All": + filtered_df = filtered_df[filtered_df["status"] == selected_status] + + filtered_df = filtered_df[filtered_df["importance"] >= min_importance] + filtered_df = filtered_df[filtered_df["confidence"] >= min_confidence] + + if search_query: + mask = filtered_df["full_title"].str.contains(search_query, case=False, na=False) + filtered_df = filtered_df[mask] + + if date_range and len(date_range) == 2 and filtered_df["event_date"].notna().any(): + start, end = date_range + # Convert to datetime for comparison + start_dt = pd.Timestamp(start, tz="UTC") + end_dt = pd.Timestamp(end, tz="UTC") + pd.Timedelta(days=1) + mask = (filtered_df["event_date"] >= start_dt) & (filtered_df["event_date"] < end_dt) + filtered_df = filtered_df[mask] + + return filtered_df + + +# ============================================================================ +# TABLE COMPONENTS +# ============================================================================ + +def render_events_table(df: pd.DataFrame, key_prefix: str) -> None: + """Render events table with proper column configuration.""" + + if df.empty: + st.info("No events match the selected filters.") + return + + # Show count + st.caption(f"Showing {len(df)} events") + + # Prepare display DataFrame + display_df = df[[ + "title", "category", "status", "event_date", + "importance", "confidence", "environment", "change_type" + ]].copy() + + # Format date for display + display_df["event_date"] = display_df["event_date"].dt.strftime("%Y-%m-%d %H:%M") + + st.dataframe( + display_df, + use_container_width=True, + hide_index=True, + column_config={ + "title": st.column_config.TextColumn( + "πŸ“Œ Title", + width="large", + help="Event title" + ), + "category": st.column_config.TextColumn( + "πŸ“‚ Category", + width="small", + ), + "status": st.column_config.TextColumn( + "πŸ“Š Status", + width="small", + ), + "event_date": st.column_config.TextColumn( + "πŸ“… Date", + width="medium", + ), + "importance": st.column_config.ProgressColumn( + "⭐ Importance", + min_value=0, + max_value=100, + format="%d", + ), + "confidence": st.column_config.ProgressColumn( + "🎯 Confidence", + min_value=0, + max_value=1, + format="%.1f", + ), + "environment": st.column_config.TextColumn( + "🌍 Env", + width="small", + ), + "change_type": st.column_config.TextColumn( + "πŸ”„ Type", + width="small", + ), + }, + ) - if messages_df.empty: - st.info("No messages found.") - return - st.dataframe( - messages_df, - use_container_width=True, - column_config={ - "message_id": st.column_config.TextColumn("Message ID", width="medium"), - "text": st.column_config.TextColumn("Text", width="large"), - "ts": st.column_config.DatetimeColumn( - "Timestamp", format="YYYY-MM-DD HH:mm:ss" - ), - "user_real_name": st.column_config.TextColumn("User", width="medium"), - "user_email": st.column_config.TextColumn("Email", width="medium"), - "total_reactions": st.column_config.NumberColumn( - "πŸ‘ Reactions", width="small" - ), - "reply_count": st.column_config.NumberColumn( - "πŸ’¬ Replies", width="small" - ), - "attachments_count": st.column_config.NumberColumn( - "πŸ“Ž Files", width="small" - ), - "files_count": st.column_config.NumberColumn("πŸ“„ Docs", width="small"), - "permalink": st.column_config.LinkColumn("πŸ”— Link", width="small"), - "edited": st.column_config.CheckboxColumn("✏️ Edited", width="small"), - }, - hide_index=True, +def render_metrics(df: pd.DataFrame, source_name: str) -> None: + """Render summary metrics for events.""" + + col1, col2, col3, col4, col5 = st.columns(5) + + with col1: + st.metric("πŸ“Š Total", len(df)) + + with col2: + risk_count = len(df[df["category"] == "risk"]) if not df.empty else 0 + st.metric("🚨 Risks", risk_count) + + with col3: + product_count = len(df[df["category"] == "product"]) if not df.empty else 0 + st.metric("πŸš€ Product", product_count) + + with col4: + active_count = len(df[df["status"].isin(["started", "planned"])]) if not df.empty else 0 + st.metric("⚑ Active", active_count) + + with col5: + avg_importance = df["importance"].mean() if not df.empty else 0 + st.metric("⭐ Avg Importance", f"{avg_importance:.0f}") + + +# ============================================================================ +# TIMELINE COMPONENT (Simple and Working) +# ============================================================================ + +def render_timeline(df: pd.DataFrame, key_prefix: str) -> None: + """Render timeline visualization using px.timeline (Gantt-style).""" + + if df.empty or df["event_date"].isna().all(): + st.info("No events with dates to display on timeline.") + return + + # Filter events with valid dates + chart_df = df[df["event_date"].notna()].copy() + + if chart_df.empty: + st.info("No events with dates to display on timeline.") + return + + # Category filter + available_categories = sorted(chart_df["category"].unique().tolist()) + selected_categories = st.multiselect( + "πŸ“‚ Filter by category", + options=available_categories, + default=available_categories, + key=f"{key_prefix}_timeline_categories", + ) + + if not selected_categories: + st.warning("Select at least one category to display.") + return + + chart_df = chart_df[chart_df["category"].isin(selected_categories)] + + # Sort by date + chart_df = chart_df.sort_values("event_date", ascending=False) + + # Limit to avoid overloading + if len(chart_df) > 30: + st.caption(f"Showing top 30 events out of {len(chart_df)}") + chart_df = chart_df.head(30) + + # Create proper start/end for Gantt + chart_df["Start"] = chart_df["event_date"] + chart_df["End"] = chart_df["event_date"] + pd.Timedelta(days=1) + chart_df["Task"] = chart_df["title"].str[:60] + + # Create Gantt chart + fig = px.timeline( + chart_df, + x_start="Start", + x_end="End", + y="Task", + color="category", + color_discrete_map=CATEGORY_COLORS, + hover_data=["status", "importance", "confidence"], + title="πŸ“… Events Timeline", + ) + + # Add TODAY marker + today = datetime.now(UTC) + fig.add_vline( + x=today.timestamp() * 1000, # Convert to milliseconds for plotly + line_dash="dash", + line_color="red", + line_width=2, + annotation_text="TODAY", + annotation_position="top", + ) + + fig.update_layout( + height=max(400, len(chart_df) * 28), + xaxis_title="", + yaxis_title="", + showlegend=True, + legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="left", x=0), + ) + + fig.update_yaxes(automargin=True, tickfont=dict(size=11)) + + st.plotly_chart(fig, use_container_width=True, key=f"{key_prefix}_timeline") + + +# ============================================================================ +# CALENDAR VIEW COMPONENT +# ============================================================================ + +def render_calendar_view(df: pd.DataFrame, key_prefix: str) -> None: + """Render calendar view similar to Google Calendar.""" + + if df.empty or df["event_date"].isna().all(): + st.info("No events with dates to display.") + return + + chart_df = df[df["event_date"].notna()].copy() + + if chart_df.empty: + st.info("No events with dates to display.") + return + + # Get date range + min_date = chart_df["event_date"].min() + max_date = chart_df["event_date"].max() + + # Date navigation + col1, col2, col3 = st.columns([1, 2, 1]) + + with col2: + # Month selector + today = datetime.now(UTC) + available_months = pd.date_range( + start=min_date.replace(day=1), + end=max_date.replace(day=1) + pd.DateOffset(months=1), + freq='MS' ) - - # Show summary statistics - col1, col2, col3, col4 = st.columns(4) + month_options = [d.strftime("%B %Y") for d in available_months] + current_month_str = today.strftime("%B %Y") + default_idx = month_options.index(current_month_str) if current_month_str in month_options else 0 + + selected_month = st.selectbox( + "πŸ“… Select Month", + options=month_options, + index=default_idx, + key=f"{key_prefix}_calendar_month" + ) + + # Parse selected month + selected_date = pd.to_datetime(selected_month, format="%B %Y") + month_start = selected_date.replace(day=1, tzinfo=UTC) + month_end = (month_start + pd.DateOffset(months=1) - pd.Timedelta(days=1)).replace(tzinfo=UTC) + + # Filter events for selected month + month_events = chart_df[ + (chart_df["event_date"] >= month_start) & + (chart_df["event_date"] <= month_end) + ] + + # Create calendar grid + st.markdown("---") + + # Day headers + days = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"] + cols = st.columns(7) + for i, day in enumerate(days): + cols[i].markdown(f"**{day}**") + + # Get first day of month and calculate offset + first_day = month_start + first_weekday = first_day.weekday() # Monday = 0 + + # Calculate total days in month + days_in_month = (month_end - month_start).days + 1 + + # Create weeks + current_day = 1 + week_num = 0 + + while current_day <= days_in_month: + cols = st.columns(7) + + for weekday in range(7): + # Skip days before month starts + if week_num == 0 and weekday < first_weekday: + cols[weekday].write("") + continue + + if current_day > days_in_month: + cols[weekday].write("") + continue + + # Current date + current_date = month_start + pd.Timedelta(days=current_day - 1) + + # Get events for this day + day_events = month_events[ + month_events["event_date"].dt.date == current_date.date() + ] + + # Build day cell content + is_today = current_date.date() == today.date() + day_style = "πŸ”΄ " if is_today else "" + + with cols[weekday]: + # Day number + if is_today: + st.markdown(f"**{day_style}{current_day}**") + else: + st.markdown(f"**{current_day}**") + + # Events for this day + for _, event in day_events.iterrows(): + cat_emoji = { + "product": "πŸš€", + "risk": "🚨", + "process": "βš™οΈ", + "marketing": "πŸ“£", + "org": "πŸ‘₯", + "unknown": "❓" + }.get(event["category"], "πŸ“Œ") + + # Truncate title for display + title_short = event["title"][:12] + "…" if len(event["title"]) > 12 else event["title"] + + # Use native Streamlit popover + with st.popover(f"{cat_emoji} {title_short}", use_container_width=True): + st.markdown(f"**{event['full_title']}**") + st.caption(f"πŸ“‚ {event['category'].title()} | πŸ“Š {event['status'].title()}") + st.caption(f"⭐ Importance: {event['importance']} | 🎯 Confidence: {event['confidence']:.0%}") + st.markdown("---") + st.write(event['summary'][:200] + "..." if len(str(event['summary'])) > 200 else event['summary']) + + current_day += 1 + + week_num += 1 + + # Legend + st.markdown("---") + legend_items = " | ".join([ + f"{emoji} {cat.title()}" + for cat, emoji in { + "product": "πŸš€", + "risk": "🚨", + "process": "βš™οΈ", + "marketing": "πŸ“£", + "org": "πŸ‘₯", + }.items() + ]) + st.caption(f"Legend: {legend_items}") + + # Summary + st.caption(f"πŸ“Š {len(month_events)} events in {selected_month}") + + +# ============================================================================ +# EVENT LIST VIEW +# ============================================================================ + +def render_event_list(df: pd.DataFrame, key_prefix: str) -> None: + """Render events as a list grouped by date.""" + + if df.empty or df["event_date"].isna().all(): + st.info("No events to display.") + return + + chart_df = df[df["event_date"].notna()].copy() + chart_df = chart_df.sort_values("event_date", ascending=False) + + # Group by date + chart_df["date_str"] = chart_df["event_date"].dt.strftime("%A, %B %d, %Y") + + for date_str, group in chart_df.groupby("date_str", sort=False): + st.markdown(f"### πŸ“… {date_str}") + + for _, event in group.iterrows(): + cat_color = CATEGORY_COLORS.get(event["category"], "#95A5A6") + cat_emoji = { + "product": "πŸš€", + "risk": "🚨", + "process": "βš™οΈ", + "marketing": "πŸ“£", + "org": "πŸ‘₯", + "unknown": "❓" + }.get(event["category"], "πŸ“Œ") + + status_emoji = { + "planned": "πŸ“‹", + "started": "▢️", + "completed": "βœ…", + "canceled": "❌", + }.get(event["status"], "πŸ“Œ") + + with st.container(): + st.markdown( + f'
' + f'
{cat_emoji} {event["full_title"]}
' + f'
' + f'{status_emoji} {event["status"].title()} | ' + f'⭐ {event["importance"]} | ' + f'🎯 {event["confidence"]:.0%}' + f'
' + f'
{event["summary"][:150]}...
' + f'
', + unsafe_allow_html=True + ) + + st.markdown("") + + +# ============================================================================ +# CATEGORY DISTRIBUTION CHART +# ============================================================================ + +def render_category_chart(df: pd.DataFrame, key_prefix: str) -> None: + """Render category distribution pie chart.""" + + if df.empty: + return + + category_counts = df["category"].value_counts() + + fig = px.pie( + values=category_counts.values, + names=category_counts.index, + title="Distribution by Category", + color=category_counts.index, + color_discrete_map=CATEGORY_COLORS, + hole=0.4, + ) + + fig.update_layout( + height=300, + margin=dict(l=20, r=20, t=40, b=20), + showlegend=True, + legend=dict(orientation="h", yanchor="bottom", y=-0.2), + ) + + st.plotly_chart(fig, use_container_width=True, key=f"{key_prefix}_pie") + + +def render_status_chart(df: pd.DataFrame, key_prefix: str) -> None: + """Render status distribution bar chart.""" + + if df.empty: + return + + status_counts = df["status"].value_counts() + + fig = px.bar( + x=status_counts.values, + y=status_counts.index, + orientation="h", + title="Distribution by Status", + color=status_counts.index, + color_discrete_map=STATUS_COLORS, + ) + + fig.update_layout( + height=300, + margin=dict(l=20, r=20, t=40, b=20), + showlegend=False, + xaxis_title="Count", + yaxis_title="", + ) + + st.plotly_chart(fig, use_container_width=True, key=f"{key_prefix}_bar") + + +# ============================================================================ +# SOURCE TABS +# ============================================================================ + +def render_source_tab(source_id: str, source_name: str, source_emoji: str, description: str) -> None: + """Render complete tab for a single source.""" + + st.markdown(f"### {source_emoji} {source_name}") + st.caption(description) + + # Fetch data + df = fetch_events_by_source(source_id) + + if df.empty: + st.warning(f"No events found from {source_name}.") + return + + # Metrics row + render_metrics(df, source_name) + + st.divider() + + # Filters + st.markdown("#### πŸŽ›οΈ Filters") + filtered_df = render_filters(df, key_prefix=source_id) + + st.divider() + + # Sub-tabs for table and visualizations + tab_table, tab_timeline, tab_calendar, tab_list, tab_analytics = st.tabs([ + "πŸ“‹ Table", + "πŸ“Š Timeline", + "πŸ“… Calendar", + "πŸ“ List", + "πŸ“ˆ Analytics" + ]) + + with tab_table: + render_events_table(filtered_df, key_prefix=source_id) + + with tab_timeline: + render_timeline(filtered_df, key_prefix=source_id) + + with tab_calendar: + render_calendar_view(filtered_df, key_prefix=source_id) + + with tab_list: + render_event_list(filtered_df, key_prefix=source_id) + + with tab_analytics: + col1, col2 = st.columns(2) with col1: - st.metric("Total Messages", len(messages_df)) + render_category_chart(filtered_df, key_prefix=source_id) with col2: - st.metric("Total Reactions", int(messages_df["total_reactions"].sum())) - with col3: - st.metric("Total Replies", int(messages_df["reply_count"].sum())) - with col4: - edited_count = ( - messages_df["edited_ts"].notna().sum() - if "edited_ts" in messages_df.columns - else 0 - ) - st.metric("Edited Messages", int(edited_count)) - - except Exception as e: - st.error(f"Error loading messages: {str(e)}") - - -def show_candidates_table(): - """Display candidates table.""" - - st.subheader("🎯 Event Candidates") + render_status_chart(filtered_df, key_prefix=source_id) - try: - repo = get_repository() - candidates = fetch_recent_candidates(repository=repo, limit=100) - if not candidates: - st.info("No candidates found.") - return - - # Convert to DataFrame - candidates_data = [] - for cand in candidates: - candidates_data.append( - { - "message_id": cand.message_id, - "text_norm": cand.text_norm[:MAX_CANDIDATE_TEXT_LENGTH] + "..." - if len(cand.text_norm) > MAX_CANDIDATE_TEXT_LENGTH - else cand.text_norm, - "score": cand.score, - "status": cand.status.value, - "features_json": str( - cand.features.model_dump() if cand.features else {} - ), - } - ) +# ============================================================================ +# MAIN APPLICATION +# ============================================================================ - candidates_df = pd.DataFrame(candidates_data) - - if candidates_df.empty: - st.info("No candidates found.") - return - - st.dataframe( - candidates_df, - use_container_width=True, - column_config={ - "message_id": st.column_config.TextColumn("Message ID", width="medium"), - "text_norm": st.column_config.TextColumn("Text", width="large"), - "score": st.column_config.NumberColumn("Score", format="%.2f"), - "status": st.column_config.TextColumn("Status", width="small"), - "features_json": st.column_config.TextColumn( - "Features", width="medium" - ), - }, +def main(): + """Main application entry point.""" + + # Page configuration + st.set_page_config( + page_title="Event Manager", + page_icon="πŸ“…", + layout="wide", + initial_sidebar_state="collapsed", + ) + + # Custom CSS + st.markdown(""" + + """, unsafe_allow_html=True) + + # Header + st.markdown('

πŸ“… Event Manager Dashboard

', unsafe_allow_html=True) + st.caption("Centralized view of internal (Slack) and external (Telegram) events") + + # Sidebar with system info + with st.sidebar: + st.header("ℹ️ System Info") + + settings = get_settings() + + # Database info + if settings.database_type == "postgres": + st.success(f"🐘 PostgreSQL: {settings.postgres_database}") + else: + st.info(f"πŸ“ SQLite: {settings.db_path}") + + # Quick stats + st.divider() + st.subheader("πŸ“Š Quick Stats") + + slack_df = fetch_events_by_source("slack") + telegram_df = fetch_events_by_source("telegram") + + st.metric("Slack Events", len(slack_df)) + st.metric("Telegram Events", len(telegram_df)) + st.metric("Total Events", len(slack_df) + len(telegram_df)) + + # Refresh button + st.divider() + if st.button("πŸ”„ Refresh Data", use_container_width=True): + st.cache_data.clear() + st.rerun() + + # Main content - source tabs + tab_slack, tab_telegram, tab_all = st.tabs([ + "🏒 Internal Events (Slack)", + "🌍 External Events (Telegram)", + "πŸ“Š All Events" + ]) + + with tab_slack: + render_source_tab( + source_id="slack", + source_name="Internal Events", + source_emoji="🏒", + description="Events from internal Slack channels: releases, deployments, incidents, team updates" ) - - st.caption(f"Total candidates: {len(candidates_df)}") - - except Exception as e: - st.error(f"Error loading candidates: {str(e)}") - - -def show_events_table(): - """Display events table.""" - - st.subheader("πŸ“ Extracted Events") - - try: - repo = get_repository() - events = fetch_recent_events(repository=repo, limit=100) - - if not events: - st.info("No events found.") - return - - # Convert to DataFrame with new event structure - events_data = [] - for evt in events: - events_data.append( - { - "event_id": evt.event_id, - "message_id": evt.message_id, - "title": evt.title, # Property that renders from slots - "category": evt.category.value, - "status": evt.status.value, - "event_date": evt.event_date, # Property that returns first non-None time - "confidence": evt.confidence, - "importance": evt.importance, - "cluster_key": evt.cluster_key, - "dedup_key": evt.dedup_key or "", - } - ) - - events_df = pd.DataFrame(events_data) - - if events_df.empty: - st.info("No events found.") - return - - st.dataframe( - events_df, - use_container_width=True, - column_config={ - "event_id": st.column_config.TextColumn("Event ID", width="medium"), - "title": st.column_config.TextColumn("Title", width="large"), - "category": st.column_config.TextColumn("Category", width="small"), - "status": st.column_config.TextColumn("Status", width="small"), - "event_date": st.column_config.DatetimeColumn( - "Date", format="YYYY-MM-DD HH:mm" - ), - "confidence": st.column_config.NumberColumn( - "Confidence", format="%.2f" - ), - "importance": st.column_config.NumberColumn( - "Importance", width="small" - ), - "message_id": st.column_config.TextColumn( - "Source Message", width="medium" - ), - "cluster_key": st.column_config.TextColumn( - "Cluster Key", width="small" - ), - "dedup_key": st.column_config.TextColumn("Dedup Key", width="small"), - }, - hide_index=True, + + with tab_telegram: + render_source_tab( + source_id="telegram", + source_name="External Events", + source_emoji="🌍", + description="Events from Telegram channels: market news, competitor updates, industry trends" ) - - st.caption(f"Total events: {len(events_df)}") - - except Exception as e: - st.error(f"Error loading events: {str(e)}") - - -def show_gantt_chart(): - """Display Gantt chart visualization.""" - - st.subheader("πŸ“ˆ Events Timeline (Gantt Chart)") - - try: - # Get repository and fetch events with dates - repo = get_repository() - - start_date = datetime.now(UTC) - timedelta(days=90) - end_date = datetime.now(UTC) + timedelta(days=365) - events = repo.get_events_in_window(start_date, end_date) - - if not events: - st.info("No events with dates found for timeline.") - return - - # Convert to DataFrame - only events with dates - events_data = [] - for evt in events: - if evt.event_date: - events_data.append( - { - "title": evt.title, # Property that renders from slots - "category": evt.category.value, - "event_date": evt.event_date, # Property that returns first non-None time - } - ) - - events_df = pd.DataFrame(events_data) - - if events_df.empty: - st.info("No events with dates found for timeline.") + + with tab_all: + st.markdown("### πŸ“Š All Events Combined") + st.caption("Overview of all events from both sources") + + # Combine both sources + slack_df = fetch_events_by_source("slack") + telegram_df = fetch_events_by_source("telegram") + + if slack_df.empty and telegram_df.empty: + st.warning("No events found in any source.") return - - # Create Gantt chart - fig = px.timeline( - events_df, - x_start=events_df["event_date"], - x_end=events_df["event_date"] + pd.Timedelta(days=1), # Events span 1 day - y="title", - color="category", - title="Events Timeline", - labels={"event_date": "Date"}, - color_discrete_sequence=px.colors.qualitative.Set3, - ) - - # Update layout - fig.update_layout( - xaxis_title="Timeline", - yaxis_title="Events", - showlegend=True, - height=max( - 400, len(events_df) * 30 - ), # Dynamic height based on number of events - ) - - # Update y-axis to show full titles - fig.update_yaxes(automargin=True) - - st.plotly_chart(fig, use_container_width=True) - - # Summary stats - col1, col2, col3 = st.columns(3) + + combined_df = pd.concat([slack_df, telegram_df], ignore_index=True) + + # Metrics + col1, col2, col3, col4, col5 = st.columns(5) with col1: - st.metric("Total Events", len(events_df)) + st.metric("πŸ“Š Total", len(combined_df)) with col2: - st.metric("Categories", events_df["category"].nunique()) + st.metric("🏒 Slack", len(slack_df)) with col3: - date_range = ( - events_df["event_date"].max() - events_df["event_date"].min() - ).days - st.metric("Date Range", f"{date_range} days") - - except Exception as e: - st.error(f"Error creating Gantt chart: {str(e)}") + st.metric("🌍 Telegram", len(telegram_df)) + with col4: + risk_count = len(combined_df[combined_df["category"] == "risk"]) + st.metric("🚨 Risks", risk_count) + with col5: + avg_imp = combined_df["importance"].mean() if not combined_df.empty else 0 + st.metric("⭐ Avg Importance", f"{avg_imp:.0f}") + + st.divider() + + # Filters + st.markdown("#### πŸŽ›οΈ Filters") + filtered_df = render_filters(combined_df, key_prefix="all") + + st.divider() + + # Content tabs + tab_table, tab_timeline, tab_calendar, tab_list, tab_analytics = st.tabs([ + "πŸ“‹ Table", + "πŸ“Š Timeline", + "πŸ“… Calendar", + "πŸ“ List", + "πŸ“ˆ Analytics" + ]) + + with tab_table: + render_events_table(filtered_df, key_prefix="all") + + with tab_timeline: + render_timeline(filtered_df, key_prefix="all") + + with tab_calendar: + render_calendar_view(filtered_df, key_prefix="all") + + with tab_list: + render_event_list(filtered_df, key_prefix="all") + + with tab_analytics: + col1, col2 = st.columns(2) + with col1: + render_category_chart(filtered_df, key_prefix="all") + with col2: + render_status_chart(filtered_df, key_prefix="all") if __name__ == "__main__": diff --git a/config/prompts/telegram.yaml b/config/prompts/telegram.yaml index c2a3c99..a7838c8 100644 --- a/config/prompts/telegram.yaml +++ b/config/prompts/telegram.yaml @@ -1,5 +1,5 @@ -version: "20250215.1" -description: "Telegram event extraction prompt" +version: "20251212.1" +description: "Telegram event extraction for TON/crypto channels (security, competitor/product, regulation) with clean links" system: | You are an event extraction assistant for Telegram channel messages. @@ -7,6 +7,46 @@ system: | - INPUT: May be in Russian or English - OUTPUT: ALL fields (action, qualifiers, stroke, object_name_raw, summary, etc.) MUST BE IN ENGLISH + ========================= + SCOPE FILTER (CRITICAL) + ========================= + You ONLY extract events that are relevant to: + - TON / Telegram ecosystem (products, wallets, bots, payments) + - Crypto / digital asset wallets, exchanges, payments, on/off-ramp + - Regulation and security for the above + + And the message MUST fall into at least one of these buckets: + + 1) security_incident: + - Hacks, exploits, breaches, theft of funds, critical vulnerabilities, compromised keys, bridge exploits. + - Examples (EN/RU): "hack", "exploit", "breach", "security incident", "funds stolen", "drained", + "Π²Π·Π»ΠΎΠΌ", "эксплойт", "ΡƒΠ³Π½Π°Π»ΠΈ срСдства", "слив ΠΏΡ€ΠΈΠ²Π°Ρ‚Π½Ρ‹Ρ… ΠΊΠ»ΡŽΡ‡Π΅ΠΉ", "Π±Ρ€ΠΈΠΆΠ΄ Π²Π·Π»ΠΎΠΌΠ°Π»ΠΈ", "ΠΏΡ€ΠΎΡ‚ΠΎΠΊΠΎΠ» Π·Π°Π΄Ρ€Π΅ΠΉΠ½ΠΈΠ»ΠΈ". + + 2) competitor_update: + - Product / business changes of other wallets, exchanges, payment apps or custodial services that may compete with wallet.tg. + - Include: new features, supported assets, cards, P2P, staking, fees, geographies, integrations with Telegram or messengers. + - Treat as competitor any product that offers: custody of crypto/stablecoins, payments, P2P, on/off-ramp, cards, swaps, + especially if it has a Telegram bot or is used inside messengers. + - Examples (EN/RU): "launched a new wallet", "support USDT on Tron", "new crypto card", "P2P service", + "запустили ΠΊΠΎΡˆΠ΅Π»Ρ‘ΠΊ", "ΠΊΠΎΡˆΠ΅Π»Ρ‘ΠΊ Π² Telegram", "Π±ΠΎΡ‚ для ΠΊΡ€ΠΈΠΏΡ‚ΠΎΠΏΠ»Π°Ρ‚Π΅ΠΆΠ΅ΠΉ", "ΠΏΠΎΠ΄Π΄Π΅Ρ€ΠΆΠΊΠ° USDT", "запуск P2P", "новая ΠΊΠ°Ρ€Ρ‚Π°". + + 3) regulation: + - Laws, draft bills, regulatory guidance, enforcement actions, sanctions, licensing rules that affect: + crypto, stablecoins, custodial wallets, payments, KYC/AML, on/off-ramp, Telegram-based fintech. + - Examples (EN/RU): "regulator", "central bank", "FATF", "OFAC", "sanctions", "licensing", "KYC", "AML", + "Π·Π°ΠΊΠΎΠ½ΠΎΠΏΡ€ΠΎΠ΅ΠΊΡ‚", "рСгулятор", "Π¦Π‘", "лицСнзия", "ΠΎΡ‚Π·Ρ‹Π² Π»ΠΈΡ†Π΅Π½Π·ΠΈΠΈ", "AML", "KYC", "Π·Π°ΠΏΡ€Π΅Ρ‚ ΠΊΡ€ΠΈΠΏΡ‚ΠΎΠ²Π°Π»ΡŽΡ‚", "санкции Π½Π° стСйблкоины". + + If a message does NOT clearly match at least one of these three buckets, + you MUST respond with: + { + "is_event": false, + "overflow_note": null, + "events": [] + } + + ========================= + GENERAL EXTRACTION RULES + ========================= Your task: Extract 0 to 5 independent events from a Telegram message with structured title slots. Core Rules: @@ -17,13 +57,32 @@ system: | 5. If >5 events exist, pick top 5 by specificity (clear dates/anchors), note rest in overflow_note Categories: - - product: releases, features, deployments, launches + - product: releases, features, deployments, launches (incl. competitor product updates) - process: internal processes, workflows, policies - marketing: campaigns, promotions, announcements - - risk: incidents, issues, compliance, security + - risk: incidents, issues, compliance, security (incl. hacks, regulatory risk) - org: organizational changes, hiring, team updates - unknown: unclear or doesn't fit + Additional high-level topic (CRITICAL): + - topic_type: MUST be one of: + ["security_incident", "competitor_update", "regulation"] + + Mapping guidelines: + - security_incident: + topic_type = "security_incident" + category = "risk" + change_type = "incident" + severity = "sev1" (large loss / critical compromise), "sev2", "sev3", or "info" + - competitor_update: + topic_type = "competitor_update" + category = "product" (or "marketing"/"process" if mostly about pricing, campaigns or T&C changes) + change_type = "launch" | "deploy" | "policy" | "campaign" | "other" depending on content + - regulation: + topic_type = "regulation" + category = "risk" or "process" (pick the best fit) + change_type = "policy" + Title Slot Extraction (CRITICAL): Extract these slots that will be used to generate canonical title: @@ -63,11 +122,14 @@ system: | Content Fields: - summary: 1-3 sentences (max 320 chars). What changed and why it matters. - - why_it_matters: 1 line (max 160 chars) or null. Impact/reason for reader. + For competitor_update: emphasize what the competitor launched/changed and why this matters for a Telegram wallet product. + For security_incident: emphasize what was hacked, scale of loss, type of vulnerability. + For regulation: emphasize jurisdiction, asset types affected, and potential impact on wallets/crypto/payments. + - why_it_matters: 1 line (max 160 chars) or null. Impact for wallet.tg perspective (users, markets, compliance, risk). - links: Array of URLs (max 3) - anchors: Array of identifiers found (Jira keys, PR numbers, version tags) - - impact_area: Systems/components affected (max 3): ["authentication", "payments", "mobile-app"] - - impact_type: Types of impact: ["perf_degradation", "downtime", "ux_change", "policy_change", "data_migration"] + - impact_area: Systems/components affected (max 3): e.g. ["custody", "onramp", "p2p", "cards", "stablecoins", "compliance"] + - impact_type: Types of impact: ["perf_degradation", "downtime", "ux_change", "policy_change", "data_migration", "legal_risk", "security_risk"] Quality: - confidence: 0.0-1.0. How confident you are in extraction accuracy. @@ -105,44 +167,8 @@ system: | ] } - If message has no events (e.g., question, discussion), set is_event=false and events=[]. - - Examples: - Message: "πŸš€ Launching Stocks & ETFs trading in alpha for Wallet team next Monday. Known issue: possible performance degradation during peak hours. Track: INV-1024" - Event: - { - "action": "Launch", - "object_name_raw": "Stocks & ETFs trading", - "qualifiers": ["alpha", "Wallet team"], - "stroke": "degradation possible", - "anchor": "INV-1024", - "category": "product", - "status": "planned", - "change_type": "launch", - "environment": "prod", - "severity": null, - "planned_start": "2025-10-21T00:00:00Z", - "planned_end": null, - "actual_start": null, - "actual_end": null, - "time_source": "relative", - "time_confidence": 0.85, - "summary": "Stocks & ETFs trading launching in alpha for Wallet team. Potential performance issues during peak hours.", - "why_it_matters": "New trading feature with known performance considerations", - "links": [], - "anchors": ["INV-1024"], - "impact_area": ["trading", "wallet"], - "impact_type": ["perf_degradation"], - "confidence": 0.95 - } - - Message: "Hi team! Has anyone tried the new mobile build? I'm seeing some weird caching behavior." - Response: - { - "is_event": false, - "overflow_note": null, - "events": [] - } + If message has no relevant event for security_incident, competitor_update or regulation, + you MUST set is_event=false and events=[]. IMPORTANT FORMATTING NOTES FOR TELEGRAM: - Telegram messages may contain emoji, user mentions (@username), hashtags (#tag) @@ -151,3 +177,6 @@ system: | - Focus on substantive content, not formatting - Forwarded messages will have forward_from metadata - use original source if relevant - Media attachments (photos, videos, documents) may provide additional context + - Links MUST include scheme; if you see "t.me/xyz" convert to "https://t.me/xyz" + - Prefer https:// links; drop malformed links rather than emitting invalid URLs + - If message date is older than 30 days, set is_event=false (ignore stale news) diff --git a/docker-compose.yml b/docker-compose.yml index 7ea470b..5aa0b50 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -6,6 +6,7 @@ x-app-env: &app-env POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:?POSTGRES_PASSWORD must be set} TELEGRAM_API_ID: ${TELEGRAM_API_ID:-} TELEGRAM_API_HASH: ${TELEGRAM_API_HASH:-} + TELEGRAM_SESSION_PATH: ${TELEGRAM_SESSION_PATH:-data/telegram_session_worker.session} DATABASE_TYPE: postgres POSTGRES_HOST: postgres POSTGRES_PORT: 5432 diff --git a/scripts/telegram_qr_auth.py b/scripts/telegram_qr_auth.py index 8c697c2..b693442 100644 --- a/scripts/telegram_qr_auth.py +++ b/scripts/telegram_qr_auth.py @@ -86,9 +86,9 @@ async def authenticate_with_qr() -> None: print(f"❌ Error: TELEGRAM_API_ID must be an integer, got: {api_id_str}") sys.exit(1) - # Session file path - session_path = "data/telegram_session.session" - Path("data").mkdir(exist_ok=True) + # Session file path (can be overridden via TELEGRAM_SESSION_PATH env) + session_path = os.getenv("TELEGRAM_SESSION_PATH", "data/telegram_session.session") + Path(Path(session_path).parent).mkdir(exist_ok=True) print("=" * 60) print("Telegram QR Code Authentication") diff --git a/src/adapters/postgres_repository.py b/src/adapters/postgres_repository.py index 29569c2..48d84c2 100644 --- a/src/adapters/postgres_repository.py +++ b/src/adapters/postgres_repository.py @@ -1058,9 +1058,9 @@ def get_events_in_window(self, start_dt: datetime, end_dt: datetime) -> list[Eve cur.execute( """ SELECT * FROM events - WHERE COALESCE(actual_start, actual_end, planned_start, planned_end) >= %s - AND COALESCE(actual_start, actual_end, planned_start, planned_end) <= %s - ORDER BY COALESCE(actual_start, actual_end, planned_start, planned_end) DESC + WHERE COALESCE(actual_start, actual_end, planned_start, planned_end, extracted_at) >= %s + AND COALESCE(actual_start, actual_end, planned_start, planned_end, extracted_at) <= %s + ORDER BY COALESCE(actual_start, actual_end, planned_start, planned_end, extracted_at) DESC """, (start_dt, end_dt), ) diff --git a/src/adapters/sqlite_repository.py b/src/adapters/sqlite_repository.py index d713f6f..9af39de 100644 --- a/src/adapters/sqlite_repository.py +++ b/src/adapters/sqlite_repository.py @@ -1186,9 +1186,9 @@ def get_events_in_window(self, start_dt: datetime, end_dt: datetime) -> list[Eve cursor.execute( """ SELECT * FROM events - WHERE COALESCE(actual_start, actual_end, planned_start, planned_end) >= ? - AND COALESCE(actual_start, actual_end, planned_start, planned_end) <= ? - ORDER BY COALESCE(actual_start, actual_end, planned_start, planned_end) ASC + WHERE COALESCE(actual_start, actual_end, planned_start, planned_end, extracted_at) >= ? + AND COALESCE(actual_start, actual_end, planned_start, planned_end, extracted_at) <= ? + ORDER BY COALESCE(actual_start, actual_end, planned_start, planned_end, extracted_at) ASC """, (start_dt.isoformat(), end_dt.isoformat()), ) From e0d1c7dfbfc3bd5aeed46aae1703b4f557451e2d Mon Sep 17 00:00:00 2001 From: VaitaR Date: Sat, 13 Dec 2025 10:34:59 +0300 Subject: [PATCH 2/3] feat: add message_published_at to event model and database handling --- docker-compose.yml | 4 +++- src/adapters/bulk_persistence.py | 10 ++++++---- src/adapters/postgres_repository.py | 16 +++++++++++++--- src/adapters/sqlite_repository.py | 18 +++++++++++++++--- src/domain/models.py | 4 ++++ src/use_cases/extract_events.py | 1 + 6 files changed, 42 insertions(+), 11 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 5aa0b50..685accf 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -6,7 +6,6 @@ x-app-env: &app-env POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:?POSTGRES_PASSWORD must be set} TELEGRAM_API_ID: ${TELEGRAM_API_ID:-} TELEGRAM_API_HASH: ${TELEGRAM_API_HASH:-} - TELEGRAM_SESSION_PATH: ${TELEGRAM_SESSION_PATH:-data/telegram_session_worker.session} DATABASE_TYPE: postgres POSTGRES_HOST: postgres POSTGRES_PORT: 5432 @@ -65,6 +64,9 @@ services: <<: *worker-base container_name: slack_telegram_worker command: python scripts/run_multi_source_pipeline.py --source telegram --interval-seconds 600 + environment: + <<: *app-env + TELEGRAM_SESSION_PATH: ${TELEGRAM_SESSION_PATH:-data/telegram_session_telegram_worker.session} networks: - slack_network diff --git a/src/adapters/bulk_persistence.py b/src/adapters/bulk_persistence.py index b325dcc..4710752 100644 --- a/src/adapters/bulk_persistence.py +++ b/src/adapters/bulk_persistence.py @@ -55,6 +55,7 @@ def _serialize_datetime(dt: datetime | None) -> datetime | str | None: event.message_id, json.dumps(event.source_channels), _serialize_datetime(event.extracted_at), + _serialize_datetime(event.message_published_at), event.source_id.value, event.action.value, event.object_id, @@ -189,7 +190,7 @@ def _postgres_events_batch( insert_sql = """ INSERT INTO events ( - event_id, message_id, source_channels, extracted_at, source_id, + event_id, message_id, source_channels, extracted_at, message_published_at, source_id, action, object_id, object_name_raw, qualifiers, stroke, anchor, category, status, change_type, environment, severity, planned_start, planned_end, actual_start, actual_end, @@ -197,7 +198,7 @@ def _postgres_events_batch( summary, why_it_matters, links, anchors, impact_area, impact_type, confidence, importance, cluster_key, dedup_key ) VALUES ( - %s, %s, %s, %s, %s, + %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, @@ -209,6 +210,7 @@ def _postgres_events_batch( message_id = EXCLUDED.message_id, source_channels = EXCLUDED.source_channels, extracted_at = EXCLUDED.extracted_at, + message_published_at = EXCLUDED.message_published_at, source_id = EXCLUDED.source_id, action = EXCLUDED.action, object_id = EXCLUDED.object_id, @@ -257,7 +259,7 @@ def _sqlite_events_batch( insert_sql = """ INSERT OR REPLACE INTO events ( - event_id, message_id, source_channels, extracted_at, source_id, + event_id, message_id, source_channels, extracted_at, message_published_at, source_id, action, object_id, object_name_raw, qualifiers, stroke, anchor, category, status, change_type, environment, severity, planned_start, planned_end, actual_start, actual_end, @@ -265,7 +267,7 @@ def _sqlite_events_batch( summary, why_it_matters, links, anchors, impact_area, impact_type, confidence, importance, cluster_key, dedup_key ) VALUES ( - ?, ?, ?, ?, ?, + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, diff --git a/src/adapters/postgres_repository.py b/src/adapters/postgres_repository.py index 48d84c2..0e8d6bc 100644 --- a/src/adapters/postgres_repository.py +++ b/src/adapters/postgres_repository.py @@ -468,6 +468,15 @@ def _row_to_event(self, row: dict[str, Any]) -> Event: else: extracted_at = extracted_at_raw.astimezone(pytz.UTC) + message_published_at_raw = row.get("message_published_at") + if isinstance(message_published_at_raw, datetime): + if message_published_at_raw.tzinfo is None: + message_published_at = message_published_at_raw.replace(tzinfo=pytz.UTC) + else: + message_published_at = message_published_at_raw.astimezone(pytz.UTC) + else: + message_published_at = None + return Event( # Identification event_id=UUID(row["event_id"]), @@ -475,6 +484,7 @@ def _row_to_event(self, row: dict[str, Any]) -> Event: source_channels=row.get("source_channels") or [], # Already parsed from JSONB extracted_at=extracted_at, + message_published_at=message_published_at, source_id=MessageSource(source_id_value), # Title slots action=ActionType(row["action"]), @@ -1058,9 +1068,9 @@ def get_events_in_window(self, start_dt: datetime, end_dt: datetime) -> list[Eve cur.execute( """ SELECT * FROM events - WHERE COALESCE(actual_start, actual_end, planned_start, planned_end, extracted_at) >= %s - AND COALESCE(actual_start, actual_end, planned_start, planned_end, extracted_at) <= %s - ORDER BY COALESCE(actual_start, actual_end, planned_start, planned_end, extracted_at) DESC + WHERE COALESCE(actual_start, actual_end, planned_start, planned_end, message_published_at, extracted_at) >= %s + AND COALESCE(actual_start, actual_end, planned_start, planned_end, message_published_at, extracted_at) <= %s + ORDER BY COALESCE(actual_start, actual_end, planned_start, planned_end, message_published_at, extracted_at) DESC """, (start_dt, end_dt), ) diff --git a/src/adapters/sqlite_repository.py b/src/adapters/sqlite_repository.py index 9af39de..ad9e2b4 100644 --- a/src/adapters/sqlite_repository.py +++ b/src/adapters/sqlite_repository.py @@ -219,6 +219,7 @@ def _create_schema(self) -> None: message_id TEXT NOT NULL, source_channels TEXT NOT NULL, extracted_at TEXT NOT NULL, + message_published_at TEXT, -- Title Slots (source of truth) action TEXT NOT NULL, @@ -265,6 +266,14 @@ def _create_schema(self) -> None: """ ) + try: + cursor.execute( + "ALTER TABLE events ADD COLUMN message_published_at TEXT" + ) + except sqlite3.OperationalError as exc: + if "duplicate column name" not in str(exc).lower(): + raise + # Event relations table cursor.execute( """ @@ -1186,9 +1195,9 @@ def get_events_in_window(self, start_dt: datetime, end_dt: datetime) -> list[Eve cursor.execute( """ SELECT * FROM events - WHERE COALESCE(actual_start, actual_end, planned_start, planned_end, extracted_at) >= ? - AND COALESCE(actual_start, actual_end, planned_start, planned_end, extracted_at) <= ? - ORDER BY COALESCE(actual_start, actual_end, planned_start, planned_end, extracted_at) ASC + WHERE COALESCE(actual_start, actual_end, planned_start, planned_end, message_published_at, extracted_at) >= ? + AND COALESCE(actual_start, actual_end, planned_start, planned_end, message_published_at, extracted_at) <= ? + ORDER BY COALESCE(actual_start, actual_end, planned_start, planned_end, message_published_at, extracted_at) ASC """, (start_dt.isoformat(), end_dt.isoformat()), ) @@ -1669,6 +1678,9 @@ def _parse_dt(value: str | None) -> datetime | None: message_id=row["message_id"], source_channels=json.loads(row["source_channels"] or "[]"), extracted_at=extracted_at, + message_published_at=_parse_dt(row["message_published_at"]) + if "message_published_at" in row.keys() + else None, # Title slots action=ActionType(row["action"]), object_id=row["object_id"], diff --git a/src/domain/models.py b/src/domain/models.py index 539e710..dab1d63 100644 --- a/src/domain/models.py +++ b/src/domain/models.py @@ -468,6 +468,9 @@ class Event(BaseModel): source_channels: list[str] = Field( default_factory=list, description="Source channel names" ) + message_published_at: datetime | None = Field( + default=None, description="Original message timestamp (UTC)" + ) extracted_at: datetime = Field( default_factory=_utcnow, description="Extraction timestamp" ) @@ -620,6 +623,7 @@ def event_date(self) -> datetime | None: or self.actual_end or self.planned_start or self.planned_end + or self.message_published_at ) @property diff --git a/src/use_cases/extract_events.py b/src/use_cases/extract_events.py index 1ad71fa..89b6b71 100644 --- a/src/use_cases/extract_events.py +++ b/src/use_cases/extract_events.py @@ -259,6 +259,7 @@ def convert_llm_event_to_domain( message_id=message_id, source_channels=[channel_name] if channel_name else [], source_id=source_id, + message_published_at=candidate.ts_dt, # Title slots action=action, object_id=object_id, From d0d05717e577b8c58b405235345ffca2c7d40900 Mon Sep 17 00:00:00 2001 From: VaitaR Date: Sat, 13 Dec 2025 13:01:42 +0300 Subject: [PATCH 3/3] feat: add message_published_at field to event model and database handling --- app.py | 562 ++++++++++++++------------ src/adapters/sqlite_repository.py | 4 +- src/domain/models.py | 2 +- src/use_cases/extract_events.py | 2 +- tests/test_bulk_persistence_counts.py | 23 +- 5 files changed, 329 insertions(+), 264 deletions(-) diff --git a/app.py b/app.py index 5f2fb27..dba346c 100644 --- a/app.py +++ b/app.py @@ -7,11 +7,10 @@ """ from datetime import UTC, datetime, timedelta -from typing import Final +from typing import Any, Final import pandas as pd import plotly.express as px -import plotly.graph_objects as go import streamlit as st from src.adapters.repository_factory import create_repository @@ -24,26 +23,30 @@ MAX_TITLE_LENGTH: Final[int] = 80 MAX_SUMMARY_LENGTH: Final[int] = 150 +DATE_RANGE_PAIR_LENGTH: Final[int] = 2 +TIMELINE_MAX_EVENTS: Final[int] = 30 +CALENDAR_TITLE_TRUNCATE_LENGTH: Final[int] = 12 +POPOVER_SUMMARY_PREVIEW_LENGTH: Final[int] = 200 # Category colors for consistent styling CATEGORY_COLORS: Final[dict[str, str]] = { - "product": "#2ECC71", # Green - "risk": "#E74C3C", # Red - "process": "#3498DB", # Blue - "marketing": "#9B59B6", # Purple - "org": "#F39C12", # Orange - "unknown": "#95A5A6", # Gray + "product": "#2ECC71", # Green + "risk": "#E74C3C", # Red + "process": "#3498DB", # Blue + "marketing": "#9B59B6", # Purple + "org": "#F39C12", # Orange + "unknown": "#95A5A6", # Gray } STATUS_COLORS: Final[dict[str, str]] = { - "planned": "#BDC3C7", # Light gray + "planned": "#BDC3C7", # Light gray "confirmed": "#85C1E9", # Light blue - "started": "#F1C40F", # Yellow + "started": "#F1C40F", # Yellow "completed": "#27AE60", # Green "postponed": "#E67E22", # Orange - "canceled": "#E74C3C", # Red - "rolled_back": "#8E44AD", # Purple - "updated": "#3498DB", # Blue + "canceled": "#E74C3C", # Red + "rolled_back": "#8E44AD", # Purple + "updated": "#3498DB", # Blue } @@ -51,6 +54,23 @@ # DATA ACCESS # ============================================================================ + +def fetch_channel_messages( + slack_client: Any, + *, + channels: list[str], + limit: int | None = None, +) -> list[tuple[str, dict[str, Any]]]: + """Fetch messages from multiple Slack channels while preserving channel identity.""" + + collected: list[tuple[str, dict[str, Any]]] = [] + for channel_id in channels: + messages = slack_client.fetch_messages(channel_id=channel_id, limit=limit) + for message in messages: + collected.append((channel_id, message)) + return collected + + @st.cache_resource def get_repository() -> RepositoryProtocol: """Get cached repository instance.""" @@ -62,44 +82,50 @@ def get_repository() -> RepositoryProtocol: def fetch_events_by_source(source_id: str, limit: int = 500) -> pd.DataFrame: """Fetch events filtered by source and convert to DataFrame.""" repo = get_repository() - + # Get events from last 90 days to future start_date = datetime.now(UTC) - timedelta(days=90) end_date = datetime.now(UTC) + timedelta(days=365) - + all_events = repo.get_events_in_window(start_date, end_date) - + # Filter by source events = [e for e in all_events if e.source_id.value == source_id][:limit] - + if not events: return pd.DataFrame() - + data = [] for evt in events: - data.append({ - "event_id": str(evt.event_id)[:8], - "title": evt.title[:MAX_TITLE_LENGTH] + "..." if len(evt.title) > MAX_TITLE_LENGTH else evt.title, - "full_title": evt.title, - "category": evt.category.value, - "status": evt.status.value, - "event_date": evt.event_date, - "confidence": evt.confidence, - "importance": evt.importance, - "summary": (evt.summary[:MAX_SUMMARY_LENGTH] + "...") if evt.summary and len(evt.summary) > MAX_SUMMARY_LENGTH else (evt.summary or ""), - "full_summary": evt.summary or "", - "environment": evt.environment.value if evt.environment else "unknown", - "source_id": evt.source_id.value, - "links": evt.links[:3] if evt.links else [], - "change_type": evt.change_type.value if evt.change_type else "other", - }) - + data.append( + { + "event_id": str(evt.event_id)[:8], + "title": evt.title[:MAX_TITLE_LENGTH] + "..." + if len(evt.title) > MAX_TITLE_LENGTH + else evt.title, + "full_title": evt.title, + "category": evt.category.value, + "status": evt.status.value, + "event_date": evt.event_date, + "confidence": evt.confidence, + "importance": evt.importance, + "summary": (evt.summary[:MAX_SUMMARY_LENGTH] + "...") + if evt.summary and len(evt.summary) > MAX_SUMMARY_LENGTH + else (evt.summary or ""), + "full_summary": evt.summary or "", + "environment": evt.environment.value if evt.environment else "unknown", + "source_id": evt.source_id.value, + "links": evt.links[:3] if evt.links else [], + "change_type": evt.change_type.value if evt.change_type else "other", + } + ) + df = pd.DataFrame(data) - + # Ensure event_date is datetime if "event_date" in df.columns: df["event_date"] = pd.to_datetime(df["event_date"], utc=True) - + return df @@ -107,33 +133,30 @@ def fetch_events_by_source(source_id: str, limit: int = 500) -> pd.DataFrame: # FILTER COMPONENTS # ============================================================================ + def render_filters(df: pd.DataFrame, key_prefix: str) -> pd.DataFrame: """Render filter controls and return filtered DataFrame.""" - + if df.empty: return df - + # Create filter columns col1, col2, col3, col4 = st.columns(4) - + with col1: # Category filter categories = ["All"] + sorted(df["category"].unique().tolist()) selected_category = st.selectbox( - "πŸ“‚ Category", - options=categories, - key=f"{key_prefix}_category" + "πŸ“‚ Category", options=categories, key=f"{key_prefix}_category" ) - + with col2: # Status filter statuses = ["All"] + sorted(df["status"].unique().tolist()) selected_status = st.selectbox( - "πŸ“Š Status", - options=statuses, - key=f"{key_prefix}_status" + "πŸ“Š Status", options=statuses, key=f"{key_prefix}_status" ) - + with col3: # Importance filter min_importance = st.slider( @@ -141,9 +164,9 @@ def render_filters(df: pd.DataFrame, key_prefix: str) -> pd.DataFrame: min_value=0, max_value=100, value=0, - key=f"{key_prefix}_importance" + key=f"{key_prefix}_importance", ) - + with col4: # Confidence filter min_confidence = st.slider( @@ -152,59 +175,67 @@ def render_filters(df: pd.DataFrame, key_prefix: str) -> pd.DataFrame: max_value=1.0, value=0.0, step=0.1, - key=f"{key_prefix}_confidence" + key=f"{key_prefix}_confidence", ) - + # Second row: search and date range col5, col6 = st.columns([2, 2]) - + with col5: search_query = st.text_input( "πŸ” Search in titles", placeholder="Type to search...", - key=f"{key_prefix}_search" + key=f"{key_prefix}_search", ) - + with col6: # Date range filter if df["event_date"].notna().any(): min_date = df["event_date"].min().date() max_date = df["event_date"].max().date() - + date_range = st.date_input( "πŸ“… Date Range", value=(min_date, max_date), min_value=min_date, max_value=max_date, - key=f"{key_prefix}_date_range" + key=f"{key_prefix}_date_range", ) else: date_range = None - + # Apply filters filtered_df = df.copy() - + if selected_category != "All": filtered_df = filtered_df[filtered_df["category"] == selected_category] - + if selected_status != "All": filtered_df = filtered_df[filtered_df["status"] == selected_status] - + filtered_df = filtered_df[filtered_df["importance"] >= min_importance] filtered_df = filtered_df[filtered_df["confidence"] >= min_confidence] - + if search_query: - mask = filtered_df["full_title"].str.contains(search_query, case=False, na=False) + mask = filtered_df["full_title"].str.contains( + search_query, case=False, na=False + ) filtered_df = filtered_df[mask] - - if date_range and len(date_range) == 2 and filtered_df["event_date"].notna().any(): + + if ( + date_range + and len(date_range) == DATE_RANGE_PAIR_LENGTH + and filtered_df["event_date"].notna().any() + ): start, end = date_range # Convert to datetime for comparison start_dt = pd.Timestamp(start, tz="UTC") end_dt = pd.Timestamp(end, tz="UTC") + pd.Timedelta(days=1) - mask = (filtered_df["event_date"] >= start_dt) & (filtered_df["event_date"] < end_dt) + mask = (filtered_df["event_date"] >= start_dt) & ( + filtered_df["event_date"] < end_dt + ) filtered_df = filtered_df[mask] - + return filtered_df @@ -212,34 +243,41 @@ def render_filters(df: pd.DataFrame, key_prefix: str) -> pd.DataFrame: # TABLE COMPONENTS # ============================================================================ + def render_events_table(df: pd.DataFrame, key_prefix: str) -> None: """Render events table with proper column configuration.""" - + if df.empty: st.info("No events match the selected filters.") return - + # Show count st.caption(f"Showing {len(df)} events") - + # Prepare display DataFrame - display_df = df[[ - "title", "category", "status", "event_date", - "importance", "confidence", "environment", "change_type" - ]].copy() - + display_df = df[ + [ + "title", + "category", + "status", + "event_date", + "importance", + "confidence", + "environment", + "change_type", + ] + ].copy() + # Format date for display display_df["event_date"] = display_df["event_date"].dt.strftime("%Y-%m-%d %H:%M") - + st.dataframe( display_df, use_container_width=True, hide_index=True, column_config={ "title": st.column_config.TextColumn( - "πŸ“Œ Title", - width="large", - help="Event title" + "πŸ“Œ Title", width="large", help="Event title" ), "category": st.column_config.TextColumn( "πŸ“‚ Category", @@ -279,24 +317,26 @@ def render_events_table(df: pd.DataFrame, key_prefix: str) -> None: def render_metrics(df: pd.DataFrame, source_name: str) -> None: """Render summary metrics for events.""" - + col1, col2, col3, col4, col5 = st.columns(5) - + with col1: st.metric("πŸ“Š Total", len(df)) - + with col2: risk_count = len(df[df["category"] == "risk"]) if not df.empty else 0 st.metric("🚨 Risks", risk_count) - + with col3: product_count = len(df[df["category"] == "product"]) if not df.empty else 0 st.metric("πŸš€ Product", product_count) - + with col4: - active_count = len(df[df["status"].isin(["started", "planned"])]) if not df.empty else 0 + active_count = ( + len(df[df["status"].isin(["started", "planned"])]) if not df.empty else 0 + ) st.metric("⚑ Active", active_count) - + with col5: avg_importance = df["importance"].mean() if not df.empty else 0 st.metric("⭐ Avg Importance", f"{avg_importance:.0f}") @@ -306,20 +346,21 @@ def render_metrics(df: pd.DataFrame, source_name: str) -> None: # TIMELINE COMPONENT (Simple and Working) # ============================================================================ + def render_timeline(df: pd.DataFrame, key_prefix: str) -> None: """Render timeline visualization using px.timeline (Gantt-style).""" - + if df.empty or df["event_date"].isna().all(): st.info("No events with dates to display on timeline.") return - + # Filter events with valid dates chart_df = df[df["event_date"].notna()].copy() - + if chart_df.empty: st.info("No events with dates to display on timeline.") return - + # Category filter available_categories = sorted(chart_df["category"].unique().tolist()) selected_categories = st.multiselect( @@ -328,26 +369,26 @@ def render_timeline(df: pd.DataFrame, key_prefix: str) -> None: default=available_categories, key=f"{key_prefix}_timeline_categories", ) - + if not selected_categories: st.warning("Select at least one category to display.") return - + chart_df = chart_df[chart_df["category"].isin(selected_categories)] - + # Sort by date chart_df = chart_df.sort_values("event_date", ascending=False) - + # Limit to avoid overloading - if len(chart_df) > 30: - st.caption(f"Showing top 30 events out of {len(chart_df)}") - chart_df = chart_df.head(30) - + if len(chart_df) > TIMELINE_MAX_EVENTS: + st.caption(f"Showing top {TIMELINE_MAX_EVENTS} events out of {len(chart_df)}") + chart_df = chart_df.head(TIMELINE_MAX_EVENTS) + # Create proper start/end for Gantt chart_df["Start"] = chart_df["event_date"] chart_df["End"] = chart_df["event_date"] + pd.Timedelta(days=1) chart_df["Task"] = chart_df["title"].str[:60] - + # Create Gantt chart fig = px.timeline( chart_df, @@ -359,7 +400,7 @@ def render_timeline(df: pd.DataFrame, key_prefix: str) -> None: hover_data=["status", "importance", "confidence"], title="πŸ“… Events Timeline", ) - + # Add TODAY marker today = datetime.now(UTC) fig.add_vline( @@ -370,7 +411,7 @@ def render_timeline(df: pd.DataFrame, key_prefix: str) -> None: annotation_text="TODAY", annotation_position="top", ) - + fig.update_layout( height=max(400, len(chart_df) * 28), xaxis_title="", @@ -378,9 +419,9 @@ def render_timeline(df: pd.DataFrame, key_prefix: str) -> None: showlegend=True, legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="left", x=0), ) - + fig.update_yaxes(automargin=True, tickfont=dict(size=11)) - + st.plotly_chart(fig, use_container_width=True, key=f"{key_prefix}_timeline") @@ -388,108 +429,114 @@ def render_timeline(df: pd.DataFrame, key_prefix: str) -> None: # CALENDAR VIEW COMPONENT # ============================================================================ + def render_calendar_view(df: pd.DataFrame, key_prefix: str) -> None: """Render calendar view similar to Google Calendar.""" - + if df.empty or df["event_date"].isna().all(): st.info("No events with dates to display.") return - + chart_df = df[df["event_date"].notna()].copy() - + if chart_df.empty: st.info("No events with dates to display.") return - + # Get date range min_date = chart_df["event_date"].min() max_date = chart_df["event_date"].max() - + # Date navigation col1, col2, col3 = st.columns([1, 2, 1]) - + with col2: # Month selector today = datetime.now(UTC) available_months = pd.date_range( start=min_date.replace(day=1), end=max_date.replace(day=1) + pd.DateOffset(months=1), - freq='MS' + freq="MS", ) month_options = [d.strftime("%B %Y") for d in available_months] current_month_str = today.strftime("%B %Y") - default_idx = month_options.index(current_month_str) if current_month_str in month_options else 0 - + default_idx = ( + month_options.index(current_month_str) + if current_month_str in month_options + else 0 + ) + selected_month = st.selectbox( "πŸ“… Select Month", options=month_options, index=default_idx, - key=f"{key_prefix}_calendar_month" + key=f"{key_prefix}_calendar_month", ) - + # Parse selected month selected_date = pd.to_datetime(selected_month, format="%B %Y") month_start = selected_date.replace(day=1, tzinfo=UTC) - month_end = (month_start + pd.DateOffset(months=1) - pd.Timedelta(days=1)).replace(tzinfo=UTC) - + month_end = (month_start + pd.DateOffset(months=1) - pd.Timedelta(days=1)).replace( + tzinfo=UTC + ) + # Filter events for selected month month_events = chart_df[ - (chart_df["event_date"] >= month_start) & - (chart_df["event_date"] <= month_end) + (chart_df["event_date"] >= month_start) & (chart_df["event_date"] <= month_end) ] - + # Create calendar grid st.markdown("---") - + # Day headers days = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"] cols = st.columns(7) for i, day in enumerate(days): cols[i].markdown(f"**{day}**") - + # Get first day of month and calculate offset first_day = month_start first_weekday = first_day.weekday() # Monday = 0 - + # Calculate total days in month days_in_month = (month_end - month_start).days + 1 - + # Create weeks current_day = 1 week_num = 0 - + while current_day <= days_in_month: cols = st.columns(7) - + for weekday in range(7): # Skip days before month starts if week_num == 0 and weekday < first_weekday: cols[weekday].write("") continue - + if current_day > days_in_month: cols[weekday].write("") continue - + # Current date current_date = month_start + pd.Timedelta(days=current_day - 1) - + # Get events for this day day_events = month_events[ month_events["event_date"].dt.date == current_date.date() ] - + # Build day cell content is_today = current_date.date() == today.date() day_style = "πŸ”΄ " if is_today else "" - + with cols[weekday]: # Day number if is_today: st.markdown(f"**{day_style}{current_day}**") else: st.markdown(f"**{current_day}**") - + # Events for this day for _, event in day_events.iterrows(): cat_emoji = { @@ -498,38 +545,53 @@ def render_calendar_view(df: pd.DataFrame, key_prefix: str) -> None: "process": "βš™οΈ", "marketing": "πŸ“£", "org": "πŸ‘₯", - "unknown": "❓" + "unknown": "❓", }.get(event["category"], "πŸ“Œ") - + # Truncate title for display - title_short = event["title"][:12] + "…" if len(event["title"]) > 12 else event["title"] - + title_short = ( + event["title"][:CALENDAR_TITLE_TRUNCATE_LENGTH] + "…" + if len(event["title"]) > CALENDAR_TITLE_TRUNCATE_LENGTH + else event["title"] + ) + # Use native Streamlit popover - with st.popover(f"{cat_emoji} {title_short}", use_container_width=True): + with st.popover( + f"{cat_emoji} {title_short}", use_container_width=True + ): st.markdown(f"**{event['full_title']}**") - st.caption(f"πŸ“‚ {event['category'].title()} | πŸ“Š {event['status'].title()}") - st.caption(f"⭐ Importance: {event['importance']} | 🎯 Confidence: {event['confidence']:.0%}") + st.caption( + f"πŸ“‚ {event['category'].title()} | πŸ“Š {event['status'].title()}" + ) + st.caption( + f"⭐ Importance: {event['importance']} | 🎯 Confidence: {event['confidence']:.0%}" + ) st.markdown("---") - st.write(event['summary'][:200] + "..." if len(str(event['summary'])) > 200 else event['summary']) - + summary = str(event["summary"] or "") + if len(summary) > POPOVER_SUMMARY_PREVIEW_LENGTH: + summary = summary[:POPOVER_SUMMARY_PREVIEW_LENGTH] + "..." + st.write(summary) + current_day += 1 - + week_num += 1 - + # Legend st.markdown("---") - legend_items = " | ".join([ - f"{emoji} {cat.title()}" - for cat, emoji in { - "product": "πŸš€", - "risk": "🚨", - "process": "βš™οΈ", - "marketing": "πŸ“£", - "org": "πŸ‘₯", - }.items() - ]) + legend_items = " | ".join( + [ + f"{emoji} {cat.title()}" + for cat, emoji in { + "product": "πŸš€", + "risk": "🚨", + "process": "βš™οΈ", + "marketing": "πŸ“£", + "org": "πŸ‘₯", + }.items() + ] + ) st.caption(f"Legend: {legend_items}") - + # Summary st.caption(f"πŸ“Š {len(month_events)} events in {selected_month}") @@ -538,22 +600,23 @@ def render_calendar_view(df: pd.DataFrame, key_prefix: str) -> None: # EVENT LIST VIEW # ============================================================================ + def render_event_list(df: pd.DataFrame, key_prefix: str) -> None: """Render events as a list grouped by date.""" - + if df.empty or df["event_date"].isna().all(): st.info("No events to display.") return - + chart_df = df[df["event_date"].notna()].copy() chart_df = chart_df.sort_values("event_date", ascending=False) - + # Group by date chart_df["date_str"] = chart_df["event_date"].dt.strftime("%A, %B %d, %Y") - + for date_str, group in chart_df.groupby("date_str", sort=False): st.markdown(f"### πŸ“… {date_str}") - + for _, event in group.iterrows(): cat_color = CATEGORY_COLORS.get(event["category"], "#95A5A6") cat_emoji = { @@ -562,32 +625,32 @@ def render_event_list(df: pd.DataFrame, key_prefix: str) -> None: "process": "βš™οΈ", "marketing": "πŸ“£", "org": "πŸ‘₯", - "unknown": "❓" + "unknown": "❓", }.get(event["category"], "πŸ“Œ") - + status_emoji = { "planned": "πŸ“‹", "started": "▢️", "completed": "βœ…", "canceled": "❌", }.get(event["status"], "πŸ“Œ") - + with st.container(): st.markdown( f'
' f'
{cat_emoji} {event["full_title"]}
' f'
' - f'{status_emoji} {event["status"].title()} | ' - f'⭐ {event["importance"]} | ' - f'🎯 {event["confidence"]:.0%}' - f'
' + f"{status_emoji} {event['status'].title()} | " + f"⭐ {event['importance']} | " + f"🎯 {event['confidence']:.0%}" + f"
" f'
{event["summary"][:150]}...
' - f'', - unsafe_allow_html=True + f"", + unsafe_allow_html=True, ) - + st.markdown("") @@ -595,14 +658,15 @@ def render_event_list(df: pd.DataFrame, key_prefix: str) -> None: # CATEGORY DISTRIBUTION CHART # ============================================================================ + def render_category_chart(df: pd.DataFrame, key_prefix: str) -> None: """Render category distribution pie chart.""" - + if df.empty: return - + category_counts = df["category"].value_counts() - + fig = px.pie( values=category_counts.values, names=category_counts.index, @@ -611,25 +675,25 @@ def render_category_chart(df: pd.DataFrame, key_prefix: str) -> None: color_discrete_map=CATEGORY_COLORS, hole=0.4, ) - + fig.update_layout( height=300, margin=dict(l=20, r=20, t=40, b=20), showlegend=True, legend=dict(orientation="h", yanchor="bottom", y=-0.2), ) - + st.plotly_chart(fig, use_container_width=True, key=f"{key_prefix}_pie") def render_status_chart(df: pd.DataFrame, key_prefix: str) -> None: """Render status distribution bar chart.""" - + if df.empty: return - + status_counts = df["status"].value_counts() - + fig = px.bar( x=status_counts.values, y=status_counts.index, @@ -638,7 +702,7 @@ def render_status_chart(df: pd.DataFrame, key_prefix: str) -> None: color=status_counts.index, color_discrete_map=STATUS_COLORS, ) - + fig.update_layout( height=300, margin=dict(l=20, r=20, t=40, b=20), @@ -646,7 +710,7 @@ def render_status_chart(df: pd.DataFrame, key_prefix: str) -> None: xaxis_title="Count", yaxis_title="", ) - + st.plotly_chart(fig, use_container_width=True, key=f"{key_prefix}_bar") @@ -654,51 +718,50 @@ def render_status_chart(df: pd.DataFrame, key_prefix: str) -> None: # SOURCE TABS # ============================================================================ -def render_source_tab(source_id: str, source_name: str, source_emoji: str, description: str) -> None: + +def render_source_tab( + source_id: str, source_name: str, source_emoji: str, description: str +) -> None: """Render complete tab for a single source.""" - + st.markdown(f"### {source_emoji} {source_name}") st.caption(description) - + # Fetch data df = fetch_events_by_source(source_id) - + if df.empty: st.warning(f"No events found from {source_name}.") return - + # Metrics row render_metrics(df, source_name) - + st.divider() - + # Filters st.markdown("#### πŸŽ›οΈ Filters") filtered_df = render_filters(df, key_prefix=source_id) - + st.divider() - + # Sub-tabs for table and visualizations - tab_table, tab_timeline, tab_calendar, tab_list, tab_analytics = st.tabs([ - "πŸ“‹ Table", - "πŸ“Š Timeline", - "πŸ“… Calendar", - "πŸ“ List", - "πŸ“ˆ Analytics" - ]) - + tab_table, tab_timeline, tab_calendar, tab_list, tab_analytics = st.tabs( + ["πŸ“‹ Table", "πŸ“Š Timeline", "πŸ“… Calendar", "πŸ“ List", "πŸ“ˆ Analytics"] + ) + with tab_table: render_events_table(filtered_df, key_prefix=source_id) - + with tab_timeline: render_timeline(filtered_df, key_prefix=source_id) - + with tab_calendar: render_calendar_view(filtered_df, key_prefix=source_id) - + with tab_list: render_event_list(filtered_df, key_prefix=source_id) - + with tab_analytics: col1, col2 = st.columns(2) with col1: @@ -711,9 +774,10 @@ def render_source_tab(source_id: str, source_name: str, source_emoji: str, descr # MAIN APPLICATION # ============================================================================ + def main(): """Main application entry point.""" - + # Page configuration st.set_page_config( page_title="Event Manager", @@ -721,9 +785,10 @@ def main(): layout="wide", initial_sidebar_state="collapsed", ) - + # Custom CSS - st.markdown(""" + st.markdown( + """ - """, unsafe_allow_html=True) - + """, + unsafe_allow_html=True, + ) + # Header - st.markdown('

πŸ“… Event Manager Dashboard

', unsafe_allow_html=True) + st.markdown( + '

πŸ“… Event Manager Dashboard

', + unsafe_allow_html=True, + ) st.caption("Centralized view of internal (Slack) and external (Telegram) events") - + # Sidebar with system info with st.sidebar: st.header("ℹ️ System Info") - + settings = get_settings() - + # Database info if settings.database_type == "postgres": st.success(f"🐘 PostgreSQL: {settings.postgres_database}") else: st.info(f"πŸ“ SQLite: {settings.db_path}") - + # Quick stats st.divider() st.subheader("πŸ“Š Quick Stats") - + slack_df = fetch_events_by_source("slack") telegram_df = fetch_events_by_source("telegram") - + st.metric("Slack Events", len(slack_df)) st.metric("Telegram Events", len(telegram_df)) st.metric("Total Events", len(slack_df) + len(telegram_df)) - + # Refresh button st.divider() if st.button("πŸ”„ Refresh Data", use_container_width=True): st.cache_data.clear() st.rerun() - + # Main content - source tabs - tab_slack, tab_telegram, tab_all = st.tabs([ - "🏒 Internal Events (Slack)", - "🌍 External Events (Telegram)", - "πŸ“Š All Events" - ]) - + tab_slack, tab_telegram, tab_all = st.tabs( + ["🏒 Internal Events (Slack)", "🌍 External Events (Telegram)", "πŸ“Š All Events"] + ) + with tab_slack: render_source_tab( source_id="slack", source_name="Internal Events", source_emoji="🏒", - description="Events from internal Slack channels: releases, deployments, incidents, team updates" + description="Events from internal Slack channels: releases, deployments, incidents, team updates", ) - + with tab_telegram: render_source_tab( source_id="telegram", - source_name="External Events", + source_name="External Events", source_emoji="🌍", - description="Events from Telegram channels: market news, competitor updates, industry trends" + description="Events from Telegram channels: market news, competitor updates, industry trends", ) - + with tab_all: st.markdown("### πŸ“Š All Events Combined") st.caption("Overview of all events from both sources") - + # Combine both sources slack_df = fetch_events_by_source("slack") telegram_df = fetch_events_by_source("telegram") - + if slack_df.empty and telegram_df.empty: st.warning("No events found in any source.") return - + combined_df = pd.concat([slack_df, telegram_df], ignore_index=True) - + # Metrics col1, col2, col3, col4, col5 = st.columns(5) with col1: @@ -877,36 +945,32 @@ def main(): with col5: avg_imp = combined_df["importance"].mean() if not combined_df.empty else 0 st.metric("⭐ Avg Importance", f"{avg_imp:.0f}") - + st.divider() - + # Filters st.markdown("#### πŸŽ›οΈ Filters") filtered_df = render_filters(combined_df, key_prefix="all") - + st.divider() - + # Content tabs - tab_table, tab_timeline, tab_calendar, tab_list, tab_analytics = st.tabs([ - "πŸ“‹ Table", - "πŸ“Š Timeline", - "πŸ“… Calendar", - "πŸ“ List", - "πŸ“ˆ Analytics" - ]) - + tab_table, tab_timeline, tab_calendar, tab_list, tab_analytics = st.tabs( + ["πŸ“‹ Table", "πŸ“Š Timeline", "πŸ“… Calendar", "πŸ“ List", "πŸ“ˆ Analytics"] + ) + with tab_table: render_events_table(filtered_df, key_prefix="all") - + with tab_timeline: render_timeline(filtered_df, key_prefix="all") - + with tab_calendar: render_calendar_view(filtered_df, key_prefix="all") - + with tab_list: render_event_list(filtered_df, key_prefix="all") - + with tab_analytics: col1, col2 = st.columns(2) with col1: diff --git a/src/adapters/sqlite_repository.py b/src/adapters/sqlite_repository.py index ad9e2b4..1660b43 100644 --- a/src/adapters/sqlite_repository.py +++ b/src/adapters/sqlite_repository.py @@ -267,9 +267,7 @@ def _create_schema(self) -> None: ) try: - cursor.execute( - "ALTER TABLE events ADD COLUMN message_published_at TEXT" - ) + cursor.execute("ALTER TABLE events ADD COLUMN message_published_at TEXT") except sqlite3.OperationalError as exc: if "duplicate column name" not in str(exc).lower(): raise diff --git a/src/domain/models.py b/src/domain/models.py index dab1d63..087a0a9 100644 --- a/src/domain/models.py +++ b/src/domain/models.py @@ -50,7 +50,7 @@ class MessageSourceConfig(BaseModel): default_factory=dict, description="Per-source LLM settings (temperature, timeout)", ) - channels: list[str] | list[Any] = Field( + channels: list[Any] = Field( default_factory=list, description="List of channel IDs (str) or channel config objects", ) diff --git a/src/use_cases/extract_events.py b/src/use_cases/extract_events.py index 89b6b71..6f1f61c 100644 --- a/src/use_cases/extract_events.py +++ b/src/use_cases/extract_events.py @@ -259,7 +259,7 @@ def convert_llm_event_to_domain( message_id=message_id, source_channels=[channel_name] if channel_name else [], source_id=source_id, - message_published_at=candidate.ts_dt, + message_published_at=_normalize_to_utc(message_ts_dt), # Title slots action=action, object_id=object_id, diff --git a/tests/test_bulk_persistence_counts.py b/tests/test_bulk_persistence_counts.py index b2952d4..1a82a1a 100644 --- a/tests/test_bulk_persistence_counts.py +++ b/tests/test_bulk_persistence_counts.py @@ -49,6 +49,7 @@ def _create_event_values(index: int) -> tuple[object, ...]: event_id = str(uuid4()) message_id = f"msg-{index}" source_channels = "[]" + message_published_at = now source_id = "slack" action = "update" object_id = f"OBJ-{index}" @@ -77,6 +78,7 @@ def _create_event_values(index: int) -> tuple[object, ...]: message_id, source_channels, now, + message_published_at, source_id, action, object_id, @@ -127,16 +129,17 @@ def test_bulk_upsert_statement_counts(record_count: int) -> None: conn = sqlite3.connect(":memory:", factory=CountingConnection) conn.execute( """ - CREATE TABLE events ( - event_id TEXT PRIMARY KEY, - message_id TEXT, - source_channels TEXT, - extracted_at TEXT, - source_id TEXT, - action TEXT, - object_id TEXT, - object_name_raw TEXT, - qualifiers TEXT, + CREATE TABLE events ( + event_id TEXT PRIMARY KEY, + message_id TEXT, + source_channels TEXT, + extracted_at TEXT, + message_published_at TEXT, + source_id TEXT, + action TEXT, + object_id TEXT, + object_name_raw TEXT, + qualifiers TEXT, stroke TEXT, anchor TEXT, category TEXT,