Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
"""add hnsw index on knowledge_base_embeddings

Revision ID: e8f2a1c3b5d9
Revises: c7a9e2f4b1d0
Create Date: 2026-04-10 10:00:00.000000

"""

from typing import Sequence, Union

from alembic import op
from sqlalchemy import text


# revision identifiers, used by Alembic.
revision: str = 'e8f2a1c3b5d9'  # unique id of this migration
down_revision: Union[str, None] = 'c7a9e2f4b1d0'  # parent revision this one applies on top of
branch_labels: Union[str, Sequence[str], None] = None  # no named branch for this revision
depends_on: Union[str, Sequence[str], None] = None  # no cross-branch dependencies


def upgrade() -> None:
    """Create ANN / full-text indexes on ``knowledge_base_embeddings``.

    Indexes created (all ``IF NOT EXISTS``, all ``CONCURRENTLY`` so writes
    are not blocked while building):

    * HNSW cosine + L2 on ``embedding_vector`` cast to ``vector(512)``
      (CLIP image / text embeddings).
    * HNSW cosine on ``embedding_vector_1`` cast to ``vector(1024)``
      (DINO image embeddings).
    * GIN on ``token`` for keyword search.

    HNSW requires a dimensioned vector expression — the columns are stored
    without dimensions, so each index is built over an inline cast.
    """
    # CREATE INDEX CONCURRENTLY cannot run inside a transaction block, and
    # Alembic has already opened one by the time upgrade() runs. Setting
    # isolation_level='AUTOCOMMIT' on the live connection does not take
    # effect mid-transaction (SQLAlchemy applies isolation at transaction
    # begin) and, if it did, would leak AUTOCOMMIT onto the connection for
    # any later migrations in the same run. autocommit_block() is Alembic's
    # documented mechanism: it commits the migration transaction, runs the
    # enclosed statements in autocommit mode, and restores the transactional
    # state afterwards.
    with op.get_context().autocommit_block():
        connection = op.get_bind()

        connection.execute(
            text("""
                CREATE INDEX CONCURRENTLY IF NOT EXISTS
                    ix_kbe_embedding_vector_hnsw_cosine
                ON knowledge_base_embeddings
                USING hnsw ((embedding_vector::vector(512)) vector_cosine_ops)
                WITH (m = 16, ef_construction = 64)
            """)
        )

        connection.execute(
            text("""
                CREATE INDEX CONCURRENTLY IF NOT EXISTS
                    ix_kbe_embedding_vector_hnsw_l2
                ON knowledge_base_embeddings
                USING hnsw ((embedding_vector::vector(512)) vector_l2_ops)
                WITH (m = 16, ef_construction = 64)
            """)
        )

        connection.execute(
            text("""
                CREATE INDEX CONCURRENTLY IF NOT EXISTS
                    ix_kbe_embedding_vector_1_hnsw_cosine
                ON knowledge_base_embeddings
                USING hnsw ((embedding_vector_1::vector(1024)) vector_cosine_ops)
                WITH (m = 16, ef_construction = 64)
            """)
        )

        connection.execute(
            text("""
                CREATE INDEX CONCURRENTLY IF NOT EXISTS
                    ix_kbe_token_gin
                ON knowledge_base_embeddings
                USING gin (token)
            """)
        )


