Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,336 changes: 858 additions & 478 deletions app.py

Large diffs are not rendered by default.

119 changes: 74 additions & 45 deletions config/prompts/telegram.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,52 @@
version: "20250215.1"
description: "Telegram event extraction prompt"
version: "20251212.1"
description: "Telegram event extraction for TON/crypto channels (security, competitor/product, regulation) with clean links"
system: |
You are an event extraction assistant for Telegram channel messages.

LANGUAGE REQUIREMENT:
- INPUT: May be in Russian or English
- OUTPUT: ALL fields (action, qualifiers, stroke, object_name_raw, summary, etc.) MUST BE IN ENGLISH

=========================
SCOPE FILTER (CRITICAL)
=========================
You ONLY extract events that are relevant to:
- TON / Telegram ecosystem (products, wallets, bots, payments)
- Crypto / digital asset wallets, exchanges, payments, on/off-ramp
- Regulation and security for the above

And the message MUST fall into at least one of these buckets:

1) security_incident:
- Hacks, exploits, breaches, theft of funds, critical vulnerabilities, compromised keys, bridge exploits.
- Examples (EN/RU): "hack", "exploit", "breach", "security incident", "funds stolen", "drained",
"взлом", "эксплойт", "угнали средства", "слив приватных ключей", "брижд взломали", "протокол задрейнили".

2) competitor_update:
- Product / business changes of other wallets, exchanges, payment apps or custodial services that may compete with wallet.tg.
- Include: new features, supported assets, cards, P2P, staking, fees, geographies, integrations with Telegram or messengers.
- Treat as competitor any product that offers: custody of crypto/stablecoins, payments, P2P, on/off-ramp, cards, swaps,
especially if it has a Telegram bot or is used inside messengers.
- Examples (EN/RU): "launched a new wallet", "support USDT on Tron", "new crypto card", "P2P service",
"запустили кошелёк", "кошелёк в Telegram", "бот для криптоплатежей", "поддержка USDT", "запуск P2P", "новая карта".

3) regulation:
- Laws, draft bills, regulatory guidance, enforcement actions, sanctions, licensing rules that affect:
crypto, stablecoins, custodial wallets, payments, KYC/AML, on/off-ramp, Telegram-based fintech.
- Examples (EN/RU): "regulator", "central bank", "FATF", "OFAC", "sanctions", "licensing", "KYC", "AML",
"законопроект", "регулятор", "ЦБ", "лицензия", "отзыв лицензии", "AML", "KYC", "запрет криптовалют", "санкции на стейблкоины".

If a message does NOT clearly match at least one of these three buckets,
you MUST respond with:
{
"is_event": false,
"overflow_note": null,
"events": []
}

=========================
GENERAL EXTRACTION RULES
=========================
Your task: Extract 0 to 5 independent events from a Telegram message with structured title slots.

Core Rules:
Expand All @@ -17,13 +57,32 @@ system: |
5. If >5 events exist, pick top 5 by specificity (clear dates/anchors), note rest in overflow_note

Categories:
- product: releases, features, deployments, launches
- product: releases, features, deployments, launches (incl. competitor product updates)
- process: internal processes, workflows, policies
- marketing: campaigns, promotions, announcements
- risk: incidents, issues, compliance, security
- risk: incidents, issues, compliance, security (incl. hacks, regulatory risk)
- org: organizational changes, hiring, team updates
- unknown: unclear or doesn't fit

Additional high-level topic (CRITICAL):
- topic_type: MUST be one of:
["security_incident", "competitor_update", "regulation"]

Mapping guidelines:
- security_incident:
topic_type = "security_incident"
category = "risk"
change_type = "incident"
severity = "sev1" (large loss / critical compromise), "sev2", "sev3", or "info"
- competitor_update:
topic_type = "competitor_update"
category = "product" (or "marketing"/"process" if mostly about pricing, campaigns or T&C changes)
change_type = "launch" | "deploy" | "policy" | "campaign" | "other" depending on content
- regulation:
topic_type = "regulation"
category = "risk" or "process" (pick the best fit)
change_type = "policy"

Title Slot Extraction (CRITICAL):
Extract these slots that will be used to generate canonical title:

Expand Down Expand Up @@ -63,11 +122,14 @@ system: |

