diff --git a/integrationTests/server/embed-directive.test.ts b/integrationTests/server/embed-directive.test.ts new file mode 100644 index 000000000..ad4236f32 --- /dev/null +++ b/integrationTests/server/embed-directive.test.ts @@ -0,0 +1,309 @@ +/** + * `@embed` directive integration test (#632 / Phase 5 of #510). + * + * Spins up a fake Ollama HTTP server inside the test, points Harper's models + * config at it, deploys a schema with `@embed`, and exercises three paths + * end-to-end: + * + * 1. **Happy path** — POST a record → fake-ollama returns a deterministic + * vector → record stores the vector at the `@embed`-decorated field. + * + * 2. **Source-unchanged PATCH** — PATCH a record with a non-source field → + * no new embed call is made (fake-ollama hit count stays flat); the + * existing embedding survives via patch-merge. + * + * 3. **Replication-receiver skip** — POST with `x-replicate-from: none` and + * a pre-supplied vector → no embed call is made; the supplied vector is + * stored as-is. (The REST receiver path; the cluster-subscribe path is + * covered by the `options.isNotification === true` branch in + * `embedHook.test.js`.) + * + * Setup notes: + * - The fake-ollama server returns deterministic 3-element Float32 vectors + * derived from the input text so assertions can compare exact bytes. + * - Harper boots with `models.embedding.default` pointing at the fake host. + * - Schema/component installs follow the same pattern as + * `integrationTests/apiTests/blob.test.mjs`. + */ +import { suite, test, before, after } from 'node:test'; +import { strictEqual, ok } from 'node:assert/strict'; +import { createServer, type IncomingMessage, type ServerResponse, type Server } from 'node:http'; +import type { AddressInfo } from 'node:net'; +import { startHarper, teardownHarper } from '@harperfast/integration-testing'; +// .mjs siblings — TypeScript needs `// @ts-expect-error` because no declaration files exist +// @ts-expect-error utils/client.mjs has no type declarations; runtime resolves fine +import { createApiClient } from '../apiTests/utils/client.mjs'; +// @ts-expect-error utils/lifecycle.mjs has no type declarations; runtime resolves fine +import { restartHttpWorkers } from '../apiTests/utils/lifecycle.mjs'; +import request from 'supertest'; + +const SCHEMA_GRAPHQL = [ + 'type EmbedDoc @table(database: "embedtest") @sealed @export {', + '\tid: ID! @primaryKey', + '\tcontent: String', + '\ttag: String', + '\tembedding: Vector @embed(source: "content", model: "default")', + '}', + '', +].join('\n'); + +/** + * REST GET returns Float32Array-typed columns as a `{type: "Buffer", data: number[]}` + * JSON shape (msgpack/Buffer round-trip). Decode back to Float32 components for + * equality assertions. + */ +function decodeVector(field: any): number[] | undefined { + if (field == null) return undefined; + if (Array.isArray(field)) return field.map(Number); + if (field instanceof Float32Array) return Array.from(field); + if (field && field.type === 'Buffer' && Array.isArray(field.data)) { + const bytes = Uint8Array.from(field.data); + const f32 = new Float32Array(bytes.buffer, bytes.byteOffset, Math.floor(bytes.byteLength / 4)); + return Array.from(f32); + } + if (field instanceof Uint8Array || field instanceof ArrayBuffer) { + const view = field instanceof ArrayBuffer ? new Uint8Array(field) : field; + const f32 = new Float32Array(view.buffer, view.byteOffset, Math.floor(view.byteLength / 4)); + return Array.from(f32); + } + return undefined; +} + +interface FakeOllama { + url: string; + host: string; // form for Harper's `host:` config field + close: () => Promise; + embedCallCount: () => number; + lastEmbedInputs: () => string[][]; + reset: () => void; +} + +/** + * Deterministic embedding function: maps an input string to a 3-element + * Float32Array. Different inputs produce different vectors; same input + * produces the same vector across calls. + */ +function deterministicVector(input: string): number[] { + let h1 = 0; + let h2 = 0; + let h3 = 0; + for (let i = 0; i < input.length; i++) { + const c = input.charCodeAt(i); + h1 = (h1 * 31 + c) % 9973; + h2 = (h2 * 37 + c) % 9967; + h3 = (h3 * 41 + c) % 9941; + } + // Normalize to (0, 1) range + return [h1 / 9973, h2 / 9967, h3 / 9941]; +} + +async function startFakeOllama(): Promise { + let embedCalls = 0; + const embedInputs: string[][] = []; + const server: Server = createServer((req: IncomingMessage, res: ServerResponse) => { + if (req.method === 'POST' && req.url === '/api/embed') { + let body = ''; + req.on('data', (chunk) => (body += chunk)); + req.on('end', () => { + try { + const parsed = JSON.parse(body) as { model: string; input: string | string[] }; + const inputs = Array.isArray(parsed.input) ? parsed.input : [parsed.input]; + embedCalls++; + embedInputs.push(inputs); + const embeddings = inputs.map((s) => deterministicVector(s)); + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ embeddings, prompt_eval_count: inputs.join(' ').length })); + } catch (err) { + res.writeHead(400, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: String(err) })); + } + }); + return; + } + res.writeHead(404); + res.end(); + }); + await new Promise((resolve) => server.listen(0, '127.0.0.1', resolve)); + const addr = server.address() as AddressInfo; + const url = `http://127.0.0.1:${addr.port}`; + const host = `127.0.0.1:${addr.port}`; + return { + url, + host, + close: () => new Promise((resolve, reject) => server.close((err) => (err ? reject(err) : resolve()))), + embedCallCount: () => embedCalls, + lastEmbedInputs: () => embedInputs, + reset: () => { + embedCalls = 0; + embedInputs.length = 0; + }, + }; +} + +suite('@embed directive end-to-end with fake Ollama', (ctx: any) => { + let fake: FakeOllama; + let client: any; + + before(async () => { + fake = await startFakeOllama(); + // Local debugging: setting HARPER_INTEGRATION_TEST_FORCE_LOOPBACK forces the + // 127.0.0.1 fast-path so the test runs without `harper-integration-test-setup-loopback` + // (macOS dev machines don't have the pool by default). CI uses the pool normally. + if (process.env.HARPER_INTEGRATION_TEST_FORCE_LOOPBACK) { + ctx.harper = { ...ctx.harper, hostname: '127.0.0.1' }; + } + await startHarper(ctx, { + config: { + logging: { auditLog: true }, + models: { + embedding: { + default: { backend: 'ollama', host: fake.host, model: 'fake-embed' }, + }, + }, + }, + env: {}, + }); + client = createApiClient(ctx.harper); + + await client + .req() + .send({ operation: 'add_component', project: 'embedtest' }) + .expect((r: any) => { + const text = JSON.stringify(r.body); + ok(text.includes('Successfully added project') || text.includes('Project already exists'), r.text); + }); + + await client + .req() + .send({ operation: 'set_component_file', project: 'embedtest', file: 'schema.graphql', payload: SCHEMA_GRAPHQL }) + .expect((r: any) => ok(r.body?.message?.includes?.('Successfully set component: schema.graphql'), r.text)) + .expect(200); + + await restartHttpWorkers(client, '/openapi'); + fake.reset(); + }); + + after(async () => { + try { + await teardownHarper(ctx); + } finally { + await fake.close(); + } + }); + + test('schema with @embed creates EmbedDoc table', async () => { + const desc = await client.req().send({ operation: 'describe_all' }).expect(200); + const embedDoc = desc.body?.embedtest?.EmbedDoc; + ok(embedDoc, 'EmbedDoc table not created'); + const embeddingAttr = (embedDoc.attributes || []).find((a: any) => a.attribute === 'embedding'); + ok(embeddingAttr, 'embedding attribute should be present'); + strictEqual(embeddingAttr.type, 'Vector', 'embedding type should be Vector'); + strictEqual(embeddingAttr.indexed?.type, 'HNSW', 'embedding should be auto-HNSW-indexed'); + }); + + test('happy path: POST → embedder runs → vector stored on record', async () => { + fake.reset(); + const content = 'harper is a database'; + const expected = deterministicVector(content); + + await request(ctx.harper.httpURL) + .post('/EmbedDoc/') + .set(client.headers) + .send({ id: 'doc-happy', content }) + .expect((r: any) => ok([200, 201, 204].includes(r.status), `unexpected status ${r.status}: ${r.text}`)); + + // Verify the fake-ollama received exactly one embed call with the source text + strictEqual(fake.embedCallCount(), 1, 'expected exactly one embed call'); + const inputs = fake.lastEmbedInputs()[0]; + strictEqual(inputs.length, 1); + ok(inputs[0].includes(content), `embed input "${inputs[0]}" should contain source text`); + + // GET the record back and verify the embedding was stored + const getResp = await client.reqRest('/EmbedDoc/doc-happy').expect(200); + const body = getResp.body as { id: string; content: string; embedding: unknown }; + strictEqual(body.id, 'doc-happy'); + strictEqual(body.content, content); + const stored = decodeVector(body.embedding); + ok(stored, `embedding field should be populated, got: ${JSON.stringify(body.embedding)}`); + strictEqual(stored.length, 3, 'expected 3-element vector'); + for (let i = 0; i < 3; i++) { + ok( + Math.abs(stored[i] - expected[i]) < 1e-5, + `vector[${i}] mismatch: stored=${stored[i]} expected=${expected[i]}` + ); + } + }); + + test('PATCH unrelated field does NOT re-run embedder; existing embedding survives', async () => { + // Seed a record first + const content = 'patch baseline content'; + await request(ctx.harper.httpURL) + .post('/EmbedDoc/') + .set(client.headers) + .send({ id: 'doc-patch', content }) + .expect((r: any) => ok([200, 201, 204].includes(r.status), `seed POST status ${r.status}: ${r.text}`)); + + const baselineEmbedCalls = fake.embedCallCount(); + + // PATCH a non-source field. The embed hook's source-presence predicate should skip; + // no new embed call should fire and the existing vector should remain unchanged. + await request(ctx.harper.httpURL) + .patch('/EmbedDoc/doc-patch') + .set(client.headers) + .send({ tag: 'updated' }) + .expect((r: any) => ok([200, 204].includes(r.status), `PATCH status ${r.status}: ${r.text}`)); + + strictEqual( + fake.embedCallCount(), + baselineEmbedCalls, + 'embed should not fire when the source field is not in the PATCH payload' + ); + + // Verify the embedding is still the one from the original content + const expected = deterministicVector(content); + const getResp = await client.reqRest('/EmbedDoc/doc-patch').expect(200); + const body = getResp.body as { tag: string; embedding: unknown }; + strictEqual(body.tag, 'updated'); + const stored = decodeVector(body.embedding); + ok(stored, `embedding should still be populated after non-source PATCH, got: ${JSON.stringify(body.embedding)}`); + strictEqual(stored.length, 3); + for (let i = 0; i < 3; i++) { + ok(Math.abs(stored[i] - expected[i]) < 1e-5, 'existing embedding should survive non-source PATCH'); + } + }); + + test('replication-receiver: POST with x-replicate-from:none + supplied vector → embedder skipped', async () => { + const content = 'replicated record content'; + const suppliedVector = [0.111, 0.222, 0.333]; + const baselineEmbedCalls = fake.embedCallCount(); + + await request(ctx.harper.httpURL) + .post('/EmbedDoc/') + .set({ ...client.headers, 'x-replicate-from': 'none' }) + .send({ id: 'doc-replica', content, embedding: suppliedVector }) + .expect((r: any) => ok([200, 201, 204].includes(r.status), `replica POST status ${r.status}: ${r.text}`)); + + strictEqual( + fake.embedCallCount(), + baselineEmbedCalls, + 'embed should NOT fire on a write with x-replicate-from: none (receiver context)' + ); + + const getResp = await client.reqRest('/EmbedDoc/doc-replica').expect(200); + const body = getResp.body as { id: string; content: string; embedding: unknown }; + strictEqual(body.id, 'doc-replica'); + strictEqual(body.content, content); + const stored = decodeVector(body.embedding); + ok(stored, `embedding should be populated from supplied vector, got: ${JSON.stringify(body.embedding)}`); + strictEqual(stored.length, 3); + // The receiver must preserve the originator's vector — NOT overwrite with what + // it would have computed locally. Compare against suppliedVector, not against + // deterministicVector(content). + for (let i = 0; i < 3; i++) { + ok( + Math.abs(stored[i] - suppliedVector[i]) < 1e-5, + `receiver stored ${stored[i]} but should be the originator's ${suppliedVector[i]}` + ); + } + }); +}); diff --git a/resources/Table.ts b/resources/Table.ts index 3ec040d43..b13de7d5b 100644 --- a/resources/Table.ts +++ b/resources/Table.ts @@ -47,6 +47,7 @@ import { transaction, contextStorage } from './transaction.ts'; import { MAXIMUM_KEY, writeKey, compareKeys } from 'ordered-binary'; import { getWorkerIndex, getWorkerCount } from '../server/threads/manageThreads.js'; import { HAS_BLOBS, auditRetention, removeAuditEntry } from './auditStore.ts'; +import { buildEmbedBefore, createDefaultEmbedder, type EmbedAttribute, type Embedder } from './models/embedHook.ts'; import { autoCast, autoCastBooleanStrict } from '../utility/common_utils.ts'; import { recordUpdater, @@ -85,6 +86,8 @@ export type Attribute = { computed?: any; resolve?: any; computedFromExpression?: any; + embed?: { source: string; model: string }; + version?: any; properties?: Array; elements?: Attribute; sealed?: boolean; @@ -256,6 +259,17 @@ export function makeTable(options) { static updatedTimeProperty = updatedTimeProperty; static propertyResolvers; static userResolvers = {}; + // `@embed` write-time hook registry (Phase 5 of #510). `userEmbedders` is the per- + // attribute embedder map populated by `setEmbedAttribute` — defaulted at schema + // load from the directive's `(source, model)` and overridable by component authors. + // `userSetEmbedders` tracks which names were explicitly set by `setEmbedAttribute` + // vs. auto-populated as defaults — on a schema reload with a changed `model:`, we + // must refresh defaults while preserving user-author overrides. `embedAttributes` + // is the filtered list scanned on every write so we don't pay an + // `attributes.filter(...)` per put/patch. + static userEmbedders: { [name: string]: Embedder } = {}; + static userSetEmbedders: Set = new Set(); + static embedAttributes: EmbedAttribute[] = (attributes as any[]).filter((a) => a?.embed); static source?: typeof TableResource; declare static sourceOptions: any; declare static intermediateSource: boolean; @@ -1234,14 +1248,16 @@ export function makeTable(options) { } return when(loading, () => { this.#changes = updates; - this._writeUpdate(id, this.#changes, false); - return this; + // Thread `_writeUpdate`'s return through `when()` so the embed-hook + // promise (when `@embed` is active) is awaited before this method + // resolves — otherwise the caller proceeds to `save()` with the + // write not yet registered on the txn. + return when(this._writeUpdate(id, this.#changes, false), () => this); }); }); } } - this._writeUpdate(id, this.#changes, fullUpdate); - return this; + return when(this._writeUpdate(id, this.#changes, fullUpdate), () => this); } /** @@ -1529,14 +1545,16 @@ export function makeTable(options) { return Promise.all( record.map((element) => { const id = element[primaryKey]; - this._writeUpdate(id, element, true); - return this.save() as any; + // `_writeUpdate` may return a promise when an `@embed` directive + // requires running an embedder before the per-write `commit(...)` + // closure. Threading via `when()` so synchronous-return callers + // (no embed) and async-return callers (embed pending) both work. + return when(this._writeUpdate(id, element, true), () => this.save() as any); }) ) as any; } else { const id = requestTargetToId(target as any); - this._writeUpdate(id, record, true); - return this.save() as any; + return when(this._writeUpdate(id, record, true), () => this.save() as any); } }) as any; } @@ -1575,8 +1593,10 @@ export function makeTable(options) { throw new ClientError('Record already exists', 409); } } - this._writeUpdate(id, record, true); - return record; + // `_writeUpdate` may return a promise when an `@embed` directive + // requires running an embedder before the per-write `commit(...)` + // closure. `when()` passes through synchronous returns. + return when(this._writeUpdate(id, record, true), () => record); }) as any; } @@ -1603,7 +1623,6 @@ export function makeTable(options) { _writeUpdate(id: Id, recordUpdate: any, fullUpdate: boolean, options?: any) { const context = this.getContext(); const transaction = txnForContext(context); - checkValidId(id); const entry = this.#entry ?? primaryStore.getEntry(id, { transaction: transaction.getReadTxn() }); const writeToSource = () => { @@ -1952,8 +1971,28 @@ export function makeTable(options) { }, }; this.#savingOperation = write; - write.beforeIntermediate = preCommitBlobsForRecordBefore(write, recordUpdate); - return transaction.addWrite(write as any); + // `@embed` write-time hook (Phase 5 of #510). Must run BEFORE `addWrite` + // so the embedder's mutation of `recordUpdate` reaches the per-write + // `commit(...)` closure. The transaction's pre-commit `before` slot + // fires AFTER `commit(...)` for record mutations (the slot is awaited + // at `Promise.all(completions)` at txn-commit time, not before each + // write's commit closure) — which is fine for blob byte-writes that + // reference pre-allocated IDs, but not for embed where the vector + // itself must be on the record at commit time. Skipped on replicated + // writes — the originating node already computed the embedding and + // the receiver should preserve it. + const embedBefore = buildEmbedBefore( + recordUpdate, + context, + options, + TableResource.embedAttributes, + TableResource.userEmbedders + ); + const proceed = (): any => { + write.beforeIntermediate = preCommitBlobsForRecordBefore(write, recordUpdate); + return transaction.addWrite(write as any); + }; + return embedBefore ? embedBefore().then(proceed) : proceed(); } async delete(target: RequestTargetOrId): Promise { @@ -3368,6 +3407,13 @@ export function makeTable(options) { * When attributes have been changed, we update the accessors that are assigned to this table */ static updatedAttributes() { + // `@embed` attributes are tracked by a separate filtered list to avoid + // re-scanning every attribute on every write. The list must be refreshed + // here because `databases.ts` does an in-place `Table.attributes.splice(...)` + // on schema reload and then calls `Table.updatedAttributes()` — the + // initializer at class-construction would otherwise hold a stale snapshot + // from the first schema deployment. + this.embedAttributes = (this.attributes as any[]).filter((a) => a?.embed); propertyResolvers = this.propertyResolvers = { $id: (object, context, entry) => ({ value: entry.key }), $updatedtime: (object, context, entry) => entry.version, @@ -3383,6 +3429,19 @@ export function makeTable(options) { attribute.resolve = null; // reset this const relationship = attribute.relationship; const computed = attribute.computed; + // `@embed` directive: register the default embedder. This is one-time + // setup (populating userEmbedders), not a runtime resolver, so it lives + // outside the if/else-if chain below — `@embed` fields ALSO get + // auto-HNSW indexing, and the chain's `customIndex.propertyResolver` + // branch needs to fire for them. Component authors override the default + // via `Table.setEmbedAttribute(name, customEmbedder)` after schema load. + // `userSetEmbedders` tracks the override names so we can refresh defaults + // on an in-place `Table.attributes.splice()` schema reload at + // `databases.ts:940` (which calls `updatedAttributes()` again on the + // existing class) without clobbering an author's custom embedder. + if (attribute.embed && !TableResource.userSetEmbedders.has(attribute.name)) { + this.userEmbedders[attribute.name] = createDefaultEmbedder(attribute.embed); + } if (relationship) { if (attribute.indexed) { console.error( @@ -3566,6 +3625,28 @@ export function makeTable(options) { } this.userResolvers[attribute_name] = resolver; } + /** + * Override the embedder that fires on writes to an `@embed`-decorated + * attribute (Phase 5 of #510). The default embedder calls + * `Models.embed(record[source], { model, inputType: 'document' })`. Pass a + * custom function when you need multi-field concatenation, custom + * preprocessing, or a different inputType — the embedder receives the + * full record (post-merge with the existing entry for PATCH) and returns + * the vector to store at `attribute_name`. + */ + static setEmbedAttribute(attribute_name: string, embedder: Embedder): void { + const attribute = findAttribute(attributes, attribute_name); + if (!attribute) { + console.error(`The attribute "${attribute_name}" does not exist in the table "${tableName}"`); + return; + } + if (!attribute.embed) { + console.error(`The attribute "${attribute_name}" is not declared with @embed in the table "${tableName}"`); + return; + } + this.userEmbedders[attribute_name] = embedder; + this.userSetEmbedders.add(attribute_name); + } static async deleteHistory(endTime = 0, cleanupDeletedRecords = false) { let completion: Promise; for (const auditRecord of auditStore.getRange({ @@ -4427,6 +4508,23 @@ export function makeTable(options) { } }, }; + // `@embed` write-time hook on the cache-from-source path. The + // `getFromSource` write builds its own write op and addWrite-s it + // directly here — bypassing `_writeUpdate` — so we have to wire the + // embedder here too. Caching tables that declare `@embed` are a + // canonical use case (see #750): the source resource returns the + // record's content fields, and this table derives the vector. + // Receivers do not run the embedder; this path is on the originating + // node by definition (no `replicateFrom` / `alreadyLogged` / cluster- + // subscribe context here). + const embedBefore = buildEmbedBefore( + updatedRecord, + sourceContext, + undefined, + TableResource.embedAttributes, + TableResource.userEmbedders + ); + if (embedBefore) await embedBefore(); sourceWrite.before = preCommitBlobsForRecordBefore(sourceWrite, updatedRecord); dbTxn.addWrite(sourceWrite); }), @@ -4740,6 +4838,20 @@ export function coerceType(value: any, attribute: any): any { return date; } return new Date(+value); // epoch ms number + case 'Vector': + // Coerce to plain `Array`. Typed arrays are stored unchanged by + // `updateAndFreeze` only if we hand them off as plain arrays — handing a + // `Float32Array` through gets enumerated as `{0,1,2,...}` map keys and + // loses the typed-array shape on read. HNSW's `propertyResolver` and + // `customIndex.index` both accept `number[]`. (Open Q in PR review: + // should Vector preserve Float64 / Float16 precision? Today everything + // flattens to `number[]` and is re-narrowed when the HNSW index needs + // Float32 for distance computation.) + if (value === null || value === 'null') return null; + if (value instanceof Float32Array) return Array.from(value); + if (Array.isArray(value)) return value.map(Number); + if (ArrayBuffer.isView(value)) return Array.from(value as any); + throw new SyntaxError(); case undefined: case 'Any': return autoCast(value); diff --git a/resources/graphql.ts b/resources/graphql.ts index f70672838..ebd1dca0a 100644 --- a/resources/graphql.ts +++ b/resources/graphql.ts @@ -6,7 +6,20 @@ import { Resources } from './Resources.ts'; import type { NamedTypeNode, StringValueNode } from 'graphql'; import { once } from 'node:events'; -const PRIMITIVE_TYPES = ['ID', 'Int', 'Float', 'Long', 'String', 'Boolean', 'Date', 'Bytes', 'Any', 'BigInt', 'Blob']; +const PRIMITIVE_TYPES = [ + 'ID', + 'Int', + 'Float', + 'Long', + 'String', + 'Boolean', + 'Date', + 'Bytes', + 'Any', + 'BigInt', + 'Blob', + 'Vector', +]; if (!server.knownGraphQLDirectives) { server.knownGraphQLDirectives = []; @@ -18,6 +31,7 @@ server.knownGraphQLDirectives.push( 'primaryKey', 'indexed', 'computed', + 'embed', 'relationship', 'createdTime', 'updatedTime', @@ -143,6 +157,57 @@ async function processGraphQLSchema(gqlContent, urlPath, filePath, resources) { } } property.computed = property.computed || true; + } else if (directiveName === 'embed') { + // `@embed(source: "", model: "")`: schema-level RAG. + // On write, the embedder reads `record[source]` and writes a vector to this + // attribute (Phase 5 of #510). Auto-attaches HNSW indexing on the same field + // when no explicit `@indexed` is present, and tracks the model name as the + // attribute version — a model change between deploys triggers reindex the + // same way a `@computed` expression change does (see version-driven reindex + // below). + const embedDefinition: { source?: string; model?: string } = {}; + for (const arg of directive.arguments || []) { + // Only accept string-literal values; reject variables / non-string nodes + // so a typoed schema fails loudly instead of silently producing undefined. + if (arg.value.kind !== 'StringValue') { + console.error( + `@embed(${arg.name.value}: ...) on "${property.name}" expects a string literal, at`, + arg.loc + ); + continue; + } + embedDefinition[arg.name.value] = (arg.value as StringValueNode).value; + } + if (!embedDefinition.source || !embedDefinition.model) { + // Missing required args would silently degrade the embedder to a + // no-op at write time, leaving the vector column empty with no + // operator signal. Halt schema processing so the component install + // surfaces the error instead of returning 200 OK on a broken + // schema. Other directive validations in this parser (e.g. dup + // primary key at line ~119) only log and continue — the difference + // here is that the failure mode is "looks-correct schema that + // never embeds anything," which is harder to diagnose than a + // missing primary key. + const loc = directive.loc; + console.error(`@embed on "${property.name}" requires both "source" and "model" arguments, at`, loc); + throw new Error( + `@embed on "${property.name}" requires both "source" and "model" arguments` + + (loc ? ` (line ${loc.startToken?.line ?? '?'}, column ${loc.startToken?.column ?? '?'})` : '') + ); + } else { + property.embed = embedDefinition; + // Use the model name as the version so changing `model:` in the schema + // triggers reindex/re-embed, matching `@computed`'s version semantics. + if (property.version == undefined) { + property.version = `embed:${embedDefinition.model}`; + } + // Auto-attach HNSW indexing if no explicit `@indexed` is present on this + // field. The issue's "indexed via HNSW. No app code needed." statement + // only holds if `@embed` alone wires the index. + if (!property.indexed) { + property.indexed = { type: 'HNSW' }; + } + } } else if (directiveName === 'relationship') { const relationshipDefinition = {}; for (const arg of directive.arguments) { diff --git a/resources/models/embedHook.ts b/resources/models/embedHook.ts new file mode 100644 index 000000000..35ac7821f --- /dev/null +++ b/resources/models/embedHook.ts @@ -0,0 +1,226 @@ +/** + * `@embed` directive write-time hook (#632 / Phase 5 of #510). + * + * Two surfaces: + * + * - `createDefaultEmbedder(embedConfig)` — produces the default embedder a + * table registers when its schema includes an `@embed` directive. The + * embedder reads `record[embedConfig.source]`, calls `Models.embed(...)` + * with `inputType: 'document'`, and returns the first vector. Component + * authors can replace it via `Table.setEmbedAttribute(name, customEmbedder)` + * when they need different logic (multi-field concatenation, custom + * preprocessing). + * + * - `buildEmbedBefore(...)` — produces the embedder callback invoked + * *before* `transaction.addWrite(...)` at the put/patch site. The + * embedder mutates `record[attr.name]` so the new vector is on the + * record when the per-write `commit(...)` closure runs. (It can't ride + * the txn's pre-commit `before` slot because that slot is awaited at + * `Promise.all(completions)` at txn-commit time — AFTER each write's + * `commit(...)` has already stored the record. The blob pattern works + * there because blob IDs are pre-allocated synchronously and the blob + * bytes write independently of the record.) Returns `undefined` when + * there's no work (no embed attributes, or the write is a replication + * receiver) so the call site can short-circuit. + * + * Replicated-write predicate: the receiver should *store* the originating + * node's already-computed embedding, not re-compute it. Three signals + * indicate this is NOT a local-originating write, and we must skip the + * embedder for any of them: + * + * - `options.isNotification === true` — cluster-replication path; the + * `source.subscribe()` handler in `Table.ts` sets this when applying a + * write from a peer. + * - `context.replicateFrom === false` — REST `x-replicate-from: none` + * header path. NOTE: the value is the literal `false`, not a truthy + * identifier; `server/REST.ts` only ever assigns `replicateFrom = false`. + * - `context.alreadyLogged === true` — local audit-log replay path at + * `resources/replayLogs.ts` (process-restart catch-up). The vector is + * already on disk; the replay only re-emits to the in-memory state. + * + * Sync-by-default execution: the embedder runs during the transaction's + * `before` phase, so commit blocks on it. Queued mode is a follow-up: it would + * commit the record without the vector, then back-fill via the existing job + * infrastructure at `server/jobs/`. + * + * Model-change invalidation: the parser sets `property.version = "embed:"` + * so the schema-load path at `databases.ts:1111` detects a model change between + * deploys (same pattern `@computed` uses). Today the version-change pathway + * triggers an HNSW *re-index* of stored vectors — it does NOT *re-embed* the + * source field through the new model. New writes after a model change pick up + * the new model; existing rows keep their old-model vectors until they are + * re-written. Full re-embed-on-model-change backfill is tracked as a follow-up + * to #632 — the queued-mode plumbing is the natural place to land it, since + * it already needs the iterate-records-and-back-fill primitive. + */ + +/** + * Lazy logger resolver. A static `import { logger } from '../../utility/logging/logger.ts'` + * would be cleaner but trips the documented `common_utils.ts ↔ harper_logger.ts` + * CJS cycle (ERR_REQUIRE_CYCLE_MODULE) the moment this module is loaded by a + * unit-test path that bypasses the full transaction stack. We only need the + * logger on the failure path, so we resolve it lazily. + */ +function getLogger(): { error?: (...args: any[]) => void } { + try { + // eslint-disable-next-line @typescript-eslint/no-var-requires + return require('#src/utility/logging/logger').logger ?? {}; + } catch { + return {}; + } +} + +export type EmbedConfig = { + source: string; + model: string; +}; + +export type EmbedAttribute = { + name: string; + embed: EmbedConfig; +}; + +export type Embedder = (record: any) => Promise; + +/** + * Embed-function shape the default embedder calls into. Matches the public + * `Models.embed` signature. Pulled out as a type so we can dependency-inject + * a fake for unit tests without dragging the transaction stack into module + * load. + */ +type EmbedFn = ( + input: string | string[], + opts: { model?: string; inputType?: 'document' | 'query' } +) => Promise; + +/** + * Models facade resolver for the default embedder. Lazy-imported so this + * module can be unit-tested without loading the transaction stack that + * `Models.ts` pulls in. Overridable via `__setEmbedFnForTest` (test seam). + * + * The `Models` class reads ALS-scoped context and a process-wide backend + * registry, so per-call instantiation has the same observable behavior as a + * singleton — we just lazy-cache for allocation churn. + */ +let _embedFn: EmbedFn | undefined; +function resolveEmbedFn(): EmbedFn { + if (_embedFn) return _embedFn; + // `#src/` alias goes through package.json conditional exports — resolves to + // `./*.ts` under the `typestrip` condition (unit tests) and to `./dist/*.js` + // in production. A bare `require('./Models.ts')` doesn't survive the dist + // build because the `.ts` extension stays literal at runtime. + const { Models } = require('#src/resources/models/Models'); // eslint-disable-line @typescript-eslint/no-var-requires + const models = new Models(); + _embedFn = (input, opts) => models.embed(input, opts); + return _embedFn; +} + +/** + * Override the embed function used by `createDefaultEmbedder`. Test seam + * only. Pass `undefined` to reset to the lazily-loaded `Models.embed`. + */ +export function __setEmbedFnForTest(fn: EmbedFn | undefined): void { + _embedFn = fn; +} + +export function createDefaultEmbedder(embedConfig: EmbedConfig): Embedder { + const { source, model } = embedConfig; + return async (record: any): Promise => { + const sourceValue = record?.[source]; + if (sourceValue == null) return null; + // `embed()` always returns an array (one vector per input). `@embed`'s + // single-source-field semantics mean we only ever pass a single input + // and return the first vector. + const vectors = await resolveEmbedFn()(String(sourceValue), { + model, + inputType: 'document', + }); + const v = vectors?.[0]; + if (v == null) return undefined; + // Convert Float32Array → plain Array for storage. Harper's record + // encoder doesn't round-trip typed arrays cleanly through msgpack/CRDT + // merge (`updateAndFreeze` enumerates them as `{0,1,2,...}` maps), so we + // store as a plain numeric array. HNSW's `propertyResolver` and + // `customIndex.index` both accept `number[]` and `Float32Array`. + return v instanceof Float32Array ? Array.from(v) : Array.from(v as any); + }; +} + +/** + * Build the pre-commit `before` callback that fires registered embedders for + * every `@embed`-decorated attribute whose source field is present in this + * write's payload. Returns `undefined` when: + * + * - the table has no `@embed` attributes, + * - the write is a replication receiver, or + * - no source field on any embed attribute appears in the write's record. + * + * Source-field semantics: the embedder runs only when the source field is + * explicitly included in the write payload (PUT or PATCH that touches the + * source). On a PATCH that omits the source, the existing embedding survives + * via patch-merge — we don't recompute, and we don't clear. On an explicit + * `source: null`, we clear the embedding to `null` to preserve consistency + * between the source and its derived vector. + * + * The returned callback awaits each embedder serially. Multi-`@embed`-per- + * table is rare; parallelism here would complicate error reporting without a + * meaningful latency win in the common case (one embed attribute per table). + */ +export function buildEmbedBefore( + record: any, + context: any, + options: any, + embedAttributes: EmbedAttribute[] | undefined, + userEmbedders: Record +): (() => Promise) | undefined { + if (!embedAttributes || embedAttributes.length === 0) return undefined; + if (options?.isNotification === true || context?.replicateFrom === false || context?.alreadyLogged === true) { + return undefined; + } + if (!record || typeof record !== 'object') return undefined; + // quick scan: skip the whole pass if no embed-source field is in this payload + let anySourcePresent = false; + for (const attr of embedAttributes) { + const sourceKey = attr.embed?.source; + if (sourceKey && sourceKey in record) { + anySourcePresent = true; + break; + } + } + if (!anySourcePresent) return undefined; + return async (): Promise => { + // Run embedders for each `@embed` attribute in parallel — typical schemas + // have a single `@embed` field per table but a multi-`@embed` table would + // otherwise serialize HTTP roundtrips for no benefit. Each embedder + // mutates a distinct attribute on the same record, so there's no + // ordering hazard between them. + await Promise.all( + embedAttributes.map(async (attr) => { + const sourceKey = attr.embed?.source; + if (!sourceKey) return; + if (!(sourceKey in record)) return; + const sourceValue = record[sourceKey]; + if (sourceValue == null) { + record[attr.name] = null; + return; + } + const embedder = userEmbedders[attr.name]; + if (!embedder) return; + let vector; + try { + vector = await embedder(record); + } catch (err) { + // Embedder backends (OpenAI, Anthropic, Bedrock, Ollama) may include + // URLs, model identifiers, or API-key tails in error messages. Those + // land in HTTP responses if propagated raw — Harper's threat model + // trusts deployers but not arbitrary REST callers, so we log the raw + // error and rethrow a sanitized one. The original error stays in + // server logs for diagnosis. + getLogger().error?.(`Embedder for attribute "${attr.name}" failed:`, err); + throw new Error(`Failed to compute embedding for attribute "${attr.name}"`); + } + record[attr.name] = vector == null ? null : vector; + }) + ); + }; +} diff --git a/unitTests/resources/models/embedHook.test.js b/unitTests/resources/models/embedHook.test.js new file mode 100644 index 000000000..c1a7e4e9e --- /dev/null +++ b/unitTests/resources/models/embedHook.test.js @@ -0,0 +1,204 @@ +'use strict'; + +const assert = require('node:assert/strict'); +const { buildEmbedBefore, createDefaultEmbedder, __setEmbedFnForTest } = require('#src/resources/models/embedHook'); + +const VECTOR = new Float32Array([0.1, 0.2, 0.3]); + +function fakeEmbedCapturing() { + const calls = []; + const fn = async (input, opts) => { + calls.push({ input, opts }); + return [VECTOR]; + }; + fn.calls = calls; + return fn; +} + +describe('embedHook', () => { + describe('createDefaultEmbedder', () => { + afterEach(() => __setEmbedFnForTest(undefined)); + + it('reads source field, calls Models.embed with document inputType, returns first vector as Array', async () => { + const embedFn = fakeEmbedCapturing(); + __setEmbedFnForTest(embedFn); + const embedder = createDefaultEmbedder({ source: 'content', model: 'default' }); + const vec = await embedder({ content: 'hello world' }); + // Default embedder converts Float32Array → Array so Harper's record + // encoder doesn't mangle it via `updateAndFreeze`. HNSW accepts both. + assert.ok(Array.isArray(vec), 'vec should be a plain Array'); + assert.deepEqual(vec, Array.from(VECTOR)); + assert.equal(embedFn.calls.length, 1); + assert.equal(embedFn.calls[0].input, 'hello world'); + assert.equal(embedFn.calls[0].opts.model, 'default'); + assert.equal(embedFn.calls[0].opts.inputType, 'document'); + }); + + it('returns null when source value is null', async () => { + const embedFn = fakeEmbedCapturing(); + __setEmbedFnForTest(embedFn); + const embedder = createDefaultEmbedder({ source: 'content', model: 'default' }); + assert.equal(await embedder({ content: null }), null); + assert.equal(embedFn.calls.length, 0); + }); + + it('returns null when source value is undefined', async () => { + const embedFn = fakeEmbedCapturing(); + __setEmbedFnForTest(embedFn); + const embedder = createDefaultEmbedder({ source: 'content', model: 'default' }); + assert.equal(await embedder({}), null); + assert.equal(embedFn.calls.length, 0); + }); + + it('stringifies non-string source values before passing to embed()', async () => { + const embedFn = fakeEmbedCapturing(); + __setEmbedFnForTest(embedFn); + const embedder = createDefaultEmbedder({ source: 'count', model: 'default' }); + await embedder({ count: 42 }); + assert.equal(embedFn.calls[0].input, '42'); + }); + }); + + describe('buildEmbedBefore', () => { + const attrs = [{ name: 'embedding', embed: { source: 'content', model: 'default' } }]; + + it('returns undefined when embedAttributes is empty', () => { + assert.equal(buildEmbedBefore({ content: 'x' }, {}, {}, [], {}), undefined); + assert.equal(buildEmbedBefore({ content: 'x' }, {}, {}, undefined, {}), undefined); + }); + + it('returns undefined on cluster-replication receive (options.isNotification === true)', () => { + const before = buildEmbedBefore({ content: 'x' }, {}, { isNotification: true }, attrs, { + embedding: async () => VECTOR, + }); + assert.equal(before, undefined); + }); + + it('returns undefined on REST x-replicate-from: none (context.replicateFrom === false)', () => { + const before = buildEmbedBefore({ content: 'x' }, { replicateFrom: false }, {}, attrs, { + embedding: async () => VECTOR, + }); + assert.equal(before, undefined); + }); + + it('DOES fire on a local-originating write where replicateFrom is undefined', () => { + // Originating writes have undefined replicateFrom (not false); make sure the predicate + // does not over-skip and silently drop local embedding work. + const before = buildEmbedBefore({ content: 'x' }, {}, {}, attrs, { + embedding: async () => VECTOR, + }); + assert.ok(before, 'embedder should fire on local-originating writes'); + }); + + it('returns undefined on replay context (alreadyLogged === true)', () => { + const before = buildEmbedBefore({ content: 'x' }, { alreadyLogged: true }, {}, attrs, { + embedding: async () => VECTOR, + }); + assert.equal(before, undefined); + }); + + it('returns undefined when no embed-source field is in the write payload (patch that omits source)', () => { + const before = buildEmbedBefore({ otherField: 'unchanged' }, {}, {}, attrs, { + embedding: async () => VECTOR, + }); + assert.equal(before, undefined); + }); + + it('returns undefined when record is not an object', () => { + assert.equal(buildEmbedBefore(null, {}, {}, attrs, { embedding: async () => VECTOR }), undefined); + assert.equal(buildEmbedBefore(undefined, {}, {}, attrs, { embedding: async () => VECTOR }), undefined); + }); + + it('runs the embedder and writes vector to the target attribute when source is present', async () => { + const record = { content: 'hello' }; + const before = buildEmbedBefore(record, {}, {}, attrs, { + embedding: async (r) => { + assert.equal(r.content, 'hello'); + return VECTOR; + }, + }); + assert.ok(before); + await before(); + assert.deepEqual(record.embedding, VECTOR); + }); + + it('clears the embedding to null when source is explicitly null', async () => { + const record = { content: null }; + let called = false; + const before = buildEmbedBefore(record, {}, {}, attrs, { + embedding: async () => { + called = true; + return VECTOR; + }, + }); + assert.ok(before); + await before(); + assert.equal(record.embedding, null); + assert.equal(called, false, 'embedder should not run when source is null'); + }); + + it('skips attributes whose source is not in the payload (multi-attribute table)', async () => { + const multiAttrs = [ + { name: 'embA', embed: { source: 'titleField', model: 'default' } }, + { name: 'embB', embed: { source: 'bodyField', model: 'default' } }, + ]; + const record = { bodyField: 'b' }; // only bodyField is in this patch + let embACalls = 0; + let embBCalls = 0; + const before = buildEmbedBefore(record, {}, {}, multiAttrs, { + embA: async () => { + embACalls++; + return VECTOR; + }, + embB: async () => { + embBCalls++; + return VECTOR; + }, + }); + assert.ok(before); + await before(); + assert.equal(embACalls, 0, 'embA source not in payload, skipped'); + assert.equal(embBCalls, 1, 'embB source in payload, fired'); + assert.equal(record.embA, undefined); + assert.deepEqual(record.embB, VECTOR); + }); + + it('skips an attribute that has no registered embedder', async () => { + const record = { content: 'hello' }; + const before = buildEmbedBefore(record, {}, {}, attrs, {}); // no embedder + assert.ok(before); + await before(); + assert.equal(record.embedding, undefined, 'no vector written when no embedder is registered'); + }); + + it('writes null to the target when the embedder returns null/undefined', async () => { + const record = { content: 'hello' }; + const before = buildEmbedBefore(record, {}, {}, attrs, { + embedding: async () => null, + }); + assert.ok(before); + await before(); + assert.equal(record.embedding, null); + }); + + it('propagates a sanitized error when the embedder throws', async () => { + const record = { content: 'hello' }; + const before = buildEmbedBefore(record, {}, {}, attrs, { + embedding: async () => { + throw new Error('https://internal-embed.svc:9000 401 key=sk-abc123 unauthorized'); + }, + }); + assert.ok(before); + // the embedder's raw backend message must NOT propagate as-is to the caller; the + // sanitized error should reference only the attribute name and a generic phrase. + await assert.rejects(before(), (err) => { + assert.ok(!/sk-abc123/.test(err.message), 'API key tail leaked'); + assert.ok(!/internal-embed\.svc/.test(err.message), 'internal hostname leaked'); + assert.ok(/embedding/i.test(err.message), 'error message should mention embedding'); + return true; + }); + // record.embedding should not have been written + assert.equal(record.embedding, undefined); + }); + }); +});