Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 68 additions & 8 deletions dlio_benchmark/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,44 @@
from dlio_benchmark.common.enumerations import StorageType as _StorageType


# ---------------------------------------------------------------------------
# Page-cache flush configuration helpers (mlcommons/storage #487).
# ---------------------------------------------------------------------------

#: Default per-call timeout for `sudo -n sh -c 'echo 3 > /proc/sys/vm/drop_caches'`
#: when DLIO_DROP_CACHES_TIMEOUT is unset, empty, or unparseable. Picked to
#: bound the original mlcommons/storage #391 hang case at a few seconds of
#: cumulative wait across epochs while still completing on most hardware.
_DROP_CACHES_TIMEOUT_DEFAULT_SECONDS = 30


def _resolve_drop_caches_timeout(env=None) -> int:
"""Read the page-cache flush timeout, in seconds, from the environment.

Behavior is deliberately forgiving: any unset / empty / unparseable /
sub-1 value collapses to the default. The lower bound matters because
`subprocess.run(timeout=...)` rejects 0 and negative values at call
time, and we don't want a typo in an operator's env to crash DLIO.

Args:
env: Mapping to read from. Defaults to ``os.environ``. Exposed for
tests so they don't have to monkey-patch the global env.

Returns:
Integer >= 1 representing seconds.
"""
if env is None:
env = os.environ
raw = (env.get("DLIO_DROP_CACHES_TIMEOUT") or "").strip()
if not raw:
return _DROP_CACHES_TIMEOUT_DEFAULT_SECONDS
try:
value = int(raw)
except ValueError:
return _DROP_CACHES_TIMEOUT_DEFAULT_SECONDS
return max(value, 1)


