-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathEXAMPLE-2
More file actions
executable file
·61 lines (47 loc) · 1.78 KB
/
Copy pathEXAMPLE-2
File metadata and controls
executable file
·61 lines (47 loc) · 1.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#!/usr/bin/env python3
# -*- mode: python; coding: utf-8 -*-
assert str is not bytes
from lib_socks_proxy_2013_10_03 import monkey_patch as socks_proxy_monkey_patch
# XXX ``monkey_patch()`` must be run before other imports
socks_proxy_monkey_patch.monkey_patch()
import threading
from urllib import request as url_request
from lib_socks_proxy_2013_10_03 import socks_proxy_context
def proxy_thread(ui_lock, proxy_address, url):
# make request with using SOCKS proxy
opener = url_request.build_opener()
with socks_proxy_context.socks_proxy_context(proxy_address=proxy_address):
res = opener.open(url, timeout=20.0)
data = res.read(10000).decode()
with ui_lock:
print('*** BEGIN result of proxy_thread() ***')
print(data)
print('*** END result of proxy_thread() ***')
def non_proxy_thread(ui_lock, url):
# make request without proxy
opener = url_request.build_opener()
res = opener.open(url, timeout=20.0)
data = res.read(10000).decode()
with ui_lock:
print('*** BEGIN result of non_proxy_thread() ***')
print(data)
print('*** END result of non_proxy_thread() ***')
if __name__ == '__main__':
ui_lock = threading.RLock()
proxy_thr = threading.Thread(
target=lambda: proxy_thread(
ui_lock,
('127.0.0.1', 9050),
'https://internet.yandex.com/get_full_info/',
),
)
non_proxy_thr = threading.Thread(
target=lambda: non_proxy_thread(
ui_lock,
'https://internet.yandex.com/get_full_info/',
),
)
proxy_thr.start()
non_proxy_thr.start()
proxy_thr.join()
non_proxy_thr.join()