diff --git a/apps/optimizer/schedulerapp/build.gradle b/apps/optimizer/schedulerapp/build.gradle new file mode 100644 index 000000000..5de1a827d --- /dev/null +++ b/apps/optimizer/schedulerapp/build.gradle @@ -0,0 +1,15 @@ +plugins { + id 'openhouse.springboot-ext-conventions' + id 'openhouse.maven-publish' + id 'org.springframework.boot' version '2.7.8' +} + +// Deployable Spring Boot wrapper around the scheduler library. Holds SchedulerApplication (the +// @SpringBootApplication entry point) and application.properties; the scheduling logic lives in +// :services:optimizer:scheduler. +dependencies { + implementation project(':services:optimizer:scheduler') + implementation 'org.springframework.boot:spring-boot-starter:2.7.8' + implementation 'org.springframework.boot:spring-boot-starter-data-jpa:2.7.8' + runtimeOnly 'mysql:mysql-connector-java:8.0.33' +} diff --git a/apps/optimizer/schedulerapp/src/main/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerApplication.java b/apps/optimizer/schedulerapp/src/main/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerApplication.java new file mode 100644 index 000000000..b1f06e5d3 --- /dev/null +++ b/apps/optimizer/schedulerapp/src/main/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerApplication.java @@ -0,0 +1,59 @@ +package com.linkedin.openhouse.optimizer.scheduler; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.ExitCodeGenerator; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.domain.EntityScan; +import org.springframework.data.jpa.repository.config.EnableJpaRepositories; + +/** + * Entry point for the Optimizer Scheduler application. + * + *

