diff --git a/dlio_benchmark/main.py b/dlio_benchmark/main.py index e2c343c4..0503458d 100644 --- a/dlio_benchmark/main.py +++ b/dlio_benchmark/main.py @@ -28,6 +28,44 @@ from dlio_benchmark.common.enumerations import StorageType as _StorageType +# --------------------------------------------------------------------------- +# Page-cache flush configuration helpers (mlcommons/storage #487). +# --------------------------------------------------------------------------- + +#: Default per-call timeout for `sudo -n sh -c 'echo 3 > /proc/sys/vm/drop_caches'` +#: when DLIO_DROP_CACHES_TIMEOUT is unset, empty, or unparseable. Picked to +#: bound the original mlcommons/storage #391 hang case at a few seconds of +#: cumulative wait across epochs while still completing on most hardware. +_DROP_CACHES_TIMEOUT_DEFAULT_SECONDS = 30 + + +def _resolve_drop_caches_timeout(env=None) -> int: + """Read the page-cache flush timeout, in seconds, from the environment. + + Behavior is deliberately forgiving: any unset / empty / unparseable / + sub-1 value collapses to the default. The lower bound matters because + `subprocess.run(timeout=...)` rejects 0 and negative values at call + time, and we don't want a typo in an operator's env to crash DLIO. + + Args: + env: Mapping to read from. Defaults to ``os.environ``. Exposed for + tests so they don't have to monkey-patch the global env. + + Returns: + Integer >= 1 representing seconds. + """ + if env is None: + env = os.environ + raw = (env.get("DLIO_DROP_CACHES_TIMEOUT") or "").strip() + if not raw: + return _DROP_CACHES_TIMEOUT_DEFAULT_SECONDS + try: + value = int(raw) + except ValueError: + return _DROP_CACHES_TIMEOUT_DEFAULT_SECONDS + return max(value, 1) + + def _apply_settle_guard(args, comm) -> None: """Sleep after data generation for eventual-consistency object stores. @@ -671,26 +709,48 @@ def run(self): self.logger.output(f"{utcnow()} Worker pre-warm complete ({self.args.read_threads} workers spawned)") self.comm.barrier() - # Skip the per-epoch page-cache flush after the first failure so a host - # without NOPASSWD sudo doesn't pay the failure cost on every epoch and - # the warning fires exactly once. See mlcommons/storage issue #391. + # The flush has two distinct failure modes (mlcommons/storage #391, #487): + # + # * sudo -n refuses (no NOPASSWD configured, or sudo missing) + # -> non-zero exit code, fast. Warn once and disable for the + # run so we don't pay the failure cost every epoch. This + # is what #391 originally fixed (the interactive sudo + # prompt that hung for ~16 hours). + # + # * sudo -n authenticated, but the kernel itself is slow + # -> subprocess.TimeoutExpired. Don't disable; the kernel is + # working, just slowly. The next epoch retries. + # + # The per-call timeout is overridable via DLIO_DROP_CACHES_TIMEOUT + # so large-RAM hosts can raise the ceiling without an upstream change. + drop_caches_timeout = _resolve_drop_caches_timeout() drop_caches_disabled = False + drop_caches_slow_warned = False for epoch in dft_ai.pipeline.epoch.iter(range(1, self.epochs + 1), include_iter=False): # Flush page cache before each epoch so reads bypass the OS buffer cache. # Rank 0 does the flush via sudo -n (non-interactive); all ranks barrier- - # wait so no rank starts reading stale cached data. If sudo requires a - # password (or isn't installed) -n exits immediately with non-zero — we - # log a single warning and stop trying. This avoids the interactive - # password prompt that hung issue #391 for ~16 hours. + # wait so no rank starts reading stale cached data. if self.my_rank == 0 and not drop_caches_disabled: try: subprocess.run( ["sudo", "-n", "sh", "-c", "echo 3 > /proc/sys/vm/drop_caches"], - check=True, timeout=30, + check=True, timeout=drop_caches_timeout, stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, ) + except subprocess.TimeoutExpired: + # sudo -n already authenticated (otherwise we'd see a + # quick non-zero exit, not a timeout). The kernel is + # the slow path. Don't disable — the next epoch retries. + if not drop_caches_slow_warned: + self.logger.warning( + f"Page cache flush did not finish within " + f"{drop_caches_timeout}s. The next epoch will " + f"retry. If this recurs, raise the ceiling with " + f"DLIO_DROP_CACHES_TIMEOUT= (e.g. 300)." + ) + drop_caches_slow_warned = True except Exception as exc: drop_caches_disabled = True self.logger.warning( diff --git a/tests/test_drop_caches_timeout.py b/tests/test_drop_caches_timeout.py new file mode 100644 index 00000000..96c43660 --- /dev/null +++ b/tests/test_drop_caches_timeout.py @@ -0,0 +1,87 @@ +"""Tests for the DLIO_DROP_CACHES_TIMEOUT env var parsing (mlcommons/storage #487). + +The helper under test is small and pure: it reads an env mapping and returns an +integer >= 1. We accept a wide range of bad input gracefully (collapse to the +default) because the underlying `subprocess.run(timeout=...)` API rejects 0 / +negative values at call time, and a typo in an operator's environment must not +crash DLIO. +""" + +import pytest + +from dlio_benchmark.main import ( + _DROP_CACHES_TIMEOUT_DEFAULT_SECONDS, + _resolve_drop_caches_timeout, +) + + +_DEFAULT = _DROP_CACHES_TIMEOUT_DEFAULT_SECONDS + + +class TestResolveDropCachesTimeout: + """Unit tests for _resolve_drop_caches_timeout().""" + + def test_default_when_env_var_absent(self): + """No env var set → default.""" + assert _resolve_drop_caches_timeout(env={}) == _DEFAULT + + def test_default_when_env_var_empty(self): + """Empty string → default.""" + assert _resolve_drop_caches_timeout(env={"DLIO_DROP_CACHES_TIMEOUT": ""}) == _DEFAULT + + def test_default_when_env_var_whitespace_only(self): + """All-whitespace value → default.""" + assert _resolve_drop_caches_timeout(env={"DLIO_DROP_CACHES_TIMEOUT": " "}) == _DEFAULT + + @pytest.mark.parametrize("raw", ["not-a-number", "30s", "30.5", "30 ", "5,000", "nan", "True"]) + def test_default_when_value_is_unparseable(self, raw): + """Non-integer text → default (no crash).""" + assert _resolve_drop_caches_timeout(env={"DLIO_DROP_CACHES_TIMEOUT": raw}) == _DEFAULT + + @pytest.mark.parametrize("raw, expected", [ + ("1", 1), + ("60", 60), + ("300", 300), + ("7200", 7200), + ]) + def test_valid_integer_override(self, raw, expected): + """Valid positive integer → that value.""" + assert _resolve_drop_caches_timeout(env={"DLIO_DROP_CACHES_TIMEOUT": raw}) == expected + + def test_whitespace_around_valid_integer_is_stripped(self): + """Leading/trailing whitespace around an int is OK.""" + assert _resolve_drop_caches_timeout(env={"DLIO_DROP_CACHES_TIMEOUT": " 120\t"}) == 120 + + @pytest.mark.parametrize("raw", ["0", "-1", "-300"]) + def test_zero_and_negative_clamped_to_one(self, raw): + """0 and negative values are clamped to 1 (subprocess.run rejects 0/<0).""" + assert _resolve_drop_caches_timeout(env={"DLIO_DROP_CACHES_TIMEOUT": raw}) == 1 + + def test_unrelated_env_vars_ignored(self): + """Other DLIO env vars don't affect the result.""" + env = { + "DLIO_OUTPUT_FOLDER": "/tmp", + "DLIO_MAX_AUTO_THREADS": "4", + "DLIO_DROP_CACHES_TIMEOUT_TYPO": "300", + } + assert _resolve_drop_caches_timeout(env=env) == _DEFAULT + + def test_default_constant_matches_storage_391_history(self): + """Sanity check: the default must remain 30s. + + Lowering it would re-introduce the slow-flush regression + (mlcommons/storage #487). Raising it without coordination would shift + the original sudo-prompt hang window (#391). This test is a + speed-bump that forces a deliberate choice if either changes. + """ + assert _DEFAULT == 30 + + def test_defaults_to_os_environ_when_env_arg_omitted(self, monkeypatch): + """No env= arg → reads from os.environ.""" + monkeypatch.setenv("DLIO_DROP_CACHES_TIMEOUT", "180") + assert _resolve_drop_caches_timeout() == 180 + + def test_defaults_to_os_environ_when_var_unset(self, monkeypatch): + """No env= arg, var unset → default.""" + monkeypatch.delenv("DLIO_DROP_CACHES_TIMEOUT", raising=False) + assert _resolve_drop_caches_timeout() == _DEFAULT