#!/usr/bin/env python3
"""
Test script to verify that searching works while scraping is running.
This script will start a scraping process and then perform searches concurrently.
"""
import requests
import time
import json
import threading
from concurrent.futures import ThreadPoolExecutor

# Configuration
BASE_URL = "http://localhost:5000/api"
ADMIN_USERNAME = "admin"
ADMIN_PASSWORD = "dinosaurus123"  # Update with your actual password
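
# Usage (a suggested invocation, assuming the API server configured above is
# running locally and the admin credentials are valid):
#   python test_concurrent_operations.py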


def login():
    """Login and get admin token"""
    response = requests.post(f"{BASE_URL}/login", json={
        "username": ADMIN_USERNAME,
        "password": ADMIN_PASSWORD
    })
    if response.status_code == 200:
        return response.json()["token"]
    else:
        print(f"Login failed: {response.status_code} - {response.text}")
        return None


def start_scraping(token):
    """Start scraping process"""
    headers = {"X-Admin-Token": token}
    response = requests.post(f"{BASE_URL}/scrape", json={
        "max_pages": 10,
        "max_depth": 1,
        "concurrency": 3,
        "timeout": 10,
        "retries": 1,
        "ignore_existing": True,
        "resume": False
    }, headers=headers)
    if response.status_code == 200:
        data = response.json()
        print(f"✅ Scraping started successfully (PID: {data.get('pid', 'unknown')})")
        return True
    else:
        print(f"❌ Failed to start scraping: {response.status_code} - {response.text}")
        return False


def check_scrape_status(token):
    """Check scraping process status"""
    headers = {"X-Admin-Token": token}
    response = requests.get(f"{BASE_URL}/scrape/status", headers=headers)
    if response.status_code == 200:
        return response.json()
    else:
        print(f"❌ Failed to check scrape status: {response.status_code}")
        return None


def perform_search(query):
    """Perform a search query"""
    response = requests.post(f"{BASE_URL}/search", json={"query": query})
    if response.status_code == 200:
        data = response.json()
        results_count = len(data.get("results", []))
        print(f"🔍 Search '{query}': {results_count} results")
        return True
    else:
        print(f"❌ Search failed for '{query}': {response.status_code}")
        return False


def search_worker(search_queries, results):
    """Worker function for concurrent searches"""
    for query in search_queries:
        success = perform_search(query)
        results.append(success)
        time.sleep(1)  # Small delay between searches


def main():
    print("🧪 Testing concurrent scraping and searching operations...")

    # Login
    print("\n1. Logging in...")
    token = login()
    if not token:
        print("❌ Cannot proceed without authentication")
        return
    print("✅ Login successful")

    # Start scraping
    print("\n2. Starting scraping process...")
    if not start_scraping(token):
        print("❌ Cannot proceed without scraping")
        return

    # Wait a moment for scraping to start
    time.sleep(2)

    # Check initial status
    print("\n3. Checking scraping status...")
    status = check_scrape_status(token)
    if status:
        print(f" Status: {status['status']}")
        print(f" Running: {status['running']}")
        if status.get('pid'):
            print(f" PID: {status['pid']}")

    # Perform concurrent searches
    print("\n4. Performing concurrent searches...")
    search_queries = [
        "tyrannosaurus",
        "fossils",
        "paleontology",
        "dinosaur extinction",
        "cretaceous period",
        "velociraptor",
        "triceratops",
        "stegosaurus"
    ]
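
    # The results list is shared across worker threads; list.append is atomic
    # under CPython's GIL, so no explicit lock is needed for this simple test.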
    results = []

    # Use ThreadPoolExecutor for concurrent searches
    with ThreadPoolExecutor(max_workers=3) as executor:
        # Split queries into chunks for different workers
        chunk_size = len(search_queries) // 3
        chunks = [search_queries[i:i + chunk_size] for i in range(0, len(search_queries), chunk_size)]
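        # Note: 8 queries with chunk_size 2 yields 4 chunks; the 3-worker pool
        # simply queues the extra chunk until a worker becomes free.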

        # Submit search tasks
        futures = []
        for chunk in chunks:
            future = executor.submit(search_worker, chunk, results)
            futures.append(future)

        # Wait for all searches to complete
        for future in futures:
            future.result()

    # Check final scraping status
    print("\n5. Checking final scraping status...")
    status = check_scrape_status(token)
    if status:
        print(f" Status: {status['status']}")
        print(f" Running: {status['running']}")

    # Summary
    print("\n📊 Test Summary:")
    successful_searches = sum(results)
    total_searches = len(results)
    print(f" Successful searches: {successful_searches}/{total_searches}")
    print(f" Search success rate: {(successful_searches/total_searches)*100:.1f}%")

    if successful_searches == total_searches:
        print("✅ All searches completed successfully while scraping was running!")
    else:
        print("⚠️ Some searches failed, but this might be due to server load or other factors")

    print("\n🎉 Concurrent operations test completed!")


if __name__ == "__main__":
    main()