Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions app/core/lock.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,28 @@
"""Store-level coarse lock helpers."""

from _thread import LockType
import threading
from types import TracebackType
from typing import Optional, Protocol


def create_store_lock() -> LockType:
class StoreLock(Protocol):
"""Minimal lock contract used by the store."""

def acquire(self, blocking: bool = True, timeout: float = -1) -> bool: ...

def release(self) -> None: ...

def __enter__(self) -> bool: ...

def __exit__(
self,
exc_type: Optional[type[BaseException]],
exc: Optional[BaseException],
tb: Optional[TracebackType],
) -> None: ...


def create_store_lock() -> StoreLock:
"""Create the single coarse lock shared by store operations."""

return threading.Lock()
5 changes: 2 additions & 3 deletions app/core/store.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""Store API and key/value storage implementation."""

import time
from _thread import LockType
from typing import Callable, Optional, Tuple

from app.core.expiration import (
Expand All @@ -10,7 +9,7 @@
is_expired,
ttl_seconds,
)
from app.core.lock import create_store_lock
from app.core.lock import StoreLock, create_store_lock


class Store:
Expand All @@ -19,7 +18,7 @@ class Store:
def __init__(self, clock: Optional[Callable[[], float]] = None) -> None:
self.data_map: dict[str, str] = {}
self.expire_map: dict[str, float] = {}
self.lock: LockType = create_store_lock()
self.lock: StoreLock = create_store_lock()
self._clock = clock if clock is not None else time.time

def get(self, key: str) -> Tuple[bool, Optional[str]]:
Expand Down
4 changes: 1 addition & 3 deletions app/persistence/replay.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@
from .aof import AofEntry, AofParseError


def apply_aof_entry_to_store(
store: StoreProtocol, entry: AofEntry, now: float
) -> None:
def apply_aof_entry_to_store(store: StoreProtocol, entry: AofEntry, now: float) -> None:
"""Apply a single AOF entry to the store. Store.expireat(..., past) deletes the key."""

if entry.command == "SET":
Expand Down
20 changes: 14 additions & 6 deletions app/protocol/resp_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,22 @@

from __future__ import annotations

from typing import BinaryIO
from typing import Protocol


class RespProtocolError(Exception):
"""Raised when a RESP request does not match the supported subset."""


def parse_command_frame(stream: BinaryIO) -> list[str] | None:
class RespReadableStream(Protocol):
"""Binary stream shape required by the RESP parser."""

def read(self, size: int = -1, /) -> bytes | None: ...

def readline(self, size: int = -1, /) -> bytes: ...


def parse_command_frame(stream: RespReadableStream) -> list[str] | None:
"""Parse one RESP command frame from a binary stream.

Returns ``None`` when the peer closes the stream cleanly before sending
Expand Down Expand Up @@ -53,28 +61,28 @@ def parse_command_frame(stream: BinaryIO) -> list[str] | None:
return parts


def _parse_length(stream: BinaryIO, error_message: str) -> int:
def _parse_length(stream: RespReadableStream, error_message: str) -> int:
line = _readline(stream, error_message)
try:
return int(line)
except ValueError as error:
raise RespProtocolError(error_message) from error


def _readline(stream: BinaryIO, error_message: str) -> bytes:
def _readline(stream: RespReadableStream, error_message: str) -> bytes:
line = stream.readline()
if line == b"" or not line.endswith(b"\r\n"):
raise RespProtocolError(error_message)
return line[:-2]


def _read_exact(stream: BinaryIO, size: int) -> bytes:
def _read_exact(stream: RespReadableStream, size: int) -> bytes:
payload = stream.read(size)
if payload is None or len(payload) != size:
raise RespProtocolError("protocol error: incomplete bulk string")
return payload


def _expect_crlf(stream: BinaryIO, error_message: str) -> None:
def _expect_crlf(stream: RespReadableStream, error_message: str) -> None:
if stream.read(2) != b"\r\n":
raise RespProtocolError(error_message)
8 changes: 6 additions & 2 deletions tests/integration/test_protocol_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,9 @@ async def test_malformed_json_maps_to_invalid_request() -> None:
assert fake_executor.calls == []


def test_default_app_starts_with_missing_aof_file(tmp_path: Any, monkeypatch: pytest.MonkeyPatch) -> None:
def test_default_app_starts_with_missing_aof_file(
tmp_path: Any, monkeypatch: pytest.MonkeyPatch
) -> None:
monkeypatch.chdir(tmp_path)

with TestClient(create_app()) as client:
Expand Down Expand Up @@ -251,7 +253,9 @@ def test_default_app_fails_startup_for_malformed_aof(
) -> None:
monkeypatch.chdir(tmp_path)
aof_path = tmp_path / "appendonly.aof"
aof_path.write_text('{"command":"SET","args":["a","1"]}\nnot-json\n', encoding="utf-8")
aof_path.write_text(
'{"command":"SET","args":["a","1"]}\nnot-json\n', encoding="utf-8"
)

with pytest.raises(AofParseError, match="line 2"):
with TestClient(create_app()):
Expand Down
8 changes: 2 additions & 6 deletions tests/unit/test_persistence_aof.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,9 @@ def sweep_expired(self) -> int:

store = RecordingStore()
apply_aof_entry_to_store(store, AofEntry("SET", ("k", "v")), 100.0)
apply_aof_entry_to_store(
store, AofEntry("EXPIREAT", ("k", 50.0)), 100.0
)
apply_aof_entry_to_store(store, AofEntry("EXPIREAT", ("k", 50.0)), 100.0)
assert store.calls == [("set", ("k", "v")), ("expireat", ("k", 50.0))]

store.calls.clear()
apply_aof_entry_to_store(
store, AofEntry("EXPIREAT", ("k", 150.0)), 100.0
)
apply_aof_entry_to_store(store, AofEntry("EXPIREAT", ("k", 150.0)), 100.0)
assert store.calls == [("expireat", ("k", 150.0))]
Loading