Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 66 additions & 20 deletions crates/khive-db/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,13 +235,15 @@ impl StorageBackend {
/// Get a VectorStore for a specific embedding model, scoped to the default namespace.
///
/// Creates the vec0 virtual table if it does not already exist. The `model_key`
/// must contain only ASCII alphanumeric/underscore characters.
/// must contain only ASCII alphanumeric/underscore characters. The `embedding_model`
/// is the canonical display name stored in each vector row.
pub fn vectors(
&self,
model_key: &str,
embedding_model: &str,
dimensions: usize,
) -> Result<Arc<dyn khive_storage::VectorStore>, SqliteError> {
self.vectors_for_namespace(model_key, dimensions, "local")
self.vectors_for_namespace(model_key, embedding_model, dimensions, "local")
}

/// Get a VectorStore for a specific embedding model with a default namespace.
Expand All @@ -251,9 +253,12 @@ impl StorageBackend {
/// (count, delete, info). Access control is enforced at the runtime layer.
///
/// The `model_key` must contain only ASCII alphanumeric/underscore characters.
/// The `embedding_model` is the canonical display name stored in the `embedding_model`
/// column of each vector row (e.g. `"all-minilm-l6-v2"`).
pub fn vectors_for_namespace(
&self,
model_key: &str,
embedding_model: &str,
dimensions: usize,
namespace: &str,
) -> Result<Arc<dyn khive_storage::VectorStore>, SqliteError> {
Expand Down Expand Up @@ -298,21 +303,24 @@ impl StorageBackend {
.is_some();

if table_exists {
let has_field: bool = {
let (has_field, has_embedding_model) = {
let pragma = format!("PRAGMA table_xinfo({})", table);
let mut stmt = writer.conn().prepare(&pragma)?;
let mut rows = stmt.query([])?;
let mut found = false;
let mut has_field = false;
let mut has_embedding_model = false;
while let Some(row) = rows.next()? {
let name: String = row.get(1)?;
if name == "field" {
found = true;
break;
has_field = true;
}
if name == "embedding_model" {
has_embedding_model = true;
}
}
found
(has_field, has_embedding_model)
};
if !has_field {
if !has_field || !has_embedding_model {
let drop_ddl = format!("DROP TABLE IF EXISTS {}", table);
writer.conn().execute_batch(&drop_ddl)?;
}
Expand All @@ -332,19 +340,13 @@ impl StorageBackend {

// Create the vec0 virtual table. Idempotent on fresh databases and after the
// old-schema rebuild above.
//
// NOTE: `embedding_model_id` is NOT included in this DDL because sqlite-vec
// enforces NOT NULL on TEXT metadata columns at insert time, so the column
// cannot be added at virtual-table creation as a nullable FK. The column will
// be present after the ADR-043 §8 startup backfill rebuild (steps 2-4), which
// is deferred to a follow-up PR — see the tracking issue filed against MAJ-2
// of codex round-1 review of PR #374.
let ddl = format!(
"CREATE VIRTUAL TABLE IF NOT EXISTS vec_{} USING vec0(\
subject_id TEXT PRIMARY KEY, \
namespace TEXT NOT NULL, \
kind TEXT NOT NULL, \
field TEXT NOT NULL, \
embedding_model TEXT NOT NULL, \
embedding float[{}] distance_metric=cosine\
)",
model_key, dimensions
Expand All @@ -355,11 +357,54 @@ impl StorageBackend {
Arc::clone(&self.pool),
self.is_file_backed,
model_key.to_string(),
embedding_model.to_string(),
dimensions,
namespace.trim().to_string(),
)?))
}

/// Register an embedding model in the `_embedding_models` registry table (ADR-043).
///
/// The row is keyed by `canonical_key`, the UTF-8 bytes of
/// `"{engine_name}:{model_id}:{key_version}:{dimensions}"`.
///
/// Idempotent: if a row with the same `canonical_key` already exists, its `status` is
/// set back to `'active'` and `activated_at` is filled in only when it was previously
/// `NULL`. Every other column of the existing row (`id`, `created_at`, `superseded_at`,
/// `superseded_by`, ...) is left untouched.
///
/// # Errors
///
/// Returns a [`SqliteError`] if a writer connection cannot be obtained from the pool,
/// or if executing the registry DDL or the upsert statement fails.
pub fn register_embedding_model(
    &self,
    engine_name: &str,
    model_id: &str,
    key_version: &str,
    dimensions: u32,
) -> Result<(), SqliteError> {
    let writer = self.pool.try_writer()?;
    let conn = writer.conn();

    // Make sure the registry table exists before upserting into it.
    // NOTE(review): assumes EMBEDDING_MODELS_DDL is safe to re-run on every call
    // (e.g. `CREATE TABLE IF NOT EXISTS`) — confirm in `migrations`.
    conn.execute_batch(crate::migrations::EMBEDDING_MODELS_DDL)?;

    // Microseconds since the Unix epoch; used for both `activated_at` and `created_at`.
    let now = chrono::Utc::now().timestamp_micros();

    // The conflict target of the upsert below, stored as raw bytes.
    let canonical_key =
        format!("{engine_name}:{model_id}:{key_version}:{dimensions}").into_bytes();

    // A fresh id is only persisted when the row is actually inserted; on conflict the
    // existing row keeps its original id.
    let id = uuid::Uuid::new_v4();

    // NOTE(review): on conflict `superseded_at` / `superseded_by` are not cleared, so a
    // re-activated model keeps its old supersession markers — confirm this is intended.
    conn.execute(
        "INSERT INTO _embedding_models \
         (id, engine_name, model_id, key_version, dim, output_dim, status, \
         activated_at, superseded_at, superseded_by, canonical_key, created_at) \
         VALUES (?1, ?2, ?3, ?4, ?5, NULL, 'active', ?6, NULL, NULL, ?7, ?8) \
         ON CONFLICT(canonical_key) DO UPDATE SET \
         status = 'active', \
         activated_at = COALESCE(_embedding_models.activated_at, excluded.activated_at)",
        rusqlite::params![
            id.as_bytes().as_slice(),
            engine_name,
            model_id,
            key_version,
            // u32 -> i64 is lossless; `From` documents that, unlike an `as` cast.
            i64::from(dimensions),
            now,
            canonical_key,
            now,
        ],
    )?;
    Ok(())
}

/// Get a SparseStore for a specific model key, scoped to the default namespace.
///
/// Creates the sparse table if it does not already exist.
Expand Down Expand Up @@ -599,7 +644,7 @@ mod tests {
#[cfg(feature = "vectors")]
async fn vectors_roundtrip_via_public_api() {
let backend = StorageBackend::memory().unwrap();
let store = backend.vectors("test_api", 3).unwrap();
let store = backend.vectors("test_api", "test_api", 3).unwrap();

let id = uuid::Uuid::new_v4();
store
Expand All @@ -619,6 +664,7 @@ mod tests {
top_k: 1,
namespace: None,
kind: None,
embedding_model: None,
filter: None,
backend_hints: None,
})
Expand All @@ -635,8 +681,8 @@ mod tests {
async fn vectors_creates_table_idempotently() {
let backend = StorageBackend::memory().unwrap();

let store1 = backend.vectors("idempotent", 3).unwrap();
let store2 = backend.vectors("idempotent", 3).unwrap();
let store1 = backend.vectors("idempotent", "idempotent", 3).unwrap();
let store2 = backend.vectors("idempotent", "idempotent", 3).unwrap();

let id = uuid::Uuid::new_v4();
store1
Expand Down Expand Up @@ -724,8 +770,8 @@ mod tests {
#[test]
fn invalid_model_key_rejected() {
let backend = StorageBackend::memory().unwrap();
assert!(backend.vectors("bad key!", 3).is_err());
assert!(backend.vectors("", 3).is_err());
assert!(backend.vectors("bad key!", "bad key!", 3).is_err());
assert!(backend.vectors("", "", 3).is_err());
}

#[test]
Expand Down
3 changes: 2 additions & 1 deletion crates/khive-db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ pub mod stores;
pub use backend::StorageBackend;
pub use error::SqliteError;
pub use migrations::{
run_migrations, Migration, ServiceSchemaPlan, VersionedMigration, MIGRATIONS,
query_embedding_models, run_migrations, EmbeddingModelRegistryRecord, Migration,
ServiceSchemaPlan, VersionedMigration, MIGRATIONS,
};
pub use pool::{ConnectionPool, PoolConfig, ReaderGuard, WriterGuard};
pub use sql_bridge::SqlBridge;
Loading
Loading