-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcmdline_secrets.py
More file actions
170 lines (146 loc) · 6.38 KB
/
cmdline_secrets.py
File metadata and controls
170 lines (146 loc) · 6.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
import os
import sys
from time import time,sleep
import json
from datetime import datetime
import psutil
from detect_secrets.core.secrets_collection import SecretsCollection
from detect_secrets.settings import transient_settings
# Directory (relative to the working directory) that receives the capture and scan files.
OUT_FOLDER = "output_cmdline_secrets"
# Timestamped log file name, computed once when the module is loaded.
# NOTE(review): no code visible in this file reads this constant -- confirm it is still needed.
SECRETS_OUTPUT_FILE = f"secrets_{datetime.now():%Y-%m-%d_%H:%M:%S}.log"
def log_ts(msg) -> None:
    """Print *msg* to stdout, prefixed with the current local time as ``[HH:MM:SS]``."""
    stamp = datetime.now().strftime("%H:%M:%S")
    print(f"[{stamp}] {msg}")
def print_trace(trace: list[dict]) -> None:
    """Print a PID / PPID / CMDLINE table with one row per entry of *trace*.

    Columns are separated by two tab characters. Every entry must provide the
    keys ``pid``, ``ppid`` and ``cmdline``.
    """
    print("\nPID\t\tPPID\t\tCMDLINE")
    for entry in trace:
        print(entry["pid"], entry["ppid"], entry["cmdline"], sep="\t\t")
def get_process_command_line(all_process_info: set) -> tuple[set[str], set[str]]:
    """Take one snapshot of the running processes, encoded as JSON strings.

    Every process with a non-empty command line is serialised (``pid``,
    ``ppid``, ``name`` and the space-joined ``cmdline``) and added to
    *all_process_info*, which is mutated in place. Processes whose command line
    has more than one element are additionally collected in a fresh set.

    Returns:
        A tuple of the same *all_process_info* set that was passed in and the
        set of processes from this snapshot that have command line arguments.
    """
    with_arguments = set()
    wanted_fields = ["pid", "ppid", "name", "cmdline"]
    for process in psutil.process_iter(wanted_fields):
        try:
            details = process.info
            argv = details["cmdline"]
            if not argv:
                continue
            encoded = json.dumps({
                "pid": details["pid"],
                "ppid": details["ppid"],
                "name": details["name"],
                "cmdline": " ".join(argv),
            })
            all_process_info.add(encoded)
            # More than one element means the process was started with arguments.
            if len(argv) > 1:
                with_arguments.add(encoded)
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
    return all_process_info, with_arguments
def get_ppid_info(proc: dict, observations: list[dict], trace: list[dict]) -> list[dict]:
    """Collect the ancestor chain of *proc* from *observations*.

    Starting at ``proc["ppid"]``, each parent record found in *observations* is
    appended to *trace*, nearest ancestor first. The walk ends when the record
    with pid 1 has been appended, when a parent was never observed, or when the
    chain loops back onto a record that was already visited.

    Args:
        proc: Process record providing at least a ``ppid`` key.
        observations: Observed process records, each with ``pid`` and ``ppid``.
        trace: List the ancestors are appended to; it is mutated in place.

    Returns:
        The same *trace* list that was passed in.
    """
    # The first record per pid wins, which matches a front-to-back scan of
    # observations while avoiding a full rescan for every ancestor.
    first_by_pid = {}
    for record in observations:
        first_by_pid.setdefault(record["pid"], record)

    # Observations accumulate over time, so a self-parented pid or a reused pid
    # can make the parent chain cyclic. Records are tracked by identity:
    # reaching one a second time means the walk would repeat forever.
    visited = {id(proc)}
    parent = first_by_pid.get(proc["ppid"])
    while parent is not None and id(parent) not in visited:
        trace.append(parent)
        if parent["pid"] == 1:
            break
        visited.add(id(parent))
        parent = first_by_pid.get(parent["ppid"])
    return trace
def find_occurances(observations: list[dict], secret: str) -> None:
    """Print the process tree of every observed process whose cmdline equals *secret*.

    For each match, the ancestors returned by ``get_ppid_info`` are printed in
    reverse order via ``print_trace``, followed by the matching process itself
    with its command line wrapped in ANSI colour escape codes.
    """
    matches = (candidate for candidate in observations if candidate["cmdline"] == secret)
    for hit in matches:
        # Copy so that reversing never touches the list handed to get_ppid_info.
        ancestors = list(get_ppid_info(hit, observations, trace=[]))
        log_ts("[\033[92m+\033[00m] Process tree for detected secret:")
        ancestors.reverse()
        print_trace(ancestors)
        print(f"{hit['pid']}\t\t{hit['ppid']}\t\t\033[91m{hit['cmdline']}\033[00m\n")
def get_secret_from_outfile(file_path: str, line_no: int) -> str:
    """Return line *line_no* (1-based) of *file_path* without its newline.

    Args:
        file_path: Path of a UTF-8 text file.
        line_no: 1-based number of the line to return.

    Returns:
        The requested line with the newline character removed.

    Raises:
        SystemExit: With status 1, after logging an error, when the file has
            no line with that number.
    """
    with open(file_path, "r", encoding="utf-8") as f:
        for current_no, line in enumerate(f, start=1):
            if current_no == line_no:
                return line.replace("\n", "")
    # line_no is left untouched so the message names the line that was requested,
    # not a zero-based index.
    log_ts(f"[!] Could not find line #{line_no} in {file_path}")
    sys.exit(1)
