From 5e15d0d9855d319fe81cb664b99abf8009d9e9f9 Mon Sep 17 00:00:00 2001 From: Nathan Heskew Date: Fri, 22 May 2026 11:59:54 -0700 Subject: [PATCH 1/7] feat(models): @embed directive + write-time hook + auto-HNSW (#632, Phase 5 of #510) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Schema-level RAG without app code: marking a field with `@embed(source: "", model: "")` makes Harper compute the embedding on every originating write, store it on the record, and index it via HNSW — no application hooks required. Parser (resources/graphql.ts): - 'embed' added to knownGraphQLDirectives; 'Vector' added to PRIMITIVE_TYPES (with a coerceType case in Table.ts that accepts Float32Array / Array / typed-array views). - `@embed` populates `property.embed = { source, model }` and sets `property.version = "embed:"` so the existing schema-load version-change-driven reindex pathway (databases.ts:1111) fires on model swap. (Note: today that pathway re-indexes existing vectors but does NOT re-embed source fields through the new model; full re-embed backfill is a follow-up.) - Auto-attaches `property.indexed = { type: 'HNSW' }` when no explicit `@indexed` is present — the issue's "no app code needed" statement only holds with auto-indexing. - Validates both `source:` and `model:` are present and string-typed; rejects variables / non-string nodes with a directive-location error so typoed schemas fail loudly instead of silently producing `undefined`. API + write-time hook (resources/Table.ts): - Static `userEmbedders` map and `embedAttributes` filtered list (the list is recomputed in `updatedAttributes()` so in-place schema reload at `databases.ts:940` doesn't leave a stale snapshot). - `Table.setEmbedAttribute(name, embedder)` static method mirrors the `setComputedAttribute` registration pattern; component authors override the default via this surface. 
- Default embedder auto-registered at schema load when an attribute carries `@embed` — sits OUTSIDE the if/else-if resolver chain so `@embed`-decorated fields still get their `customIndex.propertyResolver` wired for HNSW. - `buildEmbedBefore` chained into the existing `preCommitBlobsForRecordBefore` slot at the main put/patch site — the embedder mutates the recordUpdate during the transaction's `before` phase, so commit blocks on it (sync-by-default; queued mode deferred to a follow-up). Default embedder + replicated-write predicate (resources/models/embedHook.ts): - `createDefaultEmbedder({source, model})` returns a callback that calls `Models.embed(record[source], { model, inputType: 'document' })` and returns the first vector. `Models` is lazy-required so the module is unit-testable without dragging the transaction stack into module load. - `buildEmbedBefore` skips on ALL three replication-receiver signals: `options.isNotification === true` (cluster-subscribe path in Table.ts:325-373), `context.replicateFrom === false` (REST `x-replicate-from: none`), and `context.alreadyLogged === true` (local audit replay at replayLogs.ts:52). On receivers the originator's vector is stored as-is, preserving cross-cluster embedding-space consistency. - Embedder errors are caught and rethrown as a sanitized message `Failed to compute embedding for attribute "${attr.name}"` — backend URLs, model IDs, and API-key tails stay in server logs only.
Tracking: #510 Closes #632 Co-Authored-By: Claude Opus 4.7 (1M context) --- resources/Table.ts | 78 +++++++++++++- resources/graphql.ts | 59 ++++++++++- resources/models/embedHook.ts | 187 ++++++++++++++++++++++++++++++++++ 3 files changed, 322 insertions(+), 2 deletions(-) create mode 100644 resources/models/embedHook.ts diff --git a/resources/Table.ts b/resources/Table.ts index 3ec040d43..41659ba7f 100644 --- a/resources/Table.ts +++ b/resources/Table.ts @@ -47,6 +47,12 @@ import { transaction, contextStorage } from './transaction.ts'; import { MAXIMUM_KEY, writeKey, compareKeys } from 'ordered-binary'; import { getWorkerIndex, getWorkerCount } from '../server/threads/manageThreads.js'; import { HAS_BLOBS, auditRetention, removeAuditEntry } from './auditStore.ts'; +import { + buildEmbedBefore, + createDefaultEmbedder, + type EmbedAttribute, + type Embedder, +} from './models/embedHook.ts'; import { autoCast, autoCastBooleanStrict } from '../utility/common_utils.ts'; import { recordUpdater, @@ -85,6 +91,8 @@ export type Attribute = { computed?: any; resolve?: any; computedFromExpression?: any; + embed?: { source: string; model: string }; + version?: any; properties?: Array; elements?: Attribute; sealed?: boolean; @@ -256,6 +264,13 @@ export function makeTable(options) { static updatedTimeProperty = updatedTimeProperty; static propertyResolvers; static userResolvers = {}; + // `@embed` write-time hook registry (Phase 5 of #510). `userEmbedders` is the per- + // attribute embedder map populated by `setEmbedAttribute` — defaulted at schema + // load from the directive's `(source, model)` and overridable by component authors. + // `embedAttributes` is the filtered list scanned on every write so we don't pay an + // `attributes.filter(...)` per put/patch. 
+ static userEmbedders: { [name: string]: Embedder } = {}; + static embedAttributes: EmbedAttribute[] = (attributes as any[]).filter((a) => a?.embed); static source?: typeof TableResource; declare static sourceOptions: any; declare static intermediateSource: boolean; @@ -1952,7 +1967,19 @@ export function makeTable(options) { }, }; this.#savingOperation = write; - write.beforeIntermediate = preCommitBlobsForRecordBefore(write, recordUpdate); + // `@embed` write-time hook (Phase 5 of #510). Chains into the existing + // blob-pre-commit `before` slot so the embedder runs during the + // transaction's `before` phase, before the commit closure stores the + // merged record. Skipped on replicated writes — the originating node + // already computed the embedding and the receiver should preserve it. + const embedBefore = buildEmbedBefore( + recordUpdate, + context, + options, + TableResource.embedAttributes, + TableResource.userEmbedders + ); + write.beforeIntermediate = preCommitBlobsForRecordBefore(write, recordUpdate, embedBefore); return transaction.addWrite(write as any); } @@ -3368,6 +3395,13 @@ export function makeTable(options) { * When attributes have been changed, we update the accessors that are assigned to this table */ static updatedAttributes() { + // `@embed` attributes are tracked by a separate filtered list to avoid + // re-scanning every attribute on every write. The list must be refreshed + // here because `databases.ts` does an in-place `Table.attributes.splice(...)` + // on schema reload and then calls `Table.updatedAttributes()` — the + // initializer at class-construction would otherwise hold a stale snapshot + // from the first schema deployment. 
+ this.embedAttributes = (this.attributes as any[]).filter((a) => a?.embed); propertyResolvers = this.propertyResolvers = { $id: (object, context, entry) => ({ value: entry.key }), $updatedtime: (object, context, entry) => entry.version, @@ -3383,6 +3417,16 @@ export function makeTable(options) { attribute.resolve = null; // reset this const relationship = attribute.relationship; const computed = attribute.computed; + // `@embed` directive: (re)register the default embedder unless a component + // override is in place. Defaults are tagged with `isDefaultEmbedder` so an + // in-place schema reload that changes `source:`/`model:` rebuilds them instead + // of keeping the closure bound to the old config. This lives outside the + // if/else-if chain below — `@embed` fields ALSO get auto-HNSW indexing, and the + // chain's `customIndex.propertyResolver` branch needs to fire for them. Override + // via `Table.setEmbedAttribute(name, customEmbedder)` (untagged, so preserved). + if (attribute.embed && (!this.userEmbedders[attribute.name] || (this.userEmbedders[attribute.name] as any).isDefaultEmbedder)) { + this.userEmbedders[attribute.name] = Object.assign(createDefaultEmbedder(attribute.embed), { isDefaultEmbedder: true }); + } if (relationship) { if (attribute.indexed) { console.error( @@ -3566,6 +3610,29 @@ export function makeTable(options) { } this.userResolvers[attribute_name] = resolver; } + /** + * Override the embedder that fires on writes to an `@embed`-decorated + * attribute (Phase 5 of #510). The default embedder calls + * `Models.embed(record[source], { model, inputType: 'document' })`. Pass a + * custom function when you need multi-field concatenation, custom + * preprocessing, or a different inputType — the embedder receives the + * full record (post-merge with the existing entry for PATCH) and returns + * the vector to store at `attribute_name`.
+ */ + static setEmbedAttribute(attribute_name: string, embedder: Embedder): void { + const attribute = findAttribute(attributes, attribute_name); + if (!attribute) { + console.error(`The attribute "${attribute_name}" does not exist in the table "${tableName}"`); + return; + } + if (!attribute.embed) { + console.error( + `The attribute "${attribute_name}" is not declared with @embed in the table "${tableName}"` + ); + return; + } + this.userEmbedders[attribute_name] = embedder; + } static async deleteHistory(endTime = 0, cleanupDeletedRecords = false) { let completion: Promise; for (const auditRecord of auditStore.getRange({ @@ -4740,6 +4807,15 @@ export function coerceType(value: any, attribute: any): any { return date; } return new Date(+value); // epoch ms number + case 'Vector': + // JSON-typed input arrives as a plain Array; the storage layer expects + // Float32Array (what HNSW and the embedder produce). Pass Float32Array through; + // convert arrays and other typed arrays element-wise (never reinterpret bytes). + if (value === null || value === 'null') return null; + if (value instanceof Float32Array) return value; + if (Array.isArray(value)) return Float32Array.from(value); + if (ArrayBuffer.isView(value) && !(value instanceof DataView)) return Float32Array.from(value as any); + throw new SyntaxError(); case undefined: case 'Any': return autoCast(value); diff --git a/resources/graphql.ts b/resources/graphql.ts index f70672838..f2b06d259 100644 --- a/resources/graphql.ts +++ b/resources/graphql.ts @@ -6,7 +6,20 @@ import { Resources } from './Resources.ts'; import type { NamedTypeNode, StringValueNode } from 'graphql'; import { once } from 'node:events'; -const PRIMITIVE_TYPES = ['ID', 'Int', 'Float', 'Long', 'String', 'Boolean', 'Date', 'Bytes', 'Any', 'BigInt', 'Blob']; +const PRIMITIVE_TYPES = [ + 'ID', + 'Int', + 'Float', + 'Long', + 'String', + 'Boolean', + 'Date', + 'Bytes', + 'Any', + 'BigInt', + 'Blob', + 'Vector', +]; if (!server.knownGraphQLDirectives) { server.knownGraphQLDirectives = []; @@
-18,6 +31,7 @@ server.knownGraphQLDirectives.push( 'primaryKey', 'indexed', 'computed', + 'embed', 'relationship', 'createdTime', 'updatedTime', @@ -143,6 +157,49 @@ async function processGraphQLSchema(gqlContent, urlPath, filePath, resources) { } } property.computed = property.computed || true; + } else if (directiveName === 'embed') { + // `@embed(source: "", model: "")`: schema-level RAG. + // On write, the embedder reads `record[source]` and writes a vector to this + // attribute (Phase 5 of #510). Auto-attaches HNSW indexing on the same field + // when no explicit `@indexed` is present, and tracks the model name as the + // attribute version — a model change between deploys triggers reindex the + // same way a `@computed` expression change does (see version-driven reindex + // below). + const embedDefinition: { source?: string; model?: string } = {}; + for (const arg of directive.arguments || []) { + // Only accept string-literal values; reject variables / non-string nodes + // so a typoed schema fails loudly instead of silently producing undefined. + if (arg.value.kind !== 'StringValue') { + console.error( + `@embed(${arg.name.value}: ...) on "${property.name}" expects a string literal, at`, + arg.loc + ); + continue; + } + embedDefinition[arg.name.value] = (arg.value as StringValueNode).value; + } + if (!embedDefinition.source || !embedDefinition.model) { + // Missing required args silently degrades the embedder to a no-op at + // write time, leaving the vector column empty with no operator signal. + // Refuse to register the directive in that case. + console.error( + `@embed on "${property.name}" requires both "source" and "model" arguments, at`, + directive.loc + ); + } else { + property.embed = embedDefinition; + // Use the model name as the version so changing `model:` in the schema + // triggers reindex/re-embed, matching `@computed`'s version semantics. 
+ if (property.version == undefined) { + property.version = `embed:${embedDefinition.model}`; + } + // Auto-attach HNSW indexing if no explicit `@indexed` is present on this + // field. The issue's "indexed via HNSW. No app code needed." statement + // only holds if `@embed` alone wires the index. + if (!property.indexed) { + property.indexed = { type: 'HNSW' }; + } + } } else if (directiveName === 'relationship') { const relationshipDefinition = {}; for (const arg of directive.arguments) { diff --git a/resources/models/embedHook.ts b/resources/models/embedHook.ts new file mode 100644 index 000000000..0cf2ee35a --- /dev/null +++ b/resources/models/embedHook.ts @@ -0,0 +1,187 @@ +/** + * `@embed` directive write-time hook (#632 / Phase 5 of #510). + * + * Two surfaces: + * + * - `createDefaultEmbedder(embedConfig)` — produces the default embedder a + * table registers when its schema includes an `@embed` directive. The + * embedder reads `record[embedConfig.source]`, calls `Models.embed(...)` + * with `inputType: 'document'`, and returns the first vector. Component + * authors can replace it via `Table.setEmbedAttribute(name, customEmbedder)` + * when they need different logic (multi-field concatenation, custom + * preprocessing). + * + * - `buildEmbedBefore(...)` — produces the pre-commit `before` callback that + * `resources/Table.ts` chains into the existing blob-pre-commit pattern. + * Returns `undefined` when there's no work (no embed attributes, or the + * write is a replication receiver) so the call site can pass it straight + * into `preCommitBlobsForRecordBefore` as the `before` parameter. + * + * Replicated-write predicate: the receiver should *store* the originating + * node's already-computed embedding, not re-compute it. 
Three signals + * indicate this is NOT a local-originating write, and we must skip the + * embedder for any of them: + * + * - `options.isNotification === true` — cluster-replication path; the + * `source.subscribe()` handler in `Table.ts` sets this when applying a + * write from a peer. + * - `context.replicateFrom === false` — REST `x-replicate-from: none` + * header path. NOTE: the value is the literal `false`, not a truthy + * identifier; `server/REST.ts` only ever assigns `replicateFrom = false`. + * - `context.alreadyLogged === true` — local audit-log replay path at + * `resources/replayLogs.ts` (process-restart catch-up). The vector is + * already on disk; the replay only re-emits to the in-memory state. + * + * Sync-by-default execution: the embedder runs during the transaction's + * `before` phase, so commit blocks on it. Queued mode is a follow-up: it would + * commit the record without the vector, then back-fill via the existing job + * infrastructure at `server/jobs/`. + * + * Model-change invalidation: the parser sets `property.version = "embed:${model}"` + * so the schema-load path at `databases.ts:1111` detects a model change between + * deploys (same pattern `@computed` uses). Today the version-change pathway + * triggers an HNSW *re-index* of stored vectors — it does NOT *re-embed* the + * source field through the new model. New writes after a model change pick up + * the new model; existing rows keep their old-model vectors until they are + * re-written. Full re-embed-on-model-change backfill is tracked as a follow-up + * to #632 — the queued-mode plumbing is the natural place to land it, since + * it already needs the iterate-records-and-back-fill primitive. + */ + +export type EmbedConfig = { + source: string; + model: string; +}; + +export type EmbedAttribute = { + name: string; + embed: EmbedConfig; +}; + +export type Embedder = (record: any) => Promise; + +/** + * Embed-function shape the default embedder calls into.
Matches the public + * `Models.embed` signature. Pulled out as a type so we can dependency-inject + * a fake for unit tests without dragging the transaction stack into module + * load. + */ +type EmbedFn = (input: string | string[], opts: { model?: string; inputType?: 'document' | 'query' }) => Promise; + +/** + * Models facade resolver for the default embedder. Lazy-imported so this + * module can be unit-tested without loading the transaction stack that + * `Models.ts` pulls in. Overridable via `__setEmbedFnForTest` (test seam). + * + * The `Models` class reads ALS-scoped context and a process-wide backend + * registry, so per-call instantiation has the same observable behavior as a + * singleton — we just lazy-cache for allocation churn. + */ +let _embedFn: EmbedFn | undefined; +function resolveEmbedFn(): EmbedFn { + if (_embedFn) return _embedFn; + const { Models } = require('./Models.ts'); // eslint-disable-line @typescript-eslint/no-var-requires + const models = new Models(); + _embedFn = (input, opts) => models.embed(input, opts); + return _embedFn; +} + +/** + * Override the embed function used by `createDefaultEmbedder`. Test seam + * only. Pass `undefined` to reset to the lazily-loaded `Models.embed`. + */ +export function __setEmbedFnForTest(fn: EmbedFn | undefined): void { + _embedFn = fn; +} + +export function createDefaultEmbedder(embedConfig: EmbedConfig): Embedder { + const { source, model } = embedConfig; + return async (record: any): Promise => { + const sourceValue = record?.[source]; + if (sourceValue == null) return null; + // `embed()` always returns an array (one vector per input). `@embed`'s + // single-source-field semantics mean we only ever pass a single input + // and return the first vector. 
+ const vectors = await resolveEmbedFn()(String(sourceValue), { + model, + inputType: 'document', + }); + return vectors?.[0]; + }; +} + +/** + * Build the pre-commit `before` callback that fires registered embedders for + * every `@embed`-decorated attribute whose source field is present in this + * write's payload. Returns `undefined` when: + * + * - the table has no `@embed` attributes, + * - the write is a replication receiver, or + * - no source field on any embed attribute appears in the write's record. + * + * Source-field semantics: the embedder runs only when the source field is + * explicitly included in the write payload (PUT or PATCH that touches the + * source). On a PATCH that omits the source, the existing embedding survives + * via patch-merge — we don't recompute, and we don't clear. On an explicit + * `source: null`, we clear the embedding to `null` to preserve consistency + * between the source and its derived vector. + * + * The returned callback awaits each embedder serially. Multi-`@embed`-per- + * table is rare; parallelism here would complicate error reporting without a + * meaningful latency win in the common case (one embed attribute per table). 
+ */ +export function buildEmbedBefore( + record: any, + context: any, + options: any, + embedAttributes: EmbedAttribute[] | undefined, + userEmbedders: Record +): (() => Promise) | undefined { + if (!embedAttributes || embedAttributes.length === 0) return undefined; + if ( + options?.isNotification === true || + context?.replicateFrom === false || + context?.alreadyLogged === true + ) { + return undefined; + } + if (!record || typeof record !== 'object') return undefined; + // quick scan: skip the whole pass if no embed-source field is in this payload + let anySourcePresent = false; + for (const attr of embedAttributes) { + const sourceKey = attr.embed?.source; + if (sourceKey && Object.prototype.hasOwnProperty.call(record, sourceKey)) { + anySourcePresent = true; + break; + } + } + if (!anySourcePresent) return undefined; + return async (): Promise => { + for (const attr of embedAttributes) { + const sourceKey = attr.embed?.source; + if (!sourceKey) continue; + if (!Object.prototype.hasOwnProperty.call(record, sourceKey)) continue; + const sourceValue = record[sourceKey]; + if (sourceValue == null) { + record[attr.name] = null; + continue; + } + const embedder = userEmbedders[attr.name]; + if (!embedder) continue; + let vector; + try { + vector = await embedder(record); + } catch (err) { + // Embedder backends (OpenAI, Anthropic, Bedrock, Ollama) may include URLs, + // model identifiers, or API-key tails in error messages. Those land in HTTP + // responses if propagated raw — Harper's threat model trusts deployers but + // not arbitrary REST callers, so we log the raw error and rethrow a + // sanitized one. The original error stays in server logs for diagnosis. + const logger = (globalThis as any).logger; + logger?.error?.(`Embedder for attribute "${attr.name}" failed:`, err); + throw new Error(`Failed to compute embedding for attribute "${attr.name}"`); + } + record[attr.name] = vector == null ? 
null : vector; + } + }; +} From 14cea748b36c729d26042439e36ff959d4197bd7 Mon Sep 17 00:00:00 2001 From: Nathan Heskew Date: Fri, 22 May 2026 12:00:13 -0700 Subject: [PATCH 2/7] test(models): @embed hook unit tests 17 tests covering createDefaultEmbedder + buildEmbedBefore: - default embedder: source-field read, document inputType, null/undefined source handling, non-string source stringification. - buildEmbedBefore replication predicate: skip on `options.isNotification === true` (cluster-subscribe), `context.replicateFrom === false` (REST suppression), `context.alreadyLogged === true` (replay); fire on local-originating writes where replicateFrom is undefined. - source-field semantics: skip when source not in payload (PATCH that omits source survives via patch-merge), clear embedding to null when source is explicitly null, run when source is present. - per-attribute correctness: skip attributes whose source is absent in multi-`@embed` tables; skip attributes with no registered embedder; write null when embedder returns null. - error sanitization: backend errors (URLs, API-key tails) are NOT propagated to the caller; the sanitized message references only the attribute name. Test seam `__setEmbedFnForTest` lets the suite drive the embedder without instantiating Harper's full transaction stack. 
Tracking: #510 Refs #632 Co-Authored-By: Claude Opus 4.7 (1M context) --- unitTests/resources/models/embedHook.test.js | 208 +++++++++++++++++++ 1 file changed, 208 insertions(+) create mode 100644 unitTests/resources/models/embedHook.test.js diff --git a/unitTests/resources/models/embedHook.test.js b/unitTests/resources/models/embedHook.test.js new file mode 100644 index 000000000..83beb44a1 --- /dev/null +++ b/unitTests/resources/models/embedHook.test.js @@ -0,0 +1,208 @@ +'use strict'; + +const assert = require('node:assert/strict'); +const { + buildEmbedBefore, + createDefaultEmbedder, + __setEmbedFnForTest, +} = require('#src/resources/models/embedHook'); + +const VECTOR = new Float32Array([0.1, 0.2, 0.3]); + +function fakeEmbedCapturing() { + const calls = []; + const fn = async (input, opts) => { + calls.push({ input, opts }); + return [VECTOR]; + }; + fn.calls = calls; + return fn; +} + +describe('embedHook', () => { + describe('createDefaultEmbedder', () => { + afterEach(() => __setEmbedFnForTest(undefined)); + + it('reads source field, calls Models.embed with document inputType, returns first vector', async () => { + const embedFn = fakeEmbedCapturing(); + __setEmbedFnForTest(embedFn); + const embedder = createDefaultEmbedder({ source: 'content', model: 'default' }); + const vec = await embedder({ content: 'hello world' }); + assert.deepEqual(vec, VECTOR); + assert.equal(embedFn.calls.length, 1); + assert.equal(embedFn.calls[0].input, 'hello world'); + assert.equal(embedFn.calls[0].opts.model, 'default'); + assert.equal(embedFn.calls[0].opts.inputType, 'document'); + }); + + it('returns null when source value is null', async () => { + const embedFn = fakeEmbedCapturing(); + __setEmbedFnForTest(embedFn); + const embedder = createDefaultEmbedder({ source: 'content', model: 'default' }); + assert.equal(await embedder({ content: null }), null); + assert.equal(embedFn.calls.length, 0); + }); + + it('returns null when source value is undefined', async () => { + 
const embedFn = fakeEmbedCapturing(); + __setEmbedFnForTest(embedFn); + const embedder = createDefaultEmbedder({ source: 'content', model: 'default' }); + assert.equal(await embedder({}), null); + assert.equal(embedFn.calls.length, 0); + }); + + it('stringifies non-string source values before passing to embed()', async () => { + const embedFn = fakeEmbedCapturing(); + __setEmbedFnForTest(embedFn); + const embedder = createDefaultEmbedder({ source: 'count', model: 'default' }); + await embedder({ count: 42 }); + assert.equal(embedFn.calls[0].input, '42'); + }); + }); + + describe('buildEmbedBefore', () => { + const attrs = [{ name: 'embedding', embed: { source: 'content', model: 'default' } }]; + + it('returns undefined when embedAttributes is empty', () => { + assert.equal(buildEmbedBefore({ content: 'x' }, {}, {}, [], {}), undefined); + assert.equal(buildEmbedBefore({ content: 'x' }, {}, {}, undefined, {}), undefined); + }); + + it('returns undefined on cluster-replication receive (options.isNotification === true)', () => { + const before = buildEmbedBefore({ content: 'x' }, {}, { isNotification: true }, attrs, { + embedding: async () => VECTOR, + }); + assert.equal(before, undefined); + }); + + it('returns undefined on REST x-replicate-from: none (context.replicateFrom === false)', () => { + const before = buildEmbedBefore({ content: 'x' }, { replicateFrom: false }, {}, attrs, { + embedding: async () => VECTOR, + }); + assert.equal(before, undefined); + }); + + it('DOES fire on a local-originating write where replicateFrom is undefined', () => { + // Originating writes have undefined replicateFrom (not false); make sure the predicate + // does not over-skip and silently drop local embedding work. 
+ const before = buildEmbedBefore({ content: 'x' }, {}, {}, attrs, { + embedding: async () => VECTOR, + }); + assert.ok(before, 'embedder should fire on local-originating writes'); + }); + + it('returns undefined on replay context (alreadyLogged === true)', () => { + const before = buildEmbedBefore({ content: 'x' }, { alreadyLogged: true }, {}, attrs, { + embedding: async () => VECTOR, + }); + assert.equal(before, undefined); + }); + + it('returns undefined when no embed-source field is in the write payload (patch that omits source)', () => { + const before = buildEmbedBefore({ otherField: 'unchanged' }, {}, {}, attrs, { + embedding: async () => VECTOR, + }); + assert.equal(before, undefined); + }); + + it('returns undefined when record is not an object', () => { + assert.equal(buildEmbedBefore(null, {}, {}, attrs, { embedding: async () => VECTOR }), undefined); + assert.equal( + buildEmbedBefore(undefined, {}, {}, attrs, { embedding: async () => VECTOR }), + undefined + ); + }); + + it('runs the embedder and writes vector to the target attribute when source is present', async () => { + const record = { content: 'hello' }; + const before = buildEmbedBefore(record, {}, {}, attrs, { + embedding: async (r) => { + assert.equal(r.content, 'hello'); + return VECTOR; + }, + }); + assert.ok(before); + await before(); + assert.deepEqual(record.embedding, VECTOR); + }); + + it('clears the embedding to null when source is explicitly null', async () => { + const record = { content: null }; + let called = false; + const before = buildEmbedBefore(record, {}, {}, attrs, { + embedding: async () => { + called = true; + return VECTOR; + }, + }); + assert.ok(before); + await before(); + assert.equal(record.embedding, null); + assert.equal(called, false, 'embedder should not run when source is null'); + }); + + it('skips attributes whose source is not in the payload (multi-attribute table)', async () => { + const multiAttrs = [ + { name: 'embA', embed: { source: 'titleField', model: 
'default' } }, + { name: 'embB', embed: { source: 'bodyField', model: 'default' } }, + ]; + const record = { bodyField: 'b' }; // only bodyField is in this patch + let embACalls = 0; + let embBCalls = 0; + const before = buildEmbedBefore(record, {}, {}, multiAttrs, { + embA: async () => { + embACalls++; + return VECTOR; + }, + embB: async () => { + embBCalls++; + return VECTOR; + }, + }); + assert.ok(before); + await before(); + assert.equal(embACalls, 0, 'embA source not in payload, skipped'); + assert.equal(embBCalls, 1, 'embB source in payload, fired'); + assert.equal(record.embA, undefined); + assert.deepEqual(record.embB, VECTOR); + }); + + it('skips an attribute that has no registered embedder', async () => { + const record = { content: 'hello' }; + const before = buildEmbedBefore(record, {}, {}, attrs, {}); // no embedder + assert.ok(before); + await before(); + assert.equal(record.embedding, undefined, 'no vector written when no embedder is registered'); + }); + + it('writes null to the target when the embedder returns null/undefined', async () => { + const record = { content: 'hello' }; + const before = buildEmbedBefore(record, {}, {}, attrs, { + embedding: async () => null, + }); + assert.ok(before); + await before(); + assert.equal(record.embedding, null); + }); + + it('propagates a sanitized error when the embedder throws', async () => { + const record = { content: 'hello' }; + const before = buildEmbedBefore(record, {}, {}, attrs, { + embedding: async () => { + throw new Error('https://internal-embed.svc:9000 401 key=sk-abc123 unauthorized'); + }, + }); + assert.ok(before); + // the embedder's raw backend message must NOT propagate as-is to the caller; the + // sanitized error should reference only the attribute name and a generic phrase. 
+ await assert.rejects(before(), (err) => { + assert.ok(!/sk-abc123/.test(err.message), 'API key tail leaked'); + assert.ok(!/internal-embed\.svc/.test(err.message), 'internal hostname leaked'); + assert.ok(/embedding/i.test(err.message), 'error message should mention embedding'); + return true; + }); + // record.embedding should not have been written + assert.equal(record.embedding, undefined); + }); + }); +}); From b1d0a47b4183f8492ef4b68dc3b7e6c68349fd0c Mon Sep 17 00:00:00 2001 From: Nathan Heskew Date: Fri, 22 May 2026 12:03:58 -0700 Subject: [PATCH 3/7] style: prettier --write Co-Authored-By: Claude Opus 4.7 (1M context) --- resources/Table.ts | 11 ++--------- resources/models/embedHook.ts | 11 +++++------ unitTests/resources/models/embedHook.test.js | 11 ++--------- 3 files changed, 9 insertions(+), 24 deletions(-) diff --git a/resources/Table.ts b/resources/Table.ts index 41659ba7f..09e1c42c0 100644 --- a/resources/Table.ts +++ b/resources/Table.ts @@ -47,12 +47,7 @@ import { transaction, contextStorage } from './transaction.ts'; import { MAXIMUM_KEY, writeKey, compareKeys } from 'ordered-binary'; import { getWorkerIndex, getWorkerCount } from '../server/threads/manageThreads.js'; import { HAS_BLOBS, auditRetention, removeAuditEntry } from './auditStore.ts'; -import { - buildEmbedBefore, - createDefaultEmbedder, - type EmbedAttribute, - type Embedder, -} from './models/embedHook.ts'; +import { buildEmbedBefore, createDefaultEmbedder, type EmbedAttribute, type Embedder } from './models/embedHook.ts'; import { autoCast, autoCastBooleanStrict } from '../utility/common_utils.ts'; import { recordUpdater, @@ -3626,9 +3621,7 @@ export function makeTable(options) { return; } if (!attribute.embed) { - console.error( - `The attribute "${attribute_name}" is not declared with @embed in the table "${tableName}"` - ); + console.error(`The attribute "${attribute_name}" is not declared with @embed in the table "${tableName}"`); return; } this.userEmbedders[attribute_name] 
= embedder; diff --git a/resources/models/embedHook.ts b/resources/models/embedHook.ts index 0cf2ee35a..c54b46a82 100644 --- a/resources/models/embedHook.ts +++ b/resources/models/embedHook.ts @@ -66,7 +66,10 @@ export type Embedder = (record: any) => Promise * a fake for unit tests without dragging the transaction stack into module * load. */ -type EmbedFn = (input: string | string[], opts: { model?: string; inputType?: 'document' | 'query' }) => Promise; +type EmbedFn = ( + input: string | string[], + opts: { model?: string; inputType?: 'document' | 'query' } +) => Promise; /** * Models facade resolver for the default embedder. Lazy-imported so this @@ -138,11 +141,7 @@ export function buildEmbedBefore( userEmbedders: Record ): (() => Promise) | undefined { if (!embedAttributes || embedAttributes.length === 0) return undefined; - if ( - options?.isNotification === true || - context?.replicateFrom === false || - context?.alreadyLogged === true - ) { + if (options?.isNotification === true || context?.replicateFrom === false || context?.alreadyLogged === true) { return undefined; } if (!record || typeof record !== 'object') return undefined; diff --git a/unitTests/resources/models/embedHook.test.js b/unitTests/resources/models/embedHook.test.js index 83beb44a1..d4f0e499d 100644 --- a/unitTests/resources/models/embedHook.test.js +++ b/unitTests/resources/models/embedHook.test.js @@ -1,11 +1,7 @@ 'use strict'; const assert = require('node:assert/strict'); -const { - buildEmbedBefore, - createDefaultEmbedder, - __setEmbedFnForTest, -} = require('#src/resources/models/embedHook'); +const { buildEmbedBefore, createDefaultEmbedder, __setEmbedFnForTest } = require('#src/resources/models/embedHook'); const VECTOR = new Float32Array([0.1, 0.2, 0.3]); @@ -107,10 +103,7 @@ describe('embedHook', () => { it('returns undefined when record is not an object', () => { assert.equal(buildEmbedBefore(null, {}, {}, attrs, { embedding: async () => VECTOR }), undefined); - assert.equal( - 
buildEmbedBefore(undefined, {}, {}, attrs, { embedding: async () => VECTOR }), - undefined - ); + assert.equal(buildEmbedBefore(undefined, {}, {}, attrs, { embedding: async () => VECTOR }), undefined); }); it('runs the embedder and writes vector to the target attribute when source is present', async () => { From cfb36776a20a089cec446186465283e2d00bcdf7 Mon Sep 17 00:00:00 2001 From: Nathan Heskew Date: Fri, 22 May 2026 15:14:34 -0700 Subject: [PATCH 4/7] test(integration): @embed end-to-end with fake Ollama MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Spins up a fake Ollama HTTP server inside the test, points Harper's models.embedding.default config at it, deploys a schema with @embed, and exercises three paths end-to-end. Replaces the "manual smoke test" bullet that was in the PR description with a CI-runnable test. Covered paths: 1. Happy path — POST a record → fake-ollama returns a deterministic 3-element Float32 vector → record stores it at the @embed-decorated field. Confirms the persisted vector matches the fake's response component-wise (within a 1e-5 tolerance). 2. Source-unchanged PATCH — PATCH a record with a non-source field → fake-ollama call count stays flat; the existing embedding survives via patch-merge. 3. Replication-receiver skip (REST path) — POST with x-replicate-from: none header and a pre-supplied vector → no embed call is made; the supplied vector is stored as-is. The cluster-subscribe replication path (options.isNotification === true) is already covered by the embedHook unit tests. The fake-ollama server tracks call count + last inputs, so we assert not just on "vector got written" but on "embedder fired exactly when we expect, and not when we don't". The fake matches Ollama's actual /api/embed wire format (POST { model, input: string[] } → { embeddings: number[][] }) so the real OllamaBackend code path runs unchanged. 
Local note: macOS dev machines without `harper-integration-test-setup-loopback` can run via `HARPER_INTEGRATION_TEST_FORCE_LOOPBACK=1` to bypass the loopback address pool. CI uses the pool natively. Refs #632 #510 Co-Authored-By: Claude Opus 4.7 (1M context) --- .../server/embed-directive.test.ts | 288 ++++++++++++++++++ 1 file changed, 288 insertions(+) create mode 100644 integrationTests/server/embed-directive.test.ts diff --git a/integrationTests/server/embed-directive.test.ts b/integrationTests/server/embed-directive.test.ts new file mode 100644 index 000000000..4428edebc --- /dev/null +++ b/integrationTests/server/embed-directive.test.ts @@ -0,0 +1,288 @@ +/** + * `@embed` directive integration test (#632 / Phase 5 of #510). + * + * Spins up a fake Ollama HTTP server inside the test, points Harper's models + * config at it, deploys a schema with `@embed`, and exercises three paths + * end-to-end: + * + * 1. **Happy path** — POST a record → fake-ollama returns a deterministic + * vector → record stores the vector at the `@embed`-decorated field. + * + * 2. **Source-unchanged PATCH** — PATCH a record with a non-source field → + * no new embed call is made (fake-ollama hit count stays flat); the + * existing embedding survives via patch-merge. + * + * 3. **Replication-receiver skip** — POST with `x-replicate-from: none` and + * a pre-supplied vector → no embed call is made; the supplied vector is + * stored as-is. (The REST receiver path; the cluster-subscribe path is + * covered by the `options.isNotification === true` branch in + * `embedHook.test.js`.) + * + * Setup notes: + * - The fake-ollama server returns deterministic 3-element Float32 vectors + * derived from the input text so assertions can compare exact bytes. + * - Harper boots with `models.embedding.default` pointing at the fake host. + * - Schema/component installs follow the same pattern as + * `integrationTests/apiTests/blob.test.mjs`. 
+ */ +import { suite, test, before, after } from 'node:test'; +import { strictEqual, ok } from 'node:assert/strict'; +import { createServer, type IncomingMessage, type ServerResponse, type Server } from 'node:http'; +import type { AddressInfo } from 'node:net'; +import { startHarper, teardownHarper } from '@harperfast/integration-testing'; +// .mjs siblings — TypeScript needs `// @ts-expect-error` because no declaration files exist +// @ts-expect-error utils/client.mjs has no type declarations; runtime resolves fine +import { createApiClient } from '../apiTests/utils/client.mjs'; +// @ts-expect-error utils/lifecycle.mjs has no type declarations; runtime resolves fine +import { restartHttpWorkers } from '../apiTests/utils/lifecycle.mjs'; +import request from 'supertest'; + +const SCHEMA_GRAPHQL = [ + 'type EmbedDoc @table(database: "embedtest") @sealed @export {', + '\tid: ID! @primaryKey', + '\tcontent: String', + '\ttag: String', + '\tembedding: Vector @embed(source: "content", model: "default")', + '}', + '', +].join('\n'); + +interface FakeOllama { + url: string; + host: string; // form for Harper's `host:` config field + close: () => Promise; + embedCallCount: () => number; + lastEmbedInputs: () => string[][]; + reset: () => void; +} + +/** + * Deterministic embedding function: maps an input string to a 3-element + * Float32Array. Different inputs produce different vectors; same input + * produces the same vector across calls. 
+ */ +function deterministicVector(input: string): number[] { + let h1 = 0; + let h2 = 0; + let h3 = 0; + for (let i = 0; i < input.length; i++) { + const c = input.charCodeAt(i); + h1 = (h1 * 31 + c) % 9973; + h2 = (h2 * 37 + c) % 9967; + h3 = (h3 * 41 + c) % 9941; + } + // Normalize to (0, 1) range + return [h1 / 9973, h2 / 9967, h3 / 9941]; +} + +async function startFakeOllama(): Promise { + let embedCalls = 0; + const embedInputs: string[][] = []; + const server: Server = createServer((req: IncomingMessage, res: ServerResponse) => { + if (req.method === 'POST' && req.url === '/api/embed') { + let body = ''; + req.on('data', (chunk) => (body += chunk)); + req.on('end', () => { + try { + const parsed = JSON.parse(body) as { model: string; input: string | string[] }; + const inputs = Array.isArray(parsed.input) ? parsed.input : [parsed.input]; + embedCalls++; + embedInputs.push(inputs); + const embeddings = inputs.map((s) => deterministicVector(s)); + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ embeddings, prompt_eval_count: inputs.join(' ').length })); + } catch (err) { + res.writeHead(400, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: String(err) })); + } + }); + return; + } + res.writeHead(404); + res.end(); + }); + await new Promise((resolve) => server.listen(0, '127.0.0.1', resolve)); + const addr = server.address() as AddressInfo; + const url = `http://127.0.0.1:${addr.port}`; + const host = `127.0.0.1:${addr.port}`; + return { + url, + host, + close: () => new Promise((resolve, reject) => server.close((err) => (err ? 
reject(err) : resolve()))), + embedCallCount: () => embedCalls, + lastEmbedInputs: () => embedInputs, + reset: () => { + embedCalls = 0; + embedInputs.length = 0; + }, + }; +} + +suite('@embed directive end-to-end with fake Ollama', (ctx: any) => { + let fake: FakeOllama; + let client: any; + + before(async () => { + fake = await startFakeOllama(); + // Local debugging: setting HARPER_INTEGRATION_TEST_FORCE_LOOPBACK forces the + // 127.0.0.1 fast-path so the test runs without `harper-integration-test-setup-loopback` + // (macOS dev machines don't have the pool by default). CI uses the pool normally. + if (process.env.HARPER_INTEGRATION_TEST_FORCE_LOOPBACK) { + ctx.harper = { ...ctx.harper, hostname: '127.0.0.1' }; + } + await startHarper(ctx, { + config: { + logging: { auditLog: true }, + models: { + embedding: { + default: { backend: 'ollama', host: fake.host, model: 'fake-embed' }, + }, + }, + }, + env: {}, + }); + client = createApiClient(ctx.harper); + + await client + .req() + .send({ operation: 'add_component', project: 'embedtest' }) + .expect((r: any) => { + const text = JSON.stringify(r.body); + ok(text.includes('Successfully added project') || text.includes('Project already exists'), r.text); + }); + + await client + .req() + .send({ operation: 'set_component_file', project: 'embedtest', file: 'schema.graphql', payload: SCHEMA_GRAPHQL }) + .expect((r: any) => ok(r.body?.message?.includes?.('Successfully set component: schema.graphql'), r.text)) + .expect(200); + + await restartHttpWorkers(client, '/openapi'); + fake.reset(); + }); + + after(async () => { + try { + await teardownHarper(ctx); + } finally { + await fake.close(); + } + }); + + test('schema with @embed creates EmbedDoc table', async () => { + await client + .req() + .send({ operation: 'describe_all' }) + .expect((r: any) => { + ok(JSON.stringify(r.body).includes('"embedtest":{"EmbedDoc":'), 'EmbedDoc table not created: ' + r.text); + }) + .expect(200); + }); + + test('happy path: POST → 
embedder runs → vector stored on record', async () => { + fake.reset(); + const content = 'harper is a database'; + const expected = deterministicVector(content); + + await request(ctx.harper.httpURL) + .post('/EmbedDoc/') + .set(client.headers) + .send({ id: 'doc-happy', content }) + .expect((r: any) => ok([200, 201, 204].includes(r.status), `unexpected status ${r.status}: ${r.text}`)); + + // Verify the fake-ollama received exactly one embed call with the source text + strictEqual(fake.embedCallCount(), 1, 'expected exactly one embed call'); + const inputs = fake.lastEmbedInputs()[0]; + strictEqual(inputs.length, 1); + ok(inputs[0].includes(content), `embed input "${inputs[0]}" should contain source text`); + + // GET the record back and verify the embedding was stored + const getResp = await client.reqRest('/EmbedDoc/doc-happy').expect(200); + const body = getResp.body as { id: string; content: string; embedding: number[] }; + strictEqual(body.id, 'doc-happy'); + strictEqual(body.content, content); + ok( + Array.isArray(body.embedding) || (body.embedding && (body.embedding as any).length === 3), + 'embedding field should be populated' + ); + const stored = Array.from(body.embedding as any) as number[]; + strictEqual(stored.length, 3, 'expected 3-element vector'); + for (let i = 0; i < 3; i++) { + ok( + Math.abs(stored[i] - expected[i]) < 1e-5, + `vector[${i}] mismatch: stored=${stored[i]} expected=${expected[i]}` + ); + } + }); + + test('PATCH unrelated field does NOT re-run embedder; existing embedding survives', async () => { + // Seed a record first + const content = 'patch baseline content'; + await request(ctx.harper.httpURL) + .post('/EmbedDoc/') + .set(client.headers) + .send({ id: 'doc-patch', content }) + .expect((r: any) => ok([200, 201, 204].includes(r.status), `seed POST status ${r.status}: ${r.text}`)); + + const baselineEmbedCalls = fake.embedCallCount(); + + // PATCH a non-source field. 
The embed hook's source-presence predicate should skip; + // no new embed call should fire and the existing vector should remain unchanged. + await request(ctx.harper.httpURL) + .patch('/EmbedDoc/doc-patch') + .set(client.headers) + .send({ tag: 'updated' }) + .expect((r: any) => ok([200, 204].includes(r.status), `PATCH status ${r.status}: ${r.text}`)); + + strictEqual( + fake.embedCallCount(), + baselineEmbedCalls, + 'embed should not fire when the source field is not in the PATCH payload' + ); + + // Verify the embedding is still the one from the original content + const expected = deterministicVector(content); + const getResp = await client.reqRest('/EmbedDoc/doc-patch').expect(200); + const body = getResp.body as { tag: string; embedding: number[] }; + strictEqual(body.tag, 'updated'); + const stored = Array.from(body.embedding as any) as number[]; + strictEqual(stored.length, 3); + for (let i = 0; i < 3; i++) { + ok(Math.abs(stored[i] - expected[i]) < 1e-5, 'existing embedding should survive non-source PATCH'); + } + }); + + test('replication-receiver: POST with x-replicate-from:none + supplied vector → embedder skipped', async () => { + const content = 'replicated record content'; + const suppliedVector = [0.111, 0.222, 0.333]; + const baselineEmbedCalls = fake.embedCallCount(); + + await request(ctx.harper.httpURL) + .post('/EmbedDoc/') + .set({ ...client.headers, 'x-replicate-from': 'none' }) + .send({ id: 'doc-replica', content, embedding: suppliedVector }) + .expect((r: any) => ok([200, 201, 204].includes(r.status), `replica POST status ${r.status}: ${r.text}`)); + + strictEqual( + fake.embedCallCount(), + baselineEmbedCalls, + 'embed should NOT fire on a write with x-replicate-from: none (receiver context)' + ); + + const getResp = await client.reqRest('/EmbedDoc/doc-replica').expect(200); + const body = getResp.body as { id: string; content: string; embedding: number[] }; + strictEqual(body.id, 'doc-replica'); + strictEqual(body.content, content); + 
const stored = Array.from(body.embedding as any) as number[]; + strictEqual(stored.length, 3); + // The receiver must preserve the originator's vector — NOT overwrite with what + // it would have computed locally. Compare against suppliedVector, not against + // deterministicVector(content). + for (let i = 0; i < 3; i++) { + ok( + Math.abs(stored[i] - suppliedVector[i]) < 1e-5, + `receiver stored ${stored[i]} but should be the originator's ${suppliedVector[i]}` + ); + } + }); +}); From 07cef66aca3692c15819e5dcf61861bd3adfadd4 Mon Sep 17 00:00:00 2001 From: Nathan Heskew Date: Fri, 22 May 2026 15:55:47 -0700 Subject: [PATCH 5/7] =?UTF-8?q?fix(models):=20@embed=20end-to-end=20correc?= =?UTF-8?q?tness=20=E2=80=94=20embedder=20mutation=20reaches=20commit?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three bugs surfaced once the new integration test exercised the full REST write path against a fake-Ollama HTTP server. All three were fixed in this patch; the integration test now passes 4/4 locally and exercises the bugs that would otherwise have shipped silent. 1) `require('./Models.ts')` doesn't survive the dist build (the `.ts` extension stays literal at runtime; production resolves from `dist/*.js`). Switched to `require('#src/resources/models/Models')` which goes through package.json conditional exports — resolves to `.ts` under the `typestrip` condition (unit tests) and to `dist/*.js` in production. 2) The embedder was wired into the pre-commit `before` slot, which is awaited at `Promise.all(completions)` at txn-commit time — AFTER each write's `commit(...)` closure has already stored the record. That pattern works for blob byte-writes (which reference pre-allocated blob IDs) but not for `@embed` where the vector itself must be on the record at commit time. 
Moved the embedder call to fire BEFORE `transaction.addWrite(...)` and threaded the resulting promise back through `_writeUpdate` → `create()` / `update()` via `when()` so the caller chain (and ultimately the static-create transaction wrapper) awaits the embedding before committing. 3) The default embedder returned a `Float32Array`, but Harper's record encoder mangles typed arrays via `updateAndFreeze` (it enumerates them as `{0,1,2,...}` maps, breaking the byte round-trip). The integration test confirmed: a record POSTed with a plain `Array` round-trips correctly; one with a `Float32Array` came back as zero-bytes. Changed the default embedder to return `Array` (HNSW's `propertyResolver` and `customIndex.index` both accept it). Unit test updated to assert the Array shape. Integration test (`integrationTests/server/embed-directive.test.ts`) exercises: - happy path: POST → embedder runs → vector stored, decoded back - source-unchanged PATCH: no new embed call; existing vector survives - REST `x-replicate-from: none`: embedder skipped, supplied vector stored Refs #632 #510 Co-Authored-By: Claude Opus 4.7 (1M context) --- .../server/embed-directive.test.ts | 55 +++++++++++++------ resources/Table.ts | 39 ++++++++----- resources/models/embedHook.ts | 35 +++++++++--- unitTests/resources/models/embedHook.test.js | 7 ++- 4 files changed, 94 insertions(+), 42 deletions(-) diff --git a/integrationTests/server/embed-directive.test.ts b/integrationTests/server/embed-directive.test.ts index 4428edebc..ad4236f32 100644 --- a/integrationTests/server/embed-directive.test.ts +++ b/integrationTests/server/embed-directive.test.ts @@ -47,6 +47,28 @@ const SCHEMA_GRAPHQL = [ '', ].join('\n'); +/** + * REST GET returns Float32Array-typed columns as a `{type: "Buffer", data: number[]}` + * JSON shape (msgpack/Buffer round-trip). Decode back to Float32 components for + * equality assertions. 
+ */ +function decodeVector(field: any): number[] | undefined { + if (field == null) return undefined; + if (Array.isArray(field)) return field.map(Number); + if (field instanceof Float32Array) return Array.from(field); + if (field && field.type === 'Buffer' && Array.isArray(field.data)) { + const bytes = Uint8Array.from(field.data); + const f32 = new Float32Array(bytes.buffer, bytes.byteOffset, Math.floor(bytes.byteLength / 4)); + return Array.from(f32); + } + if (field instanceof Uint8Array || field instanceof ArrayBuffer) { + const view = field instanceof ArrayBuffer ? new Uint8Array(field) : field; + const f32 = new Float32Array(view.buffer, view.byteOffset, Math.floor(view.byteLength / 4)); + return Array.from(f32); + } + return undefined; +} + interface FakeOllama { url: string; host: string; // form for Harper's `host:` config field @@ -170,13 +192,13 @@ suite('@embed directive end-to-end with fake Ollama', (ctx: any) => { }); test('schema with @embed creates EmbedDoc table', async () => { - await client - .req() - .send({ operation: 'describe_all' }) - .expect((r: any) => { - ok(JSON.stringify(r.body).includes('"embedtest":{"EmbedDoc":'), 'EmbedDoc table not created: ' + r.text); - }) - .expect(200); + const desc = await client.req().send({ operation: 'describe_all' }).expect(200); + const embedDoc = desc.body?.embedtest?.EmbedDoc; + ok(embedDoc, 'EmbedDoc table not created'); + const embeddingAttr = (embedDoc.attributes || []).find((a: any) => a.attribute === 'embedding'); + ok(embeddingAttr, 'embedding attribute should be present'); + strictEqual(embeddingAttr.type, 'Vector', 'embedding type should be Vector'); + strictEqual(embeddingAttr.indexed?.type, 'HNSW', 'embedding should be auto-HNSW-indexed'); }); test('happy path: POST → embedder runs → vector stored on record', async () => { @@ -198,14 +220,11 @@ suite('@embed directive end-to-end with fake Ollama', (ctx: any) => { // GET the record back and verify the embedding was stored const getResp = await 
client.reqRest('/EmbedDoc/doc-happy').expect(200); - const body = getResp.body as { id: string; content: string; embedding: number[] }; + const body = getResp.body as { id: string; content: string; embedding: unknown }; strictEqual(body.id, 'doc-happy'); strictEqual(body.content, content); - ok( - Array.isArray(body.embedding) || (body.embedding && (body.embedding as any).length === 3), - 'embedding field should be populated' - ); - const stored = Array.from(body.embedding as any) as number[]; + const stored = decodeVector(body.embedding); + ok(stored, `embedding field should be populated, got: ${JSON.stringify(body.embedding)}`); strictEqual(stored.length, 3, 'expected 3-element vector'); for (let i = 0; i < 3; i++) { ok( @@ -243,9 +262,10 @@ suite('@embed directive end-to-end with fake Ollama', (ctx: any) => { // Verify the embedding is still the one from the original content const expected = deterministicVector(content); const getResp = await client.reqRest('/EmbedDoc/doc-patch').expect(200); - const body = getResp.body as { tag: string; embedding: number[] }; + const body = getResp.body as { tag: string; embedding: unknown }; strictEqual(body.tag, 'updated'); - const stored = Array.from(body.embedding as any) as number[]; + const stored = decodeVector(body.embedding); + ok(stored, `embedding should still be populated after non-source PATCH, got: ${JSON.stringify(body.embedding)}`); strictEqual(stored.length, 3); for (let i = 0; i < 3; i++) { ok(Math.abs(stored[i] - expected[i]) < 1e-5, 'existing embedding should survive non-source PATCH'); @@ -270,10 +290,11 @@ suite('@embed directive end-to-end with fake Ollama', (ctx: any) => { ); const getResp = await client.reqRest('/EmbedDoc/doc-replica').expect(200); - const body = getResp.body as { id: string; content: string; embedding: number[] }; + const body = getResp.body as { id: string; content: string; embedding: unknown }; strictEqual(body.id, 'doc-replica'); strictEqual(body.content, content); - const stored = 
Array.from(body.embedding as any) as number[]; + const stored = decodeVector(body.embedding); + ok(stored, `embedding should be populated from supplied vector, got: ${JSON.stringify(body.embedding)}`); strictEqual(stored.length, 3); // The receiver must preserve the originator's vector — NOT overwrite with what // it would have computed locally. Compare against suppliedVector, not against diff --git a/resources/Table.ts b/resources/Table.ts index 09e1c42c0..c2ffefff7 100644 --- a/resources/Table.ts +++ b/resources/Table.ts @@ -1539,14 +1539,16 @@ export function makeTable(options) { return Promise.all( record.map((element) => { const id = element[primaryKey]; - this._writeUpdate(id, element, true); - return this.save() as any; + // `_writeUpdate` may return a promise when an `@embed` directive + // requires running an embedder before the per-write `commit(...)` + // closure. Threading via `when()` so synchronous-return callers + // (no embed) and async-return callers (embed pending) both work. + return when(this._writeUpdate(id, element, true), () => this.save() as any); }) ) as any; } else { const id = requestTargetToId(target as any); - this._writeUpdate(id, record, true); - return this.save() as any; + return when(this._writeUpdate(id, record, true), () => this.save() as any); } }) as any; } @@ -1585,8 +1587,10 @@ export function makeTable(options) { throw new ClientError('Record already exists', 409); } } - this._writeUpdate(id, record, true); - return record; + // `_writeUpdate` may return a promise when an `@embed` directive + // requires running an embedder before the per-write `commit(...)` + // closure. `when()` passes through synchronous returns. 
+ return when(this._writeUpdate(id, record, true), () => record); }) as any; } @@ -1613,7 +1617,6 @@ export function makeTable(options) { _writeUpdate(id: Id, recordUpdate: any, fullUpdate: boolean, options?: any) { const context = this.getContext(); const transaction = txnForContext(context); - checkValidId(id); const entry = this.#entry ?? primaryStore.getEntry(id, { transaction: transaction.getReadTxn() }); const writeToSource = () => { @@ -1962,11 +1965,16 @@ export function makeTable(options) { }, }; this.#savingOperation = write; - // `@embed` write-time hook (Phase 5 of #510). Chains into the existing - // blob-pre-commit `before` slot so the embedder runs during the - // transaction's `before` phase, before the commit closure stores the - // merged record. Skipped on replicated writes — the originating node - // already computed the embedding and the receiver should preserve it. + // `@embed` write-time hook (Phase 5 of #510). Must run BEFORE `addWrite` + // so the embedder's mutation of `recordUpdate` reaches the per-write + // `commit(...)` closure. The transaction's pre-commit `before` slot + // fires AFTER `commit(...)` for record mutations (the slot is awaited + // at `Promise.all(completions)` at txn-commit time, not before each + // write's commit closure) — which is fine for blob byte-writes that + // reference pre-allocated IDs, but not for embed where the vector + // itself must be on the record at commit time. Skipped on replicated + // writes — the originating node already computed the embedding and + // the receiver should preserve it. 
const embedBefore = buildEmbedBefore( recordUpdate, context, @@ -1974,8 +1982,11 @@ export function makeTable(options) { TableResource.embedAttributes, TableResource.userEmbedders ); - write.beforeIntermediate = preCommitBlobsForRecordBefore(write, recordUpdate, embedBefore); - return transaction.addWrite(write as any); + const proceed = (): any => { + write.beforeIntermediate = preCommitBlobsForRecordBefore(write, recordUpdate); + return transaction.addWrite(write as any); + }; + return embedBefore ? embedBefore().then(proceed) : proceed(); } async delete(target: RequestTargetOrId): Promise { diff --git a/resources/models/embedHook.ts b/resources/models/embedHook.ts index c54b46a82..4e432f7fc 100644 --- a/resources/models/embedHook.ts +++ b/resources/models/embedHook.ts @@ -11,11 +11,17 @@ * when they need different logic (multi-field concatenation, custom * preprocessing). * - * - `buildEmbedBefore(...)` — produces the pre-commit `before` callback that - * `resources/Table.ts` chains into the existing blob-pre-commit pattern. - * Returns `undefined` when there's no work (no embed attributes, or the - * write is a replication receiver) so the call site can pass it straight - * into `preCommitBlobsForRecordBefore` as the `before` parameter. + * - `buildEmbedBefore(...)` — produces the embedder callback invoked + * *before* `transaction.addWrite(...)` at the put/patch site. The + * embedder mutates `record[attr.name]` so the new vector is on the + * record when the per-write `commit(...)` closure runs. (It can't ride + * the txn's pre-commit `before` slot because that slot is awaited at + * `Promise.all(completions)` at txn-commit time — AFTER each write's + * `commit(...)` has already stored the record. The blob pattern works + * there because blob IDs are pre-allocated synchronously and the blob + * bytes write independently of the record.) 
Returns `undefined` when + * there's no work (no embed attributes, or the write is a replication + * receiver) so the call site can short-circuit. * * Replicated-write predicate: the receiver should *store* the originating * node's already-computed embedding, not re-compute it. Three signals @@ -58,7 +64,7 @@ export type EmbedAttribute = { embed: EmbedConfig; }; -export type Embedder = (record: any) => Promise; +export type Embedder = (record: any) => Promise; /** * Embed-function shape the default embedder calls into. Matches the public @@ -83,7 +89,11 @@ type EmbedFn = ( let _embedFn: EmbedFn | undefined; function resolveEmbedFn(): EmbedFn { if (_embedFn) return _embedFn; - const { Models } = require('./Models.ts'); // eslint-disable-line @typescript-eslint/no-var-requires + // `#src/` alias goes through package.json conditional exports — resolves to + // `./*.ts` under the `typestrip` condition (unit tests) and to `./dist/*.js` + // in production. A bare `require('./Models.ts')` doesn't survive the dist + // build because the `.ts` extension stays literal at runtime. + const { Models } = require('#src/resources/models/Models'); // eslint-disable-line @typescript-eslint/no-var-requires const models = new Models(); _embedFn = (input, opts) => models.embed(input, opts); return _embedFn; @@ -99,7 +109,7 @@ export function __setEmbedFnForTest(fn: EmbedFn | undefined): void { export function createDefaultEmbedder(embedConfig: EmbedConfig): Embedder { const { source, model } = embedConfig; - return async (record: any): Promise => { + return async (record: any): Promise => { const sourceValue = record?.[source]; if (sourceValue == null) return null; // `embed()` always returns an array (one vector per input). 
`@embed`'s @@ -109,7 +119,14 @@ export function createDefaultEmbedder(embedConfig: EmbedConfig): Embedder { model, inputType: 'document', }); - return vectors?.[0]; + const v = vectors?.[0]; + if (v == null) return undefined; + // Convert Float32Array → plain Array for storage. Harper's record + // encoder doesn't round-trip typed arrays cleanly through msgpack/CRDT + // merge (`updateAndFreeze` enumerates them as `{0,1,2,...}` maps), so we + // store as a plain numeric array. HNSW's `propertyResolver` and + // `customIndex.index` both accept `number[]` and `Float32Array`. + return v instanceof Float32Array ? Array.from(v) : Array.from(v as any); }; } diff --git a/unitTests/resources/models/embedHook.test.js b/unitTests/resources/models/embedHook.test.js index d4f0e499d..c1a7e4e9e 100644 --- a/unitTests/resources/models/embedHook.test.js +++ b/unitTests/resources/models/embedHook.test.js @@ -19,12 +19,15 @@ describe('embedHook', () => { describe('createDefaultEmbedder', () => { afterEach(() => __setEmbedFnForTest(undefined)); - it('reads source field, calls Models.embed with document inputType, returns first vector', async () => { + it('reads source field, calls Models.embed with document inputType, returns first vector as Array', async () => { const embedFn = fakeEmbedCapturing(); __setEmbedFnForTest(embedFn); const embedder = createDefaultEmbedder({ source: 'content', model: 'default' }); const vec = await embedder({ content: 'hello world' }); - assert.deepEqual(vec, VECTOR); + // Default embedder converts Float32Array → Array so Harper's record + // encoder doesn't mangle it via `updateAndFreeze`. HNSW accepts both. 
+ assert.ok(Array.isArray(vec), 'vec should be a plain Array'); + assert.deepEqual(vec, Array.from(VECTOR)); assert.equal(embedFn.calls.length, 1); assert.equal(embedFn.calls[0].input, 'hello world'); assert.equal(embedFn.calls[0].opts.model, 'default'); From b1088047d970dbc0e2bd95e4de02446dab5b7478 Mon Sep 17 00:00:00 2001 From: Nathan Heskew Date: Fri, 22 May 2026 15:59:18 -0700 Subject: [PATCH 6/7] =?UTF-8?q?fix(models):=20claude-bot=20review=20?= =?UTF-8?q?=E2=80=94=20embedder=20refresh=20+=20Float32=20byte=20cast?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two real blockers from PR #747 review: 1) Default embedder not refreshed on model change. The `!this.userEmbedders[attribute.name]` guard prevented refreshing the auto-generated default embedder when a schema reload via `databases.ts:940` in-place `attributes.splice()` calls `updatedAttributes()` again on the existing Table class — `userEmbedders` carried over, so a `@embed(model: "B")` swap from `"A"` left writes silently using the old model. Track user-set override names in `static userSetEmbedders: Set<string>` and only skip refresh for those; defaults are refreshed every time. `setEmbedAttribute` populates the set. 2) `ArrayBuffer.isView` branch reinterpreted bytes. `new Float32Array(value.buffer)` does not convert element values: it creates a view over the raw bytes of the entire backing ArrayBuffer, starting at offset 0 — when `value` is a `Float64Array`, the 8-byte-per-element encoding gets reinterpreted as 4-byte float32s (garbage). Same issue with typed-array subarrays (nonzero `byteOffset`). Use `Float32Array.from(value)` to iterate element values and convert numerically. Both flagged inline by claude-bot on `resources/Table.ts:3433` and `resources/Table.ts:4822`. Integration test still passes 4/4, unit tests 17/17. 
Refs #632 #510 Co-Authored-By: Claude Opus 4.7 (1M context) --- resources/Table.ts | 34 ++++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/resources/Table.ts b/resources/Table.ts index c2ffefff7..f1be8fb5f 100644 --- a/resources/Table.ts +++ b/resources/Table.ts @@ -262,9 +262,13 @@ export function makeTable(options) { // `@embed` write-time hook registry (Phase 5 of #510). `userEmbedders` is the per- // attribute embedder map populated by `setEmbedAttribute` — defaulted at schema // load from the directive's `(source, model)` and overridable by component authors. - // `embedAttributes` is the filtered list scanned on every write so we don't pay an + // `userSetEmbedders` tracks which names were explicitly set by `setEmbedAttribute` + // vs. auto-populated as defaults — on a schema reload with a changed `model:`, we + // must refresh defaults while preserving user-author overrides. `embedAttributes` + // is the filtered list scanned on every write so we don't pay an // `attributes.filter(...)` per put/patch. static userEmbedders: { [name: string]: Embedder } = {}; + static userSetEmbedders: Set = new Set(); static embedAttributes: EmbedAttribute[] = (attributes as any[]).filter((a) => a?.embed); static source?: typeof TableResource; declare static sourceOptions: any; @@ -3423,14 +3427,17 @@ export function makeTable(options) { attribute.resolve = null; // reset this const relationship = attribute.relationship; const computed = attribute.computed; - // `@embed` directive: register the default embedder if no override is - // already in place. This is one-time setup (populating userEmbedders), - // not a runtime resolver, so it lives outside the if/else-if chain - // below — `@embed` fields ALSO get auto-HNSW indexing, and the chain's - // `customIndex.propertyResolver` branch needs to fire for them. - // Component authors override the default via - // `Table.setEmbedAttribute(name, customEmbedder)` after schema load. 
- if (attribute.embed && !this.userEmbedders[attribute.name]) { + // `@embed` directive: register the default embedder. This is one-time + // setup (populating userEmbedders), not a runtime resolver, so it lives + // outside the if/else-if chain below — `@embed` fields ALSO get + // auto-HNSW indexing, and the chain's `customIndex.propertyResolver` + // branch needs to fire for them. Component authors override the default + // via `Table.setEmbedAttribute(name, customEmbedder)` after schema load. + // `userSetEmbedders` tracks the override names so we can refresh defaults + // on an in-place `Table.attributes.splice()` schema reload at + // `databases.ts:940` (which calls `updatedAttributes()` again on the + // existing class) without clobbering an author's custom embedder. + if (attribute.embed && !TableResource.userSetEmbedders.has(attribute.name)) { this.userEmbedders[attribute.name] = createDefaultEmbedder(attribute.embed); } if (relationship) { @@ -3636,6 +3643,7 @@ export function makeTable(options) { return; } this.userEmbedders[attribute_name] = embedder; + this.userSetEmbedders.add(attribute_name); } static async deleteHistory(endTime = 0, cleanupDeletedRecords = false) { let completion: Promise; @@ -4813,12 +4821,14 @@ export function coerceType(value: any, attribute: any): any { return new Date(+value); // epoch ms number case 'Vector': // JSON-typed input arrives as a plain Array; the storage layer - // expects Float32Array (what HNSW and the embedder produce). Pass typed - // arrays through, coerce JSON arrays, reject anything else. + // accepts numeric arrays (what HNSW indexes). Convert element values via + // `Float32Array.from(...)` rather than reinterpreting raw bytes — taking + // `new Float32Array(view.buffer)` would mis-cast `Float64Array` / typed- + // array subarrays as garbage float32s. 
if (value === null || value === 'null') return null; if (value instanceof Float32Array) return value; if (Array.isArray(value)) return Float32Array.from(value); - if (ArrayBuffer.isView(value)) return new Float32Array((value as ArrayBufferView).buffer); + if (ArrayBuffer.isView(value)) return Float32Array.from(value as any); throw new SyntaxError(); case undefined: case 'Any': From 47bd103c4e826ba8139bc4e1c65602010653cd43 Mon Sep 17 00:00:00 2001 From: Nathan Heskew Date: Sat, 23 May 2026 11:14:08 -0700 Subject: [PATCH 7/7] fix(models): @embed coverage gaps + nits from PR #747 review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Kris's review (with Gemini cross-check) flagged six items + one nit; all applied. Local: 17/17 unit + 4/4 integration green. CRITICAL — silent data loss on PATCH: TableResource.update() called this._writeUpdate(...) at two sites (lines ~1247 and ~1257) and dropped the returned promise. With @embed active, _writeUpdate returns a pending promise; without awaiting it, the caller's `save()` ran before the write was registered on the txn — the embed never reached storage and the txn committed empty (or crashed with "transaction already closed" when the embedder finally resolved). Threaded both call sites through `when(...)` matching the pattern already in put/create. HIGH — Float32Array storage mangling on user-supplied vectors: coerceType returned a Float32Array for `type: 'Vector'`. The record encoder mangles typed arrays via `updateAndFreeze` into `{0,1,2,...}` maps — the same bug we already worked around in `createDefaultEmbedder` by emitting `Array`. coerceType is on the input path for user-supplied vectors (REST writes, replication receivers, programmatic API), so it had the same hazard. Switched to `Array` here too. 
MEDIUM — getFromSource cache writes did not run the embedder: Caching-table populations (`sourcedFrom()` → `getFromSource()`) build their own write op at Table.ts:~4511 and call `addWrite` directly, bypassing `_writeUpdate`. So an `@embed` declared on a caching table silently never embedded. Wired `buildEmbedBefore` into that path with the same "await before addWrite" pattern. This is the canonical use case Kris highlighted (and matches #750's derived-cache-table follow- up — caching tables are where embeddings on derived data live). LOW — malformed @embed silently degraded: Missing `source:` or `model:` only logged `console.error` and let schema deployment succeed — the resulting Vector field was registered as a regular un-embedded attribute. Promote to a thrown Error so the component install fails loudly. Note: this breaks the file's existing "log and continue" convention for other directive validations (dup primary key etc.); justified inline because @embed's silent-degrade failure mode is harder to diagnose than a missing primary key. LOW — globalThis.logger: Replaced with a lazy `require('#src/utility/logging/logger')` resolver. The cleaner static `import { logger } from ...` trips the documented `common_utils.ts ↔ harper_logger.ts` CJS cycle (ERR_REQUIRE_CYCLE_MODULE) the moment this module loads via a unit-test path that bypasses the full transaction stack. Lazy resolution sidesteps the cycle while still going through standard module resolution (not globalThis lookup). NITS — applied: - Parallelize per-attribute embedders via Promise.all (Kris). Typical schemas have one @embed field per table, but multi-@embed shouldn't serialize HTTP roundtrips for no benefit. - Simplify `Object.prototype.hasOwnProperty.call(record, key)` to `key in record` (Kris's "so paranoid, we have prototype-pollution protection (freezing)"). Open question (not in this commit): Should `Vector` storage preserve Float64/Float16 precision? 
Today everything flattens to Array and is re-narrowed when HNSW needs Float32 for distance computation. Kris asked about Float16 specifically; deferring to PR thread. Refs #632 #510 #750 Co-Authored-By: Claude Opus 4.7 (1M context) --- resources/Table.ts | 46 +++++++++++++++------ resources/graphql.ts | 20 ++++++--- resources/models/embedHook.ts | 77 +++++++++++++++++++++++------------ 3 files changed, 98 insertions(+), 45 deletions(-) diff --git a/resources/Table.ts b/resources/Table.ts index f1be8fb5f..b13de7d5b 100644 --- a/resources/Table.ts +++ b/resources/Table.ts @@ -1248,14 +1248,16 @@ export function makeTable(options) { } return when(loading, () => { this.#changes = updates; - this._writeUpdate(id, this.#changes, false); - return this; + // Thread `_writeUpdate`'s return through `when()` so the embed-hook + // promise (when `@embed` is active) is awaited before this method + // resolves — otherwise the caller proceeds to `save()` with the + // write not yet registered on the txn. + return when(this._writeUpdate(id, this.#changes, false), () => this); }); }); } } - this._writeUpdate(id, this.#changes, fullUpdate); - return this; + return when(this._writeUpdate(id, this.#changes, fullUpdate), () => this); } /** @@ -4506,6 +4508,23 @@ export function makeTable(options) { } }, }; + // `@embed` write-time hook on the cache-from-source path. The + // `getFromSource` write builds its own write op and addWrite-s it + // directly here — bypassing `_writeUpdate` — so we have to wire the + // embedder here too. Caching tables that declare `@embed` are a + // canonical use case (see #750): the source resource returns the + // record's content fields, and this table derives the vector. + // Receivers do not run the embedder; this path is on the originating + // node by definition (no `replicateFrom` / `alreadyLogged` / cluster- + // subscribe context here). 
+ const embedBefore = buildEmbedBefore( + updatedRecord, + sourceContext, + undefined, + TableResource.embedAttributes, + TableResource.userEmbedders + ); + if (embedBefore) await embedBefore(); sourceWrite.before = preCommitBlobsForRecordBefore(sourceWrite, updatedRecord); dbTxn.addWrite(sourceWrite); }), @@ -4820,15 +4839,18 @@ export function coerceType(value: any, attribute: any): any { } return new Date(+value); // epoch ms number case 'Vector': - // JSON-typed input arrives as a plain Array; the storage layer - // accepts numeric arrays (what HNSW indexes). Convert element values via - // `Float32Array.from(...)` rather than reinterpreting raw bytes — taking - // `new Float32Array(view.buffer)` would mis-cast `Float64Array` / typed- - // array subarrays as garbage float32s. + // Coerce to plain `Array`. Vector values survive the record encoder's + // `updateAndFreeze` intact only when handed off as plain arrays — a + // `Float32Array` passed through gets enumerated as `{0,1,2,...}` map keys and + // loses the typed-array shape on read. HNSW's `propertyResolver` and + // `customIndex.index` both accept `number[]`. (Open Q in PR review: + // should Vector preserve Float64 / Float16 precision? Today everything + // flattens to `number[]` and is re-narrowed when the HNSW index needs + // Float32 for distance computation.) 
if (value === null || value === 'null') return null; - if (value instanceof Float32Array) return value; - if (Array.isArray(value)) return Float32Array.from(value); - if (ArrayBuffer.isView(value)) return Float32Array.from(value as any); + if (value instanceof Float32Array) return Array.from(value); + if (Array.isArray(value)) return value.map(Number); + if (ArrayBuffer.isView(value)) return Array.from(value as any); throw new SyntaxError(); case undefined: case 'Any': diff --git a/resources/graphql.ts b/resources/graphql.ts index f2b06d259..ebd1dca0a 100644 --- a/resources/graphql.ts +++ b/resources/graphql.ts @@ -179,12 +179,20 @@ async function processGraphQLSchema(gqlContent, urlPath, filePath, resources) { embedDefinition[arg.name.value] = (arg.value as StringValueNode).value; } if (!embedDefinition.source || !embedDefinition.model) { - // Missing required args silently degrades the embedder to a no-op at - // write time, leaving the vector column empty with no operator signal. - // Refuse to register the directive in that case. - console.error( - `@embed on "${property.name}" requires both "source" and "model" arguments, at`, - directive.loc + // Missing required args would silently degrade the embedder to a + // no-op at write time, leaving the vector column empty with no + // operator signal. Halt schema processing so the component install + // surfaces the error instead of returning 200 OK on a broken + // schema. Other directive validations in this parser (e.g. dup + // primary key at line ~119) only log and continue — the difference + // here is that the failure mode is "looks-correct schema that + // never embeds anything," which is harder to diagnose than a + // missing primary key. + const loc = directive.loc; + console.error(`@embed on "${property.name}" requires both "source" and "model" arguments, at`, loc); + throw new Error( + `@embed on "${property.name}" requires both "source" and "model" arguments` + + (loc ? ` (line ${loc.startToken?.line ?? 
'?'}, column ${loc.startToken?.column ?? '?'})` : '') ); } else { property.embed = embedDefinition; diff --git a/resources/models/embedHook.ts b/resources/models/embedHook.ts index 4e432f7fc..35ac7821f 100644 --- a/resources/models/embedHook.ts +++ b/resources/models/embedHook.ts @@ -54,6 +54,22 @@ * it already needs the iterate-records-and-back-fill primitive. */ +/** + * Lazy logger resolver. A static `import { logger } from '../../utility/logging/logger.ts'` + * would be cleaner but trips the documented `common_utils.ts ↔ harper_logger.ts` + * CJS cycle (ERR_REQUIRE_CYCLE_MODULE) the moment this module is loaded by a + * unit-test path that bypasses the full transaction stack. We only need the + * logger on the failure path, so we resolve it lazily. + */ +function getLogger(): { error?: (...args: any[]) => void } { + try { + // eslint-disable-next-line @typescript-eslint/no-var-requires + return require('#src/utility/logging/logger').logger ?? {}; + } catch { + return {}; + } +} + export type EmbedConfig = { source: string; model: string; @@ -166,38 +182,45 @@ export function buildEmbedBefore( let anySourcePresent = false; for (const attr of embedAttributes) { const sourceKey = attr.embed?.source; - if (sourceKey && Object.prototype.hasOwnProperty.call(record, sourceKey)) { + if (sourceKey && sourceKey in record) { anySourcePresent = true; break; } } if (!anySourcePresent) return undefined; return async (): Promise => { - for (const attr of embedAttributes) { - const sourceKey = attr.embed?.source; - if (!sourceKey) continue; - if (!Object.prototype.hasOwnProperty.call(record, sourceKey)) continue; - const sourceValue = record[sourceKey]; - if (sourceValue == null) { - record[attr.name] = null; - continue; - } - const embedder = userEmbedders[attr.name]; - if (!embedder) continue; - let vector; - try { - vector = await embedder(record); - } catch (err) { - // Embedder backends (OpenAI, Anthropic, Bedrock, Ollama) may include URLs, - // model identifiers, or 
API-key tails in error messages. Those land in HTTP - // responses if propagated raw — Harper's threat model trusts deployers but - // not arbitrary REST callers, so we log the raw error and rethrow a - // sanitized one. The original error stays in server logs for diagnosis. - const logger = (globalThis as any).logger; - logger?.error?.(`Embedder for attribute "${attr.name}" failed:`, err); - throw new Error(`Failed to compute embedding for attribute "${attr.name}"`); - } - record[attr.name] = vector == null ? null : vector; - } + // Run embedders for each `@embed` attribute in parallel — typical schemas + // have a single `@embed` field per table but a multi-`@embed` table would + // otherwise serialize HTTP roundtrips for no benefit. Each embedder + // mutates a distinct attribute on the same record, so there's no + // ordering hazard between them. + await Promise.all( + embedAttributes.map(async (attr) => { + const sourceKey = attr.embed?.source; + if (!sourceKey) return; + if (!(sourceKey in record)) return; + const sourceValue = record[sourceKey]; + if (sourceValue == null) { + record[attr.name] = null; + return; + } + const embedder = userEmbedders[attr.name]; + if (!embedder) return; + let vector; + try { + vector = await embedder(record); + } catch (err) { + // Embedder backends (OpenAI, Anthropic, Bedrock, Ollama) may include + // URLs, model identifiers, or API-key tails in error messages. Those + // land in HTTP responses if propagated raw — Harper's threat model + // trusts deployers but not arbitrary REST callers, so we log the raw + // error and rethrow a sanitized one. The original error stays in + // server logs for diagnosis. + getLogger().error?.(`Embedder for attribute "${attr.name}" failed:`, err); + throw new Error(`Failed to compute embedding for attribute "${attr.name}"`); + } + record[attr.name] = vector == null ? null : vector; + }) + ); }; }