diff --git a/apps/optimizer/analyzerapp/build.gradle b/apps/optimizer/analyzerapp/build.gradle
new file mode 100644
index 000000000..15947754c
--- /dev/null
+++ b/apps/optimizer/analyzerapp/build.gradle
@@ -0,0 +1,14 @@
+plugins {
+ id 'openhouse.springboot-ext-conventions'
+ id 'org.springframework.boot' version '2.7.8'
+}
+
+// Deployable Spring Boot wrapper around the analyzer library. Holds AnalyzerApplication (the
+// @SpringBootApplication entry point) and application.properties; the analysis logic lives in
+// :services:optimizer:analyzer.
+dependencies {
+ implementation project(':services:optimizer:analyzer')
+ implementation 'org.springframework.boot:spring-boot-starter:2.7.8'
+ implementation 'org.springframework.boot:spring-boot-starter-data-jpa:2.7.8'
+ runtimeOnly 'mysql:mysql-connector-java:8.0.33'
+}
diff --git a/services/optimizer/analyzer/src/main/java/com/linkedin/openhouse/optimizer/analyzer/AnalyzerApplication.java b/apps/optimizer/analyzerapp/src/main/java/com/linkedin/openhouse/optimizer/analyzer/AnalyzerApplication.java
similarity index 100%
rename from services/optimizer/analyzer/src/main/java/com/linkedin/openhouse/optimizer/analyzer/AnalyzerApplication.java
rename to apps/optimizer/analyzerapp/src/main/java/com/linkedin/openhouse/optimizer/analyzer/AnalyzerApplication.java
diff --git a/services/optimizer/analyzer/src/main/resources/application.properties b/apps/optimizer/analyzerapp/src/main/resources/application.properties
similarity index 92%
rename from services/optimizer/analyzer/src/main/resources/application.properties
rename to apps/optimizer/analyzerapp/src/main/resources/application.properties
index ff1601f80..d0e70622a 100644
--- a/services/optimizer/analyzer/src/main/resources/application.properties
+++ b/apps/optimizer/analyzerapp/src/main/resources/application.properties
@@ -6,4 +6,3 @@ spring.datasource.password=${OPTIMIZER_DB_PASSWORD:}
spring.jpa.hibernate.ddl-auto=none
ofd.success-retry-hours=16
ofd.failure-retry-hours=1
-analyzer.tables-page-size=10000
diff --git a/services/optimizer/analyzer/build.gradle b/services/optimizer/analyzer/build.gradle
index f66ecc608..c49951de3 100644
--- a/services/optimizer/analyzer/build.gradle
+++ b/services/optimizer/analyzer/build.gradle
@@ -3,8 +3,22 @@ plugins {
id 'org.springframework.boot' version '2.7.8'
}
+// Library jar — the @SpringBootApplication entry point lives in :apps:optimizer:analyzerapp.
+// Disable bootJar so we don't try to assemble a runnable jar from a library that has no main
+// class; keep jar enabled so consumers (the apps wrapper) get a normal library artifact.
+bootJar {
+ enabled = false
+}
+
+jar {
+ enabled = true
+ archiveClassifier = ''
+}
+
dependencies {
- implementation project(':services:optimizer')
+ // api: the analyzer's public types (e.g. OperationAnalyzer's signature, OperationTypeDto) come
+ // from :services:optimizer, so consumers of this library see them on their compile classpath.
+ api project(':services:optimizer')
implementation 'org.springframework.boot:spring-boot-starter:2.7.8'
implementation 'org.springframework.boot:spring-boot-starter-webflux:2.7.8'
implementation 'org.springframework.boot:spring-boot-starter-data-jpa:2.7.8'
diff --git a/services/optimizer/analyzer/src/main/java/com/linkedin/openhouse/optimizer/analyzer/AnalyzerRunner.java b/services/optimizer/analyzer/src/main/java/com/linkedin/openhouse/optimizer/analyzer/AnalyzerRunner.java
index 2c67fa386..f6901bd0b 100644
--- a/services/optimizer/analyzer/src/main/java/com/linkedin/openhouse/optimizer/analyzer/AnalyzerRunner.java
+++ b/services/optimizer/analyzer/src/main/java/com/linkedin/openhouse/optimizer/analyzer/AnalyzerRunner.java
@@ -13,8 +13,7 @@
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.data.domain.PageRequest;
+import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@@ -40,14 +39,6 @@ public class AnalyzerRunner {
private final TableOperationsRepository operationsRepo;
private final TableOperationsHistoryRepository historyRepo;
- /**
- * Page size for the per-database table scan. The analyzer iterates a database's tables in pages
- * of this size and processes every page until the database is exhausted — no tables are
- * dropped because of this bound. Caps the in-memory working set per page, not per cycle.
- */
- @Value("${analyzer.tables-page-size:10000}")
- private int tablesPageSize = 10_000;
-
/**
* Run the analysis loop for {@code operationType} across all databases, with no filters.
* Equivalent to {@link #analyze(OperationTypeDto, Optional, Optional, Optional)} with all-empty
@@ -88,107 +79,91 @@ void analyzeDatabase(
Optional tableName,
Optional tableUuid) {
+ // Load the three join inputs unpaged. Paging them in lockstep would mismatch keys across
+ // the maps: a table whose op/history row falls in a different page would be misread as
+ // "no current op / no history" and trigger duplicate scheduling, so each map must be
+ // complete for the tables being processed. Ops and tables are filtered by databaseName.
+ // NOTE(review): findLatest is passed only the operation type, so the history map may span
+ // all databases, not just tables-in-db — confirm. Memory characterization: BDP-102738.
+ Map currentOps =
+ operationsRepo
+ .find(
+ Optional.of(analyzer.getOperationType().toDb()),
+ Optional.empty(),
+ tableUuid,
+ Optional.of(databaseName),
+ tableName,
+ Optional.empty(),
+ Optional.empty(),
+ Pageable.unpaged())
+ .stream()
+ .filter(e -> e.getTableUuid() != null)
+ .map(TableOperationDto::fromRow)
+ .collect(
+ Collectors.toMap(
+ TableOperationDto::getTableUuid, op -> op, TableOperationDto::mostRecent));
+
+ Map latestHistory =
+ historyRepo.findLatest(analyzer.getOperationType().toDb(), Pageable.unpaged()).stream()
+ .filter(r -> r.getTableUuid() != null)
+ .map(TableOperationsHistoryDto::fromRow)
+ .collect(
+ Collectors.toMap(
+ TableOperationsHistoryDto::getTableUuid,
+ h -> h,
+ TableOperationsHistoryDto::after));
+
+ List tables =
+ statsRepo.find(Optional.of(databaseName), tableName, tableUuid, Pageable.unpaged()).stream()
+ .filter(row -> row.getTableUuid() != null)
+ .map(TableDto::fromRow)
+ .collect(Collectors.toList());
+
+ /*
+ * For each table in this database, decide whether to create a new PENDING operation.
+ *
+ * 1. Skip tables not opted in to this operation type.
+ * 2. Look up the table's current active operation (if any) and its most recent completed
+ * history entry from the maps loaded above.
+ * 3. Delegate the schedule-or-not decision to the analyzer's shouldSchedule — strategy
+ * encapsulates cadence, retry policy, and any future per-operation signals.
+ * 4. On true, persist a new PENDING operation. The scheduler picks it up on its next pass.
+ */
int created = 0;
int failed = 0;
- int pageNumber = 0;
-
- while (true) {
- PageRequest page = PageRequest.of(pageNumber, tablesPageSize);
-
- // Per-page pre-load: ops and history are re-queried for each table page so the working set
- // stays bounded. Maps are keyed by tableUuid and may miss entries for tables whose
- // operations / history happen to fall in a different page of those queries — affected
- // tables get treated as "no current op / no history" and may produce a duplicate PENDING
- // row that the scheduler's cancelDuplicates path handles. The alternative (load all maps
- // unbounded once per DB) trades that minor duplication for unbounded memory per cycle.
- Map currentOps =
- operationsRepo
- .find(
- Optional.of(analyzer.getOperationType().toDb()),
- Optional.empty(),
- tableUuid,
- Optional.of(databaseName),
- tableName,
- Optional.empty(),
- Optional.empty(),
- page)
- .stream()
- .filter(e -> e.getTableUuid() != null)
- .map(TableOperationDto::fromRow)
- .collect(
- Collectors.toMap(
- TableOperationDto::getTableUuid, op -> op, TableOperationDto::mostRecent));
-
- Map latestHistory =
- historyRepo.findLatest(analyzer.getOperationType().toDb(), page).stream()
- .filter(r -> r.getTableUuid() != null)
- .map(TableOperationsHistoryDto::fromRow)
- .collect(
- Collectors.toMap(
- TableOperationsHistoryDto::getTableUuid,
- h -> h,
- TableOperationsHistoryDto::after));
-
- List tables =
- statsRepo.find(Optional.of(databaseName), tableName, tableUuid, page).stream()
- .filter(row -> row.getTableUuid() != null)
- .map(TableDto::fromRow)
- .collect(Collectors.toList());
-
- if (tables.isEmpty()) {
- break;
+ for (TableDto table : tables) {
+ if (!analyzer.isEnabled(table)) {
+ continue;
}
-
- /*
- * For each table in this page, decide whether to create a new PENDING operation.
- *
- * 1. Skip tables not opted in to this operation type.
- * 2. Look up the table's current active operation (if any) and its most recent completed
- * history entry from the maps loaded above.
- * 3. Delegate the schedule-or-not decision to the analyzer's shouldSchedule — strategy
- * encapsulates cadence, retry policy, and any future per-operation signals.
- * 4. On true, persist a new PENDING operation. The scheduler picks it up on its next pass.
- */
- for (TableDto table : tables) {
- if (!analyzer.isEnabled(table)) {
- continue;
- }
- Optional currentOp =
- Optional.ofNullable(currentOps.get(table.getTableUuid()));
- Optional entry =
- Optional.ofNullable(latestHistory.get(table.getTableUuid()));
- if (!analyzer.shouldSchedule(table, currentOp, entry)) {
- continue;
- }
- try {
- TableOperationDto op = TableOperationDto.pending(table, analyzer.getOperationType());
- operationsRepo.save(op.toRow());
- log.debug(
- "Created PENDING {} operation for table {}.{}",
- analyzer.getOperationType(),
- table.getDatabaseName(),
- table.getTableId());
- created++;
- } catch (RuntimeException e) {
- // One bad table should not abort the rest of the database. Log and continue; the next
- // analyzer pass will retry for any table whose save failed here.
- log.error(
- "Failed to create PENDING {} operation for table {}.{}: {}",
- analyzer.getOperationType(),
- table.getDatabaseName(),
- table.getTableId(),
- e.toString(),
- e);
- failed++;
- }
+ Optional currentOp =
+ Optional.ofNullable(currentOps.get(table.getTableUuid()));
+ Optional entry =
+ Optional.ofNullable(latestHistory.get(table.getTableUuid()));
+ if (!analyzer.shouldSchedule(table, currentOp, entry)) {
+ continue;
}
-
- if (tables.size() < tablesPageSize) {
- break;
+ try {
+ TableOperationDto op = TableOperationDto.pending(table, analyzer.getOperationType());
+ operationsRepo.save(op.toRow());
+ log.debug(
+ "Created PENDING {} operation for table {}.{}",
+ analyzer.getOperationType(),
+ table.getDatabaseName(),
+ table.getTableId());
+ created++;
+ } catch (RuntimeException e) {
+ // One bad table should not abort the rest of the database. Log and continue; the next
+ // analyzer pass will retry for any table whose save failed here.
+ log.error(
+ "Failed to create PENDING {} operation for table {}.{}: {}",
+ analyzer.getOperationType(),
+ table.getDatabaseName(),
+ table.getTableId(),
+ e.toString(),
+ e);
+ failed++;
}
- pageNumber++;
}
-
log.info(
"Finished analyzing Database {}: created {} PENDING {} operation(s) ({} failed)",
databaseName,
diff --git a/services/optimizer/analyzer/src/test/java/com/linkedin/openhouse/optimizer/analyzer/AnalyzerRunnerTest.java b/services/optimizer/analyzer/src/test/java/com/linkedin/openhouse/optimizer/analyzer/AnalyzerRunnerTest.java
index 6bc374609..4731b5fb7 100644
--- a/services/optimizer/analyzer/src/test/java/com/linkedin/openhouse/optimizer/analyzer/AnalyzerRunnerTest.java
+++ b/services/optimizer/analyzer/src/test/java/com/linkedin/openhouse/optimizer/analyzer/AnalyzerRunnerTest.java
@@ -2,10 +2,8 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -195,54 +193,6 @@ void analyze_skipsTable_whenShouldScheduleReturnsFalse() {
verify(operationsRepo, never()).save(any());
}
- @Test
- void analyze_iteratesAllPages_processesEveryTableAcrossPageBoundary() throws Exception {
- // Force a tiny page size so 3 tables span 2 pages: page 0 returns [t1, t2] (full → loop
- // continues), page 1 returns [t3] (partial → loop terminates).
- java.lang.reflect.Field f = AnalyzerRunner.class.getDeclaredField("tablesPageSize");
- f.setAccessible(true);
- f.setInt(runner, 2);
-
- TableStatsRow t1 =
- TableStatsRow.builder().tableUuid("uuid-1").databaseName(DB).tableName("tbl1").build();
- TableStatsRow t2 =
- TableStatsRow.builder().tableUuid("uuid-2").databaseName(DB).tableName("tbl2").build();
- TableStatsRow t3 =
- TableStatsRow.builder().tableUuid("uuid-3").databaseName(DB).tableName("tbl3").build();
-
- when(statsRepo.find(
- eq(Optional.of(DB)),
- eq(Optional.empty()),
- eq(Optional.empty()),
- argThat(p -> p != null && p.getPageNumber() == 0)))
- .thenReturn(List.of(t1, t2));
- when(statsRepo.find(
- eq(Optional.of(DB)),
- eq(Optional.empty()),
- eq(Optional.empty()),
- argThat(p -> p != null && p.getPageNumber() == 1)))
- .thenReturn(List.of(t3));
- when(operationsRepo.find(
- eq(Optional.of(OFD_DB)),
- eq(Optional.empty()),
- eq(Optional.empty()),
- eq(Optional.of(DB)),
- eq(Optional.empty()),
- eq(Optional.empty()),
- eq(Optional.empty()),
- any()))
- .thenReturn(Collections.emptyList());
- when(historyRepo.findLatest(eq(OFD_DB), any())).thenReturn(Collections.emptyList());
- when(analyzer.isEnabled(any())).thenReturn(true);
- when(analyzer.shouldSchedule(any(), eq(Optional.empty()), eq(Optional.empty())))
- .thenReturn(true);
-
- runner.analyze(OFD_TYPE);
-
- // All 3 tables across both pages get a PENDING row saved.
- verify(operationsRepo, times(3)).save(any());
- }
-
@Test
void analyze_skipsTable_whenTableUuidIsNull() {
TableStatsRow statsEntity = TableStatsRow.builder().databaseName(DB).build();
diff --git a/settings.gradle b/settings.gradle
index fb82211f4..810ecd643 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -51,6 +51,7 @@ include ':services:housetables'
include ':services:jobs'
include ':services:optimizer'
include ':services:optimizer:analyzer'
+include ':apps:optimizer:analyzerapp'
include ':services:tables'
include ':tables-test-fixtures:tables-test-fixtures-iceberg-1.2'
include ':tables-test-fixtures:tables-test-fixtures-iceberg-1.5'