diff --git a/apps/cms/src/api/core-sync/services/strapi-helpers.ts b/apps/cms/src/api/core-sync/services/strapi-helpers.ts index db8132d6..9e7c6a7e 100644 --- a/apps/cms/src/api/core-sync/services/strapi-helpers.ts +++ b/apps/cms/src/api/core-sync/services/strapi-helpers.ts @@ -180,7 +180,10 @@ export async function getLastSyncTime( ): Promise { const knex = strapi.db.connection const row = await knex(SYNC_STATE_TABLE).where({ phase }).first() - return row?.last_synced_at ?? null + if (!row?.last_synced_at) return null + // knex/pg returns timestamptz as a JS Date — convert to ISO string for the Core API + const val = row.last_synced_at + return val instanceof Date ? val.toISOString() : String(val) } /** Persist the sync timestamp for a phase after a successful run. */ diff --git a/apps/cms/src/api/core-sync/services/sync-videos.ts b/apps/cms/src/api/core-sync/services/sync-videos.ts index e1e3a249..60ebabe9 100644 --- a/apps/cms/src/api/core-sync/services/sync-videos.ts +++ b/apps/cms/src/api/core-sync/services/sync-videos.ts @@ -7,9 +7,7 @@ import { type ProgressReporter, getPrimaryValue, formatError, - upsertByCoreId, softDeleteUnseen, - buildCoreIdMap, } from "./strapi-helpers" import { bulkUpsertByCoreId, type BulkRecord } from "./bulk-upsert" @@ -193,7 +191,7 @@ export async function syncVideos( ) } - // Sync BibleBooks via Strapi (small set, only on full sync) + // Sync BibleBooks via bulk upsert (small set, only on full sync) if (!isIncremental) { try { const bibleData = ( @@ -202,20 +200,25 @@ export async function syncVideos( strapi.log.info( `[core-sync] Fetched ${bibleData.bibleBooks.length} bible books from core`, ) - for (const book of bibleData.bibleBooks) { - await upsertByCoreId( + const bibleBookRecords: Array<{ + coreId: string + data: Record + }> = bibleData.bibleBooks.map((book) => ({ + coreId: book.id, + data: { + name: getPrimaryValue(book.name), + osis_id: book.osisId, + alternate_name: book.alternateName ?? 
null, + paratext_abbreviation: book.paratextAbbreviation, + is_new_testament: book.isNewTestament, + order: book.order, + }, + })) + if (bibleBookRecords.length > 0) { + await bulkUpsertByCoreId( strapi, - "api::bible-book.bible-book", - book.id, - { - name: getPrimaryValue(book.name), - osisId: book.osisId, - alternateName: book.alternateName ?? undefined, - paratextAbbreviation: book.paratextAbbreviation, - isNewTestament: book.isNewTestament, - order: book.order, - }, - { locale: "en" }, + { tableName: "bible_books", locale: "en", linkConfigs: [] }, + bibleBookRecords, ) } } catch (error) { @@ -226,54 +229,172 @@ export async function syncVideos( } // Pre-load lookup caches - const languageMap = await buildCoreIdMap( - strapi, - "api::language.language", - "en", - ) - const bibleBookMap = await buildCoreIdMap( - strapi, - "api::bible-book.bible-book", - "en", - ) - const keywordMap = await buildCoreIdMap(strapi, "api::keyword.keyword") + const knex = strapi.db.connection + + const languageRows: Array<{ core_id: string; document_id: string }> = + await knex("languages") + .select("core_id", "document_id") + .where("locale", "en") + .whereNotNull("core_id") + .groupBy("core_id", "document_id") + const languageMap = new Map() + for (const row of languageRows) languageMap.set(row.core_id, row.document_id) + + const bibleBookRows: Array<{ core_id: string; document_id: string }> = + await knex("bible_books") + .select("core_id", "document_id") + .where("locale", "en") + .whereNotNull("core_id") + .groupBy("core_id", "document_id") + const bibleBookMap = new Map() + for (const row of bibleBookRows) + bibleBookMap.set(row.core_id, row.document_id) + + const keywordRows: Array<{ core_id: string; document_id: string }> = + await knex("keywords") + .select("core_id", "document_id") + .whereNotNull("core_id") + .groupBy("core_id", "document_id") + const keywordMap = new Map() + for (const row of keywordRows) keywordMap.set(row.core_id, row.document_id) strapi.log.info( 
`[core-sync] Loaded caches: ${languageMap.size} languages, ${bibleBookMap.size} bible books, ${keywordMap.size} keywords`, ) - // Dedup origins and editions (small cardinality, upsert via Strapi) + // Dedup origins and editions (small cardinality, accumulated across pages) const originMap = new Map() const editionMap = new Map() const seenVideoIds = new Set() const seenSubtitleIds = new Set() const seenImageIds = new Set() - const seenStudyQuestionIds = new Set() - const seenCitationIds = new Set() const parentChildMap = new Map() - // Collect ALL records for bulk upsert - const allVideoRecords: BulkRecord[] = [] - const allImageRecords: BulkRecord[] = [] - const allSubtitleRecords: BulkRecord[] = [] - const allStudyQuestionRecords: BulkRecord[] = [] - const allCitationRecords: BulkRecord[] = [] - // Track keyword links per video: videoCoreId → keywordCoreIds + // Track keyword links per video: videoCoreId -> keywordCoreIds const videoKeywordLinks = new Map() + // Running map of video coreId -> documentId, accumulated across pages + const videoDocMap = new Map() + + // Link configs (reused each page iteration) + const videoLinkConfigs = [ + { + linkTable: "videos_origin_lnk", + sourceColumn: "video_id", + targetTable: "video_origins", + targetColumn: "video_origin_id", + targetLocale: "", + orderColumn: "video_ord", + }, + { + linkTable: "videos_primary_language_lnk", + sourceColumn: "video_id", + targetTable: "languages", + targetColumn: "language_id", + targetLocale: "en", + orderColumn: "video_ord", + }, + ] + + const imageLinkConfigs = [ + { + linkTable: "video_images_video_lnk", + sourceColumn: "video_image_id", + targetTable: "videos", + targetColumn: "video_id", + targetLocale: "en", + orderColumn: "video_image_ord", + }, + ] + + const studyQuestionLinkConfigs = [ + { + linkTable: "video_study_questions_video_lnk", + sourceColumn: "video_study_question_id", + targetTable: "videos", + targetColumn: "video_id", + targetLocale: "en", + orderColumn: 
"video_study_question_ord", + }, + ] + + const citationLinkConfigs = [ + { + linkTable: "bible_citations_bible_book_lnk", + sourceColumn: "bible_citation_id", + targetTable: "bible_books", + targetColumn: "bible_book_id", + targetLocale: "en", + orderColumn: "bible_citation_ord", + }, + { + linkTable: "bible_citations_video_lnk", + sourceColumn: "bible_citation_id", + targetTable: "videos", + targetColumn: "video_id", + targetLocale: "en", + orderColumn: "bible_citation_ord", + }, + ] + + const subtitleLinkConfigs = [ + { + linkTable: "video_subtitles_language_lnk", + sourceColumn: "video_subtitle_id", + targetTable: "languages", + targetColumn: "language_id", + targetLocale: "en", + orderColumn: "video_subtitle_ord", + }, + { + linkTable: "video_subtitles_video_edition_lnk", + sourceColumn: "video_subtitle_id", + targetTable: "video_editions", + targetColumn: "video_edition_id", + targetLocale: "", + orderColumn: "video_subtitle_ord", + }, + { + linkTable: "video_subtitles_video_lnk", + sourceColumn: "video_subtitle_id", + targetTable: "videos", + targetColumn: "video_id", + targetLocale: "en", + orderColumn: "video_subtitle_ord", + }, + ] + let offset = 0 let totalFetched = 0 - // ── Phase 1: Fetch all videos from core API ────────────────────────── + // Prefetch: kick off the first page fetch + let pendingFetch: Promise | null = null + + function fetchPage(pageOffset: number): Promise { + const fetchStart = Date.now() + return getCoreClient() + .query({ + query: VIDEOS_QUERY, + variables: { limit: pageSize, offset: pageOffset, where }, + }) + .then(({ data }) => { + const fetchMs = Date.now() - fetchStart + strapi.log.info( + `[core-sync] [timing] fetch page offset=${pageOffset}: ${fetchMs}ms (${data.videos.length} records)`, + ) + return data.videos + }) + } + + pendingFetch = fetchPage(offset) + + // ── Main loop: fetch, upsert per page ────────────────────────────────── while (true) { + const pageStart = Date.now() let videos: CoreVideo[] try { - const { data 
} = await getCoreClient().query({ - query: VIDEOS_QUERY, - variables: { limit: pageSize, offset, where }, - }) - videos = data.videos + videos = await pendingFetch! } catch (error) { strapi.log.warn( `[core-sync] Failed to fetch video page (offset ${offset}): ${formatError(error)}. Stopping pagination.`, @@ -290,46 +411,101 @@ export async function syncVideos( if (videos.length === 0) break - // Upsert origins and editions (small cardinality) + // Prefetch next page while we process this one + const hasMore = videos.length === pageSize + if (hasMore) { + pendingFetch = fetchPage(offset + pageSize).catch((e) => { + // Prevent unhandled rejection if current page processing throws + strapi.log.warn( + `[core-sync] Prefetch failed (offset ${offset + pageSize}): ${formatError(e)}`, + ) + return [] as CoreVideo[] + }) + } + + // ── Bulk upsert origins and editions for this page ──────────────── + const originEditionStart = Date.now() + const pageOriginRecords: Array<{ + coreId: string + data: Record + }> = [] + const pageEditionRecords: Array<{ + coreId: string + data: Record + }> = [] + for (const video of videos) { if (video.origin && !originMap.has(video.origin.id)) { - try { - const { documentId } = await upsertByCoreId( - strapi, - "api::video-origin.video-origin", - video.origin.id, - { - name: video.origin.name, - description: video.origin.description ?? undefined, - }, - ) - originMap.set(video.origin.id, documentId) - } catch (error) { - strapi.log.warn( - `[core-sync] Failed to upsert video origin ${video.origin.id}: ${formatError(error)}`, - ) - } + pageOriginRecords.push({ + coreId: video.origin.id, + data: { + name: video.origin.name, + description: video.origin.description ?? 
null, + }, + }) + originMap.set(video.origin.id, "") // placeholder } for (const sub of video.subtitles) { if (sub.videoEdition && !editionMap.has(sub.videoEdition.id)) { - try { - const { documentId } = await upsertByCoreId( - strapi, - "api::video-edition.video-edition", - sub.videoEdition.id, - { name: sub.videoEdition.name ?? undefined }, - ) - editionMap.set(sub.videoEdition.id, documentId) - } catch (error) { - strapi.log.warn( - `[core-sync] Failed to upsert edition ${sub.videoEdition.id}: ${formatError(error)}`, - ) - } + pageEditionRecords.push({ + coreId: sub.videoEdition.id, + data: { name: sub.videoEdition.name ?? null }, + }) + editionMap.set(sub.videoEdition.id, "") // placeholder } } } - // Collect records + if (pageOriginRecords.length > 0) { + await bulkUpsertByCoreId( + strapi, + { tableName: "video_origins", locale: "", linkConfigs: [] }, + pageOriginRecords, + ) + const originCoreIds = pageOriginRecords.map((r) => r.coreId) + const originRows: Array<{ core_id: string; document_id: string }> = + await knex("video_origins") + .select("core_id", "document_id") + .whereIn("core_id", originCoreIds) + .where("locale", "") + .groupBy("core_id", "document_id") + for (const row of originRows) { + originMap.set(row.core_id, row.document_id) + } + } + + if (pageEditionRecords.length > 0) { + await bulkUpsertByCoreId( + strapi, + { tableName: "video_editions", locale: "", linkConfigs: [] }, + pageEditionRecords, + ) + const edCoreIds = pageEditionRecords.map((r) => r.coreId) + const edRows: Array<{ core_id: string; document_id: string }> = + await knex("video_editions") + .select("core_id", "document_id") + .whereIn("core_id", edCoreIds) + .where("locale", "") + .groupBy("core_id", "document_id") + for (const row of edRows) { + editionMap.set(row.core_id, row.document_id) + } + } + + const originEditionMs = Date.now() - originEditionStart + if (originEditionMs > 50) { + strapi.log.info( + `[core-sync] [timing] origin/edition bulk upserts: 
${originEditionMs}ms (${pageOriginRecords.length} origins, ${pageEditionRecords.length} editions)`, + ) + } + + // ── Build page-scoped records ───────────────────────────────────── + const pageVideoRecords: BulkRecord[] = [] + const pageImageRecords: BulkRecord[] = [] + const pageSubtitleRecords: BulkRecord[] = [] + const pageStudyQuestionRecords: BulkRecord[] = [] + const pageCitationRecords: BulkRecord[] = [] + for (const video of videos) { seenVideoIds.add(video.id) @@ -341,7 +517,7 @@ export async function syncVideos( ? originMap.get(video.origin.id) : undefined - allVideoRecords.push({ + pageVideoRecords.push({ coreId: video.id, data: { title: getPrimaryValue(video.title), @@ -367,7 +543,7 @@ export async function syncVideos( // Images for (const img of video.images) { seenImageIds.add(img.id) - allImageRecords.push({ + pageImageRecords.push({ coreId: img.id, data: { cloudflare_id: img.id, @@ -381,7 +557,6 @@ export async function syncVideos( blurhash: img.blurhash ?? null, }, links: { - // Placeholder — resolved after video bulk upsert _videoCoreId: video.id, } as Record, }) @@ -390,8 +565,7 @@ export async function syncVideos( // Study questions for (const sq of video.studyQuestions) { if (!sq.id) continue - seenStudyQuestionIds.add(sq.id) - allStudyQuestionRecords.push({ + pageStudyQuestionRecords.push({ coreId: sq.id, data: { value: sq.value, @@ -405,8 +579,7 @@ export async function syncVideos( // Bible citations for (const bc of video.bibleCitations) { - seenCitationIds.add(bc.id) - allCitationRecords.push({ + pageCitationRecords.push({ coreId: bc.id, data: { osis_id: bc.osisId, @@ -429,7 +602,7 @@ export async function syncVideos( const editionDocId = sub.videoEdition ? 
editionMap.get(sub.videoEdition.id) : undefined - allSubtitleRecords.push({ + pageSubtitleRecords.push({ coreId: sub.id, data: { primary: sub.primary, @@ -447,211 +620,132 @@ export async function syncVideos( } } - totalFetched += videos.length - progress.increment(videos.length) - const pct = coreTotal - ? `${((totalFetched / coreTotal) * 100).toFixed(1)}%` - : "?" - strapi.log.info( - `[core-sync] Videos fetched: ${totalFetched}/${coreTotal} (${pct})`, + // ── Bulk upsert videos for this page ────────────────────────────── + const videoUpsertStart = Date.now() + if (pageVideoRecords.length > 0) { + const videoStats = await bulkUpsertByCoreId( + strapi, + { + tableName: "videos", + locale: "en", + linkConfigs: videoLinkConfigs, + }, + pageVideoRecords, + progress, + ) + stats.created += videoStats.created + stats.updated += videoStats.updated + stats.errors += videoStats.errors + } + const videoUpsertMs = Date.now() - videoUpsertStart + + // ── Query video documentIds for this page's coreIds ─────────────── + const resolveStart = Date.now() + if (pageVideoRecords.length > 0) { + const pageCoreIds = pageVideoRecords.map((r) => r.coreId) + const videoRows: Array<{ core_id: string; document_id: string }> = + await knex("videos") + .select("core_id", "document_id") + .whereIn("core_id", pageCoreIds) + .where("locale", "en") + .groupBy("core_id", "document_id") + for (const row of videoRows) { + videoDocMap.set(row.core_id, row.document_id) + } + } + + // Resolve _videoCoreId placeholders to actual documentIds + function resolveVideoLinks(records: BulkRecord[], linkTableName: string) { + for (const rec of records) { + const videoCoreId = (rec.links as Record)?._videoCoreId + if (videoCoreId) { + delete rec.links!._videoCoreId + rec.links![linkTableName] = videoDocMap.get(videoCoreId) + } + } + } + + // ── Bulk upsert sub-entities for this page ──────────────────────── + resolveVideoLinks(pageImageRecords, "video_images_video_lnk") + resolveVideoLinks( + 
pageStudyQuestionRecords, + "video_study_questions_video_lnk", ) + resolveVideoLinks(pageCitationRecords, "bible_citations_video_lnk") + resolveVideoLinks(pageSubtitleRecords, "video_subtitles_video_lnk") - if (videos.length < pageSize) break - offset += pageSize - } + const subEntityStart = Date.now() - // ── Phase 2: Bulk upsert videos ─────────────────────────────────────── - strapi.log.info(`[core-sync] Bulk upserting ${allVideoRecords.length} videos`) - const videoStats = await bulkUpsertByCoreId( - strapi, - { - tableName: "videos", - locale: "en", - linkConfigs: [ + if (pageImageRecords.length > 0) { + const imgStats = await bulkUpsertByCoreId( + strapi, { - linkTable: "videos_origin_lnk", - sourceColumn: "video_id", - targetTable: "video_origins", - targetColumn: "video_origin_id", - targetLocale: "", - orderColumn: "video_ord", + tableName: "video_images", + locale: "", + linkConfigs: imageLinkConfigs, }, + pageImageRecords, + ) + stats.errors += imgStats.errors + } + + if (pageStudyQuestionRecords.length > 0) { + const sqStats = await bulkUpsertByCoreId( + strapi, { - linkTable: "videos_primary_language_lnk", - sourceColumn: "video_id", - targetTable: "languages", - targetColumn: "language_id", - targetLocale: "en", - orderColumn: "video_ord", + tableName: "video_study_questions", + locale: "en", + linkConfigs: studyQuestionLinkConfigs, }, - ], - }, - allVideoRecords, - ) - stats.created = videoStats.created - stats.updated = videoStats.updated - stats.errors = videoStats.errors - strapi.log.info( - `[core-sync] Videos: ${videoStats.created} created, ${videoStats.updated} updated, ${videoStats.errors} errors`, - ) - - // Build video coreId → documentId map for sub-entity linking - const videoDocMap = await buildCoreIdMap(strapi, "api::video.video", "en") + pageStudyQuestionRecords, + ) + stats.errors += sqStats.errors + } - // Resolve _videoCoreId placeholders to actual documentIds - function resolveVideoLinks(records: BulkRecord[], linkTableName: string) 
{ - for (const rec of records) { - const videoCoreId = (rec.links as Record)?._videoCoreId - if (videoCoreId) { - delete rec.links!._videoCoreId - rec.links![linkTableName] = videoDocMap.get(videoCoreId) - } + if (pageCitationRecords.length > 0) { + const bcStats = await bulkUpsertByCoreId( + strapi, + { + tableName: "bible_citations", + locale: "", + linkConfigs: citationLinkConfigs, + }, + pageCitationRecords, + ) + stats.errors += bcStats.errors } - } - // ── Phase 3: Bulk upsert images ─────────────────────────────────────── - resolveVideoLinks(allImageRecords, "video_images_video_lnk") - if (allImageRecords.length > 0) { - strapi.log.info( - `[core-sync] Bulk upserting ${allImageRecords.length} video images`, - ) - const imgStats = await bulkUpsertByCoreId( - strapi, - { - tableName: "video_images", - locale: "", - linkConfigs: [ - { - linkTable: "video_images_video_lnk", - sourceColumn: "video_image_id", - targetTable: "videos", - targetColumn: "video_id", - targetLocale: "en", - orderColumn: "video_image_ord", - }, - ], - }, - allImageRecords, - ) - strapi.log.info( - `[core-sync] Video images: ${imgStats.created} created, ${imgStats.updated} updated`, - ) - } + if (pageSubtitleRecords.length > 0) { + const subStats = await bulkUpsertByCoreId( + strapi, + { + tableName: "video_subtitles", + locale: "", + linkConfigs: subtitleLinkConfigs, + }, + pageSubtitleRecords, + ) + stats.errors += subStats.errors + } - // ── Phase 4: Bulk upsert study questions ────────────────────────────── - resolveVideoLinks(allStudyQuestionRecords, "video_study_questions_video_lnk") - if (allStudyQuestionRecords.length > 0) { - strapi.log.info( - `[core-sync] Bulk upserting ${allStudyQuestionRecords.length} study questions`, - ) - const sqStats = await bulkUpsertByCoreId( - strapi, - { - tableName: "video_study_questions", - locale: "en", - linkConfigs: [ - { - linkTable: "video_study_questions_video_lnk", - sourceColumn: "video_study_question_id", - targetTable: "videos", - 
targetColumn: "video_id", - targetLocale: "en", - orderColumn: "video_study_question_ord", - }, - ], - }, - allStudyQuestionRecords, - ) - strapi.log.info( - `[core-sync] Study questions: ${sqStats.created} created, ${sqStats.updated} updated`, - ) - } + const subEntityMs = Date.now() - subEntityStart + const resolveMs = Date.now() - resolveStart + const pageMs = Date.now() - pageStart - // ── Phase 5: Bulk upsert bible citations ────────────────────────────── - resolveVideoLinks(allCitationRecords, "bible_citations_video_lnk") - if (allCitationRecords.length > 0) { - strapi.log.info( - `[core-sync] Bulk upserting ${allCitationRecords.length} bible citations`, - ) - const bcStats = await bulkUpsertByCoreId( - strapi, - { - tableName: "bible_citations", - locale: "", - linkConfigs: [ - { - linkTable: "bible_citations_bible_book_lnk", - sourceColumn: "bible_citation_id", - targetTable: "bible_books", - targetColumn: "bible_book_id", - targetLocale: "en", - orderColumn: "bible_citation_ord", - }, - { - linkTable: "bible_citations_video_lnk", - sourceColumn: "bible_citation_id", - targetTable: "videos", - targetColumn: "video_id", - targetLocale: "en", - orderColumn: "bible_citation_ord", - }, - ], - }, - allCitationRecords, - ) + totalFetched += videos.length + const pct = coreTotal + ? `${((totalFetched / coreTotal) * 100).toFixed(1)}%` + : "?" 
strapi.log.info( - `[core-sync] Bible citations: ${bcStats.created} created, ${bcStats.updated} updated`, + `[core-sync] Videos: ${totalFetched}/${coreTotal} (${pct}) — page: ${pageVideoRecords.length}v/${pageImageRecords.length}img/${pageSubtitleRecords.length}sub/${pageStudyQuestionRecords.length}sq/${pageCitationRecords.length}bc — timing: page=${pageMs}ms video_upsert=${videoUpsertMs}ms resolve=${resolveMs}ms sub_entities=${subEntityMs}ms`, ) - } - // ── Phase 6: Bulk upsert subtitles ──────────────────────────────────── - resolveVideoLinks(allSubtitleRecords, "video_subtitles_video_lnk") - if (allSubtitleRecords.length > 0) { - strapi.log.info( - `[core-sync] Bulk upserting ${allSubtitleRecords.length} subtitles`, - ) - const subStats = await bulkUpsertByCoreId( - strapi, - { - tableName: "video_subtitles", - locale: "", - linkConfigs: [ - { - linkTable: "video_subtitles_language_lnk", - sourceColumn: "video_subtitle_id", - targetTable: "languages", - targetColumn: "language_id", - targetLocale: "en", - orderColumn: "video_subtitle_ord", - }, - { - linkTable: "video_subtitles_video_edition_lnk", - sourceColumn: "video_subtitle_id", - targetTable: "video_editions", - targetColumn: "video_edition_id", - targetLocale: "", - orderColumn: "video_subtitle_ord", - }, - { - linkTable: "video_subtitles_video_lnk", - sourceColumn: "video_subtitle_id", - targetTable: "videos", - targetColumn: "video_id", - targetLocale: "en", - orderColumn: "video_subtitle_ord", - }, - ], - }, - allSubtitleRecords, - ) - strapi.log.info( - `[core-sync] Subtitles: ${subStats.created} created, ${subStats.updated} updated`, - ) + if (videos.length < pageSize) break + offset += pageSize } // ── Phase 7: Link keywords (manyToMany via raw SQL) ─────────────────── if (videoKeywordLinks.size > 0) { - const knex = strapi.db.connection const linkRows: Array<{ video_id: number keyword_id: number @@ -716,7 +810,7 @@ export async function syncVideos( const kwIds = kwIdMap.get(kwDocId) if (!kwIds) 
continue - // Draft → Draft + // Draft -> Draft if (videoIds.draftId && kwIds.draftId) { linkRows.push({ video_id: videoIds.draftId, @@ -725,7 +819,7 @@ export async function syncVideos( keyword_ord: 1, }) } - // Published → Published + // Published -> Published if (videoIds.publishedId && kwIds.publishedId) { linkRows.push({ video_id: videoIds.publishedId, @@ -739,13 +833,19 @@ export async function syncVideos( } if (linkRows.length > 0) { - // Delete existing keyword links for all videos being synced - const allVideoRowIds = [...videoIdMap.values()].flatMap((v) => - [v.draftId, v.publishedId].filter(Boolean), - ) - for (let i = 0; i < allVideoRowIds.length; i += 1000) { + // Delete existing keyword links only for videos in THIS sync batch + const syncedVideoRowIds: number[] = [] + for (const videoCoreId of videoKeywordLinks.keys()) { + const docId = videoDocMap.get(videoCoreId) + if (!docId) continue + const ids = videoIdMap.get(docId) + if (!ids) continue + if (ids.draftId) syncedVideoRowIds.push(ids.draftId) + if (ids.publishedId) syncedVideoRowIds.push(ids.publishedId) + } + for (let i = 0; i < syncedVideoRowIds.length; i += 1000) { await knex("videos_keywords_lnk") - .whereIn("video_id", allVideoRowIds.slice(i, i + 1000)) + .whereIn("video_id", syncedVideoRowIds.slice(i, i + 1000)) .delete() } @@ -779,9 +879,8 @@ export async function syncVideos( ) } - // ── Phase 9: Link parent→children (manyToMany self-ref) ────────────── + // ── Phase 9: Link parent->children (manyToMany self-ref) ────────────── if (parentChildMap.size > 0) { - const knex = strapi.db.connection const videoRows: Array<{ id: number document_id: string diff --git a/docs/solutions/cms/core-sync-production-vs-local-performance-gap.md b/docs/solutions/cms/core-sync-production-vs-local-performance-gap.md new file mode 100644 index 00000000..a05a3d76 --- /dev/null +++ b/docs/solutions/cms/core-sync-production-vs-local-performance-gap.md @@ -0,0 +1,119 @@ +--- +title: "Core Sync Production vs Local 
Performance Gap" +category: cms +date: 2026-03-30 +severity: high +tags: + - core-sync + - performance + - railway + - strapi-v5 + - upsertByCoreId +modules: + - apps/cms +related_issues: + - "PR #558" + - "PR #560" + - "PR #562" +--- + +# Core Sync Production vs Local Performance Gap + +## Problem + +The video sync phase completed in 48.8 seconds locally but was stuck for 20+ minutes on production (Railway) for the same 1,056 records. The video variant sync showed similar patterns — 60+ minutes on production for 207K records with zero progress visibility. + +## Symptoms + +- Production sync status stuck at `processed=0/1056` for 20+ minutes on videos phase +- Locally the same phase completed in 48.8 seconds +- Languages/countries/keywords (which use `bulkUpsertByCoreId`) completed fast on both local and production +- Only phases using `upsertByCoreId` (Strapi document service, one-at-a-time) were slow on production + +## Root Cause + +**`upsertByCoreId` (Strapi document service) is dramatically slower on Railway than local dev.** + +Each `upsertByCoreId` call does: + +1. `findFirst` with filters (through Strapi ORM → SQL query) +2. Either `create` or `update` (through Strapi ORM → SQL query) + +On local dev, the app and PostgreSQL are on the same Docker network (~0.1ms latency). On Railway, the app container and PostgreSQL container communicate over Railway's internal network with higher latency (~1-5ms per query). With 2 queries per upsert and hundreds of origins/editions per page, this compounds massively. + +The `bulkUpsertByCoreId` path bypasses Strapi's document service entirely, using raw knex SQL with batch operations. This is consistently fast on both local and production because it minimizes round-trips. + +## Solution + +Applied across all sync phases (PRs #558, #560, #562): + +1. 
**Replace `upsertByCoreId` with `bulkUpsertByCoreId`** for all entity types: + - Video origins and editions (sync-videos.ts) + - Video editions and mux videos (sync-video-variants.ts) + - Bible books (sync-videos.ts) + +2. **Per-page upsert** instead of collect-all-then-upsert: + - Fetch page → bulk upsert → next page + - Immediate progress visibility + - Reduced memory usage + +3. **Prefetch next page** while upserting current one + +4. **Incremental doc maps** (targeted `whereIn` queries) instead of full table scans via `buildCoreIdMap` + +### Performance comparison (videos phase, 1,056 records) + +| Approach | Local | Production | +| ----------------------------------- | ----- | ------------- | +| Old (collect-all + upsertByCoreId) | ~45s | 20+ min | +| New (per-page + bulkUpsertByCoreId) | ~49s | TBD (PR #562) | + +### Per-page timing breakdown (local, 500 records/page) + +| Component | Time | Notes | +| --------------------------- | ------ | --------------------------------------------- | +| Core API fetch | 2-28s | Gateway response, varies with payload size | +| Origin/edition bulk upsert | ~64ms | Was 2+ seconds with upsertByCoreId | +| Video bulk upsert | ~76ms | Fast — temp table + UPDATE FROM | +| Sub-entity resolve + upsert | ~1-2s | Images, subtitles, study questions, citations | +| **Page total** | ~5-30s | Dominated by API fetch | + +## Gotchas + +### Strapi document service vs raw SQL on Railway + +Never use `upsertByCoreId` (Strapi document service) in a loop for bulk operations on Railway. Each call incurs 2+ network round-trips through the ORM. Use `bulkUpsertByCoreId` (raw knex SQL) instead, which batches operations and minimizes round-trips. + +Local dev masks this because the DB latency is near-zero. + +### The controller incremental bug + +The controller at `controllers/core-sync.ts` had `ctx.request.body?.incremental === true` which sent `false` (not `undefined`) to `runSync` when the body omitted the field. This overrode the `?? 
true` default in `runSync`, causing production to always run full syncs. Fixed by sending `undefined` when not specified in the body. + +### Core API gateway returns all records with updatedAt filter + +When testing incremental, we found the Core API returned all 1,056 videos even with `updatedAt: { gte: <watermark> }`. Investigation revealed the videos watermark was from March 27 (3 days old) because the video phase hadn't completed successfully since then. All videos had genuinely been updated since March 27. + +The incremental system works correctly — it just needs a successful full sync to set a recent watermark. Subsequent syncs with a recent watermark correctly return 0 records. + +## Prevention + +| Pattern | Rule | +| -------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------- | +| Strapi document service in loops | Never use `upsertByCoreId` in a loop on Railway. Always use `bulkUpsertByCoreId` for batch operations. | +| Local vs production parity | Test sync performance on Railway, not just local. Network latency between containers amplifies per-query overhead. | +| Controller defaults | When a controller passes options to a service, use `undefined` (not `false`) for unspecified optional booleans to let the service's default apply. | +| Progress visibility | Always show progress during long operations. The collect-all pattern gives zero visibility for the entire fetch duration. 
| + +## Key Files + +- `apps/cms/src/api/core-sync/services/sync-videos.ts` — video sync (PR #562) +- `apps/cms/src/api/core-sync/services/sync-video-variants.ts` — variant sync (PR #560) +- `apps/cms/src/api/core-sync/services/bulk-upsert.ts` — batch upsert engine (PR #558) +- `apps/cms/src/api/core-sync/controllers/core-sync.ts` — controller incremental fix (PR #560) + +## Related Documentation + +- [Core sync bulk UPDATE temp table pattern](./core-sync-bulk-update-temp-table-pattern.md) +- [Core sync per-page upsert pattern](./core-sync-per-page-upsert-pattern.md) +- [Core sync incremental delta sync](./core-sync-incremental-delta-sync.md)