Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 71 additions & 9 deletions scripts/test_with_real_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@

import os
import sys
from argparse import ArgumentParser, Namespace
from datetime import datetime
from pathlib import Path
from typing import Final, TextIO

sys.path.insert(0, str(Path(__file__).parent.parent))

Expand All @@ -21,8 +23,46 @@
from src.use_cases.ingest_messages import process_slack_message

# Setup logging to file and console
LOG_FILE = "pipeline_detailed.log"
log_handle = None
LOG_FILE: Final[str] = "pipeline_detailed.log"
DEFAULT_MESSAGE_LIMIT: Final[int] = 5
DEFAULT_BATCH_LIMIT: Final[int] = 5

log_handle: TextIO | None = None


def parse_args() -> Namespace:
"""Parse command line arguments for configuring the test run."""

parser = ArgumentParser(
description="Run the real data pipeline test with optional limits."
)
parser.add_argument(
"--message-limit",
type=int,
default=DEFAULT_MESSAGE_LIMIT,
help=(
"Maximum number of Slack messages to fetch. "
"Use --full-run to process the historical 20-message batch."
),
)
parser.add_argument(
"--batch-size",
type=int,
default=DEFAULT_BATCH_LIMIT,
help=(
"Maximum number of candidates to process during LLM extraction. "
"Set to a small number to avoid exceeding CI timeouts."
),
)
parser.add_argument(
"--full-run",
action="store_true",
help=(
"Process the full historical batch (20 messages, all candidates). "
"This matches legacy behaviour and may take >10 minutes."
),
)
return parser.parse_args()


def setup_logging() -> None:
Expand Down Expand Up @@ -169,11 +209,28 @@ def inspect_database(db_path: str, stage: str = "") -> None:
conn.close()


def main() -> bool:
def resolve_limits(args: Namespace) -> tuple[int, int | None]:
"""Resolve message and batch size limits based on CLI arguments."""

if args.full_run:
return 20, None

if args.message_limit <= 0:
raise ValueError("--message-limit must be positive")

if args.batch_size is not None and args.batch_size <= 0:
raise ValueError("--batch-size must be positive when provided")

return args.message_limit, args.batch_size


def main(args: Namespace) -> bool:
"""Run pipeline test with real data."""
setup_logging()

log("\n🚀 Pipeline Test with Real Data (20 messages)")
message_limit, batch_size = resolve_limits(args)

log("\n🚀 Pipeline Test with Real Data")
log("=" * 70)
log("")

Expand Down Expand Up @@ -229,14 +286,15 @@ def main() -> bool:
log("")

# Step 1: Fetch messages
log("⏳ Step 1: Fetching 20 messages from releases channel...")
log(f"⏳ Step 1: Fetching {message_limit} messages from releases channel...")
try:
import time

time.sleep(2) # Avoid rate limit

raw_messages = slack_client.fetch_messages(
channel_id="C04V0TK7UG6", limit=20
channel_id="C04V0TK7UG6",
limit=message_limit,
)
log(f"✅ Fetched {len(raw_messages)} messages")
except Exception as e:
Expand Down Expand Up @@ -291,7 +349,10 @@ def main() -> bool:

# Step 4: Extract with LLM (process ALL candidates)
log("")
log("⏳ Step 4: Extracting events with LLM (processing ALL candidates)...")
llm_scope = (
"all candidates" if batch_size is None else f"batch size {batch_size}"
)
log(f"⏳ Step 4: Extracting events with LLM ({llm_scope})...")
log(" Note: Will show full LLM prompts and responses in verbose mode")
log("")

Expand All @@ -300,7 +361,7 @@ def main() -> bool:
repository=repo,
settings=settings,
source_id=MessageSource.SLACK, # Real data test - Slack only
batch_size=None, # Process ALL candidates without limit
batch_size=batch_size,
check_budget=False,
object_registry=object_registry,
importance_scorer=importance_scorer,
Expand Down Expand Up @@ -366,5 +427,6 @@ def main() -> bool:


if __name__ == "__main__":
success = main()
cli_args = parse_args()
success = main(cli_args)
sys.exit(0 if success else 1)