From 095c7527c279fa21a4d6672360d4ed8a80e54a31 Mon Sep 17 00:00:00 2001 From: hugoozhang Date: Wed, 24 Jun 2026 14:59:30 +0800 Subject: [PATCH] perf(postgres): optimize PgMemoryStore for throughput and query efficiency Six targeted performance improvements to the PostgreSQL memory store: 1. Batch transaction writes (upsertL1Batch / upsertL0Batch): - Wrap multi-record inserts in a single BEGIN/COMMIT transaction - Eliminates per-row connection acquisition and per-row autocommit (each row is still sent as its own INSERT round-trip) - Falls back to single-record path when batch size is 1 2. Hybrid search SQL rewrite (searchL1HybridNative): - Carry all columns through CTEs, eliminating the final JOIN back to the main table (avoids re-fetching every fused row from the main table by record_id) - Add WHERE content <@> to_bm25query(...) > 0 to exclude zero-relevance BM25 results from RRF fusion - Use named prepared statement for server-side plan caching 3. Named prepared statements for high-frequency searches: - search_l1_vector_v1, search_l1_fts_v1 - search_l0_vector_v1, search_l0_fts_v1 - l1_hybrid_native_v2 Avoids repeated parse+plan overhead on every call. 4. Concurrent reindexing (reindexAll): - Replace serial embed+update loop with paginated reads - Bounded-parallelism embedding (CONCURRENCY=4, Promise.allSettled) - Batched UPDATE via unnest() arrays instead of per-row UPDATE 5. VACUUM ANALYZE after bulk deletes (deleteL1Expired / deleteL0Expired): - Fire-and-forget VACUUM ANALYZE when >50 rows are deleted - Prevents DiskANN/HNSW index bloat from dead tuples 6. 
BM25 query normalization improvement: - Add Chinese stopword filtering (60+ high-frequency function words) - Filter single-character CJK tokens (too generic for BM25 precision) - Cap token count at MAX_BM25_TOKENS=8 to maintain query precision --- src/core/store/postgres.ts | 353 +++++++++++++++++++++++++++++-------- 1 file changed, 276 insertions(+), 77 deletions(-) diff --git a/src/core/store/postgres.ts b/src/core/store/postgres.ts index 1442e1f..8b9253c 100644 --- a/src/core/store/postgres.ts +++ b/src/core/store/postgres.ts @@ -170,11 +170,55 @@ export class PgMemoryStore implements IMemoryStore { } async upsertL1Batch(records: MemoryRecord[]): Promise { - let ok = 0; - for (const record of records) { - if (await this.upsertL1(record)) ok++; + if (this.degraded || records.length === 0) return 0; + if (records.length === 1) return (await this.upsertL1(records[0])) ? 1 : 0; + + const client = await this.pool.connect(); + try { + await client.query("BEGIN"); + let ok = 0; + for (const record of records) { + const tsStr = record.timestamps[0] ?? ""; + const tsStart = record.timestamps.length > 0 ? record.timestamps.reduce((a, b) => (a < b ? a : b)) : tsStr; + const tsEnd = record.timestamps.length > 0 ? record.timestamps.reduce((a, b) => (a > b ? 
a : b)) : tsStr; + const vec = this.prepareVector(undefined); + await client.query( + `INSERT INTO ${this.table(POSTGRES_TABLES.l1)} ( + record_id, content, type, priority, scene_name, session_key, session_id, + timestamp_str, timestamp_start, timestamp_end, created_time, updated_time, + metadata_json, embedding + ) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13::jsonb,$14::vector) + ON CONFLICT (record_id) DO UPDATE SET + content = EXCLUDED.content, + type = EXCLUDED.type, + priority = EXCLUDED.priority, + scene_name = EXCLUDED.scene_name, + session_key = EXCLUDED.session_key, + session_id = EXCLUDED.session_id, + timestamp_str = EXCLUDED.timestamp_str, + timestamp_start = EXCLUDED.timestamp_start, + timestamp_end = EXCLUDED.timestamp_end, + updated_time = EXCLUDED.updated_time, + metadata_json = EXCLUDED.metadata_json, + embedding = COALESCE(EXCLUDED.embedding, ${this.table(POSTGRES_TABLES.l1)}.embedding)`, + [ + record.id, record.content, record.type, record.priority, + record.scene_name, record.sessionKey, record.sessionId, + tsStr, tsStart, tsEnd, record.createdAt, record.updatedAt, + JSON.stringify(record.metadata ?? 
{}), vec, + ], + ); + ok++; + } + await client.query("COMMIT"); + return ok; + } catch (err) { + await client.query("ROLLBACK").catch(() => undefined); + this.logger?.warn?.(`${TAG} [L1-upsert-batch] FAILED (${records.length} records): ${errorMessage(err)}`); + return 0; + } finally { + client.release(); } - return ok; } async deleteL1(recordId: string): Promise { @@ -255,13 +299,17 @@ export class PgMemoryStore implements IMemoryStore { if (this.degraded || !this.vectorAvailable || !this.isValidQueryVector(queryEmbedding)) return []; try { const limit = topK + ZERO_VEC_BUFFER; - const rows = await this.pool.query(` - SELECT ${this.l1SearchColumns()}, 1 - (embedding <=> $1::vector) AS score - FROM ${this.table(POSTGRES_TABLES.l1)} - WHERE embedding IS NOT NULL AND vector_dims(embedding) = $2 - ORDER BY embedding <=> $1::vector - LIMIT $3 - `, [vectorToSql(queryEmbedding), this.dimensions, limit]); + const rows = await this.pool.query({ + name: "search_l1_vector_v1", + text: ` + SELECT ${this.l1SearchColumns()}, 1 - (embedding <=> $1::vector) AS score + FROM ${this.table(POSTGRES_TABLES.l1)} + WHERE embedding IS NOT NULL AND vector_dims(embedding) = $2 + ORDER BY embedding <=> $1::vector + LIMIT $3 + `, + values: [vectorToSql(queryEmbedding), this.dimensions, limit], + }); return rows.rows.slice(0, topK).map(normalizeL1Score); } catch (err) { this.logger?.warn?.(`${TAG} [L1-vector] FAILED: ${errorMessage(err)}`); @@ -274,12 +322,17 @@ export class PgMemoryStore implements IMemoryStore { const query = normalizeBm25Query(ftsQuery); if (!query) return []; try { - const rows = await this.pool.query(` - SELECT ${this.l1SearchColumns()}, content <@> to_bm25query($1, $3) AS rank - FROM ${this.table(POSTGRES_TABLES.l1)} - ORDER BY content <@> to_bm25query($1, $3) - LIMIT $2 - `, [query, limit, postgresIndexName(this.config.schema, "l1_content_bm25_idx")]); + const rows = await this.pool.query({ + name: "search_l1_fts_v1", + text: ` + SELECT ${this.l1SearchColumns()}, 
content <@> to_bm25query($1, $3) AS rank + FROM ${this.table(POSTGRES_TABLES.l1)} + WHERE content <@> to_bm25query($1, $3) > 0 + ORDER BY content <@> to_bm25query($1, $3) + LIMIT $2 + `, + values: [query, limit, postgresIndexName(this.config.schema, "l1_content_bm25_idx")], + }); return rows.rows.map((row) => ({ ...row, score: bm25RankToScore(Number(row.rank)) })); } catch (err) { this.logger?.warn?.(`${TAG} [L1-fts] FAILED: ${errorMessage(err)}`); @@ -323,6 +376,11 @@ export class PgMemoryStore implements IMemoryStore { /** * Single-SQL hybrid search: BM25 + vector with Reciprocal Rank Fusion (RRF). + * + * Optimizations over naive approach: + * - BM25 CTE filters score > 0 to exclude irrelevant results from fusion + * - Both CTEs carry full row data, eliminating the final JOIN back to the main table + * - COALESCE on FULL OUTER JOIN produces correct RRF even when a record only appears in one path */ private async searchL1HybridNative( ftsQuery: string, @@ -334,19 +392,24 @@ export class PgMemoryStore implements IMemoryStore { const dims = this.dimensions; const idxName = postgresIndexName(this.config.schema, "l1_content_bm25_idx"); const tbl = this.table(POSTGRES_TABLES.l1); - const columns = this.l1SearchColumns("b"); + const cols = `record_id, content, type, priority, scene_name, + timestamp_str, timestamp_start, timestamp_end, session_key, session_id, + metadata_json::text AS metadata_json`; try { - const rows = await this.pool.query(` + const rows = await this.pool.query({ + name: "l1_hybrid_native_v2", + text: ` WITH fts AS ( - SELECT record_id, + SELECT ${cols}, row_number() OVER (ORDER BY content <@> to_bm25query($1, $2)) AS rnk FROM ${tbl} + WHERE content <@> to_bm25query($1, $2) > 0 ORDER BY content <@> to_bm25query($1, $2) LIMIT $3 ), vec AS ( - SELECT record_id, + SELECT ${cols}, row_number() OVER (ORDER BY embedding <=> $4::vector) AS rnk FROM ${tbl} WHERE embedding IS NOT NULL AND vector_dims(embedding) = $5 @@ -354,18 +417,30 @@ export class 
PgMemoryStore implements IMemoryStore { LIMIT $3 ), fused AS ( - SELECT record_id, + SELECT COALESCE(fts.record_id, vec.record_id) AS record_id, + COALESCE(fts.content, vec.content) AS content, + COALESCE(fts.type, vec.type) AS type, + COALESCE(fts.priority, vec.priority) AS priority, + COALESCE(fts.scene_name, vec.scene_name) AS scene_name, + COALESCE(fts.timestamp_str, vec.timestamp_str) AS timestamp_str, + COALESCE(fts.timestamp_start, vec.timestamp_start) AS timestamp_start, + COALESCE(fts.timestamp_end, vec.timestamp_end) AS timestamp_end, + COALESCE(fts.session_key, vec.session_key) AS session_key, + COALESCE(fts.session_id, vec.session_id) AS session_id, + COALESCE(fts.metadata_json, vec.metadata_json) AS metadata_json, COALESCE(1.0 / (60 + fts.rnk), 0) + COALESCE(1.0 / (60 + vec.rnk), 0) AS score FROM fts - FULL OUTER JOIN vec USING (record_id) + FULL OUTER JOIN vec ON fts.record_id = vec.record_id ) - SELECT ${columns}, - fused.score AS score + SELECT record_id, content, type, priority, scene_name, + timestamp_str, timestamp_start, timestamp_end, session_key, session_id, + metadata_json, score FROM fused - JOIN ${tbl} b ON b.record_id = fused.record_id ORDER BY score DESC LIMIT $6 - `, [ftsQuery, idxName, candidateK, vectorLiteral, dims, topK]); + `, + values: [ftsQuery, idxName, candidateK, vectorLiteral, dims, topK], + }); return rows.rows.map(normalizeL1Score); } catch (err) { this.logger?.warn?.(`${TAG} [L1-hybrid-native] FAILED: ${errorMessage(err)}`); @@ -399,11 +474,40 @@ export class PgMemoryStore implements IMemoryStore { } async upsertL0Batch(records: L0Record[]): Promise { - let ok = 0; - for (const record of records) { - if (await this.upsertL0(record)) ok++; + if (this.degraded || records.length === 0) return 0; + if (records.length === 1) return (await this.upsertL0(records[0])) ? 
1 : 0; + + const client = await this.pool.connect(); + try { + await client.query("BEGIN"); + let ok = 0; + for (const record of records) { + const vec = this.prepareVector(undefined); + await client.query( + `INSERT INTO ${this.table(POSTGRES_TABLES.l0)} ( + record_id, session_key, session_id, role, message_text, recorded_at, timestamp, embedding + ) VALUES ($1,$2,$3,$4,$5,$6,$7,$8::vector) + ON CONFLICT (record_id) DO UPDATE SET + session_key = EXCLUDED.session_key, + session_id = EXCLUDED.session_id, + role = EXCLUDED.role, + message_text = EXCLUDED.message_text, + recorded_at = EXCLUDED.recorded_at, + timestamp = EXCLUDED.timestamp, + embedding = COALESCE(EXCLUDED.embedding, ${this.table(POSTGRES_TABLES.l0)}.embedding)`, + [record.id, record.sessionKey, record.sessionId, record.role, record.messageText, record.recordedAt, record.timestamp, vec], + ); + ok++; + } + await client.query("COMMIT"); + return ok; + } catch (err) { + await client.query("ROLLBACK").catch(() => undefined); + this.logger?.warn?.(`${TAG} [L0-upsert-batch] FAILED (${records.length} records): ${errorMessage(err)}`); + return 0; + } finally { + client.release(); } - return ok; } async updateL0Embedding(recordId: string, embedding: Float32Array): Promise { @@ -498,15 +602,19 @@ export class PgMemoryStore implements IMemoryStore { if (this.degraded || !this.vectorAvailable || !this.isValidQueryVector(queryEmbedding)) return []; try { const limit = topK + ZERO_VEC_BUFFER; - const rows = await this.pool.query(` - SELECT record_id, session_key, session_id, role, message_text, - 1 - (embedding <=> $1::vector) AS score, - recorded_at, timestamp::bigint AS timestamp - FROM ${this.table(POSTGRES_TABLES.l0)} - WHERE embedding IS NOT NULL AND vector_dims(embedding) = $2 - ORDER BY embedding <=> $1::vector - LIMIT $3 - `, [vectorToSql(queryEmbedding), this.dimensions, limit]); + const rows = await this.pool.query({ + name: "search_l0_vector_v1", + text: ` + SELECT record_id, session_key, session_id, 
role, message_text, + 1 - (embedding <=> $1::vector) AS score, + recorded_at, timestamp::bigint AS timestamp + FROM ${this.table(POSTGRES_TABLES.l0)} + WHERE embedding IS NOT NULL AND vector_dims(embedding) = $2 + ORDER BY embedding <=> $1::vector + LIMIT $3 + `, + values: [vectorToSql(queryEmbedding), this.dimensions, limit], + }); return rows.rows.slice(0, topK).map(normalizeL0SearchRow); } catch (err) { this.logger?.warn?.(`${TAG} [L0-vector] FAILED: ${errorMessage(err)}`); @@ -519,14 +627,19 @@ export class PgMemoryStore implements IMemoryStore { const query = normalizeBm25Query(ftsQuery); if (!query) return []; try { - const rows = await this.pool.query(` - SELECT record_id, session_key, session_id, role, message_text, - message_text <@> to_bm25query($1, $3) AS rank, - recorded_at, timestamp::bigint AS timestamp - FROM ${this.table(POSTGRES_TABLES.l0)} - ORDER BY message_text <@> to_bm25query($1, $3) - LIMIT $2 - `, [query, limit, postgresIndexName(this.config.schema, "l0_message_bm25_idx")]); + const rows = await this.pool.query({ + name: "search_l0_fts_v1", + text: ` + SELECT record_id, session_key, session_id, role, message_text, + message_text <@> to_bm25query($1, $3) AS rank, + recorded_at, timestamp::bigint AS timestamp + FROM ${this.table(POSTGRES_TABLES.l0)} + WHERE message_text <@> to_bm25query($1, $3) > 0 + ORDER BY message_text <@> to_bm25query($1, $3) + LIMIT $2 + `, + values: [query, limit, postgresIndexName(this.config.schema, "l0_message_bm25_idx")], + }); return rows.rows.map((row) => ({ ...normalizeL0SearchRow(row), score: bm25RankToScore(Number(row.rank)) })); } catch (err) { this.logger?.warn?.(`${TAG} [L0-fts] FAILED: ${errorMessage(err)}`); @@ -601,38 +714,89 @@ export class PgMemoryStore implements IMemoryStore { onProgress?: (done: number, total: number, layer: "L1" | "L0") => void, ): Promise<{ l1Count: number; l0Count: number }> { if (this.degraded || !this.vectorAvailable) return { l1Count: 0, l0Count: 0 }; - const l1Rows = await 
this.getAllL1Texts(); - let l1Done = 0; - for (const row of l1Rows) { - try { - const embedding = await embedFn(row.content); - const vec = this.prepareVector(embedding); - if (vec) { - await this.pool.query(`UPDATE ${this.table(POSTGRES_TABLES.l1)} SET embedding = $2::vector WHERE record_id = $1`, [row.record_id, vec]); + const BATCH_SIZE = 20; + const CONCURRENCY = 4; + + const l1Count = await this.reindexLayer( + POSTGRES_TABLES.l1, "content", "L1", embedFn, onProgress, BATCH_SIZE, CONCURRENCY, + ); + const l0Count = await this.reindexLayer( + POSTGRES_TABLES.l0, "message_text", "L0", embedFn, onProgress, BATCH_SIZE, CONCURRENCY, + ); + return { l1Count, l0Count }; + } + + /** + * Reindex a single layer with batched embedding + batched UPDATE. + * + * Improvements over the naive serial approach: + * - Paginated cursor read (avoids loading all rows into memory) + * - Concurrent embedding calls (up to CONCURRENCY in flight) + * - Batched UPDATE via unnest (single round-trip per batch) + */ + private async reindexLayer( + table: string, + textColumn: string, + label: "L1" | "L0", + embedFn: (text: string) => Promise, + onProgress?: (done: number, total: number, layer: "L1" | "L0") => void, + batchSize = 20, + concurrency = 4, + ): Promise { + const total = await this.countTable(table); + if (total === 0) return 0; + + let done = 0; + let offset = 0; + + while (true) { + const batch = await this.pool.query<{ record_id: string; text: string }>( + `SELECT record_id, ${quoteIdent(textColumn)} AS text FROM ${this.table(table)} ORDER BY record_id LIMIT $1 OFFSET $2`, + [batchSize, offset], + ); + if (batch.rows.length === 0) break; + + // Concurrent embedding with bounded parallelism + const embedResults: Array<{ id: string; vec: string | null }> = []; + for (let i = 0; i < batch.rows.length; i += concurrency) { + const chunk = batch.rows.slice(i, i + concurrency); + const results = await Promise.allSettled( + chunk.map(async (row) => { + const embedding = await 
embedFn(row.text); + return { id: row.record_id, vec: this.prepareVector(embedding) }; + }), + ); + for (const result of results) { + if (result.status === "fulfilled" && result.value.vec) { + embedResults.push(result.value); + } else if (result.status === "rejected") { + this.logger?.warn?.(`${TAG} [reindex-${label}] embed failed: ${errorMessage(result.reason)}`); + } } - } catch (err) { - this.logger?.warn?.(`${TAG} [reindex-L1] skip ${row.record_id}: ${errorMessage(err)}`); } - l1Done++; - onProgress?.(l1Done, l1Rows.length, "L1"); - } - const l0Rows = await this.getAllL0Texts(); - let l0Done = 0; - for (const row of l0Rows) { - try { - const embedding = await embedFn(row.message_text); - const vec = this.prepareVector(embedding); - if (vec) { - await this.pool.query(`UPDATE ${this.table(POSTGRES_TABLES.l0)} SET embedding = $2::vector WHERE record_id = $1`, [row.record_id, vec]); + // Batched UPDATE via unnest (single SQL for entire batch) + if (embedResults.length > 0) { + const ids = embedResults.map((r) => r.id); + const vecs = embedResults.map((r) => r.vec!); + try { + await this.pool.query( + `UPDATE ${this.table(table)} AS t + SET embedding = data.vec::vector + FROM (SELECT unnest($1::text[]) AS rid, unnest($2::text[]) AS vec) AS data + WHERE t.record_id = data.rid`, + [ids, vecs], + ); + } catch (err) { + this.logger?.warn?.(`${TAG} [reindex-${label}] batch update failed: ${errorMessage(err)}`); } - } catch (err) { - this.logger?.warn?.(`${TAG} [reindex-L0] skip ${row.record_id}: ${errorMessage(err)}`); } - l0Done++; - onProgress?.(l0Done, l0Rows.length, "L0"); + + done += batch.rows.length; + offset += batchSize; + onProgress?.(done, total, label); } - return { l1Count: l1Done, l0Count: l0Done }; + return done; } isFtsAvailable(): boolean { @@ -691,7 +855,18 @@ export class PgMemoryStore implements IMemoryStore { `DELETE FROM ${this.table(table)} WHERE ${quoteIdent(column)} != '' AND ${quoteIdent(column)} < $1`, [cutoffIso], ); - return result.rowCount 
?? expiredCount; + const deleted = result.rowCount ?? expiredCount; + + // After significant deletions, hint autovacuum to reclaim dead tuples. + // This prevents index bloat (especially DiskANN/HNSW) from degrading + // search performance over time. Fire-and-forget; non-blocking. + if (deleted > 50) { + void this.pool.query(`VACUUM ANALYZE ${this.table(table)}`).catch((vacErr: unknown) => { + this.logger?.debug?.(`${TAG} [${label}-vacuum] non-critical: ${errorMessage(vacErr)}`); + }); + } + + return deleted; } catch (err) { this.logger?.warn?.(`${TAG} [${label}-delete-expired] FAILED: ${errorMessage(err)}`); return 0; @@ -748,11 +923,35 @@ function vectorToSql(embedding: Float32Array): string { return `[${Array.from(embedding, (value) => Number.isFinite(value) ? String(value) : "0").join(",")}]`; } +// Common Chinese stopwords that are too generic for BM25 precision +const ZH_STOPWORDS = new Set([ + "的", "了", "在", "是", "我", "有", "和", "就", "不", "人", + "都", "一", "一个", "上", "也", "很", "到", "说", "要", "去", + "你", "会", "着", "没有", "看", "好", "自己", "这", "他", "她", + "它", "那", "这个", "那个", "什么", "怎么", "如何", "可以", "能", + "吗", "吧", "呢", "啊", "哦", "把", "被", "让", "给", "从", + "但", "而", "或", "与", "及", "等", "之", "所", "以", "因", + "为", "对", "于", "用", "来", "做", "中", "大", "小", "多", +]); + +const MAX_BM25_TOKENS = 8; + function normalizeBm25Query(ftsQuery: string): string { const tokens = ftsQuery .match(/[\p{L}\p{N}_-]+/gu) - ?.filter((token) => !/^(OR|AND|NOT)$/i.test(token)) ?? []; - return [...new Set(tokens)].join(" "); + ?.filter((token) => { + // Remove BM25 logical operators + if (/^(OR|AND|NOT)$/i.test(token)) return false; + // Remove single-character CJK tokens (too generic) + if (token.length === 1 && /\p{Script=Han}/u.test(token)) return false; + // Remove common Chinese stopwords + if (ZH_STOPWORDS.has(token)) return false; + return true; + }) ?? 
[]; + + // Deduplicate and cap at MAX_BM25_TOKENS to maintain precision + const unique = [...new Set(tokens)]; + return unique.slice(0, MAX_BM25_TOKENS).join(" "); } function bm25RankToScore(rank: number): number {