Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion app/service/file_svc.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,22 @@ async def get_file(self, headers):
async def save_file(self, filename, payload, target_dir, encrypt=True, encoding=None):
if encoding:
payload = await self._decode_contents(payload, encoding)
self._save(os.path.join(target_dir, filename), payload, encrypt)
full_path = os.path.join(target_dir, filename)
# Containment check — the resolved final path must not escape its parent.
# save_file has two legitimate call modes:
# (1) target_dir + basename — sink mode, used by agent contact handlers
# (DNS, FTP, Gist, Slack) that take filename from upload metadata.
# (2) '' + relative path — path mode, used by ability/source managers
# that assemble paths from sanitised IDs.
# Either mode is broken by a filename containing '..'. Resolving the final
# path and checking it stays under the parent rejects both classes with one
# check, without needing callers to know which mode they are using.
parent = os.path.realpath(target_dir) if target_dir else os.path.realpath(os.getcwd())
final = os.path.realpath(full_path)
if final != parent and not final.startswith(parent + os.sep):
raise ValueError('save_file: path %r escapes parent %r (filename=%r, target_dir=%r)'
% (final, parent, filename, target_dir))
self._save(full_path, payload, encrypt)

async def create_exfil_sub_directory(self, dir_name):
path = os.path.join(self.get_config('exfil_dir'), dir_name)
Expand Down
26 changes: 26 additions & 0 deletions tests/services/test_file_svc.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,32 @@ def test_save_file(self, event_loop, file_svc, tmp_path):
with open(file_location, "r") as file_contents:
assert payload.decode("utf-8") == file_contents.read()

def test_save_file_rejects_path_traversal(self, event_loop, file_svc, tmp_path):
# save_file is reachable from the agent contact handlers (DNS, FTP, Gist,
# Slack) with an attacker-controlled filename. A '../../...' basename
# must NOT be allowed to escape target_dir, regardless of the encryption
# setting. Regression test for the unauthenticated file-write primitive
# that ended in pickle.loads on the next restart.
payload = b'attacker bytes'
# Avoid filesystem-resident paths like /etc/passwd as targets — the
# post-condition guard would false-positive on any pre-existing system
# file. Use names that cannot pre-exist by accident.
traversal_attempts = [
'../caldera-traversal-canary-1.bin',
'../../caldera-traversal-canary-2.bin',
'../../../../tmp/caldera-traversal-canary-3.bin',
]
for evil in traversal_attempts:
with pytest.raises(ValueError, match='escapes parent'):
event_loop.run_until_complete(
file_svc.save_file(evil, payload, str(tmp_path), encrypt=False)
)
# Defense-in-depth: confirm the canary file the test name reserves
# wasn't actually written (proves save_file rejected BEFORE the I/O,
# not just that it raised after writing).
resolved = os.path.realpath(os.path.join(str(tmp_path), evil))
assert not os.path.exists(resolved), 'save_file wrote to %s before raising' % resolved

def test_create_exfil_sub_directory(self, event_loop, file_svc):
exfil_dir_name = 'unit-testing-Rocks'
new_dir = event_loop.run_until_complete(file_svc.create_exfil_sub_directory(exfil_dir_name))
Expand Down
Loading