Spring Batch–style: implements {@link CommandLineRunner} so the work runs after context + * startup, and {@link ExitCodeGenerator} so the JVM exit code reflects batch outcome. {@code + * SpringApplication.exit(...)} closes the context (triggers {@code @PreDestroy} hooks, drains the + * JPA pool, etc.) so the k8s CronJob pod terminates cleanly with a status reflecting reality. + */ +@Slf4j +@SpringBootApplication +@EntityScan(basePackages = "com.linkedin.openhouse.optimizer.db") +@EnableJpaRepositories(basePackages = "com.linkedin.openhouse.optimizer.repository") +public class SchedulerApplication implements CommandLineRunner, ExitCodeGenerator { + + private final SchedulerRunner runner; + private int exitCode = 0; + + @Autowired + public SchedulerApplication(SchedulerRunner runner) { + this.runner = runner; + } + + public static void main(String[] args) { + System.exit(SpringApplication.exit(SpringApplication.run(SchedulerApplication.class, args))); + } + + /** + * Runs the scheduler once per registered operation type per process invocation. Any thrown + * exception is logged and surfaces as a non-zero exit code via {@link #getExitCode()} after the + * context is shut down cleanly. + */ + @Override + public void run(String... args) { + try { + log.info("Scheduler starting; operation types: {}", runner.getRegisteredOperationTypes()); + runner.getRegisteredOperationTypes().forEach(runner::schedule); + log.info("Scheduler completed successfully"); + } catch (Exception e) { + log.error("Scheduler failed", e); + exitCode = 1; + } + } + + @Override + public int getExitCode() { + return exitCode; + } +} diff --git a/apps/optimizer/schedulerapp/src/main/resources/application.properties b/apps/optimizer/schedulerapp/src/main/resources/application.properties new file mode 100644 index 000000000..b43a66459 --- /dev/null +++ b/apps/optimizer/schedulerapp/src/main/resources/application.properties @@ -0,0 +1,14 @@ +spring.application.name=openhouse-optimizer-scheduler +spring.main.web-application-type=none +spring.main.banner-mode=off +spring.datasource.url=${OPTIMIZER_DB_URL:jdbc:h2:mem:schedulerdb;DB_CLOSE_DELAY=-1;MODE=MySQL} +spring.datasource.username=${OPTIMIZER_DB_USER:sa} +spring.datasource.password=${OPTIMIZER_DB_PASSWORD:} +spring.jpa.hibernate.ddl-auto=none +optimizer.scheduler.jobs.base-uri=${JOBS_BASE_URI:http://localhost:8002} +# Per-bin caps for ORPHAN_FILES_DELETION; 0 disables a dimension. File count is the OFD cost +# driver — per-file list, manifest joins, and delete calls dominate, independent of file size. +optimizer.scheduler.ofd.max-files-per-bin=${SCHEDULER_OFD_MAX_FILES_PER_BIN:1000000} +optimizer.scheduler.ofd.max-tables-per-bin=${SCHEDULER_OFD_MAX_TABLES_PER_BIN:50} +optimizer.scheduler.results-endpoint=${SCHEDULER_RESULTS_ENDPOINT:http://openhouse-optimizer:8080/v1/optimizer/operations} +optimizer.scheduler.cluster-id=${SCHEDULER_CLUSTER_ID:LocalHadoopCluster} diff --git a/libs/optimizer/binpack/build.gradle b/libs/optimizer/binpack/build.gradle new file mode 100644 index 000000000..ace4c3f89 --- /dev/null +++ b/libs/optimizer/binpack/build.gradle @@ -0,0 +1,15 @@ +plugins { + id 'openhouse.java-conventions' + id 'openhouse.maven-publish' +} + +dependencies { + // api: bin-packing types reference TableOperationDto / TableStatsDto, which consumers need on + // their compile classpath. + api project(':services:optimizer') + api 'org.slf4j:slf4j-api:1.7.36' +} + +test { + useJUnitPlatform() +} diff --git a/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/Bin.java b/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/Bin.java new file mode 100644 index 000000000..09a6d5939 --- /dev/null +++ b/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/Bin.java @@ -0,0 +1,20 @@ +package com.linkedin.openhouse.optimizer.binpack; + +import com.linkedin.openhouse.optimizer.model.OperationTypeDto; +import java.util.List; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.ToString; + +/** + * One scheduling unit: the operation type the bin will run as, and the items the scheduler will + * claim, narrow to claimed, and launch a single Spark job for. Pure data — the scheduler reads from + * a bin to do the work; the bin does no IO itself. + */ +@AllArgsConstructor +@Getter +@ToString +public class Bin { + private final OperationTypeDto operationType; + private final List items; +} diff --git a/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/BinItem.java b/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/BinItem.java new file mode 100644 index 000000000..6736ea5dc --- /dev/null +++ b/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/BinItem.java @@ -0,0 +1,24 @@ +package com.linkedin.openhouse.optimizer.binpack; + +import com.linkedin.openhouse.optimizer.model.TableOperationDto; +import com.linkedin.openhouse.optimizer.model.TableStatsDto; + +/** + * One packable unit. Exposes the weight a packer keys on, plus the identity the scheduler reads + * when it launches a Spark job (fully-qualified table name, operation id). + * + *

Implementations have a public no-arg constructor — instantiated transiently inside {@link + * FirstFitBinPacker#pack} via a {@code Supplier} (typically a {@code + * MyItem::new} method reference) — on which {@link #fromOpAndStats} is called to return the + * populated item. Getters on the empty instance are not meaningful; it exists for the lifetime of a + * single projection call. + */ +public interface BinItem { + long getWeight(); + + String getFullyQualifiedTableName(); + + String getOperationId(); + + BinItem fromOpAndStats(TableOperationDto op, TableStatsDto stats); +} diff --git a/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/BinPacker.java b/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/BinPacker.java new file mode 100644 index 000000000..92df50f4e --- /dev/null +++ b/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/BinPacker.java @@ -0,0 +1,18 @@ +package com.linkedin.openhouse.optimizer.binpack; + +import com.linkedin.openhouse.optimizer.model.TableOperationDto; +import com.linkedin.openhouse.optimizer.model.TableStatsDto; +import java.util.List; +import java.util.Map; + +/** + * Per-operation-type strategy. Given a flat list of operations and the corresponding stats, returns + * one grouping per batch the scheduler should submit. The scheduler wraps each grouping into a + * {@link Bin} with the registered operation type. Implementations do no IO and hold no mutable + * state; the projection from {@code (op, stats)} to {@link BinItem} and the bucketing strategy both + * live in the implementation. + */ +public interface BinPacker { + List> pack( + List operations, Map statsByTableUuid); +} diff --git a/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/FirstFitBinPacker.java b/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/FirstFitBinPacker.java new file mode 100644 index 000000000..9cf452709 --- /dev/null +++ b/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/FirstFitBinPacker.java @@ -0,0 +1,92 @@ +package com.linkedin.openhouse.optimizer.binpack; + +import com.linkedin.openhouse.optimizer.model.TableOperationDto; +import com.linkedin.openhouse.optimizer.model.TableStatsDto; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; + +/** + * First-fit-decreasing packing, generic over the concrete {@link BinItem} subtype {@code T}. + * Construction takes a {@code Supplier} — typically a {@code MyItem::new} method reference — + * which the packer invokes per operation to get an empty instance, then calls {@link + * BinItem#fromOpAndStats} on it to project the (operation, stats) pair into a populated item. + * + *

Sorts items by weight descending, then places each into the first group whose totals stay at + * or below {@code maxWeightPerBin} and {@code maxItemsPerBin}. An item whose weight exceeds the cap + * on its own goes into a group by itself. Operations whose {@code tableUuid} has no entry in {@code + * statsByTableUuid} are dropped. + * + *

Stateless: the constructor takes only the BinItem supplier and the cap configuration; {@link + * #pack} is a pure function over its arguments. The packer is operation-agnostic — the scheduler + * wraps each grouping into a {@link Bin} with the registered operation type. + */ +@Slf4j +public class FirstFitBinPacker implements BinPacker { + + private final Supplier binItemSupplier; + private final long maxWeightPerBin; + private final int maxItemsPerBin; + + public FirstFitBinPacker(Supplier binItemSupplier, long maxWeightPerBin, int maxItemsPerBin) { + this.binItemSupplier = binItemSupplier; + this.maxWeightPerBin = maxWeightPerBin; + this.maxItemsPerBin = maxItemsPerBin; + } + + @Override + public List> pack( + List operations, Map statsByTableUuid) { + List items = + operations.stream() + .filter(op -> statsByTableUuid.containsKey(op.getTableUuid())) + .map( + op -> + binItemSupplier + .get() + .fromOpAndStats(op, statsByTableUuid.get(op.getTableUuid()))) + .collect(Collectors.toList()); + List packingBins = + items.stream() + .sorted(Comparator.comparingLong(BinItem::getWeight).reversed()) + .collect(ArrayList::new, this::placeItem, List::addAll); + log.info( + "Packed {} operations ({} items after projection) into {} groupings", + operations.size(), + items.size(), + packingBins.size()); + return packingBins.stream().map(pb -> pb.items).collect(Collectors.toList()); + } + + private void placeItem(List bins, BinItem item) { + bins.stream() + .filter(b -> b.fits(item, maxWeightPerBin, maxItemsPerBin)) + .findFirst() + .ifPresentOrElse( + b -> b.add(item), + () -> { + PackingBin fresh = new PackingBin(); + fresh.add(item); + bins.add(fresh); + }); + } + + /** Running-totals helper used during the fold. */ + private static class PackingBin { + final List items = new ArrayList<>(); + long totalWeight; + + boolean fits(BinItem item, long maxWeight, int maxItems) { + return items.size() < maxItems && totalWeight + item.getWeight() <= maxWeight; + } + + void add(BinItem item) { + items.add(item); + totalWeight += item.getWeight(); + } + } +} diff --git a/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/TotalFilesBinItem.java b/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/TotalFilesBinItem.java new file mode 100644 index 000000000..70d63bb39 --- /dev/null +++ b/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/TotalFilesBinItem.java @@ -0,0 +1,49 @@ +package com.linkedin.openhouse.optimizer.binpack; + +import com.linkedin.openhouse.optimizer.model.TableOperationDto; +import com.linkedin.openhouse.optimizer.model.TableStatsDto; +import java.util.Optional; +import lombok.Getter; +import lombok.ToString; + +/** + * {@link BinItem} that weights by the table's current file count. Suitable for any operation whose + * Spark cost scales with file count — orphan files deletion, stats collection, etc. The + * implementation knows nothing about which operation type it is wired up to. + * + *

Construction: callers pass {@code TotalFilesBinItem::new} as the {@code Supplier} to {@link + * FirstFitBinPacker}; the packer calls the supplier per operation to get an empty instance, then + * {@link #fromOpAndStats} on it to get a populated copy. + */ +@Getter +@ToString +public class TotalFilesBinItem implements BinItem { + + private final String fullyQualifiedTableName; + private final String operationId; + private final long weight; + + /** Empty constructor: call {@link #fromOpAndStats} on the result to get a populated instance. */ + public TotalFilesBinItem() { + this("", "", 0L); + } + + private TotalFilesBinItem(String fullyQualifiedTableName, String operationId, long weight) { + this.fullyQualifiedTableName = fullyQualifiedTableName; + this.operationId = operationId; + this.weight = weight; + } + + @Override + public BinItem fromOpAndStats(TableOperationDto op, TableStatsDto stats) { + return new TotalFilesBinItem( + op.getDatabaseName() + "." + op.getTableName(), op.getId(), currentFileCount(stats)); + } + + private static long currentFileCount(TableStatsDto stats) { + return Optional.ofNullable(stats) + .map(TableStatsDto::getSnapshot) + .map(TableStatsDto.SnapshotMetrics::getNumCurrentFiles) + .orElse(0L); + } +} diff --git a/libs/optimizer/binpack/src/test/java/com/linkedin/openhouse/optimizer/binpack/FirstFitBinPackerTest.java b/libs/optimizer/binpack/src/test/java/com/linkedin/openhouse/optimizer/binpack/FirstFitBinPackerTest.java new file mode 100644 index 000000000..602f41711 --- /dev/null +++ b/libs/optimizer/binpack/src/test/java/com/linkedin/openhouse/optimizer/binpack/FirstFitBinPackerTest.java @@ -0,0 +1,150 @@ +package com.linkedin.openhouse.optimizer.binpack; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.linkedin.openhouse.optimizer.model.TableOperationDto; +import com.linkedin.openhouse.optimizer.model.TableStatsDto; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import lombok.Getter; +import org.junit.jupiter.api.Test; + +/** + * Tests the {@link FirstFitBinPacker} bucketing logic in isolation via a {@link TestItem} whose + * weight comes from a {@code "weight"} entry in {@code tableProperties}. The supplier-then-{@code + * fromOpAndStats} pattern is exercised end-to-end through the public {@code pack} entry point. + * Projection logic for production BinItems (e.g. {@link TotalFilesBinItem}) is covered by their own + * tests. + */ +class FirstFitBinPackerTest { + + @Getter + static class TestItem implements BinItem { + private final String operationId; + private final long weight; + + public TestItem() { + this("", 0L); + } + + private TestItem(String operationId, long weight) { + this.operationId = operationId; + this.weight = weight; + } + + @Override + public String getFullyQualifiedTableName() { + return "db.tbl_" + operationId; + } + + @Override + public BinItem fromOpAndStats(TableOperationDto op, TableStatsDto stats) { + long w = Long.parseLong(stats.getTableProperties().get("weight")); + return new TestItem(op.getId(), w); + } + } + + private static TableOperationDto op(String id) { + return TableOperationDto.builder().id(id).tableUuid(id).build(); + } + + private static TableStatsDto statsWithWeight(String uuid, long weight) { + return TableStatsDto.builder() + .tableUuid(uuid) + .tableProperties(Map.of("weight", Long.toString(weight))) + .build(); + } + + private static List opsList(String... ids) { + return java.util.Arrays.stream(ids).map(FirstFitBinPackerTest::op).collect(Collectors.toList()); + } + + private static Map statsMap(Object... uuidWeightPairs) { + Map map = new HashMap<>(); + for (int i = 0; i < uuidWeightPairs.length; i += 2) { + String uuid = (String) uuidWeightPairs[i]; + long weight = (long) uuidWeightPairs[i + 1]; + map.put(uuid, statsWithWeight(uuid, weight)); + } + return map; + } + + private static FirstFitBinPacker packer(long maxWeight, int maxItems) { + return new FirstFitBinPacker<>(TestItem::new, maxWeight, maxItems); + } + + @Test + void emptyInput_returnsEmptyGroupings() { + assertThat(packer(100L, 10).pack(List.of(), Map.of())).isEmpty(); + } + + @Test + void singleItem_oneGrouping() { + List> groupings = packer(1_000_000L, 10).pack(opsList("a"), statsMap("a", 100L)); + assertThat(groupings).hasSize(1); + assertThat(groupings.get(0)).hasSize(1); + } + + @Test + void underWeightLimit_oneGrouping() { + List> groupings = + packer(1_000_000L, 10) + .pack(opsList("a", "b", "c"), statsMap("a", 300_000L, "b", 300_000L, "c", 300_000L)); + assertThat(groupings).hasSize(1); + assertThat(groupings.get(0)).hasSize(3); + } + + @Test + void overWeightLimit_twoGroupings() { + List> groupings = + packer(1_000_000L, 10) + .pack(opsList("a", "b", "c"), statsMap("a", 600_000L, "b", 600_000L, "c", 400_000L)); + assertThat(groupings).hasSize(2); + // FFD: sort desc → 600, 600, 400. Place 600 → bin0; next 600 doesn't fit bin0, → bin1; 400 + // fits bin0 (total 1_000_000). + long b0 = groupings.get(0).stream().mapToLong(BinItem::getWeight).sum(); + long b1 = groupings.get(1).stream().mapToLong(BinItem::getWeight).sum(); + assertThat(b0).isEqualTo(1_000_000L); + assertThat(b1).isEqualTo(600_000L); + } + + @Test + void itemLargerThanCap_getsOwnGrouping() { + List> groupings = + packer(1_000L, 10).pack(opsList("big"), statsMap("big", 5_000L)); + assertThat(groupings).hasSize(1); + assertThat(groupings.get(0)).hasSize(1); + } + + @Test + void sortedDescending_largestFirst() { + List> groupings = + packer(2_000_000L, 10) + .pack(opsList("small", "large"), statsMap("small", 100L, "large", 900_000L)); + assertThat(groupings).hasSize(1); + List ids = + groupings.get(0).stream().map(BinItem::getOperationId).collect(Collectors.toList()); + assertThat(ids).containsExactly("large", "small"); + } + + @Test + void maxItemsCap_splitsGroupings() { + List> groupings = + packer(1_000_000L, 2) + .pack(opsList("a", "b", "c", "d"), statsMap("a", 1L, "b", 1L, "c", 1L, "d", 1L)); + assertThat(groupings).hasSize(2); + assertThat(groupings.get(0)).hasSize(2); + assertThat(groupings.get(1)).hasSize(2); + } + + @Test + void operationsWithoutStats_dropped() { + List> groupings = + packer(1_000_000L, 10).pack(opsList("a", "missing"), statsMap("a", 100L)); + assertThat(groupings).hasSize(1); + assertThat(groupings.get(0)).hasSize(1); + assertThat(groupings.get(0).get(0).getOperationId()).isEqualTo("a"); + } +} diff --git a/libs/optimizer/binpack/src/test/java/com/linkedin/openhouse/optimizer/binpack/TotalFilesBinItemTest.java b/libs/optimizer/binpack/src/test/java/com/linkedin/openhouse/optimizer/binpack/TotalFilesBinItemTest.java new file mode 100644 index 000000000..dcf081ad7 --- /dev/null +++ b/libs/optimizer/binpack/src/test/java/com/linkedin/openhouse/optimizer/binpack/TotalFilesBinItemTest.java @@ -0,0 +1,70 @@ +package com.linkedin.openhouse.optimizer.binpack; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.linkedin.openhouse.optimizer.model.OperationTypeDto; +import com.linkedin.openhouse.optimizer.model.TableOperationDto; +import com.linkedin.openhouse.optimizer.model.TableStatsDto; +import java.util.UUID; +import org.junit.jupiter.api.Test; + +class TotalFilesBinItemTest { + + private static TableOperationDto op() { + return TableOperationDto.builder() + .id(UUID.randomUUID().toString()) + .tableUuid(UUID.randomUUID().toString()) + .databaseName("db1") + .tableName("tbl1") + .operationType(OperationTypeDto.ORPHAN_FILES_DELETION) + .build(); + } + + private static TableStatsDto statsWithFiles(Long fileCount) { + return TableStatsDto.builder() + .snapshot(TableStatsDto.SnapshotMetrics.builder().numCurrentFiles(fileCount).build()) + .build(); + } + + @Test + void fromOpAndStats_buildsFullyQualifiedNameAndOperationId() { + TableOperationDto op = op(); + BinItem item = new TotalFilesBinItem().fromOpAndStats(op, statsWithFiles(42L)); + + assertThat(item.getFullyQualifiedTableName()).isEqualTo("db1.tbl1"); + assertThat(item.getOperationId()).isEqualTo(op.getId()); + } + + @Test + void fromOpAndStats_weightIsCurrentFileCount() { + BinItem item = new TotalFilesBinItem().fromOpAndStats(op(), statsWithFiles(123_456L)); + assertThat(item.getWeight()).isEqualTo(123_456L); + } + + @Test + void fromOpAndStats_nullStats_weightIsZero() { + BinItem item = new TotalFilesBinItem().fromOpAndStats(op(), null); + assertThat(item.getWeight()).isEqualTo(0L); + } + + @Test + void fromOpAndStats_nullSnapshot_weightIsZero() { + BinItem item = new TotalFilesBinItem().fromOpAndStats(op(), TableStatsDto.builder().build()); + assertThat(item.getWeight()).isEqualTo(0L); + } + + @Test + void fromOpAndStats_nullFileCount_weightIsZero() { + BinItem item = new TotalFilesBinItem().fromOpAndStats(op(), statsWithFiles(null)); + assertThat(item.getWeight()).isEqualTo(0L); + } + + @Test + void emptyInstance_doesNotShareStateWithPopulated() { + TotalFilesBinItem empty = new TotalFilesBinItem(); + BinItem populated = empty.fromOpAndStats(op(), statsWithFiles(7L)); + + assertThat(empty.getWeight()).isEqualTo(0L); + assertThat(populated.getWeight()).isEqualTo(7L); + } +} diff --git a/services/optimizer/scheduler/build.gradle b/services/optimizer/scheduler/build.gradle new file mode 100644 index 000000000..039e2e53e --- /dev/null +++ b/services/optimizer/scheduler/build.gradle @@ -0,0 +1,36 @@ +plugins { + id 'openhouse.springboot-ext-conventions' + id 'openhouse.maven-publish' + id 'org.springframework.boot' version '2.7.8' +} + +// Library jar — the @SpringBootApplication entry point lives in :apps:optimizer:schedulerapp. +// Disable bootJar so we don't try to assemble a runnable jar from a library that has no main +// class; keep jar enabled so consumers (the apps wrapper) get a normal library artifact. +bootJar { + enabled = false +} + +jar { + enabled = true + archiveClassifier = '' +} + +dependencies { + // api: the scheduler's public types (e.g. Bin, BinPacker, OperationTypeDto) come from + // :services:optimizer and :libs:optimizer:binpack, so consumers of this library see them on + // their compile classpath. + api project(':services:optimizer') + api project(':libs:optimizer:binpack') + implementation 'org.springframework.boot:spring-boot-starter:2.7.8' + implementation 'org.springframework.boot:spring-boot-starter-webflux:2.7.8' + implementation 'org.springframework.boot:spring-boot-starter-data-jpa:2.7.8' + implementation 'org.springframework.boot:spring-boot-starter-aop:2.7.8' + runtimeOnly 'mysql:mysql-connector-java:8.0.33' + testImplementation 'org.springframework.boot:spring-boot-starter-test:2.7.8' + testRuntimeOnly 'com.h2database:h2' +} + +test { + useJUnitPlatform() +} diff --git a/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerRunner.java b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerRunner.java new file mode 100644 index 000000000..eade78242 --- /dev/null +++ b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerRunner.java @@ -0,0 +1,251 @@ +package com.linkedin.openhouse.optimizer.scheduler; + +import com.linkedin.openhouse.optimizer.binpack.Bin; +import com.linkedin.openhouse.optimizer.binpack.BinItem; +import com.linkedin.openhouse.optimizer.binpack.BinPacker; +import com.linkedin.openhouse.optimizer.db.OperationStatus; +import com.linkedin.openhouse.optimizer.db.TableOperationsRow; +import com.linkedin.openhouse.optimizer.db.TableStatsRow; +import com.linkedin.openhouse.optimizer.model.OperationTypeDto; +import com.linkedin.openhouse.optimizer.model.TableOperationDto; +import com.linkedin.openhouse.optimizer.model.TableStatsDto; +import com.linkedin.openhouse.optimizer.repository.TableOperationsRepository; +import com.linkedin.openhouse.optimizer.repository.TableStatsRepository; +import com.linkedin.openhouse.optimizer.scheduler.client.JobsServiceClient; +import java.time.Instant; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.domain.Pageable; +import org.springframework.transaction.annotation.Transactional; + +/** + * Generic scheduler. Operation types are registered at construction via {@link #registerOperation}, + * which returns a new instance with the additional entry — the registry is immutable, so the bean + * Spring publishes is the fully-registered runner produced in {@link + * com.linkedin.openhouse.optimizer.scheduler.config.SchedulerConfig}. For each registered type the + * runner: + * + *

    + *
  1. Reads PENDING rows from MySQL. + *
  2. For each {@code tableUuid}, picks the oldest PENDING row to schedule and the rest to cancel + * — both lists derived independently from the same grouping. + *
  3. Cancels the duplicate rows; loads stats for the rows to schedule. + *
  4. Hands the (operations, stats) pair to the {@link BinPacker} and receives one grouping per + * batch. + *
  5. Wraps each grouping into a {@link Bin} tagged with the operation type and schedules it + * (claim CAS, narrow to claimed, launch, record). + *
+ * + *

The runner is operation-agnostic. All IO and the claim/launch/mark lifecycle live here; the + * only per-operation knowledge in the module is the {@link BinPacker} the caller registers. + */ +@Slf4j +public class SchedulerRunner { + + private final TableOperationsRepository operationsRepo; + private final TableStatsRepository statsRepo; + private final JobsServiceClient jobsClient; + private final String resultsEndpoint; + private final Map registry; + + public SchedulerRunner( + TableOperationsRepository operationsRepo, + TableStatsRepository statsRepo, + JobsServiceClient jobsClient, + String resultsEndpoint) { + this(operationsRepo, statsRepo, jobsClient, resultsEndpoint, Map.of()); + } + + private SchedulerRunner( + TableOperationsRepository operationsRepo, + TableStatsRepository statsRepo, + JobsServiceClient jobsClient, + String resultsEndpoint, + Map registry) { + this.operationsRepo = operationsRepo; + this.statsRepo = statsRepo; + this.jobsClient = jobsClient; + this.resultsEndpoint = resultsEndpoint; + this.registry = registry; + } + + /** + * Return a new {@link SchedulerRunner} whose registry is this one's plus {@code (type, packer)}. + * If {@code type} was already registered, the new entry replaces the prior one. Pure: the + * receiver is unchanged. + */ + public SchedulerRunner registerOperation(OperationTypeDto type, BinPacker packer) { + HashMap next = new HashMap<>(registry); + next.put(type, packer); + return new SchedulerRunner( + operationsRepo, statsRepo, jobsClient, resultsEndpoint, Map.copyOf(next)); + } + + public Set getRegisteredOperationTypes() { + return registry.keySet(); + } + + public void schedule(OperationTypeDto type) { + schedule(type, Optional.empty(), Optional.empty()); + } + + public void schedule( + OperationTypeDto type, Optional databaseName, Optional tableName) { + Comparator oldestFirst = + Comparator.comparing(TableOperationsRow::getCreatedAt) + .thenComparing(TableOperationsRow::getId); + + // Per tableUuid, the oldest row (lex-tiebreak on id) is the one we schedule; any others are + // duplicates we cancel. Both lists are derived independently from the same grouping — + // scheduling does not wait on cancellation and is not predicated on its outcome. + Map> byTableUuid = + operationsRepo + .find( + Optional.of(type.toDb()), + Optional.of(OperationStatus.PENDING), + Optional.empty(), + databaseName, + tableName, + Optional.empty(), + Optional.empty(), + Pageable.unpaged()) + .stream() + .collect(Collectors.groupingBy(TableOperationsRow::getTableUuid)); + + // Cancel the duplicate-per-tableUuid rows as the terminal side effect of the dedup pipeline. + // The collectingAndThen finisher short-circuits the IN () clause that Hibernate will not run. + byTableUuid.values().stream() + .filter(rows -> rows.size() > 1) + .flatMap(rows -> rows.stream().sorted(oldestFirst).skip(1)) + .map(TableOperationsRow::getId) + .collect( + Collectors.collectingAndThen( + Collectors.toList(), + ids -> { + if (!ids.isEmpty()) { + operationsRepo.cancel(ids); + } + return null; + })); + // Read stats, bin pack, and schedule the operations per bin. + Optional.ofNullable(registry.get(type)) + .ifPresent( + packer -> + packer + .pack( + byTableUuid.values().stream() + .map(rows -> rows.stream().min(oldestFirst).orElseThrow()) + .map(TableOperationDto::fromRow) + .collect(Collectors.toList()), + statsRepo.findAllById(byTableUuid.keySet()).stream() + .collect( + Collectors.toMap( + TableStatsRow::getTableUuid, TableStatsDto::fromRow))) + .stream() + .map(grouping -> new Bin(type, grouping)) + .forEach(this::scheduleBin)); + } + + /** + * Claim the bin's operations, narrow to the rows actually owned, launch one batched Spark job for + * the claimed subset, and mark SCHEDULED — or revert to PENDING if launch failed. + */ + @Transactional + void scheduleBin(Bin bin) { + Instant claimedAt = Instant.now(); + List claimedItems = claim(bin, claimedAt); + + if (claimedItems.isEmpty()) { + log.info("All rows in bin already claimed by another scheduler instance; skipping"); + return; + } + if (claimedItems.size() < bin.getItems().size()) { + log.info( + "Partial claim: {} of {} ops in bin claimed; launching job for claimed subset only", + claimedItems.size(), + bin.getItems().size()); + } + + List claimedIds = + claimedItems.stream().map(BinItem::getOperationId).collect(Collectors.toList()); + + jobsClient + .launch( + String.format( + "batched-%s-%d", + bin.getOperationType().name().toLowerCase(), claimedAt.toEpochMilli()), + bin.getOperationType().name(), + claimedItems.stream() + .map(BinItem::getFullyQualifiedTableName) + .collect(Collectors.toList()), + claimedIds, + resultsEndpoint) + .ifPresentOrElse( + jobId -> { + int updated = + operationsRepo.updateBatch( + claimedIds, + OperationStatus.SCHEDULING, + OperationStatus.SCHEDULED, + Optional.empty(), + Optional.of(jobId)); + log.info( + "Submitted job {} for {} tables ({} rows marked SCHEDULED)", + jobId, + claimedItems.size(), + updated); + }, + () -> { + int reverted = + operationsRepo.updateBatch( + claimedIds, + OperationStatus.SCHEDULING, + OperationStatus.PENDING, + Optional.empty(), + Optional.empty()); + log.warn( + "Job submission failed; reverted {} claimed rows back to PENDING for retry on the" + + " next pass", + reverted); + }); + } + + /** + * CAS-claim every item in the bin from PENDING to SCHEDULING with {@code claimedAt} as the + * watermark, then narrow {@link Bin#getItems()} to those rows this caller actually owns. Items + * lost to a racing scheduler are dropped. Returns the {@link BinItem}s ready for launch. + */ + private List claim(Bin bin, Instant claimedAt) { + List ids = + bin.getItems().stream().map(BinItem::getOperationId).collect(Collectors.toList()); + operationsRepo.updateBatch( + ids, + OperationStatus.PENDING, + OperationStatus.SCHEDULING, + Optional.of(claimedAt), + Optional.empty()); + Set claimedIds = + operationsRepo + .find( + Optional.empty(), + Optional.of(OperationStatus.SCHEDULING), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.of(claimedAt), + Optional.of(ids), + Pageable.unpaged()) + .stream() + .map(TableOperationsRow::getId) + .collect(Collectors.toSet()); + return bin.getItems().stream() + .filter(item -> claimedIds.contains(item.getOperationId())) + .collect(Collectors.toList()); + } +} diff --git a/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/client/JobsServiceClient.java b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/client/JobsServiceClient.java new file mode 100644 index 000000000..ee8fa38ee --- /dev/null +++ b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/client/JobsServiceClient.java @@ -0,0 +1,80 @@ +package com.linkedin.openhouse.optimizer.scheduler.client; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.reactive.function.client.WebClient; + +/** + * Client for the OpenHouse Jobs Service. + * + *

Submits one {@code BatchedOrphanFilesDeletionSparkApp} job per bin via {@code POST /jobs}. + */ +@Slf4j +public class JobsServiceClient { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final Duration TIMEOUT = Duration.ofSeconds(30); + + private final WebClient webClient; + private final String clusterId; + + public JobsServiceClient(WebClient webClient, String clusterId) { + this.webClient = webClient; + this.clusterId = clusterId; + } + + /** + * Submit a batched Spark job for the given tables. + * + * @param jobName human-readable name for the job + * @param jobType operation type string (e.g. "ORPHAN_FILES_DELETION") + * @param tableNames fully-qualified table names (db.table) + * @param operationIds operation UUIDs — parallel to tableNames + * @param resultsEndpoint base URL the Spark app PATCHes results back to + * @return job ID if the submission succeeded, empty if an error occurred + */ + public Optional launch( + String jobName, + String jobType, + List tableNames, + List operationIds, + String resultsEndpoint) { + try { + ObjectNode body = MAPPER.createObjectNode(); + body.put("jobName", jobName); + body.put("clusterId", clusterId); + + ObjectNode jobConf = body.putObject("jobConf"); + jobConf.put("jobType", jobType); + + ArrayNode args = jobConf.putArray("args"); + args.add("--tableNames"); + args.add(String.join(",", tableNames)); + args.add("--operationIds"); + args.add(String.join(",", operationIds)); + args.add("--resultsEndpoint"); + args.add(resultsEndpoint); + + String responseBody = + webClient + .post() + .uri("/jobs") + .bodyValue(body) + .retrieve() + .bodyToMono(String.class) + .timeout(TIMEOUT) + .block(); + + String jobId = MAPPER.readTree(responseBody).path("jobId").asText(null); + return Optional.ofNullable(jobId); + } catch (Exception e) { + log.error("Failed to submit job '{}': {}", jobName, e.getMessage()); + return Optional.empty(); + } + } +} diff --git a/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/config/SchedulerConfig.java b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/config/SchedulerConfig.java new file mode 100644 index 000000000..82af33275 --- /dev/null +++ b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/config/SchedulerConfig.java @@ -0,0 +1,57 @@ +package com.linkedin.openhouse.optimizer.scheduler.config; + +import com.linkedin.openhouse.optimizer.binpack.FirstFitBinPacker; +import com.linkedin.openhouse.optimizer.binpack.TotalFilesBinItem; +import com.linkedin.openhouse.optimizer.model.OperationTypeDto; +import com.linkedin.openhouse.optimizer.repository.TableOperationsRepository; +import com.linkedin.openhouse.optimizer.repository.TableStatsRepository; +import com.linkedin.openhouse.optimizer.scheduler.SchedulerRunner; +import com.linkedin.openhouse.optimizer.scheduler.client.JobsServiceClient; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.reactive.function.client.WebClient; + +/** + * Cross-cutting wiring (jobs-service client) plus the {@link SchedulerRunner} bean. Each operation + * type's identity (type, packing strategy, item supplier) is composed in {@link #schedulerRunner}; + * the runner itself never names an operation type beyond the keys in its registry. + */ +@Configuration +public class SchedulerConfig { + + @Value("${optimizer.scheduler.jobs.base-uri}") + private String jobsBaseUri; + + @Value("${optimizer.scheduler.cluster-id}") + private String clusterId; + + @Bean + public WebClient jobsWebClient() { + return WebClient.builder().baseUrl(jobsBaseUri).build(); + } + + @Bean + public JobsServiceClient jobsServiceClient(WebClient jobsWebClient) { + return new JobsServiceClient(jobsWebClient, clusterId); + } + + /** + * Orphan files deletion: a {@link FirstFitBinPacker} over {@link TotalFilesBinItem}. Cost scales + * with file count — per-file list, manifest joins, and delete calls dominate independent of file + * size. + */ + @Bean + public SchedulerRunner schedulerRunner( + TableOperationsRepository operationsRepo, + TableStatsRepository statsRepo, + JobsServiceClient jobsClient, + @Value("${optimizer.scheduler.results-endpoint}") String resultsEndpoint, + @Value("${optimizer.scheduler.ofd.max-files-per-bin}") long ofdMaxFilesPerBin, + @Value("${optimizer.scheduler.ofd.max-tables-per-bin}") int ofdMaxTablesPerBin) { + return new SchedulerRunner(operationsRepo, statsRepo, jobsClient, resultsEndpoint) + .registerOperation( + OperationTypeDto.ORPHAN_FILES_DELETION, + new FirstFitBinPacker<>(TotalFilesBinItem::new, ofdMaxFilesPerBin, ofdMaxTablesPerBin)); + } +} diff --git a/services/optimizer/scheduler/src/test/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerRunnerTest.java b/services/optimizer/scheduler/src/test/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerRunnerTest.java new file mode 100644 index 000000000..92df3287d --- /dev/null +++ b/services/optimizer/scheduler/src/test/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerRunnerTest.java @@ -0,0 +1,344 @@ +package com.linkedin.openhouse.optimizer.scheduler; + +import static com.linkedin.openhouse.optimizer.model.OperationTypeDto.ORPHAN_FILES_DELETION; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.linkedin.openhouse.optimizer.binpack.FirstFitBinPacker; +import com.linkedin.openhouse.optimizer.binpack.TotalFilesBinItem; +import com.linkedin.openhouse.optimizer.db.OperationStatus; +import com.linkedin.openhouse.optimizer.db.SnapshotMetrics; +import com.linkedin.openhouse.optimizer.db.TableOperationsRow; +import com.linkedin.openhouse.optimizer.db.TableStatsRow; +import com.linkedin.openhouse.optimizer.repository.TableOperationsRepository; +import com.linkedin.openhouse.optimizer.repository.TableStatsRepository; +import com.linkedin.openhouse.optimizer.scheduler.client.JobsServiceClient; +import java.time.Instant; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class SchedulerRunnerTest { + + private static final String RESULTS_ENDPOINT = "http://localhost:8080/v1/optimizer/operations"; + + @Mock private TableOperationsRepository operationsRepo; + @Mock private TableStatsRepository statsRepo; + @Mock private JobsServiceClient jobsClient; + + private SchedulerRunner runner; + + @BeforeEach + void setUp() { + // A real packer — the runner exercises the full pipeline against actual bucketing and the + // packer's projection logic, while the IO is mocked. + runner = + new SchedulerRunner(operationsRepo, statsRepo, jobsClient, RESULTS_ENDPOINT) + .registerOperation( + ORPHAN_FILES_DELETION, + new FirstFitBinPacker<>(TotalFilesBinItem::new, 1_000_000L, 50)); + } + + // ---- Stubbing helpers ---- + + private void stubFindPending(List rows) { + when(operationsRepo.find( + eq(Optional.of(ORPHAN_FILES_DELETION.toDb())), + eq(Optional.of(OperationStatus.PENDING)), + eq(Optional.empty()), + eq(Optional.empty()), + eq(Optional.empty()), + eq(Optional.empty()), + eq(Optional.empty()), + any())) + .thenReturn(rows); + } + + private void stubFindClaimed(List rows) { + when(operationsRepo.find( + eq(Optional.empty()), + eq(Optional.of(OperationStatus.SCHEDULING)), + eq(Optional.empty()), + eq(Optional.empty()), + eq(Optional.empty()), + any(), + any(), + any())) + .thenReturn(rows); + } + + private TableOperationsRow pendingRow(String uuid, String db, String table) { + return TableOperationsRow.builder() + .id(UUID.randomUUID().toString()) + .tableUuid(uuid) + .databaseName(db) + .tableName(table) + .operationType(ORPHAN_FILES_DELETION.toDb()) + .status(OperationStatus.PENDING) + .createdAt(Instant.now()) + .build(); + } + + private TableOperationsRow schedulingRow(TableOperationsRow source) { + return source.toBuilder().status(OperationStatus.SCHEDULING).build(); + } + + private TableStatsRow statsRow(String uuid, long numCurrentFiles) { + return TableStatsRow.builder() + .tableUuid(uuid) + .snapshot(SnapshotMetrics.builder().numCurrentFiles(numCurrentFiles).build()) + .build(); + } + + // ---- Tests ---- + + @Test + void schedule_unknownOperationType_doesNotLaunch() { + SchedulerRunner empty = + new SchedulerRunner(operationsRepo, statsRepo, jobsClient, RESULTS_ENDPOINT); + stubFindPending(List.of(pendingRow(UUID.randomUUID().toString(), "db1", "tbl1"))); + + empty.schedule(ORPHAN_FILES_DELETION); + + verify(jobsClient, never()).launch(anyString(), anyString(), anyList(), anyList(), anyString()); + } + + @Test + void getRegisteredOperationTypes_returnsRegisteredSet() { + assertThat(runner.getRegisteredOperationTypes()).containsExactly(ORPHAN_FILES_DELETION); + } + + @Test + void schedule_noPendingOps_noJobSubmitted() { + stubFindPending(List.of()); + + runner.schedule(ORPHAN_FILES_DELETION); + + verify(jobsClient, never()).launch(anyString(), anyString(), anyList(), anyList(), anyString()); + } + + @Test + void schedule_allOpsWithoutStats_noJobSubmitted() { + TableOperationsRow row = pendingRow(UUID.randomUUID().toString(), "db1", "tbl1"); + stubFindPending(List.of(row)); + when(statsRepo.findAllById(any())).thenReturn(List.of()); + + runner.schedule(ORPHAN_FILES_DELETION); + + verify(jobsClient, never()).launch(anyString(), anyString(), anyList(), anyList(), anyString()); + } + + @Test + void schedule_singleBin_claimsAndMarksScheduled() { + String uuid = UUID.randomUUID().toString(); + TableOperationsRow row = pendingRow(uuid, "db1", "tbl1"); + + stubFindPending(List.of(row)); + when(statsRepo.findAllById(any())).thenReturn(List.of(statsRow(uuid, 100_000L))); + when(operationsRepo.updateBatch( + anyList(), eq(OperationStatus.PENDING), eq(OperationStatus.SCHEDULING), any(), any())) + .thenReturn(1); + stubFindClaimed(List.of(schedulingRow(row))); + when(operationsRepo.updateBatch( + anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.SCHEDULED), any(), any())) + .thenReturn(1); + when(jobsClient.launch(anyString(), anyString(), anyList(), anyList(), anyString())) + .thenReturn(Optional.of("job-123")); + + runner.schedule(ORPHAN_FILES_DELETION); + + verify(operationsRepo) + .updateBatch( + eq(List.of(row.getId())), + eq(OperationStatus.SCHEDULING), + eq(OperationStatus.SCHEDULED), + eq(Optional.empty()), + eq(Optional.of("job-123"))); + verify(operationsRepo, never()) + .updateBatch( + anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.PENDING), any(), any()); + + ArgumentCaptor> tableNames = ArgumentCaptor.forClass(List.class); + verify(jobsClient) + .launch( + anyString(), + eq(ORPHAN_FILES_DELETION.name()), + tableNames.capture(), + anyList(), + anyString()); + assertThat(tableNames.getValue()).containsExactly("db1.tbl1"); + } + + @Test + void schedule_jobLaunchFails_marksPendingForRetry() { + String uuid = UUID.randomUUID().toString(); + TableOperationsRow row = pendingRow(uuid, "db1", "tbl1"); + + stubFindPending(List.of(row)); + when(statsRepo.findAllById(any())).thenReturn(List.of(statsRow(uuid, 100L))); + when(operationsRepo.updateBatch( + anyList(), eq(OperationStatus.PENDING), eq(OperationStatus.SCHEDULING), any(), any())) + .thenReturn(1); + stubFindClaimed(List.of(schedulingRow(row))); + when(jobsClient.launch(anyString(), anyString(), anyList(), anyList(), anyString())) + .thenReturn(Optional.empty()); + when(operationsRepo.updateBatch( + anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.PENDING), any(), any())) + .thenReturn(1); + + runner.schedule(ORPHAN_FILES_DELETION); + + verify(operationsRepo) + .updateBatch( + eq(List.of(row.getId())), + eq(OperationStatus.SCHEDULING), + eq(OperationStatus.PENDING), + eq(Optional.empty()), + eq(Optional.empty())); + verify(operationsRepo, never()) + .updateBatch( + anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.SCHEDULED), any(), any()); + } + + @Test + void schedule_rowsAlreadyClaimed_skipsSubmit() { + String uuid = UUID.randomUUID().toString(); + TableOperationsRow row = pendingRow(uuid, "db1", "tbl1"); + + stubFindPending(List.of(row)); + when(statsRepo.findAllById(any())).thenReturn(List.of(statsRow(uuid, 100L))); + when(operationsRepo.updateBatch( + anyList(), eq(OperationStatus.PENDING), eq(OperationStatus.SCHEDULING), any(), any())) + .thenReturn(0); + stubFindClaimed(List.of()); + + runner.schedule(ORPHAN_FILES_DELETION); + + verify(jobsClient, never()).launch(anyString(), anyString(), anyList(), anyList(), anyString()); + verify(operationsRepo, never()) + .updateBatch( + anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.SCHEDULED), any(), any()); + verify(operationsRepo, never()) + .updateBatch( + anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.PENDING), any(), any()); + } + + @Test + void schedule_cancelsDuplicatePendingPerCycle() { + String uuid = UUID.randomUUID().toString(); + TableOperationsRow row1 = pendingRow(uuid, "db1", "tbl1"); + TableOperationsRow row2 = pendingRow(uuid, "db1", "tbl1"); + + stubFindPending(List.of(row1, row2)); + when(operationsRepo.cancel(anyList())).thenReturn(1); + when(statsRepo.findAllById(any())).thenReturn(List.of(statsRow(uuid, 100L))); + when(operationsRepo.updateBatch( + anyList(), eq(OperationStatus.PENDING), eq(OperationStatus.SCHEDULING), any(), any())) + .thenReturn(1); + TableOperationsRow survivor = row1.getCreatedAt().isBefore(row2.getCreatedAt()) ? row1 : row2; + if (row1.getCreatedAt().equals(row2.getCreatedAt())) { + survivor = row1.getId().compareTo(row2.getId()) <= 0 ? row1 : row2; + } + stubFindClaimed(List.of(schedulingRow(survivor))); + when(operationsRepo.updateBatch( + anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.SCHEDULED), any(), any())) + .thenReturn(1); + when(jobsClient.launch(anyString(), anyString(), anyList(), anyList(), anyString())) + .thenReturn(Optional.of("job-dedup")); + + runner.schedule(ORPHAN_FILES_DELETION); + + ArgumentCaptor> cancelled = ArgumentCaptor.forClass(List.class); + verify(operationsRepo).cancel(cancelled.capture()); + assertThat(cancelled.getValue()).hasSize(1); + } + + @Test + void schedule_partialClaim_launchesAndMarksOnlyClaimedSubset() { + String uuidA = UUID.randomUUID().toString(); + String uuidB = UUID.randomUUID().toString(); + TableOperationsRow rowA = pendingRow(uuidA, "db1", "tblA"); + TableOperationsRow rowB = pendingRow(uuidB, "db1", "tblB"); + + stubFindPending(List.of(rowA, rowB)); + when(statsRepo.findAllById(any())) + .thenReturn(List.of(statsRow(uuidA, 100L), statsRow(uuidB, 100L))); + when(operationsRepo.updateBatch( + anyList(), eq(OperationStatus.PENDING), eq(OperationStatus.SCHEDULING), any(), any())) + .thenReturn(1); + // Only A actually claimed. + stubFindClaimed(List.of(schedulingRow(rowA))); + when(operationsRepo.updateBatch( + anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.SCHEDULED), any(), any())) + .thenReturn(1); + when(jobsClient.launch(anyString(), anyString(), anyList(), anyList(), anyString())) + .thenReturn(Optional.of("job-partial")); + + runner.schedule(ORPHAN_FILES_DELETION); + + ArgumentCaptor> launchedTableNames = ArgumentCaptor.forClass(List.class); + ArgumentCaptor> launchedOpIds = ArgumentCaptor.forClass(List.class); + verify(jobsClient) + .launch( + anyString(), + anyString(), + launchedTableNames.capture(), + launchedOpIds.capture(), + anyString()); + assertThat(launchedTableNames.getValue()).containsExactly("db1.tblA"); + assertThat(launchedOpIds.getValue()).containsExactly(rowA.getId()); + + verify(operationsRepo) + .updateBatch( + eq(List.of(rowA.getId())), + eq(OperationStatus.SCHEDULING), + eq(OperationStatus.SCHEDULED), + eq(Optional.empty()), + eq(Optional.of("job-partial"))); + } + + @Test + void schedule_opsWithoutStats_skipped() { + String withStats = UUID.randomUUID().toString(); + String missing = UUID.randomUUID().toString(); + TableOperationsRow withStatsRow = pendingRow(withStats, "db1", "tblA"); + TableOperationsRow missingRow = pendingRow(missing, "db1", "tblB"); + + stubFindPending(List.of(withStatsRow, missingRow)); + when(statsRepo.findAllById(any())).thenReturn(List.of(statsRow(withStats, 50L))); + when(operationsRepo.updateBatch( + anyList(), eq(OperationStatus.PENDING), eq(OperationStatus.SCHEDULING), any(), any())) + .thenReturn(1); + stubFindClaimed(List.of(schedulingRow(withStatsRow))); + when(operationsRepo.updateBatch( + anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.SCHEDULED), any(), any())) + .thenReturn(1); + when(jobsClient.launch(anyString(), anyString(), anyList(), anyList(), anyString())) + .thenReturn(Optional.of("job-skip")); + + runner.schedule(ORPHAN_FILES_DELETION); + + ArgumentCaptor> ids = ArgumentCaptor.forClass(List.class); + verify(operationsRepo) + .updateBatch( + ids.capture(), + eq(OperationStatus.PENDING), + eq(OperationStatus.SCHEDULING), + any(), + any()); + assertThat(ids.getValue()).containsExactly(withStatsRow.getId()); + } +} diff --git a/services/optimizer/scheduler/src/test/resources/application-test.properties b/services/optimizer/scheduler/src/test/resources/application-test.properties new file mode 100644 index 000000000..57354728e --- /dev/null +++ b/services/optimizer/scheduler/src/test/resources/application-test.properties @@ -0,0 +1,11 @@ +spring.datasource.url=jdbc:h2:mem:schedulertestdb;DB_CLOSE_DELAY=-1;MODE=MySQL +spring.datasource.username=sa +spring.datasource.password= +spring.jpa.hibernate.ddl-auto=none +spring.sql.init.mode=always +spring.sql.init.schema-locations=classpath:db/optimizer-schema.sql +optimizer.scheduler.jobs.base-uri=http://localhost:9999 +optimizer.scheduler.ofd.max-files-per-bin=1000000 +optimizer.scheduler.ofd.max-tables-per-bin=50 +optimizer.scheduler.results-endpoint=http://localhost:8080/v1/optimizer/operations +optimizer.scheduler.cluster-id=test-cluster diff --git a/settings.gradle b/settings.gradle index 810ecd643..25a76bba0 100644 --- a/settings.gradle +++ b/settings.gradle @@ -45,6 +45,7 @@ include ':iceberg:openhouse:internalcatalog' include ':iceberg:azure' include ':libs:datalayout' +include ':libs:optimizer:binpack' include ':services:common' include ':services:housetables' @@ -52,6 +53,8 @@ include ':services:jobs' include ':services:optimizer' include ':services:optimizer:analyzer' include ':apps:optimizer:analyzerapp' +include ':services:optimizer:scheduler' +include ':apps:optimizer:schedulerapp' include ':services:tables' include ':tables-test-fixtures:tables-test-fixtures-iceberg-1.2' include ':tables-test-fixtures:tables-test-fixtures-iceberg-1.5'