From 493670e558a5dfa9d14665777e77b9d505c6c58d Mon Sep 17 00:00:00 2001 From: adibarra <93070681+adibarra@users.noreply.github.com> Date: Thu, 26 Mar 2026 20:29:19 -0500 Subject: [PATCH] fix: skip benchmark/stats ingest for evals-only runs --- packages/db/src/etl/changelog-ingest.ts | 12 +- packages/db/src/ingest-ci-run.ts | 297 +++++++++++++----------- packages/db/src/ingest-gcs-backup.ts | 20 +- 3 files changed, 190 insertions(+), 139 deletions(-) diff --git a/packages/db/src/etl/changelog-ingest.ts b/packages/db/src/etl/changelog-ingest.ts index 0efbc27..d7b68a9 100644 --- a/packages/db/src/etl/changelog-ingest.ts +++ b/packages/db/src/etl/changelog-ingest.ts @@ -11,6 +11,7 @@ export interface ChangelogEntry { configKeys: string[]; description: string; prLink: string | null; + evalsOnly: boolean; } /** @@ -37,11 +38,20 @@ export function parseChangelogEntries(raw: unknown): ChangelogEntry[] { (item['pr-link'] ? String(item['pr-link']) : null) ?? description.match(/\bPR:\s*(https?:\/\/\S+)/)?.[1] ?? null; - out.push({ configKeys, description, prLink }); + const evalsOnly = item['evals-only'] === true; + out.push({ configKeys, description, prLink, evalsOnly }); } return out; } +/** + * Returns true if any changelog entry in the given arrays is marked `evals-only: true`. + * When a run is evals-only, its benchmark/perf data should be skipped during ingest. + */ +export function hasEvalsOnlyFlag(changelogs: Array<{ entries: ChangelogEntry[] }>): boolean { + return changelogs.some((c) => c.entries.some((e) => e.evalsOnly)); +} + /** * Insert changelog entries for a workflow run into the `changelog_entries` table. * Uses `ON CONFLICT DO NOTHING` on `(workflow_run_id, base_ref, head_ref)`, so diff --git a/packages/db/src/ingest-ci-run.ts b/packages/db/src/ingest-ci-run.ts index 6da90a4..38b68cd 100644 --- a/packages/db/src/ingest-ci-run.ts +++ b/packages/db/src/ingest-ci-run.ts @@ -40,7 +40,11 @@ import { } from './etl/benchmark-ingest'; import { mapAggEvalRow } from './etl/eval-mapper'; import { ingestEvalRow } from './etl/eval-ingest'; -import { parseChangelogEntries, ingestChangelogEntries } from './etl/changelog-ingest'; +import { + parseChangelogEntries, + ingestChangelogEntries, + hasEvalsOnlyFlag, +} from './etl/changelog-ingest'; // ── Config ────────────────────────────────────────────────────────────────── @@ -251,143 +255,178 @@ async function main(): Promise { let totalEvals = 0; let totalChangelogs = 0; + // ── Check for evals-only flag in changelog ──────────────────────────── + const changelogDir = path.join(artifactsDir, ARTIFACT_NAMES.changelog); + const changelogFiles = findJsonFiles(changelogDir); + const parsedChangelogs: Array<{ + baseRef: string; + headRef: string; + entries: ReturnType; + }> = []; + for (const file of changelogFiles) { + const data = readJson(file) as Record | null; + if (!data || typeof data !== 'object') continue; + const baseRef = String(data.base_ref ?? ''); + const headRef = String(data.head_ref ?? ''); + if (!baseRef || !headRef) continue; + const entries = parseChangelogEntries(data.entries); + if (entries.length > 0) parsedChangelogs.push({ baseRef, headRef, entries }); + } + const evalsOnly = hasEvalsOnlyFlag(parsedChangelogs); + if (evalsOnly) { + console.log('\n ⚠ evals-only run detected — skipping benchmark and stats ingest'); + } + // ── Ingest benchmark results ────────────────────────────────────────── console.log('\n--- Benchmark Results ---'); - const bmkDir = path.join(artifactsDir, ARTIFACT_NAMES.benchmarks); - const bmkFiles = findJsonFiles(bmkDir); - - const allBmkDirs = fs.existsSync(artifactsDir) - ? fs - .readdirSync(artifactsDir) - .filter((d) => d.startsWith('bmk_') || d.startsWith('results_')) - .map((d) => path.join(artifactsDir, d)) - .filter((d) => fs.statSync(d).isDirectory()) - : []; - - const serverLogPaths = new Map(); - if (fs.existsSync(artifactsDir)) { - for (const d of fs.readdirSync(artifactsDir)) { - if (!d.startsWith('server_logs_')) continue; - const logPath = path.join(artifactsDir, d, 'server.log'); - if (!fs.existsSync(logPath)) continue; - const configKey = d.replace(/^server_logs_/, ''); - serverLogPaths.set(configKey, logPath); + if (evalsOnly) { + console.log(' Skipped (evals-only run)'); + } else { + const bmkDir = path.join(artifactsDir, ARTIFACT_NAMES.benchmarks); + const bmkFiles = findJsonFiles(bmkDir); + + const allBmkDirs = fs.existsSync(artifactsDir) + ? fs + .readdirSync(artifactsDir) + .filter((d) => d.startsWith('bmk_') || d.startsWith('results_')) + .map((d) => path.join(artifactsDir, d)) + .filter((d) => fs.statSync(d).isDirectory()) + : []; + + const serverLogPaths = new Map(); + if (fs.existsSync(artifactsDir)) { + for (const d of fs.readdirSync(artifactsDir)) { + if (!d.startsWith('server_logs_')) continue; + const logPath = path.join(artifactsDir, d, 'server.log'); + if (!fs.existsSync(logPath)) continue; + const configKey = d.replace(/^server_logs_/, ''); + serverLogPaths.set(configKey, logPath); + } + } + if (serverLogPaths.size > 0) { + console.log(` Found ${serverLogPaths.size} server log artifact(s)`); } - } - if (serverLogPaths.size > 0) { - console.log(` Found ${serverLogPaths.size} server log artifact(s)`); - } - const allBmkFiles = [...bmkFiles, ...allBmkDirs.flatMap((d) => findJsonFiles(d))]; - console.log(` Found ${allBmkFiles.length} benchmark JSON file(s)`); + const allBmkFiles = [...bmkFiles, ...allBmkDirs.flatMap((d) => findJsonFiles(d))]; + console.log(` Found ${allBmkFiles.length} benchmark JSON file(s)`); - for (const file of allBmkFiles) { - const data = readJson(file); - if (!data) continue; + for (const file of allBmkFiles) { + const data = readJson(file); + if (!data) continue; - const rawRows: Record[] = Array.isArray(data) - ? data - : [data as Record]; + const rawRows: Record[] = Array.isArray(data) + ? data + : [data as Record]; - const rows = rawRows - .filter((r) => typeof r === 'object' && r !== null) - .map((r) => mapBenchmarkRow(r, tracker)) - .filter((r): r is NonNullable => r !== null); + const rows = rawRows + .filter((r) => typeof r === 'object' && r !== null) + .map((r) => mapBenchmarkRow(r, tracker)) + .filter((r): r is NonNullable => r !== null); - if (rows.length === 0) continue; + if (rows.length === 0) continue; - const toInsert = []; - for (const row of rows) { - try { - const configId = await getOrCreateConfig(row.config); - toInsert.push({ ...row, configId }); - } catch (err: any) { - tracker.recordDbError(`config for ${path.basename(file)}`, err); + const toInsert = []; + for (const row of rows) { + try { + const configId = await getOrCreateConfig(row.config); + toInsert.push({ ...row, configId }); + } catch (err: any) { + tracker.recordDbError(`config for ${path.basename(file)}`, err); + } } - } - if (toInsert.length > 0) { - try { - const { newCount, dupCount, insertedIds } = await bulkIngestBenchmarkRows( - sql, - toInsert, - workflowRunId, - date, - ); - totalNewBmk += newCount; - totalDupBmk += dupCount; - - // Build availability only after successful insert - for (const r of toInsert) { - availRows.push({ - model: r.config.model, - isl: r.isl, - osl: r.osl, - precision: r.config.precision, - hardware: r.config.hardware, - framework: r.config.framework, - specMethod: r.config.specMethod, - disagg: r.config.disagg, - }); - } + if (toInsert.length > 0) { + try { + const { newCount, dupCount, insertedIds } = await bulkIngestBenchmarkRows( + sql, + toInsert, + workflowRunId, + date, + ); + totalNewBmk += newCount; + totalDupBmk += dupCount; + + // Build availability only after successful insert + for (const r of toInsert) { + availRows.push({ + model: r.config.model, + isl: r.isl, + osl: r.osl, + precision: r.config.precision, + hardware: r.config.hardware, + framework: r.config.framework, + specMethod: r.config.specMethod, + disagg: r.config.disagg, + }); + } - const parentDir = path.basename(path.dirname(file)); - if (parentDir.startsWith('bmk_') && insertedIds.length > 0) { - const configKey = parentDir.replace(/^bmk_/, ''); - const logPath = serverLogPaths.get(configKey); - if (logPath) { - try { - const serverLog = fs.readFileSync(logPath, 'utf-8').replaceAll('\x00', ''); - await insertServerLog(sql, insertedIds, serverLog); - } catch (err: any) { - tracker.recordDbError(`server_log for ${configKey}`, err); + const parentDir = path.basename(path.dirname(file)); + if (parentDir.startsWith('bmk_') && insertedIds.length > 0) { + const configKey = parentDir.replace(/^bmk_/, ''); + const logPath = serverLogPaths.get(configKey); + if (logPath) { + try { + const serverLog = fs.readFileSync(logPath, 'utf-8').replaceAll('\x00', ''); + await insertServerLog(sql, insertedIds, serverLog); + } catch (err: any) { + tracker.recordDbError(`server_log for ${configKey}`, err); + } } } + } catch (err: any) { + tracker.recordDbError(path.basename(file), err); } - } catch (err: any) { - tracker.recordDbError(path.basename(file), err); } } - } - console.log(` Benchmarks: +${totalNewBmk} new, ${totalDupBmk} dup`); + console.log(` Benchmarks: +${totalNewBmk} new, ${totalDupBmk} dup`); - if (availRows.length > 0) { - try { - await bulkUpsertAvailability(sql, availRows, date); - console.log(` Availability: ${availRows.length} row(s) upserted`); - } catch (err: any) { - tracker.recordDbError('availability', err); + if (availRows.length > 0) { + try { + await bulkUpsertAvailability(sql, availRows, date); + console.log(` Availability: ${availRows.length} row(s) upserted`); + } catch (err: any) { + tracker.recordDbError('availability', err); + } } } // ── Ingest run stats ────────────────────────────────────────────────── console.log('\n--- Run Stats ---'); - const statsDir = path.join(artifactsDir, ARTIFACT_NAMES.runStats); - const statsFiles = findJsonFiles(statsDir); - - const statsRows: Array<{ hardware: string; nSuccess: number; total: number }> = []; - for (const file of statsFiles) { - const data = readJson(file) as Record | null; - if (!data || typeof data !== 'object' || Array.isArray(data)) continue; - for (const [hwKey, stats] of Object.entries(data)) { - if (!GPU_KEYS.has(hwKey)) continue; - if (typeof stats?.n_success !== 'number' || typeof stats?.total !== 'number') continue; - statsRows.push({ hardware: hwKey, nSuccess: stats.n_success, total: stats.total }); + if (evalsOnly) { + console.log(' Skipped (evals-only run)'); + } else { + const statsDir = path.join(artifactsDir, ARTIFACT_NAMES.runStats); + const statsFiles = findJsonFiles(statsDir); + + const statsRows: Array<{ hardware: string; nSuccess: number; total: number }> = []; + for (const file of statsFiles) { + const data = readJson(file) as Record | null; + if (!data || typeof data !== 'object' || Array.isArray(data)) continue; + for (const [hwKey, stats] of Object.entries(data)) { + if (!GPU_KEYS.has(hwKey)) continue; + if (typeof stats?.n_success !== 'number' || typeof stats?.total !== 'number') continue; + statsRows.push({ hardware: hwKey, nSuccess: stats.n_success, total: stats.total }); + } } - } - if (statsRows.length > 0) { - try { - const { newCount, dupCount } = await bulkIngestRunStats(sql, statsRows, workflowRunId, date); - totalNewStats = newCount; - totalDupStats = dupCount; - } catch (err: any) { - tracker.recordDbError('run_stats', err); + if (statsRows.length > 0) { + try { + const { newCount, dupCount } = await bulkIngestRunStats( + sql, + statsRows, + workflowRunId, + date, + ); + totalNewStats = newCount; + totalDupStats = dupCount; + } catch (err: any) { + tracker.recordDbError('run_stats', err); + } } + console.log(` Run stats: +${totalNewStats} new, ${totalDupStats} dup`); } - console.log(` Run stats: +${totalNewStats} new, ${totalDupStats} dup`); // ── Ingest eval results ─────────────────────────────────────────────── @@ -414,34 +453,22 @@ async function main(): Promise { } console.log(` Eval results: +${totalEvals} new`); - // ── Ingest changelog ────────────────────────────────────────────────── + // ── Ingest changelog (already parsed above for evals-only check) ───── console.log('\n--- Changelog ---'); - const changelogDir = path.join(artifactsDir, ARTIFACT_NAMES.changelog); - const changelogFiles = findJsonFiles(changelogDir); - - for (const file of changelogFiles) { - const data = readJson(file) as Record | null; - if (!data || typeof data !== 'object') continue; - const baseRef = String(data.base_ref ?? ''); - const headRef = String(data.head_ref ?? ''); - if (!baseRef || !headRef) continue; - - const entries = parseChangelogEntries(data.entries); - if (entries.length > 0) { - try { - const inserted = await ingestChangelogEntries( - sql, - workflowRunId, - date, - baseRef, - headRef, - entries, - ); - totalChangelogs += inserted; - } catch (err: any) { - tracker.recordDbError('changelog', err); - } + for (const { baseRef, headRef, entries } of parsedChangelogs) { + try { + const inserted = await ingestChangelogEntries( + sql, + workflowRunId, + date, + baseRef, + headRef, + entries, + ); + totalChangelogs += inserted; + } catch (err: any) { + tracker.recordDbError('changelog', err); } } console.log(` Changelog: +${totalChangelogs} new`); diff --git a/packages/db/src/ingest-gcs-backup.ts b/packages/db/src/ingest-gcs-backup.ts index 8fe175e..1633caa 100644 --- a/packages/db/src/ingest-gcs-backup.ts +++ b/packages/db/src/ingest-gcs-backup.ts @@ -46,7 +46,11 @@ import { } from './etl/benchmark-ingest'; import { mapEvalRow, mapAggEvalRow, type EvalParams } from './etl/eval-mapper'; import { ingestEvalRow } from './etl/eval-ingest'; -import { parseChangelogEntries, ingestChangelogEntries } from './etl/changelog-ingest'; +import { + parseChangelogEntries, + ingestChangelogEntries, + hasEvalsOnlyFlag, +} from './etl/changelog-ingest'; import { readZipJson, readZipJsonMap, readZipText } from './etl/zip-reader'; const GCS_DIR = path.join(import.meta.dirname, '..', '..', '..', 'gcs'); @@ -82,6 +86,8 @@ interface WorkflowMapResult { statsRows: Array<{ hardware: string; nSuccess: number; total: number }>; evalRows: EvalParams[]; changelogs: Array<{ baseRef: string; headRef: string; entries: ChangelogEntry[] }>; + /** True when the changelog declares evals-only — benchmark/stats data is dropped. */ + evalsOnly: boolean; /** Skip counts from mapping phase (dbError is tracked separately in phase 2). */ localSkips: Omit; localUnmappedModels: Set; @@ -372,6 +378,13 @@ async function mapWorkflowDir( if (entries.length > 0) changelogs.push({ baseRef, headRef, entries }); } + const evalsOnly = hasEvalsOnlyFlag(changelogs); + if (evalsOnly) { + console.log( + ` [${dateDir}] evals-only run ${githubRunId} — skipping ${bmkZips.length} benchmark ZIP(s) and ${statsRows.length} stats row(s)`, + ); + } + return { dateDir, workflowDir, @@ -381,10 +394,11 @@ async function mapWorkflowDir( headSha, createdAt, ghInfo, - bmkZips, - statsRows, + bmkZips: evalsOnly ? [] : bmkZips, + statsRows: evalsOnly ? [] : statsRows, evalRows, changelogs, + evalsOnly, localSkips: { badZip: local.skips.badZip, unmappedModel: local.skips.unmappedModel,