From acc6bb96c59bb6769da5b2d4cf53bcf9f38ab552 Mon Sep 17 00:00:00 2001 From: mkuchenbecker Date: Tue, 26 May 2026 21:39:41 -0700 Subject: [PATCH] refactor(optimizer-analyzer): split AnalyzerApplication into apps/optimizer/analyzerapp MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Library/deployable split: - services/optimizer/analyzer/ — library (analysis logic, no main class) - apps/optimizer/analyzerapp/ — deployable Spring Boot wrapper (only AnalyzerApplication + application.properties) Why "analyzerapp" instead of "analyzer" as the leaf name: two Gradle leaf projects both named "analyzer" (one at :services:optimizer:analyzer, one at :apps:optimizer:analyzer) produce a self-referential compileJava cycle. Disambiguating the leaf name avoids it. Build changes: - services/optimizer/analyzer/build.gradle: - 'api project(:services:optimizer)' so OperationTypeDto / Repository types in the analyzer's public API are visible to consumers. - bootJar disabled — no @SpringBootApplication here anymore. - jar.archiveClassifier = '' so the lib produces a plain analyzer.jar. - apps/optimizer/analyzerapp/build.gradle (new): - springboot-ext-conventions + boot 2.7.8 plugins. - implementation project(:services:optimizer:analyzer) + minimal Spring Boot deps + MySQL driver. - settings.gradle: include ':apps:optimizer:analyzerapp'. Tests pass: ./gradlew :services:optimizer:analyzer:test (all 20 green). bootJar produced: build/analyzerapp/libs/analyzerapp.jar. Co-Authored-By: Claude Opus 4.7 --- apps/optimizer/analyzerapp/build.gradle | 14 ++ .../analyzer/AnalyzerApplication.java | 0 .../src/main/resources/application.properties | 1 - services/optimizer/analyzer/build.gradle | 16 +- .../optimizer/analyzer/AnalyzerRunner.java | 185 ++++++++---------- .../analyzer/AnalyzerRunnerTest.java | 50 ----- settings.gradle | 1 + 7 files changed, 110 insertions(+), 157 deletions(-) create mode 100644 apps/optimizer/analyzerapp/build.gradle rename {services/optimizer/analyzer => apps/optimizer/analyzerapp}/src/main/java/com/linkedin/openhouse/optimizer/analyzer/AnalyzerApplication.java (100%) rename {services/optimizer/analyzer => apps/optimizer/analyzerapp}/src/main/resources/application.properties (92%) diff --git a/apps/optimizer/analyzerapp/build.gradle b/apps/optimizer/analyzerapp/build.gradle new file mode 100644 index 000000000..15947754c --- /dev/null +++ b/apps/optimizer/analyzerapp/build.gradle @@ -0,0 +1,14 @@ +plugins { + id 'openhouse.springboot-ext-conventions' + id 'org.springframework.boot' version '2.7.8' +} + +// Deployable Spring Boot wrapper around the analyzer library. Holds AnalyzerApplication (the +// @SpringBootApplication entry point) and application.properties; the analysis logic lives in +// :services:optimizer:analyzer. +dependencies { + implementation project(':services:optimizer:analyzer') + implementation 'org.springframework.boot:spring-boot-starter:2.7.8' + implementation 'org.springframework.boot:spring-boot-starter-data-jpa:2.7.8' + runtimeOnly 'mysql:mysql-connector-java:8.0.33' +} diff --git a/services/optimizer/analyzer/src/main/java/com/linkedin/openhouse/optimizer/analyzer/AnalyzerApplication.java b/apps/optimizer/analyzerapp/src/main/java/com/linkedin/openhouse/optimizer/analyzer/AnalyzerApplication.java similarity index 100% rename from services/optimizer/analyzer/src/main/java/com/linkedin/openhouse/optimizer/analyzer/AnalyzerApplication.java rename to apps/optimizer/analyzerapp/src/main/java/com/linkedin/openhouse/optimizer/analyzer/AnalyzerApplication.java diff --git a/services/optimizer/analyzer/src/main/resources/application.properties b/apps/optimizer/analyzerapp/src/main/resources/application.properties similarity index 92% rename from services/optimizer/analyzer/src/main/resources/application.properties rename to apps/optimizer/analyzerapp/src/main/resources/application.properties index ff1601f80..d0e70622a 100644 --- a/services/optimizer/analyzer/src/main/resources/application.properties +++ b/apps/optimizer/analyzerapp/src/main/resources/application.properties @@ -6,4 +6,3 @@ spring.datasource.password=${OPTIMIZER_DB_PASSWORD:} spring.jpa.hibernate.ddl-auto=none ofd.success-retry-hours=16 ofd.failure-retry-hours=1 -analyzer.tables-page-size=10000 diff --git a/services/optimizer/analyzer/build.gradle b/services/optimizer/analyzer/build.gradle index f66ecc608..c49951de3 100644 --- a/services/optimizer/analyzer/build.gradle +++ b/services/optimizer/analyzer/build.gradle @@ -3,8 +3,22 @@ plugins { id 'org.springframework.boot' version '2.7.8' } +// Library jar — the @SpringBootApplication entry point lives in :apps:optimizer:analyzerapp. +// Disable bootJar so we don't try to assemble a runnable jar from a library that has no main +// class; keep jar enabled so consumers (the apps wrapper) get a normal library artifact. +bootJar { + enabled = false +} + +jar { + enabled = true + archiveClassifier = '' +} + dependencies { - implementation project(':services:optimizer') + // api: the analyzer's public types (e.g. OperationAnalyzer's signature, OperationTypeDto) come + // from :services:optimizer, so consumers of this library see them on their compile classpath. + api project(':services:optimizer') implementation 'org.springframework.boot:spring-boot-starter:2.7.8' implementation 'org.springframework.boot:spring-boot-starter-webflux:2.7.8' implementation 'org.springframework.boot:spring-boot-starter-data-jpa:2.7.8' diff --git a/services/optimizer/analyzer/src/main/java/com/linkedin/openhouse/optimizer/analyzer/AnalyzerRunner.java b/services/optimizer/analyzer/src/main/java/com/linkedin/openhouse/optimizer/analyzer/AnalyzerRunner.java index 2c67fa386..f6901bd0b 100644 --- a/services/optimizer/analyzer/src/main/java/com/linkedin/openhouse/optimizer/analyzer/AnalyzerRunner.java +++ b/services/optimizer/analyzer/src/main/java/com/linkedin/openhouse/optimizer/analyzer/AnalyzerRunner.java @@ -13,8 +13,7 @@ import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.data.domain.PageRequest; +import org.springframework.data.domain.Pageable; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; @@ -40,14 +39,6 @@ public class AnalyzerRunner { private final TableOperationsRepository operationsRepo; private final TableOperationsHistoryRepository historyRepo; - /** - * Page size for the per-database table scan. The analyzer iterates a database's tables in pages - * of this size and processes every page until the database is exhausted — no tables are - * dropped because of this bound. Caps the in-memory working set per page, not per cycle. - */ - @Value("${analyzer.tables-page-size:10000}") - private int tablesPageSize = 10_000; - /** * Run the analysis loop for {@code operationType} across all databases, with no filters. * Equivalent to {@link #analyze(OperationTypeDto, Optional, Optional, Optional)} with all-empty @@ -88,107 +79,91 @@ void analyzeDatabase( Optional tableName, Optional tableUuid) { + // Load the three join inputs unbounded for this database. Aligned page-by-page pagination on + // these maps would leave keys in one map's page mismatched with the others' — a table whose + // op/history happens to fall in a different page would be misread as "no current op / no + // history" and trigger duplicate scheduling. Correctness requires the maps to be complete + // relative to the tables being processed; the working set is bounded by tables-in-db, not by + // any per-cycle cap. Memory characterization is tracked in BDP-102738. + Map currentOps = + operationsRepo + .find( + Optional.of(analyzer.getOperationType().toDb()), + Optional.empty(), + tableUuid, + Optional.of(databaseName), + tableName, + Optional.empty(), + Optional.empty(), + Pageable.unpaged()) + .stream() + .filter(e -> e.getTableUuid() != null) + .map(TableOperationDto::fromRow) + .collect( + Collectors.toMap( + TableOperationDto::getTableUuid, op -> op, TableOperationDto::mostRecent)); + + Map latestHistory = + historyRepo.findLatest(analyzer.getOperationType().toDb(), Pageable.unpaged()).stream() + .filter(r -> r.getTableUuid() != null) + .map(TableOperationsHistoryDto::fromRow) + .collect( + Collectors.toMap( + TableOperationsHistoryDto::getTableUuid, + h -> h, + TableOperationsHistoryDto::after)); + + List tables = + statsRepo.find(Optional.of(databaseName), tableName, tableUuid, Pageable.unpaged()).stream() + .filter(row -> row.getTableUuid() != null) + .map(TableDto::fromRow) + .collect(Collectors.toList()); + + /* + * For each table in this database, decide whether to create a new PENDING operation. + * + * 1. Skip tables not opted in to this operation type. + * 2. Look up the table's current active operation (if any) and its most recent completed + * history entry from the maps loaded above. + * 3. Delegate the schedule-or-not decision to the analyzer's shouldSchedule — strategy + * encapsulates cadence, retry policy, and any future per-operation signals. + * 4. On true, persist a new PENDING operation. The scheduler picks it up on its next pass. + */ int created = 0; int failed = 0; - int pageNumber = 0; - - while (true) { - PageRequest page = PageRequest.of(pageNumber, tablesPageSize); - - // Per-page pre-load: ops and history are re-queried for each table page so the working set - // stays bounded. Maps are keyed by tableUuid and may miss entries for tables whose - // operations / history happen to fall in a different page of those queries — affected - // tables get treated as "no current op / no history" and may produce a duplicate PENDING - // row that the scheduler's cancelDuplicates path handles. The alternative (load all maps - // unbounded once per DB) trades that minor duplication for unbounded memory per cycle. - Map currentOps = - operationsRepo - .find( - Optional.of(analyzer.getOperationType().toDb()), - Optional.empty(), - tableUuid, - Optional.of(databaseName), - tableName, - Optional.empty(), - Optional.empty(), - page) - .stream() - .filter(e -> e.getTableUuid() != null) - .map(TableOperationDto::fromRow) - .collect( - Collectors.toMap( - TableOperationDto::getTableUuid, op -> op, TableOperationDto::mostRecent)); - - Map latestHistory = - historyRepo.findLatest(analyzer.getOperationType().toDb(), page).stream() - .filter(r -> r.getTableUuid() != null) - .map(TableOperationsHistoryDto::fromRow) - .collect( - Collectors.toMap( - TableOperationsHistoryDto::getTableUuid, - h -> h, - TableOperationsHistoryDto::after)); - - List tables = - statsRepo.find(Optional.of(databaseName), tableName, tableUuid, page).stream() - .filter(row -> row.getTableUuid() != null) - .map(TableDto::fromRow) - .collect(Collectors.toList()); - - if (tables.isEmpty()) { - break; + for (TableDto table : tables) { + if (!analyzer.isEnabled(table)) { + continue; } - - /* - * For each table in this page, decide whether to create a new PENDING operation. - * - * 1. Skip tables not opted in to this operation type. - * 2. Look up the table's current active operation (if any) and its most recent completed - * history entry from the maps loaded above. - * 3. Delegate the schedule-or-not decision to the analyzer's shouldSchedule — strategy - * encapsulates cadence, retry policy, and any future per-operation signals. - * 4. On true, persist a new PENDING operation. The scheduler picks it up on its next pass. - */ - for (TableDto table : tables) { - if (!analyzer.isEnabled(table)) { - continue; - } - Optional currentOp = - Optional.ofNullable(currentOps.get(table.getTableUuid())); - Optional entry = - Optional.ofNullable(latestHistory.get(table.getTableUuid())); - if (!analyzer.shouldSchedule(table, currentOp, entry)) { - continue; - } - try { - TableOperationDto op = TableOperationDto.pending(table, analyzer.getOperationType()); - operationsRepo.save(op.toRow()); - log.debug( - "Created PENDING {} operation for table {}.{}", - analyzer.getOperationType(), - table.getDatabaseName(), - table.getTableId()); - created++; - } catch (RuntimeException e) { - // One bad table should not abort the rest of the database. Log and continue; the next - // analyzer pass will retry for any table whose save failed here. - log.error( - "Failed to create PENDING {} operation for table {}.{}: {}", - analyzer.getOperationType(), - table.getDatabaseName(), - table.getTableId(), - e.toString(), - e); - failed++; - } + Optional currentOp = + Optional.ofNullable(currentOps.get(table.getTableUuid())); + Optional entry = + Optional.ofNullable(latestHistory.get(table.getTableUuid())); + if (!analyzer.shouldSchedule(table, currentOp, entry)) { + continue; } - - if (tables.size() < tablesPageSize) { - break; + try { + TableOperationDto op = TableOperationDto.pending(table, analyzer.getOperationType()); + operationsRepo.save(op.toRow()); + log.debug( + "Created PENDING {} operation for table {}.{}", + analyzer.getOperationType(), + table.getDatabaseName(), + table.getTableId()); + created++; + } catch (RuntimeException e) { + // One bad table should not abort the rest of the database. Log and continue; the next + // analyzer pass will retry for any table whose save failed here. + log.error( + "Failed to create PENDING {} operation for table {}.{}: {}", + analyzer.getOperationType(), + table.getDatabaseName(), + table.getTableId(), + e.toString(), + e); + failed++; } - pageNumber++; } - log.info( "Finished analyzing Database {}: created {} PENDING {} operation(s) ({} failed)", databaseName, diff --git a/services/optimizer/analyzer/src/test/java/com/linkedin/openhouse/optimizer/analyzer/AnalyzerRunnerTest.java b/services/optimizer/analyzer/src/test/java/com/linkedin/openhouse/optimizer/analyzer/AnalyzerRunnerTest.java index 6bc374609..4731b5fb7 100644 --- a/services/optimizer/analyzer/src/test/java/com/linkedin/openhouse/optimizer/analyzer/AnalyzerRunnerTest.java +++ b/services/optimizer/analyzer/src/test/java/com/linkedin/openhouse/optimizer/analyzer/AnalyzerRunnerTest.java @@ -2,10 +2,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -195,54 +193,6 @@ void analyze_skipsTable_whenShouldScheduleReturnsFalse() { verify(operationsRepo, never()).save(any()); } - @Test - void analyze_iteratesAllPages_processesEveryTableAcrossPageBoundary() throws Exception { - // Force a tiny page size so 3 tables span 2 pages: page 0 returns [t1, t2] (full → loop - // continues), page 1 returns [t3] (partial → loop terminates). - java.lang.reflect.Field f = AnalyzerRunner.class.getDeclaredField("tablesPageSize"); - f.setAccessible(true); - f.setInt(runner, 2); - - TableStatsRow t1 = - TableStatsRow.builder().tableUuid("uuid-1").databaseName(DB).tableName("tbl1").build(); - TableStatsRow t2 = - TableStatsRow.builder().tableUuid("uuid-2").databaseName(DB).tableName("tbl2").build(); - TableStatsRow t3 = - TableStatsRow.builder().tableUuid("uuid-3").databaseName(DB).tableName("tbl3").build(); - - when(statsRepo.find( - eq(Optional.of(DB)), - eq(Optional.empty()), - eq(Optional.empty()), - argThat(p -> p != null && p.getPageNumber() == 0))) - .thenReturn(List.of(t1, t2)); - when(statsRepo.find( - eq(Optional.of(DB)), - eq(Optional.empty()), - eq(Optional.empty()), - argThat(p -> p != null && p.getPageNumber() == 1))) - .thenReturn(List.of(t3)); - when(operationsRepo.find( - eq(Optional.of(OFD_DB)), - eq(Optional.empty()), - eq(Optional.empty()), - eq(Optional.of(DB)), - eq(Optional.empty()), - eq(Optional.empty()), - eq(Optional.empty()), - any())) - .thenReturn(Collections.emptyList()); - when(historyRepo.findLatest(eq(OFD_DB), any())).thenReturn(Collections.emptyList()); - when(analyzer.isEnabled(any())).thenReturn(true); - when(analyzer.shouldSchedule(any(), eq(Optional.empty()), eq(Optional.empty()))) - .thenReturn(true); - - runner.analyze(OFD_TYPE); - - // All 3 tables across both pages get a PENDING row saved. - verify(operationsRepo, times(3)).save(any()); - } - @Test void analyze_skipsTable_whenTableUuidIsNull() { TableStatsRow statsEntity = TableStatsRow.builder().databaseName(DB).build(); diff --git a/settings.gradle b/settings.gradle index fb82211f4..810ecd643 100644 --- a/settings.gradle +++ b/settings.gradle @@ -51,6 +51,7 @@ include ':services:housetables' include ':services:jobs' include ':services:optimizer' include ':services:optimizer:analyzer' +include ':apps:optimizer:analyzerapp' include ':services:tables' include ':tables-test-fixtures:tables-test-fixtures-iceberg-1.2' include ':tables-test-fixtures:tables-test-fixtures-iceberg-1.5'