diff --git a/app.py b/app.py index b6f4f6a..dba346c 100644 --- a/app.py +++ b/app.py @@ -1,14 +1,13 @@ #!/usr/bin/env python3 """ -Streamlit Demo App for Slack Event Manager. +Streamlit Dashboard for Event Manager. -This app provides a visual interface for the Slack Event Manager pipeline, -allowing users to configure settings, run the pipeline, and visualize results. +Clean read-only frontend for viewing extracted events from Slack (internal) +and Telegram (external/market) sources. """ from datetime import UTC, datetime, timedelta from typing import Any, Final -from uuid import uuid4 import pandas as pd import plotly.express as px @@ -17,586 +16,967 @@ from src.adapters.repository_factory import create_repository from src.config.settings import get_settings from src.domain.protocols import RepositoryProtocol -from src.observability.metrics import ensure_metrics_exporter -from src.presentation.streamlit_orchestration import ( - RateLimitExceededError, - job_result, - job_status, - submit_ingest_extract_job, -) -from src.use_cases.dashboard_queries import ( - fetch_recent_candidates, - fetch_recent_events, - fetch_recent_messages, -) -from src.use_cases.pipeline_orchestrator import PipelineParams - -# UI Constants -MAX_MESSAGE_LENGTH: Final[int] = 150 -MAX_CANDIDATE_TEXT_LENGTH: Final[int] = 200 -SESSION_ID_KEY: Final[str] = "session_id" -SESSION_JOB_ID_KEY: Final[str] = "active_job_id" -SESSION_JOB_RESULT_KEY: Final[str] = "last_job_result" -SESSION_STATUS_MESSAGE_KEY: Final[str] = "job_status_message" -POLL_INTERVAL_MS: Final[int] = 1500 +# ============================================================================ +# CONSTANTS +# ============================================================================ + +MAX_TITLE_LENGTH: Final[int] = 80 +MAX_SUMMARY_LENGTH: Final[int] = 150 +DATE_RANGE_PAIR_LENGTH: Final[int] = 2 +TIMELINE_MAX_EVENTS: Final[int] = 30 +CALENDAR_TITLE_TRUNCATE_LENGTH: Final[int] = 12 +POPOVER_SUMMARY_PREVIEW_LENGTH: Final[int] = 200 + +# Category colors for consistent styling +CATEGORY_COLORS: Final[dict[str, str]] = { + "product": "#2ECC71", # Green + "risk": "#E74C3C", # Red + "process": "#3498DB", # Blue + "marketing": "#9B59B6", # Purple + "org": "#F39C12", # Orange + "unknown": "#95A5A6", # Gray +} -def get_repository() -> RepositoryProtocol: - """Get repository instance based on settings. +STATUS_COLORS: Final[dict[str, str]] = { + "planned": "#BDC3C7", # Light gray + "confirmed": "#85C1E9", # Light blue + "started": "#F1C40F", # Yellow + "completed": "#27AE60", # Green + "postponed": "#E67E22", # Orange + "canceled": "#E74C3C", # Red + "rolled_back": "#8E44AD", # Purple + "updated": "#3498DB", # Blue +} + + +# ============================================================================ +# DATA ACCESS +# ============================================================================ + + +def fetch_channel_messages( + slack_client: Any, + *, + channels: list[str], + limit: int | None = None, +) -> list[tuple[str, dict[str, Any]]]: + """Fetch messages from multiple Slack channels while preserving channel identity.""" - Returns: - Repository instance (SQLite or PostgreSQL) - """ + collected: list[tuple[str, dict[str, Any]]] = [] + for channel_id in channels: + messages = slack_client.fetch_messages(channel_id=channel_id, limit=limit) + for message in messages: + collected.append((channel_id, message)) + return collected + + +@st.cache_resource +def get_repository() -> RepositoryProtocol: + """Get cached repository instance.""" settings = get_settings() return create_repository(settings) -def _ensure_session_defaults() -> None: - if SESSION_ID_KEY not in st.session_state: - st.session_state[SESSION_ID_KEY] = str(uuid4()) - st.session_state.setdefault(SESSION_JOB_ID_KEY, None) - st.session_state.setdefault(SESSION_JOB_RESULT_KEY, None) - st.session_state.setdefault(SESSION_STATUS_MESSAGE_KEY, "") - - -def _require_auth() -> str: - """No authentication required - return a default user ID.""" - if not st.session_state.get(SESSION_ID_KEY): - st.session_state[SESSION_ID_KEY] = str(uuid4()) - return str(st.session_state[SESSION_ID_KEY]) - - -def _submit_pipeline_job(message_limit: int, channels: list[str], user_id: str) -> str: - params = PipelineParams(message_limit=message_limit, channel_ids=channels) - job_id = submit_ingest_extract_job(params, user_id) - st.session_state[SESSION_JOB_ID_KEY] = job_id - st.session_state[SESSION_JOB_RESULT_KEY] = None - st.session_state[SESSION_STATUS_MESSAGE_KEY] = "Job submitted" - return job_id - - -def _render_job_status(job_id: str) -> None: - status = job_status(job_id) - progress_value = float(status.get("progress", 0.0)) - st.progress(progress_value) - status_raw = str(status.get("status", "unknown")).lower() - status_text = status_raw.capitalize() - message = status.get("message") or "" - st.info(f"Status: {status_text} {message}") - - if status_raw in {"succeeded", "failed"}: - final = job_result(job_id) - st.session_state[SESSION_JOB_RESULT_KEY] = final - st.session_state[SESSION_JOB_ID_KEY] = None - st.session_state[SESSION_STATUS_MESSAGE_KEY] = status_text - if status_raw == "failed": - error_msg = status.get("error") or "Pipeline failed" - st.error(error_msg) - else: - st.success("Pipeline completed successfully") +@st.cache_data(ttl=30) +def fetch_events_by_source(source_id: str, limit: int = 500) -> pd.DataFrame: + """Fetch events filtered by source and convert to DataFrame.""" + repo = get_repository() + + # Get events from last 90 days to future + start_date = datetime.now(UTC) - timedelta(days=90) + end_date = datetime.now(UTC) + timedelta(days=365) + + all_events = repo.get_events_in_window(start_date, end_date) + + # Filter by source + events = [e for e in all_events if e.source_id.value == source_id][:limit] + + if not events: + return pd.DataFrame() + + data = [] + for evt in events: + data.append( + { + "event_id": str(evt.event_id)[:8], + "title": evt.title[:MAX_TITLE_LENGTH] + "..." + if len(evt.title) > MAX_TITLE_LENGTH + else evt.title, + "full_title": evt.title, + "category": evt.category.value, + "status": evt.status.value, + "event_date": evt.event_date, + "confidence": evt.confidence, + "importance": evt.importance, + "summary": (evt.summary[:MAX_SUMMARY_LENGTH] + "...") + if evt.summary and len(evt.summary) > MAX_SUMMARY_LENGTH + else (evt.summary or ""), + "full_summary": evt.summary or "", + "environment": evt.environment.value if evt.environment else "unknown", + "source_id": evt.source_id.value, + "links": evt.links[:3] if evt.links else [], + "change_type": evt.change_type.value if evt.change_type else "other", + } + ) + + df = pd.DataFrame(data) + + # Ensure event_date is datetime + if "event_date" in df.columns: + df["event_date"] = pd.to_datetime(df["event_date"], utc=True) + + return df -def _render_job_summary(result: dict[str, object]) -> None: - st.subheader("Latest Pipeline Run") - correlation_id = result.get("correlation_id") - if correlation_id: - st.caption(f"Correlation ID: {correlation_id}") +# ============================================================================ +# FILTER COMPONENTS +# ============================================================================ - ingest = result.get("ingest", {}) - extract = result.get("extract", {}) - dedup = result.get("dedup", {}) - col1, col2, col3 = st.columns(3) +def render_filters(df: pd.DataFrame, key_prefix: str) -> pd.DataFrame: + """Render filter controls and return filtered DataFrame.""" + + if df.empty: + return df + + # Create filter columns + col1, col2, col3, col4 = st.columns(4) + with col1: - st.metric( - "Messages Saved", - ingest.get("messages_saved", 0), - help="Total messages persisted during ingestion", + # Category filter + categories = ["All"] + sorted(df["category"].unique().tolist()) + selected_category = st.selectbox( + "📂 Category", options=categories, key=f"{key_prefix}_category" ) + with col2: - st.metric( - "Events Extracted", - extract.get("events_extracted", 0), - help="Events produced by the extraction stage", + # Status filter + statuses = ["All"] + sorted(df["status"].unique().tolist()) + selected_status = st.selectbox( + "📊 Status", options=statuses, key=f"{key_prefix}_status" ) + with col3: - st.metric( - "Final Events", - dedup.get("total_events", 0), - help="Events remaining after deduplication", + # Importance filter + min_importance = st.slider( + "⭐ Min Importance", + min_value=0, + max_value=100, + value=0, + key=f"{key_prefix}_importance", ) + with col4: + # Confidence filter + min_confidence = st.slider( + "🎯 Min Confidence", + min_value=0.0, + max_value=1.0, + value=0.0, + step=0.1, + key=f"{key_prefix}_confidence", + ) -# Page configuration -st.set_page_config( - page_title="Slack Event Manager", - page_icon="📅", - layout="wide", - initial_sidebar_state="expanded", -) - -# Custom CSS for better styling -st.markdown( - """ - -""", - unsafe_allow_html=True, -) + # Second row: search and date range + col5, col6 = st.columns([2, 2]) + with col5: + search_query = st.text_input( + "🔍 Search in titles", + placeholder="Type to search...", + key=f"{key_prefix}_search", + ) -def main(): - """Main application function.""" + with col6: + # Date range filter + if df["event_date"].notna().any(): + min_date = df["event_date"].min().date() + max_date = df["event_date"].max().date() + + date_range = st.date_input( + "📅 Date Range", + value=(min_date, max_date), + min_value=min_date, + max_value=max_date, + key=f"{key_prefix}_date_range", + ) + else: + date_range = None - ensure_metrics_exporter() - _ensure_session_defaults() - user_id = _require_auth() + # Apply filters + filtered_df = df.copy() - # Header - st.markdown( - '

