diff --git a/app/service/file_svc.py b/app/service/file_svc.py index ae032b4a0..c8d319696 100644 --- a/app/service/file_svc.py +++ b/app/service/file_svc.py @@ -74,7 +74,22 @@ async def get_file(self, headers): async def save_file(self, filename, payload, target_dir, encrypt=True, encoding=None): if encoding: payload = await self._decode_contents(payload, encoding) - self._save(os.path.join(target_dir, filename), payload, encrypt) + full_path = os.path.join(target_dir, filename) + # Containment check — the resolved final path must not escape its parent. + # save_file has two legitimate call modes: + # (1) target_dir + basename — sink mode, used by agent contact handlers + # (DNS, FTP, Gist, Slack) that take filename from upload metadata. + # (2) '' + relative path — path mode, used by ability/source managers + # that assemble paths from sanitised IDs. + # Either mode is broken by a filename containing '..'. Resolving the final + # path and checking it stays under the parent rejects both classes with one + # check, without needing callers to know which mode they are using. + parent = os.path.realpath(target_dir) if target_dir else os.path.realpath(os.getcwd()) + final = os.path.realpath(full_path) + if final != parent and not final.startswith(parent + os.sep): + raise ValueError('save_file: path %r escapes parent %r (filename=%r, target_dir=%r)' + % (final, parent, filename, target_dir)) + self._save(full_path, payload, encrypt) async def create_exfil_sub_directory(self, dir_name): path = os.path.join(self.get_config('exfil_dir'), dir_name) diff --git a/tests/services/test_file_svc.py b/tests/services/test_file_svc.py index 88710937f..97080d8e1 100644 --- a/tests/services/test_file_svc.py +++ b/tests/services/test_file_svc.py @@ -45,6 +45,32 @@ def test_save_file(self, event_loop, file_svc, tmp_path): with open(file_location, "r") as file_contents: assert payload.decode("utf-8") == file_contents.read() + def test_save_file_rejects_path_traversal(self, event_loop, file_svc, tmp_path): + # save_file is reachable from the agent contact handlers (DNS, FTP, Gist, + # Slack) with an attacker-controlled filename. A '../../...' basename + # must NOT be allowed to escape target_dir, regardless of the encryption + # setting. Regression test for the unauthenticated file-write primitive + # that ended in pickle.loads on the next restart. + payload = b'attacker bytes' + # Avoid filesystem-resident paths like /etc/passwd as targets — the + # post-condition guard would false-positive on any pre-existing system + # file. Use names that cannot pre-exist by accident. + traversal_attempts = [ + '../caldera-traversal-canary-1.bin', + '../../caldera-traversal-canary-2.bin', + '../../../../tmp/caldera-traversal-canary-3.bin', + ] + for evil in traversal_attempts: + with pytest.raises(ValueError, match='escapes parent'): + event_loop.run_until_complete( + file_svc.save_file(evil, payload, str(tmp_path), encrypt=False) + ) + # Defense-in-depth: confirm the canary file the test name reserves + # wasn't actually written (proves save_file rejected BEFORE the I/O, + # not just that it raised after writing). + resolved = os.path.realpath(os.path.join(str(tmp_path), evil)) + assert not os.path.exists(resolved), 'save_file wrote to %s before raising' % resolved + def test_create_exfil_sub_directory(self, event_loop, file_svc): exfil_dir_name = 'unit-testing-Rocks' new_dir = event_loop.run_until_complete(file_svc.create_exfil_sub_directory(exfil_dir_name))