diff --git a/openviking/parse/parsers/code/ast/code_tools.py b/openviking/parse/parsers/code/ast/code_tools.py index 4873b86239..97004610a2 100644 --- a/openviking/parse/parsers/code/ast/code_tools.py +++ b/openviking/parse/parsers/code/ast/code_tools.py @@ -127,9 +127,7 @@ def search_symbols(query: str, files: List[Tuple[str, str]]) -> str: return "\n".join(out) -def _resolve_symbol( - skeleton: CodeSkeleton, symbol: str -) -> Optional[Tuple[str, int, int]]: +def _resolve_symbol(skeleton: CodeSkeleton, symbol: str) -> Optional[Tuple[str, int, int]]: """Find a symbol by 'foo' (bare) or 'Foo.bar' (qualified). Case sensitive. Search priority for bare names (no dot): diff --git a/openviking/parse/parsers/code/ast/extractor.py b/openviking/parse/parsers/code/ast/extractor.py index fee68b9928..bd4b814830 100644 --- a/openviking/parse/parsers/code/ast/extractor.py +++ b/openviking/parse/parsers/code/ast/extractor.py @@ -107,9 +107,7 @@ def extract(self, file_name: str, content: str) -> Optional[CodeSkeleton]: try: return extractor.extract(file_name, content) except Exception as e: - logger.warning( - "AST extraction failed for '%s' (language: %s): %s", file_name, lang, e - ) + logger.warning("AST extraction failed for '%s' (language: %s): %s", file_name, lang, e) return None def extract_skeleton( diff --git a/openviking/parse/parsers/markdown.py b/openviking/parse/parsers/markdown.py index c53d6cfad1..36532482fb 100644 --- a/openviking/parse/parsers/markdown.py +++ b/openviking/parse/parsers/markdown.py @@ -340,9 +340,7 @@ async def _compute_layout( content, frontmatter = self._extract_frontmatter(content) if frontmatter: meta["frontmatter"] = frontmatter - logger.debug( - f"[MarkdownParser] Extracted frontmatter: {list(frontmatter.keys())}" - ) + logger.debug(f"[MarkdownParser] Extracted frontmatter: {list(frontmatter.keys())}") explicit_name = kwargs.get("resource_name") if not explicit_name and kwargs.get("source_name"): @@ -593,7 +591,6 @@ async def _ingest_local_images( seen_paths = set() for path_str in image_refs: - # Skip remote URIs if self._is_remote_uri(path_str): continue @@ -629,7 +626,9 @@ async def _ingest_local_images( image_bytes = await asyncio.to_thread(resolved_path.read_bytes) # Validate pixel size and file size; skip non-compliant images - if not await asyncio.to_thread(self._is_valid_image, image_bytes, resolved_path): + if not await asyncio.to_thread( + self._is_valid_image, image_bytes, resolved_path + ): continue # Get filename and deduplicate @@ -649,7 +648,9 @@ async def _ingest_local_images( logger.warning(f"[MarkdownParser] Failed to ingest image {resolved_path}: {e}") if file_mappings: - rel_md_path = md_uri[len(root_prefix) + 1 :] if md_uri.startswith(root_prefix) else md_uri + rel_md_path = ( + md_uri[len(root_prefix) + 1 :] if md_uri.startswith(root_prefix) else md_uri + ) mappings[rel_md_path] = file_mappings # Write a single mapping file at the root directory for rewrite_image_uris @@ -688,9 +689,7 @@ def _resolve_image_path( # Reject absolute paths: they can point anywhere on the host if path.is_absolute(): - logger.warning( - f"[MarkdownParser] Rejected absolute image path: {path_str}" - ) + logger.warning(f"[MarkdownParser] Rejected absolute image path: {path_str}") return None # Build the list of allowed roots to confine resolution to. @@ -760,9 +759,7 @@ def _is_valid_image(self, image_bytes: bytes, source_path: Path) -> bool: """ # File size check (local file path limit: 10 MB) if len(image_bytes) > self.IMAGE_MAX_FILE_BYTES: - logger.warning( - f"[MarkdownParser] Image exceeds 10MB, skipping: {source_path}" - ) + logger.warning(f"[MarkdownParser] Image exceeds 10MB, skipping: {source_path}") return False # Pixel size check @@ -932,20 +929,15 @@ def _to_rel(new_disk: str, keep_suffix: bool) -> str: rel for rel, text in layout.items() if any( - _gh_slug(h) == anchor - for h in re.findall(r"^#{1,6}\s+(.+)$", text, re.M) + _gh_slug(h) == anchor for h in re.findall(r"^#{1,6}\s+(.+)$", text, re.M) ) ] if len(hits) == 1: - return _to_rel( - os.path.join(target_parent, hits[0]), keep_suffix=True - ) + return _to_rel(os.path.join(target_parent, hits[0]), keep_suffix=True) # B) Single-file document (a bare file, or a directory holding exactly one # file) → the anchor/query lives in that one file; keep the suffix. if len(layout) == 1: - return _to_rel( - os.path.join(target_parent, next(iter(layout))), keep_suffix=True - ) + return _to_rel(os.path.join(target_parent, next(iter(layout))), keep_suffix=True) # C) Single bare file with no suffix to place (e.g. a future small .md kept as # a file) → point at the file itself (empty suffix ⇒ no trailing slash). @@ -971,9 +963,7 @@ async def _ingest_will_handle_image(self, link: str) -> bool: if resolved is not None: try: image_bytes = await asyncio.to_thread(resolved.read_bytes) - handled = await asyncio.to_thread( - self._is_valid_image, image_bytes, resolved - ) + handled = await asyncio.to_thread(self._is_valid_image, image_bytes, resolved) except Exception: handled = False cache[link] = handled diff --git a/openviking/server/api_keys/legacy.py b/openviking/server/api_keys/legacy.py index 9b4694f3da..a2e4f69253 100644 --- a/openviking/server/api_keys/legacy.py +++ b/openviking/server/api_keys/legacy.py @@ -2,11 +2,13 @@ # SPDX-License-Identifier: AGPL-3.0 """Legacy API Key management (original implementation).""" +import asyncio import fnmatch import hashlib import hmac import json import secrets +import time from datetime import datetime, timezone from typing import Dict, Optional @@ -44,6 +46,16 @@ LEGACY_ARGON2_PARALLELISM = ARGON2_PARALLELISM LEGACY_ARGON2_HASH_LENGTH = ARGON2_HASH_LENGTH +# In-memory cache TTL. Multi-instance deploys share AGFS state but each instance +# keeps its own dict; without a TTL an account created on instance A would never +# appear on instance B. Cache-miss reload (see resolve_with_refresh) handles the +# urgent case; this constant bounds staleness for already-cached entries. +ACCOUNTS_CACHE_TTL_SECONDS = 30.0 + +# Bound how often a *known-bad* key can trigger a reload. Without this a stream +# of bogus keys from a misconfigured client would reload AGFS on every request. +UNKNOWN_KEY_RELOAD_COOLDOWN_SECONDS = ACCOUNTS_CACHE_TTL_SECONDS + class LegacyAPIKeyManager: """Manages API keys for multi-tenant authentication (legacy implementation).""" @@ -69,6 +81,18 @@ def __init__( self._accounts: Dict[str, AccountInfo] = {} # Prefix index: key_prefix -> list[UserKeyEntry] self._prefix_index: Dict[str, list[UserKeyEntry]] = {} + # Monotonic clock at last successful AGFS load. ``None`` means "never + # loaded" — resolve_with_refresh treats this as expired so the first + # cache-miss for an unknown key always triggers a reload. + self._loaded_at: Optional[float] = None + # Lazily created in resolve_with_refresh (constructor may run outside + # an event loop, e.g. test fixtures). Dedupes concurrent reloads so a + # burst of misses for a freshly-created key only hits AGFS once. + self._reload_lock: Optional[asyncio.Lock] = None + # Negative cache: monotonic timestamp of the most recent forced reload + # triggered by *this* unknown key. Bounds the reload rate when a client + # repeatedly presents a key that genuinely doesn't exist. + self._unknown_key_reload_at: Dict[str, float] = {} def _discard_account_state(self, account_id: str) -> None: """Remove an account and its key index entries from in-memory state.""" @@ -105,7 +129,16 @@ async def _rollback_create_account(self, account_id: str) -> None: logger.exception("Failed to persist rollback for account %s", account_id) async def load(self) -> None: - """Load accounts and user keys from VikingFS into memory.""" + """Load accounts and user keys from VikingFS into memory. + + Safe to call repeatedly — clears existing in-memory state first so a + reload picks up writes made by peer instances against shared AGFS. + """ + load_start = time.monotonic() + # Reset in-memory state so peer-instance deletions are also reflected. + self._accounts.clear() + self._prefix_index.clear() + accounts_data = await self._read_json(ACCOUNTS_PATH) if accounts_data is None: # First run: create default account @@ -172,6 +205,16 @@ async def load(self) -> None: len(self._accounts), sum(len(info.users) for info in self._accounts.values()), ) + elapsed = time.monotonic() - load_start + if elapsed > 1.0: + logger.warning( + "LegacyAPIKeyManager.load() took %.2fs (>1s) — consider scoping" + " reloads if storage layer adds per-key reads", + elapsed, + ) + self._loaded_at = time.monotonic() + # A successful reload makes any prior negative-cache entries obsolete. + self._unknown_key_reload_at.clear() def resolve(self, api_key: str) -> ResolvedIdentity: """Resolve an API key to identity. Sequential matching: root key first, then user key index.""" @@ -205,6 +248,104 @@ def resolve(self, api_key: str) -> ResolvedIdentity: raise UnauthenticatedError("Invalid API Key") + def _cache_is_stale(self) -> bool: + """Whether the in-memory cache is older than the TTL.""" + if self._loaded_at is None: + return True + return (time.monotonic() - self._loaded_at) > ACCOUNTS_CACHE_TTL_SECONDS + + def _should_skip_unknown_key_reload(self, api_key: str) -> bool: + """Whether this unknown key has already triggered a recent reload.""" + last = self._unknown_key_reload_at.get(api_key) + if last is None: + return False + return (time.monotonic() - last) < UNKNOWN_KEY_RELOAD_COOLDOWN_SECONDS + + def invalidate_cache(self) -> None: + """Clear the unknown-key cooldown after a local write. + + Local writes mutate ``self._accounts`` synchronously so the new state + is already visible to ``resolve``. The only stale piece is the + negative cache: a key looked up *before* it was minted would have + been recorded as unknown and would otherwise sit in cooldown until + the TTL elapsed. Clearing the cooldown lets the just-minted key + resolve on the very next request. + """ + self._unknown_key_reload_at.clear() + + async def _reload_locked(self, force: bool = False) -> None: + """Re-read AGFS state under the dedup lock. + + ``force=True`` semantics: at least one reload after the call site + observed a stale cache. Concurrent forced reloads dedupe — once any + caller's reload completes inside the lock, later callers see a fresh + ``_loaded_at`` and skip. + """ + if self._reload_lock is None: + self._reload_lock = asyncio.Lock() + # Snapshot the load timestamp *before* acquiring the lock so we can + # tell whether a peer caller already refreshed by the time we get in. + observed = self._loaded_at + async with self._reload_lock: + if force: + if self._loaded_at is not None and self._loaded_at != observed: + # Another caller refreshed while we were queued. Done. + return + else: + if not self._cache_is_stale(): + return + logger.info("api-key cache reload from AGFS (force=%s)", force) + await self.load() + + async def resolve_with_refresh(self, api_key: str) -> ResolvedIdentity: + """Like ``resolve`` but reloads from AGFS on cache miss / TTL expiry. + + Two-pronged freshness for multi-instance deploys: + 1. Cache-miss reload — an unknown ``api_key`` triggers one forced + reload before declaring 401, so accounts created on a peer + instance become visible on the next request. + 2. TTL refresh — entries older than ``ACCOUNTS_CACHE_TTL_SECONDS`` + are refreshed before the lookup, bounding staleness for entries + we already think we know. + + Concurrent callers dedupe through ``_reload_lock`` so a thundering + herd of 401s after a fresh account creation only hits AGFS once. + """ + if not api_key: + raise UnauthenticatedError("Missing API Key") + + # Periodic refresh — picks up rotations / removals on peer instances. + if self._cache_is_stale(): + await self._reload_locked(force=False) + + try: + return self.resolve(api_key) + except UnauthenticatedError: + pass + + # Cache miss. Avoid hammering AGFS for keys we just confirmed missing + # *and for which a reload has already been performed*. If a peer + # caller is mid-reload, our _reload_locked call below will piggy-back + # on that reload rather than starting a new one. + if self._should_skip_unknown_key_reload(api_key): + raise UnauthenticatedError("Invalid API Key") + + observed_loaded_at = self._loaded_at + await self._reload_locked(force=True) + # Only stamp the negative cache *after* the reload resolved the + # question (key truly absent). Stamping before would make peer + # callers in the same burst short-circuit their own reload attempt + # without ever seeing the post-reload state. + try: + return self.resolve(api_key) + except UnauthenticatedError: + # Only count this as a confirmed miss if our caller (or a peer) + # actually reloaded — otherwise a concurrent caller's stamp could + # cause us to mark a key as missing without re-checking AGFS. + if self._loaded_at != observed_loaded_at: + self._unknown_key_reload_at[api_key] = time.monotonic() + raise + async def create_account( self, account_id: str, @@ -269,6 +410,7 @@ async def create_account( except Exception: await self._rollback_create_account(account_id) raise + self.invalidate_cache() return key async def delete_account(self, account_id: str) -> None: @@ -279,6 +421,7 @@ async def delete_account(self, account_id: str) -> None: self._discard_account_state(account_id) await self._save_accounts_json() + self.invalidate_cache() async def register_user(self, account_id: str, user_id: str, role: str = "user") -> str: """Register a new user in an account. Returns the user's API key (legacy format).""" @@ -328,6 +471,7 @@ async def register_user(self, account_id: str, user_id: str, role: str = "user") self._prefix_index[key_prefix].append(entry) await self._save_users_json(account_id) + self.invalidate_cache() return key async def remove_user(self, account_id: str, user_id: str) -> None: @@ -359,6 +503,7 @@ async def remove_user(self, account_id: str, user_id: str) -> None: del self._prefix_index[key_prefix] await self._save_users_json(account_id) + self.invalidate_cache() async def regenerate_key(self, account_id: str, user_id: str) -> str: """Regenerate a user's API key. Old key is immediately invalidated.""" @@ -422,6 +567,7 @@ async def regenerate_key(self, account_id: str, user_id: str) -> str: self._prefix_index[new_key_prefix].append(entry) await self._save_users_json(account_id) + self.invalidate_cache() return new_key async def set_role(self, account_id: str, user_id: str, role: str) -> None: @@ -450,6 +596,7 @@ async def set_role(self, account_id: str, user_id: str, role: str) -> None: break await self._save_users_json(account_id) + self.invalidate_cache() def get_accounts(self) -> list: """List all accounts.""" diff --git a/openviking/server/api_keys/new.py b/openviking/server/api_keys/new.py index 6f97d1dfa8..33e2592761 100644 --- a/openviking/server/api_keys/new.py +++ b/openviking/server/api_keys/new.py @@ -157,6 +157,42 @@ def resolve(self, api_key: str) -> ResolvedIdentity: # Fall back to legacy resolver for legacy keys return self._legacy.resolve(api_key) + async def resolve_with_refresh(self, api_key: str) -> ResolvedIdentity: + """Resolve an API key, reloading from AGFS on cache miss / TTL expiry. + + See ``LegacyAPIKeyManager.resolve_with_refresh`` for the policy. The + new-format fast path goes through the same ``resolve`` body, so + cache-miss reload behaves identically for new and legacy keys. + """ + import time as _time + + if not api_key: + raise UnauthenticatedError("Missing API Key") + + if self._legacy._cache_is_stale(): + await self._legacy._reload_locked(force=False) + + try: + return self.resolve(api_key) + except UnauthenticatedError: + pass + + if self._legacy._should_skip_unknown_key_reload(api_key): + raise UnauthenticatedError("Invalid API Key") + + observed_loaded_at = self._legacy._loaded_at + await self._legacy._reload_locked(force=True) + try: + return self.resolve(api_key) + except UnauthenticatedError: + if self._legacy._loaded_at != observed_loaded_at: + self._legacy._unknown_key_reload_at[api_key] = _time.monotonic() + raise + + def invalidate_cache(self) -> None: + """Proxy to ``LegacyAPIKeyManager.invalidate_cache``.""" + self._legacy.invalidate_cache() + async def create_account( self, account_id: str, @@ -233,6 +269,7 @@ async def create_account( except Exception: await self._legacy._rollback_create_account(account_id) raise + self._legacy.invalidate_cache() return key async def delete_account(self, account_id: str) -> None: @@ -292,6 +329,7 @@ async def register_user(self, account_id: str, user_id: str, role: str = "user") self._legacy._prefix_index[key_prefix].append(entry) await self._legacy._save_users_json(account_id) + self._legacy.invalidate_cache() return key async def remove_user(self, account_id: str, user_id: str) -> None: @@ -366,6 +404,7 @@ async def regenerate_key(self, account_id: str, user_id: str) -> str: self._legacy._prefix_index[new_key_prefix].append(entry) await self._legacy._save_users_json(account_id) + self._legacy.invalidate_cache() return new_key async def set_role(self, account_id: str, user_id: str, role: str) -> None: diff --git a/openviking/server/auth.py b/openviking/server/auth.py index 86316f0e83..ff21e4bf0e 100644 --- a/openviking/server/auth.py +++ b/openviking/server/auth.py @@ -332,7 +332,7 @@ async def resolve_identity( if oauth_identity is not None: return oauth_identity - identity = api_key_manager.resolve(api_key) + identity = await api_key_manager.resolve_with_refresh(api_key) identity.account_id = identity.account_id or "default" identity.user_id = identity.user_id or "default" diff --git a/tests/parse/test_code_tools.py b/tests/parse/test_code_tools.py index 55a2333b26..172f08dfde 100644 --- a/tests/parse/test_code_tools.py +++ b/tests/parse/test_code_tools.py @@ -281,14 +281,14 @@ def test_outline_unsupported_language(self): # --------------------------------------------------------------------------- -SECOND_FILE = '''def greet(): +SECOND_FILE = """def greet(): pass class Other: def helper(self): pass -''' +""" class TestSearchSymbols: diff --git a/tests/server/test_api_keys_cache_invalidation.py b/tests/server/test_api_keys_cache_invalidation.py new file mode 100644 index 0000000000..6d4b2d9bed --- /dev/null +++ b/tests/server/test_api_keys_cache_invalidation.py @@ -0,0 +1,258 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: AGPL-3.0 + +"""Tests for the api-keys cache TTL + cache-miss reload behavior (issue #2351). + +These tests bypass real AGFS by overriding ``_read_json`` / ``_write_json`` on +``LegacyAPIKeyManager`` with an in-memory dict, so they isolate the cache +invalidation logic from the storage backend (which the existing +``test_api_key_manager.py`` covers via a real OpenVikingService). +""" + +import asyncio +import json +from typing import Optional +from unittest.mock import MagicMock + +import pytest + +from openviking.server.api_keys import APIKeyManager +from openviking.server.api_keys import legacy as legacy_module +from openviking_cli.exceptions import UnauthenticatedError + +ROOT_KEY = "test-root-key-abcdef1234567890abcdef1234567890" + + +class _FakeAGFS: + """In-memory AGFS replacement; only the methods the manager uses.""" + + def __init__(self, store: Optional[dict] = None): + self.store: dict = store if store is not None else {} + self.read_count = 0 + + async def _read_json(self, path: str): + self.read_count += 1 + if path not in self.store: + return None + return json.loads(self.store[path]) + + async def _write_json(self, path: str, data: dict): + self.store[path] = json.dumps(data) + + async def _ensure_parent_dirs_async(self, path: str): + return None + + +def _make_manager(fake: _FakeAGFS) -> APIKeyManager: + """Construct an APIKeyManager whose AGFS calls are redirected to ``fake``.""" + # Build a manager with a stubbed VikingFS (we override the I/O methods so + # the real AsyncAGFSClient is never called). + viking_fs = MagicMock() + viking_fs.agfs = MagicMock() + mgr = APIKeyManager(root_key=ROOT_KEY, viking_fs=viking_fs) + # Patch the legacy I/O. The manager exposes _legacy via a property. + mgr._legacy._read_json = fake._read_json # type: ignore[assignment] + mgr._legacy._write_json = fake._write_json # type: ignore[assignment] + mgr._legacy._ensure_parent_dirs_async = fake._ensure_parent_dirs_async # type: ignore[assignment] + return mgr + + +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_unknown_key_triggers_one_reload(): + """A cache-miss on a key that exists in AGFS should trigger one reload and succeed.""" + fake = _FakeAGFS() + + # Instance A: create an account+user. This populates AGFS. + mgr_a = _make_manager(fake) + await mgr_a.load() + user_key = await mgr_a.create_account("acme", "alice") + + # Instance B: a *fresh* manager loads later (not yet aware of acme/alice). + # Simulate "loaded earlier than the create" by loading before the data was + # written — already done above for mgr_a; for mgr_b we just need to + # discard whatever AGFS state is there at load time. + mgr_b = _make_manager(fake) + # Snapshot the store, load with empty store, then restore. + saved = dict(fake.store) + fake.store.clear() + await mgr_b.load() + fake.store.update(saved) + + # Pre-condition: instance B doesn't know the key yet. + with pytest.raises(UnauthenticatedError): + mgr_b.resolve(user_key) + + reads_before = fake.read_count + # The async path with refresh should reload + succeed. + identity = await mgr_b.resolve_with_refresh(user_key) + assert identity.account_id == "acme" + assert identity.user_id == "alice" + # And the reload actually hit AGFS. + assert fake.read_count > reads_before + + +@pytest.mark.asyncio +async def test_ttl_expires_entry(monkeypatch): + """When TTL elapses, the next resolve_with_refresh reloads from AGFS.""" + fake = _FakeAGFS() + mgr = _make_manager(fake) + await mgr.load() + user_key = await mgr.create_account("acme", "alice") + + # Sanity: a known key resolves without further reload. + reads_before = fake.read_count + await mgr.resolve_with_refresh(user_key) + # No miss, no expiry → no reload. + assert fake.read_count == reads_before + + # Advance the monotonic clock past the TTL. + real_monotonic = legacy_module.time.monotonic + offset = legacy_module.ACCOUNTS_CACHE_TTL_SECONDS + 5.0 + monkeypatch.setattr(legacy_module.time, "monotonic", lambda: real_monotonic() + offset) + + reads_before = fake.read_count + await mgr.resolve_with_refresh(user_key) + # TTL elapsed → reload happened (AGFS reads bumped). + assert fake.read_count > reads_before + + +@pytest.mark.asyncio +async def test_local_write_invalidates_immediately(): + """A locally-created account is resolvable on the same instance with no TTL wait. + + This also exercises the negative-cache invalidation: if a key was looked + up *before* the account existed, the failed lookup shouldn't poison the + next lookup post-creation. + """ + fake = _FakeAGFS() + mgr = _make_manager(fake) + await mgr.load() + + # We want to construct a key that will be created next, but we don't know + # what create_account will mint. So instead: try a bogus key first, then + # create, then try the real key. The bogus key seeds the negative cache; + # invalidate_cache should still let the real key resolve immediately. + fake_attempt = "deadbeef" * 8 # 64 hex chars, won't collide with real key + with pytest.raises(UnauthenticatedError): + await mgr.resolve_with_refresh(fake_attempt) + + # Now actually create. + user_key = await mgr.create_account("acme", "alice") + + # Same instance, same process: should resolve without going through TTL. + identity = await mgr.resolve_with_refresh(user_key) + assert identity.account_id == "acme" + assert identity.user_id == "alice" + + +@pytest.mark.asyncio +async def test_concurrent_misses_dedupe_reload(): + """10 concurrent misses for the same unknown key should trigger only one reload.""" + fake = _FakeAGFS() + + # Set up AGFS to contain an account that mgr_b doesn't know about yet. + mgr_a = _make_manager(fake) + await mgr_a.load() + user_key = await mgr_a.create_account("acme", "alice") + + mgr_b = _make_manager(fake) + saved = dict(fake.store) + fake.store.clear() + await mgr_b.load() + fake.store.update(saved) + + # Spy on the storage layer. + real_load = mgr_b._legacy.load + call_count = {"n": 0} + + async def counting_load(): + call_count["n"] += 1 + # Tiny await so concurrent callers can pile up at the lock. + await asyncio.sleep(0.01) + await real_load() + + mgr_b._legacy.load = counting_load # type: ignore[assignment] + + results = await asyncio.gather( + *[mgr_b.resolve_with_refresh(user_key) for _ in range(10)], + return_exceptions=True, + ) + + # All ten succeed (they may resolve before or after the reload, but must + # not raise). + for r in results: + assert not isinstance(r, BaseException), r + assert r.account_id == "acme" + assert r.user_id == "alice" + + # Exactly one forced reload despite 10 concurrent misses. + assert call_count["n"] == 1, f"expected 1 reload, got {call_count['n']}" + + +@pytest.mark.asyncio +async def test_known_invalid_key_does_not_reload_every_time(): + """A key that *isn't* in AGFS shouldn't reload AGFS on every request.""" + fake = _FakeAGFS() + mgr = _make_manager(fake) + await mgr.load() + # Don't create the key in AGFS — it stays unknown. + bad_key = "a" * 64 + + real_load = mgr._legacy.load + reload_count = {"n": 0} + + async def counting_load(): + reload_count["n"] += 1 + await real_load() + + mgr._legacy.load = counting_load # type: ignore[assignment] + + # Hammer with the same bad key 20 times. + for _ in range(20): + with pytest.raises(UnauthenticatedError): + await mgr.resolve_with_refresh(bad_key) + + # First miss reloads; subsequent ones hit the negative cache and skip. + # Allow at most a small constant — anything not bounded is a regression. + assert reload_count["n"] <= 2, f"unbounded reloads on known-bad key: {reload_count['n']}" + + +# --------------------------------------------------------------------------- +# Sanity checks for the underlying primitives. +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_load_is_idempotent(): + """Calling load() twice must not duplicate the prefix index.""" + fake = _FakeAGFS() + mgr = _make_manager(fake) + await mgr.load() + user_key = await mgr.create_account("acme", "alice") + + prefix_size_before = sum(len(v) for v in mgr._legacy._prefix_index.values()) + await mgr._legacy.load() + prefix_size_after = sum(len(v) for v in mgr._legacy._prefix_index.values()) + + assert prefix_size_before == prefix_size_after + # And the key still resolves. + assert mgr.resolve(user_key).account_id == "acme" + + +@pytest.mark.asyncio +async def test_invalidate_cache_clears_negative_cache(): + """invalidate_cache clears _unknown_key_reload_at so a freshly-created key resolves.""" + fake = _FakeAGFS() + mgr = _make_manager(fake) + await mgr.load() + + bad_key = "z" * 64 + with pytest.raises(UnauthenticatedError): + await mgr.resolve_with_refresh(bad_key) + assert bad_key in mgr._legacy._unknown_key_reload_at + + mgr._legacy.invalidate_cache() + assert mgr._legacy._unknown_key_reload_at == {}