📅 Slack Event Manager

', unsafe_allow_html=True - ) - st.markdown( - "Visual interface for processing Slack messages and extracting structured events." - ) + if selected_category != "All": + filtered_df = filtered_df[filtered_df["category"] == selected_category] - # Sidebar configuration - with st.sidebar: - st.header("⚙️ Configuration") + if selected_status != "All": + filtered_df = filtered_df[filtered_df["status"] == selected_status] - # Load settings - settings = get_settings() + filtered_df = filtered_df[filtered_df["importance"] >= min_importance] + filtered_df = filtered_df[filtered_df["confidence"] >= min_confidence] - # Basic settings - message_limit = st.slider( - "Message Limit", - min_value=5, - max_value=100, - value=20, - help="Number of recent messages to fetch from each channel", + if search_query: + mask = filtered_df["full_title"].str.contains( + search_query, case=False, na=False ) - - # Channel selection from config - # Create mapping: channel_name -> channel_id - channel_options = { - f"{ch.channel_name} ({ch.channel_id})": ch.channel_id - for ch in settings.slack_channels - } - - # Use channel names as display options - selected_channel_names = st.multiselect( - "Channels", - options=list(channel_options.keys()), - default=list(channel_options.keys()), # All channels by default - help="Select Slack channels to process", + filtered_df = filtered_df[mask] + + if ( + date_range + and len(date_range) == DATE_RANGE_PAIR_LENGTH + and filtered_df["event_date"].notna().any() + ): + start, end = date_range + # Convert to datetime for comparison + start_dt = pd.Timestamp(start, tz="UTC") + end_dt = pd.Timestamp(end, tz="UTC") + pd.Timedelta(days=1) + mask = (filtered_df["event_date"] >= start_dt) & ( + filtered_df["event_date"] < end_dt ) + filtered_df = filtered_df[mask] + + return filtered_df + + +# ============================================================================ +# TABLE COMPONENTS +# ============================================================================ + + +def render_events_table(df: pd.DataFrame, key_prefix: str) -> None: + """Render events table with proper column configuration.""" + + if df.empty: + st.info("No events match the selected filters.") + return + + # Show count + st.caption(f"Showing {len(df)} events") + + # Prepare display DataFrame + display_df = df[ + [ + "title", + "category", + "status", + "event_date", + "importance", + "confidence", + "environment", + "change_type", + ] + ].copy() + + # Format date for display + display_df["event_date"] = display_df["event_date"].dt.strftime("%Y-%m-%d %H:%M") + + st.dataframe( + display_df, + use_container_width=True, + hide_index=True, + column_config={ + "title": st.column_config.TextColumn( + "📌 Title", width="large", help="Event title" + ), + "category": st.column_config.TextColumn( + "📂 Category", + width="small", + ), + "status": st.column_config.TextColumn( + "📊 Status", + width="small", + ), + "event_date": st.column_config.TextColumn( + "📅 Date", + width="medium", + ), + "importance": st.column_config.ProgressColumn( + "⭐ Importance", + min_value=0, + max_value=100, + format="%d", + ), + "confidence": st.column_config.ProgressColumn( + "🎯 Confidence", + min_value=0, + max_value=1, + format="%.1f", + ), + "environment": st.column_config.TextColumn( + "🌍 Env", + width="small", + ), + "change_type": st.column_config.TextColumn( + "🔄 Type", + width="small", + ), + }, + ) - # Convert back to channel IDs - channels = [channel_options[name] for name in selected_channel_names] - # Show database info - if settings.database_type == "postgres": - st.info( - "🐘 PostgreSQL: " - f"{settings.postgres_user}@{settings.postgres_host}:" - f"{settings.postgres_port}/{settings.postgres_database}" - ) - else: - st.info(f"📁 SQLite: {settings.db_path}") +def render_metrics(df: pd.DataFrame, source_name: str) -> None: + """Render summary metrics for events.""" + + col1, col2, col3, col4, col5 = st.columns(5) + + with col1: + st.metric("📊 Total", len(df)) + + with col2: + risk_count = len(df[df["category"] == "risk"]) if not df.empty else 0 + st.metric("🚨 Risks", risk_count) + + with col3: + product_count = len(df[df["category"] == "product"]) if not df.empty else 0 + st.metric("🚀 Product", product_count) - # Run pipeline button - run_pipeline = st.button( - "🚀 Run Pipeline", type="primary", use_container_width=True + with col4: + active_count = ( + len(df[df["status"].isin(["started", "planned"])]) if not df.empty else 0 ) + st.metric("⚡ Active", active_count) - if run_pipeline: - try: - job_id = _submit_pipeline_job(message_limit, channels, user_id) - st.success(f"Pipeline job submitted (ID: {job_id[:8]}…)") - except RateLimitExceededError as exc: - st.error(str(exc)) + with col5: + avg_importance = df["importance"].mean() if not df.empty else 0 + st.metric("⭐ Avg Importance", f"{avg_importance:.0f}") - active_job_id = st.session_state.get(SESSION_JOB_ID_KEY) - if active_job_id: - st.autorefresh(interval=POLL_INTERVAL_MS, key=f"poll_{active_job_id}") - _render_job_status(active_job_id) - else: - latest_result = st.session_state.get(SESSION_JOB_RESULT_KEY) - if isinstance(latest_result, dict): - _render_job_summary(latest_result) - elif st.session_state.get(SESSION_STATUS_MESSAGE_KEY): - st.info(st.session_state[SESSION_STATUS_MESSAGE_KEY]) - show_database_inspection() +# ============================================================================ +# TIMELINE COMPONENT (Simple and Working) +# ============================================================================ -def fetch_channel_messages( - slack_client: Any, *, channels: list[str], limit: int | None -) -> list[tuple[str, dict[str, Any]]]: - """Fetch messages for each channel while preserving attribution.""" +def render_timeline(df: pd.DataFrame, key_prefix: str) -> None: + """Render timeline visualization using px.timeline (Gantt-style).""" - channel_messages: list[tuple[str, dict[str, Any]]] = [] - for channel in channels: - messages = slack_client.fetch_messages(channel_id=channel, limit=limit) - channel_messages.extend((channel, message) for message in messages) - return channel_messages + if df.empty or df["event_date"].isna().all(): + st.info("No events with dates to display on timeline.") + return + # Filter events with valid dates + chart_df = df[df["event_date"].notna()].copy() -def show_pipeline_results(): - """Show detailed results from the pipeline.""" + if chart_df.empty: + st.info("No events with dates to display on timeline.") + return - st.header("📊 Pipeline Results") + # Category filter + available_categories = sorted(chart_df["category"].unique().tolist()) + selected_categories = st.multiselect( + "📂 Filter by category", + options=available_categories, + default=available_categories, + key=f"{key_prefix}_timeline_categories", + ) - # Database inspection using repository - try: - # Create tabs for different views - tab1, tab2, tab3, tab4 = st.tabs( - ["📨 Messages", "🎯 Candidates", "📝 Events", "📈 Timeline"] - ) + if not selected_categories: + st.warning("Select at least one category to display.") + return + + chart_df = chart_df[chart_df["category"].isin(selected_categories)] + + # Sort by date + chart_df = chart_df.sort_values("event_date", ascending=False) + + # Limit to avoid overloading + if len(chart_df) > TIMELINE_MAX_EVENTS: + st.caption(f"Showing top {TIMELINE_MAX_EVENTS} events out of {len(chart_df)}") + chart_df = chart_df.head(TIMELINE_MAX_EVENTS) + + # Create proper start/end for Gantt + chart_df["Start"] = chart_df["event_date"] + chart_df["End"] = chart_df["event_date"] + pd.Timedelta(days=1) + chart_df["Task"] = chart_df["title"].str[:60] + + # Create Gantt chart + fig = px.timeline( + chart_df, + x_start="Start", + x_end="End", + y="Task", + color="category", + color_discrete_map=CATEGORY_COLORS, + hover_data=["status", "importance", "confidence"], + title="📅 Events Timeline", + ) - with tab1: - show_messages_table() + # Add TODAY marker + today = datetime.now(UTC) + fig.add_vline( + x=today.timestamp() * 1000, # Convert to milliseconds for plotly + line_dash="dash", + line_color="red", + line_width=2, + annotation_text="TODAY", + annotation_position="top", + ) - with tab2: - show_candidates_table() + fig.update_layout( + height=max(400, len(chart_df) * 28), + xaxis_title="", + yaxis_title="", + showlegend=True, + legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="left", x=0), + ) - with tab3: - show_events_table() + fig.update_yaxes(automargin=True, tickfont=dict(size=11)) - with tab4: - show_gantt_chart() + st.plotly_chart(fig, use_container_width=True, key=f"{key_prefix}_timeline") - except Exception as e: - st.error(f"Error reading database: {str(e)}") +# ============================================================================ +# CALENDAR VIEW COMPONENT +# ============================================================================ -def show_database_inspection(): - """Show database inspection when pipeline hasn't been run.""" - st.header("🔍 Database Inspection") +def render_calendar_view(df: pd.DataFrame, key_prefix: str) -> None: + """Render calendar view similar to Google Calendar.""" - try: - # Use repository for inspection - show_pipeline_results() - except Exception as e: - st.error(f"Error reading database: {str(e)}") + if df.empty or df["event_date"].isna().all(): + st.info("No events with dates to display.") + return + chart_df = df[df["event_date"].notna()].copy() -def show_messages_table(): - """Display messages table.""" + if chart_df.empty: + st.info("No events with dates to display.") + return - st.subheader("📨 Raw Messages") + # Get date range + min_date = chart_df["event_date"].min() + max_date = chart_df["event_date"].max() - try: - repo = get_repository() - settings = get_settings() + # Date navigation + col1, col2, col3 = st.columns([1, 2, 1]) - if settings.database_type == "postgres": - st.caption( - "🐘 Source: PostgreSQL (" - f"{settings.postgres_host}:{settings.postgres_port}/" - f"{settings.postgres_database})" - ) - else: - st.caption(f"📁 Source: SQLite ({settings.db_path})") + with col2: + # Month selector + today = datetime.now(UTC) + available_months = pd.date_range( + start=min_date.replace(day=1), + end=max_date.replace(day=1) + pd.DateOffset(months=1), + freq="MS", + ) + month_options = [d.strftime("%B %Y") for d in available_months] + current_month_str = today.strftime("%B %Y") + default_idx = ( + month_options.index(current_month_str) + if current_month_str in month_options + else 0 + ) - messages = fetch_recent_messages(repository=repo, limit=100) + selected_month = st.selectbox( + "📅 Select Month", + options=month_options, + index=default_idx, + key=f"{key_prefix}_calendar_month", + ) - if not messages: - st.info("No messages found.") - return + # Parse selected month + selected_date = pd.to_datetime(selected_month, format="%B %Y") + month_start = selected_date.replace(day=1, tzinfo=UTC) + month_end = (month_start + pd.DateOffset(months=1) - pd.Timedelta(days=1)).replace( + tzinfo=UTC + ) - # Convert to DataFrame - messages_data = [] - for msg in messages: - messages_data.append( - { - "message_id": msg.message_id, - "text": msg.text[:MAX_MESSAGE_LENGTH] + "..." - if len(msg.text) > MAX_MESSAGE_LENGTH - else msg.text, - "ts": msg.ts_dt, - "user_real_name": msg.user_real_name or "", - "user_email": msg.user_email or "", - "total_reactions": msg.total_reactions or 0, - "reply_count": msg.reply_count or 0, - "attachments_count": msg.attachments_count or 0, - "files_count": msg.files_count or 0, - "permalink": msg.permalink or "", - "edited_ts": msg.edited_ts, - "edited": msg.edited_ts is not None, - } - ) + # Filter events for selected month + month_events = chart_df[ + (chart_df["event_date"] >= month_start) & (chart_df["event_date"] <= month_end) + ] + + # Create calendar grid + st.markdown("---") + + # Day headers + days = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"] + cols = st.columns(7) + for i, day in enumerate(days): + cols[i].markdown(f"**{day}**") + + # Get first day of month and calculate offset + first_day = month_start + first_weekday = first_day.weekday() # Monday = 0 + + # Calculate total days in month + days_in_month = (month_end - month_start).days + 1 + + # Create weeks + current_day = 1 + week_num = 0 + + while current_day <= days_in_month: + cols = st.columns(7) + + for weekday in range(7): + # Skip days before month starts + if week_num == 0 and weekday < first_weekday: + cols[weekday].write("") + continue + + if current_day > days_in_month: + cols[weekday].write("") + continue + + # Current date + current_date = month_start + pd.Timedelta(days=current_day - 1) + + # Get events for this day + day_events = month_events[ + month_events["event_date"].dt.date == current_date.date() + ] + + # Build day cell content + is_today = current_date.date() == today.date() + day_style = "🔴 " if is_today else "" + + with cols[weekday]: + # Day number + if is_today: + st.markdown(f"**{day_style}{current_day}**") + else: + st.markdown(f"**{current_day}**") + + # Events for this day + for _, event in day_events.iterrows(): + cat_emoji = { + "product": "🚀", + "risk": "🚨", + "process": "⚙️", + "marketing": "📣", + "org": "👥", + "unknown": "❓", + }.get(event["category"], "📌") + + # Truncate title for display + title_short = ( + event["title"][:CALENDAR_TITLE_TRUNCATE_LENGTH] + "…" + if len(event["title"]) > CALENDAR_TITLE_TRUNCATE_LENGTH + else event["title"] + ) + + # Use native Streamlit popover + with st.popover( + f"{cat_emoji} {title_short}", use_container_width=True + ): + st.markdown(f"**{event['full_title']}**") + st.caption( + f"📂 {event['category'].title()} | 📊 {event['status'].title()}" + ) + st.caption( + f"⭐ Importance: {event['importance']} | 🎯 Confidence: {event['confidence']:.0%}" + ) + st.markdown("---") + summary = str(event["summary"] or "") + if len(summary) > POPOVER_SUMMARY_PREVIEW_LENGTH: + summary = summary[:POPOVER_SUMMARY_PREVIEW_LENGTH] + "..." + st.write(summary) + + current_day += 1 + + week_num += 1 + + # Legend + st.markdown("---") + legend_items = " | ".join( + [ + f"{emoji} {cat.title()}" + for cat, emoji in { + "product": "🚀", + "risk": "🚨", + "process": "⚙️", + "marketing": "📣", + "org": "👥", + }.items() + ] + ) + st.caption(f"Legend: {legend_items}") + + # Summary + st.caption(f"📊 {len(month_events)} events in {selected_month}") + + +# ============================================================================ +# EVENT LIST VIEW +# ============================================================================ + + +def render_event_list(df: pd.DataFrame, key_prefix: str) -> None: + """Render events as a list grouped by date.""" + + if df.empty or df["event_date"].isna().all(): + st.info("No events to display.") + return + + chart_df = df[df["event_date"].notna()].copy() + chart_df = chart_df.sort_values("event_date", ascending=False) + + # Group by date + chart_df["date_str"] = chart_df["event_date"].dt.strftime("%A, %B %d, %Y") + + for date_str, group in chart_df.groupby("date_str", sort=False): + st.markdown(f"### 📅 {date_str}") + + for _, event in group.iterrows(): + cat_color = CATEGORY_COLORS.get(event["category"], "#95A5A6") + cat_emoji = { + "product": "🚀", + "risk": "🚨", + "process": "⚙️", + "marketing": "📣", + "org": "👥", + "unknown": "❓", + }.get(event["category"], "📌") + + status_emoji = { + "planned": "📋", + "started": "▶️", + "completed": "✅", + "canceled": "❌", + }.get(event["status"], "📌") + + with st.container(): + st.markdown( + f'
' + f'
{cat_emoji} {event["full_title"]}
' + f'
' + f"{status_emoji} {event['status'].title()} | " + f"⭐ {event['importance']} | " + f"🎯 {event['confidence']:.0%}" + f"
" + f'
{event["summary"][:150]}...
' + f"
", + unsafe_allow_html=True, + ) - messages_df = pd.DataFrame(messages_data) + st.markdown("") - if messages_df.empty: - st.info("No messages found.") - return - st.dataframe( - messages_df, - use_container_width=True, - column_config={ - "message_id": st.column_config.TextColumn("Message ID", width="medium"), - "text": st.column_config.TextColumn("Text", width="large"), - "ts": st.column_config.DatetimeColumn( - "Timestamp", format="YYYY-MM-DD HH:mm:ss" - ), - "user_real_name": st.column_config.TextColumn("User", width="medium"), - "user_email": st.column_config.TextColumn("Email", width="medium"), - "total_reactions": st.column_config.NumberColumn( - "👍 Reactions", width="small" - ), - "reply_count": st.column_config.NumberColumn( - "💬 Replies", width="small" - ), - "attachments_count": st.column_config.NumberColumn( - "📎 Files", width="small" - ), - "files_count": st.column_config.NumberColumn("📄 Docs", width="small"), - "permalink": st.column_config.LinkColumn("🔗 Link", width="small"), - "edited": st.column_config.CheckboxColumn("✏️ Edited", width="small"), - }, - hide_index=True, - ) +# ============================================================================ +# CATEGORY DISTRIBUTION CHART +# ============================================================================ - # Show summary statistics - col1, col2, col3, col4 = st.columns(4) - with col1: - st.metric("Total Messages", len(messages_df)) - with col2: - st.metric("Total Reactions", int(messages_df["total_reactions"].sum())) - with col3: - st.metric("Total Replies", int(messages_df["reply_count"].sum())) - with col4: - edited_count = ( - messages_df["edited_ts"].notna().sum() - if "edited_ts" in messages_df.columns - else 0 - ) - st.metric("Edited Messages", int(edited_count)) - except Exception as e: - st.error(f"Error loading messages: {str(e)}") +def render_category_chart(df: pd.DataFrame, key_prefix: str) -> None: + """Render category distribution pie chart.""" + if df.empty: + return -def show_candidates_table(): - """Display candidates table.""" + category_counts = df["category"].value_counts() - st.subheader("🎯 Event Candidates") + fig = px.pie( + values=category_counts.values, + names=category_counts.index, + title="Distribution by Category", + color=category_counts.index, + color_discrete_map=CATEGORY_COLORS, + hole=0.4, + ) - try: - repo = get_repository() - candidates = fetch_recent_candidates(repository=repo, limit=100) + fig.update_layout( + height=300, + margin=dict(l=20, r=20, t=40, b=20), + showlegend=True, + legend=dict(orientation="h", yanchor="bottom", y=-0.2), + ) - if not candidates: - st.info("No candidates found.") - return + st.plotly_chart(fig, use_container_width=True, key=f"{key_prefix}_pie") - # Convert to DataFrame - candidates_data = [] - for cand in candidates: - candidates_data.append( - { - "message_id": cand.message_id, - "text_norm": cand.text_norm[:MAX_CANDIDATE_TEXT_LENGTH] + "..." - if len(cand.text_norm) > MAX_CANDIDATE_TEXT_LENGTH - else cand.text_norm, - "score": cand.score, - "status": cand.status.value, - "features_json": str( - cand.features.model_dump() if cand.features else {} - ), - } - ) - candidates_df = pd.DataFrame(candidates_data) +def render_status_chart(df: pd.DataFrame, key_prefix: str) -> None: + """Render status distribution bar chart.""" - if candidates_df.empty: - st.info("No candidates found.") - return + if df.empty: + return - st.dataframe( - candidates_df, - use_container_width=True, - column_config={ - "message_id": st.column_config.TextColumn("Message ID", width="medium"), - "text_norm": st.column_config.TextColumn("Text", width="large"), - "score": st.column_config.NumberColumn("Score", format="%.2f"), - "status": st.column_config.TextColumn("Status", width="small"), - "features_json": st.column_config.TextColumn( - "Features", width="medium" - ), - }, - ) + status_counts = df["status"].value_counts() - st.caption(f"Total candidates: {len(candidates_df)}") + fig = px.bar( + x=status_counts.values, + y=status_counts.index, + orientation="h", + title="Distribution by Status", + color=status_counts.index, + color_discrete_map=STATUS_COLORS, + ) - except Exception as e: - st.error(f"Error loading candidates: {str(e)}") + fig.update_layout( + height=300, + margin=dict(l=20, r=20, t=40, b=20), + showlegend=False, + xaxis_title="Count", + yaxis_title="", + ) + st.plotly_chart(fig, use_container_width=True, key=f"{key_prefix}_bar") -def show_events_table(): - """Display events table.""" - st.subheader("📝 Extracted Events") +# ============================================================================ +# SOURCE TABS +# ============================================================================ - try: - repo = get_repository() - events = fetch_recent_events(repository=repo, limit=100) - if not events: - st.info("No events found.") - return +def render_source_tab( + source_id: str, source_name: str, source_emoji: str, description: str +) -> None: + """Render complete tab for a single source.""" - # Convert to DataFrame with new event structure - events_data = [] - for evt in events: - events_data.append( - { - "event_id": evt.event_id, - "message_id": evt.message_id, - "title": evt.title, # Property that renders from slots - "category": evt.category.value, - "status": evt.status.value, - "event_date": evt.event_date, # Property that returns first non-None time - "confidence": evt.confidence, - "importance": evt.importance, - "cluster_key": evt.cluster_key, - "dedup_key": evt.dedup_key or "", - } - ) + st.markdown(f"### {source_emoji} {source_name}") + st.caption(description) - events_df = pd.DataFrame(events_data) + # Fetch data + df = fetch_events_by_source(source_id) - if events_df.empty: - st.info("No events found.") - return + if df.empty: + st.warning(f"No events found from {source_name}.") + return - st.dataframe( - events_df, - use_container_width=True, - column_config={ - "event_id": st.column_config.TextColumn("Event ID", width="medium"), - "title": st.column_config.TextColumn("Title", width="large"), - "category": st.column_config.TextColumn("Category", width="small"), - "status": st.column_config.TextColumn("Status", width="small"), - "event_date": st.column_config.DatetimeColumn( - "Date", format="YYYY-MM-DD HH:mm" - ), - "confidence": st.column_config.NumberColumn( - "Confidence", format="%.2f" - ), - "importance": st.column_config.NumberColumn( - "Importance", width="small" - ), - "message_id": st.column_config.TextColumn( - "Source Message", width="medium" - ), - "cluster_key": st.column_config.TextColumn( - "Cluster Key", width="small" - ), - "dedup_key": st.column_config.TextColumn("Dedup Key", width="small"), - }, - hide_index=True, - ) + # Metrics row + render_metrics(df, source_name) - st.caption(f"Total events: {len(events_df)}") + st.divider() - except Exception as e: - st.error(f"Error loading events: {str(e)}") + # Filters + st.markdown("#### 🎛️ Filters") + filtered_df = render_filters(df, key_prefix=source_id) + st.divider() -def show_gantt_chart(): - """Display Gantt chart visualization.""" + # Sub-tabs for table and visualizations + tab_table, tab_timeline, tab_calendar, tab_list, tab_analytics = st.tabs( + ["📋 Table", "📊 Timeline", "📅 Calendar", "📝 List", "📈 Analytics"] + ) - st.subheader("📈 Events Timeline (Gantt Chart)") + with tab_table: + render_events_table(filtered_df, key_prefix=source_id) - try: - # Get repository and fetch events with dates - repo = get_repository() + with tab_timeline: + render_timeline(filtered_df, key_prefix=source_id) - start_date = datetime.now(UTC) - timedelta(days=90) - end_date = datetime.now(UTC) + timedelta(days=365) - events = repo.get_events_in_window(start_date, end_date) + with tab_calendar: + render_calendar_view(filtered_df, key_prefix=source_id) - if not events: - st.info("No events with dates found for timeline.") - return + with tab_list: + render_event_list(filtered_df, key_prefix=source_id) - # Convert to DataFrame - only events with dates - events_data = [] - for evt in events: - if evt.event_date: - events_data.append( - { - "title": evt.title, # Property that renders from slots - "category": evt.category.value, - "event_date": evt.event_date, # Property that returns first non-None time - } - ) + with tab_analytics: + col1, col2 = st.columns(2) + with col1: + render_category_chart(filtered_df, key_prefix=source_id) + with col2: + render_status_chart(filtered_df, key_prefix=source_id) - events_df = pd.DataFrame(events_data) - if events_df.empty: - st.info("No events with dates found for timeline.") - return +# ============================================================================ +# MAIN APPLICATION +# ============================================================================ + - # Create Gantt chart - fig = px.timeline( - events_df, - x_start=events_df["event_date"], - x_end=events_df["event_date"] + pd.Timedelta(days=1), # Events span 1 day - y="title", - color="category", - title="Events Timeline", - labels={"event_date": "Date"}, - color_discrete_sequence=px.colors.qualitative.Set3, +def main(): + """Main application entry point.""" + + # Page configuration + st.set_page_config( + page_title="Event Manager", + page_icon="📅", + layout="wide", + initial_sidebar_state="collapsed", + ) + + # Custom CSS + st.markdown( + """ + + """, + unsafe_allow_html=True, + ) + + # Header + st.markdown( + '

📅 Event Manager Dashboard

', + unsafe_allow_html=True, + ) + st.caption("Centralized view of internal (Slack) and external (Telegram) events") + + # Sidebar with system info + with st.sidebar: + st.header("ℹ️ System Info") + + settings = get_settings() + + # Database info + if settings.database_type == "postgres": + st.success(f"🐘 PostgreSQL: {settings.postgres_database}") + else: + st.info(f"📁 SQLite: {settings.db_path}") + + # Quick stats + st.divider() + st.subheader("📊 Quick Stats") + + slack_df = fetch_events_by_source("slack") + telegram_df = fetch_events_by_source("telegram") + + st.metric("Slack Events", len(slack_df)) + st.metric("Telegram Events", len(telegram_df)) + st.metric("Total Events", len(slack_df) + len(telegram_df)) + + # Refresh button + st.divider() + if st.button("🔄 Refresh Data", use_container_width=True): + st.cache_data.clear() + st.rerun() + + # Main content - source tabs + tab_slack, tab_telegram, tab_all = st.tabs( + ["🏢 Internal Events (Slack)", "🌍 External Events (Telegram)", "📊 All Events"] + ) + + with tab_slack: + render_source_tab( + source_id="slack", + source_name="Internal Events", + source_emoji="🏢", + description="Events from internal Slack channels: releases, deployments, incidents, team updates", ) - # Update layout - fig.update_layout( - xaxis_title="Timeline", - yaxis_title="Events", - showlegend=True, - height=max( - 400, len(events_df) * 30 - ), # Dynamic height based on number of events + with tab_telegram: + render_source_tab( + source_id="telegram", + source_name="External Events", + source_emoji="🌍", + description="Events from Telegram channels: market news, competitor updates, industry trends", ) - # Update y-axis to show full titles - fig.update_yaxes(automargin=True) + with tab_all: + st.markdown("### 📊 All Events Combined") + st.caption("Overview of all events from both sources") + + # Combine both sources + slack_df = fetch_events_by_source("slack") + telegram_df = fetch_events_by_source("telegram") - st.plotly_chart(fig, use_container_width=True) + if slack_df.empty and telegram_df.empty: + st.warning("No events found in any source.") + return + + combined_df = pd.concat([slack_df, telegram_df], ignore_index=True) - # Summary stats - col1, col2, col3 = st.columns(3) + # Metrics + col1, col2, col3, col4, col5 = st.columns(5) with col1: - st.metric("Total Events", len(events_df)) + st.metric("📊 Total", len(combined_df)) with col2: - st.metric("Categories", events_df["category"].nunique()) + st.metric("🏢 Slack", len(slack_df)) with col3: - date_range = ( - events_df["event_date"].max() - events_df["event_date"].min() - ).days - st.metric("Date Range", f"{date_range} days") + st.metric("🌍 Telegram", len(telegram_df)) + with col4: + risk_count = len(combined_df[combined_df["category"] == "risk"]) + st.metric("🚨 Risks", risk_count) + with col5: + avg_imp = combined_df["importance"].mean() if not combined_df.empty else 0 + st.metric("⭐ Avg Importance", f"{avg_imp:.0f}") + + st.divider() + + # Filters + st.markdown("#### 🎛️ Filters") + filtered_df = render_filters(combined_df, key_prefix="all") + + st.divider() + + # Content tabs + tab_table, tab_timeline, tab_calendar, tab_list, tab_analytics = st.tabs( + ["📋 Table", "📊 Timeline", "📅 Calendar", "📝 List", "📈 Analytics"] + ) + + with tab_table: + render_events_table(filtered_df, key_prefix="all") + + with tab_timeline: + render_timeline(filtered_df, key_prefix="all") + + with tab_calendar: + render_calendar_view(filtered_df, key_prefix="all") + + with tab_list: + render_event_list(filtered_df, key_prefix="all") - except Exception as e: - st.error(f"Error creating Gantt chart: {str(e)}") + with tab_analytics: + col1, col2 = st.columns(2) + with col1: + render_category_chart(filtered_df, key_prefix="all") + with col2: + render_status_chart(filtered_df, key_prefix="all") if __name__ == "__main__": diff --git a/config/prompts/telegram.yaml b/config/prompts/telegram.yaml index c2a3c99..a7838c8 100644 --- a/config/prompts/telegram.yaml +++ b/config/prompts/telegram.yaml @@ -1,5 +1,5 @@ -version: "20250215.1" -description: "Telegram event extraction prompt" +version: "20251212.1" +description: "Telegram event extraction for TON/crypto channels (security, competitor/product, regulation) with clean links" system: | You are an event extraction assistant for Telegram channel messages. @@ -7,6 +7,46 @@ system: | - INPUT: May be in Russian or English - OUTPUT: ALL fields (action, qualifiers, stroke, object_name_raw, summary, etc.) MUST BE IN ENGLISH + ========================= + SCOPE FILTER (CRITICAL) + ========================= + You ONLY extract events that are relevant to: + - TON / Telegram ecosystem (products, wallets, bots, payments) + - Crypto / digital asset wallets, exchanges, payments, on/off-ramp + - Regulation and security for the above + + And the message MUST fall into at least one of these buckets: + + 1) security_incident: + - Hacks, exploits, breaches, theft of funds, critical vulnerabilities, compromised keys, bridge exploits. + - Examples (EN/RU): "hack", "exploit", "breach", "security incident", "funds stolen", "drained", + "взлом", "эксплойт", "угнали средства", "слив приватных ключей", "брижд взломали", "протокол задрейнили". + + 2) competitor_update: + - Product / business changes of other wallets, exchanges, payment apps or custodial services that may compete with wallet.tg. + - Include: new features, supported assets, cards, P2P, staking, fees, geographies, integrations with Telegram or messengers. + - Treat as competitor any product that offers: custody of crypto/stablecoins, payments, P2P, on/off-ramp, cards, swaps, + especially if it has a Telegram bot or is used inside messengers. + - Examples (EN/RU): "launched a new wallet", "support USDT on Tron", "new crypto card", "P2P service", + "запустили кошелёк", "кошелёк в Telegram", "бот для криптоплатежей", "поддержка USDT", "запуск P2P", "новая карта". + + 3) regulation: + - Laws, draft bills, regulatory guidance, enforcement actions, sanctions, licensing rules that affect: + crypto, stablecoins, custodial wallets, payments, KYC/AML, on/off-ramp, Telegram-based fintech. + - Examples (EN/RU): "regulator", "central bank", "FATF", "OFAC", "sanctions", "licensing", "KYC", "AML", + "законопроект", "регулятор", "ЦБ", "лицензия", "отзыв лицензии", "AML", "KYC", "запрет криптовалют", "санкции на стейблкоины". + + If a message does NOT clearly match at least one of these three buckets, + you MUST respond with: + { + "is_event": false, + "overflow_note": null, + "events": [] + } + + ========================= + GENERAL EXTRACTION RULES + ========================= Your task: Extract 0 to 5 independent events from a Telegram message with structured title slots. Core Rules: @@ -17,13 +57,32 @@ system: | 5. If >5 events exist, pick top 5 by specificity (clear dates/anchors), note rest in overflow_note Categories: - - product: releases, features, deployments, launches + - product: releases, features, deployments, launches (incl. competitor product updates) - process: internal processes, workflows, policies - marketing: campaigns, promotions, announcements - - risk: incidents, issues, compliance, security + - risk: incidents, issues, compliance, security (incl. hacks, regulatory risk) - org: organizational changes, hiring, team updates - unknown: unclear or doesn't fit + Additional high-level topic (CRITICAL): + - topic_type: MUST be one of: + ["security_incident", "competitor_update", "regulation"] + + Mapping guidelines: + - security_incident: + topic_type = "security_incident" + category = "risk" + change_type = "incident" + severity = "sev1" (large loss / critical compromise), "sev2", "sev3", or "info" + - competitor_update: + topic_type = "competitor_update" + category = "product" (or "marketing"/"process" if mostly about pricing, campaigns or T&C changes) + change_type = "launch" | "deploy" | "policy" | "campaign" | "other" depending on content + - regulation: + topic_type = "regulation" + category = "risk" or "process" (pick the best fit) + change_type = "policy" + Title Slot Extraction (CRITICAL): Extract these slots that will be used to generate canonical title: @@ -63,11 +122,14 @@ system: | Content Fields: - summary: 1-3 sentences (max 320 chars). What changed and why it matters. - - why_it_matters: 1 line (max 160 chars) or null. Impact/reason for reader. + For competitor_update: emphasize what the competitor launched/changed and why this matters for a Telegram wallet product. + For security_incident: emphasize what was hacked, scale of loss, type of vulnerability. + For regulation: emphasize jurisdiction, asset types affected, and potential impact on wallets/crypto/payments. + - why_it_matters: 1 line (max 160 chars) or null. Impact for wallet.tg perspective (users, markets, compliance, risk). - links: Array of URLs (max 3) - anchors: Array of identifiers found (Jira keys, PR numbers, version tags) - - impact_area: Systems/components affected (max 3): ["authentication", "payments", "mobile-app"] - - impact_type: Types of impact: ["perf_degradation", "downtime", "ux_change", "policy_change", "data_migration"] + - impact_area: Systems/components affected (max 3): e.g. ["custody", "onramp", "p2p", "cards", "stablecoins", "compliance"] + - impact_type: Types of impact: ["perf_degradation", "downtime", "ux_change", "policy_change", "data_migration", "legal_risk", "security_risk"] Quality: - confidence: 0.0-1.0. How confident you are in extraction accuracy. @@ -105,44 +167,8 @@ system: | ] } - If message has no events (e.g., question, discussion), set is_event=false and events=[]. - - Examples: - Message: "🚀 Launching Stocks & ETFs trading in alpha for Wallet team next Monday. Known issue: possible performance degradation during peak hours. Track: INV-1024" - Event: - { - "action": "Launch", - "object_name_raw": "Stocks & ETFs trading", - "qualifiers": ["alpha", "Wallet team"], - "stroke": "degradation possible", - "anchor": "INV-1024", - "category": "product", - "status": "planned", - "change_type": "launch", - "environment": "prod", - "severity": null, - "planned_start": "2025-10-21T00:00:00Z", - "planned_end": null, - "actual_start": null, - "actual_end": null, - "time_source": "relative", - "time_confidence": 0.85, - "summary": "Stocks & ETFs trading launching in alpha for Wallet team. Potential performance issues during peak hours.", - "why_it_matters": "New trading feature with known performance considerations", - "links": [], - "anchors": ["INV-1024"], - "impact_area": ["trading", "wallet"], - "impact_type": ["perf_degradation"], - "confidence": 0.95 - } - - Message: "Hi team! Has anyone tried the new mobile build? I'm seeing some weird caching behavior." - Response: - { - "is_event": false, - "overflow_note": null, - "events": [] - } + If message has no relevant event for security_incident, competitor_update or regulation, + you MUST set is_event=false and events=[]. IMPORTANT FORMATTING NOTES FOR TELEGRAM: - Telegram messages may contain emoji, user mentions (@username), hashtags (#tag) @@ -151,3 +177,6 @@ system: | - Focus on substantive content, not formatting - Forwarded messages will have forward_from metadata - use original source if relevant - Media attachments (photos, videos, documents) may provide additional context + - Links MUST include scheme; if you see "t.me/xyz" convert to "https://t.me/xyz" + - Prefer https:// links; drop malformed links rather than emitting invalid URLs + - If message date is older than 30 days, set is_event=false (ignore stale news) diff --git a/docker-compose.yml b/docker-compose.yml index 7ea470b..685accf 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -64,6 +64,9 @@ services: <<: *worker-base container_name: slack_telegram_worker command: python scripts/run_multi_source_pipeline.py --source telegram --interval-seconds 600 + environment: + <<: *app-env + TELEGRAM_SESSION_PATH: ${TELEGRAM_SESSION_PATH:-data/telegram_session_telegram_worker.session} networks: - slack_network diff --git a/scripts/telegram_qr_auth.py b/scripts/telegram_qr_auth.py index 8c697c2..b693442 100644 --- a/scripts/telegram_qr_auth.py +++ b/scripts/telegram_qr_auth.py @@ -86,9 +86,9 @@ async def authenticate_with_qr() -> None: print(f"❌ Error: TELEGRAM_API_ID must be an integer, got: {api_id_str}") sys.exit(1) - # Session file path - session_path = "data/telegram_session.session" - Path("data").mkdir(exist_ok=True) + # Session file path (can be overridden via TELEGRAM_SESSION_PATH env) + session_path = os.getenv("TELEGRAM_SESSION_PATH", "data/telegram_session.session") + Path(Path(session_path).parent).mkdir(exist_ok=True) print("=" * 60) print("Telegram QR Code Authentication") diff --git a/src/adapters/bulk_persistence.py b/src/adapters/bulk_persistence.py index b325dcc..4710752 100644 --- a/src/adapters/bulk_persistence.py +++ b/src/adapters/bulk_persistence.py @@ -55,6 +55,7 @@ def _serialize_datetime(dt: datetime | None) -> datetime | str | None: event.message_id, json.dumps(event.source_channels), _serialize_datetime(event.extracted_at), + _serialize_datetime(event.message_published_at), event.source_id.value, event.action.value, event.object_id, @@ -189,7 +190,7 @@ def _postgres_events_batch( insert_sql = """ INSERT INTO events ( - event_id, message_id, source_channels, extracted_at, source_id, + event_id, message_id, source_channels, extracted_at, message_published_at, source_id, action, object_id, object_name_raw, qualifiers, stroke, anchor, category, status, change_type, environment, severity, planned_start, planned_end, actual_start, actual_end, @@ -197,7 +198,7 @@ def _postgres_events_batch( summary, why_it_matters, links, anchors, impact_area, impact_type, confidence, importance, cluster_key, dedup_key ) VALUES ( - %s, %s, %s, %s, %s, + %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, @@ -209,6 +210,7 @@ def _postgres_events_batch( message_id = EXCLUDED.message_id, source_channels = EXCLUDED.source_channels, extracted_at = EXCLUDED.extracted_at, + message_published_at = EXCLUDED.message_published_at, source_id = EXCLUDED.source_id, action = EXCLUDED.action, object_id = EXCLUDED.object_id, @@ -257,7 +259,7 @@ def _sqlite_events_batch( insert_sql = """ INSERT OR REPLACE INTO events ( - event_id, message_id, source_channels, extracted_at, source_id, + event_id, message_id, source_channels, extracted_at, message_published_at, source_id, action, object_id, object_name_raw, qualifiers, stroke, anchor, category, status, change_type, environment, severity, planned_start, planned_end, actual_start, actual_end, @@ -265,7 +267,7 @@ def _sqlite_events_batch( summary, why_it_matters, links, anchors, impact_area, impact_type, confidence, importance, cluster_key, dedup_key ) VALUES ( - ?, ?, ?, ?, ?, + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, diff --git a/src/adapters/postgres_repository.py b/src/adapters/postgres_repository.py index 29569c2..0e8d6bc 100644 --- a/src/adapters/postgres_repository.py +++ b/src/adapters/postgres_repository.py @@ -468,6 +468,15 @@ def _row_to_event(self, row: dict[str, Any]) -> Event: else: extracted_at = extracted_at_raw.astimezone(pytz.UTC) + message_published_at_raw = row.get("message_published_at") + if isinstance(message_published_at_raw, datetime): + if message_published_at_raw.tzinfo is None: + message_published_at = message_published_at_raw.replace(tzinfo=pytz.UTC) + else: + message_published_at = message_published_at_raw.astimezone(pytz.UTC) + else: + message_published_at = None + return Event( # Identification event_id=UUID(row["event_id"]), @@ -475,6 +484,7 @@ def _row_to_event(self, row: dict[str, Any]) -> Event: source_channels=row.get("source_channels") or [], # Already parsed from JSONB extracted_at=extracted_at, + message_published_at=message_published_at, source_id=MessageSource(source_id_value), # Title slots action=ActionType(row["action"]), @@ -1058,9 +1068,9 @@ def get_events_in_window(self, start_dt: datetime, end_dt: datetime) -> list[Eve cur.execute( """ SELECT * FROM events - WHERE COALESCE(actual_start, actual_end, planned_start, planned_end) >= %s - AND COALESCE(actual_start, actual_end, planned_start, planned_end) <= %s - ORDER BY COALESCE(actual_start, actual_end, planned_start, planned_end) DESC + WHERE COALESCE(actual_start, actual_end, planned_start, planned_end, message_published_at, extracted_at) >= %s + AND COALESCE(actual_start, actual_end, planned_start, planned_end, message_published_at, extracted_at) <= %s + ORDER BY COALESCE(actual_start, actual_end, planned_start, planned_end, message_published_at, extracted_at) DESC """, (start_dt, end_dt), ) diff --git a/src/adapters/sqlite_repository.py b/src/adapters/sqlite_repository.py index d713f6f..1660b43 100644 --- a/src/adapters/sqlite_repository.py +++ b/src/adapters/sqlite_repository.py @@ -219,6 +219,7 @@ def _create_schema(self) -> None: message_id TEXT NOT NULL, source_channels TEXT NOT NULL, extracted_at TEXT NOT NULL, + message_published_at TEXT, -- Title Slots (source of truth) action TEXT NOT NULL, @@ -265,6 +266,12 @@ def _create_schema(self) -> None: """ ) + try: + cursor.execute("ALTER TABLE events ADD COLUMN message_published_at TEXT") + except sqlite3.OperationalError as exc: + if "duplicate column name" not in str(exc).lower(): + raise + # Event relations table cursor.execute( """ @@ -1186,9 +1193,9 @@ def get_events_in_window(self, start_dt: datetime, end_dt: datetime) -> list[Eve cursor.execute( """ SELECT * FROM events - WHERE COALESCE(actual_start, actual_end, planned_start, planned_end) >= ? - AND COALESCE(actual_start, actual_end, planned_start, planned_end) <= ? - ORDER BY COALESCE(actual_start, actual_end, planned_start, planned_end) ASC + WHERE COALESCE(actual_start, actual_end, planned_start, planned_end, message_published_at, extracted_at) >= ? + AND COALESCE(actual_start, actual_end, planned_start, planned_end, message_published_at, extracted_at) <= ? + ORDER BY COALESCE(actual_start, actual_end, planned_start, planned_end, message_published_at, extracted_at) ASC """, (start_dt.isoformat(), end_dt.isoformat()), ) @@ -1669,6 +1676,9 @@ def _parse_dt(value: str | None) -> datetime | None: message_id=row["message_id"], source_channels=json.loads(row["source_channels"] or "[]"), extracted_at=extracted_at, + message_published_at=_parse_dt(row["message_published_at"]) + if "message_published_at" in row.keys() + else None, # Title slots action=ActionType(row["action"]), object_id=row["object_id"], diff --git a/src/domain/models.py b/src/domain/models.py index 539e710..087a0a9 100644 --- a/src/domain/models.py +++ b/src/domain/models.py @@ -50,7 +50,7 @@ class MessageSourceConfig(BaseModel): default_factory=dict, description="Per-source LLM settings (temperature, timeout)", ) - channels: list[str] | list[Any] = Field( + channels: list[Any] = Field( default_factory=list, description="List of channel IDs (str) or channel config objects", ) @@ -468,6 +468,9 @@ class Event(BaseModel): source_channels: list[str] = Field( default_factory=list, description="Source channel names" ) + message_published_at: datetime | None = Field( + default=None, description="Original message timestamp (UTC)" + ) extracted_at: datetime = Field( default_factory=_utcnow, description="Extraction timestamp" ) @@ -620,6 +623,7 @@ def event_date(self) -> datetime | None: or self.actual_end or self.planned_start or self.planned_end + or self.message_published_at ) @property diff --git a/src/use_cases/extract_events.py b/src/use_cases/extract_events.py index 1ad71fa..6f1f61c 100644 --- a/src/use_cases/extract_events.py +++ b/src/use_cases/extract_events.py @@ -259,6 +259,7 @@ def convert_llm_event_to_domain( message_id=message_id, source_channels=[channel_name] if channel_name else [], source_id=source_id, + message_published_at=_normalize_to_utc(message_ts_dt), # Title slots action=action, object_id=object_id, diff --git a/tests/test_bulk_persistence_counts.py b/tests/test_bulk_persistence_counts.py index b2952d4..1a82a1a 100644 --- a/tests/test_bulk_persistence_counts.py +++ b/tests/test_bulk_persistence_counts.py @@ -49,6 +49,7 @@ def _create_event_values(index: int) -> tuple[object, ...]: event_id = str(uuid4()) message_id = f"msg-{index}" source_channels = "[]" + message_published_at = now source_id = "slack" action = "update" object_id = f"OBJ-{index}" @@ -77,6 +78,7 @@ def _create_event_values(index: int) -> tuple[object, ...]: message_id, source_channels, now, + message_published_at, source_id, action, object_id, @@ -127,16 +129,17 @@ def test_bulk_upsert_statement_counts(record_count: int) -> None: conn = sqlite3.connect(":memory:", factory=CountingConnection) conn.execute( """ - CREATE TABLE events ( - event_id TEXT PRIMARY KEY, - message_id TEXT, - source_channels TEXT, - extracted_at TEXT, - source_id TEXT, - action TEXT, - object_id TEXT, - object_name_raw TEXT, - qualifiers TEXT, + CREATE TABLE events ( + event_id TEXT PRIMARY KEY, + message_id TEXT, + source_channels TEXT, + extracted_at TEXT, + message_published_at TEXT, + source_id TEXT, + action TEXT, + object_id TEXT, + object_name_raw TEXT, + qualifiers TEXT, stroke TEXT, anchor TEXT, category TEXT,