diff --git a/app.py b/app.py
index b6f4f6a..dba346c 100644
--- a/app.py
+++ b/app.py
@@ -1,14 +1,13 @@
#!/usr/bin/env python3
"""
-Streamlit Demo App for Slack Event Manager.
+Streamlit Dashboard for Event Manager.
-This app provides a visual interface for the Slack Event Manager pipeline,
-allowing users to configure settings, run the pipeline, and visualize results.
+Clean read-only frontend for viewing extracted events from Slack (internal)
+and Telegram (external/market) sources.
"""
from datetime import UTC, datetime, timedelta
from typing import Any, Final
-from uuid import uuid4
import pandas as pd
import plotly.express as px
@@ -17,586 +16,967 @@
from src.adapters.repository_factory import create_repository
from src.config.settings import get_settings
from src.domain.protocols import RepositoryProtocol
-from src.observability.metrics import ensure_metrics_exporter
-from src.presentation.streamlit_orchestration import (
- RateLimitExceededError,
- job_result,
- job_status,
- submit_ingest_extract_job,
-)
-from src.use_cases.dashboard_queries import (
- fetch_recent_candidates,
- fetch_recent_events,
- fetch_recent_messages,
-)
-from src.use_cases.pipeline_orchestrator import PipelineParams
-
-# UI Constants
-MAX_MESSAGE_LENGTH: Final[int] = 150
-MAX_CANDIDATE_TEXT_LENGTH: Final[int] = 200
-SESSION_ID_KEY: Final[str] = "session_id"
-SESSION_JOB_ID_KEY: Final[str] = "active_job_id"
-SESSION_JOB_RESULT_KEY: Final[str] = "last_job_result"
-SESSION_STATUS_MESSAGE_KEY: Final[str] = "job_status_message"
-POLL_INTERVAL_MS: Final[int] = 1500
+# ============================================================================
+# CONSTANTS
+# ============================================================================
+
+MAX_TITLE_LENGTH: Final[int] = 80
+MAX_SUMMARY_LENGTH: Final[int] = 150
+DATE_RANGE_PAIR_LENGTH: Final[int] = 2
+TIMELINE_MAX_EVENTS: Final[int] = 30
+CALENDAR_TITLE_TRUNCATE_LENGTH: Final[int] = 12
+POPOVER_SUMMARY_PREVIEW_LENGTH: Final[int] = 200
+
+# Category colors for consistent styling
+CATEGORY_COLORS: Final[dict[str, str]] = {
+ "product": "#2ECC71", # Green
+ "risk": "#E74C3C", # Red
+ "process": "#3498DB", # Blue
+ "marketing": "#9B59B6", # Purple
+ "org": "#F39C12", # Orange
+ "unknown": "#95A5A6", # Gray
+}
-def get_repository() -> RepositoryProtocol:
- """Get repository instance based on settings.
+STATUS_COLORS: Final[dict[str, str]] = {
+ "planned": "#BDC3C7", # Light gray
+ "confirmed": "#85C1E9", # Light blue
+ "started": "#F1C40F", # Yellow
+ "completed": "#27AE60", # Green
+ "postponed": "#E67E22", # Orange
+ "canceled": "#E74C3C", # Red
+ "rolled_back": "#8E44AD", # Purple
+ "updated": "#3498DB", # Blue
+}
+
+
+# ============================================================================
+# DATA ACCESS
+# ============================================================================
+
+
+def fetch_channel_messages(
+ slack_client: Any,
+ *,
+ channels: list[str],
+ limit: int | None = None,
+) -> list[tuple[str, dict[str, Any]]]:
+ """Fetch messages from multiple Slack channels while preserving channel identity."""
- Returns:
- Repository instance (SQLite or PostgreSQL)
- """
+ collected: list[tuple[str, dict[str, Any]]] = []
+ for channel_id in channels:
+ messages = slack_client.fetch_messages(channel_id=channel_id, limit=limit)
+ for message in messages:
+ collected.append((channel_id, message))
+ return collected
+
+
+@st.cache_resource
+def get_repository() -> RepositoryProtocol:
+ """Get cached repository instance."""
settings = get_settings()
return create_repository(settings)
-def _ensure_session_defaults() -> None:
- if SESSION_ID_KEY not in st.session_state:
- st.session_state[SESSION_ID_KEY] = str(uuid4())
- st.session_state.setdefault(SESSION_JOB_ID_KEY, None)
- st.session_state.setdefault(SESSION_JOB_RESULT_KEY, None)
- st.session_state.setdefault(SESSION_STATUS_MESSAGE_KEY, "")
-
-
-def _require_auth() -> str:
- """No authentication required - return a default user ID."""
- if not st.session_state.get(SESSION_ID_KEY):
- st.session_state[SESSION_ID_KEY] = str(uuid4())
- return str(st.session_state[SESSION_ID_KEY])
-
-
-def _submit_pipeline_job(message_limit: int, channels: list[str], user_id: str) -> str:
- params = PipelineParams(message_limit=message_limit, channel_ids=channels)
- job_id = submit_ingest_extract_job(params, user_id)
- st.session_state[SESSION_JOB_ID_KEY] = job_id
- st.session_state[SESSION_JOB_RESULT_KEY] = None
- st.session_state[SESSION_STATUS_MESSAGE_KEY] = "Job submitted"
- return job_id
-
-
-def _render_job_status(job_id: str) -> None:
- status = job_status(job_id)
- progress_value = float(status.get("progress", 0.0))
- st.progress(progress_value)
- status_raw = str(status.get("status", "unknown")).lower()
- status_text = status_raw.capitalize()
- message = status.get("message") or ""
- st.info(f"Status: {status_text} {message}")
-
- if status_raw in {"succeeded", "failed"}:
- final = job_result(job_id)
- st.session_state[SESSION_JOB_RESULT_KEY] = final
- st.session_state[SESSION_JOB_ID_KEY] = None
- st.session_state[SESSION_STATUS_MESSAGE_KEY] = status_text
- if status_raw == "failed":
- error_msg = status.get("error") or "Pipeline failed"
- st.error(error_msg)
- else:
- st.success("Pipeline completed successfully")
+@st.cache_data(ttl=30)
+def fetch_events_by_source(source_id: str, limit: int = 500) -> pd.DataFrame:
+ """Fetch events filtered by source and convert to DataFrame."""
+ repo = get_repository()
+
+ # Get events from last 90 days to future
+ start_date = datetime.now(UTC) - timedelta(days=90)
+ end_date = datetime.now(UTC) + timedelta(days=365)
+
+ all_events = repo.get_events_in_window(start_date, end_date)
+
+ # Filter by source
+ events = [e for e in all_events if e.source_id.value == source_id][:limit]
+
+ if not events:
+ return pd.DataFrame()
+
+ data = []
+ for evt in events:
+ data.append(
+ {
+ "event_id": str(evt.event_id)[:8],
+ "title": evt.title[:MAX_TITLE_LENGTH] + "..."
+ if len(evt.title) > MAX_TITLE_LENGTH
+ else evt.title,
+ "full_title": evt.title,
+ "category": evt.category.value,
+ "status": evt.status.value,
+ "event_date": evt.event_date,
+ "confidence": evt.confidence,
+ "importance": evt.importance,
+ "summary": (evt.summary[:MAX_SUMMARY_LENGTH] + "...")
+ if evt.summary and len(evt.summary) > MAX_SUMMARY_LENGTH
+ else (evt.summary or ""),
+ "full_summary": evt.summary or "",
+ "environment": evt.environment.value if evt.environment else "unknown",
+ "source_id": evt.source_id.value,
+ "links": evt.links[:3] if evt.links else [],
+ "change_type": evt.change_type.value if evt.change_type else "other",
+ }
+ )
+
+ df = pd.DataFrame(data)
+
+ # Ensure event_date is datetime
+ if "event_date" in df.columns:
+ df["event_date"] = pd.to_datetime(df["event_date"], utc=True)
+
+ return df
-def _render_job_summary(result: dict[str, object]) -> None:
- st.subheader("Latest Pipeline Run")
- correlation_id = result.get("correlation_id")
- if correlation_id:
- st.caption(f"Correlation ID: {correlation_id}")
+# ============================================================================
+# FILTER COMPONENTS
+# ============================================================================
- ingest = result.get("ingest", {})
- extract = result.get("extract", {})
- dedup = result.get("dedup", {})
- col1, col2, col3 = st.columns(3)
+def render_filters(df: pd.DataFrame, key_prefix: str) -> pd.DataFrame:
+ """Render filter controls and return filtered DataFrame."""
+
+ if df.empty:
+ return df
+
+ # Create filter columns
+ col1, col2, col3, col4 = st.columns(4)
+
with col1:
- st.metric(
- "Messages Saved",
- ingest.get("messages_saved", 0),
- help="Total messages persisted during ingestion",
+ # Category filter
+ categories = ["All"] + sorted(df["category"].unique().tolist())
+ selected_category = st.selectbox(
+ "📂 Category", options=categories, key=f"{key_prefix}_category"
)
+
with col2:
- st.metric(
- "Events Extracted",
- extract.get("events_extracted", 0),
- help="Events produced by the extraction stage",
+ # Status filter
+ statuses = ["All"] + sorted(df["status"].unique().tolist())
+ selected_status = st.selectbox(
+ "📊 Status", options=statuses, key=f"{key_prefix}_status"
)
+
with col3:
- st.metric(
- "Final Events",
- dedup.get("total_events", 0),
- help="Events remaining after deduplication",
+ # Importance filter
+ min_importance = st.slider(
+ "⭐ Min Importance",
+ min_value=0,
+ max_value=100,
+ value=0,
+ key=f"{key_prefix}_importance",
)
+ with col4:
+ # Confidence filter
+ min_confidence = st.slider(
+ "🎯 Min Confidence",
+ min_value=0.0,
+ max_value=1.0,
+ value=0.0,
+ step=0.1,
+ key=f"{key_prefix}_confidence",
+ )
-# Page configuration
-st.set_page_config(
- page_title="Slack Event Manager",
- page_icon="📅",
- layout="wide",
- initial_sidebar_state="expanded",
-)
-
-# Custom CSS for better styling
-st.markdown(
- """
-
-""",
- unsafe_allow_html=True,
-)
+ # Second row: search and date range
+ col5, col6 = st.columns([2, 2])
+ with col5:
+ search_query = st.text_input(
+ "🔍 Search in titles",
+ placeholder="Type to search...",
+ key=f"{key_prefix}_search",
+ )
-def main():
- """Main application function."""
+ with col6:
+ # Date range filter
+ if df["event_date"].notna().any():
+ min_date = df["event_date"].min().date()
+ max_date = df["event_date"].max().date()
+
+ date_range = st.date_input(
+ "📅 Date Range",
+ value=(min_date, max_date),
+ min_value=min_date,
+ max_value=max_date,
+ key=f"{key_prefix}_date_range",
+ )
+ else:
+ date_range = None
- ensure_metrics_exporter()
- _ensure_session_defaults()
- user_id = _require_auth()
+ # Apply filters
+ filtered_df = df.copy()
- # Header
- st.markdown(
- '
📅 Slack Event Manager
', unsafe_allow_html=True
- )
- st.markdown(
- "Visual interface for processing Slack messages and extracting structured events."
- )
+ if selected_category != "All":
+ filtered_df = filtered_df[filtered_df["category"] == selected_category]
- # Sidebar configuration
- with st.sidebar:
- st.header("⚙️ Configuration")
+ if selected_status != "All":
+ filtered_df = filtered_df[filtered_df["status"] == selected_status]
- # Load settings
- settings = get_settings()
+ filtered_df = filtered_df[filtered_df["importance"] >= min_importance]
+ filtered_df = filtered_df[filtered_df["confidence"] >= min_confidence]
- # Basic settings
- message_limit = st.slider(
- "Message Limit",
- min_value=5,
- max_value=100,
- value=20,
- help="Number of recent messages to fetch from each channel",
+ if search_query:
+ mask = filtered_df["full_title"].str.contains(
+ search_query, case=False, na=False
)
-
- # Channel selection from config
- # Create mapping: channel_name -> channel_id
- channel_options = {
- f"{ch.channel_name} ({ch.channel_id})": ch.channel_id
- for ch in settings.slack_channels
- }
-
- # Use channel names as display options
- selected_channel_names = st.multiselect(
- "Channels",
- options=list(channel_options.keys()),
- default=list(channel_options.keys()), # All channels by default
- help="Select Slack channels to process",
+ filtered_df = filtered_df[mask]
+
+ if (
+ date_range
+ and len(date_range) == DATE_RANGE_PAIR_LENGTH
+ and filtered_df["event_date"].notna().any()
+ ):
+ start, end = date_range
+ # Convert to datetime for comparison
+ start_dt = pd.Timestamp(start, tz="UTC")
+ end_dt = pd.Timestamp(end, tz="UTC") + pd.Timedelta(days=1)
+ mask = (filtered_df["event_date"] >= start_dt) & (
+ filtered_df["event_date"] < end_dt
)
+ filtered_df = filtered_df[mask]
+
+ return filtered_df
+
+
+# ============================================================================
+# TABLE COMPONENTS
+# ============================================================================
+
+
+def render_events_table(df: pd.DataFrame, key_prefix: str) -> None:
+ """Render events table with proper column configuration."""
+
+ if df.empty:
+ st.info("No events match the selected filters.")
+ return
+
+ # Show count
+ st.caption(f"Showing {len(df)} events")
+
+ # Prepare display DataFrame
+ display_df = df[
+ [
+ "title",
+ "category",
+ "status",
+ "event_date",
+ "importance",
+ "confidence",
+ "environment",
+ "change_type",
+ ]
+ ].copy()
+
+ # Format date for display
+ display_df["event_date"] = display_df["event_date"].dt.strftime("%Y-%m-%d %H:%M")
+
+ st.dataframe(
+ display_df,
+ use_container_width=True,
+ hide_index=True,
+ column_config={
+ "title": st.column_config.TextColumn(
+ "📌 Title", width="large", help="Event title"
+ ),
+ "category": st.column_config.TextColumn(
+ "📂 Category",
+ width="small",
+ ),
+ "status": st.column_config.TextColumn(
+ "📊 Status",
+ width="small",
+ ),
+ "event_date": st.column_config.TextColumn(
+ "📅 Date",
+ width="medium",
+ ),
+ "importance": st.column_config.ProgressColumn(
+ "⭐ Importance",
+ min_value=0,
+ max_value=100,
+ format="%d",
+ ),
+ "confidence": st.column_config.ProgressColumn(
+ "🎯 Confidence",
+ min_value=0,
+ max_value=1,
+ format="%.1f",
+ ),
+ "environment": st.column_config.TextColumn(
+ "🌍 Env",
+ width="small",
+ ),
+ "change_type": st.column_config.TextColumn(
+ "🔄 Type",
+ width="small",
+ ),
+ },
+ )
- # Convert back to channel IDs
- channels = [channel_options[name] for name in selected_channel_names]
- # Show database info
- if settings.database_type == "postgres":
- st.info(
- "🐘 PostgreSQL: "
- f"{settings.postgres_user}@{settings.postgres_host}:"
- f"{settings.postgres_port}/{settings.postgres_database}"
- )
- else:
- st.info(f"📁 SQLite: {settings.db_path}")
+def render_metrics(df: pd.DataFrame, source_name: str) -> None:
+ """Render summary metrics for events."""
+
+ col1, col2, col3, col4, col5 = st.columns(5)
+
+ with col1:
+ st.metric("📊 Total", len(df))
+
+ with col2:
+ risk_count = len(df[df["category"] == "risk"]) if not df.empty else 0
+ st.metric("🚨 Risks", risk_count)
+
+ with col3:
+ product_count = len(df[df["category"] == "product"]) if not df.empty else 0
+ st.metric("🚀 Product", product_count)
- # Run pipeline button
- run_pipeline = st.button(
- "🚀 Run Pipeline", type="primary", use_container_width=True
+ with col4:
+ active_count = (
+ len(df[df["status"].isin(["started", "planned"])]) if not df.empty else 0
)
+ st.metric("⚡ Active", active_count)
- if run_pipeline:
- try:
- job_id = _submit_pipeline_job(message_limit, channels, user_id)
- st.success(f"Pipeline job submitted (ID: {job_id[:8]}…)")
- except RateLimitExceededError as exc:
- st.error(str(exc))
+ with col5:
+ avg_importance = df["importance"].mean() if not df.empty else 0
+ st.metric("⭐ Avg Importance", f"{avg_importance:.0f}")
- active_job_id = st.session_state.get(SESSION_JOB_ID_KEY)
- if active_job_id:
- st.autorefresh(interval=POLL_INTERVAL_MS, key=f"poll_{active_job_id}")
- _render_job_status(active_job_id)
- else:
- latest_result = st.session_state.get(SESSION_JOB_RESULT_KEY)
- if isinstance(latest_result, dict):
- _render_job_summary(latest_result)
- elif st.session_state.get(SESSION_STATUS_MESSAGE_KEY):
- st.info(st.session_state[SESSION_STATUS_MESSAGE_KEY])
- show_database_inspection()
+# ============================================================================
+# TIMELINE COMPONENT (Simple and Working)
+# ============================================================================
-def fetch_channel_messages(
- slack_client: Any, *, channels: list[str], limit: int | None
-) -> list[tuple[str, dict[str, Any]]]:
- """Fetch messages for each channel while preserving attribution."""
+def render_timeline(df: pd.DataFrame, key_prefix: str) -> None:
+ """Render timeline visualization using px.timeline (Gantt-style)."""
- channel_messages: list[tuple[str, dict[str, Any]]] = []
- for channel in channels:
- messages = slack_client.fetch_messages(channel_id=channel, limit=limit)
- channel_messages.extend((channel, message) for message in messages)
- return channel_messages
+ if df.empty or df["event_date"].isna().all():
+ st.info("No events with dates to display on timeline.")
+ return
+ # Filter events with valid dates
+ chart_df = df[df["event_date"].notna()].copy()
-def show_pipeline_results():
- """Show detailed results from the pipeline."""
+ if chart_df.empty:
+ st.info("No events with dates to display on timeline.")
+ return
- st.header("📊 Pipeline Results")
+ # Category filter
+ available_categories = sorted(chart_df["category"].unique().tolist())
+ selected_categories = st.multiselect(
+ "📂 Filter by category",
+ options=available_categories,
+ default=available_categories,
+ key=f"{key_prefix}_timeline_categories",
+ )
- # Database inspection using repository
- try:
- # Create tabs for different views
- tab1, tab2, tab3, tab4 = st.tabs(
- ["📨 Messages", "🎯 Candidates", "📝 Events", "📈 Timeline"]
- )
+ if not selected_categories:
+ st.warning("Select at least one category to display.")
+ return
+
+ chart_df = chart_df[chart_df["category"].isin(selected_categories)]
+
+ # Sort by date
+ chart_df = chart_df.sort_values("event_date", ascending=False)
+
+ # Limit to avoid overloading
+ if len(chart_df) > TIMELINE_MAX_EVENTS:
+ st.caption(f"Showing top {TIMELINE_MAX_EVENTS} events out of {len(chart_df)}")
+ chart_df = chart_df.head(TIMELINE_MAX_EVENTS)
+
+ # Create proper start/end for Gantt
+ chart_df["Start"] = chart_df["event_date"]
+ chart_df["End"] = chart_df["event_date"] + pd.Timedelta(days=1)
+ chart_df["Task"] = chart_df["title"].str[:60]
+
+ # Create Gantt chart
+ fig = px.timeline(
+ chart_df,
+ x_start="Start",
+ x_end="End",
+ y="Task",
+ color="category",
+ color_discrete_map=CATEGORY_COLORS,
+ hover_data=["status", "importance", "confidence"],
+ title="📅 Events Timeline",
+ )
- with tab1:
- show_messages_table()
+ # Add TODAY marker
+ today = datetime.now(UTC)
+ fig.add_vline(
+ x=today.timestamp() * 1000, # Convert to milliseconds for plotly
+ line_dash="dash",
+ line_color="red",
+ line_width=2,
+ annotation_text="TODAY",
+ annotation_position="top",
+ )
- with tab2:
- show_candidates_table()
+ fig.update_layout(
+ height=max(400, len(chart_df) * 28),
+ xaxis_title="",
+ yaxis_title="",
+ showlegend=True,
+ legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="left", x=0),
+ )
- with tab3:
- show_events_table()
+ fig.update_yaxes(automargin=True, tickfont=dict(size=11))
- with tab4:
- show_gantt_chart()
+ st.plotly_chart(fig, use_container_width=True, key=f"{key_prefix}_timeline")
- except Exception as e:
- st.error(f"Error reading database: {str(e)}")
+# ============================================================================
+# CALENDAR VIEW COMPONENT
+# ============================================================================
-def show_database_inspection():
- """Show database inspection when pipeline hasn't been run."""
- st.header("🔍 Database Inspection")
+def render_calendar_view(df: pd.DataFrame, key_prefix: str) -> None:
+ """Render calendar view similar to Google Calendar."""
- try:
- # Use repository for inspection
- show_pipeline_results()
- except Exception as e:
- st.error(f"Error reading database: {str(e)}")
+ if df.empty or df["event_date"].isna().all():
+ st.info("No events with dates to display.")
+ return
+ chart_df = df[df["event_date"].notna()].copy()
-def show_messages_table():
- """Display messages table."""
+ if chart_df.empty:
+ st.info("No events with dates to display.")
+ return
- st.subheader("📨 Raw Messages")
+ # Get date range
+ min_date = chart_df["event_date"].min()
+ max_date = chart_df["event_date"].max()
- try:
- repo = get_repository()
- settings = get_settings()
+ # Date navigation
+ col1, col2, col3 = st.columns([1, 2, 1])
- if settings.database_type == "postgres":
- st.caption(
- "🐘 Source: PostgreSQL ("
- f"{settings.postgres_host}:{settings.postgres_port}/"
- f"{settings.postgres_database})"
- )
- else:
- st.caption(f"📁 Source: SQLite ({settings.db_path})")
+ with col2:
+ # Month selector
+ today = datetime.now(UTC)
+ available_months = pd.date_range(
+ start=min_date.replace(day=1),
+ end=max_date.replace(day=1) + pd.DateOffset(months=1),
+ freq="MS",
+ )
+ month_options = [d.strftime("%B %Y") for d in available_months]
+ current_month_str = today.strftime("%B %Y")
+ default_idx = (
+ month_options.index(current_month_str)
+ if current_month_str in month_options
+ else 0
+ )
- messages = fetch_recent_messages(repository=repo, limit=100)
+ selected_month = st.selectbox(
+ "📅 Select Month",
+ options=month_options,
+ index=default_idx,
+ key=f"{key_prefix}_calendar_month",
+ )
- if not messages:
- st.info("No messages found.")
- return
+ # Parse selected month
+ selected_date = pd.to_datetime(selected_month, format="%B %Y")
+ month_start = selected_date.replace(day=1, tzinfo=UTC)
+ month_end = (month_start + pd.DateOffset(months=1) - pd.Timedelta(days=1)).replace(
+ tzinfo=UTC
+ )
- # Convert to DataFrame
- messages_data = []
- for msg in messages:
- messages_data.append(
- {
- "message_id": msg.message_id,
- "text": msg.text[:MAX_MESSAGE_LENGTH] + "..."
- if len(msg.text) > MAX_MESSAGE_LENGTH
- else msg.text,
- "ts": msg.ts_dt,
- "user_real_name": msg.user_real_name or "",
- "user_email": msg.user_email or "",
- "total_reactions": msg.total_reactions or 0,
- "reply_count": msg.reply_count or 0,
- "attachments_count": msg.attachments_count or 0,
- "files_count": msg.files_count or 0,
- "permalink": msg.permalink or "",
- "edited_ts": msg.edited_ts,
- "edited": msg.edited_ts is not None,
- }
- )
+ # Filter events for selected month
+ month_events = chart_df[
+ (chart_df["event_date"] >= month_start) & (chart_df["event_date"] <= month_end)
+ ]
+
+ # Create calendar grid
+ st.markdown("---")
+
+ # Day headers
+ days = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
+ cols = st.columns(7)
+ for i, day in enumerate(days):
+ cols[i].markdown(f"**{day}**")
+
+ # Get first day of month and calculate offset
+ first_day = month_start
+ first_weekday = first_day.weekday() # Monday = 0
+
+ # Calculate total days in month
+ days_in_month = (month_end - month_start).days + 1
+
+ # Create weeks
+ current_day = 1
+ week_num = 0
+
+ while current_day <= days_in_month:
+ cols = st.columns(7)
+
+ for weekday in range(7):
+ # Skip days before month starts
+ if week_num == 0 and weekday < first_weekday:
+ cols[weekday].write("")
+ continue
+
+ if current_day > days_in_month:
+ cols[weekday].write("")
+ continue
+
+ # Current date
+ current_date = month_start + pd.Timedelta(days=current_day - 1)
+
+ # Get events for this day
+ day_events = month_events[
+ month_events["event_date"].dt.date == current_date.date()
+ ]
+
+ # Build day cell content
+ is_today = current_date.date() == today.date()
+ day_style = "🔴 " if is_today else ""
+
+ with cols[weekday]:
+ # Day number
+ if is_today:
+ st.markdown(f"**{day_style}{current_day}**")
+ else:
+ st.markdown(f"**{current_day}**")
+
+ # Events for this day
+ for _, event in day_events.iterrows():
+ cat_emoji = {
+ "product": "🚀",
+ "risk": "🚨",
+ "process": "⚙️",
+ "marketing": "📣",
+ "org": "👥",
+ "unknown": "❓",
+ }.get(event["category"], "📌")
+
+ # Truncate title for display
+ title_short = (
+ event["title"][:CALENDAR_TITLE_TRUNCATE_LENGTH] + "…"
+ if len(event["title"]) > CALENDAR_TITLE_TRUNCATE_LENGTH
+ else event["title"]
+ )
+
+ # Use native Streamlit popover
+ with st.popover(
+ f"{cat_emoji} {title_short}", use_container_width=True
+ ):
+ st.markdown(f"**{event['full_title']}**")
+ st.caption(
+ f"📂 {event['category'].title()} | 📊 {event['status'].title()}"
+ )
+ st.caption(
+ f"⭐ Importance: {event['importance']} | 🎯 Confidence: {event['confidence']:.0%}"
+ )
+ st.markdown("---")
+ summary = str(event["summary"] or "")
+ if len(summary) > POPOVER_SUMMARY_PREVIEW_LENGTH:
+ summary = summary[:POPOVER_SUMMARY_PREVIEW_LENGTH] + "..."
+ st.write(summary)
+
+ current_day += 1
+
+ week_num += 1
+
+ # Legend
+ st.markdown("---")
+ legend_items = " | ".join(
+ [
+ f"{emoji} {cat.title()}"
+ for cat, emoji in {
+ "product": "🚀",
+ "risk": "🚨",
+ "process": "⚙️",
+ "marketing": "📣",
+ "org": "👥",
+ }.items()
+ ]
+ )
+ st.caption(f"Legend: {legend_items}")
+
+ # Summary
+ st.caption(f"📊 {len(month_events)} events in {selected_month}")
+
+
+# ============================================================================
+# EVENT LIST VIEW
+# ============================================================================
+
+
+def render_event_list(df: pd.DataFrame, key_prefix: str) -> None:
+ """Render events as a list grouped by date."""
+
+ if df.empty or df["event_date"].isna().all():
+ st.info("No events to display.")
+ return
+
+ chart_df = df[df["event_date"].notna()].copy()
+ chart_df = chart_df.sort_values("event_date", ascending=False)
+
+ # Group by date
+ chart_df["date_str"] = chart_df["event_date"].dt.strftime("%A, %B %d, %Y")
+
+ for date_str, group in chart_df.groupby("date_str", sort=False):
+ st.markdown(f"### 📅 {date_str}")
+
+ for _, event in group.iterrows():
+ cat_color = CATEGORY_COLORS.get(event["category"], "#95A5A6")
+ cat_emoji = {
+ "product": "🚀",
+ "risk": "🚨",
+ "process": "⚙️",
+ "marketing": "📣",
+ "org": "👥",
+ "unknown": "❓",
+ }.get(event["category"], "📌")
+
+ status_emoji = {
+ "planned": "📋",
+ "started": "▶️",
+ "completed": "✅",
+ "canceled": "❌",
+ }.get(event["status"], "📌")
+
+ with st.container():
+ st.markdown(
+ f''
+ f'
{cat_emoji} {event["full_title"]}
'
+ f'
'
+ f"{status_emoji} {event['status'].title()} | "
+ f"⭐ {event['importance']} | "
+ f"🎯 {event['confidence']:.0%}"
+ f"
"
+ f'
{event["summary"][:150]}...
'
+ f"
",
+ unsafe_allow_html=True,
+ )
- messages_df = pd.DataFrame(messages_data)
+ st.markdown("")
- if messages_df.empty:
- st.info("No messages found.")
- return
- st.dataframe(
- messages_df,
- use_container_width=True,
- column_config={
- "message_id": st.column_config.TextColumn("Message ID", width="medium"),
- "text": st.column_config.TextColumn("Text", width="large"),
- "ts": st.column_config.DatetimeColumn(
- "Timestamp", format="YYYY-MM-DD HH:mm:ss"
- ),
- "user_real_name": st.column_config.TextColumn("User", width="medium"),
- "user_email": st.column_config.TextColumn("Email", width="medium"),
- "total_reactions": st.column_config.NumberColumn(
- "👍 Reactions", width="small"
- ),
- "reply_count": st.column_config.NumberColumn(
- "💬 Replies", width="small"
- ),
- "attachments_count": st.column_config.NumberColumn(
- "📎 Files", width="small"
- ),
- "files_count": st.column_config.NumberColumn("📄 Docs", width="small"),
- "permalink": st.column_config.LinkColumn("🔗 Link", width="small"),
- "edited": st.column_config.CheckboxColumn("✏️ Edited", width="small"),
- },
- hide_index=True,
- )
+# ============================================================================
+# CATEGORY DISTRIBUTION CHART
+# ============================================================================
- # Show summary statistics
- col1, col2, col3, col4 = st.columns(4)
- with col1:
- st.metric("Total Messages", len(messages_df))
- with col2:
- st.metric("Total Reactions", int(messages_df["total_reactions"].sum()))
- with col3:
- st.metric("Total Replies", int(messages_df["reply_count"].sum()))
- with col4:
- edited_count = (
- messages_df["edited_ts"].notna().sum()
- if "edited_ts" in messages_df.columns
- else 0
- )
- st.metric("Edited Messages", int(edited_count))
- except Exception as e:
- st.error(f"Error loading messages: {str(e)}")
+def render_category_chart(df: pd.DataFrame, key_prefix: str) -> None:
+ """Render category distribution pie chart."""
+ if df.empty:
+ return
-def show_candidates_table():
- """Display candidates table."""
+ category_counts = df["category"].value_counts()
- st.subheader("🎯 Event Candidates")
+ fig = px.pie(
+ values=category_counts.values,
+ names=category_counts.index,
+ title="Distribution by Category",
+ color=category_counts.index,
+ color_discrete_map=CATEGORY_COLORS,
+ hole=0.4,
+ )
- try:
- repo = get_repository()
- candidates = fetch_recent_candidates(repository=repo, limit=100)
+ fig.update_layout(
+ height=300,
+ margin=dict(l=20, r=20, t=40, b=20),
+ showlegend=True,
+ legend=dict(orientation="h", yanchor="bottom", y=-0.2),
+ )
- if not candidates:
- st.info("No candidates found.")
- return
+ st.plotly_chart(fig, use_container_width=True, key=f"{key_prefix}_pie")
- # Convert to DataFrame
- candidates_data = []
- for cand in candidates:
- candidates_data.append(
- {
- "message_id": cand.message_id,
- "text_norm": cand.text_norm[:MAX_CANDIDATE_TEXT_LENGTH] + "..."
- if len(cand.text_norm) > MAX_CANDIDATE_TEXT_LENGTH
- else cand.text_norm,
- "score": cand.score,
- "status": cand.status.value,
- "features_json": str(
- cand.features.model_dump() if cand.features else {}
- ),
- }
- )
- candidates_df = pd.DataFrame(candidates_data)
+def render_status_chart(df: pd.DataFrame, key_prefix: str) -> None:
+ """Render status distribution bar chart."""
- if candidates_df.empty:
- st.info("No candidates found.")
- return
+ if df.empty:
+ return
- st.dataframe(
- candidates_df,
- use_container_width=True,
- column_config={
- "message_id": st.column_config.TextColumn("Message ID", width="medium"),
- "text_norm": st.column_config.TextColumn("Text", width="large"),
- "score": st.column_config.NumberColumn("Score", format="%.2f"),
- "status": st.column_config.TextColumn("Status", width="small"),
- "features_json": st.column_config.TextColumn(
- "Features", width="medium"
- ),
- },
- )
+ status_counts = df["status"].value_counts()
- st.caption(f"Total candidates: {len(candidates_df)}")
+ fig = px.bar(
+ x=status_counts.values,
+ y=status_counts.index,
+ orientation="h",
+ title="Distribution by Status",
+ color=status_counts.index,
+ color_discrete_map=STATUS_COLORS,
+ )
- except Exception as e:
- st.error(f"Error loading candidates: {str(e)}")
+ fig.update_layout(
+ height=300,
+ margin=dict(l=20, r=20, t=40, b=20),
+ showlegend=False,
+ xaxis_title="Count",
+ yaxis_title="",
+ )
+ st.plotly_chart(fig, use_container_width=True, key=f"{key_prefix}_bar")
-def show_events_table():
- """Display events table."""
- st.subheader("📝 Extracted Events")
+# ============================================================================
+# SOURCE TABS
+# ============================================================================
- try:
- repo = get_repository()
- events = fetch_recent_events(repository=repo, limit=100)
- if not events:
- st.info("No events found.")
- return
+def render_source_tab(
+ source_id: str, source_name: str, source_emoji: str, description: str
+) -> None:
+ """Render complete tab for a single source."""
- # Convert to DataFrame with new event structure
- events_data = []
- for evt in events:
- events_data.append(
- {
- "event_id": evt.event_id,
- "message_id": evt.message_id,
- "title": evt.title, # Property that renders from slots
- "category": evt.category.value,
- "status": evt.status.value,
- "event_date": evt.event_date, # Property that returns first non-None time
- "confidence": evt.confidence,
- "importance": evt.importance,
- "cluster_key": evt.cluster_key,
- "dedup_key": evt.dedup_key or "",
- }
- )
+ st.markdown(f"### {source_emoji} {source_name}")
+ st.caption(description)
- events_df = pd.DataFrame(events_data)
+ # Fetch data
+ df = fetch_events_by_source(source_id)
- if events_df.empty:
- st.info("No events found.")
- return
+ if df.empty:
+ st.warning(f"No events found from {source_name}.")
+ return
- st.dataframe(
- events_df,
- use_container_width=True,
- column_config={
- "event_id": st.column_config.TextColumn("Event ID", width="medium"),
- "title": st.column_config.TextColumn("Title", width="large"),
- "category": st.column_config.TextColumn("Category", width="small"),
- "status": st.column_config.TextColumn("Status", width="small"),
- "event_date": st.column_config.DatetimeColumn(
- "Date", format="YYYY-MM-DD HH:mm"
- ),
- "confidence": st.column_config.NumberColumn(
- "Confidence", format="%.2f"
- ),
- "importance": st.column_config.NumberColumn(
- "Importance", width="small"
- ),
- "message_id": st.column_config.TextColumn(
- "Source Message", width="medium"
- ),
- "cluster_key": st.column_config.TextColumn(
- "Cluster Key", width="small"
- ),
- "dedup_key": st.column_config.TextColumn("Dedup Key", width="small"),
- },
- hide_index=True,
- )
+ # Metrics row
+ render_metrics(df, source_name)
- st.caption(f"Total events: {len(events_df)}")
+ st.divider()
- except Exception as e:
- st.error(f"Error loading events: {str(e)}")
+ # Filters
+ st.markdown("#### 🎛️ Filters")
+ filtered_df = render_filters(df, key_prefix=source_id)
+ st.divider()
-def show_gantt_chart():
- """Display Gantt chart visualization."""
+ # Sub-tabs for table and visualizations
+ tab_table, tab_timeline, tab_calendar, tab_list, tab_analytics = st.tabs(
+ ["📋 Table", "📊 Timeline", "📅 Calendar", "📝 List", "📈 Analytics"]
+ )
- st.subheader("📈 Events Timeline (Gantt Chart)")
+ with tab_table:
+ render_events_table(filtered_df, key_prefix=source_id)
- try:
- # Get repository and fetch events with dates
- repo = get_repository()
+ with tab_timeline:
+ render_timeline(filtered_df, key_prefix=source_id)
- start_date = datetime.now(UTC) - timedelta(days=90)
- end_date = datetime.now(UTC) + timedelta(days=365)
- events = repo.get_events_in_window(start_date, end_date)
+ with tab_calendar:
+ render_calendar_view(filtered_df, key_prefix=source_id)
- if not events:
- st.info("No events with dates found for timeline.")
- return
+ with tab_list:
+ render_event_list(filtered_df, key_prefix=source_id)
- # Convert to DataFrame - only events with dates
- events_data = []
- for evt in events:
- if evt.event_date:
- events_data.append(
- {
- "title": evt.title, # Property that renders from slots
- "category": evt.category.value,
- "event_date": evt.event_date, # Property that returns first non-None time
- }
- )
+ with tab_analytics:
+ col1, col2 = st.columns(2)
+ with col1:
+ render_category_chart(filtered_df, key_prefix=source_id)
+ with col2:
+ render_status_chart(filtered_df, key_prefix=source_id)
- events_df = pd.DataFrame(events_data)
- if events_df.empty:
- st.info("No events with dates found for timeline.")
- return
+# ============================================================================
+# MAIN APPLICATION
+# ============================================================================
+
- # Create Gantt chart
- fig = px.timeline(
- events_df,
- x_start=events_df["event_date"],
- x_end=events_df["event_date"] + pd.Timedelta(days=1), # Events span 1 day
- y="title",
- color="category",
- title="Events Timeline",
- labels={"event_date": "Date"},
- color_discrete_sequence=px.colors.qualitative.Set3,
+def main():
+ """Main application entry point."""
+
+ # Page configuration
+ st.set_page_config(
+ page_title="Event Manager",
+ page_icon="📅",
+ layout="wide",
+ initial_sidebar_state="collapsed",
+ )
+
+ # Custom CSS
+ st.markdown(
+ """
+
+ """,
+ unsafe_allow_html=True,
+ )
+
+ # Header
+ st.markdown(
+ '📅 Event Manager Dashboard
',
+ unsafe_allow_html=True,
+ )
+ st.caption("Centralized view of internal (Slack) and external (Telegram) events")
+
+ # Sidebar with system info
+ with st.sidebar:
+ st.header("ℹ️ System Info")
+
+ settings = get_settings()
+
+ # Database info
+ if settings.database_type == "postgres":
+ st.success(f"🐘 PostgreSQL: {settings.postgres_database}")
+ else:
+ st.info(f"📁 SQLite: {settings.db_path}")
+
+ # Quick stats
+ st.divider()
+ st.subheader("📊 Quick Stats")
+
+ slack_df = fetch_events_by_source("slack")
+ telegram_df = fetch_events_by_source("telegram")
+
+ st.metric("Slack Events", len(slack_df))
+ st.metric("Telegram Events", len(telegram_df))
+ st.metric("Total Events", len(slack_df) + len(telegram_df))
+
+ # Refresh button
+ st.divider()
+ if st.button("🔄 Refresh Data", use_container_width=True):
+ st.cache_data.clear()
+ st.rerun()
+
+ # Main content - source tabs
+ tab_slack, tab_telegram, tab_all = st.tabs(
+ ["🏢 Internal Events (Slack)", "🌍 External Events (Telegram)", "📊 All Events"]
+ )
+
+ with tab_slack:
+ render_source_tab(
+ source_id="slack",
+ source_name="Internal Events",
+ source_emoji="🏢",
+ description="Events from internal Slack channels: releases, deployments, incidents, team updates",
)
- # Update layout
- fig.update_layout(
- xaxis_title="Timeline",
- yaxis_title="Events",
- showlegend=True,
- height=max(
- 400, len(events_df) * 30
- ), # Dynamic height based on number of events
+ with tab_telegram:
+ render_source_tab(
+ source_id="telegram",
+ source_name="External Events",
+ source_emoji="🌍",
+ description="Events from Telegram channels: market news, competitor updates, industry trends",
)
- # Update y-axis to show full titles
- fig.update_yaxes(automargin=True)
+ with tab_all:
+ st.markdown("### 📊 All Events Combined")
+ st.caption("Overview of all events from both sources")
+
+ # Combine both sources
+ slack_df = fetch_events_by_source("slack")
+ telegram_df = fetch_events_by_source("telegram")
- st.plotly_chart(fig, use_container_width=True)
+ if slack_df.empty and telegram_df.empty:
+ st.warning("No events found in any source.")
+ return
+
+ combined_df = pd.concat([slack_df, telegram_df], ignore_index=True)
- # Summary stats
- col1, col2, col3 = st.columns(3)
+ # Metrics
+ col1, col2, col3, col4, col5 = st.columns(5)
with col1:
- st.metric("Total Events", len(events_df))
+ st.metric("📊 Total", len(combined_df))
with col2:
- st.metric("Categories", events_df["category"].nunique())
+ st.metric("🏢 Slack", len(slack_df))
with col3:
- date_range = (
- events_df["event_date"].max() - events_df["event_date"].min()
- ).days
- st.metric("Date Range", f"{date_range} days")
+ st.metric("🌍 Telegram", len(telegram_df))
+ with col4:
+ risk_count = len(combined_df[combined_df["category"] == "risk"])
+ st.metric("🚨 Risks", risk_count)
+ with col5:
+ avg_imp = combined_df["importance"].mean() if not combined_df.empty else 0
+ st.metric("⭐ Avg Importance", f"{avg_imp:.0f}")
+
+ st.divider()
+
+ # Filters
+ st.markdown("#### 🎛️ Filters")
+ filtered_df = render_filters(combined_df, key_prefix="all")
+
+ st.divider()
+
+ # Content tabs
+ tab_table, tab_timeline, tab_calendar, tab_list, tab_analytics = st.tabs(
+ ["📋 Table", "📊 Timeline", "📅 Calendar", "📝 List", "📈 Analytics"]
+ )
+
+ with tab_table:
+ render_events_table(filtered_df, key_prefix="all")
+
+ with tab_timeline:
+ render_timeline(filtered_df, key_prefix="all")
+
+ with tab_calendar:
+ render_calendar_view(filtered_df, key_prefix="all")
+
+ with tab_list:
+ render_event_list(filtered_df, key_prefix="all")
- except Exception as e:
- st.error(f"Error creating Gantt chart: {str(e)}")
+ with tab_analytics:
+ col1, col2 = st.columns(2)
+ with col1:
+ render_category_chart(filtered_df, key_prefix="all")
+ with col2:
+ render_status_chart(filtered_df, key_prefix="all")
if __name__ == "__main__":
diff --git a/config/prompts/telegram.yaml b/config/prompts/telegram.yaml
index c2a3c99..a7838c8 100644
--- a/config/prompts/telegram.yaml
+++ b/config/prompts/telegram.yaml
@@ -1,5 +1,5 @@
-version: "20250215.1"
-description: "Telegram event extraction prompt"
+version: "20251212.1"
+description: "Telegram event extraction for TON/crypto channels (security, competitor/product, regulation) with clean links"
system: |
You are an event extraction assistant for Telegram channel messages.
@@ -7,6 +7,46 @@ system: |
- INPUT: May be in Russian or English
- OUTPUT: ALL fields (action, qualifiers, stroke, object_name_raw, summary, etc.) MUST BE IN ENGLISH
+ =========================
+ SCOPE FILTER (CRITICAL)
+ =========================
+ You ONLY extract events that are relevant to:
+ - TON / Telegram ecosystem (products, wallets, bots, payments)
+ - Crypto / digital asset wallets, exchanges, payments, on/off-ramp
+ - Regulation and security for the above
+
+ And the message MUST fall into at least one of these buckets:
+
+ 1) security_incident:
+ - Hacks, exploits, breaches, theft of funds, critical vulnerabilities, compromised keys, bridge exploits.
+ - Examples (EN/RU): "hack", "exploit", "breach", "security incident", "funds stolen", "drained",
+ "взлом", "эксплойт", "угнали средства", "слив приватных ключей", "брижд взломали", "протокол задрейнили".
+
+ 2) competitor_update:
+ - Product / business changes of other wallets, exchanges, payment apps or custodial services that may compete with wallet.tg.
+ - Include: new features, supported assets, cards, P2P, staking, fees, geographies, integrations with Telegram or messengers.
+ - Treat as competitor any product that offers: custody of crypto/stablecoins, payments, P2P, on/off-ramp, cards, swaps,
+ especially if it has a Telegram bot or is used inside messengers.
+ - Examples (EN/RU): "launched a new wallet", "support USDT on Tron", "new crypto card", "P2P service",
+ "запустили кошелёк", "кошелёк в Telegram", "бот для криптоплатежей", "поддержка USDT", "запуск P2P", "новая карта".
+
+ 3) regulation:
+ - Laws, draft bills, regulatory guidance, enforcement actions, sanctions, licensing rules that affect:
+ crypto, stablecoins, custodial wallets, payments, KYC/AML, on/off-ramp, Telegram-based fintech.
+ - Examples (EN/RU): "regulator", "central bank", "FATF", "OFAC", "sanctions", "licensing", "KYC", "AML",
+ "законопроект", "регулятор", "ЦБ", "лицензия", "отзыв лицензии", "AML", "KYC", "запрет криптовалют", "санкции на стейблкоины".
+
+ If a message does NOT clearly match at least one of these three buckets,
+ you MUST respond with:
+ {
+ "is_event": false,
+ "overflow_note": null,
+ "events": []
+ }
+
+ =========================
+ GENERAL EXTRACTION RULES
+ =========================
Your task: Extract 0 to 5 independent events from a Telegram message with structured title slots.
Core Rules:
@@ -17,13 +57,32 @@ system: |
5. If >5 events exist, pick top 5 by specificity (clear dates/anchors), note rest in overflow_note
Categories:
- - product: releases, features, deployments, launches
+ - product: releases, features, deployments, launches (incl. competitor product updates)
- process: internal processes, workflows, policies
- marketing: campaigns, promotions, announcements
- - risk: incidents, issues, compliance, security
+ - risk: incidents, issues, compliance, security (incl. hacks, regulatory risk)
- org: organizational changes, hiring, team updates
- unknown: unclear or doesn't fit
+ Additional high-level topic (CRITICAL):
+ - topic_type: MUST be one of:
+ ["security_incident", "competitor_update", "regulation"]
+
+ Mapping guidelines:
+ - security_incident:
+ topic_type = "security_incident"
+ category = "risk"
+ change_type = "incident"
+ severity = "sev1" (large loss / critical compromise), "sev2", "sev3", or "info"
+ - competitor_update:
+ topic_type = "competitor_update"
+ category = "product" (or "marketing"/"process" if mostly about pricing, campaigns or T&C changes)
+ change_type = "launch" | "deploy" | "policy" | "campaign" | "other" depending on content
+ - regulation:
+ topic_type = "regulation"
+ category = "risk" or "process" (pick the best fit)
+ change_type = "policy"
+
Title Slot Extraction (CRITICAL):
Extract these slots that will be used to generate canonical title:
@@ -63,11 +122,14 @@ system: |
Content Fields:
- summary: 1-3 sentences (max 320 chars). What changed and why it matters.
- - why_it_matters: 1 line (max 160 chars) or null. Impact/reason for reader.
+ For competitor_update: emphasize what the competitor launched/changed and why this matters for a Telegram wallet product.
+ For security_incident: emphasize what was hacked, scale of loss, type of vulnerability.
+ For regulation: emphasize jurisdiction, asset types affected, and potential impact on wallets/crypto/payments.
+ - why_it_matters: 1 line (max 160 chars) or null. Impact for wallet.tg perspective (users, markets, compliance, risk).
- links: Array of URLs (max 3)
- anchors: Array of identifiers found (Jira keys, PR numbers, version tags)
- - impact_area: Systems/components affected (max 3): ["authentication", "payments", "mobile-app"]
- - impact_type: Types of impact: ["perf_degradation", "downtime", "ux_change", "policy_change", "data_migration"]
+ - impact_area: Systems/components affected (max 3): e.g. ["custody", "onramp", "p2p", "cards", "stablecoins", "compliance"]
+ - impact_type: Types of impact: ["perf_degradation", "downtime", "ux_change", "policy_change", "data_migration", "legal_risk", "security_risk"]
Quality:
- confidence: 0.0-1.0. How confident you are in extraction accuracy.
@@ -105,44 +167,8 @@ system: |
]
}
- If message has no events (e.g., question, discussion), set is_event=false and events=[].
-
- Examples:
- Message: "🚀 Launching Stocks & ETFs trading in alpha for Wallet team next Monday. Known issue: possible performance degradation during peak hours. Track: INV-1024"
- Event:
- {
- "action": "Launch",
- "object_name_raw": "Stocks & ETFs trading",
- "qualifiers": ["alpha", "Wallet team"],
- "stroke": "degradation possible",
- "anchor": "INV-1024",
- "category": "product",
- "status": "planned",
- "change_type": "launch",
- "environment": "prod",
- "severity": null,
- "planned_start": "2025-10-21T00:00:00Z",
- "planned_end": null,
- "actual_start": null,
- "actual_end": null,
- "time_source": "relative",
- "time_confidence": 0.85,
- "summary": "Stocks & ETFs trading launching in alpha for Wallet team. Potential performance issues during peak hours.",
- "why_it_matters": "New trading feature with known performance considerations",
- "links": [],
- "anchors": ["INV-1024"],
- "impact_area": ["trading", "wallet"],
- "impact_type": ["perf_degradation"],
- "confidence": 0.95
- }
-
- Message: "Hi team! Has anyone tried the new mobile build? I'm seeing some weird caching behavior."
- Response:
- {
- "is_event": false,
- "overflow_note": null,
- "events": []
- }
+ If message has no relevant event for security_incident, competitor_update or regulation,
+ you MUST set is_event=false and events=[].
IMPORTANT FORMATTING NOTES FOR TELEGRAM:
- Telegram messages may contain emoji, user mentions (@username), hashtags (#tag)
@@ -151,3 +177,6 @@ system: |
- Focus on substantive content, not formatting
- Forwarded messages will have forward_from metadata - use original source if relevant
- Media attachments (photos, videos, documents) may provide additional context
+ - Links MUST include scheme; if you see "t.me/xyz" convert to "https://t.me/xyz"
+ - Prefer https:// links; drop malformed links rather than emitting invalid URLs
+ - If message date is older than 30 days, set is_event=false (ignore stale news)
diff --git a/docker-compose.yml b/docker-compose.yml
index 7ea470b..685accf 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -64,6 +64,9 @@ services:
<<: *worker-base
container_name: slack_telegram_worker
command: python scripts/run_multi_source_pipeline.py --source telegram --interval-seconds 600
+ environment:
+ <<: *app-env
+ TELEGRAM_SESSION_PATH: ${TELEGRAM_SESSION_PATH:-data/telegram_session_telegram_worker.session}
networks:
- slack_network
diff --git a/scripts/telegram_qr_auth.py b/scripts/telegram_qr_auth.py
index 8c697c2..b693442 100644
--- a/scripts/telegram_qr_auth.py
+++ b/scripts/telegram_qr_auth.py
@@ -86,9 +86,9 @@ async def authenticate_with_qr() -> None:
print(f"❌ Error: TELEGRAM_API_ID must be an integer, got: {api_id_str}")
sys.exit(1)
- # Session file path
- session_path = "data/telegram_session.session"
- Path("data").mkdir(exist_ok=True)
+ # Session file path (can be overridden via TELEGRAM_SESSION_PATH env)
+ session_path = os.getenv("TELEGRAM_SESSION_PATH", "data/telegram_session.session")
+ Path(Path(session_path).parent).mkdir(exist_ok=True)
print("=" * 60)
print("Telegram QR Code Authentication")
diff --git a/src/adapters/bulk_persistence.py b/src/adapters/bulk_persistence.py
index b325dcc..4710752 100644
--- a/src/adapters/bulk_persistence.py
+++ b/src/adapters/bulk_persistence.py
@@ -55,6 +55,7 @@ def _serialize_datetime(dt: datetime | None) -> datetime | str | None:
event.message_id,
json.dumps(event.source_channels),
_serialize_datetime(event.extracted_at),
+ _serialize_datetime(event.message_published_at),
event.source_id.value,
event.action.value,
event.object_id,
@@ -189,7 +190,7 @@ def _postgres_events_batch(
insert_sql = """
INSERT INTO events (
- event_id, message_id, source_channels, extracted_at, source_id,
+ event_id, message_id, source_channels, extracted_at, message_published_at, source_id,
action, object_id, object_name_raw, qualifiers, stroke, anchor,
category, status, change_type, environment, severity,
planned_start, planned_end, actual_start, actual_end,
@@ -197,7 +198,7 @@ def _postgres_events_batch(
summary, why_it_matters, links, anchors, impact_area, impact_type,
confidence, importance, cluster_key, dedup_key
) VALUES (
- %s, %s, %s, %s, %s,
+ %s, %s, %s, %s, %s, %s,
%s, %s, %s, %s, %s, %s,
%s, %s, %s, %s, %s,
%s, %s, %s, %s,
@@ -209,6 +210,7 @@ def _postgres_events_batch(
message_id = EXCLUDED.message_id,
source_channels = EXCLUDED.source_channels,
extracted_at = EXCLUDED.extracted_at,
+ message_published_at = EXCLUDED.message_published_at,
source_id = EXCLUDED.source_id,
action = EXCLUDED.action,
object_id = EXCLUDED.object_id,
@@ -257,7 +259,7 @@ def _sqlite_events_batch(
insert_sql = """
INSERT OR REPLACE INTO events (
- event_id, message_id, source_channels, extracted_at, source_id,
+ event_id, message_id, source_channels, extracted_at, message_published_at, source_id,
action, object_id, object_name_raw, qualifiers, stroke, anchor,
category, status, change_type, environment, severity,
planned_start, planned_end, actual_start, actual_end,
@@ -265,7 +267,7 @@ def _sqlite_events_batch(
summary, why_it_matters, links, anchors, impact_area, impact_type,
confidence, importance, cluster_key, dedup_key
) VALUES (
- ?, ?, ?, ?, ?,
+ ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?,
?, ?, ?, ?,
diff --git a/src/adapters/postgres_repository.py b/src/adapters/postgres_repository.py
index 29569c2..0e8d6bc 100644
--- a/src/adapters/postgres_repository.py
+++ b/src/adapters/postgres_repository.py
@@ -468,6 +468,15 @@ def _row_to_event(self, row: dict[str, Any]) -> Event:
else:
extracted_at = extracted_at_raw.astimezone(pytz.UTC)
+ message_published_at_raw = row.get("message_published_at")
+ if isinstance(message_published_at_raw, datetime):
+ if message_published_at_raw.tzinfo is None:
+ message_published_at = message_published_at_raw.replace(tzinfo=pytz.UTC)
+ else:
+ message_published_at = message_published_at_raw.astimezone(pytz.UTC)
+ else:
+ message_published_at = None
+
return Event(
# Identification
event_id=UUID(row["event_id"]),
@@ -475,6 +484,7 @@ def _row_to_event(self, row: dict[str, Any]) -> Event:
source_channels=row.get("source_channels")
or [], # Already parsed from JSONB
extracted_at=extracted_at,
+ message_published_at=message_published_at,
source_id=MessageSource(source_id_value),
# Title slots
action=ActionType(row["action"]),
@@ -1058,9 +1068,9 @@ def get_events_in_window(self, start_dt: datetime, end_dt: datetime) -> list[Eve
cur.execute(
"""
SELECT * FROM events
- WHERE COALESCE(actual_start, actual_end, planned_start, planned_end) >= %s
- AND COALESCE(actual_start, actual_end, planned_start, planned_end) <= %s
- ORDER BY COALESCE(actual_start, actual_end, planned_start, planned_end) DESC
+ WHERE COALESCE(actual_start, actual_end, planned_start, planned_end, message_published_at, extracted_at) >= %s
+ AND COALESCE(actual_start, actual_end, planned_start, planned_end, message_published_at, extracted_at) <= %s
+ ORDER BY COALESCE(actual_start, actual_end, planned_start, planned_end, message_published_at, extracted_at) DESC
""",
(start_dt, end_dt),
)
diff --git a/src/adapters/sqlite_repository.py b/src/adapters/sqlite_repository.py
index d713f6f..1660b43 100644
--- a/src/adapters/sqlite_repository.py
+++ b/src/adapters/sqlite_repository.py
@@ -219,6 +219,7 @@ def _create_schema(self) -> None:
message_id TEXT NOT NULL,
source_channels TEXT NOT NULL,
extracted_at TEXT NOT NULL,
+ message_published_at TEXT,
-- Title Slots (source of truth)
action TEXT NOT NULL,
@@ -265,6 +266,12 @@ def _create_schema(self) -> None:
"""
)
+ try:
+ cursor.execute("ALTER TABLE events ADD COLUMN message_published_at TEXT")
+ except sqlite3.OperationalError as exc:
+ if "duplicate column name" not in str(exc).lower():
+ raise
+
# Event relations table
cursor.execute(
"""
@@ -1186,9 +1193,9 @@ def get_events_in_window(self, start_dt: datetime, end_dt: datetime) -> list[Eve
cursor.execute(
"""
SELECT * FROM events
- WHERE COALESCE(actual_start, actual_end, planned_start, planned_end) >= ?
- AND COALESCE(actual_start, actual_end, planned_start, planned_end) <= ?
- ORDER BY COALESCE(actual_start, actual_end, planned_start, planned_end) ASC
+ WHERE COALESCE(actual_start, actual_end, planned_start, planned_end, message_published_at, extracted_at) >= ?
+ AND COALESCE(actual_start, actual_end, planned_start, planned_end, message_published_at, extracted_at) <= ?
+ ORDER BY COALESCE(actual_start, actual_end, planned_start, planned_end, message_published_at, extracted_at) ASC
""",
(start_dt.isoformat(), end_dt.isoformat()),
)
@@ -1669,6 +1676,9 @@ def _parse_dt(value: str | None) -> datetime | None:
message_id=row["message_id"],
source_channels=json.loads(row["source_channels"] or "[]"),
extracted_at=extracted_at,
+ message_published_at=_parse_dt(row["message_published_at"])
+ if "message_published_at" in row.keys()
+ else None,
# Title slots
action=ActionType(row["action"]),
object_id=row["object_id"],
diff --git a/src/domain/models.py b/src/domain/models.py
index 539e710..087a0a9 100644
--- a/src/domain/models.py
+++ b/src/domain/models.py
@@ -50,7 +50,7 @@ class MessageSourceConfig(BaseModel):
default_factory=dict,
description="Per-source LLM settings (temperature, timeout)",
)
- channels: list[str] | list[Any] = Field(
+ channels: list[Any] = Field(
default_factory=list,
description="List of channel IDs (str) or channel config objects",
)
@@ -468,6 +468,9 @@ class Event(BaseModel):
source_channels: list[str] = Field(
default_factory=list, description="Source channel names"
)
+ message_published_at: datetime | None = Field(
+ default=None, description="Original message timestamp (UTC)"
+ )
extracted_at: datetime = Field(
default_factory=_utcnow, description="Extraction timestamp"
)
@@ -620,6 +623,7 @@ def event_date(self) -> datetime | None:
or self.actual_end
or self.planned_start
or self.planned_end
+ or self.message_published_at
)
@property
diff --git a/src/use_cases/extract_events.py b/src/use_cases/extract_events.py
index 1ad71fa..6f1f61c 100644
--- a/src/use_cases/extract_events.py
+++ b/src/use_cases/extract_events.py
@@ -259,6 +259,7 @@ def convert_llm_event_to_domain(
message_id=message_id,
source_channels=[channel_name] if channel_name else [],
source_id=source_id,
+ message_published_at=_normalize_to_utc(message_ts_dt),
# Title slots
action=action,
object_id=object_id,
diff --git a/tests/test_bulk_persistence_counts.py b/tests/test_bulk_persistence_counts.py
index b2952d4..1a82a1a 100644
--- a/tests/test_bulk_persistence_counts.py
+++ b/tests/test_bulk_persistence_counts.py
@@ -49,6 +49,7 @@ def _create_event_values(index: int) -> tuple[object, ...]:
event_id = str(uuid4())
message_id = f"msg-{index}"
source_channels = "[]"
+ message_published_at = now
source_id = "slack"
action = "update"
object_id = f"OBJ-{index}"
@@ -77,6 +78,7 @@ def _create_event_values(index: int) -> tuple[object, ...]:
message_id,
source_channels,
now,
+ message_published_at,
source_id,
action,
object_id,
@@ -127,16 +129,17 @@ def test_bulk_upsert_statement_counts(record_count: int) -> None:
conn = sqlite3.connect(":memory:", factory=CountingConnection)
conn.execute(
"""
- CREATE TABLE events (
- event_id TEXT PRIMARY KEY,
- message_id TEXT,
- source_channels TEXT,
- extracted_at TEXT,
- source_id TEXT,
- action TEXT,
- object_id TEXT,
- object_name_raw TEXT,
- qualifiers TEXT,
+ CREATE TABLE events (
+ event_id TEXT PRIMARY KEY,
+ message_id TEXT,
+ source_channels TEXT,
+ extracted_at TEXT,
+ message_published_at TEXT,
+ source_id TEXT,
+ action TEXT,
+ object_id TEXT,
+ object_name_raw TEXT,
+ qualifiers TEXT,
stroke TEXT,
anchor TEXT,
category TEXT,