From a17a2ef4ee83a3b07e46503a6d5b7e85d39715a7 Mon Sep 17 00:00:00 2001 From: akshat4703 Date: Fri, 13 Mar 2026 22:28:03 +0530 Subject: [PATCH 1/3] stabilize ELF analysis and prevent crashes/hangs --- capa/loader.py | 163 ++++++++++++++++++++++++++++++---- capa/main.py | 53 ++++++++++- tests/test_loader_segfault.py | 103 +++++++++++++++++++++ 3 files changed, 303 insertions(+), 16 deletions(-) diff --git a/capa/loader.py b/capa/loader.py index 939680ab7d..a21c36c2f9 100644 --- a/capa/loader.py +++ b/capa/loader.py @@ -16,6 +16,8 @@ import logging import datetime import contextlib +import threading +import signal from typing import Optional from pathlib import Path @@ -50,6 +52,7 @@ from capa.capabilities.common import Capabilities from capa.features.extractors.base_extractor import ( SampleHashes, + FunctionFilter, FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor, @@ -74,6 +77,10 @@ class CorruptFile(ValueError): pass +class _AnalysisTimeoutError(RuntimeError): + pass + + def is_supported_format(sample: Path) -> bool: """ Return if this is a supported file based on magic header values @@ -177,6 +184,83 @@ def _is_probably_corrupt_pe(path: Path) -> bool: return False +def _get_elf_analysis_timeout_seconds() -> int: + """ + Return timeout for viv ELF analysis in seconds. + 0 disables timeout. + """ + value = os.environ.get("CAPA_ELF_ANALYSIS_TIMEOUT_SECONDS", "120").strip() + try: + return max(0, int(value)) + except ValueError: + logger.warning("invalid CAPA_ELF_ANALYSIS_TIMEOUT_SECONDS=%r, using default 120", value) + return 120 + + +def _get_elf_max_functions() -> int: + """ + Return max number of ELF functions to analyze with viv. + 0 disables capping. + """ + value = os.environ.get("CAPA_ELF_MAX_FUNCTIONS", "1000").strip() + try: + return max(0, int(value)) + except ValueError: + logger.warning("invalid CAPA_ELF_MAX_FUNCTIONS=%r, using default 1000", value) + return 1000 + + +@contextlib.contextmanager +def _timebox(seconds: int): + """ + Timebox a block using SIGALRM on platforms that support it. + """ + if ( + seconds <= 0 + or not hasattr(signal, "SIGALRM") + or threading.current_thread() is not threading.main_thread() + ): + yield + return + + def _handle_timeout(signum, frame): + raise _AnalysisTimeoutError(f"analysis exceeded {seconds}s") + + previous_handler = signal.getsignal(signal.SIGALRM) + signal.signal(signal.SIGALRM, _handle_timeout) + signal.setitimer(signal.ITIMER_REAL, float(seconds)) + try: + yield + finally: + signal.setitimer(signal.ITIMER_REAL, 0.0) + signal.signal(signal.SIGALRM, previous_handler) + + +@contextlib.contextmanager +def _temporarily_disable_viv_elf_section_symbols(): + """ + Disable viv's ELF section-symbol parsing while loading a workspace. + + The parser reads large .symtab/.strtab sections very inefficiently and can + cause severe slowdowns on large real-world ELF binaries. + """ + import Elf + + original = getattr(Elf.Elf, "_parseSectionSymbols", None) + if original is None: + yield + return + + def _skip_section_symbols(self): + logger.debug("skipping viv ELF section-symbol parsing") + + Elf.Elf._parseSectionSymbols = _skip_section_symbols + try: + yield + finally: + Elf.Elf._parseSectionSymbols = original + + def get_workspace(path: Path, input_format: str, sigpaths: list[Path]): """ load the program at the given path into a vivisect workspace using the given format. @@ -206,15 +290,24 @@ def get_workspace(path: Path, input_format: str, sigpaths: list[Path]): + " - skipping analysis to avoid excessive resource usage." ) + is_elf_input = False + if input_format == FORMAT_ELF: + is_elf_input = True + elif input_format == FORMAT_AUTO: + with path.open("rb") as f: + is_elf_input = f.read(4).startswith(capa.features.extractors.common.MATCH_ELF) + try: if input_format == FORMAT_AUTO: if not is_supported_format(path): raise UnsupportedFormatError() # don't analyze, so that we can add our Flirt function analyzer first. - vw = viv_utils.getWorkspace(str(path), analyze=False, should_save=False) + with _temporarily_disable_viv_elf_section_symbols() if is_elf_input else contextlib.nullcontext(): + vw = viv_utils.getWorkspace(str(path), analyze=False, should_save=False) elif input_format in {FORMAT_PE, FORMAT_ELF}: - vw = viv_utils.getWorkspace(str(path), analyze=False, should_save=False) + with _temporarily_disable_viv_elf_section_symbols() if is_elf_input else contextlib.nullcontext(): + vw = viv_utils.getWorkspace(str(path), analyze=False, should_save=False) elif input_format == FORMAT_SC32: # these are not analyzed nor saved. vw = viv_utils.getShellcodeWorkspaceFromFile(str(path), arch="i386", analyze=False) @@ -224,6 +317,12 @@ def get_workspace(path: Path, input_format: str, sigpaths: list[Path]): raise ValueError("unexpected format: " + input_format) except envi.exc.SegmentationViolation as e: raise CorruptFile(f"Invalid memory access during binary parsing: {e}") from e + except ModuleNotFoundError as e: + # viv may fail while loading architecture-specific impapi modules. + # treat this as unsupported architecture instead of crashing. + if e.name and e.name.startswith("vivisect.impapi.posix."): + raise UnsupportedArchError() from e + raise except Exception as e: # vivisect raises raw Exception instances, and we don't want # to do a subclass check via isinstance. @@ -240,19 +339,36 @@ def get_workspace(path: Path, input_format: str, sigpaths: list[Path]): viv_utils.flirt.register_flirt_signature_analyzers(vw, [str(s) for s in sigpaths]) - with contextlib.suppress(Exception): - # unfortuately viv raises a raw Exception (not any subclass). - # This happens when the module isn't found, such as with a viv upgrade. - # - # Remove the symbolic switch case solver. - # This is only enabled for ELF files, not PE files. - # During the following performance investigation, this analysis module - # had some terrible worst-case behavior. - # We can put up with slightly worse CFG reconstruction in order to avoid this. - # https://github.com/mandiant/capa/issues/1989#issuecomment-1948022767 - vw.delFuncAnalysisModule("vivisect.analysis.generic.symswitchcase") + if is_elf_input: + for module in ( + # During performance investigations we've observed pathological + # behavior in several viv ELF function-analysis passes. prefer + # slightly reduced CFG reconstruction over indefinite analysis. + "vivisect.analysis.generic.symswitchcase", + "vivisect.analysis.elf.elfplt", + "vivisect.analysis.amd64.emulation", + "vivisect.analysis.generic.emucode", + "vivisect.analysis.generic.noret", + ): + with contextlib.suppress(Exception): + # unfortunately viv raises raw Exception (not any subclass) + # when a module isn't found (e.g. after viv upgrades). + vw.delFuncAnalysisModule(module) - vw.analyze() + try: + timeout_s = _get_elf_analysis_timeout_seconds() if is_elf_input else 0 + with _timebox(timeout_s): + vw.analyze() + except _AnalysisTimeoutError as e: + raise CorruptFile( + f"analysis timed out after {timeout_s}s while processing ELF sample; refusing to hang indefinitely" + ) from e + except ModuleNotFoundError as e: + # viv may fail late when it cannot load an architecture-specific impapi module. + # treat this as an unsupported architecture instead of crashing with a traceback. + if e.name and e.name.startswith("vivisect.impapi.posix."): + raise UnsupportedArchError() from e + raise logger.debug("%s", get_meta_str(vw)) return vw @@ -364,7 +480,24 @@ def get_extractor( else: logger.debug("CAPA_SAVE_WORKSPACE unset, not saving workspace") - return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, input_path, os_) + extractor: FeatureExtractor = capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, input_path, os_) + if input_format == FORMAT_ELF: + max_functions = _get_elf_max_functions() + if max_functions > 0: + selected = [] + functions = extractor.get_functions() + for i, f in enumerate(functions): + if i >= max_functions: + logger.warning( + "ELF function count exceeds CAPA_ELF_MAX_FUNCTIONS=%d, limiting analysis scope", + max_functions, + ) + break + selected.append(f.address) + if selected: + extractor = FunctionFilter(extractor, set(selected)) + + return extractor elif backend == BACKEND_FREEZE: return frz.load(input_path.read_bytes()) diff --git a/capa/main.py b/capa/main.py index ce0191d3b1..f6a39bcd4c 100644 --- a/capa/main.py +++ b/capa/main.py @@ -17,8 +17,10 @@ import os import sys import time +import signal import logging import argparse +import threading import textwrap import contextlib from types import TracebackType @@ -140,6 +142,10 @@ logger = logging.getLogger("capa") +class _AnalysisTimeoutError(RuntimeError): + pass + + class FilterConfig(TypedDict, total=False): processes: set[int] functions: set[int] @@ -153,6 +159,42 @@ def timing(msg: str): logger.debug("perf: %s: %0.2fs", msg, t1 - t0) +def _get_elf_total_analysis_timeout_seconds() -> int: + """ + Return timeout for ELF capability matching in seconds. + 0 disables timeout. + """ + value = os.environ.get("CAPA_ELF_TOTAL_ANALYSIS_TIMEOUT_SECONDS", "120").strip() + try: + return max(0, int(value)) + except ValueError: + logger.warning("invalid CAPA_ELF_TOTAL_ANALYSIS_TIMEOUT_SECONDS=%r, using default 120", value) + return 120 + + +@contextlib.contextmanager +def _timebox(seconds: int): + if ( + seconds <= 0 + or not hasattr(signal, "SIGALRM") + or threading.current_thread() is not threading.main_thread() + ): + yield + return + + def _handle_timeout(signum, frame): + raise _AnalysisTimeoutError(f"analysis exceeded {seconds}s") + + previous_handler = signal.getsignal(signal.SIGALRM) + signal.signal(signal.SIGALRM, _handle_timeout) + signal.setitimer(signal.ITIMER_REAL, float(seconds)) + try: + yield + finally: + signal.setitimer(signal.ITIMER_REAL, 0.0) + signal.signal(signal.SIGALRM, previous_handler) + + def set_vivisect_log_level(level): logging.getLogger("vivisect").setLevel(level) logging.getLogger("vivisect.base").setLevel(level) @@ -1039,7 +1081,16 @@ def main(argv: Optional[list[str]] = None): except ShouldExitError as e: return e.status_code - capabilities: Capabilities = find_capabilities(rules, extractor, disable_progress=args.quiet) + try: + timeout_s = _get_elf_total_analysis_timeout_seconds() if (input_format == FORMAT_ELF and backend == BACKEND_VIV) else 0 + with _timebox(timeout_s): + capabilities = find_capabilities(rules, extractor, disable_progress=args.quiet) + except _AnalysisTimeoutError: + logger.error( + "analysis timed out after %ds while matching capabilities for ELF sample; refusing to hang indefinitely", + timeout_s, + ) + return E_FILE_LIMITATION meta: rdoc.Metadata = capa.loader.collect_metadata( argv, args.input_file, input_format, os_, args.rules, extractor, capabilities diff --git a/tests/test_loader_segfault.py b/tests/test_loader_segfault.py index 7d8dc20ed1..f6cf306ec8 100644 --- a/tests/test_loader_segfault.py +++ b/tests/test_loader_segfault.py @@ -17,8 +17,10 @@ import pytest import envi.exc +import capa.loader from capa.loader import CorruptFile, get_workspace +from capa.exceptions import UnsupportedArchError from capa.features.common import FORMAT_PE, FORMAT_ELF @@ -58,3 +60,104 @@ def test_corrupt_pe_with_unrealistic_section_size_short_circuits(): # vivisect should never have been called mock_workspace.assert_not_called() + + +def test_elf_workspace_temporarily_disables_section_symbol_parsing(): + """ + Test that loading ELF in viv temporarily disables section-symbol parsing + and restores the original parser after workspace creation. + """ + import Elf + + fake_path = Path("/tmp/fake.elf") + original = Elf.Elf._parseSectionSymbols + observed = {} + removed_modules = [] + + class FakeWorkspace: + metadata = {} + + def delFuncAnalysisModule(self, _): + removed_modules.append(_) + return None + + def analyze(self): + return None + + def getFunctions(self): + return [] + + def fake_get_workspace(*args, **kwargs): + observed["during"] = Elf.Elf._parseSectionSymbols + return FakeWorkspace() + + with patch("viv_utils.getWorkspace", side_effect=fake_get_workspace): + get_workspace(fake_path, FORMAT_ELF, []) + + assert observed["during"] is not original + assert Elf.Elf._parseSectionSymbols is original + assert "vivisect.analysis.generic.symswitchcase" in removed_modules + assert "vivisect.analysis.elf.elfplt" in removed_modules + assert "vivisect.analysis.amd64.emulation" in removed_modules + assert "vivisect.analysis.generic.emucode" in removed_modules + assert "vivisect.analysis.generic.noret" in removed_modules + + +def test_viv_module_not_found_maps_to_unsupported_arch(): + """ + Test that viv architecture-specific impapi import errors are converted + to UnsupportedArchError. + """ + fake_path = Path("/tmp/fake.elf") + + class FakeWorkspace: + metadata = {} + + def delFuncAnalysisModule(self, _): + return None + + def analyze(self): + raise ModuleNotFoundError( + "No module named 'vivisect.impapi.posix.a64'", + name="vivisect.impapi.posix.a64", + ) + + with patch("viv_utils.getWorkspace", return_value=FakeWorkspace()): + with pytest.raises(UnsupportedArchError): + get_workspace(fake_path, FORMAT_ELF, []) + + +def test_viv_workspace_module_not_found_maps_to_unsupported_arch(): + """ + Test that impapi import failures during workspace creation are converted + to UnsupportedArchError. + """ + fake_path = Path("/tmp/fake.elf") + err = ModuleNotFoundError( + "No module named 'vivisect.impapi.posix.a64'", + name="vivisect.impapi.posix.a64", + ) + + with patch("viv_utils.getWorkspace", side_effect=err): + with pytest.raises(UnsupportedArchError): + get_workspace(fake_path, FORMAT_ELF, []) + + +def test_elf_analysis_timeout_maps_to_corrupt_file(): + """ + Test that ELF analysis timeout is converted to CorruptFile. + """ + fake_path = Path("/tmp/fake.elf") + + class FakeWorkspace: + metadata = {} + + def delFuncAnalysisModule(self, _): + return None + + def analyze(self): + raise capa.loader._AnalysisTimeoutError("analysis exceeded timeout") + + with patch("viv_utils.getWorkspace", return_value=FakeWorkspace()): + with pytest.raises(CorruptFile, match="analysis timed out"): + get_workspace(fake_path, FORMAT_ELF, []) From 1936147545c5646a9375a5cb24aa51dbb282dc54 Mon Sep 17 00:00:00 2001 From: akshat4703 Date: Fri, 13 Mar 2026 22:31:23 +0530 Subject: [PATCH 2/3] changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index db5fe728ea..6a0a401dd7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,7 @@ - render: escape sample-controlled strings before passing to Rich to prevent MarkupError @devs6186 #2699 - rules: handle empty or invalid YAML documents gracefully in `Rule.from_yaml` and `get_rules` @devs6186 #2900 - Fixed insecure deserialization vulnerability in YAML loading @0x1622 (#2770) +- loader/main: harden ELF analysis against hangs and architecture-related viv import failures; cap ELF viv function scope for bounded runtime @devs6186 #2780 - loader: gracefully handle ELF files with unsupported architectures kamranulhaq2002@gmail.com #2800 - loader: handle SegmentationViolation for malformed ELF files @kami922 #2799 - lint: disable rule caching during linting @Maijin #2817 From 186e374406c809f6c04d54224dd81e2446c1dd77 Mon Sep 17 00:00:00 2001 From: akshat4703 Date: Fri, 13 Mar 2026 22:53:06 +0530 Subject: [PATCH 3/3] gimini-asked-changes --- capa/exceptions.py | 4 ++++ capa/helpers.py | 30 ++++++++++++++++++++++++- capa/loader.py | 42 +++++------------------------------ capa/main.py | 37 +++++------------------------- tests/test_loader_segfault.py | 5 ++--- 5 files changed, 46 insertions(+), 72 deletions(-) diff --git a/capa/exceptions.py b/capa/exceptions.py index c00420e100..dfced86464 100644 --- a/capa/exceptions.py +++ b/capa/exceptions.py @@ -43,3 +43,7 @@ class NonExistantFunctionError(ValueError): class NonExistantProcessError(ValueError): pass + + +class AnalysisTimeoutError(RuntimeError): + pass diff --git a/capa/helpers.py b/capa/helpers.py index 27c757dcc6..35f680421c 100644 --- a/capa/helpers.py +++ b/capa/helpers.py @@ -16,10 +16,12 @@ import os import sys import gzip +import signal import ctypes import logging import tempfile import contextlib +import threading import importlib.util from typing import BinaryIO, Iterator, NoReturn from pathlib import Path @@ -42,7 +44,7 @@ TimeRemainingColumn, ) -from capa.exceptions import UnsupportedFormatError +from capa.exceptions import AnalysisTimeoutError, UnsupportedFormatError from capa.features.common import ( FORMAT_PE, FORMAT_CAPE, @@ -76,6 +78,32 @@ log_console: Console = Console(stderr=True) +@contextlib.contextmanager +def timebox(seconds: int): + """ + Timebox a block using SIGALRM on platforms that support it. + """ + if ( + seconds <= 0 + or not hasattr(signal, "SIGALRM") + or threading.current_thread() is not threading.main_thread() + ): + yield + return + + def _handle_timeout(signum, frame): + raise AnalysisTimeoutError(f"analysis exceeded {seconds}s") + + previous_handler = signal.getsignal(signal.SIGALRM) + signal.signal(signal.SIGALRM, _handle_timeout) + signal.setitimer(signal.ITIMER_REAL, float(seconds)) + try: + yield + finally: + signal.setitimer(signal.ITIMER_REAL, 0.0) + signal.signal(signal.SIGALRM, previous_handler) + + def hex(n: int) -> str: """render the given number using upper case hex, like: 0x123ABC""" if n < 0: diff --git a/capa/loader.py b/capa/loader.py index a21c36c2f9..bbb30fd542 100644 --- a/capa/loader.py +++ b/capa/loader.py @@ -16,8 +16,6 @@ import logging import datetime import contextlib -import threading -import signal from typing import Optional from pathlib import Path @@ -25,6 +23,7 @@ from typing_extensions import assert_never import capa.rules +import capa.helpers import capa.version import capa.features.common import capa.features.freeze as frz @@ -33,7 +32,7 @@ import capa.features.extractors.common from capa.rules import RuleSet from capa.engine import MatchResults -from capa.exceptions import UnsupportedOSError, UnsupportedArchError, UnsupportedFormatError +from capa.exceptions import AnalysisTimeoutError, UnsupportedOSError, UnsupportedArchError, UnsupportedFormatError from capa.features.common import ( OS_AUTO, FORMAT_PE, @@ -77,10 +76,6 @@ class CorruptFile(ValueError): pass -class _AnalysisTimeoutError(RuntimeError): - pass - - def is_supported_format(sample: Path) -> bool: """ Return if this is a supported file based on magic header values @@ -210,32 +205,6 @@ def _get_elf_max_functions() -> int: return 1000 -@contextlib.contextmanager -def _timebox(seconds: int): - """ - Timebox a block using SIGALRM on platforms that support it. - """ - if ( - seconds <= 0 - or not hasattr(signal, "SIGALRM") - or threading.current_thread() is not threading.main_thread() - ): - yield - return - - def _handle_timeout(signum, frame): - raise _AnalysisTimeoutError(f"analysis exceeded {seconds}s") - - previous_handler = signal.getsignal(signal.SIGALRM) - signal.signal(signal.SIGALRM, _handle_timeout) - signal.setitimer(signal.ITIMER_REAL, float(seconds)) - try: - yield - finally: - signal.setitimer(signal.ITIMER_REAL, 0.0) - signal.signal(signal.SIGALRM, previous_handler) - - @contextlib.contextmanager def _temporarily_disable_viv_elf_section_symbols(): """ @@ -290,12 +259,13 @@ def get_workspace(path: Path, input_format: str, sigpaths: list[Path]): + " - skipping analysis to avoid excessive resource usage." ) - is_elf_input = False if input_format == FORMAT_ELF: is_elf_input = True elif input_format == FORMAT_AUTO: with path.open("rb") as f: is_elf_input = f.read(4).startswith(capa.features.extractors.common.MATCH_ELF) + else: + is_elf_input = False try: if input_format == FORMAT_AUTO: @@ -357,9 +327,9 @@ def get_workspace(path: Path, input_format: str, sigpaths: list[Path]): try: timeout_s = _get_elf_analysis_timeout_seconds() if is_elf_input else 0 - with _timebox(timeout_s): + with capa.helpers.timebox(timeout_s): vw.analyze() - except _AnalysisTimeoutError as e: + except AnalysisTimeoutError as e: raise CorruptFile( f"analysis timed out after {timeout_s}s while processing ELF sample; refusing to hang indefinitely" ) from e diff --git a/capa/main.py b/capa/main.py index eb205d5ad2..2575944648 100644 --- a/capa/main.py +++ b/capa/main.py @@ -17,10 +17,8 @@ import os import sys import time -import signal import logging import argparse -import threading import textwrap import contextlib from types import TracebackType @@ -74,6 +72,7 @@ log_unsupported_drakvuf_report_error, ) from capa.exceptions import ( + AnalysisTimeoutError, InvalidArgument, EmptyReportError, UnsupportedOSError, @@ -142,10 +141,6 @@ logger = logging.getLogger("capa") -class _AnalysisTimeoutError(RuntimeError): - pass - - class FilterConfig(TypedDict, total=False): processes: set[int] functions: set[int] @@ -172,29 +167,6 @@ def _get_elf_total_analysis_timeout_seconds() -> int: return 120 -@contextlib.contextmanager -def _timebox(seconds: int): - if ( - seconds <= 0 - or not hasattr(signal, "SIGALRM") - or threading.current_thread() is not threading.main_thread() - ): - yield - return - - def _handle_timeout(signum, frame): - raise _AnalysisTimeoutError(f"analysis exceeded {seconds}s") - - previous_handler = signal.getsignal(signal.SIGALRM) - signal.signal(signal.SIGALRM, _handle_timeout) - signal.setitimer(signal.ITIMER_REAL, float(seconds)) - try: - yield - finally: - signal.setitimer(signal.ITIMER_REAL, 0.0) - signal.signal(signal.SIGALRM, previous_handler) - - def set_vivisect_log_level(level): logging.getLogger("vivisect").setLevel(level) logging.getLogger("vivisect.base").setLevel(level) @@ -1080,10 +1052,11 @@ def main(argv: Optional[list[str]] = None): return e.status_code try: - timeout_s = _get_elf_total_analysis_timeout_seconds() if (input_format == FORMAT_ELF and backend == BACKEND_VIV) else 0 - with _timebox(timeout_s): + is_elf_viv_analysis = input_format == FORMAT_ELF and backend == BACKEND_VIV + timeout_s = _get_elf_total_analysis_timeout_seconds() if is_elf_viv_analysis else 0 + with capa.helpers.timebox(timeout_s): capabilities = find_capabilities(rules, extractor, disable_progress=args.quiet) - except _AnalysisTimeoutError: + except AnalysisTimeoutError: logger.error( "analysis timed out after %ds while matching capabilities for ELF sample; refusing to hang indefinitely", timeout_s, diff --git a/tests/test_loader_segfault.py b/tests/test_loader_segfault.py index f6cf306ec8..0f5c712aa8 100644 --- a/tests/test_loader_segfault.py +++ b/tests/test_loader_segfault.py @@ -17,10 +17,9 @@ import pytest import envi.exc -import capa.loader from capa.loader import CorruptFile, get_workspace -from capa.exceptions import UnsupportedArchError +from capa.exceptions import AnalysisTimeoutError, UnsupportedArchError from capa.features.common import FORMAT_PE, FORMAT_ELF @@ -156,7 +155,7 @@ def delFuncAnalysisModule(self, _): return None def analyze(self): - raise capa.loader._AnalysisTimeoutError("analysis exceeded timeout") + raise AnalysisTimeoutError("analysis exceeded timeout") with patch("viv_utils.getWorkspace", return_value=FakeWorkspace()): with pytest.raises(CorruptFile, match="analysis timed out"):