def _capture_and_scan(found_secrets: list) -> None:
    """Run one capture window and report secrets that were not reported before.

    Processes are sampled (with a one-second pause after each sample) until
    5 seconds have elapsed. The samples are written to ``OUT_FOLDER`` as JSON,
    the unique command lines that carry arguments are written to a text file,
    and that file is scanned with detect-secrets. For every finding whose hash
    is not yet in *found_secrets*, the process tree is printed and the hash is
    appended to *found_secrets*.
    """
    start_time = time()
    output = []

    # Contains every unique process (pid,ppid,cmdline) observed
    all_process_info_cache = set()

    # Contains every unique process (pid,ppid,cmdline) with cmdline arguments observed
    relevant_process_info_cache = set()

    # Capture process details for 5 seconds before scanning for secrets
    while time() - start_time < 5:
        all_process_info, relevant_process_info = get_process_command_line(all_process_info_cache)
        all_process_info_cache |= all_process_info
        relevant_process_info_cache |= relevant_process_info

        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        output.append({"timestamp": timestamp,
                       "processes": [json.loads(s) for s in relevant_process_info]})
        sleep(1)

    all_processes = [json.loads(s) for s in all_process_info_cache]
    relevant_processes = [json.loads(s) for s in relevant_process_info_cache]

    output_file = f"capture_{datetime.now().strftime('%Y-%m-%d')}"
    with open(f"{OUT_FOLDER}/{output_file}.json", "w", encoding="utf-8") as f:
        json.dump(output, f, indent=4)

    # One command line per row; the scanner's line numbers are mapped back to
    # command lines through this file.
    secrets_target = f"{OUT_FOLDER}/uniq_{output_file}.txt"
    with open(secrets_target, "w+", encoding="utf-8") as f:
        for process in relevant_processes:
            f.write(process["cmdline"] + "\n")

    # Detect secrets from relevant unique processes
    secrets = SecretsCollection()
    with transient_settings({
        "plugins_used": [
            {"name": "ArtifactoryDetector"},
            {"name": "AWSKeyDetector"},
            {"name": "AzureStorageKeyDetector"},
            {"name": "BasicAuthDetector"},
            {"name": "CloudantDetector"},
            {"name": "DiscordBotTokenDetector"},
            {"name": "GitHubTokenDetector"},
            {"name": "GitLabTokenDetector"},
            {"name": "Base64HighEntropyString"},
            {"name": "HexHighEntropyString"},
            {"name": "IbmCloudIamDetector"},
            {"name": "IbmCosHmacDetector"},
            {"name": "IPPublicDetector"},
            {"name": "JwtTokenDetector"},
            {"name": "KeywordDetector"},
            {"name": "MailchimpDetector"},
            {"name": "NpmDetector"},
            {"name": "OpenAIDetector"},
            {"name": "PrivateKeyDetector"},
            {"name": "PypiTokenDetector"},
            {"name": "SendGridDetector"},
            {"name": "SlackDetector"},
            {"name": "SoftlayerDetector"},
            {"name": "SquareOAuthDetector"},
            {"name": "StripeDetector"},
            {"name": "TelegramBotTokenDetector"},
            {"name": "TwilioKeyDetector"},
        ],
        "filters_used": [],
    }):
        secrets.scan_file(secrets_target)

    # The result is keyed by scanned file name; the key is absent when nothing was found.
    secrets_found = secrets.json().get(secrets_target, [])

    for secret in secrets_found:
        secret_hash = secret["hashed_secret"]
        if secret_hash in found_secrets:
            continue
        log_ts(f"[\033[92m+\033[00m] \033[1m\033[92mFound a secret! (Type={secret['type']}, Verified={secret['is_verified']})\033[00m\033[00m")
        line_no = int(secret["line_number"])
        secret_raw = get_secret_from_outfile(secrets_target, line_no)
        find_occurances(observations=all_processes, secret=secret_raw)
        found_secrets.append(secret_hash)


def main(found_secrets=None):
    """Watch process command lines for secrets until the program is interrupted.

    Args:
        found_secrets: Optional list of secret hashes that were already
            reported. It is extended in place as new secrets are reported.
            A fresh list is created when the argument is omitted.
    """
    # A None default avoids sharing one list between separate calls of main().
    if found_secrets is None:
        found_secrets = []

    # Loop rather than re-invoking main(): a self-call per 5-second round would
    # add a stack frame each time and eventually raise RecursionError.
    while True:
        _capture_and_scan(found_secrets)
if __name__ == "__main__":
    # Capture and scan files are written below OUT_FOLDER, so make sure it exists first.
    os.makedirs(OUT_FOLDER, exist_ok=True)
    banner = "[\033[33m~\033[0m] Watching processes for secrets (Ctrl+C to exit)..."
    log_ts(banner)
    main()