# Patch summary. Text ahead of the first "diff --git" header is leading text that patch tools skip.
#
# src/filelock/_soft_rw/_sync.py
#   * Adds _claim_writer_marker(token): the stale-break / exists-check / atomic-create sequence that
#     try_claim_writer used to inline. Its own comment requires the caller to hold self._locks.state.
#   * Adds _touch_writer_marker_if_ours(token): reads the ``.write`` marker through one fd and touches it
#     only when the parsed token equals ``token``. Returns False when _open_marker yields no fd, the read
#     raises OSError, the marker does not parse, or the token differs. An OSError from the touch itself
#     is suppressed and the method still returns True.
#   * readers_drained_touching now refreshes through that check; when it fails it tries to re-claim the
#     slot and returns False if the re-claim fails too.
# tests/soft_rw/test_soft_rw_sync.py
#   * Adds test_writer_phase2_does_not_complete_on_a_peers_marker: holds a read lock, patches time.sleep
#     so the first sleep overwrites ``.write`` with a peer marker and releases the reader, then expects
#     acquire_write(timeout=0.6) to raise Timeout with the peer marker bytes left unchanged.
#
# NOTE(review): this copy was rebuilt from a whitespace-collapsed version of the patch. Line breaks follow
# the +/- markers and the hunk counts; indentation (context lines in particular) is inferred from Python
# syntax -- verify it against the target files before applying.
diff --git a/src/filelock/_soft_rw/_sync.py b/src/filelock/_soft_rw/_sync.py
index aa04763b..653fad2d 100644
--- a/src/filelock/_soft_rw/_sync.py
+++ b/src/filelock/_soft_rw/_sync.py
@@ -397,6 +397,40 @@ def _unlink_writer_marker_if_ours(self, token: str) -> None:
             return
         _unlink(self._paths.write)
 
+    def _claim_writer_marker(self, token: str) -> bool:
+        # Claim the writer slot for ``token``. Must be called holding ``self._locks.state``. Evicts a
+        # stale marker first, then refuses to claim while a live ``.write`` exists so a peer holding the
+        # slot is waited out instead of overwritten.
+        _break_stale_marker(self._paths.write, stale_threshold=self.stale_threshold, now=time.time())
+        if _file_exists(self._paths.write):
+            return False
+        try:
+            _atomic_create_marker(self._paths.write, token)
+        except FileExistsError:
+            return False
+        return True
+
+    def _touch_writer_marker_if_ours(self, token: str) -> bool:
+        # Refresh the writer marker through a single O_NOFOLLOW fd, but only while it still carries our
+        # token. Returns False when the marker is gone or now belongs to a peer that reclaimed the slot,
+        # so the caller can re-claim rather than keep a stranger's marker alive. Mirrors _refresh_marker.
+        fd = _open_marker(self._paths.write)
+        if fd is None:
+            return False
+        try:
+            try:
+                data = os.read(fd, _MAX_MARKER_SIZE + 1)
+            except OSError:  # pragma: no cover - e.g. EAGAIN from a hostile FIFO that has a writer attached
+                return False
+            info = _parse_marker_bytes(data)
+            if info is None or not hmac.compare_digest(info.token, token):
+                return False
+            with suppress(OSError):
+                _touch(self._paths.write, fd=fd)
+            return True
+        finally:
+            os.close(fd)
+
     @classmethod
     def get_lock(
         cls,
@@ -524,21 +558,21 @@ def _acquire_writer_slot(
 
         def try_claim_writer() -> bool:
             with self._locks.state:
-                _break_stale_marker(self._paths.write, stale_threshold=self.stale_threshold, now=time.time())
-                if _file_exists(self._paths.write):
-                    return False
-                try:
-                    _atomic_create_marker(self._paths.write, token)
-                except FileExistsError:
-                    return False
-                return True
+                return self._claim_writer_marker(token)
 
         def readers_drained_touching() -> bool:
             with self._locks.state:
-                # Refresh our writer marker on every scan iteration. Otherwise phase 2 can exceed
-                # ``stale_threshold`` under contention and a peer would treat us as stale and evict us.
-                with suppress(OSError):
-                    _touch(self._paths.write)
+                # Refresh our writer marker every scan iteration so phase 2 does not exceed
+                # ``stale_threshold`` under contention and get evicted. The refresh only happens while
+                # the marker is still ours: if we were paused past ``stale_threshold`` a peer can evict
+                # the stale marker and reclaim ``.write`` with its own token, and touching that path
+                # blindly would keep the peer's live marker alive and let this acquire finish as though
+                # we still held the slot, admitting a second writer. When the claim is no longer ours we
+                # re-claim the slot here (waiting behind the peer if it currently holds it) rather than
+                # trusting the foreign marker, mirroring the token re-check the stale-break and release
+                # paths already rely on.
+                if not self._touch_writer_marker_if_ours(token) and not self._claim_writer_marker(token):
+                    return False
                 self._break_stale_readers(time.time())
                 return not self._any_readers()
 
diff --git a/tests/soft_rw/test_soft_rw_sync.py b/tests/soft_rw/test_soft_rw_sync.py
index a5ae9e18..3d901c40 100644
--- a/tests/soft_rw/test_soft_rw_sync.py
+++ b/tests/soft_rw/test_soft_rw_sync.py
@@ -625,6 +625,40 @@ def test_release_keeps_a_peers_writer_marker(lock_file: str) -> None:
     lock.close()
 
 
+def test_writer_phase2_does_not_complete_on_a_peers_marker(lock_file: str, monkeypatch: pytest.MonkeyPatch) -> None:
+    # If a writer is paused past stale_threshold during phase 2 (waiting for readers to drain) a peer can
+    # evict its stale marker and reclaim .write with its own token. Phase 2 must notice the foreign marker
+    # rather than keep touching it and completing the acquire as if we still held the slot, which would let
+    # two writers run at once. A live reader keeps phase 2 looping; on the first poll sleep we simulate the
+    # eviction by overwriting .write with a peer token and draining the reader.
+    reader = _make_lock(lock_file, heartbeat_interval=10, stale_threshold=40)
+    reader.acquire_read(timeout=2)
+    writer = _make_lock(lock_file, heartbeat_interval=10, stale_threshold=40)
+    write_marker = f"{lock_file}.write"
+    peer_marker = b"a" * 32 + b"\n1\npeerhost\n"
+    real_sleep = time.sleep
+    swapped = threading.Event()
+
+    def hook(seconds: float) -> None:  # noqa: ARG001
+        if not swapped.is_set():
+            swapped.set()
+            Path(write_marker).write_bytes(peer_marker)
+            reader.release()
+        real_sleep(0.005)
+
+    monkeypatch.setattr(sync_mod.time, "sleep", hook)
+    try:
+        with pytest.raises(Timeout):
+            writer.acquire_write(timeout=0.6)
+        assert swapped.is_set()
+        # The peer's live marker was never overwritten or kept alive by us.
+        assert Path(write_marker).read_bytes() == peer_marker
+    finally:
+        monkeypatch.setattr(sync_mod.time, "sleep", real_sleep)
+        writer.close()
+        reader.close()
+
+
 def test_heartbeat_survives_transient_touch_error(lock_file: str, monkeypatch: pytest.MonkeyPatch) -> None:
     # On the NFS-style filesystems this lock targets a transient ESTALE / EIO on the heartbeat touch is
     # routine; it must not kill the heartbeat and silently drop the lease while we still believe we hold it.