From e5dbd458340756361b44a8dd8977a114c02b7a3b Mon Sep 17 00:00:00 2001 From: vizsatiz Date: Fri, 10 Apr 2026 17:32:44 +0530 Subject: [PATCH 1/7] fix for migration --- ...f2a1c3b5d9_add_hnsw_index_on_embeddings.py | 69 +++++++++++++++++++ .../queries/generate_query.py | 35 +++++++--- .../services/image_rag_retrieve.py | 19 +++-- 3 files changed, 102 insertions(+), 21 deletions(-) create mode 100644 wavefront/server/modules/db_repo_module/db_repo_module/alembic/versions/2026_04_10_1000-e8f2a1c3b5d9_add_hnsw_index_on_embeddings.py diff --git a/wavefront/server/modules/db_repo_module/db_repo_module/alembic/versions/2026_04_10_1000-e8f2a1c3b5d9_add_hnsw_index_on_embeddings.py b/wavefront/server/modules/db_repo_module/db_repo_module/alembic/versions/2026_04_10_1000-e8f2a1c3b5d9_add_hnsw_index_on_embeddings.py new file mode 100644 index 00000000..1b69cbe8 --- /dev/null +++ b/wavefront/server/modules/db_repo_module/db_repo_module/alembic/versions/2026_04_10_1000-e8f2a1c3b5d9_add_hnsw_index_on_embeddings.py @@ -0,0 +1,69 @@ +"""add hnsw index on knowledge_base_embeddings + +Revision ID: e8f2a1c3b5d9 +Revises: c7a9e2f4b1d0 +Create Date: 2026-04-10 10:00:00.000000 + +""" + +from typing import Sequence, Union + +from alembic import op + + +# revision identifiers, used by Alembic. +revision: str = 'e8f2a1c3b5d9' +down_revision: Union[str, None] = 'c7a9e2f4b1d0' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # HNSW requires a dimensioned vector expression — the column is stored without + # dimensions so we cast inline. + # embedding_vector → 512 dims (CLIP image / text embeddings) + # embedding_vector_1 → 1024 dims (DINO image embeddings) + + # HNSW index on embedding_vector for cosine distance (used in text RAG retrieval) + op.execute(""" + CREATE INDEX CONCURRENTLY IF NOT EXISTS + ix_kbe_embedding_vector_hnsw_cosine + ON knowledge_base_embeddings + USING hnsw ((embedding_vector::vector(512)) vector_cosine_ops) + WITH (m = 16, ef_construction = 64) + """) + + # HNSW index on embedding_vector for L2 distance (used in CLIP image search) + op.execute(""" + CREATE INDEX CONCURRENTLY IF NOT EXISTS + ix_kbe_embedding_vector_hnsw_l2 + ON knowledge_base_embeddings + USING hnsw ((embedding_vector::vector(512)) vector_l2_ops) + WITH (m = 16, ef_construction = 64) + """) + + # HNSW index on embedding_vector_1 for cosine distance (used in DINO image search) + op.execute(""" + CREATE INDEX CONCURRENTLY IF NOT EXISTS + ix_kbe_embedding_vector_1_hnsw_cosine + ON knowledge_base_embeddings + USING hnsw ((embedding_vector_1::vector(1024)) vector_cosine_ops) + WITH (m = 16, ef_construction = 64) + """) + + # GIN index on token column for fast full-text keyword search + op.execute(""" + CREATE INDEX CONCURRENTLY IF NOT EXISTS + ix_kbe_token_gin + ON knowledge_base_embeddings + USING gin (token) + """) + + +def downgrade() -> None: + op.execute('DROP INDEX CONCURRENTLY IF EXISTS ix_kbe_embedding_vector_hnsw_cosine') + op.execute('DROP INDEX CONCURRENTLY IF EXISTS ix_kbe_embedding_vector_hnsw_l2') + op.execute( + 'DROP INDEX CONCURRENTLY IF EXISTS ix_kbe_embedding_vector_1_hnsw_cosine' + ) + op.execute('DROP INDEX CONCURRENTLY IF EXISTS ix_kbe_token_gin') diff --git a/wavefront/server/modules/knowledge_base_module/knowledge_base_module/queries/generate_query.py b/wavefront/server/modules/knowledge_base_module/knowledge_base_module/queries/generate_query.py index 7b4ced74..9e034dea 100644 --- a/wavefront/server/modules/knowledge_base_module/knowledge_base_module/queries/generate_query.py +++ b/wavefront/server/modules/knowledge_base_module/knowledge_base_module/queries/generate_query.py @@ -89,24 +89,37 @@ def get_combined_search_query( ) query_params.update(filter_params) sql_query = f""" - WITH vector_results AS ( + WITH hnsw_candidates AS ( SELECT - e.id as embedding_id, - e.chunk_text, - e.chunk_index, + id, + document_id, + chunk_text, + chunk_index, + embedding_vector <=> :query_embed ::vector(512) AS distance + FROM + {KnowledgeBaseEmbeddings.__tablename__} + ORDER BY + embedding_vector <=> :query_embed ::vector(512) + LIMIT :limit * 10 + ), + vector_results AS ( + SELECT + hc.id as embedding_id, + hc.chunk_text, + hc.chunk_index, d.id as document_id, d.file_path, d.knowledge_base_id, d.metadata_value, - 1 - (e.embedding_vector <=> :query_embed ::vector) as vector_score + 1 - hc.distance as vector_score FROM - {KnowledgeBaseEmbeddings.__tablename__} e + hnsw_candidates hc JOIN - {KnowledgeBaseDocuments.__tablename__} d ON e.document_id = d.id + {KnowledgeBaseDocuments.__tablename__} d ON hc.document_id = d.id WHERE d.knowledge_base_id = :kb_id {'AND (' + metadata_filter_clause_inner + ')' if metadata_filter_clause_inner else ''} ORDER BY - vector_score DESC + hc.distance ASC LIMIT :limit ), keyword_results AS ( @@ -190,7 +203,7 @@ def get_image_embedding( d.file_name, d.knowledge_base_id, d.metadata_value, - e.embedding_vector <-> :query_embedding ::vector AS distance + e.embedding_vector <-> :query_embedding ::vector(512) AS distance FROM {KnowledgeBaseEmbeddings.__tablename__} e JOIN @@ -264,7 +277,7 @@ def get_image_embedding_dino( d.file_name, d.knowledge_base_id, d.metadata_value, - (1 - (e.embedding_vector_1 <=> :query_embedding ::vector)) AS similarity + (1 - (e.embedding_vector_1 <=> :query_embedding ::vector(1024))) AS similarity FROM {KnowledgeBaseEmbeddings.__tablename__} e JOIN {KnowledgeBaseDocuments.__tablename__} d ON e.document_id = d.id WHERE @@ -346,4 +359,4 @@ def get_update_tokens_query() -> str: Returns: SQL query string """ - return "UPDATE knowledge_base_embeddings SET token = to_tsvector('english', chunk_text)" + return "UPDATE knowledge_base_embeddings SET token = to_tsvector('english', chunk_text) WHERE token IS NULL" diff --git a/wavefront/server/modules/knowledge_base_module/knowledge_base_module/services/image_rag_retrieve.py b/wavefront/server/modules/knowledge_base_module/knowledge_base_module/services/image_rag_retrieve.py index 1c78a93c..2603920d 100644 --- a/wavefront/server/modules/knowledge_base_module/knowledge_base_module/services/image_rag_retrieve.py +++ b/wavefront/server/modules/knowledge_base_module/knowledge_base_module/services/image_rag_retrieve.py @@ -14,7 +14,6 @@ def __init__( KnowledgeBaseEmbeddings ], ): - self.reranked_image = [] self.query_generator = QueryGenerator() self.knowledge_base_embeddings_repository = knowledge_base_embeddings_repository @@ -42,22 +41,22 @@ async def retrieve_images( response = await client.post(internal_api_url, json=data) embedding = response.json().get('data', {}).get('response', []) - if embedding: - self.reranked_image = await self.image_retrieve( - embedding[0]['clip'], kb_id, threshold, top_k, query_filter + clip_embedding = next((e['clip'] for e in embedding if 'clip' in e), None) + dino_embedding = next((e['dino'] for e in embedding if 'dino' in e), None) + + if clip_embedding and dino_embedding: + clip_results = await self.image_retrieve( + clip_embedding, kb_id, threshold, top_k, query_filter ) - reference_id_list = [ - str(data['document_id']) for data in self.reranked_image - ] - self.reranked_image = await self.image_retrieve_dino( - embedding[1]['dino'], + reference_id_list = [str(data['document_id']) for data in clip_results] + return await self.image_retrieve_dino( + dino_embedding, kb_id, reference_id_list, query_filter, offset, limit, ) - return self.reranked_image else: return [] From 82ec69613833cc7b483f9810b97113aa1facaa19 Mon Sep 17 00:00:00 2001 From: vizsatiz Date: Fri, 10 Apr 2026 17:44:35 +0530 Subject: [PATCH 2/7] fix for migration --- ...8f2a1c3b5d9_add_hnsw_index_on_embeddings.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/wavefront/server/modules/db_repo_module/db_repo_module/alembic/versions/2026_04_10_1000-e8f2a1c3b5d9_add_hnsw_index_on_embeddings.py b/wavefront/server/modules/db_repo_module/db_repo_module/alembic/versions/2026_04_10_1000-e8f2a1c3b5d9_add_hnsw_index_on_embeddings.py index 1b69cbe8..ee81b075 100644 --- a/wavefront/server/modules/db_repo_module/db_repo_module/alembic/versions/2026_04_10_1000-e8f2a1c3b5d9_add_hnsw_index_on_embeddings.py +++ b/wavefront/server/modules/db_repo_module/db_repo_module/alembic/versions/2026_04_10_1000-e8f2a1c3b5d9_add_hnsw_index_on_embeddings.py @@ -26,7 +26,7 @@ def upgrade() -> None: # HNSW index on embedding_vector for cosine distance (used in text RAG retrieval) op.execute(""" - CREATE INDEX CONCURRENTLY IF NOT EXISTS + CREATE INDEX IF NOT EXISTS ix_kbe_embedding_vector_hnsw_cosine ON knowledge_base_embeddings USING hnsw ((embedding_vector::vector(512)) vector_cosine_ops) @@ -35,7 +35,7 @@ def upgrade() -> None: # HNSW index on embedding_vector for L2 distance (used in CLIP image search) op.execute(""" - CREATE INDEX CONCURRENTLY IF NOT EXISTS + CREATE INDEX IF NOT EXISTS ix_kbe_embedding_vector_hnsw_l2 ON knowledge_base_embeddings USING hnsw ((embedding_vector::vector(512)) vector_l2_ops) @@ -44,7 +44,7 @@ def upgrade() -> None: # HNSW index on embedding_vector_1 for cosine distance (used in DINO image search) op.execute(""" - CREATE INDEX CONCURRENTLY IF NOT EXISTS + CREATE INDEX IF NOT EXISTS ix_kbe_embedding_vector_1_hnsw_cosine ON knowledge_base_embeddings USING hnsw ((embedding_vector_1::vector(1024)) vector_cosine_ops) @@ -53,7 +53,7 @@ def upgrade() -> None: # GIN index on token column for fast full-text keyword search op.execute(""" - CREATE INDEX CONCURRENTLY IF NOT EXISTS + CREATE INDEX IF NOT EXISTS ix_kbe_token_gin ON knowledge_base_embeddings USING gin (token) @@ -61,9 +61,7 @@ def upgrade() -> None: def downgrade() -> None: - op.execute('DROP INDEX CONCURRENTLY IF EXISTS ix_kbe_embedding_vector_hnsw_cosine') - op.execute('DROP INDEX CONCURRENTLY IF EXISTS ix_kbe_embedding_vector_hnsw_l2') - op.execute( - 'DROP INDEX CONCURRENTLY IF EXISTS ix_kbe_embedding_vector_1_hnsw_cosine' - ) - op.execute('DROP INDEX CONCURRENTLY IF EXISTS ix_kbe_token_gin') + op.execute('DROP INDEX IF EXISTS ix_kbe_embedding_vector_hnsw_cosine') + op.execute('DROP INDEX IF EXISTS ix_kbe_embedding_vector_hnsw_l2') + op.execute('DROP INDEX IF EXISTS ix_kbe_embedding_vector_1_hnsw_cosine') + op.execute('DROP INDEX IF EXISTS ix_kbe_token_gin') From da2062bf060472ced3ebdf4eeda1257514b6c0b1 Mon Sep 17 00:00:00 2001 From: vizsatiz Date: Fri, 10 Apr 2026 18:12:10 +0530 Subject: [PATCH 3/7] fix for migration --- .../queries/generate_query.py | 123 +++++++++++------- .../services/image_rag_retrieve.py | 2 + 2 files changed, 79 insertions(+), 46 deletions(-) diff --git a/wavefront/server/modules/knowledge_base_module/knowledge_base_module/queries/generate_query.py b/wavefront/server/modules/knowledge_base_module/knowledge_base_module/queries/generate_query.py index 9e034dea..1d8b240a 100644 --- a/wavefront/server/modules/knowledge_base_module/knowledge_base_module/queries/generate_query.py +++ b/wavefront/server/modules/knowledge_base_module/knowledge_base_module/queries/generate_query.py @@ -71,6 +71,7 @@ def get_combined_search_query( } metadata_filter_clause_final = '' metadata_filter_clause_inner = '' + metadata_filter_clause_subquery = '' if filter: where_clause, filter_params = self.odata_parser.prepare_odata_filter(filter) if where_clause and filter_params: @@ -87,6 +88,11 @@ def get_combined_search_query( filter_params, lambda field: f"(d.metadata_value ->> '{field}')", ) + metadata_filter_clause_subquery = self.build_metadata_clause( + where_clause, + filter_params, + lambda field: f"(metadata_value ->> '{field}')", + ) query_params.update(filter_params) sql_query = f""" WITH hnsw_candidates AS ( @@ -95,11 +101,17 @@ def get_combined_search_query( document_id, chunk_text, chunk_index, - embedding_vector <=> :query_embed ::vector(512) AS distance + (embedding_vector::vector(512)) <=> :query_embed ::vector(512) AS distance FROM {KnowledgeBaseEmbeddings.__tablename__} + WHERE + document_id IN ( + SELECT id FROM {KnowledgeBaseDocuments.__tablename__} + WHERE knowledge_base_id = :kb_id + {'AND (' + metadata_filter_clause_subquery + ')' if metadata_filter_clause_subquery else ''} + ) ORDER BY - embedding_vector <=> :query_embed ::vector(512) + (embedding_vector::vector(512)) <=> :query_embed ::vector(512) LIMIT :limit * 10 ), vector_results AS ( @@ -182,41 +194,49 @@ def get_image_embedding( 'kb_id': kb_id, 'top_k': top_k, } - metadata_filter_clause_final = '' + metadata_filter_clause_subquery = '' if filter: where_clause, filter_params = self.odata_parser.prepare_odata_filter(filter) if where_clause and filter_params: - metadata_filter_clause_final = self.build_metadata_clause( + metadata_filter_clause_subquery = self.build_metadata_clause( where_clause, filter_params, - lambda field: f"(d.metadata_value ->> '{field}')", + lambda field: f"(metadata_value ->> '{field}')", ) params.update(filter_params) sql_query = f""" - WITH ranked_embeddings AS ( + WITH hnsw_candidates AS ( SELECT - e.id AS embedding_id, - e.chunk_text, - e.chunk_index, - d.id AS document_id, - d.file_path, - d.file_name, - d.knowledge_base_id, - d.metadata_value, - e.embedding_vector <-> :query_embedding ::vector(512) AS distance + id, + document_id, + chunk_text, + chunk_index, + (embedding_vector::vector(512)) <-> :query_embedding ::vector(512) AS distance FROM - {KnowledgeBaseEmbeddings.__tablename__} e - JOIN - {KnowledgeBaseDocuments.__tablename__} d ON e.document_id = d.id + {KnowledgeBaseEmbeddings.__tablename__} WHERE - d.knowledge_base_id = :kb_id {'AND (' + metadata_filter_clause_final + ')' if metadata_filter_clause_final else ''} - ORDER BY distance ASC + document_id IN ( + SELECT id FROM {KnowledgeBaseDocuments.__tablename__} + WHERE knowledge_base_id = :kb_id + {'AND (' + metadata_filter_clause_subquery + ')' if metadata_filter_clause_subquery else ''} + ) + ORDER BY + (embedding_vector::vector(512)) <-> :query_embedding ::vector(512) + LIMIT :top_k ) SELECT - * - FROM - ranked_embeddings - LIMIT :top_k + hc.id AS embedding_id, + hc.chunk_text, + hc.chunk_index, + d.id AS document_id, + d.file_path, + d.file_name, + d.knowledge_base_id, + d.metadata_value, + hc.distance + FROM hnsw_candidates hc + JOIN {KnowledgeBaseDocuments.__tablename__} d ON hc.document_id = d.id + ORDER BY hc.distance ASC """ return sql_query, params @@ -251,43 +271,54 @@ def get_image_embedding_dino( 'limit': effective_limit, } - metadata_filter_clause_final = '' + metadata_filter_clause_subquery = '' if filter: where_clause, filter_params = self.odata_parser.prepare_odata_filter(filter) if where_clause and filter_params: - metadata_filter_clause_final = self.build_metadata_clause( + metadata_filter_clause_subquery = self.build_metadata_clause( where_clause, filter_params, - lambda field: f"(d.metadata_value ->> '{field}')", + lambda field: f"(metadata_value ->> '{field}')", ) params.update(filter_params) - # Use ANY operator for PostgreSQL array matching + reference_filter = ( - 'AND e.document_id = ANY(:reference_ids)' if processed_reference_ids else '' + 'AND id = ANY(:reference_ids)' if processed_reference_ids else '' ) sql_query = f""" - WITH ranked_embeddings AS ( + WITH hnsw_candidates AS ( SELECT - e.id AS embedding_id, - e.chunk_text, - e.chunk_index, - d.id AS document_id, - d.file_path, - d.file_name, - d.knowledge_base_id, - d.metadata_value, - (1 - (e.embedding_vector_1 <=> :query_embedding ::vector(1024))) AS similarity - FROM {KnowledgeBaseEmbeddings.__tablename__} e - JOIN {KnowledgeBaseDocuments.__tablename__} d ON e.document_id = d.id + id, + document_id, + chunk_text, + chunk_index, + (embedding_vector_1::vector(1024)) <=> :query_embedding ::vector(1024) AS distance + FROM + {KnowledgeBaseEmbeddings.__tablename__} WHERE - d.knowledge_base_id = :kb_id {reference_filter} {'AND (' + metadata_filter_clause_final + ')' if metadata_filter_clause_final else ''} - ORDER BY similarity DESC + document_id IN ( + SELECT id FROM {KnowledgeBaseDocuments.__tablename__} + WHERE knowledge_base_id = :kb_id {reference_filter} + {'AND (' + metadata_filter_clause_subquery + ')' if metadata_filter_clause_subquery else ''} + ) + ORDER BY + (embedding_vector_1::vector(1024)) <=> :query_embedding ::vector(1024) + LIMIT :limit * 10 ) SELECT - * - FROM - ranked_embeddings + hc.id AS embedding_id, + hc.chunk_text, + hc.chunk_index, + d.id AS document_id, + d.file_path, + d.file_name, + d.knowledge_base_id, + d.metadata_value, + 1 - hc.distance AS similarity + FROM hnsw_candidates hc + JOIN {KnowledgeBaseDocuments.__tablename__} d ON hc.document_id = d.id + ORDER BY similarity DESC LIMIT :limit OFFSET :offset """ diff --git a/wavefront/server/modules/knowledge_base_module/knowledge_base_module/services/image_rag_retrieve.py b/wavefront/server/modules/knowledge_base_module/knowledge_base_module/services/image_rag_retrieve.py index 2603920d..667c66fa 100644 --- a/wavefront/server/modules/knowledge_base_module/knowledge_base_module/services/image_rag_retrieve.py +++ b/wavefront/server/modules/knowledge_base_module/knowledge_base_module/services/image_rag_retrieve.py @@ -48,6 +48,8 @@ async def retrieve_images( clip_results = await self.image_retrieve( clip_embedding, kb_id, threshold, top_k, query_filter ) + if not clip_results: + return [] reference_id_list = [str(data['document_id']) for data in clip_results] return await self.image_retrieve_dino( dino_embedding, From 988f23f252a754e3e7a3dc0fff77a6a20fe34c16 Mon Sep 17 00:00:00 2001 From: vizsatiz Date: Fri, 10 Apr 2026 20:44:09 +0530 Subject: [PATCH 4/7] fix for migration --- ...f2a1c3b5d9_add_hnsw_index_on_embeddings.py | 51 +++++++++++++------ 1 file changed, 35 insertions(+), 16 deletions(-) diff --git a/wavefront/server/modules/db_repo_module/db_repo_module/alembic/versions/2026_04_10_1000-e8f2a1c3b5d9_add_hnsw_index_on_embeddings.py b/wavefront/server/modules/db_repo_module/db_repo_module/alembic/versions/2026_04_10_1000-e8f2a1c3b5d9_add_hnsw_index_on_embeddings.py index ee81b075..df07ae53 100644 --- a/wavefront/server/modules/db_repo_module/db_repo_module/alembic/versions/2026_04_10_1000-e8f2a1c3b5d9_add_hnsw_index_on_embeddings.py +++ b/wavefront/server/modules/db_repo_module/db_repo_module/alembic/versions/2026_04_10_1000-e8f2a1c3b5d9_add_hnsw_index_on_embeddings.py @@ -9,6 +9,7 @@ from typing import Sequence, Union from alembic import op +from sqlalchemy import text # revision identifiers, used by Alembic. @@ -23,45 +24,63 @@ def upgrade() -> None: # dimensions so we cast inline. # embedding_vector → 512 dims (CLIP image / text embeddings) # embedding_vector_1 → 1024 dims (DINO image embeddings) + # + # CREATE INDEX CONCURRENTLY must run outside a transaction block. + # We switch the connection to AUTOCOMMIT for the duration of this migration. + connection = op.get_bind() + connection.execution_options(isolation_level='AUTOCOMMIT') - # HNSW index on embedding_vector for cosine distance (used in text RAG retrieval) - op.execute(""" - CREATE INDEX IF NOT EXISTS + connection.execute( + text(""" + CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_kbe_embedding_vector_hnsw_cosine ON knowledge_base_embeddings USING hnsw ((embedding_vector::vector(512)) vector_cosine_ops) WITH (m = 16, ef_construction = 64) """) + ) - # HNSW index on embedding_vector for L2 distance (used in CLIP image search) - op.execute(""" - CREATE INDEX IF NOT EXISTS + connection.execute( + text(""" + CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_kbe_embedding_vector_hnsw_l2 ON knowledge_base_embeddings USING hnsw ((embedding_vector::vector(512)) vector_l2_ops) WITH (m = 16, ef_construction = 64) """) + ) - # HNSW index on embedding_vector_1 for cosine distance (used in DINO image search) - op.execute(""" - CREATE INDEX IF NOT EXISTS + connection.execute( + text(""" + CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_kbe_embedding_vector_1_hnsw_cosine ON knowledge_base_embeddings USING hnsw ((embedding_vector_1::vector(1024)) vector_cosine_ops) WITH (m = 16, ef_construction = 64) """) + ) - # GIN index on token column for fast full-text keyword search - op.execute(""" - CREATE INDEX IF NOT EXISTS + connection.execute( + text(""" + CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_kbe_token_gin ON knowledge_base_embeddings USING gin (token) """) + ) def downgrade() -> None: - op.execute('DROP INDEX IF EXISTS ix_kbe_embedding_vector_hnsw_cosine') - op.execute('DROP INDEX IF EXISTS ix_kbe_embedding_vector_hnsw_l2') - op.execute('DROP INDEX IF EXISTS ix_kbe_embedding_vector_1_hnsw_cosine') - op.execute('DROP INDEX IF EXISTS ix_kbe_token_gin') + connection = op.get_bind() + connection.execution_options(isolation_level='AUTOCOMMIT') + + connection.execute( + text('DROP INDEX CONCURRENTLY IF EXISTS ix_kbe_embedding_vector_hnsw_cosine') + ) + connection.execute( + text('DROP INDEX CONCURRENTLY IF EXISTS ix_kbe_embedding_vector_hnsw_l2') + ) + connection.execute( + text('DROP INDEX CONCURRENTLY IF EXISTS ix_kbe_embedding_vector_1_hnsw_cosine') + ) + connection.execute(text('DROP INDEX CONCURRENTLY IF EXISTS ix_kbe_token_gin')) From 37acaf25f6f4d30c0c30f5c1c2adef918a794a8d Mon Sep 17 00:00:00 2001 From: vizsatiz Date: Tue, 14 Apr 2026 09:41:45 +0530 Subject: [PATCH 5/7] fix for embedding index creation --- ...f2a1c3b5d9_add_hnsw_index_on_embeddings.py | 16 +----- .../queries/generate_query.py | 57 ++++++------------- 2 files changed, 19 insertions(+), 54 deletions(-) diff --git a/wavefront/server/modules/db_repo_module/db_repo_module/alembic/versions/2026_04_10_1000-e8f2a1c3b5d9_add_hnsw_index_on_embeddings.py b/wavefront/server/modules/db_repo_module/db_repo_module/alembic/versions/2026_04_10_1000-e8f2a1c3b5d9_add_hnsw_index_on_embeddings.py index df07ae53..990ecd11 100644 --- a/wavefront/server/modules/db_repo_module/db_repo_module/alembic/versions/2026_04_10_1000-e8f2a1c3b5d9_add_hnsw_index_on_embeddings.py +++ b/wavefront/server/modules/db_repo_module/db_repo_module/alembic/versions/2026_04_10_1000-e8f2a1c3b5d9_add_hnsw_index_on_embeddings.py @@ -26,10 +26,11 @@ def upgrade() -> None: # embedding_vector_1 → 1024 dims (DINO image embeddings) # # CREATE INDEX CONCURRENTLY must run outside a transaction block. - # We switch the connection to AUTOCOMMIT for the duration of this migration. connection = op.get_bind() connection.execution_options(isolation_level='AUTOCOMMIT') + connection.execute(text("SET maintenance_work_mem = '2GB'")) + connection.execute( text(""" CREATE INDEX CONCURRENTLY IF NOT EXISTS @@ -40,16 +41,6 @@ def upgrade() -> None: """) ) - connection.execute( - text(""" - CREATE INDEX CONCURRENTLY IF NOT EXISTS - ix_kbe_embedding_vector_hnsw_l2 - ON knowledge_base_embeddings - USING hnsw ((embedding_vector::vector(512)) vector_l2_ops) - WITH (m = 16, ef_construction = 64) - """) - ) - connection.execute( text(""" CREATE INDEX CONCURRENTLY IF NOT EXISTS @@ -77,9 +68,6 @@ def downgrade() -> None: connection.execute( text('DROP INDEX CONCURRENTLY IF EXISTS ix_kbe_embedding_vector_hnsw_cosine') ) - connection.execute( - text('DROP INDEX CONCURRENTLY IF EXISTS ix_kbe_embedding_vector_hnsw_l2') - ) connection.execute( text('DROP INDEX CONCURRENTLY IF EXISTS ix_kbe_embedding_vector_1_hnsw_cosine') ) diff --git a/wavefront/server/modules/knowledge_base_module/knowledge_base_module/queries/generate_query.py b/wavefront/server/modules/knowledge_base_module/knowledge_base_module/queries/generate_query.py index 1d8b240a..96a2fbbe 100644 --- a/wavefront/server/modules/knowledge_base_module/knowledge_base_module/queries/generate_query.py +++ b/wavefront/server/modules/knowledge_base_module/knowledge_base_module/queries/generate_query.py @@ -71,7 +71,6 @@ def get_combined_search_query( } metadata_filter_clause_final = '' metadata_filter_clause_inner = '' - metadata_filter_clause_subquery = '' if filter: where_clause, filter_params = self.odata_parser.prepare_odata_filter(filter) if where_clause and filter_params: @@ -88,11 +87,6 @@ def get_combined_search_query( filter_params, lambda field: f"(d.metadata_value ->> '{field}')", ) - metadata_filter_clause_subquery = self.build_metadata_clause( - where_clause, - filter_params, - lambda field: f"(metadata_value ->> '{field}')", - ) query_params.update(filter_params) sql_query = f""" WITH hnsw_candidates AS ( @@ -104,15 +98,9 @@ def get_combined_search_query( (embedding_vector::vector(512)) <=> :query_embed ::vector(512) AS distance FROM {KnowledgeBaseEmbeddings.__tablename__} - WHERE - document_id IN ( - SELECT id FROM {KnowledgeBaseDocuments.__tablename__} - WHERE knowledge_base_id = :kb_id - {'AND (' + metadata_filter_clause_subquery + ')' if metadata_filter_clause_subquery else ''} - ) ORDER BY (embedding_vector::vector(512)) <=> :query_embed ::vector(512) - LIMIT :limit * 10 + LIMIT :limit * 20 ), vector_results AS ( SELECT @@ -194,14 +182,14 @@ def get_image_embedding( 'kb_id': kb_id, 'top_k': top_k, } - metadata_filter_clause_subquery = '' + metadata_filter_clause = '' if filter: where_clause, filter_params = self.odata_parser.prepare_odata_filter(filter) if where_clause and filter_params: - metadata_filter_clause_subquery = self.build_metadata_clause( + metadata_filter_clause = self.build_metadata_clause( where_clause, filter_params, - lambda field: f"(metadata_value ->> '{field}')", + lambda field: f"(d.metadata_value ->> '{field}')", ) params.update(filter_params) sql_query = f""" @@ -211,18 +199,12 @@ def get_image_embedding( document_id, chunk_text, chunk_index, - (embedding_vector::vector(512)) <-> :query_embedding ::vector(512) AS distance + (embedding_vector::vector(512)) <=> :query_embedding ::vector(512) AS distance FROM {KnowledgeBaseEmbeddings.__tablename__} - WHERE - document_id IN ( - SELECT id FROM {KnowledgeBaseDocuments.__tablename__} - WHERE knowledge_base_id = :kb_id - {'AND (' + metadata_filter_clause_subquery + ')' if metadata_filter_clause_subquery else ''} - ) ORDER BY - (embedding_vector::vector(512)) <-> :query_embedding ::vector(512) - LIMIT :top_k + (embedding_vector::vector(512)) <=> :query_embedding ::vector(512) + LIMIT :top_k * 20 ) SELECT hc.id AS embedding_id, @@ -236,7 +218,10 @@ def get_image_embedding( hc.distance FROM hnsw_candidates hc JOIN {KnowledgeBaseDocuments.__tablename__} d ON hc.document_id = d.id + WHERE d.knowledge_base_id = :kb_id + {'AND (' + metadata_filter_clause + ')' if metadata_filter_clause else ''} ORDER BY hc.distance ASC + LIMIT :top_k """ return sql_query, params @@ -265,27 +250,22 @@ def get_image_embedding_dino( params = { 'query_embedding': query_embeddings, 'kb_id': kb_id, - 'top_k': effective_limit, 'reference_ids': processed_reference_ids, 'offset': effective_offset, 'limit': effective_limit, } - metadata_filter_clause_subquery = '' + metadata_filter_clause = '' if filter: where_clause, filter_params = self.odata_parser.prepare_odata_filter(filter) if where_clause and filter_params: - metadata_filter_clause_subquery = self.build_metadata_clause( + metadata_filter_clause = self.build_metadata_clause( where_clause, filter_params, - lambda field: f"(metadata_value ->> '{field}')", + lambda field: f"(d.metadata_value ->> '{field}')", ) params.update(filter_params) - reference_filter = ( - 'AND id = ANY(:reference_ids)' if processed_reference_ids else '' - ) - sql_query = f""" WITH hnsw_candidates AS ( SELECT @@ -296,15 +276,9 @@ def get_image_embedding_dino( (embedding_vector_1::vector(1024)) <=> :query_embedding ::vector(1024) AS distance FROM {KnowledgeBaseEmbeddings.__tablename__} - WHERE - document_id IN ( - SELECT id FROM {KnowledgeBaseDocuments.__tablename__} - WHERE knowledge_base_id = :kb_id {reference_filter} - {'AND (' + metadata_filter_clause_subquery + ')' if metadata_filter_clause_subquery else ''} - ) ORDER BY (embedding_vector_1::vector(1024)) <=> :query_embedding ::vector(1024) - LIMIT :limit * 10 + LIMIT :limit * 20 ) SELECT hc.id AS embedding_id, @@ -318,6 +292,9 @@ def get_image_embedding_dino( 1 - hc.distance AS similarity FROM hnsw_candidates hc JOIN {KnowledgeBaseDocuments.__tablename__} d ON hc.document_id = d.id + WHERE d.knowledge_base_id = :kb_id + {('AND d.id = ANY(:reference_ids)' if processed_reference_ids else '')} + {'AND (' + metadata_filter_clause + ')' if metadata_filter_clause else ''} ORDER BY similarity DESC LIMIT :limit OFFSET :offset """ From 762e812e3c20201fbcf7636377279f42cdef6ee5 Mon Sep 17 00:00:00 2001 From: vizsatiz Date: Wed, 15 Apr 2026 14:12:50 +0530 Subject: [PATCH 6/7] fix migration commit issue --- ...26_04_10_1000-e8f2a1c3b5d9_add_hnsw_index_on_embeddings.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/wavefront/server/modules/db_repo_module/db_repo_module/alembic/versions/2026_04_10_1000-e8f2a1c3b5d9_add_hnsw_index_on_embeddings.py b/wavefront/server/modules/db_repo_module/db_repo_module/alembic/versions/2026_04_10_1000-e8f2a1c3b5d9_add_hnsw_index_on_embeddings.py index 990ecd11..37fe6e3d 100644 --- a/wavefront/server/modules/db_repo_module/db_repo_module/alembic/versions/2026_04_10_1000-e8f2a1c3b5d9_add_hnsw_index_on_embeddings.py +++ b/wavefront/server/modules/db_repo_module/db_repo_module/alembic/versions/2026_04_10_1000-e8f2a1c3b5d9_add_hnsw_index_on_embeddings.py @@ -26,7 +26,10 @@ def upgrade() -> None: # embedding_vector_1 → 1024 dims (DINO image embeddings) # # CREATE INDEX CONCURRENTLY must run outside a transaction block. + # SQLAlchemy 2.x autobegins a transaction on get_bind(); we must COMMIT it + # before switching to AUTOCOMMIT, otherwise isolation_level change is rejected. connection = op.get_bind() + connection.execute(text('COMMIT')) connection.execution_options(isolation_level='AUTOCOMMIT') connection.execute(text("SET maintenance_work_mem = '2GB'")) @@ -63,6 +66,7 @@ def upgrade() -> None: def downgrade() -> None: connection = op.get_bind() + connection.execute(text('COMMIT')) connection.execution_options(isolation_level='AUTOCOMMIT') connection.execute( From d5dd2ee0be70227d28fc9dc56b5a3cea75c5778a Mon Sep 17 00:00:00 2001 From: vizsatiz Date: Wed, 15 Apr 2026 14:47:52 +0530 Subject: [PATCH 7/7] fix migration commit issue --- ...f2a1c3b5d9_add_hnsw_index_on_embeddings.py | 92 ++++++++++--------- 1 file changed, 49 insertions(+), 43 deletions(-) diff --git a/wavefront/server/modules/db_repo_module/db_repo_module/alembic/versions/2026_04_10_1000-e8f2a1c3b5d9_add_hnsw_index_on_embeddings.py b/wavefront/server/modules/db_repo_module/db_repo_module/alembic/versions/2026_04_10_1000-e8f2a1c3b5d9_add_hnsw_index_on_embeddings.py index 37fe6e3d..6c00f325 100644 --- a/wavefront/server/modules/db_repo_module/db_repo_module/alembic/versions/2026_04_10_1000-e8f2a1c3b5d9_add_hnsw_index_on_embeddings.py +++ b/wavefront/server/modules/db_repo_module/db_repo_module/alembic/versions/2026_04_10_1000-e8f2a1c3b5d9_add_hnsw_index_on_embeddings.py @@ -25,54 +25,60 @@ def upgrade() -> None: # embedding_vector → 512 dims (CLIP image / text embeddings) # embedding_vector_1 → 1024 dims (DINO image embeddings) # - # CREATE INDEX CONCURRENTLY must run outside a transaction block. - # SQLAlchemy 2.x autobegins a transaction on get_bind(); we must COMMIT it - # before switching to AUTOCOMMIT, otherwise isolation_level change is rejected. - connection = op.get_bind() - connection.execute(text('COMMIT')) - connection.execution_options(isolation_level='AUTOCOMMIT') + # CREATE INDEX CONCURRENTLY cannot run inside a transaction block. + # SQLAlchemy 2.x autobegins a transaction on op.get_bind(), and + # execution_options(isolation_level=AUTOCOMMIT) is rejected while a + # Transaction object is active. We get a fresh AUTOCOMMIT connection + # directly from the underlying sync engine instead. + bind = op.get_bind() + sync_engine = getattr(bind.engine, 'sync_engine', bind.engine) - connection.execute(text("SET maintenance_work_mem = '2GB'")) + with sync_engine.execution_options(isolation_level='AUTOCOMMIT').connect() as conn: + conn.execute(text("SET maintenance_work_mem = '2GB'")) - connection.execute( - text(""" - CREATE INDEX CONCURRENTLY IF NOT EXISTS - ix_kbe_embedding_vector_hnsw_cosine - ON knowledge_base_embeddings - USING hnsw ((embedding_vector::vector(512)) vector_cosine_ops) - WITH (m = 16, ef_construction = 64) - """) - ) + conn.execute( + text(""" + CREATE INDEX CONCURRENTLY IF NOT EXISTS + ix_kbe_embedding_vector_hnsw_cosine + ON knowledge_base_embeddings + USING hnsw ((embedding_vector::vector(512)) vector_cosine_ops) + WITH (m = 16, ef_construction = 64) + """) + ) - connection.execute( - text(""" - CREATE INDEX CONCURRENTLY IF NOT EXISTS - ix_kbe_embedding_vector_1_hnsw_cosine - ON knowledge_base_embeddings - USING hnsw ((embedding_vector_1::vector(1024)) vector_cosine_ops) - WITH (m = 16, ef_construction = 64) - """) - ) + conn.execute( + text(""" + CREATE INDEX CONCURRENTLY IF NOT EXISTS + ix_kbe_embedding_vector_1_hnsw_cosine + ON knowledge_base_embeddings + USING hnsw ((embedding_vector_1::vector(1024)) vector_cosine_ops) + WITH (m = 16, ef_construction = 64) + """) + ) - connection.execute( - text(""" - CREATE INDEX CONCURRENTLY IF NOT EXISTS - ix_kbe_token_gin - ON knowledge_base_embeddings - USING gin (token) - """) - ) + conn.execute( + text(""" + CREATE INDEX CONCURRENTLY IF NOT EXISTS + ix_kbe_token_gin + ON knowledge_base_embeddings + USING gin (token) + """) + ) def downgrade() -> None: - connection = op.get_bind() - connection.execute(text('COMMIT')) - connection.execution_options(isolation_level='AUTOCOMMIT') + bind = op.get_bind() + sync_engine = getattr(bind.engine, 'sync_engine', bind.engine) - connection.execute( - text('DROP INDEX CONCURRENTLY IF EXISTS ix_kbe_embedding_vector_hnsw_cosine') - ) - connection.execute( - text('DROP INDEX CONCURRENTLY IF EXISTS ix_kbe_embedding_vector_1_hnsw_cosine') - ) - connection.execute(text('DROP INDEX CONCURRENTLY IF EXISTS ix_kbe_token_gin')) + with sync_engine.execution_options(isolation_level='AUTOCOMMIT').connect() as conn: + conn.execute( + text( + 'DROP INDEX CONCURRENTLY IF EXISTS ix_kbe_embedding_vector_hnsw_cosine' + ) + ) + conn.execute( + text( + 'DROP INDEX CONCURRENTLY IF EXISTS ix_kbe_embedding_vector_1_hnsw_cosine' + ) + ) + conn.execute(text('DROP INDEX CONCURRENTLY IF EXISTS ix_kbe_token_gin'))