def _apply_settle_guard(args, comm) -> None:
"""Sleep after data generation for eventual-consistency object stores.

Expand Down Expand Up @@ -671,26 +709,48 @@ def run(self):
self.logger.output(f"{utcnow()} Worker pre-warm complete ({self.args.read_threads} workers spawned)")

self.comm.barrier()
# Skip the per-epoch page-cache flush after the first failure so a host
# without NOPASSWD sudo doesn't pay the failure cost on every epoch and
# the warning fires exactly once. See mlcommons/storage issue #391.
# The flush has two distinct failure modes (mlcommons/storage #391, #487):
#
# * sudo -n refuses (no NOPASSWD configured, or sudo missing)
# -> non-zero exit code, fast. Warn once and disable for the
# run so we don't pay the failure cost every epoch. This
# is what #391 originally fixed (the interactive sudo
# prompt that hung for ~16 hours).
#
# * sudo -n authenticated, but the kernel itself is slow
# -> subprocess.TimeoutExpired. Don't disable; the kernel is
# working, just slowly. The next epoch retries.
#
# The per-call timeout is overridable via DLIO_DROP_CACHES_TIMEOUT
# so large-RAM hosts can raise the ceiling without an upstream change.
drop_caches_timeout = _resolve_drop_caches_timeout()
drop_caches_disabled = False
drop_caches_slow_warned = False
for epoch in dft_ai.pipeline.epoch.iter(range(1, self.epochs + 1), include_iter=False):
# Flush page cache before each epoch so reads bypass the OS buffer cache.
# Rank 0 does the flush via sudo -n (non-interactive); all ranks barrier-
# wait so no rank starts reading stale cached data. If sudo requires a
# password (or isn't installed) -n exits immediately with non-zero — we
# log a single warning and stop trying. This avoids the interactive
# password prompt that hung issue #391 for ~16 hours.
# wait so no rank starts reading stale cached data.
if self.my_rank == 0 and not drop_caches_disabled:
try:
subprocess.run(
["sudo", "-n", "sh", "-c", "echo 3 > /proc/sys/vm/drop_caches"],
check=True, timeout=30,
check=True, timeout=drop_caches_timeout,
stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
)
except subprocess.TimeoutExpired:
# sudo -n already authenticated (otherwise we'd see a
# quick non-zero exit, not a timeout). The kernel is
# the slow path. Don't disable — the next epoch retries.
if not drop_caches_slow_warned:
self.logger.warning(
f"Page cache flush did not finish within "
f"{drop_caches_timeout}s. The next epoch will "
f"retry. If this recurs, raise the ceiling with "
f"DLIO_DROP_CACHES_TIMEOUT=<seconds> (e.g. 300)."
)
drop_caches_slow_warned = True
except Exception as exc:
drop_caches_disabled = True
self.logger.warning(
Expand Down
87 changes: 87 additions & 0 deletions tests/test_drop_caches_timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
"""Tests for the DLIO_DROP_CACHES_TIMEOUT env var parsing (mlcommons/storage #487).

The helper under test is small and pure: it reads an env mapping and returns an
integer >= 1. We accept a wide range of bad input gracefully (collapse to the
default) because the underlying `subprocess.run(timeout=...)` API rejects 0 /
negative values at call time, and a typo in an operator's environment must not
crash DLIO.
"""

import pytest

from dlio_benchmark.main import (
_DROP_CACHES_TIMEOUT_DEFAULT_SECONDS,
_resolve_drop_caches_timeout,
)


_DEFAULT = _DROP_CACHES_TIMEOUT_DEFAULT_SECONDS


class TestResolveDropCachesTimeout:
"""Unit tests for _resolve_drop_caches_timeout()."""

def test_default_when_env_var_absent(self):
"""No env var set → default."""
assert _resolve_drop_caches_timeout(env={}) == _DEFAULT

def test_default_when_env_var_empty(self):
"""Empty string → default."""
assert _resolve_drop_caches_timeout(env={"DLIO_DROP_CACHES_TIMEOUT": ""}) == _DEFAULT

def test_default_when_env_var_whitespace_only(self):
"""All-whitespace value → default."""
assert _resolve_drop_caches_timeout(env={"DLIO_DROP_CACHES_TIMEOUT": " "}) == _DEFAULT

@pytest.mark.parametrize("raw", ["not-a-number", "30s", "30.5", "30 ", "5,000", "nan", "True"])
def test_default_when_value_is_unparseable(self, raw):
"""Non-integer text → default (no crash)."""
assert _resolve_drop_caches_timeout(env={"DLIO_DROP_CACHES_TIMEOUT": raw}) == _DEFAULT

@pytest.mark.parametrize("raw, expected", [
("1", 1),
("60", 60),
("300", 300),
("7200", 7200),
])
def test_valid_integer_override(self, raw, expected):
"""Valid positive integer → that value."""
assert _resolve_drop_caches_timeout(env={"DLIO_DROP_CACHES_TIMEOUT": raw}) == expected

def test_whitespace_around_valid_integer_is_stripped(self):
"""Leading/trailing whitespace around an int is OK."""
assert _resolve_drop_caches_timeout(env={"DLIO_DROP_CACHES_TIMEOUT": " 120\t"}) == 120

@pytest.mark.parametrize("raw", ["0", "-1", "-300"])
def test_zero_and_negative_clamped_to_one(self, raw):
"""0 and negative values are clamped to 1 (subprocess.run rejects 0/<0)."""
assert _resolve_drop_caches_timeout(env={"DLIO_DROP_CACHES_TIMEOUT": raw}) == 1

def test_unrelated_env_vars_ignored(self):
"""Other DLIO env vars don't affect the result."""
env = {
"DLIO_OUTPUT_FOLDER": "/tmp",
"DLIO_MAX_AUTO_THREADS": "4",
"DLIO_DROP_CACHES_TIMEOUT_TYPO": "300",
}
assert _resolve_drop_caches_timeout(env=env) == _DEFAULT

def test_default_constant_matches_storage_391_history(self):
"""Sanity check: the default must remain 30s.

Lowering it would re-introduce the slow-flush regression
(mlcommons/storage #487). Raising it without coordination would shift
the original sudo-prompt hang window (#391). This test is a
speed-bump that forces a deliberate choice if either changes.
"""
assert _DEFAULT == 30

def test_defaults_to_os_environ_when_env_arg_omitted(self, monkeypatch):
"""No env= arg → reads from os.environ."""
monkeypatch.setenv("DLIO_DROP_CACHES_TIMEOUT", "180")
assert _resolve_drop_caches_timeout() == 180

def test_defaults_to_os_environ_when_var_unset(self, monkeypatch):
"""No env= arg, var unset → default."""
monkeypatch.delenv("DLIO_DROP_CACHES_TIMEOUT", raising=False)
assert _resolve_drop_caches_timeout() == _DEFAULT
Loading