From 9ed509c1ae8793de646eefca0761d65abd334843 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6ren=20Nikolaus?= <31724172+srnnkls@users.noreply.github.com> Date: Sun, 26 Oct 2025 09:15:58 +0100 Subject: [PATCH 1/4] Update proxy.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/deigma/proxy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/deigma/proxy.py b/src/deigma/proxy.py index 6ba5d20..c161a40 100644 --- a/src/deigma/proxy.py +++ b/src/deigma/proxy.py @@ -77,7 +77,7 @@ def _wrap_core_schema(schema: CoreSchema) -> CoreSchema: # Build wrapped schema match schema: - # someting we can reference to (e.g. BaseModel, Dataclass, ...) + # something we can reference to (e.g. BaseModel, Dataclass, ...) case {"ref": ref}: wrapped_schema = core_schema.definitions_schema( schema=core_schema.definition_reference_schema( From 6eb67e13c0b6db79ef19ef0644414058923a3732 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 26 Oct 2025 09:37:49 +0000 Subject: [PATCH 2/4] Fix thread safety in SerializationProxy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Comprehensive thread-safety improvements to prevent race conditions: 1. Global schema cache protection: - Added RLock for _wrapped_schema_cache - Store (original_schema, wrapped_schema) tuples to prevent id() reuse bugs - Verify schema identity on cache hits 2. Class-level proxy type cache protection: - Added RLock for _proxy_type_cache - Guard against id() reuse with identity checks - Double-check locking pattern to prevent duplicate proxy type creation 3. Per-instance attribute cache protection: - Added RLock for _attr_cache in each proxy instance - Protected __getattr__ and __getitem__ cache access - Use re-entrant locks to prevent deadlocks from __repr__ 4. Serialized data immutability: - Wrap dict results in MappingProxyType in build() - Prevents accidental mutation from multiple threads All OrderedDict operations (move_to_end, popitem) are now protected by appropriate locks. Critical sections minimized by building schemas and proxy types outside locks when possible. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- src/deigma/proxy.py | 109 +++++++++++++++++++++++++++----------------- 1 file changed, 67 insertions(+), 42 deletions(-) diff --git a/src/deigma/proxy.py b/src/deigma/proxy.py index c161a40..8eeaf69 100644 --- a/src/deigma/proxy.py +++ b/src/deigma/proxy.py @@ -1,6 +1,7 @@ from collections import OrderedDict from collections.abc import Callable, Iterable, Mapping from copy import deepcopy +from threading import RLock from types import MappingProxyType from typing import Generic, NamedTuple, TypeGuard, TypeVar @@ -61,8 +62,10 @@ def apply_to_unwrapped(proxy: "SerializationProxy[T]") -> T: # Bounded cache for wrapped schemas to prevent memory leaks in long-running applications # Using OrderedDict for LRU eviction +# Store tuple (orig_schema, wrapped_schema) to prevent id() reuse bugs _WRAPPED_SCHEMA_CACHE_SIZE = 256 -_wrapped_schema_cache: OrderedDict[int, CoreSchema] = OrderedDict() +_wrapped_schema_cache: OrderedDict[int, tuple[CoreSchema, CoreSchema]] = OrderedDict() +_WRAPPED_SCHEMA_CACHE_LOCK = RLock() def _wrap_core_schema(schema: CoreSchema) -> CoreSchema: @@ -70,12 +73,16 @@ def _wrap_core_schema(schema: CoreSchema) -> CoreSchema: schema_id = id(schema) # Check cache first (LRU: move to end if found) - if schema_id in _wrapped_schema_cache: - # Move to end (most recently used) - _wrapped_schema_cache.move_to_end(schema_id) - return _wrapped_schema_cache[schema_id] - - # Build wrapped schema + with _WRAPPED_SCHEMA_CACHE_LOCK: + tup = _wrapped_schema_cache.get(schema_id) + if tup is not None: + orig, wrapped = tup + # Guard against id() reuse by verifying identity + if orig is schema: + _wrapped_schema_cache.move_to_end(schema_id) + return wrapped + + # Build wrapped schema (outside lock to minimize critical section) match schema: # something we can reference to (e.g. BaseModel, Dataclass, ...) case {"ref": ref}: @@ -105,12 +112,13 @@ def _wrap_core_schema(schema: CoreSchema) -> CoreSchema: ) # Cache with LRU eviction - _wrapped_schema_cache[schema_id] = wrapped_schema - _wrapped_schema_cache.move_to_end(schema_id) + with _WRAPPED_SCHEMA_CACHE_LOCK: + _wrapped_schema_cache[schema_id] = (schema, wrapped_schema) + _wrapped_schema_cache.move_to_end(schema_id) - # Evict oldest entry if cache is too large - if len(_wrapped_schema_cache) > _WRAPPED_SCHEMA_CACHE_SIZE: - _wrapped_schema_cache.popitem(last=False) + # Evict oldest entry if cache is too large + if len(_wrapped_schema_cache) > _WRAPPED_SCHEMA_CACHE_SIZE: + _wrapped_schema_cache.popitem(last=False) return wrapped_schema @@ -123,6 +131,7 @@ class SerializationProxy(Generic[T]): # Bounded cache for proxy types to prevent memory leaks _PROXY_TYPE_CACHE_SIZE = 256 _proxy_type_cache: OrderedDict[int, type["SerializationProxy"]] = OrderedDict() + _PROXY_TYPE_CACHE_LOCK = RLock() def __init__( self, @@ -136,6 +145,7 @@ def __init__( # Cache for accessed attributes to avoid rebuilding proxies # Keys are either strings (for attributes) or tuples (for items) self._attr_cache: dict[str | tuple, "SerializationProxy"] = {} + self._attr_cache_lock = RLock() @classmethod def _build( @@ -148,30 +158,38 @@ def _build( schema_id = id(core_schema) # Check if we already have a cached proxy type for this schema (LRU) - if schema_id in cls._proxy_type_cache: - # Move to end (most recently used) - cls._proxy_type_cache.move_to_end(schema_id) - proxy_type = cls._proxy_type_cache[schema_id] - else: - # Build new proxy type - wrapped_core_schema = _wrap_core_schema(core_schema) - proxy_type = type( - f"SerializationProxy[{type(obj).__name__}]", - (cls,), - { - "core_schema": core_schema, - "__pydantic_serializer__": SchemaSerializer(wrapped_core_schema), - "__pydantic_core_schema__": wrapped_core_schema, - "__pydantic_validator__": adapter.validator, - }, - ) - # Cache the proxy type with LRU eviction - cls._proxy_type_cache[schema_id] = proxy_type - cls._proxy_type_cache.move_to_end(schema_id) - - # Evict oldest entry if cache is too large - if len(cls._proxy_type_cache) > cls._PROXY_TYPE_CACHE_SIZE: - cls._proxy_type_cache.popitem(last=False) + with cls._PROXY_TYPE_CACHE_LOCK: + proxy_type = cls._proxy_type_cache.get(schema_id) + # Guard against id() reuse by verifying identity + if proxy_type is not None and getattr(proxy_type, "core_schema", None) is core_schema: + cls._proxy_type_cache.move_to_end(schema_id) + return proxy_type(obj, serialized, adapter) + + # Build new proxy type (outside lock to minimize critical section) + wrapped_core_schema = _wrap_core_schema(core_schema) + proxy_type = type( + f"SerializationProxy[{type(obj).__name__}]", + (cls,), + { + "core_schema": core_schema, + "__pydantic_serializer__": SchemaSerializer(wrapped_core_schema), + "__pydantic_core_schema__": wrapped_core_schema, + "__pydantic_validator__": adapter.validator, + }, + ) + + # Publish under lock + with cls._PROXY_TYPE_CACHE_LOCK: + # Re-check if someone else beat us + existing = cls._proxy_type_cache.get(schema_id) + if existing is None or getattr(existing, "core_schema", None) is not core_schema: + cls._proxy_type_cache[schema_id] = proxy_type + cls._proxy_type_cache.move_to_end(schema_id) + # Evict oldest entry if cache is too large + if len(cls._proxy_type_cache) > cls._PROXY_TYPE_CACHE_SIZE: + cls._proxy_type_cache.popitem(last=False) + else: + proxy_type = existing return proxy_type(obj, serialized, adapter) @@ -187,13 +205,17 @@ def build( if adapter is None: adapter = TypeAdapter(type(obj)) serialized = adapter.dump_python(obj) + # If it's a dict, make it read-only to prevent accidental mutation from other threads + if isinstance(serialized, dict): + serialized = MappingProxyType(serialized) core_schema = adapter.core_schema return cls._build(obj, serialized, adapter, core_schema) def __getattr__(self, name: str): # Check attribute cache first - if name in self._attr_cache: - return self._attr_cache[name] + with self._attr_cache_lock: + if name in self._attr_cache: + return self._attr_cache[name] if isinstance(self.serialized, dict) and name in self.serialized: sub_schema = _extract_subschema(self.core_schema, name) @@ -204,15 +226,17 @@ def __getattr__(self, name: str): sub_schema, ) # Cache the built proxy - self._attr_cache[name] = proxy + with self._attr_cache_lock: + self._attr_cache[name] = proxy return proxy return getattr(self.obj, name) def __getitem__(self, key): # For getitem, we use a tuple for cache key to avoid collisions cache_key = ("__item__", key) - if cache_key in self._attr_cache: - return self._attr_cache[cache_key] + with self._attr_cache_lock: + if cache_key in self._attr_cache: + return self._attr_cache[cache_key] sub_schema = _extract_subschema(self.core_schema, key) if type(self.serialized) is type(self.obj): @@ -231,7 +255,8 @@ def __getitem__(self, key): ) # Cache the built proxy - self._attr_cache[cache_key] = proxy + with self._attr_cache_lock: + self._attr_cache[cache_key] = proxy return proxy def __iter__(self): From fe7dc00e5865f2c5dbbc23365b02d0e75c7433c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6ren=20Nikolaus?= Date: Sun, 26 Oct 2025 13:01:42 +0100 Subject: [PATCH 3/4] fix(proxy): Harden cache access and enforce immutability - Add lock-free cache fast path; guard OrderedDict.get and fall back to locked path on race - Replace RLock with Lock for cache guards - Broaden serialized type to Mapping | Iterable | object - Wrap dicts in MappingProxyType to prevent mutation leaks, including child values - Prefer underlying child object when available in __getitem__ --- src/deigma/proxy.py | 75 ++++++++++++++++++++++++++++++++------------- 1 file changed, 53 insertions(+), 22 deletions(-) diff --git a/src/deigma/proxy.py b/src/deigma/proxy.py index 8eeaf69..ce952e4 100644 --- a/src/deigma/proxy.py +++ b/src/deigma/proxy.py @@ -1,7 +1,7 @@ from collections import OrderedDict from collections.abc import Callable, Iterable, Mapping from copy import deepcopy -from threading import RLock +from threading import Lock, RLock from types import MappingProxyType from typing import Generic, NamedTuple, TypeGuard, TypeVar @@ -65,14 +65,24 @@ def apply_to_unwrapped(proxy: "SerializationProxy[T]") -> T: # Store tuple (orig_schema, wrapped_schema) to prevent id() reuse bugs _WRAPPED_SCHEMA_CACHE_SIZE = 256 _wrapped_schema_cache: OrderedDict[int, tuple[CoreSchema, CoreSchema]] = OrderedDict() -_WRAPPED_SCHEMA_CACHE_LOCK = RLock() +_WRAPPED_SCHEMA_CACHE_LOCK = Lock() def _wrap_core_schema(schema: CoreSchema) -> CoreSchema: """Wrap a CoreSchema to make it proxy-aware. Uses bounded LRU cache to avoid expensive deepcopy.""" schema_id = id(schema) - # Check cache first (LRU: move to end if found) + # Lock-free fast path: check cache without lock first + # Note: OrderedDict.get() can raise during concurrent modifications, so we catch any errors + try: + tup = _wrapped_schema_cache.get(schema_id) + if tup is not None and tup[0] is schema: + return tup[1] + except (ValueError, RuntimeError): + # Race condition detected, fall through to locked path + pass + + # Slow path: take lock and recheck with _WRAPPED_SCHEMA_CACHE_LOCK: tup = _wrapped_schema_cache.get(schema_id) if tup is not None: @@ -131,12 +141,12 @@ class SerializationProxy(Generic[T]): # Bounded cache for proxy types to prevent memory leaks _PROXY_TYPE_CACHE_SIZE = 256 _proxy_type_cache: OrderedDict[int, type["SerializationProxy"]] = OrderedDict() - _PROXY_TYPE_CACHE_LOCK = RLock() + _PROXY_TYPE_CACHE_LOCK = Lock() def __init__( self, obj: T, - serialized: MappingProxyType, + serialized: Mapping | Iterable | object, root_adapter: TypeAdapter, ): self.obj = obj @@ -151,13 +161,27 @@ def __init__( def _build( cls, obj: T, - serialized: MappingProxyType, + serialized: Mapping | Iterable | object, adapter: TypeAdapter, core_schema: CoreSchema, ): + # Normalize: wrap dicts to ensure immutability + if isinstance(serialized, dict) and not isinstance(serialized, MappingProxyType): + serialized = MappingProxyType(serialized) + schema_id = id(core_schema) - # Check if we already have a cached proxy type for this schema (LRU) + # Lock-free fast path: check cache without lock first + # Note: OrderedDict.get() can raise during concurrent modifications, so we catch any errors + try: + proxy_type = cls._proxy_type_cache.get(schema_id) + if proxy_type is not None and getattr(proxy_type, "core_schema", None) is core_schema: + return proxy_type(obj, serialized, adapter) + except (ValueError, RuntimeError): + # Race condition detected, fall through to locked path + pass + + # Slow path: take lock and recheck with cls._PROXY_TYPE_CACHE_LOCK: proxy_type = cls._proxy_type_cache.get(schema_id) # Guard against id() reuse by verifying identity @@ -219,9 +243,13 @@ def __getattr__(self, name: str): if isinstance(self.serialized, dict) and name in self.serialized: sub_schema = _extract_subschema(self.core_schema, name) + child_ser = self.serialized[name] + # Wrap child dicts to prevent mutation + if isinstance(child_ser, dict): + child_ser = MappingProxyType(child_ser) proxy = self._build( getattr(self.obj, name), - self.serialized[name], + child_ser, self.root_adapter, sub_schema, ) @@ -239,20 +267,23 @@ def __getitem__(self, key): return self._attr_cache[cache_key] sub_schema = _extract_subschema(self.core_schema, key) - if type(self.serialized) is type(self.obj): - proxy = self._build( - self.obj[key], - self.serialized[key], - self.root_adapter, - sub_schema, - ) - else: - proxy = self._build( - self.serialized[key], - self.serialized[key], - self.root_adapter, - sub_schema, - ) + child_ser = self.serialized[key] + # Wrap child dicts to prevent mutation + if isinstance(child_ser, dict): + child_ser = MappingProxyType(child_ser) + + # Try to keep the real underlying object if possible; otherwise fall back to serialized + try: + child_obj = self.obj[key] + except Exception: + child_obj = child_ser + + proxy = self._build( + child_obj, + child_ser, + self.root_adapter, + sub_schema, + ) # Cache the built proxy with self._attr_cache_lock: From da380eb09946117c74d2366c8d4205b884a953f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6ren=20Nikolaus?= Date: Sun, 26 Oct 2025 19:59:37 +0100 Subject: [PATCH 4/4] fix(proxy): Handle primitive fields and mapping proxies - Return non-dict/list/tuple children directly to preserve field serializers (e.g., PlainSerializer) in __getattr__ and __getitem__ - Accept Mapping and MappingProxyType when checking serialized mappings --- src/deigma/proxy.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/deigma/proxy.py b/src/deigma/proxy.py index ce952e4..9405971 100644 --- a/src/deigma/proxy.py +++ b/src/deigma/proxy.py @@ -241,9 +241,15 @@ def __getattr__(self, name: str): if name in self._attr_cache: return self._attr_cache[name] - if isinstance(self.serialized, dict) and name in self.serialized: + if isinstance(self.serialized, (dict, MappingProxyType, Mapping)) and name in self.serialized: sub_schema = _extract_subschema(self.core_schema, name) child_ser = self.serialized[name] + + # For primitive types (non-dict/list serialized values), return the serialized value directly + # This ensures field serializers (like PlainSerializer) are properly applied + if not isinstance(child_ser, (dict, list, tuple)): + return child_ser + # Wrap child dicts to prevent mutation if isinstance(child_ser, dict): child_ser = MappingProxyType(child_ser) @@ -268,6 +274,12 @@ def __getitem__(self, key): sub_schema = _extract_subschema(self.core_schema, key) child_ser = self.serialized[key] + + # For primitive types (non-dict/list serialized values), return the serialized value directly + # This ensures field serializers (like PlainSerializer) are properly applied + if not isinstance(child_ser, (dict, list, tuple)): + return child_ser + # Wrap child dicts to prevent mutation if isinstance(child_ser, dict): child_ser = MappingProxyType(child_ser)