-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathopsec_requests.py
More file actions
191 lines (148 loc) · 6.34 KB
/
opsec_requests.py
File metadata and controls
191 lines (148 loc) · 6.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
"""
OPSEC-Enhanced HTTP Request Wrapper
Provides safe, anonymized HTTP requests for OSINT tools
"""
import aiohttp
import logging
from typing import Optional, Dict, Any
from opsec_config import OPSECConfig
# Try to import Tor support, but don't fail if unavailable
try:
from aiohttp_socks import ProxyConnector
TOR_AVAILABLE = True
except ImportError:
TOR_AVAILABLE = False
ProxyConnector = None
logger = logging.getLogger(__name__)
class OPSECSession:
"""HTTP session with OPSEC protections built-in"""
@staticmethod
async def get(url: str, **kwargs) -> aiohttp.ClientResponse:
"""
Make an OPSEC-protected GET request
Args:
url: Target URL
**kwargs: Additional aiohttp parameters (will be merged with OPSEC config)
Returns:
aiohttp.ClientResponse object
"""
# Apply random delay before request (human-like timing)
await OPSECConfig.random_delay()
# Get OPSEC configuration
opsec_config = OPSECConfig.get_request_config()
# Merge with any user-provided kwargs
headers = opsec_config.get('headers', {})
if 'headers' in kwargs:
headers.update(kwargs['headers'])
timeout = aiohttp.ClientTimeout(total=opsec_config.get('timeout', 30))
# Build connector with proxy if Tor is enabled
connector_kwargs = {}
if OPSECConfig.USE_TOR:
# Note: Requires aiohttp-socks for Tor support
# Will be added in Phase 2
logger.info(f"Tor routing requested but not yet implemented - using direct connection")
# Create session and make request
async with aiohttp.ClientSession(
headers=headers,
timeout=timeout,
connector=aiohttp.TCPConnector(**connector_kwargs)
) as session:
logger.debug(f"OPSEC Request: {url}")
logger.debug(f"User-Agent: {headers.get('User-Agent', 'None')[:50]}...")
async with session.get(url, **kwargs) as response:
return response
@staticmethod
async def get_text(url: str, **kwargs) -> str:
"""
Convenience method: GET request returning text
Args:
url: Target URL
**kwargs: Additional aiohttp parameters
Returns:
Response text as string
"""
# Apply OPSEC delay
await OPSECConfig.random_delay()
# Get OPSEC headers
headers = OPSECConfig.get_request_headers()
if 'headers' in kwargs:
headers.update(kwargs['headers'])
timeout = aiohttp.ClientTimeout(total=30)
# Create connector with Tor if enabled
connector = None
if OPSECConfig.USE_TOR:
if not TOR_AVAILABLE:
logger.warning("Tor requested but aiohttp-socks not available. Using direct connection.")
else:
logger.info(f"Routing through Tor: {url}")
connector = ProxyConnector.from_url('socks5://127.0.0.1:9050')
logger.debug(f"OPSEC GET: {url}")
async with aiohttp.ClientSession(headers=headers, timeout=timeout, connector=connector) as session:
async with session.get(url, **kwargs) as response:
return await response.text()
@staticmethod
async def get_json(url: str, **kwargs) -> Dict:
"""
Convenience method: GET request returning JSON
Args:
url: Target URL
**kwargs: Additional aiohttp parameters
Returns:
Parsed JSON as dict
"""
# Apply OPSEC delay
await OPSECConfig.random_delay()
# Get OPSEC headers
headers = OPSECConfig.get_request_headers()
if 'headers' in kwargs:
headers.update(kwargs['headers'])
timeout = aiohttp.ClientTimeout(total=30)
# Create connector with Tor if enabled
connector = None
if OPSECConfig.USE_TOR:
if not TOR_AVAILABLE:
logger.warning("Tor requested but aiohttp-socks not available. Using direct connection.")
else:
logger.info(f"Routing through Tor (JSON): {url}")
connector = ProxyConnector.from_url('socks5://127.0.0.1:9050')
logger.debug(f"OPSEC GET JSON: {url}")
async with aiohttp.ClientSession(headers=headers, timeout=timeout, connector=connector) as session:
async with session.get(url, **kwargs) as response:
return await response.json()
@staticmethod
async def post(url: str, data: Optional[Any] = None, **kwargs) -> aiohttp.ClientResponse:
"""
Make an OPSEC-protected POST request
Args:
url: Target URL
data: POST data
**kwargs: Additional aiohttp parameters
Returns:
aiohttp.ClientResponse object
"""
# Apply random delay
await OPSECConfig.random_delay()
# Get OPSEC headers
headers = OPSECConfig.get_request_headers()
if 'headers' in kwargs:
headers.update(kwargs['headers'])
timeout = aiohttp.ClientTimeout(total=30)
async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session:
logger.debug(f"OPSEC POST: {url}")
async with session.post(url, data=data, **kwargs) as response:
return response
# Convenience functions for quick use
async def opsec_get(url: str, **kwargs) -> str:
"""Quick OPSEC-protected GET returning text"""
return await OPSECSession.get_text(url, **kwargs)
async def opsec_get_json(url: str, **kwargs) -> Dict:
"""Quick OPSEC-protected GET returning JSON"""
return await OPSECSession.get_json(url, **kwargs)
def log_opsec_status():
"""Log current OPSEC configuration"""
logger.info("=== OPSEC Status ===")
logger.info(f"Tor Enabled: {OPSECConfig.USE_TOR}")
logger.info(f"User-Agent Randomization: {OPSECConfig.RANDOMIZE_USER_AGENT}")
logger.info(f"Timing Randomization: {OPSECConfig.RANDOMIZE_TIMING}")
logger.info(f"Average Delay: {OPSECConfig.MEAN_DELAY}s")
logger.info("===================")