-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathIPQSmain.py
More file actions
102 lines (87 loc) · 4.61 KB
/
IPQSmain.py
File metadata and controls
102 lines (87 loc) · 4.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import time
start_time_ipqs = time.time()
import asyncio
import ipaddress
import json
import aiohttp
from common import Style, ips, timeout_set
from credentials import ipqs_api
all_ipqs_ips = []
async def ipqsmain(address, i, session):
try:
ipqs_url = f'https://ipqualityscore.com/api/json/ip/{ipqs_api}/{address}'
async with session.get(ipqs_url) as response:
ipqs_response_json = await response.json()
print(f"IP {i}/{len(ips)} {response.status} {response.reason} for {address} on IPQS")
# print(ipqs_response_json)
# print(f"response start {ipqs_response_json} responseend")
if ipqs_response_json['success'] is False:
ipqs_ip = f'{address}'
ipqs_res = -1
ipqs_link = ipqs_istor = ipqs_ra = ipqs_bt = ipqs_ic = ipqs_p = ipqs_v = f"INVALID RESULT - {ipqs_response_json['message']}"
else:
ipqs_ip = ipqs_response_json["host"]
ipqs_link = f"https://www.ipqualityscore.com/free-ip-lookup-proxy-vpn-test/lookup/{address}"
ipqs_istor = ipqs_response_json["tor"]
ipqs_res = ipqs_response_json["fraud_score"]
ipqs_ra = ipqs_response_json["recent_abuse"]
ipqs_bt = ipqs_response_json["bot_status"]
ipqs_ic = ipqs_response_json["is_crawler"]
ipqs_p = ipqs_response_json["proxy"]
ipqs_v = ipqs_response_json["vpn"]
if ipqs_res > 75:
print(f'\t{Style.RED_Highlighted}Fraud Score: {ipqs_res}{Style.RESET}')
temp = {'IPQS_IP': ipqs_ip, 'IPQS_Link': ipqs_link, 'IPQS_Fraud_Score': ipqs_res,
'IPQS_isTor': ipqs_istor,
'IPQS_Recent_abuse': ipqs_ra,
'IPQS_bot_status': ipqs_bt, 'IPQS_is_crawler': ipqs_ic, 'IPQS_proxy': ipqs_p,
'IPQS_vpn': ipqs_v}
all_ipqs_ips.append(temp)
return ipqs_response_json
except asyncio.TimeoutError:
print(f"Request to {address} timed out after {timeout_set }seconds on IPQS")
ipqs_response_json = {'IP': f"{address}", 'IPQS_Fraud_Score': -1,
'success': False,
'message': f"INVALID RESULT - Request to {address} timed out after {timeout_set} seconds on IPQS. Try increasing timeout value",
'ipqs_istor':
f"INVALID RESULT - Request to {address} timed out after {timeout_set} seconds on IPQS. Try increasing timeout value."}
all_ipqs_ips.append(ipqs_response_json)
return ipqs_response_json
except aiohttp.ClientError as ex:
print(f"IP {i}/{len(ips)} Error for {address} on IPQS: {ex}")
async def main():
tasks = []
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout_set)) as session:
for i, ip in enumerate(ips):
i += 1
try:
address = ipaddress.ip_address(ip)
except ValueError:
print(f"IP {i}/{len(ips)} {Style.RED}Entered IP '{ip}' is not a valid IP!{Style.RESET}")
continue
if not address.is_private:
task = asyncio.create_task(ipqsmain(address, i, session))
tasks.append(task)
elif address.is_private:
print(f"IP {i}/{len(ips)} {Style.BLUE}Given IP {address} is Private{Style.RESET}")
else:
print(
f"IP {i}/{len(ips)} {Style.RED_Highlighted}Something gone terribly wrong. This line should never run{Style.RESET}")
await asyncio.gather(*tasks)
sorted_ipqs_ips = sorted(all_ipqs_ips, key=lambda x: (x['IPQS_Fraud_Score']), reverse=True)
print("\nMain Output:")
for i, result in enumerate(sorted_ipqs_ips):
if result['IPQS_Fraud_Score'] == -1:
print(f"{Style.GREY} {i + 1} {json.dumps(result, indent=3)}{Style.RESET}")
elif result['IPQS_Fraud_Score'] > 25:
print(f"{Style.RED_Highlighted} {i + 1} {json.dumps(result, indent=3)}{Style.RESET}")
elif result['IPQS_Fraud_Score'] > 10:
print(f"{Style.RED} {i + 1}: {json.dumps(result, indent=3)}{Style.RESET}")
elif result['IPQS_Fraud_Score'] > 2:
print(f"{Style.YELLOW} {i + 1}: {json.dumps(result, indent=3)}{Style.RESET}")
else:
print(f"{Style.GREEN} {i + 1}: {json.dumps(result, indent=3)}{Style.RESET}")
if __name__ == "__main__":
print("Executing directly")
asyncio.run(main())
print(f"Result received within {time.time() - start_time_ipqs} seconds!")