Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions openviking/parse/parsers/code/ast/code_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,7 @@ def search_symbols(query: str, files: List[Tuple[str, str]]) -> str:
return "\n".join(out)


def _resolve_symbol(
skeleton: CodeSkeleton, symbol: str
) -> Optional[Tuple[str, int, int]]:
def _resolve_symbol(skeleton: CodeSkeleton, symbol: str) -> Optional[Tuple[str, int, int]]:
"""Find a symbol by 'foo' (bare) or 'Foo.bar' (qualified). Case sensitive.

Search priority for bare names (no dot):
Expand Down
4 changes: 1 addition & 3 deletions openviking/parse/parsers/code/ast/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,7 @@ def extract(self, file_name: str, content: str) -> Optional[CodeSkeleton]:
try:
return extractor.extract(file_name, content)
except Exception as e:
logger.warning(
"AST extraction failed for '%s' (language: %s): %s", file_name, lang, e
)
logger.warning("AST extraction failed for '%s' (language: %s): %s", file_name, lang, e)
return None

def extract_skeleton(
Expand Down
36 changes: 13 additions & 23 deletions openviking/parse/parsers/markdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,9 +340,7 @@ async def _compute_layout(
content, frontmatter = self._extract_frontmatter(content)
if frontmatter:
meta["frontmatter"] = frontmatter
logger.debug(
f"[MarkdownParser] Extracted frontmatter: {list(frontmatter.keys())}"
)
logger.debug(f"[MarkdownParser] Extracted frontmatter: {list(frontmatter.keys())}")

explicit_name = kwargs.get("resource_name")
if not explicit_name and kwargs.get("source_name"):
Expand Down Expand Up @@ -593,7 +591,6 @@ async def _ingest_local_images(
seen_paths = set()

for path_str in image_refs:

# Skip remote URIs
if self._is_remote_uri(path_str):
continue
Expand Down Expand Up @@ -629,7 +626,9 @@ async def _ingest_local_images(
image_bytes = await asyncio.to_thread(resolved_path.read_bytes)

# Validate pixel size and file size; skip non-compliant images
if not await asyncio.to_thread(self._is_valid_image, image_bytes, resolved_path):
if not await asyncio.to_thread(
self._is_valid_image, image_bytes, resolved_path
):
continue

# Get filename and deduplicate
Expand All @@ -649,7 +648,9 @@ async def _ingest_local_images(
logger.warning(f"[MarkdownParser] Failed to ingest image {resolved_path}: {e}")

if file_mappings:
rel_md_path = md_uri[len(root_prefix) + 1 :] if md_uri.startswith(root_prefix) else md_uri
rel_md_path = (
md_uri[len(root_prefix) + 1 :] if md_uri.startswith(root_prefix) else md_uri
)
mappings[rel_md_path] = file_mappings

# Write a single mapping file at the root directory for rewrite_image_uris
Expand Down Expand Up @@ -688,9 +689,7 @@ def _resolve_image_path(

# Reject absolute paths: they can point anywhere on the host
if path.is_absolute():
logger.warning(
f"[MarkdownParser] Rejected absolute image path: {path_str}"
)
logger.warning(f"[MarkdownParser] Rejected absolute image path: {path_str}")
return None

# Build the list of allowed roots to confine resolution to.
Expand Down Expand Up @@ -760,9 +759,7 @@ def _is_valid_image(self, image_bytes: bytes, source_path: Path) -> bool:
"""
# File size check (local file path limit: 10 MB)
if len(image_bytes) > self.IMAGE_MAX_FILE_BYTES:
logger.warning(
f"[MarkdownParser] Image exceeds 10MB, skipping: {source_path}"
)
logger.warning(f"[MarkdownParser] Image exceeds 10MB, skipping: {source_path}")
return False

# Pixel size check
Expand Down Expand Up @@ -932,20 +929,15 @@ def _to_rel(new_disk: str, keep_suffix: bool) -> str:
rel
for rel, text in layout.items()
if any(
_gh_slug(h) == anchor
for h in re.findall(r"^#{1,6}\s+(.+)$", text, re.M)
_gh_slug(h) == anchor for h in re.findall(r"^#{1,6}\s+(.+)$", text, re.M)
)
]
if len(hits) == 1:
return _to_rel(
os.path.join(target_parent, hits[0]), keep_suffix=True
)
return _to_rel(os.path.join(target_parent, hits[0]), keep_suffix=True)
# B) Single-file document (a bare file, or a directory holding exactly one
# file) → the anchor/query lives in that one file; keep the suffix.
if len(layout) == 1:
return _to_rel(
os.path.join(target_parent, next(iter(layout))), keep_suffix=True
)
return _to_rel(os.path.join(target_parent, next(iter(layout))), keep_suffix=True)

# C) Single bare file with no suffix to place (e.g. a future small .md kept as
# a file) → point at the file itself (empty suffix ⇒ no trailing slash).
Expand All @@ -971,9 +963,7 @@ async def _ingest_will_handle_image(self, link: str) -> bool:
if resolved is not None:
try:
image_bytes = await asyncio.to_thread(resolved.read_bytes)
handled = await asyncio.to_thread(
self._is_valid_image, image_bytes, resolved
)
handled = await asyncio.to_thread(self._is_valid_image, image_bytes, resolved)
except Exception:
handled = False
cache[link] = handled
Expand Down
149 changes: 148 additions & 1 deletion openviking/server/api_keys/legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
# SPDX-License-Identifier: AGPL-3.0
"""Legacy API Key management (original implementation)."""

import asyncio
import fnmatch
import hashlib
import hmac
import json
import secrets
import time
from datetime import datetime, timezone
from typing import Dict, Optional

Expand Down Expand Up @@ -44,6 +46,16 @@
LEGACY_ARGON2_PARALLELISM = ARGON2_PARALLELISM
LEGACY_ARGON2_HASH_LENGTH = ARGON2_HASH_LENGTH

# In-memory cache TTL. Multi-instance deploys share AGFS state but each instance
# keeps its own dict; without a TTL an account created on instance A would never
# appear on instance B. Cache-miss reload (see resolve_with_refresh) handles the
# urgent case; this constant bounds staleness for already-cached entries.
ACCOUNTS_CACHE_TTL_SECONDS = 30.0

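# Bound how often the *same* known-bad key can trigger a reload. Without this a
# misconfigured client repeatedly presenting one bogus key would reload AGFS on
# every request. The cooldown is tracked per key, so distinct bogus keys each
# still trigger their own reload.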
UNKNOWN_KEY_RELOAD_COOLDOWN_SECONDS = ACCOUNTS_CACHE_TTL_SECONDS


class LegacyAPIKeyManager:
"""Manages API keys for multi-tenant authentication (legacy implementation)."""
Expand All @@ -69,6 +81,18 @@ def __init__(
self._accounts: Dict[str, AccountInfo] = {}
# Prefix index: key_prefix -> list[UserKeyEntry]
self._prefix_index: Dict[str, list[UserKeyEntry]] = {}
# Monotonic clock at last successful AGFS load. ``None`` means "never
# loaded" — resolve_with_refresh treats this as expired so the first
# cache-miss for an unknown key always triggers a reload.
self._loaded_at: Optional[float] = None
# Lazily created in resolve_with_refresh (constructor may run outside
# an event loop, e.g. test fixtures). Dedupes concurrent reloads so a
# burst of misses for a freshly-created key only hits AGFS once.
self._reload_lock: Optional[asyncio.Lock] = None
# Negative cache: monotonic timestamp of the most recent forced reload
# triggered by *this* unknown key. Bounds the reload rate when a client
# repeatedly presents a key that genuinely doesn't exist.
self._unknown_key_reload_at: Dict[str, float] = {}

def _discard_account_state(self, account_id: str) -> None:
"""Remove an account and its key index entries from in-memory state."""
Expand Down Expand Up @@ -105,7 +129,16 @@ async def _rollback_create_account(self, account_id: str) -> None:
logger.exception("Failed to persist rollback for account %s", account_id)

async def load(self) -> None:
"""Load accounts and user keys from VikingFS into memory."""
"""Load accounts and user keys from VikingFS into memory.

Safe to call repeatedly — clears existing in-memory state first so a
reload picks up writes made by peer instances against shared AGFS.
"""
load_start = time.monotonic()
# Reset in-memory state so peer-instance deletions are also reflected.
self._accounts.clear()
self._prefix_index.clear()

accounts_data = await self._read_json(ACCOUNTS_PATH)
if accounts_data is None:
# First run: create default account
Expand Down Expand Up @@ -172,6 +205,16 @@ async def load(self) -> None:
len(self._accounts),
sum(len(info.users) for info in self._accounts.values()),
)
elapsed = time.monotonic() - load_start
if elapsed > 1.0:
logger.warning(
"LegacyAPIKeyManager.load() took %.2fs (>1s) — consider scoping"
" reloads if storage layer adds per-key reads",
elapsed,
)
self._loaded_at = time.monotonic()
# A successful reload makes any prior negative-cache entries obsolete.
self._unknown_key_reload_at.clear()

def resolve(self, api_key: str) -> ResolvedIdentity:
"""Resolve an API key to identity. Sequential matching: root key first, then user key index."""
Expand Down Expand Up @@ -205,6 +248,104 @@ def resolve(self, api_key: str) -> ResolvedIdentity:

raise UnauthenticatedError("Invalid API Key")

def _cache_is_stale(self) -> bool:
    """Report whether the in-memory account cache is due for a refresh.

    Returns:
        ``True`` when no load timestamp has been recorded yet
        (``_loaded_at`` is ``None``), or when more than
        ``ACCOUNTS_CACHE_TTL_SECONDS`` have elapsed on the monotonic clock
        since the recorded load; ``False`` otherwise.
    """
    loaded_at = self._loaded_at
    if loaded_at is not None:
        age = time.monotonic() - loaded_at
        return age > ACCOUNTS_CACHE_TTL_SECONDS
    # Never loaded: treat as expired.
    return True

def _should_skip_unknown_key_reload(self, api_key: str) -> bool:
    """Tell whether ``api_key`` is still inside its unknown-key cooldown.

    Args:
        api_key: The key string that failed to resolve.

    Returns:
        ``True`` when ``_unknown_key_reload_at`` holds a timestamp for this
        exact key that is less than ``UNKNOWN_KEY_RELOAD_COOLDOWN_SECONDS``
        old on the monotonic clock; ``False`` when there is no entry or the
        entry has aged out.
    """
    stamped_at = self._unknown_key_reload_at.get(api_key)
    if stamped_at is not None:
        elapsed = time.monotonic() - stamped_at
        return elapsed < UNKNOWN_KEY_RELOAD_COOLDOWN_SECONDS
    return False

def invalidate_cache(self) -> None:
    """Forget every unknown-key cooldown entry.

    Called by the write paths in this class (account creation/deletion, user
    registration/removal, key regeneration, role changes) once their change
    has been persisted. Those paths update the in-memory account state
    themselves, so the only thing that can still be out of date is the
    negative cache: a key that was probed *before* it was minted would
    otherwise keep being rejected until its cooldown ran out. Emptying the
    negative cache lets such a key be looked up again on the next request.

    The dictionary is emptied in place; the account data itself is untouched.
    """
    cooldowns = self._unknown_key_reload_at
    cooldowns.clear()

async def _reload_locked(self, force: bool = False) -> None:
    """Reload state via ``load()`` while holding the reload lock.

    Args:
        force: When ``False``, the reload happens only if the cache is still
            stale once the lock is held. When ``True``, the reload happens
            unless ``_loaded_at`` changed (to a non-``None`` value) between
            entering this method and acquiring the lock — i.e. some other
            caller already finished a reload while this one was waiting.

    The lock is created on first use rather than in the constructor.
    """
    lock = self._reload_lock
    if lock is None:
        lock = self._reload_lock = asyncio.Lock()

    # Taken before waiting on the lock so a reload completed by another
    # caller in the meantime can be detected afterwards.
    seen_before_wait = self._loaded_at

    async with lock:
        if force:
            refreshed_meanwhile = (
                self._loaded_at is not None and self._loaded_at != seen_before_wait
            )
            if refreshed_meanwhile:
                # Somebody else already did the work while we were queued.
                return
        elif not self._cache_is_stale():
            return
        logger.info("api-key cache reload from AGFS (force=%s)", force)
        await self.load()

async def resolve_with_refresh(self, api_key: str) -> ResolvedIdentity:
    """Like ``resolve`` but reloads from AGFS on cache miss / TTL expiry.

    Two-pronged freshness for multi-instance deploys:
      1. TTL refresh — when the cache is older than
         ``ACCOUNTS_CACHE_TTL_SECONDS`` it is reloaded before the lookup,
         bounding staleness for entries we already think we know.
      2. Cache-miss reload — an unknown ``api_key`` triggers one forced
         reload before declaring 401, so accounts created on a peer
         instance become visible on the next request.

    If the TTL refresh in step 1 already reloaded during this call, that
    reload doubles as the confirming reload for step 2; a second,
    back-to-back load is not issued.

    Concurrent callers dedupe through ``_reload_locked``.

    Args:
        api_key: The key presented by the caller.

    Returns:
        The identity ``resolve`` returns for ``api_key``.

    Raises:
        UnauthenticatedError: If ``api_key`` is empty, is in its unknown-key
            cooldown, or still fails to resolve after a reload.

    Note:
        The cooldown is keyed by the exact key string, so each *distinct*
        unknown key still causes its own forced reload.
    """
    if not api_key:
        raise UnauthenticatedError("Missing API Key")

    # Periodic refresh — picks up rotations / removals on peer instances.
    refreshed_this_call = False
    if self._cache_is_stale():
        before_refresh = self._loaded_at
        await self._reload_locked(force=False)
        # True whether we performed the load ourselves or a peer finished
        # one while we were waiting on the lock.
        refreshed_this_call = self._loaded_at != before_refresh

    try:
        return self.resolve(api_key)
    except UnauthenticatedError:
        if refreshed_this_call:
            # State was re-read moments ago within this very call and the key
            # is still unknown. Forcing another load right away would only
            # repeat the same read, so record the confirmed miss and
            # propagate the original error.
            self._unknown_key_reload_at[api_key] = time.monotonic()
            raise

    # Cache miss on a cache we did not just refresh. Avoid hammering AGFS for
    # a key that a recent reload already confirmed missing.
    if self._should_skip_unknown_key_reload(api_key):
        raise UnauthenticatedError("Invalid API Key")

    observed_loaded_at = self._loaded_at
    await self._reload_locked(force=True)
    # Only stamp the negative cache *after* the reload resolved the question
    # (key truly absent). Stamping before would make peer callers in the same
    # burst short-circuit their own reload attempt without ever seeing the
    # post-reload state.
    try:
        return self.resolve(api_key)
    except UnauthenticatedError:
        # Count this as a confirmed miss only if a reload (ours or a peer's)
        # actually completed since the snapshot above.
        if self._loaded_at != observed_loaded_at:
            self._unknown_key_reload_at[api_key] = time.monotonic()
        raise

async def create_account(
self,
account_id: str,
Expand Down Expand Up @@ -269,6 +410,7 @@ async def create_account(
except Exception:
await self._rollback_create_account(account_id)
raise
self.invalidate_cache()
return key

async def delete_account(self, account_id: str) -> None:
Expand All @@ -279,6 +421,7 @@ async def delete_account(self, account_id: str) -> None:
self._discard_account_state(account_id)

await self._save_accounts_json()
self.invalidate_cache()

async def register_user(self, account_id: str, user_id: str, role: str = "user") -> str:
"""Register a new user in an account. Returns the user's API key (legacy format)."""
Expand Down Expand Up @@ -328,6 +471,7 @@ async def register_user(self, account_id: str, user_id: str, role: str = "user")
self._prefix_index[key_prefix].append(entry)

await self._save_users_json(account_id)
self.invalidate_cache()
return key

async def remove_user(self, account_id: str, user_id: str) -> None:
Expand Down Expand Up @@ -359,6 +503,7 @@ async def remove_user(self, account_id: str, user_id: str) -> None:
del self._prefix_index[key_prefix]

await self._save_users_json(account_id)
self.invalidate_cache()

async def regenerate_key(self, account_id: str, user_id: str) -> str:
"""Regenerate a user's API key. Old key is immediately invalidated."""
Expand Down Expand Up @@ -422,6 +567,7 @@ async def regenerate_key(self, account_id: str, user_id: str) -> str:
self._prefix_index[new_key_prefix].append(entry)

await self._save_users_json(account_id)
self.invalidate_cache()
return new_key

async def set_role(self, account_id: str, user_id: str, role: str) -> None:
Expand Down Expand Up @@ -450,6 +596,7 @@ async def set_role(self, account_id: str, user_id: str, role: str) -> None:
break

await self._save_users_json(account_id)
self.invalidate_cache()

def get_accounts(self) -> list:
"""List all accounts."""
Expand Down
Loading
Loading