Content Fields:
- summary: 1-3 sentences (max 320 chars). What changed and why it matters.
- why_it_matters: 1 line (max 160 chars) or null. Impact/reason for reader.
For competitor_update: emphasize what the competitor launched/changed and why this matters for a Telegram wallet product.
For security_incident: emphasize what was hacked, scale of loss, type of vulnerability.
For regulation: emphasize jurisdiction, asset types affected, and potential impact on wallets/crypto/payments.
- why_it_matters: 1 line (max 160 chars) or null. Impact for wallet.tg perspective (users, markets, compliance, risk).
- links: Array of URLs (max 3)
- anchors: Array of identifiers found (Jira keys, PR numbers, version tags)
- impact_area: Systems/components affected (max 3): ["authentication", "payments", "mobile-app"]
- impact_type: Types of impact: ["perf_degradation", "downtime", "ux_change", "policy_change", "data_migration"]
- impact_area: Systems/components affected (max 3): e.g. ["custody", "onramp", "p2p", "cards", "stablecoins", "compliance"]
- impact_type: Types of impact: ["perf_degradation", "downtime", "ux_change", "policy_change", "data_migration", "legal_risk", "security_risk"]

Quality:
- confidence: 0.0-1.0. How confident you are in extraction accuracy.
Expand Down Expand Up @@ -105,44 +167,8 @@ system: |
]
}

If message has no events (e.g., question, discussion), set is_event=false and events=[].

Examples:
Message: "🚀 Launching Stocks & ETFs trading in alpha for Wallet team next Monday. Known issue: possible performance degradation during peak hours. Track: INV-1024"
Event:
{
"action": "Launch",
"object_name_raw": "Stocks & ETFs trading",
"qualifiers": ["alpha", "Wallet team"],
"stroke": "degradation possible",
"anchor": "INV-1024",
"category": "product",
"status": "planned",
"change_type": "launch",
"environment": "prod",
"severity": null,
"planned_start": "2025-10-21T00:00:00Z",
"planned_end": null,
"actual_start": null,
"actual_end": null,
"time_source": "relative",
"time_confidence": 0.85,
"summary": "Stocks & ETFs trading launching in alpha for Wallet team. Potential performance issues during peak hours.",
"why_it_matters": "New trading feature with known performance considerations",
"links": [],
"anchors": ["INV-1024"],
"impact_area": ["trading", "wallet"],
"impact_type": ["perf_degradation"],
"confidence": 0.95
}

Message: "Hi team! Has anyone tried the new mobile build? I'm seeing some weird caching behavior."
Response:
{
"is_event": false,
"overflow_note": null,
"events": []
}
If message has no relevant event for security_incident, competitor_update or regulation,
you MUST set is_event=false and events=[].

