Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 46 additions & 12 deletions src/filelock/_soft_rw/_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,40 @@ def _unlink_writer_marker_if_ours(self, token: str) -> None:
return
_unlink(self._paths.write)

def _claim_writer_marker(self, token: str) -> bool:
    # Try to take the writer slot on behalf of ``token``; the caller must already hold
    # ``self._locks.state``. Returns True only when our marker was created.
    #
    # Order matters: a stale ``.write`` marker is evicted first, then a marker that is still present
    # is treated as a live peer and left alone (we wait it out rather than overwrite it), and only
    # then do we attempt the atomic create. Losing the create race to a peer is reported as False.
    write_path = self._paths.write
    _break_stale_marker(write_path, stale_threshold=self.stale_threshold, now=time.time())
    if _file_exists(write_path):
        return False
    try:
        _atomic_create_marker(write_path, token)
    except FileExistsError:
        return False
    else:
        return True

def _touch_writer_marker_if_ours(self, token: str) -> bool:
    # Refresh the writer marker, but only while it still carries ``token``. Everything goes through
    # the one descriptor returned by ``_open_marker`` (a single O_NOFOLLOW fd), so the file we
    # verify is the file we touch. Mirrors _refresh_marker.
    #
    # Returns False when the marker cannot be opened or read, does not parse, or carries a different
    # token (a peer reclaimed the slot); the caller can then re-claim instead of keeping a stranger's
    # marker alive. Returns True when the token matches, even if the touch itself failed.
    marker_fd = _open_marker(self._paths.write)
    if marker_fd is None:
        return False
    try:
        try:
            raw = os.read(marker_fd, _MAX_MARKER_SIZE + 1)
        except OSError:  # pragma: no cover - e.g. EAGAIN from a hostile FIFO that has a writer attached
            return False
        parsed = _parse_marker_bytes(raw)
        # Constant-time comparison of the stored token against ours.
        still_ours = parsed is not None and hmac.compare_digest(parsed.token, token)
        if still_ours:
            # Best-effort refresh: a failed touch does not change who owns the marker.
            with suppress(OSError):
                _touch(self._paths.write, fd=marker_fd)
        return still_ours
    finally:
        os.close(marker_fd)

@classmethod
def get_lock(
cls,
Expand Down Expand Up @@ -524,21 +558,21 @@ def _acquire_writer_slot(

def try_claim_writer() -> bool:
    # Attempt to take the writer slot for ``token``; True means our ``.write`` marker now exists.
    # The claim sequence (evict a stale marker, refuse while a live one exists, atomic create) lives
    # in ``_claim_writer_marker`` so this path and the re-claim in ``readers_drained_touching`` share
    # one implementation instead of an inlined copy. The helper must run under ``self._locks.state``.
    with self._locks.state:
        return self._claim_writer_marker(token)

def readers_drained_touching() -> bool:
    # One scan iteration while waiting for readers to drain. Returns True only when the writer
    # marker is (still or again) ours and no reader remains; False means keep polling.
    with self._locks.state:
        # Refresh our writer marker every scan iteration so phase 2 does not exceed
        # ``stale_threshold`` under contention and get evicted. The refresh only happens while
        # the marker is still ours: if we were paused past ``stale_threshold`` a peer can evict
        # the stale marker and reclaim ``.write`` with its own token, and touching that path
        # blindly would keep the peer's live marker alive and let this acquire finish as though
        # we still held the slot, admitting a second writer. When the claim is no longer ours we
        # re-claim the slot here (waiting behind the peer if it currently holds it) rather than
        # trusting the foreign marker, mirroring the token re-check the stale-break and release
        # paths already rely on.
        if not self._touch_writer_marker_if_ours(token) and not self._claim_writer_marker(token):
            return False
        self._break_stale_readers(time.time())
        return not self._any_readers()

Expand Down
34 changes: 34 additions & 0 deletions tests/soft_rw/test_soft_rw_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,40 @@ def test_release_keeps_a_peers_writer_marker(lock_file: str) -> None:
lock.close()


def test_writer_phase2_does_not_complete_on_a_peers_marker(lock_file: str, monkeypatch: pytest.MonkeyPatch) -> None:
    # Scenario: a writer stalls past stale_threshold while phase 2 waits for readers to drain, so a
    # peer evicts its stale marker and reclaims .write under its own token. Phase 2 has to notice
    # the foreign marker; if it kept touching it and finished the acquire as though the slot were
    # still ours, two writers would run at once.
    #
    # A live reader keeps phase 2 looping. The first poll sleep stands in for the eviction: it
    # overwrites .write with a peer token and drains the reader.
    reader = _make_lock(lock_file, heartbeat_interval=10, stale_threshold=40)
    reader.acquire_read(timeout=2)
    writer = _make_lock(lock_file, heartbeat_interval=10, stale_threshold=40)
    marker_path = Path(f"{lock_file}.write")
    peer_marker = b"a" * 32 + b"\n1\npeerhost\n"
    real_sleep = time.sleep
    swapped = threading.Event()

    def swap_on_first_sleep(seconds: float) -> None:  # noqa: ARG001
        first_poll = not swapped.is_set()
        if first_poll:
            # Mark the swap as done before touching anything, so any nested sleep only naps.
            swapped.set()
            marker_path.write_bytes(peer_marker)
            reader.release()
        real_sleep(0.005)

    monkeypatch.setattr(sync_mod.time, "sleep", swap_on_first_sleep)
    try:
        with pytest.raises(Timeout):
            writer.acquire_write(timeout=0.6)
        assert swapped.is_set()
        # The peer's live marker was never overwritten or kept alive by us.
        assert marker_path.read_bytes() == peer_marker
    finally:
        # Put the real sleep back before closing the locks.
        monkeypatch.setattr(sync_mod.time, "sleep", real_sleep)
        writer.close()
        reader.close()


def test_heartbeat_survives_transient_touch_error(lock_file: str, monkeypatch: pytest.MonkeyPatch) -> None:
# On the NFS-style filesystems this lock targets a transient ESTALE / EIO on the heartbeat touch is
# routine; it must not kill the heartbeat and silently drop the lease while we still believe we hold it.
Expand Down