Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions openviking/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,13 @@ def __init__(

self.user = UserIdentifier.the_default_user()
self._initialized = False
self._singleton_initialized = True
# Mark initialized only after LocalClient is successfully constructed.
self._singleton_initialized = False

self._client: BaseClient = LocalClient(
path=path,
)
self._singleton_initialized = True

# ============= Lifecycle methods =============

Expand All @@ -78,7 +80,9 @@ async def _ensure_initialized(self):

async def close(self) -> None:
"""Close OpenViking and release resources."""
await self._client.close()
client = getattr(self, "_client", None)
if client is not None:
await client.close()
self._initialized = False
self._singleton_initialized = False

Expand All @@ -88,8 +92,6 @@ async def reset(cls) -> None:
with cls._lock:
if cls._instance is not None:
await cls._instance.close()
cls._instance._initialized = False
cls._instance._singleton_initialized = False
cls._instance = None

# ============= Session methods =============
Expand Down
10 changes: 3 additions & 7 deletions openviking/core/directories.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,13 +228,9 @@ async def _ensure_directory(
logger.debug(f"[VikingFS] Directory {uri} already exists")

# 2. Ensure record exists in vector storage
from openviking_cli.utils.config import get_openviking_config

config = get_openviking_config()

existing = await self.vikingdb.filter(
collection=config.storage.vectordb.name,
filter={"op": "must", "field": "uri", "conds": [uri]},
existing = await self.vikingdb.get_context_by_uri(
account_id=ctx.account_id,
uri=uri,
limit=1,
)
if not existing:
Expand Down
82 changes: 70 additions & 12 deletions openviking/eval/ragas/playback.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,29 +415,87 @@ async def _play_vikingdb_operation(self, record: IORecord) -> PlaybackResult:
kwargs = request.get("kwargs", {})

if operation == "insert":
await self._vector_store.insert(*args, **kwargs)
if args:
payload = args[-1]
else:
payload = kwargs.get("data", request.get("data", {}))
await self._vector_store.upsert(payload)
elif operation == "update":
await self._vector_store.update(*args, **kwargs)
if len(args) >= 3:
record_id = args[-2]
payload = args[-1]
elif len(args) == 2:
record_id = args[0]
payload = args[1]
else:
record_id = kwargs.get("id", request.get("id"))
payload = kwargs.get("data", request.get("data", {}))
existing = await self._vector_store.get([record_id])
if existing:
merged = {**existing[0], **payload, "id": record_id}
await self._vector_store.upsert(merged)
elif operation == "upsert":
await self._vector_store.upsert(*args, **kwargs)
if args:
payload = args[-1]
else:
payload = kwargs.get("data", request.get("data", {}))
await self._vector_store.upsert(payload)
elif operation == "delete":
await self._vector_store.delete(*args, **kwargs)
if args:
ids = args[-1]
else:
ids = kwargs.get("ids", request.get("ids", []))
await self._vector_store.delete(ids)
elif operation == "get":
await self._vector_store.get(*args, **kwargs)
if args:
ids = args[-1]
else:
ids = kwargs.get("ids", request.get("ids", []))
await self._vector_store.get(ids)
elif operation == "exists":
await self._vector_store.exists(*args, **kwargs)
if len(args) >= 2:
record_id = args[-1]
elif len(args) == 1:
record_id = args[0]
else:
record_id = kwargs.get("id", request.get("id"))
await self._vector_store.exists(record_id)
elif operation == "search":
await self._vector_store.search(*args, **kwargs)
if len(args) >= 4:
query_vector = args[1]
limit = args[2]
where = args[3]
elif args:
query_vector = args[0]
limit = kwargs.get("top_k", kwargs.get("limit", 10))
where = kwargs.get("filter")
else:
query_vector = kwargs.get("vector", kwargs.get("query_vector"))
limit = kwargs.get("top_k", kwargs.get("limit", request.get("top_k", 10)))
where = kwargs.get("filter", request.get("filter"))
await self._vector_store.search(
query_vector=query_vector, filter=where, limit=limit
)
elif operation == "filter":
await self._vector_store.filter(*args, **kwargs)
if len(args) >= 4:
where = args[1]
limit = args[2]
offset = args[3]
elif args:
where = args[0]
limit = kwargs.get("limit", 100)
offset = kwargs.get("offset", 0)
else:
where = kwargs.get("filter", request.get("filter", {}))
limit = kwargs.get("limit", request.get("limit", 100))
offset = kwargs.get("offset", request.get("offset", 0))
await self._vector_store.filter(filter=where, limit=limit, offset=offset)
elif operation == "create_collection":
await self._vector_store.create_collection(*args, **kwargs)
elif operation == "drop_collection":
await self._vector_store.drop_collection(*args, **kwargs)
await self._vector_store.drop_collection()
elif operation == "collection_exists":
await self._vector_store.collection_exists(*args, **kwargs)
elif operation == "list_collections":
await self._vector_store.list_collections(*args, **kwargs)
await self._vector_store.collection_exists()
else:
raise ValueError(f"Unknown VikingDB operation: {operation}")

Expand Down
97 changes: 60 additions & 37 deletions openviking/eval/recorder/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,35 @@ def __getattr__(self, name: str) -> Any:
if not callable(original_attr) or name.startswith("_"):
return original_attr
# viking_fs文件操作
if name not in ("ls", "mkdir", "stat", "rm", "mv", "read", "write", "grep", "glob", "tree",
"abstract", "overview", "relations", "link", "unlink",
"write_file", "read_file", "read_file_bytes", "write_file_bytes", "append_file", "move_file",
"delete_temp", "write_context", "get_relations", "get_relations_with_content",
"find", "search",
):
if name not in (
"ls",
"mkdir",
"stat",
"rm",
"mv",
"read",
"write",
"grep",
"glob",
"tree",
"abstract",
"overview",
"relations",
"link",
"unlink",
"write_file",
"read_file",
"read_file_bytes",
"write_file_bytes",
"append_file",
"move_file",
"delete_temp",
"write_context",
"get_relations",
"get_relations_with_content",
"find",
"search",
):
return original_attr

async def wrapped_async(*args, **kwargs):
Expand Down Expand Up @@ -179,6 +202,7 @@ def wrapped_sync(*args, **kwargs):
raise

import inspect

if inspect.iscoroutinefunction(original_attr) or name.startswith("_"):
return wrapped_async

Expand All @@ -201,6 +225,7 @@ def _build_request(self, name: str, args: tuple, kwargs: dict) -> Dict[str, Any]
param_names = []
try:
import inspect

original_attr = getattr(self._fs, name, None)
if original_attr and callable(original_attr):
sig = inspect.signature(original_attr)
Expand All @@ -223,7 +248,7 @@ def _build_request(self, name: str, args: tuple, kwargs: dict) -> Dict[str, Any]

class RecordingVikingDB:
"""
Wrapper for VikingDBInterface that records all operations.
Wrapper for vector store instances that records all operations.

Usage:
from openviking.eval.recorder import init_recorder
Expand All @@ -239,7 +264,7 @@ def __init__(self, viking_db: Any, recorder: Optional[IORecorder] = None):
Initialize wrapper.

Args:
viking_db: VikingDBInterface instance to wrap
viking_db: Vector store instance to wrap
recorder: IORecorder instance (uses global if None)
"""
self._db = viking_db
Expand Down Expand Up @@ -269,7 +294,7 @@ async def insert(self, collection: str, data: Dict[str, Any]) -> str:
request = {"collection": collection, "data": data}
start_time = time.time()
try:
result = await self._db.insert(collection, data)
result = await self._db.upsert(data)
latency_ms = (time.time() - start_time) * 1000
self._record("insert", request, result, latency_ms)
return result
Expand All @@ -283,7 +308,12 @@ async def update(self, collection: str, id: str, data: Dict[str, Any]) -> bool:
request = {"collection": collection, "id": id, "data": data}
start_time = time.time()
try:
result = await self._db.update(collection, id, data)
existing = await self._db.get([id])
if not existing:
result = False
else:
payload = {**existing[0], **data, "id": id}
result = bool(await self._db.upsert(payload))
latency_ms = (time.time() - start_time) * 1000
self._record("update", request, result, latency_ms)
return result
Expand All @@ -297,7 +327,7 @@ async def upsert(self, collection: str, data: Dict[str, Any]) -> str:
request = {"collection": collection, "data": data}
start_time = time.time()
try:
result = await self._db.upsert(collection, data)
result = await self._db.upsert(data)
latency_ms = (time.time() - start_time) * 1000
self._record("upsert", request, result, latency_ms)
return result
Expand All @@ -311,7 +341,7 @@ async def delete(self, collection: str, ids: List[str]) -> int:
request = {"collection": collection, "ids": ids}
start_time = time.time()
try:
result = await self._db.delete(collection, ids)
result = await self._db.delete(ids)
latency_ms = (time.time() - start_time) * 1000
self._record("delete", request, result, latency_ms)
return result
Expand All @@ -325,7 +355,7 @@ async def get(self, collection: str, ids: List[str]) -> List[Dict[str, Any]]:
request = {"collection": collection, "ids": ids}
start_time = time.time()
try:
result = await self._db.get(collection, ids)
result = await self._db.get(ids)
latency_ms = (time.time() - start_time) * 1000
self._record("get", request, result, latency_ms)
return result
Expand All @@ -339,7 +369,7 @@ async def exists(self, collection: str, id: str) -> bool:
request = {"collection": collection, "id": id}
start_time = time.time()
try:
result = await self._db.exists(collection, id)
result = await self._db.exists(id)
latency_ms = (time.time() - start_time) * 1000
self._record("exists", request, result, latency_ms)
return result
Expand All @@ -359,7 +389,11 @@ async def search(
request = {"collection": collection, "vector": vector, "top_k": top_k, "filter": filter}
start_time = time.time()
try:
result = await self._db.search(collection, vector, top_k, filter)
result = await self._db.search(
query_vector=vector,
filter=filter,
limit=top_k,
)
latency_ms = (time.time() - start_time) * 1000
self._record("search", request, result, latency_ms)
return result
Expand All @@ -379,7 +413,11 @@ async def filter(
request = {"collection": collection, "filter": filter, "limit": limit, "offset": offset}
start_time = time.time()
try:
result = await self._db.filter(collection, filter, limit, offset)
result = await self._db.filter(
filter=filter,
limit=limit,
offset=offset,
)
latency_ms = (time.time() - start_time) * 1000
self._record("filter", request, result, latency_ms)
return result
Expand All @@ -402,12 +440,12 @@ async def create_collection(self, name: str, schema: Dict[str, Any]) -> bool:
self._record("create_collection", request, None, latency_ms, False, str(e))
raise

async def drop_collection(self, name: str) -> bool:
async def drop_collection(self) -> bool:
"""Drop collection with recording."""
request = {"name": name}
request = {}
start_time = time.time()
try:
result = await self._db.drop_collection(name)
result = await self._db.drop_collection()
latency_ms = (time.time() - start_time) * 1000
self._record("drop_collection", request, result, latency_ms)
return result
Expand All @@ -416,12 +454,12 @@ async def drop_collection(self, name: str) -> bool:
self._record("drop_collection", request, None, latency_ms, False, str(e))
raise

async def collection_exists(self, name: str) -> bool:
async def collection_exists(self) -> bool:
"""Check collection exists with recording."""
request = {"name": name}
request = {}
start_time = time.time()
try:
result = await self._db.collection_exists(name)
result = await self._db.collection_exists()
latency_ms = (time.time() - start_time) * 1000
self._record("collection_exists", request, result, latency_ms)
return result
Expand All @@ -430,21 +468,6 @@ async def collection_exists(self, name: str) -> bool:
self._record("collection_exists", request, None, latency_ms, False, str(e))
raise

async def list_collections(self) -> List[str]:
"""List collections with recording."""
request = {}
start_time = time.time()
try:
result = await self._db.list_collections()
latency_ms = (time.time() - start_time) * 1000
self._record("list_collections", request, result, latency_ms)
return result
except Exception as e:
latency_ms = (time.time() - start_time) * 1000
self._record("list_collections", request, None, latency_ms, False, str(e))
raise

def __getattr__(self, name: str) -> Any:
"""Pass through any other attributes to the wrapped db."""
return getattr(self._db, name)

Loading
Loading