From ba047b6e46332aba0b06bcefb4b8c6a903ca4ed2 Mon Sep 17 00:00:00 2001 From: Brian Sun Date: Wed, 4 Mar 2026 00:07:23 +0900 Subject: [PATCH 1/2] feat: add personalized recommendations pipeline (TypeScript port) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Port the Python recs engine (clients/wander/recs/) to TypeScript, integrated with the existing BigQuery and Customer.io clients. 5-phase pipeline: - Phase 1: Property embeddings via Voyage AI (voyage-3-lite, 1024 dims) - Phase 2: Aggregate user behavior signals into profiles - Phase 3: Compute user preference embeddings (weighted avg) - Phase 4: Score and rank properties → top 3 per user (multi-factor) - Phase 5: Sync recommendation attributes to Customer.io profiles Files: - types.ts — All types, signal weights, scoring config - queries.ts — BigQuery queries (properties, signals, searches) - embeddings.ts — Voyage AI embedding generation with batching - engine.ts — Full rec engine with vector math, time decay, diversity - sync.ts — Concurrent CIO sync via Track API - index.ts — Pipeline orchestrator + barrel exports Co-Authored-By: Claude Opus 4.6 --- src/server/services/recs/embeddings.ts | 165 ++++++++ src/server/services/recs/engine.ts | 501 +++++++++++++++++++++++++ src/server/services/recs/index.ts | 147 ++++++++ src/server/services/recs/queries.ts | 121 ++++++ src/server/services/recs/sync.ts | 249 ++++++++++++ src/server/services/recs/types.ts | 150 ++++++++ 6 files changed, 1333 insertions(+) create mode 100644 src/server/services/recs/embeddings.ts create mode 100644 src/server/services/recs/engine.ts create mode 100644 src/server/services/recs/index.ts create mode 100644 src/server/services/recs/queries.ts create mode 100644 src/server/services/recs/sync.ts create mode 100644 src/server/services/recs/types.ts diff --git a/src/server/services/recs/embeddings.ts b/src/server/services/recs/embeddings.ts new file mode 100644 index 0000000..70d6a40 --- /dev/null 
+++ b/src/server/services/recs/embeddings.ts
@@ -0,0 +1,165 @@
+/**
+ * Phase 1: Generate property embeddings using Voyage AI.
+ *
+ * Uses voyage-3-lite model (1024 dimensions, $0.06/1M tokens).
+ * Total cost: ~$0.01 for ~730 properties.
+ *
+ * NOTE(review): Voyage's published model table appears to list voyage-3-lite
+ * as 512-dimensional at $0.02/1M tokens (1024 dims / $0.06 match voyage-3) —
+ * confirm the figures above and the 0.06 rate used in the cost log below.
+ *
+ * Ported from Python: clients/wander/recs/generate_embeddings.py
+ */
+import "server-only";
+import type { BQProperty, PropertyEmbedding } from "./types";
+
+const VOYAGE_URL = "https://api.voyageai.com/v1/embeddings";
+const BATCH_SIZE = 128;
+/** Attempts per batch; only HTTP 429 responses are retried. */
+const MAX_ATTEMPTS = 8;
+/** Linear back-off step (seconds) applied after each 429. */
+const RATE_LIMIT_BACKOFF_S = 25;
+/** Pause between consecutive batches. */
+const INTER_BATCH_DELAY_MS = 22000;
+
+function sleep(ms: number): Promise<void> {
+  return new Promise((resolve) => setTimeout(resolve, ms));
+}
+
+// ── Text Builder ───────────────────────────────────────────────
+
+/**
+ * Parse a JSON-encoded array column and keep only its string entries.
+ *
+ * Returns [] for empty input, malformed JSON, or a JSON value that is not an
+ * array, so a bad row degrades the embedding text instead of aborting the run.
+ */
+function parseStringArray(raw: string | null | undefined): string[] {
+  if (!raw) return [];
+  try {
+    const parsed: unknown = JSON.parse(raw);
+    if (!Array.isArray(parsed)) return [];
+    return parsed.filter((item): item is string => typeof item === "string");
+  } catch {
+    // Best-effort enrichment only: ignore parse errors.
+    return [];
+  }
+}
+
+/**
+ * Build the text that is embedded for one property: name, location,
+ * landscape, capacity, price, a description capped at 500 chars, plus an
+ * optional first guest review (capped at 200 chars) and up to 10 activities.
+ */
+function buildEmbeddingText(prop: BQProperty): string {
+  const pets = prop.is_pet_allowed ? "pet-friendly" : "no pets";
+  const description = (prop.description || "").slice(0, 500);
+  const topTestimonial =
+    parseStringArray(prop.testimonial_text)[0]?.slice(0, 200) ?? "";
+
+  // Entries containing both " - " and ":" are reduced to the text between the
+  // first " - " and the next ":"; anything else is used verbatim.
+  const activityNames = parseStringArray(prop.activities)
+    .slice(0, 10)
+    .map((a) => {
+      if (a.includes(" - ") && a.includes(":")) {
+        return a.split(" - ")[1]?.split(":")[0]?.trim() ?? a;
+      }
+      return a;
+    });
+
+  const lines = [
+    `${prop.property_name}`,
+    `${prop.city}, ${prop.state}`,
+    `${prop.landscape_category} landscape`,
+    `${prop.bedrooms ?? "?"} bedrooms, ${prop.bathrooms ?? "?"} bathrooms, sleeps ${prop.occupancy ?? "?"}`,
+    `$${prop.base_price ?? "?"}/night, ${pets}`,
+    `${description}`,
+  ];
+  if (topTestimonial) {
+    lines.push(`Guest review: ${topTestimonial}`);
+  }
+  if (activityNames.length > 0) {
+    lines.push(`Nearby: ${activityNames.join(", ")}`);
+  }
+
+  return lines.join("\n");
+}
+
+// ── Embedding Generation ───────────────────────────────────────
+
+interface VoyageResponse {
+  data: Array<{ embedding: number[] }>;
+  usage: { total_tokens: number };
+}
+
+/**
+ * Embed `texts` in batches of BATCH_SIZE, preserving input order.
+ *
+ * A batch is retried (with linear back-off) only on HTTP 429. Any other
+ * non-OK status, exhausting the attempts, or a response whose embedding count
+ * differs from the batch size throws. Network-level `fetch` rejections
+ * propagate unchanged.
+ */
+async function fetchVoyageEmbeddings(
+  texts: string[],
+  apiKey: string,
+): Promise<{ embeddings: number[][]; tokens: number }> {
+  const allEmbeddings: number[][] = [];
+  let totalTokens = 0;
+  const totalBatches = Math.ceil(texts.length / BATCH_SIZE);
+
+  for (let i = 0; i < texts.length; i += BATCH_SIZE) {
+    const batch = texts.slice(i, i + BATCH_SIZE);
+    const batchNum = Math.floor(i / BATCH_SIZE) + 1;
+
+    console.log(
+      ` Batch ${batchNum}/${totalBatches} (${batch.length} properties)...`,
+    );
+
+    let success = false;
+    let failureDetail = `still rate limited (429) after ${MAX_ATTEMPTS} attempts`;
+    for (let attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
+      const response = await fetch(VOYAGE_URL, {
+        method: "POST",
+        headers: {
+          Authorization: `Bearer ${apiKey}`,
+          "Content-Type": "application/json",
+        },
+        body: JSON.stringify({
+          input: batch,
+          model: "voyage-3-lite",
+          input_type: "document",
+        }),
+      });
+
+      if (response.ok) {
+        const data = (await response.json()) as VoyageResponse;
+        // Results are matched to properties by position, so a short or long
+        // response would silently mis-assign vectors downstream.
+        if (data.data.length !== batch.length) {
+          throw new Error(
+            `Voyage returned ${data.data.length} embeddings for ${batch.length} inputs in batch ${batchNum}`,
+          );
+        }
+        for (const item of data.data) {
+          allEmbeddings.push(item.embedding);
+        }
+        totalTokens += data.usage.total_tokens;
+        console.log(` ✓ (${data.usage.total_tokens} tokens)`);
+        success = true;
+        break;
+      } else if (response.status === 429) {
+        const wait = RATE_LIMIT_BACKOFF_S * (attempt + 1);
+        console.log(` 429 rate limited, waiting ${wait}s...`);
+        await sleep(wait * 1000);
+      } else {
+        const text = await response.text();
+        failureDetail = `HTTP ${response.status}: ${text}`;
+        console.error(` Error ${response.status}: ${text}`);
+        break;
+      }
+    }
+
+    if (!success) {
+      throw new Error(
+        `Failed to generate embeddings for batch ${batchNum}: ${failureDetail}`,
+      );
+    }
+
+    // Rate limit: wait between batches
+    if (i + BATCH_SIZE < texts.length) {
+      await sleep(INTER_BATCH_DELAY_MS);
+    }
+  }
+
+  return { embeddings: allEmbeddings, tokens: totalTokens };
+}
+
+// ── Public API ─────────────────────────────────────────────────
+
+/**
+ * Embed every property and return the vectors with display metadata.
+ *
+ * @param properties Rows to embed; one text is built per row.
+ * @param voyageApiKey Bearer token for the Voyage embeddings endpoint.
+ * @returns Map keyed by `property_name`. Rows sharing a name overwrite each
+ *   other, last one wins.
+ * @throws Error when a batch fails or the embedding count does not match.
+ */
+export async function generatePropertyEmbeddings(
+  properties: BQProperty[],
+  voyageApiKey: string,
+): Promise<Record<string, PropertyEmbedding>> {
+  console.log(`\nGenerating embeddings for ${properties.length} properties...`);
+
+  const texts = properties.map(buildEmbeddingText);
+  const { embeddings, tokens } = await fetchVoyageEmbeddings(
+    texts,
+    voyageApiKey,
+  );
+
+  console.log(` Total tokens: ${tokens.toLocaleString()}`);
+  console.log(` Estimated cost: $${((tokens * 0.06) / 1_000_000).toFixed(4)}`);
+
+  const result: Record<string, PropertyEmbedding> = {};
+  properties.forEach((prop, i) => {
+    const embedding = embeddings[i];
+    if (!embedding) {
+      throw new Error(`Missing embedding for property "${prop.property_name}"`);
+    }
+    result[prop.property_name] = {
+      embedding,
+      city: prop.city,
+      state: prop.state,
+      landscape: prop.landscape_category,
+      base_price: prop.base_price,
+      bedrooms: prop.bedrooms,
+      occupancy: prop.occupancy,
+      url: prop.url,
+      cover_image_url: prop.cover_image_url,
+    };
+  });
+
+  console.log(` Generated ${Object.keys(result).length} embeddings`);
+  return result;
+}
diff --git a/src/server/services/recs/engine.ts b/src/server/services/recs/engine.ts
new file mode 100644
index 0000000..953cfb5
--- /dev/null
+++ b/src/server/services/recs/engine.ts
@@ -0,0 +1,501 @@
+/**
+ * Phases 2-4: Recommendation engine.
+ *
+ * Phase 2: Aggregate user behavior signals into profiles
+ * Phase 3: Compute user preference embeddings (weighted avg of property embeddings)
+ * Phase 4: Score and rank properties per user → top recommendations
+ *
+ * Ported from Python: clients/wander/recs/generate_recommendations.py
+ */
+import "server-only";
+import type {
+  BQUserSignal,
+  BQUserSearch,
+  PropertyEmbedding,
+  PropertySignals,
+  UserProfile,
+  Recommendation,
+  UserRecommendations,
+} from "./types";
+import {
+  SIGNAL_WEIGHTS,
+  SCORING_WEIGHTS,
+  RECS_CONFIG,
+} from "./types";
+
+const MS_PER_DAY = 1000 * 60 * 60 * 24;
+
+// ── Vector Math (replaces numpy) ───────────────────────────────
+
+/** Dot product over the length of `a`; `b` is expected to be at least as long. */
+function dotProduct(a: number[], b: number[]): number {
+  let sum = 0;
+  for (let i = 0; i < a.length; i++) {
+    sum += a[i]! * b[i]!;
+  }
+  return sum;
+}
+
+/** Euclidean (L2) norm. */
+function norm(v: number[]): number {
+  return Math.sqrt(dotProduct(v, v));
+}
+
+/** Cosine similarity; 0 when either vector has zero length. */
+function cosineSimilarity(a: number[], b: number[]): number {
+  const normA = norm(a);
+  const normB = norm(b);
+  if (normA === 0 || normB === 0) return 0;
+  return dotProduct(a, b) / (normA * normB);
+}
+
+/** Scale to unit length; a zero vector is returned as-is (same reference). */
+function normalizeVector(v: number[]): number[] {
+  const n = norm(v);
+  if (n === 0) return v;
+  return v.map((x) => x / n);
+}
+
+/** In place: target += source * weight, over the length of `target`. */
+function addWeighted(
+  target: number[],
+  source: number[],
+  weight: number,
+): void {
+  for (let i = 0; i < target.length; i++) {
+    target[i]! += source[i]! * weight;
+  }
+}
+
+// ── Property Catalog (shared lookups, built once per run) ──────
+
+interface CatalogEntry {
+  name: string;
+  lowerName: string;
+  data: PropertyEmbedding;
+  /** Unit-length copy of `data.embedding`, so similarity is a plain dot product. */
+  unit: number[];
+}
+
+interface Catalog {
+  entries: CatalogEntry[];
+  /** Lower-cased name → embedding; the first name in iteration order wins. */
+  byLowerName: Map<string, PropertyEmbedding>;
+}
+
+/**
+ * Precompute what every user needs from the property set: unit vectors and a
+ * case-insensitive name index. Doing this once avoids re-normalising every
+ * property vector and re-scanning every name for each user.
+ */
+function buildCatalog(embeddings: Record<string, PropertyEmbedding>): Catalog {
+  const entries: CatalogEntry[] = [];
+  const byLowerName = new Map<string, PropertyEmbedding>();
+  for (const [name, data] of Object.entries(embeddings)) {
+    const lowerName = name.toLowerCase();
+    entries.push({ name, lowerName, data, unit: normalizeVector(data.embedding) });
+    if (!byLowerName.has(lowerName)) byLowerName.set(lowerName, data);
+  }
+  return { entries, byLowerName };
+}
+
+/** Exact-name lookup with a case-insensitive fallback. */
+function lookupEmbedding(
+  propName: string,
+  embeddings: Record<string, PropertyEmbedding>,
+  catalog: Catalog,
+): PropertyEmbedding | undefined {
+  return embeddings[propName] ?? catalog.byLowerName.get(propName.toLowerCase());
+}
+
+/** Lower-cased names of the properties this user has booked at least once. */
+function bookedLowerNames(profile: UserProfile): Set<string> {
+  const booked = new Set<string>();
+  for (const [name, s] of Object.entries(profile.properties)) {
+    if (s.book_count > 0) booked.add(name.toLowerCase());
+  }
+  return booked;
+}
+
+// ── Time Decay ─────────────────────────────────────────────────
+
+/**
+ * Exponential recency weight: halves every `RECS_CONFIG.decayHalfLifeDays`,
+ * never dropping below `RECS_CONFIG.decayFloor`.
+ *
+ * A missing or unparseable timestamp yields the floor. Timestamps in the
+ * future are treated as "now" so the weight never exceeds 1.
+ */
+function timeDecayWeight(lastInteractionTs: string | null): number {
+  if (!lastInteractionTs) return RECS_CONFIG.decayFloor;
+
+  // `new Date(badInput)` does not throw; it produces an Invalid Date whose
+  // getTime() is NaN, which would otherwise propagate into every embedding.
+  const tsMs = new Date(lastInteractionTs).getTime();
+  if (!Number.isFinite(tsMs)) return RECS_CONFIG.decayFloor;
+
+  const daysAgo = Math.max(0, (Date.now() - tsMs) / MS_PER_DAY);
+  const decay = Math.pow(2, -daysAgo / RECS_CONFIG.decayHalfLifeDays);
+  return Math.max(decay, RECS_CONFIG.decayFloor);
+}
+
+// ── Phase 2: Aggregate User Profiles ───────────────────────────
+
+/**
+ * Fold signal and search rows into one profile per user.
+ *
+ * Only users with at least one property interaction are returned; users that
+ * appear only in `searches` are dropped.
+ *
+ * @param signals Per (user, property) interaction counts.
+ * @param searches Per (user, location) search counts.
+ * @returns Active profiles keyed by user id.
+ */
+export function aggregateUserProfiles(
+  signals: BQUserSignal[],
+  searches: BQUserSearch[],
+): Map<string, UserProfile> {
+  const users = new Map<string, UserProfile>();
+
+  const getOrCreate = (uid: string): UserProfile => {
+    let profile = users.get(uid);
+    if (!profile) {
+      profile = {
+        properties: {},
+        search_locations: [],
+        minerva_location: null,
+        cio_id: null,
+        email: null,
+        total_signals: 0,
+      };
+      users.set(uid, profile);
+    }
+    return profile;
+  };
+
+  // Process property signals
+  for (const row of signals) {
+    const profile = getOrCreate(row.id_user);
+    let p = profile.properties[row.property_name];
+    if (!p) {
+      // `wishlisted` starts false and nothing in this function sets it.
+      p = {
+        view_count: 0,
+        abandon_count: 0,
+        book_count: 0,
+        wishlisted: false,
+        last_interaction: null,
+      };
+      profile.properties[row.property_name] = p;
+    }
+    p.view_count += row.view_count;
+    p.abandon_count += row.abandon_count;
+    p.book_count += row.book_count;
+    // Keeps the greater value under `>`; NOTE(review): for strings this is
+    // lexicographic, which matches time order only for a uniform ISO-8601
+    // format — confirm what executeQuery returns for timestamps.
+    if (
+      row.last_interaction &&
+      (!p.last_interaction || row.last_interaction > p.last_interaction)
+    ) {
+      p.last_interaction = row.last_interaction;
+    }
+  }
+
+  // Process search locations
+  const searchByUser = new Map<
+    string,
+    Array<{ state: string | null; city: string | null; count: number }>
+  >();
+  for (const row of searches) {
+    let locs = searchByUser.get(row.id_user);
+    if (!locs) {
+      locs = [];
+      searchByUser.set(row.id_user, locs);
+    }
+    locs.push({
+      state: row.search_location_state,
+      city: row.search_location_city,
+      count: row.search_count,
+    });
+  }
+  for (const [uid, locs] of searchByUser) {
+    const profile = getOrCreate(uid);
+    // Most-searched location first.
+    profile.search_locations = locs.sort((a, b) => b.count - a.count);
+  }
+
+  // Compute total signals
+  // NOTE(review): views/abandons/bookings use literal 1/2/3 here while the
+  // wishlist term (and Phase 3) use SIGNAL_WEIGHTS — confirm this is intended.
+  for (const profile of users.values()) {
+    let total = 0;
+    for (const p of Object.values(profile.properties)) {
+      total += p.view_count * 1;
+      total += p.abandon_count * 2;
+      total += p.book_count * 3;
+      if (p.wishlisted) total += SIGNAL_WEIGHTS.wishlisted;
+    }
+    profile.total_signals = total;
+  }
+
+  // Filter to users with at least 1 property interaction
+  const active = new Map<string, UserProfile>();
+  for (const [uid, profile] of users) {
+    if (Object.keys(profile.properties).length >= 1) {
+      active.set(uid, profile);
+    }
+  }
+
+  console.log(
+    ` ${users.size.toLocaleString()} total users → ${active.size.toLocaleString()} active (≥1 property)`,
+  );
+  return active;
+}
+
+// ── Phase 3: Compute User Embeddings ───────────────────────────
+
+/** Un-decayed interaction weight of one property, per SIGNAL_WEIGHTS. */
+function rawSignalWeight(signals: PropertySignals): number {
+  let weight = 0;
+  weight += signals.view_count * SIGNAL_WEIGHTS.viewed;
+  weight += signals.abandon_count * SIGNAL_WEIGHTS.abandoned;
+  weight += signals.book_count * SIGNAL_WEIGHTS.booked;
+  if (signals.wishlisted) weight += SIGNAL_WEIGHTS.wishlisted;
+  return weight;
+}
+
+/**
+ * Weighted average of the embeddings of the properties a user interacted
+ * with, each weighted by signal strength × time decay.
+ *
+ * @returns The averaged vector, or null when no interacted property has an
+ *   embedding or every weight is zero.
+ */
+function computeUserEmbedding(
+  profile: UserProfile,
+  embeddings: Record<string, PropertyEmbedding>,
+  catalog: Catalog,
+): number[] | null {
+  // Dimension is taken from the first embedding; 1024 is the empty-set fallback.
+  const dim = catalog.entries[0]?.data.embedding.length ?? 1024;
+  const weightedSum = new Array(dim).fill(0) as number[];
+  let totalWeight = 0;
+
+  for (const [propName, signals] of Object.entries(profile.properties)) {
+    const embData = lookupEmbedding(propName, embeddings, catalog);
+    if (!embData) continue;
+
+    const rawWeight = rawSignalWeight(signals);
+    if (rawWeight > 0) {
+      const weight = rawWeight * timeDecayWeight(signals.last_interaction);
+      addWeighted(weightedSum, embData.embedding, weight);
+      totalWeight += weight;
+    }
+  }
+
+  if (totalWeight === 0) return null;
+  return weightedSum.map((v) => v / totalWeight);
+}
+
+/**
+ * Compute a preference embedding for every profile that has usable signals.
+ *
+ * @returns Embeddings keyed by user id; users without one are omitted and
+ *   counted as "skipped" in the log line.
+ */
+export function computeAllUserEmbeddings(
+  profiles: Map<string, UserProfile>,
+  embeddings: Record<string, PropertyEmbedding>,
+): Map<string, number[]> {
+  const catalog = buildCatalog(embeddings);
+  const result = new Map<string, number[]>();
+  let skipped = 0;
+
+  for (const [uid, profile] of profiles) {
+    const emb = computeUserEmbedding(profile, embeddings, catalog);
+    if (emb) {
+      result.set(uid, emb);
+    } else {
+      skipped++;
+    }
+  }
+
+  console.log(
+    ` Computed ${result.size.toLocaleString()} user embeddings (${skipped} skipped)`,
+  );
+  return result;
+}
+
+// ── Phase 4: Generate Recommendations ──────────────────────────
+
+/**
+ * Popularity per property name: views + 3×abandons + 10×bookings summed over
+ * all profiles, divided by the largest total (or by 1 if that is larger).
+ */
+function computePopularityScores(
+  profiles: Map<string, UserProfile>,
+): Map<string, number> {
+  const scores = new Map<string, number>();
+
+  for (const profile of profiles.values()) {
+    for (const [name, s] of Object.entries(profile.properties)) {
+      const current = scores.get(name) ?? 0;
+      scores.set(name, current + s.view_count + s.abandon_count * 3 + s.book_count * 10);
+    }
+  }
+
+  const maxScore = Math.max(...scores.values(), 1);
+  for (const [name, score] of scores) {
+    scores.set(name, score / maxScore);
+  }
+
+  return scores;
+}
+
+/**
+ * Price fit, at most 1: 1 − 0.5·|ln(ratio)| with the ratio floored at 0.1 and
+ * the result floored at 0. Returns a neutral 0.5 when either price is
+ * missing or zero.
+ */
+function priceMatchScore(
+  candidatePrice: number | null,
+  userPricePref: number | null,
+): number {
+  if (!candidatePrice || !userPricePref) return 0.5;
+  const ratio = candidatePrice / userPricePref;
+  return Math.max(0, 1 - Math.abs(Math.log(Math.max(ratio, 0.1))) * 0.5);
+}
+
+/**
+ * Interaction-weighted median nightly price of the properties a user engaged
+ * with, or null when none of them has a known price.
+ */
+function computePriceAffinity(
+  profile: UserProfile,
+  embeddings: Record<string, PropertyEmbedding>,
+  catalog: Catalog,
+): number | null {
+  const prices: number[] = [];
+  for (const [propName, signals] of Object.entries(profile.properties)) {
+    const emb = lookupEmbedding(propName, embeddings, catalog);
+    if (emb?.base_price) {
+      const weight =
+        signals.view_count + signals.abandon_count * 2 + signals.book_count * 5;
+      // Weighting by repetition: each price appears `weight` times (min 1).
+      for (let i = 0; i < Math.max(1, weight); i++) {
+        prices.push(emb.base_price);
+      }
+    }
+  }
+  if (prices.length === 0) return null;
+  prices.sort((a, b) => a - b);
+  return prices[Math.floor(prices.length / 2)]!; // median
+}
+
+/**
+ * "For you" feed: rank un-booked properties by similarity to the user's
+ * embedding, re-score the top candidates with popularity and price fit, then
+ * pick `recsPerFeed` while preferring distinct landscapes.
+ */
+function generateForYou(
+  userEmbedding: number[],
+  profile: UserProfile,
+  embeddings: Record<string, PropertyEmbedding>,
+  popularityScores: Map<string, number>,
+  catalog: Catalog,
+): Recommendation[] {
+  const booked = bookedLowerNames(profile);
+  const interacted = new Set(
+    Object.keys(profile.properties).map((name) => name.toLowerCase()),
+  );
+  const pricePref = computePriceAffinity(profile, embeddings, catalog);
+  const userNormed = normalizeVector(userEmbedding);
+
+  // Score all properties by cosine similarity (both sides are unit vectors)
+  const scored: Array<{ entry: CatalogEntry; sim: number }> = [];
+  for (const entry of catalog.entries) {
+    if (booked.has(entry.lowerName)) continue;
+    scored.push({ entry, sim: dotProduct(userNormed, entry.unit) });
+  }
+  scored.sort((a, b) => b.sim - a.sim);
+
+  // Multi-factor scoring on top candidates
+  const candidates: Recommendation[] = [];
+  for (const { entry, sim } of scored.slice(0, RECS_CONFIG.topCandidates)) {
+    const emb = entry.data;
+    const score =
+      sim * SCORING_WEIGHTS.similarity +
+      (popularityScores.get(entry.name) ?? 0) * SCORING_WEIGHTS.popularity +
+      priceMatchScore(emb.base_price, pricePref) * SCORING_WEIGHTS.price_match;
+
+    candidates.push({
+      property_name: entry.name,
+      score: Math.round(score * 10000) / 10000,
+      similarity: Math.round(sim * 10000) / 10000,
+      city: emb.city,
+      state: emb.state,
+      landscape: emb.landscape,
+      base_price: emb.base_price,
+      bedrooms: emb.bedrooms,
+      url: emb.url,
+      cover_image_url: emb.cover_image_url,
+      previously_viewed: interacted.has(entry.lowerName),
+    });
+  }
+
+  candidates.sort((a, b) => b.score - a.score);
+
+  // Pick top N with landscape diversity: all slots except the last must
+  // introduce a landscape not seen yet.
+  const final: Recommendation[] = [];
+  const seenLandscapes = new Set<string>();
+
+  for (const rec of candidates) {
+    if (final.length >= RECS_CONFIG.recsPerFeed) break;
+    if (
+      seenLandscapes.has(rec.landscape) &&
+      final.length < RECS_CONFIG.recsPerFeed - 1
+    ) {
+      continue;
+    }
+    final.push(rec);
+    seenLandscapes.add(rec.landscape);
+  }
+
+  // Fill if diversity filtering was too aggressive
+  if (final.length < RECS_CONFIG.recsPerFeed) {
+    for (const rec of candidates) {
+      if (!final.includes(rec)) final.push(rec);
+      if (final.length >= RECS_CONFIG.recsPerFeed) break;
+    }
+  }
+
+  return final;
+}
+
+/**
+ * "Near you" feed: un-booked properties in the user's most-searched location.
+ * Same city scores 1.0, same state 0.7; everything else is excluded.
+ *
+ * NOTE(review): the city comparison does not also require the state to match,
+ * so a same-named city in another state scores 1.0 — confirm whether that is
+ * intended before tightening it.
+ */
+function generateNearYou(
+  profile: UserProfile,
+  embeddings: Record<string, PropertyEmbedding>,
+): Recommendation[] {
+  if (profile.search_locations.length === 0) return [];
+
+  const top = profile.search_locations[0]!;
+  if (!top.state) return [];
+
+  const targetState = top.state.toLowerCase();
+  const targetCity = top.city ? top.city.toLowerCase() : null;
+  const booked = bookedLowerNames(profile);
+
+  const candidates: Recommendation[] = [];
+  for (const [name, emb] of Object.entries(embeddings)) {
+    if (booked.has(name.toLowerCase())) continue;
+
+    // Optional chaining keeps a property with a missing city/state from
+    // throwing; it simply fails to match.
+    let geoScore = 0;
+    if (targetCity !== null && emb.city?.toLowerCase() === targetCity) {
+      geoScore = 1.0;
+    } else if (emb.state?.toLowerCase() === targetState) {
+      geoScore = 0.7;
+    } else {
+      continue;
+    }
+
+    candidates.push({
+      property_name: name,
+      score: geoScore,
+      city: emb.city,
+      state: emb.state,
+      landscape: emb.landscape,
+      base_price: emb.base_price,
+      url: emb.url,
+      cover_image_url: emb.cover_image_url,
+      search_location: [top.city, top.state].filter(Boolean).join(", "),
+    });
+  }
+
+  candidates.sort((a, b) => b.score - a.score);
+  return candidates.slice(0, RECS_CONFIG.recsPerFeed);
+}
+
+/**
+ * Fallback feed for users without an embedding: most popular properties,
+ * using the same landscape-diversity rule and fill step as the "for you" feed.
+ */
+function generateColdStartRecs(
+  embeddings: Record<string, PropertyEmbedding>,
+  popularityScores: Map<string, number>,
+): Recommendation[] {
+  const candidates: Recommendation[] = [];
+  for (const [name, emb] of Object.entries(embeddings)) {
+    candidates.push({
+      property_name: name,
+      score: popularityScores.get(name) ?? 0,
+      city: emb.city,
+      state: emb.state,
+      landscape: emb.landscape,
+      base_price: emb.base_price,
+      url: emb.url,
+      cover_image_url: emb.cover_image_url,
+    });
+  }
+
+  candidates.sort((a, b) => b.score - a.score);
+
+  const recs: Recommendation[] = [];
+  const seenLandscapes = new Set<string>();
+  for (const c of candidates) {
+    if (recs.length >= RECS_CONFIG.recsPerFeed) break;
+    // Only the last slot may repeat a landscape.
+    if (
+      !seenLandscapes.has(c.landscape) ||
+      recs.length >= RECS_CONFIG.recsPerFeed - 1
+    ) {
+      recs.push(c);
+      seenLandscapes.add(c.landscape);
+    }
+  }
+
+  // Fill if there were too few distinct landscapes to reach the feed size
+  if (recs.length < RECS_CONFIG.recsPerFeed) {
+    for (const c of candidates) {
+      if (!recs.includes(c)) recs.push(c);
+      if (recs.length >= RECS_CONFIG.recsPerFeed) break;
+    }
+  }
+
+  return recs;
+}
+
+// ── Main Generation ────────────────────────────────────────────
+
+/**
+ * Produce recommendations for every profile.
+ *
+ * Users with an entry in `userEmbeddings` get personalised `for_you` and
+ * `near_you` feeds. The rest get the shared cold-start list.
+ *
+ * @returns Recommendations keyed by user id. Every cold-start user references
+ *   the same `for_you` array instance, so treat it as read-only.
+ */
+export function generateAllRecommendations(
+  profiles: Map<string, UserProfile>,
+  userEmbeddings: Map<string, number[]>,
+  embeddings: Record<string, PropertyEmbedding>,
+): Map<string, UserRecommendations> {
+  const popularityScores = computePopularityScores(profiles);
+  const coldStartRecs = generateColdStartRecs(embeddings, popularityScores);
+  const catalog = buildCatalog(embeddings);
+
+  const allRecs = new Map<string, UserRecommendations>();
+  let withRecs = 0;
+  let coldStart = 0;
+
+  let i = 0;
+  for (const [uid, profile] of profiles) {
+    i++;
+    if (i % 10000 === 0) {
+      console.log(
+        ` Processing user ${i.toLocaleString()}/${profiles.size.toLocaleString()}...`,
+      );
+    }
+
+    const userEmb = userEmbeddings.get(uid);
+
+    if (!userEmb) {
+      allRecs.set(uid, {
+        for_you: coldStartRecs,
+        near_you: [],
+        launches: [],
+        is_cold_start: true,
+      });
+      coldStart++;
+      continue;
+    }
+
+    const forYou = generateForYou(
+      userEmb,
+      profile,
+      embeddings,
+      popularityScores,
+      catalog,
+    );
+    const nearYou = generateNearYou(profile, embeddings);
+
+    allRecs.set(uid, {
+      for_you: forYou,
+      near_you: nearYou,
+      launches: [], // TODO: implement when dt_launched is reliably available
+      is_cold_start: false,
+    });
+    withRecs++;
+  }
+
+  console.log(
+    ` Generated recs for ${withRecs.toLocaleString()} users (${coldStart.toLocaleString()} cold start)`,
+  );
+  return allRecs;
+}
diff --git a/src/server/services/recs/index.ts b/src/server/services/recs/index.ts
new file mode 100644
index 0000000..fe2b524
--- /dev/null
+++ b/src/server/services/recs/index.ts
@@ -0,0 +1,147 @@
+/**
+ * Personalized Recommendations Pipeline — Orchestrator
+ *
+ * Ties together all 5 phases:
+ * Phase 1: Generate property embeddings (Voyage AI)
+ * Phase 2: Aggregate user behavior signals into profiles
+ * Phase 3: Compute user preference embeddings (weighted avg)
+ * Phase 4: Score and rank properties per user → top recommendations
+ * Phase 5: Sync recommendations to Customer.io profile attributes
+ *
+ * The pipeline powers personalized email recommendations for ~238K users.
+ * Runs as a scheduled job (daily or on-demand via API route).
+ *
+ * Ported from Python: clients/wander/recs/
+ */
+import "server-only";
+
+// Re-exports
+export type {
+  BQProperty,
+  BQUserSignal,
+  BQUserSearch,
+  PropertyEmbedding,
+  PropertySignals,
+  UserProfile,
+  Recommendation,
+  UserRecommendations,
+  CIORecAttributes,
+} from "./types";
+export {
+  SIGNAL_WEIGHTS,
+  SCORING_WEIGHTS,
+  RECS_CONFIG,
+  REC_FIELDS,
+} from "./types";
+
+export { fetchProperties, fetchUserSignals, fetchUserSearches } from "./queries";
+export { generatePropertyEmbeddings } from "./embeddings";
+export {
+  aggregateUserProfiles,
+  computeAllUserEmbeddings,
+  generateAllRecommendations,
+} from "./engine";
+export { syncRecommendationsToCIO } from "./sync";
+export type { SyncOptions, SyncResult } from "./sync";
+
+// ── Full Pipeline ─────────────────────────────────────────────
+
+import { fetchProperties, fetchUserSignals, fetchUserSearches } from "./queries";
+import { generatePropertyEmbeddings } from "./embeddings";
+import { aggregateUserProfiles, computeAllUserEmbeddings, generateAllRecommendations } from "./engine";
+import { syncRecommendationsToCIO } from "./sync";
+import type { SyncOptions, SyncResult } from "./sync";
+import type { UserRecommendations } from "./types";
+
+/** Inputs for one pipeline run. */
+export interface PipelineOptions {
+  /** Voyage AI API key for embedding generation */
+  voyageApiKey: string;
+  /** If true, skip CIO sync (just generate recs) */
+  skipSync?: boolean;
+  /** Options passed to CIO sync */
+  syncOptions?: SyncOptions;
+}
+
+/** Summary of one pipeline run, including the generated recommendations. */
+export interface PipelineResult {
+  /** Number of property rows fetched. */
+  propertyCount: number;
+  /** Number of aggregated user profiles. */
+  userCount: number;
+  /** Number of distinct property names that received an embedding. */
+  embeddingCount: number;
+  /** Number of users with a recommendation entry. */
+  recsGenerated: number;
+  /** Users whose recommendations are flagged `is_cold_start`. */
+  coldStartCount: number;
+  /** Sync outcome; absent when the sync phase was skipped. */
+  sync?: SyncResult;
+  /** Recommendations keyed by user id. */
+  allRecs: Map<string, UserRecommendations>;
+  /** Wall-clock duration of the run in milliseconds. */
+  elapsedMs: number;
+}
+
+/**
+ * Run the full personalized recommendations pipeline.
+ *
+ * This is the main entry point — call from an API route or scheduled job.
+ */ +export async function runRecsPipeline( + options: PipelineOptions, +): Promise { + const start = Date.now(); + + console.log("=" .repeat(60)); + console.log("Personalized Recommendations Pipeline"); + console.log("=" .repeat(60)); + + // Phase 1: Fetch properties and generate embeddings + console.log("\n── Phase 1: Property Embeddings ──────────────────────"); + const properties = await fetchProperties(); + console.log(` Fetched ${properties.length} bookable properties`); + + const embeddings = await generatePropertyEmbeddings(properties, options.voyageApiKey); + + // Phase 2: Fetch user signals and aggregate profiles + console.log("\n── Phase 2: User Profiles ────────────────────────────"); + const [signals, searches] = await Promise.all([ + fetchUserSignals(), + fetchUserSearches(), + ]); + console.log(` Fetched ${signals.length.toLocaleString()} signals, ${searches.length.toLocaleString()} searches`); + + const profiles = aggregateUserProfiles(signals, searches); + + // Phase 3: Compute user embeddings + console.log("\n── Phase 3: User Embeddings ──────────────────────────"); + const userEmbeddings = computeAllUserEmbeddings(profiles, embeddings); + + // Phase 4: Generate recommendations + console.log("\n── Phase 4: Recommendations ─────────────────────────"); + const allRecs = generateAllRecommendations(profiles, userEmbeddings, embeddings); + + let coldStartCount = 0; + for (const recs of allRecs.values()) { + if (recs.is_cold_start) coldStartCount++; + } + + // Phase 5: Sync to Customer.io + let syncResult: SyncResult | undefined; + if (!options.skipSync) { + console.log("\n── Phase 5: CIO Sync ────────────────────────────────"); + syncResult = await syncRecommendationsToCIO(allRecs, options.syncOptions); + } + + const elapsedMs = Date.now() - start; + console.log(`\n${"=".repeat(60)}`); + console.log(`Pipeline complete in ${(elapsedMs / 1000).toFixed(1)}s`); + console.log(` ${Object.keys(embeddings).length} properties`); + console.log(` 
${allRecs.size.toLocaleString()} users (${coldStartCount.toLocaleString()} cold start)`); + if (syncResult) { + console.log(` ${syncResult.synced.toLocaleString()} synced to CIO`); + } + console.log("=".repeat(60)); + + return { + propertyCount: properties.length, + userCount: profiles.size, + embeddingCount: Object.keys(embeddings).length, + recsGenerated: allRecs.size, + coldStartCount, + sync: syncResult, + allRecs, + elapsedMs, + }; +} diff --git a/src/server/services/recs/queries.ts b/src/server/services/recs/queries.ts new file mode 100644 index 0000000..ee29998 --- /dev/null +++ b/src/server/services/recs/queries.ts @@ -0,0 +1,121 @@ +/** + * BigQuery queries for the personalized recommendations pipeline. + * + * Tables used: + * - analytics.int_properties — property metadata (name, location, price, images) + * - analytics.reviews — guest reviews (for testimonials in embeddings) + * - analytics.bookings_gmv — booking history for signal aggregation + * - analytics.int_wishlist — wishlist data for signal aggregation + * + * Ported from Python: clients/wander/recs/BIGQUERY_QUERIES.md + */ +import "server-only"; +import { executeQuery } from "@/server/clients/bigquery.client"; +import type { BQProperty, BQUserSignal, BQUserSearch } from "./types"; + +// ── Properties ───────────────────────────────────────────────── + +const PROPERTIES_QUERY = ` +SELECT + p.property_name, + p.city, + p.state, + p.bedrooms, + p.bathrooms, + p.occupancy, + p.base_price, + p.landscape_category, + p.description, + p.is_pet_allowed, + p.cover_image_url, + p.url, + p.dt_launched, + TO_JSON_STRING(ARRAY( + SELECT r.text FROM UNNEST(p.testimonial_text) AS r LIMIT 3 + )) AS testimonial_text, + TO_JSON_STRING(ARRAY( + SELECT a FROM UNNEST(p.activities) AS a LIMIT 10 + )) AS activities +FROM \`wander-9fc9c.analytics.int_properties\` p +WHERE p.is_bookable = TRUE + AND p.cover_image_url IS NOT NULL +ORDER BY p.property_name +`; + +export async function fetchProperties(): Promise { + return 
executeQuery(PROPERTIES_QUERY); +} + +// ── User Behavior Signals ────────────────────────────────────── + +const USER_SIGNALS_QUERY = ` +WITH property_views AS ( + SELECT + id_user, + property_name, + COUNT(*) AS view_count, + MAX(ts) AS last_view + FROM \`wander-9fc9c.analytics.int_product_viewed\` + WHERE id_user IS NOT NULL AND property_name IS NOT NULL + GROUP BY id_user, property_name +), +abandoned_checkouts AS ( + SELECT + id_user, + property_name, + COUNT(*) AS abandon_count, + MAX(ts) AS last_abandon + FROM \`wander-9fc9c.analytics.funnel_events\` + WHERE event_type = 'checkout_started' + AND id_user IS NOT NULL + AND property_name IS NOT NULL + GROUP BY id_user, property_name +), +bookings AS ( + SELECT + id_user, + property_name, + COUNT(*) AS book_count, + MAX(ts_created) AS last_booking + FROM \`wander-9fc9c.analytics.bookings_gmv\` + WHERE is_profit = TRUE AND status = 'confirmed' + AND id_user IS NOT NULL AND property_name IS NOT NULL + GROUP BY id_user, property_name +) +SELECT + COALESCE(v.id_user, a.id_user, b.id_user) AS id_user, + COALESCE(v.property_name, a.property_name, b.property_name) AS property_name, + COALESCE(v.view_count, 0) AS view_count, + COALESCE(a.abandon_count, 0) AS abandon_count, + COALESCE(b.book_count, 0) AS book_count, + GREATEST(v.last_view, a.last_abandon, b.last_booking) AS last_interaction +FROM property_views v +FULL OUTER JOIN abandoned_checkouts a + ON v.id_user = a.id_user AND v.property_name = a.property_name +FULL OUTER JOIN bookings b + ON COALESCE(v.id_user, a.id_user) = b.id_user + AND COALESCE(v.property_name, a.property_name) = b.property_name +`; + +export async function fetchUserSignals(): Promise { + return executeQuery(USER_SIGNALS_QUERY); +} + +// ── User Search Locations ────────────────────────────────────── + +const USER_SEARCHES_QUERY = ` +SELECT + id_user, + search_location_state, + search_location_city, + COUNT(*) AS search_count +FROM \`wander-9fc9c.analytics.search_sessions\` +WHERE id_user IS 
NOT NULL + AND search_location_state IS NOT NULL +GROUP BY id_user, search_location_state, search_location_city +ORDER BY id_user, search_count DESC +`; + +export async function fetchUserSearches(): Promise { + return executeQuery(USER_SEARCHES_QUERY); +} diff --git a/src/server/services/recs/sync.ts b/src/server/services/recs/sync.ts new file mode 100644 index 0000000..de9ae9d --- /dev/null +++ b/src/server/services/recs/sync.ts @@ -0,0 +1,249 @@ +/** + * Phase 5: Sync personalized recommendations to Customer.io profile attributes. + * + * Writes flat attributes to each user's CIO profile via the Track API. + * These attributes power dynamic email templates (Liquid snippets). + * + * Attributes written per user (For You feed × 3 slots): + * rec_for_you_1_name, rec_for_you_1_image, rec_for_you_1_url, + * rec_for_you_1_city, rec_for_you_1_state, rec_for_you_1_price, + * rec_for_you_1_beds, rec_for_you_1_landscape, rec_for_you_1_description, + * rec_for_you_1_cta + * (repeated for _2 and _3) + * recs_updated_at — date string of last sync + * recs_is_cold_start — boolean + * + * Ported from Python: clients/wander/recs/sync_to_cio.py + */ +import "server-only"; +import { trackClient } from "@/server/clients/customerio.client"; +import type { UserRecommendations, Recommendation } from "./types"; +import { RECS_CONFIG, REC_FIELDS } from "./types"; + +// ── Config ──────────────────────────────────────────────────── + +const CONCURRENCY = 8; // Parallel requests (CIO Track API limit ~10/sec) +const FEEDS = RECS_CONFIG.feeds; // ["for_you"] +const SLOTS_PER_FEED = RECS_CONFIG.slotsPerFeed; // 3 + +// ── Attribute Builders ──────────────────────────────────────── + +function recToAttributes( + rec: Recommendation, + feedPrefix: string, + slotNum: number, + propertyDescriptions?: Record, + propertyCtas?: Record, +): Record { + const prefix = `rec_${feedPrefix}_${slotNum}`; + const propName = rec.property_name; + + let description = propertyDescriptions?.[propName] ?? 
""; + if (description.length > 500) { + const truncated = description.slice(0, 497); + const lastSpace = truncated.lastIndexOf(" "); + description = (lastSpace > 0 ? truncated.slice(0, lastSpace) : truncated) + "..."; + } + + return { + [`${prefix}_name`]: propName, + [`${prefix}_image`]: rec.cover_image_url, + [`${prefix}_url`]: rec.url, + [`${prefix}_city`]: rec.city, + [`${prefix}_state`]: rec.state, + [`${prefix}_price`]: rec.base_price, + [`${prefix}_beds`]: rec.bedrooms ?? null, + [`${prefix}_landscape`]: rec.landscape, + [`${prefix}_description`]: description, + [`${prefix}_cta`]: propertyCtas?.[propName] ?? "Explore this home", + }; +} + +function buildUserAttributes( + userRecs: UserRecommendations, + propertyDescriptions?: Record, + propertyCtas?: Record, +): Record { + const attrs: Record = {}; + + for (const feed of FEEDS) { + const recs = userRecs[feed] ?? []; + for (let i = 0; i < Math.min(recs.length, SLOTS_PER_FEED); i++) { + Object.assign( + attrs, + recToAttributes(recs[i]!, feed, i + 1, propertyDescriptions, propertyCtas), + ); + } + // Clear unused slots + for (let i = recs.length; i < SLOTS_PER_FEED; i++) { + for (const field of REC_FIELDS) { + attrs[`rec_${feed}_${i + 1}_${field}`] = ""; + } + } + } + + attrs["recs_updated_at"] = new Date().toISOString().split("T")[0]!; + attrs["recs_is_cold_start"] = userRecs.is_cold_start; + return attrs; +} + +// ── Concurrent Sync ─────────────────────────────────────────── + +async function syncUser( + identifier: string, + attrs: Record, +): Promise { + try { + await trackClient.identify(identifier, attrs); + return true; + } catch (error) { + const msg = error instanceof Error ? 
// ── Concurrent Sync ───────────────────────────────────────────

/** Render any thrown value as a log-friendly message. */
function errorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/**
 * Write one user's attributes via `trackClient.identify`.
 *
 * A failure whose message contains "429" is treated as rate limiting: the
 * call is retried once after a 1 second pause. Every failure path logs the
 * reason, including a failed retry. This function never throws.
 *
 * @param identifier - Customer.io profile identifier.
 * @param attrs - Flat attribute map to write.
 * @returns `true` when the write succeeded (first try or retry), else `false`.
 */
async function syncUser(
  identifier: string,
  attrs: Record<string, string | number | boolean | null>,
): Promise<boolean> {
  try {
    await trackClient.identify(identifier, attrs);
    return true;
  } catch (error) {
    const msg = errorMessage(error);
    if (!msg.includes("429")) {
      console.error(` Failed to sync ${identifier}: ${msg}`);
      return false;
    }
  }

  // Rate limited — wait and retry once
  await new Promise<void>((resolve) => setTimeout(resolve, 1000));
  try {
    await trackClient.identify(identifier, attrs);
    return true;
  } catch (retryError) {
    console.error(
      ` Failed to sync ${identifier} after rate-limit retry: ${errorMessage(retryError)}`,
    );
    return false;
  }
}

/**
 * Map `fn` over `items` with at most `concurrency` calls in flight.
 *
 * Results are returned in input order regardless of completion order.
 * `concurrency` is clamped to at least 1 (NaN, zero and negative values run
 * sequentially) so that a bad setting can never silently skip every item.
 * If `fn` rejects, the returned promise rejects with that error.
 *
 * @param items - Inputs to process.
 * @param fn - Async mapper applied to each item.
 * @param concurrency - Maximum number of simultaneous `fn` calls.
 * @returns Mapped results, index-aligned with `items`.
 */
async function runConcurrent<T, R>(
  items: T[],
  fn: (item: T) => Promise<R>,
  concurrency: number,
): Promise<R[]> {
  const results = new Array<R>(items.length);

  const requested = Number.isNaN(concurrency) ? 1 : Math.max(1, concurrency);
  const workerCount = Math.min(Math.floor(requested), items.length);

  // All workers pull from one shared iterator, so each index is handed out
  // exactly once and a worker picks up new work as soon as it is free.
  const queue = items.entries();
  const worker = async (): Promise<void> => {
    for (const [i, item] of queue) {
      results[i] = await fn(item);
    }
  };

  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

// ── Public API ────────────────────────────────────────────────

export interface SyncOptions {
  /** Set of user IDs to exclude (unsubscribed) */
  excludeIds?: Set<string>;
  /** Property descriptions for enrichment (property_name → description) */
  propertyDescriptions?: Record<string, string>;
  /** Property CTAs (property_name → CTA text) */
  propertyCtas?: Record<string, string>;
  /** Max users to sync (for testing) */
  limit?: number;
  /** If true, don't write to CIO — just return stats */
  dryRun?: boolean;
}

export interface SyncResult {
  /** Users written successfully (always 0 for a dry run). */
  synced: number;
  /** Users whose write failed. */
  failed: number;
  /** Users skipped because their ID was in `excludeIds`. */
  skippedUnsub: number;
  /** Entries skipped because the map key was empty. */
  skippedNoId: number;
  /** Attributes written (or, for a dry run, that would be written). */
  totalAttributes: number;
  /** Wall-clock duration of the write phase in ms (0 for a dry run). */
  elapsedMs: number;
}

/**
 * Phase 5 entry point: push recommendation attributes to Customer.io.
 *
 * Builds one payload per user (skipping excluded and empty IDs, and stopping
 * once `limit` payloads are queued), then writes them with `CONCURRENCY`
 * parallel requests. With `dryRun` nothing is written and only the counts are
 * returned.
 *
 * @param allRecs - User ID → recommendations.
 * @param options - See {@link SyncOptions}.
 * @returns Counts and timing for the run.
 */
export async function syncRecommendationsToCIO(
  allRecs: Map<string, UserRecommendations>,
  options: SyncOptions = {},
): Promise<SyncResult> {
  const {
    excludeIds = new Set<string>(),
    propertyDescriptions,
    propertyCtas,
    limit,
    dryRun = false,
  } = options;

  console.log(`\nPhase 5: Sync to Customer.io [${dryRun ? "DRY RUN" : "LIVE"}]`);

  // Build payloads
  const payloads: Array<{
    identifier: string;
    attrs: Record<string, string | number | boolean | null>;
  }> = [];
  let skippedUnsub = 0;
  let skippedNoId = 0;

  for (const [uid, recs] of allRecs) {
    // A falsy limit (undefined or 0) means "no limit".
    if (limit && payloads.length >= limit) break;

    if (excludeIds.has(uid)) {
      skippedUnsub++;
      continue;
    }

    if (!uid) {
      skippedNoId++;
      continue;
    }

    payloads.push({
      identifier: uid,
      attrs: buildUserAttributes(recs, propertyDescriptions, propertyCtas),
    });
  }

  // Every slot field for every feed, plus recs_updated_at and recs_is_cold_start.
  const attrsPerUser = FEEDS.length * SLOTS_PER_FEED * REC_FIELDS.length + 2;
  console.log(
    ` ${payloads.length.toLocaleString()} users to sync ` +
      `(${skippedUnsub.toLocaleString()} unsub, ${skippedNoId} no ID)`,
  );
  console.log(` ${attrsPerUser} attributes per user`);

  if (dryRun) {
    console.log(" DRY RUN — no data written");
    return {
      synced: 0,
      failed: 0,
      skippedUnsub,
      skippedNoId,
      totalAttributes: payloads.length * attrsPerUser,
      elapsedMs: 0,
    };
  }

  // Sync concurrently
  const start = Date.now();
  const results = await runConcurrent(
    payloads,
    (p) => syncUser(p.identifier, p.attrs),
    CONCURRENCY,
  );
  const synced = results.filter(Boolean).length;
  const failed = results.length - synced;

  const elapsedMs = Date.now() - start;
  // Floor the denominator at 1s so very short runs don't report inflated rates.
  const rate = synced / Math.max(elapsedMs / 1000, 1);

  console.log(
    ` Synced ${synced.toLocaleString()} users in ${(elapsedMs / 1000).toFixed(1)}s (${rate.toFixed(0)}/sec)`,
  );
  if (failed > 0) {
    console.log(` Failed: ${failed.toLocaleString()}`);
  }

  return {
    synced,
    failed,
    skippedUnsub,
    skippedNoId,
    totalAttributes: synced * attrsPerUser,
    elapsedMs,
  };
}
// ── BigQuery Row Types ─────────────────────────────────────────

/** One property row as returned by the properties query. */
export interface BQProperty {
  property_name: string;
  city: string;
  state: string;
  bedrooms: number | null;
  bathrooms: number | null;
  occupancy: number | null;
  base_price: number | null;
  landscape_category: string;
  description: string;
  is_pet_allowed: boolean;
  cover_image_url: string;
  url: string;
  dt_launched: string | null;
  testimonial_text: string | null; // JSON array stored as string
  activities: string | null; // JSON array stored as string
}

/** Per-user, per-property interaction counts from the signals query. */
export interface BQUserSignal {
  id_user: string;
  property_name: string;
  view_count: number;
  abandon_count: number;
  book_count: number;
  last_interaction: string | null;
}

/** Per-user search counts, grouped by searched state and city. */
export interface BQUserSearch {
  id_user: string;
  search_location_state: string | null;
  search_location_city: string | null;
  search_count: number;
}

// ── Internal Types ─────────────────────────────────────────────

/** Embedding vector for a property plus the metadata needed to score and render it. */
export interface PropertyEmbedding {
  embedding: number[];
  city: string;
  state: string;
  landscape: string;
  base_price: number | null;
  bedrooms: number | null;
  occupancy: number | null;
  url: string;
  cover_image_url: string;
}

/** A single user's aggregated interaction signals for one property. */
export interface PropertySignals {
  view_count: number;
  abandon_count: number;
  book_count: number;
  wishlisted: boolean;
  last_interaction: string | null;
}

/** Aggregated behaviour profile for one user. */
export interface UserProfile {
  /** Signals keyed by property name. */
  properties: Record<string, PropertySignals>;
  search_locations: Array<{
    state: string | null;
    city: string | null;
    count: number;
  }>;
  minerva_location: { city: string; state: string } | null;
  cio_id: string | null;
  email: string | null;
  total_signals: number;
}

/** One recommended property. Optional fields are only set by some feeds. */
export interface Recommendation {
  property_name: string;
  score: number;
  similarity?: number;
  city: string;
  state: string;
  landscape: string;
  base_price: number | null;
  bedrooms?: number | null;
  url: string;
  cover_image_url: string;
  previously_viewed?: boolean;
  search_location?: string;
  days_since_launch?: number;
  recency_score?: number;
}

/** All feeds generated for one user. */
export interface UserRecommendations {
  for_you: Recommendation[];
  near_you: Recommendation[];
  launches: Recommendation[];
  is_cold_start: boolean;
}

// ── CIO Sync Types ─────────────────────────────────────────────

/** Flat attribute map written to a Customer.io profile. */
export interface CIORecAttributes {
  [key: string]: string | number | boolean | null;
}

// ── Config ─────────────────────────────────────────────────────

/** Relative weight of each user signal type (a larger value means a stronger signal). */
export const SIGNAL_WEIGHTS = {
  viewed: 1,
  searched: 2,
  wishlisted: 3,
  abandoned: 4,
  booked: 5,
} as const;

/**
 * Weights for the multi-factor recommendation score. They sum to 1.0.
 *
 * NOTE(review): per the engine's own comment, `diversity` is applied through
 * landscape filtering rather than as an additive score term, so the additive
 * terms total 0.85.
 */
export const SCORING_WEIGHTS = {
  similarity: 0.5,
  recency: 0.1,
  popularity: 0.15,
  diversity: 0.15,
  price_match: 0.1,
} as const;

/** Pipeline tuning knobs. */
export const RECS_CONFIG = {
  minSignals: 2,
  /** How many top-similarity properties get full multi-factor scoring. */
  topCandidates: 50,
  recsPerFeed: 3,
  /** Window, in days, over which a property's recency score falls from 1 to 0. */
  recencyDays: 90,
  decayHalfLifeDays: 30,
  decayFloor: 0.05,
  suppressAfterShown: 2,
  /** Number of attribute slots written per feed on each profile. */
  slotsPerFeed: 3,
  /** Feeds synced to Customer.io. */
  feeds: ["for_you"],
} as const;

/**
 * Field suffixes of the per-slot profile attributes
 * (`rec_<feed>_<slot>_<field>`). Must stay in step with the keys the sync
 * step writes for a filled slot, because this list drives slot clearing.
 */
export const REC_FIELDS = [
  "name",
  "image",
  "url",
  "city",
  "state",
  "price",
  "beds",
  "landscape",
  "description",
  "cta",
] as const;
/**
 * Recency of a property in [0, 1], based on the most recent interaction that
 * any user in `profiles` has had with it.
 *
 * The score is 1 for an interaction happening right now and falls linearly to
 * 0 at `RECS_CONFIG.recencyDays` days ago. Properties with no recorded
 * interaction score 0.
 *
 * Timestamps are compared as parsed instants rather than as strings, so mixed
 * formats order correctly. Values that do not parse as dates are ignored
 * instead of turning the result into NaN, and timestamps later than `now` are
 * clamped to 1.
 *
 * NOTE(review): each call scans every profile, and this is invoked once per
 * top candidate for each user. Consider precomputing a property → latest
 * interaction map once per run and looking it up here.
 *
 * @param propName - Property name to score.
 * @returns Recency score between 0 and 1 inclusive.
 */
function recencyScore(propName: string): number {
  const MS_PER_DAY = 1000 * 60 * 60 * 24;

  let latestMs = Number.NEGATIVE_INFINITY;
  for (const p of profiles.values()) {
    const raw = p.properties[propName]?.last_interaction;
    if (!raw) continue;

    const ms = new Date(raw).getTime();
    // getTime() is NaN for an unparseable date; skip rather than propagate.
    if (!Number.isNaN(ms) && ms > latestMs) {
      latestMs = ms;
    }
  }
  if (latestMs === Number.NEGATIVE_INFINITY) return 0;

  const daysAgo = (now.getTime() - latestMs) / MS_PER_DAY;
  const score = 1 - daysAgo / RECS_CONFIG.recencyDays;
  return Math.min(1, Math.max(0, score));
}
SCORING_WEIGHTS.similarity + + recencyScore(name) * SCORING_WEIGHTS.recency + (popularityScores.get(name) ?? 0) * SCORING_WEIGHTS.popularity + priceMatchScore(emb.base_price, pricePref) * SCORING_WEIGHTS.price_match; @@ -383,7 +394,11 @@ function generateNearYou( if (booked.has(name)) continue; let geoScore = 0; - if (top.city && emb.city.toLowerCase() === top.city.toLowerCase()) { + if ( + top.city && + emb.city.toLowerCase() === top.city.toLowerCase() && + emb.state.toLowerCase() === top.state!.toLowerCase() + ) { geoScore = 1.0; } else if (emb.state.toLowerCase() === top.state!.toLowerCase()) { geoScore = 0.7; @@ -482,6 +497,7 @@ export function generateAllRecommendations( profile, embeddings, popularityScores, + profiles, ); const nearYou = generateNearYou(profile, embeddings);