diff --git a/src/app/api/pipelines/cio-property-recs/recs/[userId]/route.ts b/src/app/api/pipelines/cio-property-recs/recs/[userId]/route.ts new file mode 100644 index 0000000..27d4b8d --- /dev/null +++ b/src/app/api/pipelines/cio-property-recs/recs/[userId]/route.ts @@ -0,0 +1,61 @@ +import { type NextRequest, NextResponse } from "next/server"; +import { env } from "@/env"; +import { + getStoredRecs, +} from "@/server/pipelines/cio-property-recs/cio-property-recs.pipeline"; + +/** + * GET /api/pipelines/cio-property-recs/recs/{userId} + * + * Returns pre-computed personalized property recommendations for a user. + * Used by the wander.com homepage "For You" carousel. + * Requires Authorization: Bearer . + * + * Query params: + * limit — max properties to return (default 6, max 20) + * + * Response: + * { user_id, properties: [{ slug, score, property_name, city, state, ... }], generated_at } + * + * Returns 404 if user has no recommendations. + */ +export async function GET( + _request: NextRequest, + { params }: { params: Promise<{ userId: string }> }, +) { + const token = + _request.headers.get("authorization")?.replace("Bearer ", "") ?? + _request.nextUrl.searchParams.get("token"); + + if (token !== env.ADMIN_TOKEN) { + return NextResponse.json({ error: "Unauthorized" }, { status: 401 }); + } + + const { userId } = await params; + const limit = Math.max( + 1, + Math.min( + parseInt( + new URL(_request.url).searchParams.get("limit") ?? "6", + 10, + ) || 6, + 20, + ), + ); + + const recs = getStoredRecs(userId); + + if (!recs) { + return NextResponse.json( + { error: "User not found in recommendations" }, + { status: 404 }, + ); + } + + return NextResponse.json({ + user_id: userId, + properties: recs.properties.slice(0, limit), + is_cold_start: recs.is_cold_start, + generated_at: recs.generated_at, + }); +} diff --git a/src/server/pipelines/cio-property-recs/cio-property-recs.pipeline.ts b/src/server/pipelines/cio-property-recs/cio-property-recs.pipeline.ts index 9c45354..a385913 100644 --- a/src/server/pipelines/cio-property-recs/cio-property-recs.pipeline.ts +++ b/src/server/pipelines/cio-property-recs/cio-property-recs.pipeline.ts @@ -8,16 +8,24 @@ * ─────────────── * 1. Fetch ~4,700 bookable properties from BigQuery (analytics.int_properties) * 2. Generate OpenAI embeddings for each property (text-embedding-3-small, 1536 dims) - * 3. Fetch user behavior signals from BigQuery (views, abandoned checkouts, bookings) + * 3. Fetch user behavior signals + search history + Minerva enrichment from BigQuery * 4. Precompute property-level popularity + recency indexes (once, before user loop) - * 5. For each user: compute taste embedding → cosine similarity → multi-factor score → top 3 + * For each user: compute taste embedding → cosine similarity → multi-factor score → top 3 + * 5. Store results in-memory for API endpoint (homepage "For You" carousel) * 6. Sync 32 flat attributes per user to CIO (Track API identify or Pipelines batch) * - * SCORING WEIGHTS + * SCORING WEIGHTS (7 factors) * ─────────────── - * Similarity 50% · Popularity 15% · Recency 10% · Price match 10% + * Similarity 45% · Popularity 15% · Recency 10% · Price match 10% + * Income match 8% · Life stage fit 5% · Engagement heat 2% * Diversity enforced via landscape-category filtering (not a score factor) * + * ENRICHMENT (Minerva demographics from analytics.customer_profiles) + * ─────────────── + * Income-aware pricing: maps estimated_income_range → property price tier + * Life stage fit: number_of_children + marital_status → bedroom/amenity matching + * Engagement heat: ts_last_visit + count_confirmed_bookings → activity signal + * * CIO ATTRIBUTE SCHEMA (per user, 3 slots) * ───────────────────────────────────────── * rec_for_you_{1|2|3}_name — property name @@ -65,10 +73,13 @@ const SIGNAL_WEIGHTS = { } as const; const SCORING = { - similarity: 0.5, + similarity: 0.45, popularity: 0.15, recency: 0.1, priceMatch: 0.1, + incomeMatch: 0.08, // Minerva income-aware pricing — prevents $2K recs for budget travelers + lifeStageFit: 0.05, // family/couples/remote worker → bedroom/amenity fit + engagementHeat: 0.02, // visit recency + booking frequency boost // diversity is enforced via landscape filtering, not a numeric score } as const; @@ -130,6 +141,18 @@ interface BQUserSearch { search_count: number; } +interface BQCustomerProfile { + id_user: string; + estimated_income_range: string | null; + number_of_children: number | null; + marital_status: string | null; + is_likely_remote_worker: boolean | null; + is_vip: boolean | null; + count_confirmed_bookings: number | null; + total_booking_revenue: number | null; + ts_last_visit: string | null; +} + // ── Internal Types ──────────────────────────────────────────────────────────── interface PropertySignals { @@ -140,6 +163,17 @@ interface PropertySignals { last_interaction: string | null; } +interface EnrichmentData { + incomeRange: string | null; + numberOfChildren: number | null; + maritalStatus: string | null; + isRemoteWorker: boolean; + isVip: boolean; + confirmedBookings: number; + totalRevenue: number; + lastVisit: string | null; +} + interface UserProfile { properties: Record; searchLocations: Array<{ @@ -148,6 +182,7 @@ interface UserProfile { count: number; }>; totalSignals: number; + enrichment?: EnrichmentData; } // Precomputed property-level scores — built once, used for every user ranking @@ -293,11 +328,37 @@ export async function fetchUserSearchHistory(userIds?: string[]): Promise { + const userFilter = userIds?.length ? `AND id_user IN UNNEST(@user_ids)` : ""; + return executeQuery( + ` + SELECT + id_user, + estimated_income_range, + number_of_children, + marital_status, + is_likely_remote_worker, + is_vip, + count_confirmed_bookings, + total_booking_revenue, + CAST(ts_last_visit AS STRING) AS ts_last_visit + FROM \`wander-9fc9c.analytics.customer_profiles\` + WHERE id_user IS NOT NULL + AND is_deleted = false + AND is_banned = false + ${userFilter} + `, + userIds?.length ? { user_ids: userIds } : undefined, + userIds?.length ? { user_ids: ["STRING"] } : undefined, + ); +} + // ── Profile Building ────────────────────────────────────────────────────────── export function buildUserProfiles( signals: BQUserSignal[], searches: BQUserSearch[], + customerProfiles?: BQCustomerProfile[], ): Map { const profiles = new Map(); @@ -363,6 +424,25 @@ export function buildUserProfiles( profile.totalSignals = total; } + // Merge Minerva enrichment data + if (customerProfiles?.length) { + for (const cp of customerProfiles) { + const profile = profiles.get(cp.id_user); + if (profile) { + profile.enrichment = { + incomeRange: cp.estimated_income_range, + numberOfChildren: cp.number_of_children, + maritalStatus: cp.marital_status, + isRemoteWorker: cp.is_likely_remote_worker ?? false, + isVip: cp.is_vip ?? false, + confirmedBookings: cp.count_confirmed_bookings ?? 0, + totalRevenue: cp.total_booking_revenue ?? 0, + lastVisit: cp.ts_last_visit, + }; + } + } + } + // Drop users with no property interactions for (const [uid, profile] of profiles) { if (Object.keys(profile.properties).length === 0) profiles.delete(uid); @@ -504,6 +584,106 @@ function priceMatchScore( return Math.max(0, 1 - Math.abs(Math.log(Math.max(ratio, 0.1))) * 0.5); } +// ── Enrichment Scoring ──────────────────────────────────────────────────────── + +const INCOME_TIERS: Record = { + // Minerva ranges from analytics.customer_profiles → mapped to 1-7 tiers + "<$100K": 2, + "$101K - $250K": 4, + "$251K - $500K": 5, + "$501K - $1M": 6, + ">$1M": 7, +}; + +function incomeTierFromRange(range: string | null): number { + if (!range) return 4; // assume middle if unknown + return INCOME_TIERS[range] ?? 4; +} + +function propertyPriceTier(basePrice: number | null): number { + if (!basePrice) return 4; + if (basePrice < 200) return 1; + if (basePrice < 400) return 2; + if (basePrice < 600) return 3; + if (basePrice < 800) return 4; + if (basePrice < 1200) return 5; + if (basePrice < 2000) return 6; + return 7; +} + +function incomeMatchScore( + enrichment: EnrichmentData | undefined, + candidatePrice: number | null, +): number { + if (!enrichment?.incomeRange) return 0.5; // neutral if no data + const userTier = incomeTierFromRange(enrichment.incomeRange); + const propTier = propertyPriceTier(candidatePrice); + const distance = Math.abs(userTier - propTier); + return Math.max(0, 1 - distance * 0.2); // 0.2 penalty per tier gap +} + +function lifeStageFitScore( + enrichment: EnrichmentData | undefined, + bedrooms: number | null, + description: string, +): number { + if (!enrichment) return 0.5; + let score = 0.5; + + // Families: boost properties with 4+ bedrooms + const children = enrichment.numberOfChildren ?? 0; + if (children >= 2 && bedrooms && bedrooms >= 4) score += 0.3; + else if (children >= 1 && bedrooms && bedrooms >= 3) score += 0.2; + + // Couples: boost 2-bed romantic properties (only if we know they have no kids) + if ( + enrichment.maritalStatus === "Married" && + enrichment.numberOfChildren !== null && + enrichment.numberOfChildren === 0 && + bedrooms && + bedrooms <= 3 + ) { + score += 0.15; + } + + // Remote workers: boost properties mentioning workspace amenities + if (enrichment.isRemoteWorker) { + const descLower = description.toLowerCase(); + if ( + descLower.includes("office") || + descLower.includes("desk") || + descLower.includes("workspace") || + descLower.includes("wifi") + ) { + score += 0.2; + } + } + + return Math.min(1, score); +} + +function engagementHeatScore(enrichment: EnrichmentData | undefined): number { + if (!enrichment) return 0.5; + let score = 0.5; // start at neutral, boost from there + + // Recency of last visit + if (enrichment.lastVisit) { + const daysAgo = + (Date.now() - new Date(enrichment.lastVisit).getTime()) / 86_400_000; + if (daysAgo < 7) score += 0.4; + else if (daysAgo < 30) score += 0.25; + else if (daysAgo < 90) score += 0.1; + } + + // Booking frequency bonus + if (enrichment.confirmedBookings >= 3) score += 0.2; + else if (enrichment.confirmedBookings >= 1) score += 0.1; + + return Math.min(1, score); +} + +// ── Ranking ─────────────────────────────────────────────────────────────────── + function rankPropertiesForUser( userEmbedding: number[], profile: UserProfile, @@ -534,7 +714,10 @@ function rankPropertiesForUser( sim * SCORING.similarity + (index.recency.get(name) ?? 0) * SCORING.recency + (index.popularity.get(name) ?? 0) * SCORING.popularity + - priceMatchScore(emb.base_price, pricePref) * SCORING.priceMatch; + priceMatchScore(emb.base_price, pricePref) * SCORING.priceMatch + + incomeMatchScore(profile.enrichment, emb.base_price) * SCORING.incomeMatch + + lifeStageFitScore(profile.enrichment, emb.bedrooms, emb.description) * SCORING.lifeStageFit + + engagementHeatScore(profile.enrichment) * SCORING.engagementHeat; scored.push({ propertyName: name, @@ -719,6 +902,69 @@ async function syncToCio( return { synced, failed }; } +// ── Recs API Store ─────────────────────────────────────────────────────────── + +interface StoredRec { + slug: string; + score: number; + property_name: string; + city: string; + state: string; + bedrooms: number | null; + base_price: number | null; + cover_image_url: string; + landscape: string; +} + +interface StoredUserRecs { + properties: StoredRec[]; + is_cold_start: boolean; + generated_at: string; +} + +const recsStore = new Map(); +let recsGeneratedAt = ""; + +function extractSlug(url: string): string { + if (url.includes("/property/")) return url.split("/property/").pop()?.replace(/\/$/, "") ?? url; + return url.replace(/\/$/, "").split("/").pop() ?? url; +} + +function storeRecsForApi( + allRecs: Map, + coldStartUserIds: Set, +): void { + recsStore.clear(); + recsGeneratedAt = new Date().toISOString(); + + for (const [uid, recs] of allRecs) { + recsStore.set(uid, { + properties: recs.map((r) => ({ + slug: extractSlug(r.url), + score: r.score, + property_name: r.propertyName, + city: r.city, + state: r.state, + bedrooms: r.bedrooms, + base_price: r.basePrice, + cover_image_url: r.imageUrl, + landscape: r.landscape, + })), + is_cold_start: coldStartUserIds.has(uid), + generated_at: recsGeneratedAt, + }); + } +} + +export function getStoredRecs(userId: string): StoredUserRecs | null { + return recsStore.get(userId) ?? null; +} + +/** Used internally for debugging — not exposed via API */ +function getRecsStoreStats(): { users: number; generatedAt: string } { + return { users: recsStore.size, generatedAt: recsGeneratedAt }; +} + // ── Pipeline Entry Point ────────────────────────────────────────────────────── export async function run( @@ -736,24 +982,30 @@ export async function run( console.log(" [2/5] Generating property embeddings..."); const embeddings = await generatePropertyEmbeddings(properties); - console.log(" [3/5] Fetching user signals and search history..."); + console.log(" [3/6] Fetching user signals, search history, and enrichment..."); const userIds = options.testEmails?.length ? await lookupUserIdsByEmail(options.testEmails) : undefined; if (userIds) console.log(` Filtering to ${userIds.length} test users`); - const [signals, searches] = await Promise.all([ + const [signals, searches, customerProfiles] = await Promise.all([ fetchUserBehaviorSignals(userIds), fetchUserSearchHistory(userIds), + fetchCustomerProfiles(userIds), ]); - const profiles = buildUserProfiles(signals, searches); + console.log(` ${customerProfiles.length.toLocaleString()} enrichment profiles`); + const profiles = buildUserProfiles(signals, searches, customerProfiles); console.log(` ${profiles.size.toLocaleString()} active users`); - console.log(" [4/5] Ranking properties per user..."); + console.log(" [4/6] Ranking properties per user..."); const index = buildPropertyIndex(profiles); - const { recs, coldStart } = generateAllRecs(profiles, embeddings, index); + const { recs, coldStart, coldStartUserIds } = generateAllRecs(profiles, embeddings, index); console.log(` ${coldStart.toLocaleString()} cold start users`); - console.log(" [5/5] Syncing to Customer.io..."); + console.log(" [5/6] Storing recs for API..."); + storeRecsForApi(recs, coldStartUserIds); + console.log(` ${recs.size.toLocaleString()} users stored`); + + console.log(" [6/6] Syncing to Customer.io..."); console.log( ` mode=${options.useBatchSync ? "pipelines-batch" : "track-identify"}`, );