Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions apps/optimizer/analyzerapp/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
plugins {
id 'openhouse.springboot-ext-conventions'
id 'org.springframework.boot' version '2.7.8'
}

// Deployable Spring Boot wrapper around the analyzer library. Holds AnalyzerApplication (the
// @SpringBootApplication entry point) and application.properties; the analysis logic lives in
// :services:optimizer:analyzer.
dependencies {
implementation project(':services:optimizer:analyzer')
implementation 'org.springframework.boot:spring-boot-starter:2.7.8'
implementation 'org.springframework.boot:spring-boot-starter-data-jpa:2.7.8'
runtimeOnly 'mysql:mysql-connector-java:8.0.33'
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,3 @@ spring.datasource.password=${OPTIMIZER_DB_PASSWORD:}
spring.jpa.hibernate.ddl-auto=none
ofd.success-retry-hours=16
ofd.failure-retry-hours=1
analyzer.tables-page-size=10000
16 changes: 15 additions & 1 deletion services/optimizer/analyzer/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,22 @@ plugins {
id 'org.springframework.boot' version '2.7.8'
}

// Library jar — the @SpringBootApplication entry point lives in :apps:optimizer:analyzerapp.
// Disable bootJar so we don't try to assemble a runnable jar from a library that has no main
// class; keep jar enabled so consumers (the apps wrapper) get a normal library artifact.
bootJar {
enabled = false
}

jar {
enabled = true
archiveClassifier = ''
}

dependencies {
implementation project(':services:optimizer')
// api: the analyzer's public types (e.g. OperationAnalyzer's signature, OperationTypeDto) come
// from :services:optimizer, so consumers of this library see them on their compile classpath.
api project(':services:optimizer')
implementation 'org.springframework.boot:spring-boot-starter:2.7.8'
implementation 'org.springframework.boot:spring-boot-starter-webflux:2.7.8'
implementation 'org.springframework.boot:spring-boot-starter-data-jpa:2.7.8'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

Expand All @@ -40,14 +39,6 @@ public class AnalyzerRunner {
private final TableOperationsRepository operationsRepo;
private final TableOperationsHistoryRepository historyRepo;

/**
* Page size for the per-database table scan. The analyzer iterates a database's tables in pages
* of this size and processes <em>every</em> page until the database is exhausted — no tables are
* dropped because of this bound. Caps the in-memory working set per page, not per cycle.
*/
@Value("${analyzer.tables-page-size:10000}")
private int tablesPageSize = 10_000;

/**
* Run the analysis loop for {@code operationType} across all databases, with no filters.
* Equivalent to {@link #analyze(OperationTypeDto, Optional, Optional, Optional)} with all-empty
Expand Down Expand Up @@ -88,107 +79,91 @@ void analyzeDatabase(
Optional<String> tableName,
Optional<String> tableUuid) {

// Load the three join inputs unbounded for this database. Aligned page-by-page pagination on
// these maps would leave keys in one map's page mismatched with the others' — a table whose
// op/history happens to fall in a different page would be misread as "no current op / no
// history" and trigger duplicate scheduling. Correctness requires the maps to be complete
// relative to the tables being processed; the working set is bounded by tables-in-db, not by
// any per-cycle cap. Memory characterization is tracked in BDP-102738.
Map<String, TableOperationDto> currentOps =
operationsRepo
.find(
Optional.of(analyzer.getOperationType().toDb()),
Optional.empty(),
tableUuid,
Optional.of(databaseName),
tableName,
Optional.empty(),
Optional.empty(),
Pageable.unpaged())
.stream()
.filter(e -> e.getTableUuid() != null)
.map(TableOperationDto::fromRow)
.collect(
Collectors.toMap(
TableOperationDto::getTableUuid, op -> op, TableOperationDto::mostRecent));

Map<String, TableOperationsHistoryDto> latestHistory =
historyRepo.findLatest(analyzer.getOperationType().toDb(), Pageable.unpaged()).stream()
.filter(r -> r.getTableUuid() != null)
.map(TableOperationsHistoryDto::fromRow)
.collect(
Collectors.toMap(
TableOperationsHistoryDto::getTableUuid,
h -> h,
TableOperationsHistoryDto::after));

List<TableDto> tables =
statsRepo.find(Optional.of(databaseName), tableName, tableUuid, Pageable.unpaged()).stream()
.filter(row -> row.getTableUuid() != null)
.map(TableDto::fromRow)
.collect(Collectors.toList());

/*
* For each table in this database, decide whether to create a new PENDING operation.
*
* 1. Skip tables not opted in to this operation type.
* 2. Look up the table's current active operation (if any) and its most recent completed
* history entry from the maps loaded above.
* 3. Delegate the schedule-or-not decision to the analyzer's shouldSchedule — strategy
* encapsulates cadence, retry policy, and any future per-operation signals.
* 4. On true, persist a new PENDING operation. The scheduler picks it up on its next pass.
*/
int created = 0;
int failed = 0;
int pageNumber = 0;

while (true) {
PageRequest page = PageRequest.of(pageNumber, tablesPageSize);

// Per-page pre-load: ops and history are re-queried for each table page so the working set
// stays bounded. Maps are keyed by tableUuid and may miss entries for tables whose
// operations / history happen to fall in a different page of those queries — affected
// tables get treated as "no current op / no history" and may produce a duplicate PENDING
// row that the scheduler's cancelDuplicates path handles. The alternative (load all maps
// unbounded once per DB) trades that minor duplication for unbounded memory per cycle.
Map<String, TableOperationDto> currentOps =
operationsRepo
.find(
Optional.of(analyzer.getOperationType().toDb()),
Optional.empty(),
tableUuid,
Optional.of(databaseName),
tableName,
Optional.empty(),
Optional.empty(),
page)
.stream()
.filter(e -> e.getTableUuid() != null)
.map(TableOperationDto::fromRow)
.collect(
Collectors.toMap(
TableOperationDto::getTableUuid, op -> op, TableOperationDto::mostRecent));

Map<String, TableOperationsHistoryDto> latestHistory =
historyRepo.findLatest(analyzer.getOperationType().toDb(), page).stream()
.filter(r -> r.getTableUuid() != null)
.map(TableOperationsHistoryDto::fromRow)
.collect(
Collectors.toMap(
TableOperationsHistoryDto::getTableUuid,
h -> h,
TableOperationsHistoryDto::after));

List<TableDto> tables =
statsRepo.find(Optional.of(databaseName), tableName, tableUuid, page).stream()
.filter(row -> row.getTableUuid() != null)
.map(TableDto::fromRow)
.collect(Collectors.toList());

if (tables.isEmpty()) {
break;
for (TableDto table : tables) {
if (!analyzer.isEnabled(table)) {
continue;
}

/*
* For each table in this page, decide whether to create a new PENDING operation.
*
* 1. Skip tables not opted in to this operation type.
* 2. Look up the table's current active operation (if any) and its most recent completed
* history entry from the maps loaded above.
* 3. Delegate the schedule-or-not decision to the analyzer's shouldSchedule — strategy
* encapsulates cadence, retry policy, and any future per-operation signals.
* 4. On true, persist a new PENDING operation. The scheduler picks it up on its next pass.
*/
for (TableDto table : tables) {
if (!analyzer.isEnabled(table)) {
continue;
}
Optional<TableOperationDto> currentOp =
Optional.ofNullable(currentOps.get(table.getTableUuid()));
Optional<TableOperationsHistoryDto> entry =
Optional.ofNullable(latestHistory.get(table.getTableUuid()));
if (!analyzer.shouldSchedule(table, currentOp, entry)) {
continue;
}
try {
TableOperationDto op = TableOperationDto.pending(table, analyzer.getOperationType());
operationsRepo.save(op.toRow());
log.debug(
"Created PENDING {} operation for table {}.{}",
analyzer.getOperationType(),
table.getDatabaseName(),
table.getTableId());
created++;
} catch (RuntimeException e) {
// One bad table should not abort the rest of the database. Log and continue; the next
// analyzer pass will retry for any table whose save failed here.
log.error(
"Failed to create PENDING {} operation for table {}.{}: {}",
analyzer.getOperationType(),
table.getDatabaseName(),
table.getTableId(),
e.toString(),
e);
failed++;
}
Optional<TableOperationDto> currentOp =
Optional.ofNullable(currentOps.get(table.getTableUuid()));
Optional<TableOperationsHistoryDto> entry =
Optional.ofNullable(latestHistory.get(table.getTableUuid()));
if (!analyzer.shouldSchedule(table, currentOp, entry)) {
continue;
}

if (tables.size() < tablesPageSize) {
break;
try {
TableOperationDto op = TableOperationDto.pending(table, analyzer.getOperationType());
operationsRepo.save(op.toRow());
log.debug(
"Created PENDING {} operation for table {}.{}",
analyzer.getOperationType(),
table.getDatabaseName(),
table.getTableId());
created++;
} catch (RuntimeException e) {
// One bad table should not abort the rest of the database. Log and continue; the next
// analyzer pass will retry for any table whose save failed here.
log.error(
"Failed to create PENDING {} operation for table {}.{}: {}",
analyzer.getOperationType(),
table.getDatabaseName(),
table.getTableId(),
e.toString(),
e);
failed++;
}
pageNumber++;
}

log.info(
"Finished analyzing Database {}: created {} PENDING {} operation(s) ({} failed)",
databaseName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -195,54 +193,6 @@ void analyze_skipsTable_whenShouldScheduleReturnsFalse() {
verify(operationsRepo, never()).save(any());
}

@Test
void analyze_iteratesAllPages_processesEveryTableAcrossPageBoundary() throws Exception {
// Force a tiny page size so 3 tables span 2 pages: page 0 returns [t1, t2] (full → loop
// continues), page 1 returns [t3] (partial → loop terminates).
java.lang.reflect.Field f = AnalyzerRunner.class.getDeclaredField("tablesPageSize");
f.setAccessible(true);
f.setInt(runner, 2);

TableStatsRow t1 =
TableStatsRow.builder().tableUuid("uuid-1").databaseName(DB).tableName("tbl1").build();
TableStatsRow t2 =
TableStatsRow.builder().tableUuid("uuid-2").databaseName(DB).tableName("tbl2").build();
TableStatsRow t3 =
TableStatsRow.builder().tableUuid("uuid-3").databaseName(DB).tableName("tbl3").build();

when(statsRepo.find(
eq(Optional.of(DB)),
eq(Optional.empty()),
eq(Optional.empty()),
argThat(p -> p != null && p.getPageNumber() == 0)))
.thenReturn(List.of(t1, t2));
when(statsRepo.find(
eq(Optional.of(DB)),
eq(Optional.empty()),
eq(Optional.empty()),
argThat(p -> p != null && p.getPageNumber() == 1)))
.thenReturn(List.of(t3));
when(operationsRepo.find(
eq(Optional.of(OFD_DB)),
eq(Optional.empty()),
eq(Optional.empty()),
eq(Optional.of(DB)),
eq(Optional.empty()),
eq(Optional.empty()),
eq(Optional.empty()),
any()))
.thenReturn(Collections.emptyList());
when(historyRepo.findLatest(eq(OFD_DB), any())).thenReturn(Collections.emptyList());
when(analyzer.isEnabled(any())).thenReturn(true);
when(analyzer.shouldSchedule(any(), eq(Optional.empty()), eq(Optional.empty())))
.thenReturn(true);

runner.analyze(OFD_TYPE);

// All 3 tables across both pages get a PENDING row saved.
verify(operationsRepo, times(3)).save(any());
}

@Test
void analyze_skipsTable_whenTableUuidIsNull() {
TableStatsRow statsEntity = TableStatsRow.builder().databaseName(DB).build();
Expand Down
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ include ':services:housetables'
include ':services:jobs'
include ':services:optimizer'
include ':services:optimizer:analyzer'
include ':apps:optimizer:analyzerapp'
include ':services:tables'
include ':tables-test-fixtures:tables-test-fixtures-iceberg-1.2'
include ':tables-test-fixtures:tables-test-fixtures-iceberg-1.5'
Expand Down