
Commit 056cf45

feat: Add comprehensive Apache Cassandra database integration
- Implement CassandraManager with full CRUD operations and analytics
- Add distributed data storage with automatic URL and content deduplication
- Create dynamic seed URL management system stored in database
- Implement time-series crawl statistics and performance tracking
- Add content versioning and change detection capabilities
- Create CassandraParserManager extending existing parser with database features
- Add Docker Compose setup for complete Cassandra deployment
- Implement comprehensive test suite with 15+ test cases for database operations
- Add demo script showcasing all Cassandra integration features
- Support graceful fallback when database unavailable

Database Architecture:
- Articles table: Main content storage with time-series partitioning
- URL tracker: Deduplication and processing history management
- Seeds table: Dynamic crawler target management with prioritization
- Statistics: Performance metrics and monitoring counters
- History: Content versioning and change detection tracking

Technical Features:
- High-throughput write optimization for large-scale scraping
- Horizontal scaling support across multiple Cassandra nodes
- Intelligent deduplication using URL and content hashing
- Database-driven seed management replacing file-based approach
- Async/await integration with existing scraper architecture
- Production-ready error handling and connection management
- Time-series analytics for crawl performance monitoring

Infrastructure:
- Complete Docker Compose stack with Cassandra cluster
- Web UI for database administration and monitoring
- Health checks and automated service orchestration
- Volume persistence and network configuration
- Integration with existing Kubernetes deployment
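The "intelligent deduplication using URL and content hashing" above can be sketched as two stable hash keys, one per URL and one per article body. This is a minimal illustration of the idea, not the repo's actual `CassandraManager` code; the helper names and the simplified URL normalization are assumptions:

```python
import hashlib


def url_hash(url: str) -> str:
    """Stable key for URL-level deduplication (normalization simplified here)."""
    normalized = url.strip().lower().rstrip("/")
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


def content_hash(text: str) -> str:
    """Stable key for content-level deduplication; whitespace is collapsed
    so trivial reflows of the same article don't register as changes."""
    canonical = " ".join(text.split())
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


# A duplicate check then becomes two lookups against the URL-tracker table:
# skip the article if its url_hash was already seen; if the URL is known but
# content_hash differs, record a new content version in the history table.
```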
1 parent bea3843 commit 056cf45

9 files changed

Lines changed: 1233 additions & 1 deletion

README.md

Lines changed: 11 additions & 1 deletion
```diff
@@ -31,6 +31,7 @@ This project demonstrates enterprise-level software design patterns:
 ## 🚀 Technical Stack
 
 - **AI/ML Framework**: LangChain with prompt engineering and structured output parsing
+- **Database**: Apache Cassandra for distributed data storage and deduplication
 - **Browser Automation**: Crawlee + Playwright for sophisticated queue management
 - **Data Validation**: Pydantic v2 with advanced type checking and serialization
 - **Content Extraction**: Multi-method approach (Newspaper3k + Trafilatura + custom)
@@ -72,6 +73,14 @@ This project demonstrates enterprise-level software design patterns:
 - **Security**: Pod security standards, RBAC, and minimal privilege execution
 - **Reliability**: Failure recovery, resource cleanup, and graceful degradation
 
+### Distributed Database Engineering
+- **Cassandra Integration**: High-performance, scalable NoSQL database for web scraping data
+- **Data Deduplication**: Intelligent URL and content duplicate detection and prevention
+- **Dynamic Seed Management**: Database-driven crawler seed URL management and prioritization
+- **Time-Series Analytics**: Crawl statistics, performance metrics, and historical data tracking
+- **Content Versioning**: Track article changes over time with automated change detection
+- **Horizontal Scaling**: Distributed architecture supporting multi-node deployments
+
 ### Testing & Quality Assurance
 - **Test-Driven Development**: 26+ automated tests covering multiple scenarios
 - **Integration Testing**: End-to-end workflow validation
@@ -92,6 +101,7 @@ This project demonstrates enterprise-level software design patterns:
 ## 🏛️ Core Features
 
 - **AI-Enhanced Content Analysis**: LangChain-powered summarization, sentiment analysis, and topic classification
+- **Distributed Database Storage**: Cassandra integration with deduplication and analytics
 - **Multi-Parser Architecture**: Automatic parser selection based on URL fingerprinting
 - **Advanced Kubernetes Orchestration**: Enterprise-grade batch processing with auto-scaling
 - **Production Pipelines**: Complete CI/CD with automated testing, building, and deployment
@@ -306,4 +316,4 @@ jobs:
 
 ## 🔧 Technical Keywords
 
-`Python` • `LangChain` • `AI/ML Engineering` • `Async/Await` • `Pydantic` • `Playwright` • `Docker` • `Kubernetes` • `GitHub Actions` • `Test-Driven Development` • `Clean Architecture` • `Design Patterns` • `Type Safety` • `CI/CD` • `Container Orchestration` • `Web Scraping` • `Parser Registry` • `Strategy Pattern` • `Prompt Engineering` • `Content Analysis`
+`Python` • `Apache Cassandra` • `Distributed Systems` • `LangChain` • `AI/ML Engineering` • `Async/Await` • `Pydantic` • `Playwright` • `Docker` • `Kubernetes` • `GitHub Actions` • `Test-Driven Development` • `Clean Architecture` • `Design Patterns` • `Type Safety` • `CI/CD` • `Container Orchestration` • `Web Scraping` • `Parser Registry` • `Strategy Pattern` • `Prompt Engineering` • `Content Analysis` • `Database Engineering` • `Data Deduplication` • `Time-Series Analytics`
```
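The "Time-Series Analytics" bullet above is typically backed by bucketing rows into coarse time windows that form part of the Cassandra partition key. A hedged sketch of that pattern follows; the daily granularity and key layout are assumptions for illustration, not the project's actual schema:

```python
from datetime import datetime, timezone


def day_bucket(ts: datetime) -> str:
    """Coarse time bucket used as a partition-key component, so one day's
    crawl data lands in one partition and range scans stay bounded."""
    return ts.astimezone(timezone.utc).strftime("%Y-%m-%d")


# A partition key of (parser, day_bucket) with clustering on the scrape
# timestamp spreads writes across parsers while keeping per-day analytics
# queries cheap (one partition read per parser per day).
key = ("generic_news", day_bucket(datetime(2024, 5, 17, 8, 30, tzinfo=timezone.utc)))
```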

demo_cassandra_integration.py

Lines changed: 172 additions & 0 deletions
```python
#!/usr/bin/env python3
"""
Cassandra Database Integration Demo
Demonstrates distributed data storage, deduplication, and seed management.
"""

import asyncio
from datetime import datetime

from src.database.cassandra_manager import CassandraManager, CassandraConfig
from src.schemas.news import NewsArticle


async def main():
    """Demonstrate Cassandra database integration capabilities."""
    print("🗄️ Cassandra Database Integration Demo")
    print("=" * 50)

    # Configuration
    config = CassandraConfig(
        hosts=["localhost"],
        keyspace="web_scraper_demo",
        replication_factor=1
    )

    try:
        # Initialize database connection
        print("\n🔌 Connecting to Cassandra...")
        manager = CassandraManager(config)
        await manager.connect()
        print("✅ Connected successfully!")

        # Demo 1: Store sample articles
        print("\n📰 Storing sample articles...")

        sample_articles = [
            # Only title and url are required - all other fields are Optional
            NewsArticle(  # type: ignore
                title="Revolutionary AI Breakthrough in Healthcare",
                content="Researchers have developed an AI system that can diagnose diseases "
                        "with 95% accuracy, potentially transforming medical care worldwide.",
                url="https://example.com/ai-healthcare-breakthrough",
                author="Dr. Jane Smith"
            ),
            NewsArticle(  # type: ignore
                title="Climate Change Solutions: New Carbon Capture Technology",
                content="Scientists unveil innovative carbon capture technology that could "
                        "remove millions of tons of CO2 from the atmosphere annually.",
                url="https://example.com/carbon-capture-tech"
            ),
            NewsArticle(  # type: ignore
                title="Quantum Computing Milestone Achieved",
                content="Tech giant announces quantum computer with 1000+ qubits, bringing "
                        "practical quantum computing closer to reality.",
                url="https://example.com/quantum-milestone",
                author="Tech Reporter"
            )
        ]

        stored_count = 0
        duplicate_count = 0

        for article in sample_articles:
            was_stored = await manager.store_article(article, "generic_news")
            if was_stored:
                stored_count += 1
                print(f"   ✅ Stored: {article.title[:50]}...")
            else:
                duplicate_count += 1
                print(f"   ⚠️ Duplicate: {article.title[:50]}...")

        print(f"\n📊 Storage Results: {stored_count} stored, {duplicate_count} duplicates")

        # Demo 2: Test deduplication
        print("\n🔄 Testing deduplication...")

        # Try to store the same article again
        duplicate_article = sample_articles[0]  # First article again
        was_stored = await manager.store_article(duplicate_article, "generic_news")

        if not was_stored:
            print("✅ Deduplication working correctly - duplicate detected and skipped")
        else:
            print("❌ Deduplication failed - duplicate was stored")

        # Demo 3: Add seed URLs
        print("\n🌱 Managing seed URLs...")

        seed_urls = [
            {
                "url": "https://techcrunch.com",
                "label": "h2 a",
                "parser": "news",
                "priority": 8
            },
            {
                "url": "https://news.ycombinator.com",
                "label": "a.storylink",
                "parser": "news",
                "priority": 6
            },
            {
                "url": "https://reddit.com/r/technology",
                "label": "a[data-click-id='body']",
                "parser": "generic_news",
                "priority": 5
            }
        ]

        for seed in seed_urls:
            await manager.add_seed_url(
                url=seed["url"],
                label=seed["label"],
                parser=seed["parser"],
                priority=seed["priority"]
            )
            print(f"   ✅ Added seed: {seed['url']}")

        # Demo 4: Retrieve seeds from database
        print("\n📋 Retrieving seeds from database...")

        seeds = await manager.get_seed_urls(limit=10)
        print(f"Found {len(seeds)} active seeds:")

        for i, seed in enumerate(seeds, 1):
            print(f"   {i}. {seed['url']}")
            print(f"      Label: {seed['label']}")
            print(f"      Parser: {seed['parser']}")

        # Demo 5: Get crawl statistics
        print("\n📈 Crawl Statistics...")

        stats = await manager.get_crawl_statistics(days=1)
        if stats:
            for metric, count in stats.items():
                print(f"   {metric}: {count}")
        else:
            print("   No statistics available yet")

        print("\n🎯 Key Features Demonstrated:")
        print("   ✅ Distributed data storage with Cassandra")
        print("   ✅ Automatic URL and content deduplication")
        print("   ✅ Dynamic seed URL management from database")
        print("   ✅ Time-series data tracking and statistics")
        print("   ✅ Scalable architecture for high-volume scraping")
        print("   ✅ Content versioning and change tracking")

        print("\n🔧 Database Architecture:")
        print("   • Articles table: Main content storage with partitioning")
        print("   • URL tracker: Deduplication and processing history")
        print("   • Seeds table: Dynamic crawl target management")
        print("   • Statistics: Performance metrics and monitoring")
        print("   • History: Content versioning and change detection")

        print("\n🚀 Production Benefits:")
        print("   • High write throughput for large-scale scraping")
        print("   • Horizontal scaling across multiple nodes")
        print("   • No single point of failure with replication")
        print("   • Efficient time-series data for analytics")
        print("   • Schema flexibility for varying content structures")

        # Cleanup
        await manager.close()
        print("\n✨ Demo completed successfully!")

    except Exception as e:
        print(f"\n❌ Demo failed: {e}")
        print("\n💡 Make sure Cassandra is running:")
        print("   docker-compose -f docker-compose.cassandra.yml up -d cassandra")


if __name__ == "__main__":
    asyncio.run(main())
```
Lines changed: 25 additions & 0 deletions
```sql
-- Cassandra initialization script
-- Creates keyspace and initial seed data

-- Create keyspace
CREATE KEYSPACE IF NOT EXISTS web_scraper
WITH replication = {
    'class': 'SimpleStrategy',
    'replication_factor': 1
};

USE web_scraper;

-- Sample seed URLs
INSERT INTO seeds (seed_id, url, label, parser, priority, added_at, success_count, failure_count, status, metadata)
VALUES (uuid(), 'https://example.com/news', 'a.article-link', 'news', 5, toTimestamp(now()), 0, 0, 'active', {});

INSERT INTO seeds (seed_id, url, label, parser, priority, added_at, success_count, failure_count, status, metadata)
VALUES (uuid(), 'https://techcrunch.com', 'a[data-module="ArticleCard"]', 'news', 8, toTimestamp(now()), 0, 0, 'active', {});

INSERT INTO seeds (seed_id, url, label, parser, priority, added_at, success_count, failure_count, status, metadata)
VALUES (uuid(), 'https://news.ycombinator.com', 'a.storylink', 'news', 6, toTimestamp(now()), 0, 0, 'active', {});

-- Sample configuration
INSERT INTO seeds (seed_id, url, label, parser, priority, added_at, success_count, failure_count, status, metadata)
VALUES (uuid(), 'https://reddit.com/r/technology', 'a[data-click-id="body"]', 'generic_news', 4, toTimestamp(now()), 0, 0, 'active', {'source': 'reddit'});
```

docker-compose.cassandra.yml

Lines changed: 71 additions & 0 deletions
```yaml
version: '3.8'

services:
  cassandra:
    image: cassandra:4.1
    container_name: web-scraper-cassandra
    ports:
      - "9042:9042"
      - "9160:9160"  # Thrift port (optional)
    environment:
      - CASSANDRA_CLUSTER_NAME=WebScraperCluster
      - CASSANDRA_DC=datacenter1
      - CASSANDRA_RACK=rack1
      - CASSANDRA_ENDPOINT_SNITCH=GossipingPropertyFileSnitch
      - CASSANDRA_NUM_TOKENS=256
      - MAX_HEAP_SIZE=1G
      - HEAP_NEWSIZE=200M
    volumes:
      - cassandra_data:/var/lib/cassandra
      - ./deployment/cassandra/cassandra.yaml:/etc/cassandra/cassandra.yaml
      - ./deployment/cassandra/init-scripts:/docker-entrypoint-initdb.d
    networks:
      - scraper-network
    healthcheck:
      test: ["CMD-SHELL", "cqlsh -e 'describe cluster'"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 2m

  cassandra-web:
    image: markusgulden/cassandra-web:latest
    container_name: cassandra-web-ui
    ports:
      - "3000:3000"
    environment:
      - CASSANDRA_HOST=cassandra
      - CASSANDRA_PORT=9042
    depends_on:
      cassandra:
        condition: service_healthy
    networks:
      - scraper-network

  web-scraper:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: web-scraper-app
    environment:
      - CASSANDRA_HOSTS=cassandra
      - CASSANDRA_KEYSPACE=web_scraper
      - CASSANDRA_PORT=9042
      - PYTHONPATH=/app
    volumes:
      - ./storage:/app/storage
      - ./logs:/app/logs
    depends_on:
      cassandra:
        condition: service_healthy
    networks:
      - scraper-network
    command: ["python", "src/main.py", "--file", "seeds.txt"]

volumes:
  cassandra_data:
    driver: local

networks:
  scraper-network:
    driver: bridge
```

requirements.txt

Lines changed: 4 additions & 0 deletions
```diff
@@ -12,6 +12,10 @@ aiofiles>=23.0.0
 kubernetes>=29.0.0
 structlog>=23.2.0
 
+# Database dependencies
+cassandra-driver>=3.28.0
+pandas>=2.0.0  # For data analysis and export
+
 # LangChain AI/ML dependencies
 langchain>=0.3.27
 langchain-community>=0.3.29
```
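requirements.txt pins pandas "for data analysis and export". A minimal sketch of what that export path might look like; the row shape is an assumption (a hypothetical `get_articles()`-style result), with field names borrowed from the demo script:

```python
import pandas as pd

# Rows as they might come back from a Cassandra articles query (assumed shape).
rows = [
    {"url": "https://example.com/ai-healthcare-breakthrough",
     "title": "Revolutionary AI Breakthrough in Healthcare",
     "parser": "generic_news", "scraped_at": "2024-05-17T08:30:00Z"},
    {"url": "https://example.com/quantum-milestone",
     "title": "Quantum Computing Milestone Achieved",
     "parser": "generic_news", "scraped_at": "2024-05-17T09:00:00Z"},
]

df = pd.DataFrame(rows)
df["scraped_at"] = pd.to_datetime(df["scraped_at"])

# Per-parser daily counts - the kind of time-series rollup the stats table feeds.
daily = df.groupby([df["scraped_at"].dt.date, "parser"]).size()

# Export for offline analysis.
df.to_csv("articles_export.csv", index=False)
```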

src/database/__init__.py

Lines changed: 12 additions & 0 deletions
```python
"""
Database package initialization.
Provides unified access to database components.
"""

from .cassandra_manager import CassandraManager, CassandraConfig, create_cassandra_manager

__all__ = [
    'CassandraManager',
    'CassandraConfig',
    'create_cassandra_manager'
]
```
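The commit notes "graceful fallback when database unavailable". Here is a self-contained sketch of that pattern using a stand-in manager class; the real `CassandraManager`/`create_cassandra_manager` and the file-based fallback live in the repo and may differ:

```python
import asyncio


class UnavailableManager:
    """Stand-in for CassandraManager when no cluster is reachable."""

    async def connect(self):
        raise ConnectionError("no Cassandra hosts reachable")


async def load_seeds(manager, fallback_file_seeds):
    """Prefer database-driven seeds; degrade to the file-based list on failure."""
    try:
        await manager.connect()
        return await manager.get_seed_urls(limit=100)
    except Exception:
        # Graceful fallback: keep crawling with the static seed file.
        return fallback_file_seeds


seeds = asyncio.run(load_seeds(UnavailableManager(), ["https://example.com/news"]))
```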
