-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path: site_check.py
More file actions
132 lines (110 loc) · 3.89 KB
/
site_check.py
File metadata and controls
132 lines (110 loc) · 3.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# -*- coding: utf-8 -*-
from __future__ import print_function, unicode_literals
import sys
import os
import re
from datetime import datetime
from itertools import product
from tornado import gen, httpclient, ioloop
from tornado.ioloop import IOLoop
from tornado.queues import Queue
# Amount of parallel requests
concurrency = 10
# Help-center path appended to every candidate host.
PAGE = 'hc'
# Zendesk-hosted help-center suffix (the bare domain is prepended).
ZENDESK_PAGE = '.zendesk.com/hc'
# Common support-subdomain prefixes tried in front of every domain.
TOP_DOMAINS = [
    "support", "help", "faq", "soporte", "service", "supporto",
    "sales", "hilfe", "customer", "kc", "kb", "contact",
    "ask", "ajuda", "ajuto", "aide", "helpdesk",
]
# Top-level domains combined with each prefix.
TLD = [".com", ".co.uk", ".de", ".it"]
# Maps generated url -> owning domain; filled by generate_url_list().
RESULT = {}
# Maps domain -> list of URLs that answered; filled by the consumer.
DOMAINS = {}
# When True, response headers/bodies are dumped under DEBUG_DIR.
DEBUG = False
DEBUG_DIR = 'debug/'
def generate_url_list(domain):
    """
    Build the list of candidate help-center URLs for *domain*.

    Combines every prefix in TOP_DOMAINS with every suffix in TLD
    (e.g. "http://support.<domain>.com/hc") and appends the Zendesk
    pattern "http://<domain>.zendesk.com/hc".  Every generated URL is
    registered in the global RESULT map so the consumer can later map
    a fetched URL back to its domain.
    """
    hosts = ["http://" + prefix + "." + domain for prefix in TOP_DOMAINS]
    urls = [host + tld + "/" + PAGE for host, tld in product(hosts, TLD)]
    urls.append("http://" + domain + ZENDESK_PAGE)
    # Register every candidate (the Zendesk URL included) exactly once.
    for url in urls:
        RESULT[url] = domain
    return urls
@gen.coroutine
def main():
    """Probe every candidate URL for every input domain and report hits.

    Producer/consumer over a tornado Queue: the producer reads domains
    from sys.argv[1] and enqueues candidate URLs, then `concurrency`
    workers fetch them concurrently.  The report (one
    "domain,<found urls or No>" CSV line per domain) is written to
    sys.argv[2] when given, otherwise to stdout.
    """
    # Tornado framework used for async IO
    # http://www.tornadoweb.org/en/stable/index.html
    q = Queue()

    @gen.coroutine
    def consumer():
        # Fetch one URL from the queue and record whether it answered.
        item = yield q.get()
        try:
            code = False
            try:
                response = yield httpclient.AsyncHTTPClient().fetch(item)
                # FIX: the original read response.headers['Status'], which is
                # not a real HTTP header — the KeyError sent every successful
                # fetch into the except branch, so hits were never recorded.
                # Use the numeric status code instead.
                code = response.code in (200, 301, 302)
                rcode = response.code
                if DEBUG:
                    # Dump headers + body for offline inspection, one file
                    # per host (dots replaced so it is a flat filename).
                    fname = re.match(r'http://([\w+|.]+)/', item).group(1)
                    fname = os.path.join(DEBUG_DIR, fname.replace(".", "_"))
                    # Binary mode: response.body is bytes (Python 3 safe).
                    with open(fname, 'wb') as f:
                        for k, v in response.headers.get_all():
                            f.write((k + ' ' + v + '\n').encode('utf-8'))
                        f.write(b'\n')
                        f.write(response.body)
            except Exception as e:
                # Connection failures and non-2xx responses both land here;
                # keep the error text as the "status" for the log line.
                code = False
                rcode = str(e)
            print('%s,%s,%s,"%s"' %
                  (datetime.now(), item, code, rcode))
            # Append the URL under its owning domain when it resolved.
            if code:
                DOMAINS[RESULT[item]].append(item)
        finally:
            q.task_done()

    @gen.coroutine
    def worker():
        # Drain the queue forever; main() exits via q.join().
        while True:
            yield consumer()

    @gen.coroutine
    def producer():
        # Read the domains file and enqueue all candidate URLs.
        if DEBUG and not os.path.exists(DEBUG_DIR):
            print('Creating debug out dir: %s' % DEBUG_DIR)
            os.makedirs(DEBUG_DIR)
        if len(sys.argv) >= 2:
            with open(sys.argv[1]) as f:
                for line in f:
                    DOMAINS[line.strip()] = []
        else:
            print("Domains list file wasn't provided")
            print("Usage: %s <domains.txt> [ report.txt ]" % sys.argv[0])
            sys.exit(2)
        for d in DOMAINS:
            for url in generate_url_list(d):
                # FIX: yield the put() future (the original dropped it).
                yield q.put(url)

    yield producer()  # Wait for producer to put all tasks.
    # Start workers, then wait for the work queue to be empty.
    for _ in range(concurrency):
        worker()
    yield q.join()  # Wait for consumer to finish all tasks.

    # Emit the CSV report; FIX: close the file unless writing to stdout
    # (the original leaked the file handle).
    out_file = open(sys.argv[2], 'w') if len(sys.argv) >= 3 else sys.stdout
    try:
        for key, val in DOMAINS.items():
            DOMAINS[key] = '"' + " ".join(val) + '"' if val else 'No'
        out = "\n".join(
            ",".join([key, str(val)]) for key, val in DOMAINS.items()) + '\n'
        out_file.write(out)
    finally:
        if out_file is not sys.stdout:
            out_file.close()
# Main IO Loop
if __name__ == '__main__':
    # Block until main()'s coroutine (and its q.join()) completes.
    ioloop.IOLoop.current().run_sync(main)