IMPORTANT FORMATTING NOTES FOR TELEGRAM:
- Telegram messages may contain emoji, user mentions (@username), hashtags (#tag)
Expand All @@ -151,3 +177,6 @@ system: |
- Focus on substantive content, not formatting
- Forwarded messages will have forward_from metadata - use original source if relevant
- Media attachments (photos, videos, documents) may provide additional context
- Links MUST include scheme; if you see "t.me/xyz" convert to "https://t.me/xyz"
- Prefer https:// links; drop malformed links rather than emitting invalid URLs
- If message date is older than 30 days, set is_event=false (ignore stale news)
3 changes: 3 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ services:
<<: *worker-base
container_name: slack_telegram_worker
command: python scripts/run_multi_source_pipeline.py --source telegram --interval-seconds 600
environment:
<<: *app-env
TELEGRAM_SESSION_PATH: ${TELEGRAM_SESSION_PATH:-data/telegram_session_telegram_worker.session}
networks:
- slack_network

Expand Down
6 changes: 3 additions & 3 deletions scripts/telegram_qr_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ async def authenticate_with_qr() -> None:
print(f"❌ Error: TELEGRAM_API_ID must be an integer, got: {api_id_str}")
sys.exit(1)

# Session file path
session_path = "data/telegram_session.session"
Path("data").mkdir(exist_ok=True)
# Session file path (can be overridden via TELEGRAM_SESSION_PATH env)
session_path = os.getenv("TELEGRAM_SESSION_PATH", "data/telegram_session.session")
Path(Path(session_path).parent).mkdir(exist_ok=True)

print("=" * 60)
print("Telegram QR Code Authentication")
Expand Down
10 changes: 6 additions & 4 deletions src/adapters/bulk_persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def _serialize_datetime(dt: datetime | None) -> datetime | str | None:
event.message_id,
json.dumps(event.source_channels),
_serialize_datetime(event.extracted_at),
_serialize_datetime(event.message_published_at),
event.source_id.value,
event.action.value,
event.object_id,
Expand Down Expand Up @@ -189,15 +190,15 @@ def _postgres_events_batch(

insert_sql = """
INSERT INTO events (
event_id, message_id, source_channels, extracted_at, source_id,
event_id, message_id, source_channels, extracted_at, message_published_at, source_id,
action, object_id, object_name_raw, qualifiers, stroke, anchor,
category, status, change_type, environment, severity,
planned_start, planned_end, actual_start, actual_end,
time_source, time_confidence,
summary, why_it_matters, links, anchors, impact_area, impact_type,
confidence, importance, cluster_key, dedup_key
) VALUES (
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s, %s,
%s, %s, %s, %s, %s, %s,
%s, %s, %s, %s, %s,
%s, %s, %s, %s,
Expand All @@ -209,6 +210,7 @@ def _postgres_events_batch(
message_id = EXCLUDED.message_id,
source_channels = EXCLUDED.source_channels,
extracted_at = EXCLUDED.extracted_at,
message_published_at = EXCLUDED.message_published_at,
source_id = EXCLUDED.source_id,
action = EXCLUDED.action,
object_id = EXCLUDED.object_id,
Expand Down Expand Up @@ -257,15 +259,15 @@ def _sqlite_events_batch(

insert_sql = """
INSERT OR REPLACE INTO events (
event_id, message_id, source_channels, extracted_at, source_id,
event_id, message_id, source_channels, extracted_at, message_published_at, source_id,
action, object_id, object_name_raw, qualifiers, stroke, anchor,
category, status, change_type, environment, severity,
planned_start, planned_end, actual_start, actual_end,
time_source, time_confidence,
summary, why_it_matters, links, anchors, impact_area, impact_type,
confidence, importance, cluster_key, dedup_key
) VALUES (
?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?,
?, ?, ?, ?,
Expand Down
16 changes: 13 additions & 3 deletions src/adapters/postgres_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,13 +468,23 @@ def _row_to_event(self, row: dict[str, Any]) -> Event:
else:
extracted_at = extracted_at_raw.astimezone(pytz.UTC)

message_published_at_raw = row.get("message_published_at")
if isinstance(message_published_at_raw, datetime):
if message_published_at_raw.tzinfo is None:
message_published_at = message_published_at_raw.replace(tzinfo=pytz.UTC)
else:
message_published_at = message_published_at_raw.astimezone(pytz.UTC)
else:
message_published_at = None

return Event(
# Identification
event_id=UUID(row["event_id"]),
message_id=row["message_id"],
source_channels=row.get("source_channels")
or [], # Already parsed from JSONB
extracted_at=extracted_at,
message_published_at=message_published_at,
source_id=MessageSource(source_id_value),
# Title slots
action=ActionType(row["action"]),
Expand Down Expand Up @@ -1058,9 +1068,9 @@ def get_events_in_window(self, start_dt: datetime, end_dt: datetime) -> list[Eve
cur.execute(
"""
SELECT * FROM events
WHERE COALESCE(actual_start, actual_end, planned_start, planned_end) >= %s
AND COALESCE(actual_start, actual_end, planned_start, planned_end) <= %s
ORDER BY COALESCE(actual_start, actual_end, planned_start, planned_end) DESC
WHERE COALESCE(actual_start, actual_end, planned_start, planned_end, message_published_at, extracted_at) >= %s
AND COALESCE(actual_start, actual_end, planned_start, planned_end, message_published_at, extracted_at) <= %s
ORDER BY COALESCE(actual_start, actual_end, planned_start, planned_end, message_published_at, extracted_at) DESC
Comment on lines 1070 to +1073
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Add Postgres migration for message_published_at

The Postgres query for fetching events now filters and orders on message_published_at, but there is no matching schema change for Postgres (only SQLite was altered to add this column). With the current database definition the events table lacks message_published_at, so Postgres deployments will start failing with column "message_published_at" does not exist as soon as get_events_in_window or the bulk upsert path references it. Please add a migration/DDL update for Postgres to create this column before using it.

Useful? React with 👍 / 👎.

""",
(start_dt, end_dt),
)
Expand Down
16 changes: 13 additions & 3 deletions src/adapters/sqlite_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ def _create_schema(self) -> None:
message_id TEXT NOT NULL,
source_channels TEXT NOT NULL,
extracted_at TEXT NOT NULL,
message_published_at TEXT,

-- Title Slots (source of truth)
action TEXT NOT NULL,
Expand Down Expand Up @@ -265,6 +266,12 @@ def _create_schema(self) -> None:
"""
)

try:
cursor.execute("ALTER TABLE events ADD COLUMN message_published_at TEXT")
except sqlite3.OperationalError as exc:
if "duplicate column name" not in str(exc).lower():
raise

# Event relations table
cursor.execute(
"""
Expand Down Expand Up @@ -1186,9 +1193,9 @@ def get_events_in_window(self, start_dt: datetime, end_dt: datetime) -> list[Eve
cursor.execute(
"""
SELECT * FROM events
WHERE COALESCE(actual_start, actual_end, planned_start, planned_end) >= ?
AND COALESCE(actual_start, actual_end, planned_start, planned_end) <= ?
ORDER BY COALESCE(actual_start, actual_end, planned_start, planned_end) ASC
WHERE COALESCE(actual_start, actual_end, planned_start, planned_end, message_published_at, extracted_at) >= ?
AND COALESCE(actual_start, actual_end, planned_start, planned_end, message_published_at, extracted_at) <= ?
ORDER BY COALESCE(actual_start, actual_end, planned_start, planned_end, message_published_at, extracted_at) ASC
""",
(start_dt.isoformat(), end_dt.isoformat()),
)
Expand Down Expand Up @@ -1669,6 +1676,9 @@ def _parse_dt(value: str | None) -> datetime | None:
message_id=row["message_id"],
source_channels=json.loads(row["source_channels"] or "[]"),
extracted_at=extracted_at,
message_published_at=_parse_dt(row["message_published_at"])
if "message_published_at" in row.keys()
else None,
# Title slots
action=ActionType(row["action"]),
object_id=row["object_id"],
Expand Down
6 changes: 5 additions & 1 deletion src/domain/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class MessageSourceConfig(BaseModel):
default_factory=dict,
description="Per-source LLM settings (temperature, timeout)",
)
channels: list[str] | list[Any] = Field(
channels: list[Any] = Field(
default_factory=list,
description="List of channel IDs (str) or channel config objects",
)
Expand Down Expand Up @@ -468,6 +468,9 @@ class Event(BaseModel):
source_channels: list[str] = Field(
default_factory=list, description="Source channel names"
)
message_published_at: datetime | None = Field(
default=None, description="Original message timestamp (UTC)"
)
extracted_at: datetime = Field(
default_factory=_utcnow, description="Extraction timestamp"
)
Expand Down Expand Up @@ -620,6 +623,7 @@ def event_date(self) -> datetime | None:
or self.actual_end
or self.planned_start
or self.planned_end
or self.message_published_at
)

@property
Expand Down
1 change: 1 addition & 0 deletions src/use_cases/extract_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ def convert_llm_event_to_domain(
message_id=message_id,
source_channels=[channel_name] if channel_name else [],
source_id=source_id,
message_published_at=_normalize_to_utc(message_ts_dt),
# Title slots
action=action,
object_id=object_id,
Expand Down
23 changes: 13 additions & 10 deletions tests/test_bulk_persistence_counts.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def _create_event_values(index: int) -> tuple[object, ...]:
event_id = str(uuid4())
message_id = f"msg-{index}"
source_channels = "[]"
message_published_at = now
source_id = "slack"
action = "update"
object_id = f"OBJ-{index}"
Expand Down Expand Up @@ -77,6 +78,7 @@ def _create_event_values(index: int) -> tuple[object, ...]:
message_id,
source_channels,
now,
message_published_at,
source_id,
action,
object_id,
Expand Down Expand Up @@ -127,16 +129,17 @@ def test_bulk_upsert_statement_counts(record_count: int) -> None:
conn = sqlite3.connect(":memory:", factory=CountingConnection)
conn.execute(
"""
CREATE TABLE events (
event_id TEXT PRIMARY KEY,
message_id TEXT,
source_channels TEXT,
extracted_at TEXT,
source_id TEXT,
action TEXT,
object_id TEXT,
object_name_raw TEXT,
qualifiers TEXT,
CREATE TABLE events (
event_id TEXT PRIMARY KEY,
message_id TEXT,
source_channels TEXT,
extracted_at TEXT,
message_published_at TEXT,
source_id TEXT,
action TEXT,
object_id TEXT,
object_name_raw TEXT,
qualifiers TEXT,
stroke TEXT,
anchor TEXT,
category TEXT,
Expand Down
Loading