def downgrade() -> None:
    """Drop the indexes created by :func:`upgrade`.

    ``DROP INDEX CONCURRENTLY`` avoids taking an exclusive lock on the
    table while the index is removed; ``IF EXISTS`` makes the downgrade
    idempotent if an index was never built (or was dropped manually).
    """
    # DROP INDEX CONCURRENTLY, like its CREATE counterpart, cannot run
    # inside a transaction block. Use Alembic's autocommit_block() rather
    # than mutating the connection's isolation level: the latter has no
    # effect on an already-open transaction and would not be restored for
    # subsequent migrations.
    with op.get_context().autocommit_block():
        connection = op.get_bind()

        connection.execute(
            text('DROP INDEX CONCURRENTLY IF EXISTS ix_kbe_embedding_vector_hnsw_cosine')
        )
        connection.execute(
            text('DROP INDEX CONCURRENTLY IF EXISTS ix_kbe_embedding_vector_hnsw_l2')
        )
        connection.execute(
            text('DROP INDEX CONCURRENTLY IF EXISTS ix_kbe_embedding_vector_1_hnsw_cosine')
        )
        connection.execute(
            text('DROP INDEX CONCURRENTLY IF EXISTS ix_kbe_token_gin')
        )
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def get_combined_search_query(
}
metadata_filter_clause_final = ''
metadata_filter_clause_inner = ''
metadata_filter_clause_subquery = ''
if filter:
where_clause, filter_params = self.odata_parser.prepare_odata_filter(filter)
if where_clause and filter_params:
Expand All @@ -87,26 +88,50 @@ def get_combined_search_query(
filter_params,
lambda field: f"(d.metadata_value ->> '{field}')",
)
metadata_filter_clause_subquery = self.build_metadata_clause(
where_clause,
filter_params,
lambda field: f"(metadata_value ->> '{field}')",
)
query_params.update(filter_params)
sql_query = f"""
WITH vector_results AS (
WITH hnsw_candidates AS (
SELECT
e.id as embedding_id,
e.chunk_text,
e.chunk_index,
id,
document_id,
chunk_text,
chunk_index,
(embedding_vector::vector(512)) <=> :query_embed ::vector(512) AS distance
FROM
{KnowledgeBaseEmbeddings.__tablename__}
WHERE
document_id IN (
SELECT id FROM {KnowledgeBaseDocuments.__tablename__}
WHERE knowledge_base_id = :kb_id
{'AND (' + metadata_filter_clause_subquery + ')' if metadata_filter_clause_subquery else ''}
)
ORDER BY
(embedding_vector::vector(512)) <=> :query_embed ::vector(512)
LIMIT :limit * 10
),
vector_results AS (
SELECT
hc.id as embedding_id,
hc.chunk_text,
hc.chunk_index,
d.id as document_id,
d.file_path,
d.knowledge_base_id,
d.metadata_value,
1 - (e.embedding_vector <=> :query_embed ::vector) as vector_score
1 - hc.distance as vector_score
FROM
{KnowledgeBaseEmbeddings.__tablename__} e
hnsw_candidates hc
JOIN
{KnowledgeBaseDocuments.__tablename__} d ON e.document_id = d.id
{KnowledgeBaseDocuments.__tablename__} d ON hc.document_id = d.id
WHERE
d.knowledge_base_id = :kb_id {'AND (' + metadata_filter_clause_inner + ')' if metadata_filter_clause_inner else ''}
ORDER BY
vector_score DESC
hc.distance ASC
LIMIT :limit
),
keyword_results AS (
Expand Down Expand Up @@ -169,41 +194,49 @@ def get_image_embedding(
'kb_id': kb_id,
'top_k': top_k,
}
metadata_filter_clause_final = ''
metadata_filter_clause_subquery = ''
if filter:
where_clause, filter_params = self.odata_parser.prepare_odata_filter(filter)
if where_clause and filter_params:
metadata_filter_clause_final = self.build_metadata_clause(
metadata_filter_clause_subquery = self.build_metadata_clause(
where_clause,
filter_params,
lambda field: f"(d.metadata_value ->> '{field}')",
lambda field: f"(metadata_value ->> '{field}')",
)
params.update(filter_params)
sql_query = f"""
WITH ranked_embeddings AS (
WITH hnsw_candidates AS (
SELECT
e.id AS embedding_id,
e.chunk_text,
e.chunk_index,
d.id AS document_id,
d.file_path,
d.file_name,
d.knowledge_base_id,
d.metadata_value,
e.embedding_vector <-> :query_embedding ::vector AS distance
id,
document_id,
chunk_text,
chunk_index,
(embedding_vector::vector(512)) <-> :query_embedding ::vector(512) AS distance
FROM
{KnowledgeBaseEmbeddings.__tablename__} e
JOIN
{KnowledgeBaseDocuments.__tablename__} d ON e.document_id = d.id
{KnowledgeBaseEmbeddings.__tablename__}
WHERE
d.knowledge_base_id = :kb_id {'AND (' + metadata_filter_clause_final + ')' if metadata_filter_clause_final else ''}
ORDER BY distance ASC
document_id IN (
SELECT id FROM {KnowledgeBaseDocuments.__tablename__}
WHERE knowledge_base_id = :kb_id
{'AND (' + metadata_filter_clause_subquery + ')' if metadata_filter_clause_subquery else ''}
)
ORDER BY
(embedding_vector::vector(512)) <-> :query_embedding ::vector(512)
LIMIT :top_k
)
SELECT
*
FROM
ranked_embeddings
LIMIT :top_k
hc.id AS embedding_id,
hc.chunk_text,
hc.chunk_index,
d.id AS document_id,
d.file_path,
d.file_name,
d.knowledge_base_id,
d.metadata_value,
hc.distance
FROM hnsw_candidates hc
JOIN {KnowledgeBaseDocuments.__tablename__} d ON hc.document_id = d.id
ORDER BY hc.distance ASC
"""

return sql_query, params
Expand Down Expand Up @@ -238,43 +271,54 @@ def get_image_embedding_dino(
'limit': effective_limit,
}

metadata_filter_clause_final = ''
metadata_filter_clause_subquery = ''
if filter:
where_clause, filter_params = self.odata_parser.prepare_odata_filter(filter)
if where_clause and filter_params:
metadata_filter_clause_final = self.build_metadata_clause(
metadata_filter_clause_subquery = self.build_metadata_clause(
where_clause,
filter_params,
lambda field: f"(d.metadata_value ->> '{field}')",
lambda field: f"(metadata_value ->> '{field}')",
)
params.update(filter_params)
# Use ANY operator for PostgreSQL array matching

reference_filter = (
'AND e.document_id = ANY(:reference_ids)' if processed_reference_ids else ''
'AND id = ANY(:reference_ids)' if processed_reference_ids else ''
)

sql_query = f"""
WITH ranked_embeddings AS (
WITH hnsw_candidates AS (
SELECT
e.id AS embedding_id,
e.chunk_text,
e.chunk_index,
d.id AS document_id,
d.file_path,
d.file_name,
d.knowledge_base_id,
d.metadata_value,
(1 - (e.embedding_vector_1 <=> :query_embedding ::vector)) AS similarity
FROM {KnowledgeBaseEmbeddings.__tablename__} e
JOIN {KnowledgeBaseDocuments.__tablename__} d ON e.document_id = d.id
id,
document_id,
chunk_text,
chunk_index,
(embedding_vector_1::vector(1024)) <=> :query_embedding ::vector(1024) AS distance
FROM
{KnowledgeBaseEmbeddings.__tablename__}
WHERE
d.knowledge_base_id = :kb_id {reference_filter} {'AND (' + metadata_filter_clause_final + ')' if metadata_filter_clause_final else ''}
ORDER BY similarity DESC
document_id IN (
SELECT id FROM {KnowledgeBaseDocuments.__tablename__}
WHERE knowledge_base_id = :kb_id {reference_filter}
{'AND (' + metadata_filter_clause_subquery + ')' if metadata_filter_clause_subquery else ''}
)
ORDER BY
(embedding_vector_1::vector(1024)) <=> :query_embedding ::vector(1024)
LIMIT :limit * 10
)
SELECT
*
FROM
ranked_embeddings
hc.id AS embedding_id,
hc.chunk_text,
hc.chunk_index,
d.id AS document_id,
d.file_path,
d.file_name,
d.knowledge_base_id,
d.metadata_value,
1 - hc.distance AS similarity
FROM hnsw_candidates hc
JOIN {KnowledgeBaseDocuments.__tablename__} d ON hc.document_id = d.id
ORDER BY similarity DESC
LIMIT :limit OFFSET :offset
"""

Expand Down Expand Up @@ -346,4 +390,4 @@ def get_update_tokens_query() -> str:
Returns:
SQL query string
"""
return "UPDATE knowledge_base_embeddings SET token = to_tsvector('english', chunk_text)"
return "UPDATE knowledge_base_embeddings SET token = to_tsvector('english', chunk_text) WHERE token IS NULL"
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ def __init__(
KnowledgeBaseEmbeddings
],
):
self.reranked_image = []
self.query_generator = QueryGenerator()
self.knowledge_base_embeddings_repository = knowledge_base_embeddings_repository

Expand Down Expand Up @@ -42,22 +41,24 @@ async def retrieve_images(
response = await client.post(internal_api_url, json=data)
embedding = response.json().get('data', {}).get('response', [])

if embedding:
self.reranked_image = await self.image_retrieve(
embedding[0]['clip'], kb_id, threshold, top_k, query_filter
clip_embedding = next((e['clip'] for e in embedding if 'clip' in e), None)
dino_embedding = next((e['dino'] for e in embedding if 'dino' in e), None)

if clip_embedding and dino_embedding:
clip_results = await self.image_retrieve(
clip_embedding, kb_id, threshold, top_k, query_filter
)
reference_id_list = [
str(data['document_id']) for data in self.reranked_image
]
self.reranked_image = await self.image_retrieve_dino(
embedding[1]['dino'],
if not clip_results:
return []
reference_id_list = [str(data['document_id']) for data in clip_results]
return await self.image_retrieve_dino(
dino_embedding,
kb_id,
reference_id_list,
query_filter,
offset,
limit,
)
return self.reranked_image
else:
return []

Expand Down
Loading