From 09dfc2bc6a248c9efb4bfbf30d9966249b056cbe Mon Sep 17 00:00:00 2001 From: "assisted-by-ai (Bot Account)" Date: Tue, 25 Nov 2025 03:27:03 -0500 Subject: [PATCH] Add malicious control character coverage --- run-tests | 15 ++++- .../python3/dist-packages/stdisplay/stcat.py | 2 +- .../python3/dist-packages/stdisplay/stcatn.py | 6 +- .../python3/dist-packages/stdisplay/stecho.py | 12 ++-- .../dist-packages/stdisplay/stsponge.py | 23 +++++-- .../python3/dist-packages/stdisplay/sttee.py | 37 +++++++---- .../dist-packages/stdisplay/tests/__init__.py | 17 +++++ .../dist-packages/stdisplay/tests/stcat.py | 5 ++ .../dist-packages/stdisplay/tests/stcatn.py | 5 ++ .../dist-packages/stdisplay/tests/stecho.py | 4 ++ .../dist-packages/stdisplay/tests/stsponge.py | 66 +++++++++++++++++++ .../dist-packages/stdisplay/tests/sttee.py | 44 +++++++++++++ 12 files changed, 209 insertions(+), 27 deletions(-) diff --git a/run-tests b/run-tests index a93a8d17..ae04263c 100755 --- a/run-tests +++ b/run-tests @@ -6,13 +6,26 @@ pythonpath="${git_toplevel}/usr/lib/python3/dist-packages" export PYTHONPATH="${pythonpath}${PYTHONPATH+":${PYTHONPATH}"}" pytest=(python3 -m pytest -o 'python_files=tests/*.py') +pytest_args=() +for arg in "$@"; do + test_path="${pythonpath}/stdisplay/tests/${arg}.py" + if [[ "${arg}" != -* && "${arg}" != */* && -f "${test_path}" ]]; then + pytest_args+=("tests/${arg}.py") + else + pytest_args+=("${arg}") + fi +done + +if [[ ${#pytest_args[@]} -eq 0 ]]; then + pytest_args=("${@}") +fi black=(black --config="${pyrc}" --color --diff --check) pylint=(pylint --rcfile="${pyrc}") mypy=(mypy --config-file="${pyrc}") cd "${pythonpath}/stdisplay/" # Ideally, these variables should be ignored by the tests... -NO_COLOR="" COLORTERM="" TERM="xterm-direct" "${pytest[@]}" "${@}" +NO_COLOR="" COLORTERM="" TERM="xterm" "${pytest[@]}" "${pytest_args[@]}" "${black[@]}" . find . -type f -name "*.py" -print0 | xargs -0 "${pylint[@]}" "${mypy[@]}" . diff --git a/usr/lib/python3/dist-packages/stdisplay/stcat.py b/usr/lib/python3/dist-packages/stdisplay/stcat.py index 3f3e1e5f..6ac73e41 100755 --- a/usr/lib/python3/dist-packages/stdisplay/stcat.py +++ b/usr/lib/python3/dist-packages/stdisplay/stcat.py @@ -16,7 +16,7 @@ def main() -> None: """Safely print stdin or file to stdout.""" # https://github.com/pytest-dev/pytest/issues/4843 if "pytest" not in modules and stdin is not None: - stdin.reconfigure(errors="ignore") # type: ignore + stdin.reconfigure(errors="replace") # type: ignore if len(argv) == 1: if stdin is not None: for untrusted_line in stdin: diff --git a/usr/lib/python3/dist-packages/stdisplay/stcatn.py b/usr/lib/python3/dist-packages/stdisplay/stcatn.py index 17c74cb2..ddbce438 100755 --- a/usr/lib/python3/dist-packages/stdisplay/stcatn.py +++ b/usr/lib/python3/dist-packages/stdisplay/stcatn.py @@ -21,7 +21,7 @@ def main() -> None: """ # https://github.com/pytest-dev/pytest/issues/4843 if "pytest" not in modules and stdin is not None: - stdin.reconfigure(errors="ignore") # type: ignore + stdin.reconfigure(errors="replace") # type: ignore if len(argv) == 1: if stdin is not None: for untrusted_line in stdin: @@ -37,7 +37,9 @@ def main() -> None: ## We cannot read the entire file in at once like we do with ## stcat, since we need to trim trailing whitespace from each ## individual line in the file. - with open(untrusted_arg, "r", encoding="utf-8") as untrusted_file: + with open( + untrusted_arg, "r", encoding="utf-8", errors="replace" + ) as untrusted_file: for untrusted_line in untrusted_file: stdout.write(stdisplay(untrusted_line).rstrip() + "\n") stdout.flush() diff --git a/usr/lib/python3/dist-packages/stdisplay/stecho.py b/usr/lib/python3/dist-packages/stdisplay/stecho.py index 4f3c02de..a48c6e70 100755 --- a/usr/lib/python3/dist-packages/stdisplay/stecho.py +++ b/usr/lib/python3/dist-packages/stdisplay/stecho.py @@ -7,17 +7,17 @@ """Safely print argument to stdout with echo's formatting.""" -from sys import argv, stdout +import sys from stdisplay.stdisplay import stdisplay def main() -> None: """Safely print argument to stdout with echo's formatting.""" - if len(argv) > 1: - untrusted_text = " ".join(argv[1:]) - stdout.write(stdisplay(untrusted_text)) - stdout.write("\n") - stdout.flush() + if len(sys.argv) > 1: + untrusted_text = " ".join(sys.argv[1:]) + sys.stdout.write(stdisplay(untrusted_text)) + sys.stdout.write("\n") + sys.stdout.flush() if __name__ == "__main__": diff --git a/usr/lib/python3/dist-packages/stdisplay/stsponge.py b/usr/lib/python3/dist-packages/stdisplay/stsponge.py index 21035c9d..6ab5b08b 100755 --- a/usr/lib/python3/dist-packages/stdisplay/stsponge.py +++ b/usr/lib/python3/dist-packages/stdisplay/stsponge.py @@ -7,8 +7,8 @@ """Safely print stdin to stdout or file.""" +import os from sys import argv, stdin, stdout, modules -import shutil from tempfile import NamedTemporaryFile from stdisplay.stdisplay import stdisplay @@ -17,7 +17,7 @@ def main() -> None: """Safely print stdin to stdout or file.""" # https://github.com/pytest-dev/pytest/issues/4843 if "pytest" not in modules and stdin is not None: - stdin.reconfigure(errors="ignore") # type: ignore + stdin.reconfigure(errors="replace") # type: ignore untrusted_text_list = [] if stdin is not None: for untrusted_text in stdin: @@ -29,9 +29,22 @@ def main() -> None: with NamedTemporaryFile(mode="w", delete=False) as temp_file: temp_file.write(stdisplay("".join(untrusted_text_list))) temp_file.flush() - for file in argv[1:]: - shutil.copy2(temp_file.name, file) - temp_file.close() + temp_path = temp_file.name + try: + with open(temp_path, "rb") as source_file: + content = source_file.read() + for file in argv[1:]: + fd = os.open( + file, + os.O_CREAT | os.O_WRONLY | os.O_TRUNC | os.O_NOFOLLOW, + 0o600, + ) + with os.fdopen(fd, "wb") as destination_file: + destination_file.write(content) + destination_file.flush() + os.fchmod(destination_file.fileno(), 0o600) + finally: + os.unlink(temp_path) if __name__ == "__main__": diff --git a/usr/lib/python3/dist-packages/stdisplay/sttee.py b/usr/lib/python3/dist-packages/stdisplay/sttee.py index 77933d17..e805c709 100755 --- a/usr/lib/python3/dist-packages/stdisplay/sttee.py +++ b/usr/lib/python3/dist-packages/stdisplay/sttee.py @@ -5,7 +5,8 @@ ## ## SPDX-License-Identifier: AGPL-3.0-or-later -"""Safely print stdin to stdout and file.""" +"""Safely print stdin to stdout and file without following symlinks.""" +import os from sys import argv, stdin, stdout, modules from stdisplay.stdisplay import stdisplay @@ -14,17 +15,29 @@ def main() -> None: """Safely print stdin to stdout and file.""" # https://github.com/pytest-dev/pytest/issues/4843 if "pytest" not in modules and stdin is not None: - stdin.reconfigure(errors="ignore") # type: ignore - untrusted_text_list = [] - if stdin is not None: - for untrusted_text in stdin: - untrusted_text_list.append(untrusted_text) - stdout.write(stdisplay(untrusted_text)) - stdout.flush() - if len(argv) > 1: - for file_arg in argv[1:]: - with open(file_arg, mode="w", encoding="ascii") as file: - file.write(stdisplay("".join(untrusted_text_list))) + stdin.reconfigure(errors="replace") # type: ignore + output_files = [] + try: + if len(argv) > 1: + for file_arg in argv[1:]: + fd = os.open( + file_arg, + os.O_CREAT | os.O_WRONLY | os.O_TRUNC | os.O_NOFOLLOW, + 0o600, + ) + output_files.append(os.fdopen(fd, mode="w", encoding="ascii")) + if stdin is not None: + for untrusted_text in stdin: + rendered_text = stdisplay(untrusted_text) + stdout.write(rendered_text) + for output_file in output_files: + output_file.write(rendered_text) + stdout.flush() + for output_file in output_files: + output_file.flush() + finally: + for output_file in output_files: + output_file.close() if __name__ == "__main__": diff --git a/usr/lib/python3/dist-packages/stdisplay/tests/__init__.py b/usr/lib/python3/dist-packages/stdisplay/tests/__init__.py index 0c7cf256..b5086e1d 100644 --- a/usr/lib/python3/dist-packages/stdisplay/tests/__init__.py +++ b/usr/lib/python3/dist-packages/stdisplay/tests/__init__.py @@ -34,6 +34,9 @@ class TestSTBase(unittest.TestCase): def __init__(self, *args: Any, **kwargs: Any) -> None: self.text_dirty = "\x1b[0mTest\x1b[2Kor\x1b]1;is\x1b\n[m" self.text_dirty_sanitized = "\x1b[0mTest_[2Kor_]1;is_\n[m" + self.text_malicious = "pre\u202e.js\u200b\x00post\n" + self.text_malicious_sanitized = "pre_.js__post\n" + self.text_malicious_file_sanitized = "pre___.js____post\n" super().__init__(*args, **kwargs) def setUp(self) -> None: @@ -64,6 +67,20 @@ def setUp(self) -> None: "fill2": self.tmpfiles_list[5], } + invalid_path = os.path.join(self.tmpdir, "invalid") + with open(invalid_path, "wb") as file: + file.write(b"a\xffb\n") + file.flush() + file.close() + self.tmpfiles["invalid"] = invalid_path + + malicious_path = os.path.join(self.tmpdir, "malicious") + with open(malicious_path, "w", encoding="utf-8") as file: + file.write(self.text_malicious) + file.flush() + file.close() + self.tmpfiles["malicious"] = malicious_path + def tearDown(self) -> None: shutil.rmtree(self.tmpdir) diff --git a/usr/lib/python3/dist-packages/stdisplay/tests/stcat.py b/usr/lib/python3/dist-packages/stdisplay/tests/stcat.py index df7293db..864e2b3a 100644 --- a/usr/lib/python3/dist-packages/stdisplay/tests/stcat.py +++ b/usr/lib/python3/dist-packages/stdisplay/tests/stcat.py @@ -52,6 +52,11 @@ def test_stcat_file(self) -> None: [self.tmpfiles["raw"], self.tmpfiles["newline"]], ), (self.text_dirty_sanitized, [self.tmpfiles["dirty"]]), + ( + self.text_malicious_file_sanitized, + [self.tmpfiles["malicious"]], + ), + ("a_b\n", [self.tmpfiles["invalid"]]), ] for text, argv in cases: with self.subTest(text=text, argv=argv): diff --git a/usr/lib/python3/dist-packages/stdisplay/tests/stcatn.py b/usr/lib/python3/dist-packages/stdisplay/tests/stcatn.py index b330bd28..58b99065 100644 --- a/usr/lib/python3/dist-packages/stdisplay/tests/stcatn.py +++ b/usr/lib/python3/dist-packages/stdisplay/tests/stcatn.py @@ -30,6 +30,11 @@ def test_stcatn_file(self) -> None: [self.tmpfiles["raw"], self.tmpfiles["raw"]], ), (self.text_dirty_sanitized + "\n", [self.tmpfiles["dirty"]]), + ( + self.text_malicious_sanitized, + [self.tmpfiles["malicious"]], + ), + ("a_b\n", [self.tmpfiles["invalid"]]), ] for text, argv in cases: with self.subTest(text=text, argv=argv): diff --git a/usr/lib/python3/dist-packages/stdisplay/tests/stecho.py b/usr/lib/python3/dist-packages/stdisplay/tests/stecho.py index f943ce38..40676d0b 100644 --- a/usr/lib/python3/dist-packages/stdisplay/tests/stecho.py +++ b/usr/lib/python3/dist-packages/stdisplay/tests/stecho.py @@ -32,6 +32,10 @@ def test_stecho(self) -> None: self.text_dirty_sanitized + "\n", self._test_util(argv=[self.text_dirty]), ) + self.assertEqual( + self.text_malicious_sanitized + "\n", + self._test_util(argv=[self.text_malicious]), + ) if __name__ == "__main__": diff --git a/usr/lib/python3/dist-packages/stdisplay/tests/stsponge.py b/usr/lib/python3/dist-packages/stdisplay/tests/stsponge.py index 354b11ca..e7726621 100644 --- a/usr/lib/python3/dist-packages/stdisplay/tests/stsponge.py +++ b/usr/lib/python3/dist-packages/stdisplay/tests/stsponge.py @@ -6,8 +6,12 @@ ## ## SPDX-License-Identifier: AGPL-3.0-or-later +import importlib +import io +import sys import unittest from pathlib import Path +from unittest.mock import patch import stdisplay.tests @@ -61,6 +65,68 @@ def test_stsponge(self) -> None: Path(self.tmpfiles["fill2"]).read_text(encoding="utf-8"), ) + def test_stsponge_replaces_invalid_bytes(self) -> None: + """Invalid bytes are sanitized for stdout and files.""" + + output_path = Path(self.tmpfiles["fill"]) + invalid_bytes = b"a\xffb\n" + stdout_capture = io.StringIO() + pytest_module = sys.modules.pop("pytest", None) + try: + with patch.object(sys, "argv", ["stsponge.py", str(output_path)]), patch( + "sys.stdin", + io.TextIOWrapper(io.BytesIO(invalid_bytes), encoding="utf-8"), + ), patch("sys.stdout", stdout_capture): + self._del_module() + importlib.import_module("stdisplay.stsponge").main() + finally: + if pytest_module is not None: + sys.modules["pytest"] = pytest_module + + self.assertEqual("", stdout_capture.getvalue()) + self.assertEqual("a_b\n", output_path.read_text(encoding="ascii")) + + stdout_capture = io.StringIO() + pytest_module = sys.modules.pop("pytest", None) + try: + with patch.object(sys, "argv", ["stsponge.py"]), patch( + "sys.stdin", + io.TextIOWrapper(io.BytesIO(invalid_bytes), encoding="utf-8"), + ), patch("sys.stdout", stdout_capture): + self._del_module() + importlib.import_module("stdisplay.stsponge").main() + finally: + if pytest_module is not None: + sys.modules["pytest"] = pytest_module + + self.assertEqual("a_b\n", stdout_capture.getvalue()) + + def test_stsponge_sanitizes_control_and_bidi(self) -> None: + """Control and bidi characters are sanitized in outputs.""" + + output_path = Path(self.tmpfiles["fill"]) + stdout_capture = io.StringIO() + with patch.object(sys, "argv", ["stsponge.py", str(output_path)]), patch( + "sys.stdin", io.StringIO(self.text_malicious) + ), patch("sys.stdout", stdout_capture): + self._del_module() + importlib.import_module("stdisplay.stsponge").main() + + self.assertEqual("", stdout_capture.getvalue()) + self.assertEqual( + self.text_malicious_sanitized, + output_path.read_text(encoding="utf-8"), + ) + + stdout_capture = io.StringIO() + with patch.object(sys, "argv", ["stsponge.py"]), patch( + "sys.stdin", io.StringIO(self.text_malicious) + ), patch("sys.stdout", stdout_capture): + self._del_module() + importlib.import_module("stdisplay.stsponge").main() + + self.assertEqual(self.text_malicious_sanitized, stdout_capture.getvalue()) + if __name__ == "__main__": unittest.main() diff --git a/usr/lib/python3/dist-packages/stdisplay/tests/sttee.py b/usr/lib/python3/dist-packages/stdisplay/tests/sttee.py index c931bd25..29c12b90 100644 --- a/usr/lib/python3/dist-packages/stdisplay/tests/sttee.py +++ b/usr/lib/python3/dist-packages/stdisplay/tests/sttee.py @@ -6,8 +6,12 @@ ## ## SPDX-License-Identifier: AGPL-3.0-or-later +import importlib +import io +import sys import unittest from pathlib import Path +from unittest.mock import patch import stdisplay.tests @@ -61,6 +65,46 @@ def test_sttee(self) -> None: Path(self.tmpfiles["fill2"]).read_text(encoding="utf-8"), ) + def test_sttee_replaces_invalid_bytes(self) -> None: + """Invalid bytes are surfaced as sanitized underscores.""" + + output_path = Path(self.tmpfiles["fill"]) + invalid_bytes = b"a\xffb\n" + stdout_capture = io.StringIO() + pytest_module = sys.modules.pop("pytest", None) + try: + with patch.object(sys, "argv", ["sttee.py", str(output_path)]), patch( + "sys.stdin", + io.TextIOWrapper(io.BytesIO(invalid_bytes), encoding="utf-8"), + ), patch("sys.stdout", stdout_capture): + self._del_module() + importlib.import_module("stdisplay.sttee").main() + finally: + if pytest_module is not None: + sys.modules["pytest"] = pytest_module + + self.assertEqual("a_b\n", stdout_capture.getvalue()) + self.assertEqual("a_b\n", output_path.read_text(encoding="ascii")) + + def test_sttee_sanitizes_control_and_bidi(self) -> None: + """Dangerous control characters are sanitized before writing.""" + + output_path = Path(self.tmpfiles["fill"]) + stdout_capture = io.StringIO() + with patch.object( + sys, "argv", ["sttee.py", str(output_path)] + ), patch("sys.stdin", io.StringIO(self.text_malicious)), patch( + "sys.stdout", stdout_capture + ): + self._del_module() + importlib.import_module("stdisplay.sttee").main() + + self.assertEqual(self.text_malicious_sanitized, stdout_capture.getvalue()) + self.assertEqual( + self.text_malicious_sanitized, + output_path.read_text(encoding="ascii"), + ) + if __name__ == "__main__": unittest.main()