diff --git a/apps/optimizer/schedulerapp/build.gradle b/apps/optimizer/schedulerapp/build.gradle
new file mode 100644
index 000000000..5de1a827d
--- /dev/null
+++ b/apps/optimizer/schedulerapp/build.gradle
@@ -0,0 +1,15 @@
+plugins {
+ id 'openhouse.springboot-ext-conventions'
+ id 'openhouse.maven-publish'
+ id 'org.springframework.boot' version '2.7.8'
+}
+
+// Deployable Spring Boot wrapper around the scheduler library. Holds SchedulerApplication (the
+// @SpringBootApplication entry point) and application.properties; the scheduling logic lives in
+// :services:optimizer:scheduler.
+dependencies {
+ implementation project(':services:optimizer:scheduler')
+ implementation 'org.springframework.boot:spring-boot-starter:2.7.8'
+ implementation 'org.springframework.boot:spring-boot-starter-data-jpa:2.7.8'
+ runtimeOnly 'mysql:mysql-connector-java:8.0.33'
+}
diff --git a/apps/optimizer/schedulerapp/src/main/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerApplication.java b/apps/optimizer/schedulerapp/src/main/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerApplication.java
new file mode 100644
index 000000000..b1f06e5d3
--- /dev/null
+++ b/apps/optimizer/schedulerapp/src/main/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerApplication.java
@@ -0,0 +1,59 @@
+package com.linkedin.openhouse.optimizer.scheduler;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.ExitCodeGenerator;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.domain.EntityScan;
+import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
+
+/**
+ * Entry point for the Optimizer Scheduler application.
+ *
+ * <p>Spring Batch–style: implements {@link CommandLineRunner} so the work runs after context
+ * startup, and {@link ExitCodeGenerator} so the JVM exit code reflects batch outcome. {@code
+ * SpringApplication.exit(...)} closes the context (triggers {@code @PreDestroy} hooks, drains the
+ * JPA pool, etc.) so the k8s CronJob pod terminates cleanly with a status reflecting reality.
+ */
+@Slf4j
+@SpringBootApplication
+@EntityScan(basePackages = "com.linkedin.openhouse.optimizer.db")
+@EnableJpaRepositories(basePackages = "com.linkedin.openhouse.optimizer.repository")
+public class SchedulerApplication implements CommandLineRunner, ExitCodeGenerator {
+
+ private final SchedulerRunner runner;
+ private int exitCode = 0;
+
+ @Autowired
+ public SchedulerApplication(SchedulerRunner runner) {
+ this.runner = runner;
+ }
+
+ public static void main(String[] args) {
+ System.exit(SpringApplication.exit(SpringApplication.run(SchedulerApplication.class, args)));
+ }
+
+ /**
+ * Runs {@code runner.schedule} once for each registered operation type. The types share one try
+ * block, so the first exception skips the types not yet scheduled; it is logged and reported
+ * through {@link #getExitCode()} as exit code 1.
+ */
+ @Override
+ public void run(String... args) {
+ try {
+ log.info("Scheduler starting; operation types: {}", runner.getRegisteredOperationTypes());
+ runner.getRegisteredOperationTypes().forEach(runner::schedule);
+ log.info("Scheduler completed successfully");
+ } catch (Exception e) {
+ log.error("Scheduler failed", e);
+ exitCode = 1;
+ }
+ }
+
+ @Override
+ public int getExitCode() {
+ return exitCode;
+ }
+}
diff --git a/apps/optimizer/schedulerapp/src/main/resources/application.properties b/apps/optimizer/schedulerapp/src/main/resources/application.properties
new file mode 100644
index 000000000..b43a66459
--- /dev/null
+++ b/apps/optimizer/schedulerapp/src/main/resources/application.properties
@@ -0,0 +1,14 @@
+spring.application.name=openhouse-optimizer-scheduler
+spring.main.web-application-type=none
+spring.main.banner-mode=off
+spring.datasource.url=${OPTIMIZER_DB_URL:jdbc:h2:mem:schedulerdb;DB_CLOSE_DELAY=-1;MODE=MySQL}
+spring.datasource.username=${OPTIMIZER_DB_USER:sa}
+spring.datasource.password=${OPTIMIZER_DB_PASSWORD:}
+spring.jpa.hibernate.ddl-auto=none
+optimizer.scheduler.jobs.base-uri=${JOBS_BASE_URI:http://localhost:8002}
+# Per-bin caps for ORPHAN_FILES_DELETION; 0 disables a dimension. File count is the OFD cost
+# driver — per-file list, manifest joins, and delete calls dominate, independent of file size.
+optimizer.scheduler.ofd.max-files-per-bin=${SCHEDULER_OFD_MAX_FILES_PER_BIN:1000000}
+optimizer.scheduler.ofd.max-tables-per-bin=${SCHEDULER_OFD_MAX_TABLES_PER_BIN:50}
+optimizer.scheduler.results-endpoint=${SCHEDULER_RESULTS_ENDPOINT:http://openhouse-optimizer:8080/v1/optimizer/operations}
+optimizer.scheduler.cluster-id=${SCHEDULER_CLUSTER_ID:LocalHadoopCluster}
diff --git a/libs/optimizer/binpack/build.gradle b/libs/optimizer/binpack/build.gradle
new file mode 100644
index 000000000..ace4c3f89
--- /dev/null
+++ b/libs/optimizer/binpack/build.gradle
@@ -0,0 +1,15 @@
+plugins {
+ id 'openhouse.java-conventions'
+ id 'openhouse.maven-publish'
+}
+
+dependencies {
+ // api: bin-packing types reference TableOperationDto / TableStatsDto, which consumers need on
+ // their compile classpath.
+ api project(':services:optimizer')
+ api 'org.slf4j:slf4j-api:1.7.36'
+}
+
+test {
+ useJUnitPlatform()
+}
diff --git a/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/Bin.java b/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/Bin.java
new file mode 100644
index 000000000..09a6d5939
--- /dev/null
+++ b/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/Bin.java
@@ -0,0 +1,20 @@
+package com.linkedin.openhouse.optimizer.binpack;
+
+import com.linkedin.openhouse.optimizer.model.OperationTypeDto;
+import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.ToString;
+
+/**
+ * One scheduling unit: the operation type the bin will run as, and the items the scheduler will
+ * claim, narrow to claimed, and launch a single Spark job for. Pure data — the scheduler reads from
+ * a bin to do the work; the bin does no IO itself.
+ */
+@AllArgsConstructor
+@Getter
+@ToString
+public class Bin {
+  private final OperationTypeDto operationType;
+  private final List<BinItem> items;
+}
diff --git a/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/BinItem.java b/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/BinItem.java
new file mode 100644
index 000000000..6736ea5dc
--- /dev/null
+++ b/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/BinItem.java
@@ -0,0 +1,24 @@
+package com.linkedin.openhouse.optimizer.binpack;
+
+import com.linkedin.openhouse.optimizer.model.TableOperationDto;
+import com.linkedin.openhouse.optimizer.model.TableStatsDto;
+
+/**
+ * One packable unit. Exposes the weight a packer keys on, plus the identity the scheduler reads
+ * when it launches a Spark job (fully-qualified table name, operation id).
+ *
+ * <p>Implementations have a public no-arg constructor — instantiated transiently inside {@link
+ * FirstFitBinPacker#pack} via a {@code Supplier} (typically a {@code
+ * MyItem::new} method reference) — on which {@link #fromOpAndStats} is called to return the
+ * populated item. Getters on the empty instance are not meaningful; it exists for the lifetime of a
+ * single projection call.
+ */
+public interface BinItem {
+ long getWeight();
+ /** Fully-qualified table name the scheduler passes to the job it launches for this item. */
+ String getFullyQualifiedTableName();
+ /** Id of the operation row; the scheduler claims and updates rows by this id. */
+ String getOperationId();
+ /** Projects an (operation, stats) pair into a populated item; called on an empty instance. */
+ BinItem fromOpAndStats(TableOperationDto op, TableStatsDto stats);
+}
diff --git a/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/BinPacker.java b/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/BinPacker.java
new file mode 100644
index 000000000..92df50f4e
--- /dev/null
+++ b/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/BinPacker.java
@@ -0,0 +1,18 @@
+package com.linkedin.openhouse.optimizer.binpack;
+
+import com.linkedin.openhouse.optimizer.model.TableOperationDto;
+import com.linkedin.openhouse.optimizer.model.TableStatsDto;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Per-operation-type strategy. Given a flat list of operations and the corresponding stats, returns
+ * one grouping per batch the scheduler should submit. The scheduler wraps each grouping into a
+ * {@link Bin} with the registered operation type. Implementations do no IO and hold no mutable
+ * state; the projection from {@code (op, stats)} to {@link BinItem} and the bucketing strategy both
+ * live in the implementation.
+ */
+public interface BinPacker {
+  List<List<BinItem>> pack(
+      List<TableOperationDto> operations, Map<String, TableStatsDto> statsByTableUuid);
+}
diff --git a/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/FirstFitBinPacker.java b/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/FirstFitBinPacker.java
new file mode 100644
index 000000000..9cf452709
--- /dev/null
+++ b/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/FirstFitBinPacker.java
@@ -0,0 +1,92 @@
+package com.linkedin.openhouse.optimizer.binpack;
+
+import com.linkedin.openhouse.optimizer.model.TableOperationDto;
+import com.linkedin.openhouse.optimizer.model.TableStatsDto;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * First-fit-decreasing packing, generic over the concrete {@link BinItem} subtype {@code T}.
+ * Construction takes a {@code Supplier<T>} — typically a {@code MyItem::new} method reference —
+ * which the packer invokes per operation to get an empty instance, then calls {@link
+ * BinItem#fromOpAndStats} on it to project the (operation, stats) pair into a populated item.
+ *
+ * <p>Sorts items by weight descending, then places each into the first group whose totals stay at
+ * or below {@code maxWeightPerBin} and {@code maxItemsPerBin}. An item whose weight exceeds the cap
+ * on its own goes into a group by itself. Operations whose {@code tableUuid} has no entry in {@code
+ * statsByTableUuid} are dropped.
+ *
+ * <p>Stateless: the constructor takes only the BinItem supplier and the cap configuration; {@link
+ * #pack} is a pure function over its arguments. The packer is operation-agnostic — the scheduler
+ * wraps each grouping into a {@link Bin} with the registered operation type.
+ */
+@Slf4j
+public class FirstFitBinPacker<T extends BinItem> implements BinPacker {
+
+  private final Supplier<T> binItemSupplier;
+  private final long maxWeightPerBin;
+  private final int maxItemsPerBin;
+
+  public FirstFitBinPacker(Supplier<T> binItemSupplier, long maxWeightPerBin, int maxItemsPerBin) {
+    this.binItemSupplier = binItemSupplier;
+    this.maxWeightPerBin = maxWeightPerBin;
+    this.maxItemsPerBin = maxItemsPerBin;
+  }
+
+  @Override
+  public List<List<BinItem>> pack(
+      List<TableOperationDto> operations, Map<String, TableStatsDto> statsByTableUuid) {
+    List<BinItem> items =
+        operations.stream()
+            .filter(op -> statsByTableUuid.containsKey(op.getTableUuid()))
+            .map(
+                op ->
+                    binItemSupplier
+                        .get()
+                        .fromOpAndStats(op, statsByTableUuid.get(op.getTableUuid())))
+            .collect(Collectors.toList());
+    List<PackingBin> packingBins =
+        items.stream()
+            .sorted(Comparator.comparingLong(BinItem::getWeight).reversed())
+            .collect(ArrayList::new, this::placeItem, List::addAll);
+    log.info(
+        "Packed {} operations ({} items after projection) into {} groupings",
+        operations.size(),
+        items.size(),
+        packingBins.size());
+    return packingBins.stream().map(pb -> pb.items).collect(Collectors.toList());
+  }
+
+  private void placeItem(List<PackingBin> bins, BinItem item) {
+    for (PackingBin bin : bins) {
+      if (bin.fits(item, maxWeightPerBin, maxItemsPerBin)) {
+        bin.add(item);
+        return;
+      }
+    }
+    PackingBin fresh = new PackingBin();
+    fresh.add(item);
+    bins.add(fresh);
+  }
+
+  // Running totals for one grouping. NOTE(review): with maxItems = 0 nothing ever fits, so every
+  // item is packed alone, yet application.properties says 0 disables a cap — confirm the wiring.
+  private static class PackingBin {
+    final List<BinItem> items = new ArrayList<>();
+    long totalWeight;
+
+    boolean fits(BinItem item, long maxWeight, int maxItems) {
+      return items.size() < maxItems && totalWeight + item.getWeight() <= maxWeight;
+    }
+
+    void add(BinItem item) {
+      items.add(item);
+      totalWeight += item.getWeight();
+    }
+  }
+}
diff --git a/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/TotalFilesBinItem.java b/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/TotalFilesBinItem.java
new file mode 100644
index 000000000..70d63bb39
--- /dev/null
+++ b/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/TotalFilesBinItem.java
@@ -0,0 +1,49 @@
+package com.linkedin.openhouse.optimizer.binpack;
+
+import com.linkedin.openhouse.optimizer.model.TableOperationDto;
+import com.linkedin.openhouse.optimizer.model.TableStatsDto;
+import java.util.Optional;
+import lombok.Getter;
+import lombok.ToString;
+
+/**
+ * {@link BinItem} that weights by the table's current file count. Suitable for any operation whose
+ * Spark cost scales with file count — orphan files deletion, stats collection, etc. The
+ * implementation knows nothing about which operation type it is wired up to.
+ *
+ * <p>Construction: callers pass {@code TotalFilesBinItem::new} as the {@code Supplier} to {@link
+ * FirstFitBinPacker}; the packer calls the supplier per operation to get an empty instance, then
+ * {@link #fromOpAndStats} on it to get a populated copy.
+ */
+@Getter
+@ToString
+public class TotalFilesBinItem implements BinItem {
+
+  private final String fullyQualifiedTableName;
+  private final String operationId;
+  private final long weight;
+
+  /** Blank instance for suppliers; call {@link #fromOpAndStats} on it to get a populated one. */
+  public TotalFilesBinItem() {
+    this("", "", 0L);
+  }
+
+  private TotalFilesBinItem(String fullyQualifiedTableName, String operationId, long weight) {
+    this.fullyQualifiedTableName = fullyQualifiedTableName;
+    this.operationId = operationId;
+    this.weight = weight;
+  }
+
+  @Override
+  public BinItem fromOpAndStats(TableOperationDto op, TableStatsDto stats) {
+    String qualifiedName = op.getDatabaseName() + "." + op.getTableName();
+    // A missing stats object, snapshot, or file count all collapse to weight 0 rather than failing
+    // the projection.
+    long currentFiles =
+        Optional.ofNullable(stats)
+            .map(TableStatsDto::getSnapshot)
+            .map(TableStatsDto.SnapshotMetrics::getNumCurrentFiles)
+            .orElse(0L);
+    return new TotalFilesBinItem(qualifiedName, op.getId(), currentFiles);
+  }
+}
diff --git a/libs/optimizer/binpack/src/test/java/com/linkedin/openhouse/optimizer/binpack/FirstFitBinPackerTest.java b/libs/optimizer/binpack/src/test/java/com/linkedin/openhouse/optimizer/binpack/FirstFitBinPackerTest.java
new file mode 100644
index 000000000..602f41711
--- /dev/null
+++ b/libs/optimizer/binpack/src/test/java/com/linkedin/openhouse/optimizer/binpack/FirstFitBinPackerTest.java
@@ -0,0 +1,150 @@
+package com.linkedin.openhouse.optimizer.binpack;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.linkedin.openhouse.optimizer.model.TableOperationDto;
+import com.linkedin.openhouse.optimizer.model.TableStatsDto;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import lombok.Getter;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests the {@link FirstFitBinPacker} bucketing logic in isolation via a {@link TestItem} whose
+ * weight comes from a {@code "weight"} entry in {@code tableProperties}. The supplier-then-{@code
+ * fromOpAndStats} pattern is exercised end-to-end through the public {@code pack} entry point.
+ * Projection logic for production BinItems (e.g. {@link TotalFilesBinItem}) is covered by their own
+ * tests.
+ */
+class FirstFitBinPackerTest {
+
+  @Getter
+  static class TestItem implements BinItem {
+    private final String operationId;
+    private final long weight;
+
+    public TestItem() {
+      this("", 0L);
+    }
+
+    private TestItem(String operationId, long weight) {
+      this.operationId = operationId;
+      this.weight = weight;
+    }
+
+    @Override
+    public String getFullyQualifiedTableName() {
+      return "db.tbl_" + operationId;
+    }
+
+    @Override
+    public BinItem fromOpAndStats(TableOperationDto op, TableStatsDto stats) {
+      long w = Long.parseLong(stats.getTableProperties().get("weight"));
+      return new TestItem(op.getId(), w);
+    }
+  }
+
+  private static TableOperationDto op(String id) {
+    return TableOperationDto.builder().id(id).tableUuid(id).build();
+  }
+
+  private static TableStatsDto statsWithWeight(String uuid, long weight) {
+    return TableStatsDto.builder()
+        .tableUuid(uuid)
+        .tableProperties(Map.of("weight", Long.toString(weight)))
+        .build();
+  }
+
+  private static List<TableOperationDto> opsList(String... ids) {
+    return java.util.Arrays.stream(ids).map(FirstFitBinPackerTest::op).collect(Collectors.toList());
+  }
+
+  private static Map<String, TableStatsDto> statsMap(Object... uuidWeightPairs) {
+    Map<String, TableStatsDto> map = new HashMap<>();
+    for (int i = 0; i < uuidWeightPairs.length; i += 2) {
+      String uuid = (String) uuidWeightPairs[i];
+      long weight = (long) uuidWeightPairs[i + 1];
+      map.put(uuid, statsWithWeight(uuid, weight));
+    }
+    return map;
+  }
+
+  private static FirstFitBinPacker<TestItem> packer(long maxWeight, int maxItems) {
+    return new FirstFitBinPacker<>(TestItem::new, maxWeight, maxItems);
+  }
+
+  @Test
+  void emptyInput_returnsEmptyGroupings() {
+    assertThat(packer(100L, 10).pack(List.of(), Map.of())).isEmpty();
+  }
+
+  @Test
+  void singleItem_oneGrouping() {
+    List<List<BinItem>> groupings = packer(1_000_000L, 10).pack(opsList("a"), statsMap("a", 100L));
+    assertThat(groupings).hasSize(1);
+    assertThat(groupings.get(0)).hasSize(1);
+  }
+
+  @Test
+  void underWeightLimit_oneGrouping() {
+    List<List<BinItem>> groupings =
+        packer(1_000_000L, 10)
+            .pack(opsList("a", "b", "c"), statsMap("a", 300_000L, "b", 300_000L, "c", 300_000L));
+    assertThat(groupings).hasSize(1);
+    assertThat(groupings.get(0)).hasSize(3);
+  }
+
+  @Test
+  void overWeightLimit_twoGroupings() {
+    List<List<BinItem>> groupings =
+        packer(1_000_000L, 10)
+            .pack(opsList("a", "b", "c"), statsMap("a", 600_000L, "b", 600_000L, "c", 400_000L));
+    assertThat(groupings).hasSize(2);
+    // FFD: sort desc → 600, 600, 400. Place 600 → bin0; next 600 doesn't fit bin0, → bin1; 400
+    // fits bin0 (total 1_000_000).
+    long b0 = groupings.get(0).stream().mapToLong(BinItem::getWeight).sum();
+    long b1 = groupings.get(1).stream().mapToLong(BinItem::getWeight).sum();
+    assertThat(b0).isEqualTo(1_000_000L);
+    assertThat(b1).isEqualTo(600_000L);
+  }
+
+  @Test
+  void itemLargerThanCap_getsOwnGrouping() {
+    List<List<BinItem>> groupings =
+        packer(1_000L, 10).pack(opsList("big"), statsMap("big", 5_000L));
+    assertThat(groupings).hasSize(1);
+    assertThat(groupings.get(0)).hasSize(1);
+  }
+
+  @Test
+  void sortedDescending_largestFirst() {
+    List<List<BinItem>> groupings =
+        packer(2_000_000L, 10)
+            .pack(opsList("small", "large"), statsMap("small", 100L, "large", 900_000L));
+    assertThat(groupings).hasSize(1);
+    List<String> ids =
+        groupings.get(0).stream().map(BinItem::getOperationId).collect(Collectors.toList());
+    assertThat(ids).containsExactly("large", "small");
+  }
+
+  @Test
+  void maxItemsCap_splitsGroupings() {
+    List<List<BinItem>> groupings =
+        packer(1_000_000L, 2)
+            .pack(opsList("a", "b", "c", "d"), statsMap("a", 1L, "b", 1L, "c", 1L, "d", 1L));
+    assertThat(groupings).hasSize(2);
+    assertThat(groupings.get(0)).hasSize(2);
+    assertThat(groupings.get(1)).hasSize(2);
+  }
+
+  @Test
+  void operationsWithoutStats_dropped() {
+    List<List<BinItem>> groupings =
+        packer(1_000_000L, 10).pack(opsList("a", "missing"), statsMap("a", 100L));
+    assertThat(groupings).hasSize(1);
+    assertThat(groupings.get(0)).hasSize(1);
+    assertThat(groupings.get(0).get(0).getOperationId()).isEqualTo("a");
+  }
+}
diff --git a/libs/optimizer/binpack/src/test/java/com/linkedin/openhouse/optimizer/binpack/TotalFilesBinItemTest.java b/libs/optimizer/binpack/src/test/java/com/linkedin/openhouse/optimizer/binpack/TotalFilesBinItemTest.java
new file mode 100644
index 000000000..dcf081ad7
--- /dev/null
+++ b/libs/optimizer/binpack/src/test/java/com/linkedin/openhouse/optimizer/binpack/TotalFilesBinItemTest.java
@@ -0,0 +1,70 @@
+package com.linkedin.openhouse.optimizer.binpack;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.linkedin.openhouse.optimizer.model.OperationTypeDto;
+import com.linkedin.openhouse.optimizer.model.TableOperationDto;
+import com.linkedin.openhouse.optimizer.model.TableStatsDto;
+import java.util.UUID;
+import org.junit.jupiter.api.Test;
+
+class TotalFilesBinItemTest {
+
+  private static TableOperationDto anOp() {
+    return TableOperationDto.builder()
+        .id(UUID.randomUUID().toString())
+        .tableUuid(UUID.randomUUID().toString())
+        .databaseName("db1")
+        .tableName("tbl1")
+        .operationType(OperationTypeDto.ORPHAN_FILES_DELETION)
+        .build();
+  }
+
+  private static TableStatsDto statsOf(Long currentFiles) {
+    TableStatsDto.SnapshotMetrics snapshot =
+        TableStatsDto.SnapshotMetrics.builder().numCurrentFiles(currentFiles).build();
+    return TableStatsDto.builder().snapshot(snapshot).build();
+  }
+
+  @Test
+  void fromOpAndStats_buildsFullyQualifiedNameAndOperationId() {
+    TableOperationDto operation = anOp();
+    TableStatsDto stats = statsOf(42L);
+    BinItem built = new TotalFilesBinItem().fromOpAndStats(operation, stats);
+    assertThat(built.getFullyQualifiedTableName()).isEqualTo("db1.tbl1");
+    assertThat(built.getOperationId()).isEqualTo(operation.getId());
+  }
+
+  @Test
+  void fromOpAndStats_weightIsCurrentFileCount() {
+    BinItem built = new TotalFilesBinItem().fromOpAndStats(anOp(), statsOf(123_456L));
+    assertThat(built.getWeight()).isEqualTo(123_456L);
+  }
+
+  @Test
+  void fromOpAndStats_nullStats_weightIsZero() {
+    BinItem built = new TotalFilesBinItem().fromOpAndStats(anOp(), null);
+    assertThat(built.getWeight()).isEqualTo(0L);
+  }
+
+  @Test
+  void fromOpAndStats_nullSnapshot_weightIsZero() {
+    BinItem built = new TotalFilesBinItem().fromOpAndStats(anOp(), TableStatsDto.builder().build());
+    assertThat(built.getWeight()).isEqualTo(0L);
+  }
+
+  @Test
+  void fromOpAndStats_nullFileCount_weightIsZero() {
+    BinItem built = new TotalFilesBinItem().fromOpAndStats(anOp(), statsOf(null));
+    assertThat(built.getWeight()).isEqualTo(0L);
+  }
+
+  @Test
+  void emptyInstance_doesNotShareStateWithPopulated() {
+    TotalFilesBinItem template = new TotalFilesBinItem();
+    BinItem built = template.fromOpAndStats(anOp(), statsOf(7L));
+    // The template keeps its zero weight; only the returned copy carries the projected value.
+    assertThat(template.getWeight()).isEqualTo(0L);
+    assertThat(built.getWeight()).isEqualTo(7L);
+  }
+}
diff --git a/services/optimizer/scheduler/build.gradle b/services/optimizer/scheduler/build.gradle
new file mode 100644
index 000000000..039e2e53e
--- /dev/null
+++ b/services/optimizer/scheduler/build.gradle
@@ -0,0 +1,36 @@
+plugins {
+ id 'openhouse.springboot-ext-conventions'
+ id 'openhouse.maven-publish'
+ id 'org.springframework.boot' version '2.7.8'
+}
+
+// Library jar — the @SpringBootApplication entry point lives in :apps:optimizer:schedulerapp.
+// Disable bootJar so we don't try to assemble a runnable jar from a library that has no main
+// class; keep jar enabled so consumers (the apps wrapper) get a normal library artifact.
+bootJar {
+ enabled = false
+}
+
+jar {
+ enabled = true
+ archiveClassifier = ''
+}
+
+dependencies {
+ // api: the scheduler's public types (e.g. Bin, BinPacker, OperationTypeDto) come from
+ // :services:optimizer and :libs:optimizer:binpack, so consumers of this library see them on
+ // their compile classpath.
+ api project(':services:optimizer')
+ api project(':libs:optimizer:binpack')
+ implementation 'org.springframework.boot:spring-boot-starter:2.7.8'
+ implementation 'org.springframework.boot:spring-boot-starter-webflux:2.7.8'
+ implementation 'org.springframework.boot:spring-boot-starter-data-jpa:2.7.8'
+ implementation 'org.springframework.boot:spring-boot-starter-aop:2.7.8'
+ runtimeOnly 'mysql:mysql-connector-java:8.0.33'
+ testImplementation 'org.springframework.boot:spring-boot-starter-test:2.7.8'
+ testRuntimeOnly 'com.h2database:h2'
+}
+
+test {
+ useJUnitPlatform()
+}
diff --git a/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerRunner.java b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerRunner.java
new file mode 100644
index 000000000..eade78242
--- /dev/null
+++ b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerRunner.java
@@ -0,0 +1,251 @@
+package com.linkedin.openhouse.optimizer.scheduler;
+
+import com.linkedin.openhouse.optimizer.binpack.Bin;
+import com.linkedin.openhouse.optimizer.binpack.BinItem;
+import com.linkedin.openhouse.optimizer.binpack.BinPacker;
+import com.linkedin.openhouse.optimizer.db.OperationStatus;
+import com.linkedin.openhouse.optimizer.db.TableOperationsRow;
+import com.linkedin.openhouse.optimizer.db.TableStatsRow;
+import com.linkedin.openhouse.optimizer.model.OperationTypeDto;
+import com.linkedin.openhouse.optimizer.model.TableOperationDto;
+import com.linkedin.openhouse.optimizer.model.TableStatsDto;
+import com.linkedin.openhouse.optimizer.repository.TableOperationsRepository;
+import com.linkedin.openhouse.optimizer.repository.TableStatsRepository;
+import com.linkedin.openhouse.optimizer.scheduler.client.JobsServiceClient;
+import java.time.Instant;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.data.domain.Pageable;
+import org.springframework.transaction.annotation.Transactional;
+
+/**
+ * Generic scheduler. Operation types are registered at construction via {@link #registerOperation},
+ * which returns a new instance with the additional entry — the registry is immutable, so the bean
+ * Spring publishes is the fully-registered runner produced in {@link
+ * com.linkedin.openhouse.optimizer.scheduler.config.SchedulerConfig}. For each registered type the
+ * runner:
+ *
+ * <ol>
+ *   <li>Reads PENDING rows from MySQL.
+ *   <li>For each {@code tableUuid}, picks the oldest PENDING row to schedule and the rest to cancel
+ *       — both lists derived independently from the same grouping.
+ *   <li>Cancels the duplicate rows; loads stats for the rows to schedule.
+ *   <li>Hands the (operations, stats) pair to the {@link BinPacker} and receives one grouping per
+ *       batch.
+ *   <li>Wraps each grouping into a {@link Bin} tagged with the operation type and schedules it
+ *       (claim CAS, narrow to claimed, launch, record).
+ * </ol>
+ *
+ * <p>The runner is operation-agnostic. All IO and the claim/launch/mark lifecycle live here; the
+ * only per-operation knowledge in the module is the {@link BinPacker} the caller registers.
+ */
+@Slf4j
+public class SchedulerRunner {
+
+  private final TableOperationsRepository operationsRepo;
+  private final TableStatsRepository statsRepo;
+  private final JobsServiceClient jobsClient;
+  private final String resultsEndpoint;
+  private final Map<OperationTypeDto, BinPacker> registry;
+
+  public SchedulerRunner(
+      TableOperationsRepository operationsRepo,
+      TableStatsRepository statsRepo,
+      JobsServiceClient jobsClient,
+      String resultsEndpoint) {
+    this(operationsRepo, statsRepo, jobsClient, resultsEndpoint, Map.of());
+  }
+
+  private SchedulerRunner(
+      TableOperationsRepository operationsRepo,
+      TableStatsRepository statsRepo,
+      JobsServiceClient jobsClient,
+      String resultsEndpoint,
+      Map<OperationTypeDto, BinPacker> registry) {
+    this.operationsRepo = operationsRepo;
+    this.statsRepo = statsRepo;
+    this.jobsClient = jobsClient;
+    this.resultsEndpoint = resultsEndpoint;
+    this.registry = registry;
+  }
+
+  /**
+   * Return a new {@link SchedulerRunner} whose registry is this one's plus {@code (type, packer)}.
+   * If {@code type} was already registered, the new entry replaces the prior one. Pure: the
+   * receiver is unchanged.
+   */
+  public SchedulerRunner registerOperation(OperationTypeDto type, BinPacker packer) {
+    HashMap<OperationTypeDto, BinPacker> next = new HashMap<>(registry);
+    next.put(type, packer);
+    return new SchedulerRunner(
+        operationsRepo, statsRepo, jobsClient, resultsEndpoint, Map.copyOf(next));
+  }
+
+  public Set<OperationTypeDto> getRegisteredOperationTypes() {
+    return registry.keySet();
+  }
+
+  public void schedule(OperationTypeDto type) {
+    schedule(type, Optional.empty(), Optional.empty());
+  }
+
+  public void schedule(
+      OperationTypeDto type, Optional<String> databaseName, Optional<String> tableName) {
+    Comparator<TableOperationsRow> oldestFirst =
+        Comparator.comparing(TableOperationsRow::getCreatedAt)
+            .thenComparing(TableOperationsRow::getId);
+
+    // Per tableUuid, the oldest row (lex-tiebreak on id) is the one we schedule; any others are
+    // duplicates we cancel. Both lists are derived independently from the same grouping —
+    // scheduling does not wait on cancellation and is not predicated on its outcome.
+    Map<String, List<TableOperationsRow>> byTableUuid =
+        operationsRepo
+            .find(
+                Optional.of(type.toDb()),
+                Optional.of(OperationStatus.PENDING),
+                Optional.empty(),
+                databaseName,
+                tableName,
+                Optional.empty(),
+                Optional.empty(),
+                Pageable.unpaged())
+            .stream()
+            .collect(Collectors.groupingBy(TableOperationsRow::getTableUuid));
+
+    // Cancel the duplicate-per-tableUuid rows. The emptiness guard matters: an empty id list would
+    // produce an IN () clause, which Hibernate will not run.
+    List<String> duplicateIds =
+        byTableUuid.values().stream()
+            .filter(rows -> rows.size() > 1)
+            .flatMap(rows -> rows.stream().sorted(oldestFirst).skip(1))
+            .map(TableOperationsRow::getId)
+            .collect(Collectors.toList());
+    if (!duplicateIds.isEmpty()) {
+      operationsRepo.cancel(duplicateIds);
+    }
+    // Read stats, bin pack, and schedule the operations per bin.
+    Optional.ofNullable(registry.get(type))
+        .ifPresent(
+            packer ->
+                packer
+                    .pack(
+                        byTableUuid.values().stream()
+                            .map(rows -> rows.stream().min(oldestFirst).orElseThrow())
+                            .map(TableOperationDto::fromRow)
+                            .collect(Collectors.toList()),
+                        statsRepo.findAllById(byTableUuid.keySet()).stream()
+                            .collect(
+                                Collectors.toMap(
+                                    TableStatsRow::getTableUuid, TableStatsDto::fromRow)))
+                    .stream()
+                    .map(grouping -> new Bin(type, grouping))
+                    .forEach(this::scheduleBin));
+  }
+
+  /**
+   * Claim the bin's operations, narrow to the rows actually owned, launch one batched Spark job for
+   * the claimed subset, and mark SCHEDULED — or revert to PENDING if launch failed.
+   *
+   * <p>NOTE(review): package-private and invoked via {@code this::scheduleBin}, so a proxy-based
+   * {@code @Transactional} interceptor never sees this call — confirm a transaction is intended.
+   */
+  @Transactional
+  void scheduleBin(Bin bin) {
+    Instant claimedAt = Instant.now();
+    List<BinItem> claimedItems = claim(bin, claimedAt);
+
+    if (claimedItems.isEmpty()) {
+      log.info("All rows in bin already claimed by another scheduler instance; skipping");
+      return;
+    }
+    if (claimedItems.size() < bin.getItems().size()) {
+      log.info(
+          "Partial claim: {} of {} ops in bin claimed; launching job for claimed subset only",
+          claimedItems.size(),
+          bin.getItems().size());
+    }
+
+    List<String> claimedIds =
+        claimedItems.stream().map(BinItem::getOperationId).collect(Collectors.toList());
+
+    jobsClient
+        .launch(
+            "batched-"
+                + bin.getOperationType().name().toLowerCase(java.util.Locale.ROOT)
+                + "-"
+                + claimedAt.toEpochMilli(),
+            bin.getOperationType().name(),
+            claimedItems.stream()
+                .map(BinItem::getFullyQualifiedTableName)
+                .collect(Collectors.toList()),
+            claimedIds,
+            resultsEndpoint)
+        .ifPresentOrElse(
+            jobId -> {
+              int updated =
+                  operationsRepo.updateBatch(
+                      claimedIds,
+                      OperationStatus.SCHEDULING,
+                      OperationStatus.SCHEDULED,
+                      Optional.empty(),
+                      Optional.of(jobId));
+              log.info(
+                  "Submitted job {} for {} tables ({} rows marked SCHEDULED)",
+                  jobId,
+                  claimedItems.size(),
+                  updated);
+            },
+            () -> {
+              int reverted =
+                  operationsRepo.updateBatch(
+                      claimedIds,
+                      OperationStatus.SCHEDULING,
+                      OperationStatus.PENDING,
+                      Optional.empty(),
+                      Optional.empty());
+              log.warn(
+                  "Job submission failed; reverted {} claimed rows back to PENDING for retry on the"
+                      + " next pass",
+                  reverted);
+            });
+  }
+
+  /**
+   * CAS-claim every item in the bin from PENDING to SCHEDULING with {@code claimedAt} as the
+   * watermark, then narrow {@link Bin#getItems()} to those rows this caller actually owns. Items
+   * lost to a racing scheduler are dropped. Returns the {@link BinItem}s ready for launch.
+   */
+  private List<BinItem> claim(Bin bin, Instant claimedAt) {
+    List<String> ids =
+        bin.getItems().stream().map(BinItem::getOperationId).collect(Collectors.toList());
+    operationsRepo.updateBatch(
+        ids,
+        OperationStatus.PENDING,
+        OperationStatus.SCHEDULING,
+        Optional.of(claimedAt),
+        Optional.empty());
+    Set<String> claimedIds =
+        operationsRepo
+            .find(
+                Optional.empty(),
+                Optional.of(OperationStatus.SCHEDULING),
+                Optional.empty(),
+                Optional.empty(),
+                Optional.empty(),
+                Optional.of(claimedAt),
+                Optional.of(ids),
+                Pageable.unpaged())
+            .stream()
+            .map(TableOperationsRow::getId)
+            .collect(Collectors.toSet());
+    return bin.getItems().stream()
+        .filter(item -> claimedIds.contains(item.getOperationId()))
+        .collect(Collectors.toList());
+  }
+}
diff --git a/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/client/JobsServiceClient.java b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/client/JobsServiceClient.java
new file mode 100644
index 000000000..ee8fa38ee
--- /dev/null
+++ b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/client/JobsServiceClient.java
@@ -0,0 +1,123 @@
+package com.linkedin.openhouse.optimizer.scheduler.client;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.web.reactive.function.client.WebClient;
+
+/**
+ * Client for the OpenHouse Jobs Service.
+ *
+ * <p>Submits one batched Spark job (e.g. {@code BatchedOrphanFilesDeletionSparkApp}) per bin via
+ * {@code POST /jobs}. Failures are logged and reported as {@link Optional#empty()} instead of
+ * being thrown, so callers branch on the returned value.
+ *
+ * <p>NOTE: an empty result does not prove that no job was created. A timeout or an unreadable
+ * response can occur after the service has already accepted the request.
+ */
+@Slf4j
+public class JobsServiceClient {
+
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+  private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30);
+
+  private final WebClient webClient;
+  private final String clusterId;
+  private final Duration timeout;
+
+  /**
+   * Creates a client that waits up to 30 seconds for each submission response.
+   *
+   * @param webClient client used for the {@code POST /jobs} call
+   * @param clusterId value sent as {@code clusterId} in every submission
+   */
+  public JobsServiceClient(WebClient webClient, String clusterId) {
+    this(webClient, clusterId, DEFAULT_TIMEOUT);
+  }
+
+  /**
+   * Creates a client with a caller-chosen response timeout.
+   *
+   * @param webClient client used for the {@code POST /jobs} call
+   * @param clusterId value sent as {@code clusterId} in every submission
+   * @param timeout maximum time to wait for the submission response
+   */
+  public JobsServiceClient(WebClient webClient, String clusterId, Duration timeout) {
+    this.webClient = webClient;
+    this.clusterId = clusterId;
+    this.timeout = timeout;
+  }
+
+  /**
+   * Submit a batched Spark job for the given tables.
+   *
+   * @param jobName human-readable name for the job
+   * @param jobType operation type string (e.g. "ORPHAN_FILES_DELETION")
+   * @param tableNames fully-qualified table names (db.table)
+   * @param operationIds operation UUIDs — parallel to tableNames, so both lists must have the
+   *     same size
+   * @param resultsEndpoint base URL the Spark app PATCHes results back to
+   * @return the job ID if the submission succeeded; empty if the lists are not parallel, the
+   *     request failed or timed out, or the response carried no usable {@code jobId}
+   */
+  public Optional<String> launch(
+      String jobName,
+      String jobType,
+      List<String> tableNames,
+      List<String> operationIds,
+      String resultsEndpoint) {
+    try {
+      // Checked inside the try so that a null list still ends in Optional.empty(), as before.
+      if (tableNames.size() != operationIds.size()) {
+        log.error(
+            "Refusing to submit job '{}': {} table names but {} operation ids",
+            jobName,
+            tableNames.size(),
+            operationIds.size());
+        return Optional.empty();
+      }
+
+      ObjectNode body = MAPPER.createObjectNode();
+      body.put("jobName", jobName);
+      body.put("clusterId", clusterId);
+
+      ObjectNode jobConf = body.putObject("jobConf");
+      jobConf.put("jobType", jobType);
+
+      // The Spark app receives both lists as comma-joined values of paired CLI flags.
+      ArrayNode args = jobConf.putArray("args");
+      args.add("--tableNames");
+      args.add(String.join(",", tableNames));
+      args.add("--operationIds");
+      args.add(String.join(",", operationIds));
+      args.add("--resultsEndpoint");
+      args.add(resultsEndpoint);
+
+      String responseBody =
+          webClient
+              .post()
+              .uri("/jobs")
+              .bodyValue(body)
+              .retrieve()
+              .bodyToMono(String.class)
+              .timeout(timeout)
+              .block();
+
+      // asText(null) yields null when the field is missing; a blank id is just as unusable.
+      String jobId = MAPPER.readTree(responseBody).path("jobId").asText(null);
+      if (jobId == null || jobId.trim().isEmpty()) {
+        log.error("Jobs Service response for job '{}' has no jobId: {}", jobName, responseBody);
+        return Optional.empty();
+      }
+      return Optional.of(jobId);
+    } catch (Exception e) {
+      // Pass the exception itself so the log keeps its type and stack trace.
+      log.error("Failed to submit job '{}'", jobName, e);
+      return Optional.empty();
+    }
+  }
+}
diff --git a/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/config/SchedulerConfig.java b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/config/SchedulerConfig.java
new file mode 100644
index 000000000..82af33275
--- /dev/null
+++ b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/config/SchedulerConfig.java
@@ -0,0 +1,62 @@
+package com.linkedin.openhouse.optimizer.scheduler.config;
+
+import com.linkedin.openhouse.optimizer.binpack.FirstFitBinPacker;
+import com.linkedin.openhouse.optimizer.binpack.TotalFilesBinItem;
+import com.linkedin.openhouse.optimizer.model.OperationTypeDto;
+import com.linkedin.openhouse.optimizer.repository.TableOperationsRepository;
+import com.linkedin.openhouse.optimizer.repository.TableStatsRepository;
+import com.linkedin.openhouse.optimizer.scheduler.SchedulerRunner;
+import com.linkedin.openhouse.optimizer.scheduler.client.JobsServiceClient;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.reactive.function.client.WebClient;
+
+/**
+ * Spring wiring for the scheduler: the HTTP client plumbing for the Jobs Service plus the {@link
+ * SchedulerRunner} bean. What identifies an operation type (type, packing strategy, item supplier)
+ * is put together in {@link #schedulerRunner}; the runner refers to types only as registry keys.
+ */
+@Configuration
+public class SchedulerConfig {
+
+  @Value("${optimizer.scheduler.jobs.base-uri}")
+  private String jobsBaseUri;
+
+  @Value("${optimizer.scheduler.cluster-id}")
+  private String clusterId;
+
+  /** {@link WebClient} whose base URL is the configured Jobs Service URI. */
+  @Bean
+  public WebClient jobsWebClient() {
+    WebClient.Builder jobsClientBuilder = WebClient.builder().baseUrl(jobsBaseUri);
+    return jobsClientBuilder.build();
+  }
+
+  /** Jobs Service client over the given {@link WebClient}, bound to the configured cluster id. */
+  @Bean
+  public JobsServiceClient jobsServiceClient(WebClient jobsWebClient) {
+    JobsServiceClient client = new JobsServiceClient(jobsWebClient, clusterId);
+    return client;
+  }
+
+  /**
+   * Creates the runner and registers orphan files deletion with a {@link FirstFitBinPacker} over
+   * {@link TotalFilesBinItem}. File count is the packing measure because per-file listing, manifest
+   * joins, and delete calls dominate the cost regardless of file size.
+   */
+  @Bean
+  public SchedulerRunner schedulerRunner(
+      TableOperationsRepository operationsRepo,
+      TableStatsRepository statsRepo,
+      JobsServiceClient jobsClient,
+      @Value("${optimizer.scheduler.results-endpoint}") String resultsEndpoint,
+      @Value("${optimizer.scheduler.ofd.max-files-per-bin}") long ofdMaxFilesPerBin,
+      @Value("${optimizer.scheduler.ofd.max-tables-per-bin}") int ofdMaxTablesPerBin) {
+    SchedulerRunner runner =
+        new SchedulerRunner(operationsRepo, statsRepo, jobsClient, resultsEndpoint);
+    return runner.registerOperation(
+        OperationTypeDto.ORPHAN_FILES_DELETION,
+        new FirstFitBinPacker<>(TotalFilesBinItem::new, ofdMaxFilesPerBin, ofdMaxTablesPerBin));
+  }
+}
diff --git a/services/optimizer/scheduler/src/test/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerRunnerTest.java b/services/optimizer/scheduler/src/test/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerRunnerTest.java
new file mode 100644
index 000000000..92df3287d
--- /dev/null
+++ b/services/optimizer/scheduler/src/test/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerRunnerTest.java
@@ -0,0 +1,377 @@
+package com.linkedin.openhouse.optimizer.scheduler;
+
+import static com.linkedin.openhouse.optimizer.model.OperationTypeDto.ORPHAN_FILES_DELETION;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.linkedin.openhouse.optimizer.binpack.FirstFitBinPacker;
+import com.linkedin.openhouse.optimizer.binpack.TotalFilesBinItem;
+import com.linkedin.openhouse.optimizer.db.OperationStatus;
+import com.linkedin.openhouse.optimizer.db.SnapshotMetrics;
+import com.linkedin.openhouse.optimizer.db.TableOperationsRow;
+import com.linkedin.openhouse.optimizer.db.TableStatsRow;
+import com.linkedin.openhouse.optimizer.repository.TableOperationsRepository;
+import com.linkedin.openhouse.optimizer.repository.TableStatsRepository;
+import com.linkedin.openhouse.optimizer.scheduler.client.JobsServiceClient;
+import java.time.Instant;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Unit tests for {@link SchedulerRunner}. Both repositories and the jobs client are Mockito
+ * mocks while the bin packer is real, so only the I/O of a scheduling pass is stubbed.
+ */
+@ExtendWith(MockitoExtension.class)
+class SchedulerRunnerTest {
+
+  private static final String RESULTS_ENDPOINT = "http://localhost:8080/v1/optimizer/operations";
+
+  @Mock private TableOperationsRepository operationsRepo;
+  @Mock private TableStatsRepository statsRepo;
+  @Mock private JobsServiceClient jobsClient;
+
+  private SchedulerRunner runner;
+
+  @BeforeEach
+  void setUp() {
+    // A real packer — the runner exercises the full pipeline against actual bucketing and the
+    // packer's projection logic, while the IO is mocked.
+    runner =
+        new SchedulerRunner(operationsRepo, statsRepo, jobsClient, RESULTS_ENDPOINT)
+            .registerOperation(
+                ORPHAN_FILES_DELETION,
+                new FirstFitBinPacker<>(TotalFilesBinItem::new, 1_000_000L, 50));
+  }
+
+  // ---- Stubbing helpers ----
+
+  /** Stubs the lookup of PENDING {@code ORPHAN_FILES_DELETION} rows to return {@code rows}. */
+  private void stubFindPending(List<TableOperationsRow> rows) {
+    when(operationsRepo.find(
+            eq(Optional.of(ORPHAN_FILES_DELETION.toDb())),
+            eq(Optional.of(OperationStatus.PENDING)),
+            eq(Optional.empty()),
+            eq(Optional.empty()),
+            eq(Optional.empty()),
+            eq(Optional.empty()),
+            eq(Optional.empty()),
+            any()))
+        .thenReturn(rows);
+  }
+
+  /**
+   * Stubs the post-claim read-back (SCHEDULING rows, any watermark and id filter) to return {@code
+   * rows}, i.e. the rows this scheduler is treated as having won.
+   */
+  private void stubFindClaimed(List<TableOperationsRow> rows) {
+    when(operationsRepo.find(
+            eq(Optional.empty()),
+            eq(Optional.of(OperationStatus.SCHEDULING)),
+            eq(Optional.empty()),
+            eq(Optional.empty()),
+            eq(Optional.empty()),
+            any(),
+            any(),
+            any()))
+        .thenReturn(rows);
+  }
+
+  /** Builds a PENDING orphan-files-deletion row with a random id, created "now". */
+  private TableOperationsRow pendingRow(String uuid, String db, String table) {
+    return TableOperationsRow.builder()
+        .id(UUID.randomUUID().toString())
+        .tableUuid(uuid)
+        .databaseName(db)
+        .tableName(table)
+        .operationType(ORPHAN_FILES_DELETION.toDb())
+        .status(OperationStatus.PENDING)
+        .createdAt(Instant.now())
+        .build();
+  }
+
+  /** Copy of {@code source} with its status switched to SCHEDULING. */
+  private TableOperationsRow schedulingRow(TableOperationsRow source) {
+    return source.toBuilder().status(OperationStatus.SCHEDULING).build();
+  }
+
+  /** Stats row for {@code uuid} whose snapshot reports {@code numCurrentFiles} current files. */
+  private TableStatsRow statsRow(String uuid, long numCurrentFiles) {
+    return TableStatsRow.builder()
+        .tableUuid(uuid)
+        .snapshot(SnapshotMetrics.builder().numCurrentFiles(numCurrentFiles).build())
+        .build();
+  }
+
+  // ---- Tests ----
+
+  /** A runner with no registered operation types must not launch anything. */
+  @Test
+  void schedule_unknownOperationType_doesNotLaunch() {
+    SchedulerRunner empty =
+        new SchedulerRunner(operationsRepo, statsRepo, jobsClient, RESULTS_ENDPOINT);
+    stubFindPending(List.of(pendingRow(UUID.randomUUID().toString(), "db1", "tbl1")));
+
+    empty.schedule(ORPHAN_FILES_DELETION);
+
+    verify(jobsClient, never()).launch(anyString(), anyString(), anyList(), anyList(), anyString());
+  }
+
+  /** The registry reports exactly the operation types that were registered. */
+  @Test
+  void getRegisteredOperationTypes_returnsRegisteredSet() {
+    assertThat(runner.getRegisteredOperationTypes()).containsExactly(ORPHAN_FILES_DELETION);
+  }
+
+  /** Nothing PENDING means nothing is submitted. */
+  @Test
+  void schedule_noPendingOps_noJobSubmitted() {
+    stubFindPending(List.of());
+
+    runner.schedule(ORPHAN_FILES_DELETION);
+
+    verify(jobsClient, never()).launch(anyString(), anyString(), anyList(), anyList(), anyString());
+  }
+
+  /** A pending operation whose table has no stats row never reaches the jobs client. */
+  @Test
+  void schedule_allOpsWithoutStats_noJobSubmitted() {
+    TableOperationsRow row = pendingRow(UUID.randomUUID().toString(), "db1", "tbl1");
+    stubFindPending(List.of(row));
+    when(statsRepo.findAllById(any())).thenReturn(List.of());
+
+    runner.schedule(ORPHAN_FILES_DELETION);
+
+    verify(jobsClient, never()).launch(anyString(), anyString(), anyList(), anyList(), anyString());
+  }
+
+  /** Happy path: the claimed row is launched under its db.table name and marked SCHEDULED. */
+  @Test
+  void schedule_singleBin_claimsAndMarksScheduled() {
+    String uuid = UUID.randomUUID().toString();
+    TableOperationsRow row = pendingRow(uuid, "db1", "tbl1");
+
+    stubFindPending(List.of(row));
+    when(statsRepo.findAllById(any())).thenReturn(List.of(statsRow(uuid, 100_000L)));
+    when(operationsRepo.updateBatch(
+            anyList(), eq(OperationStatus.PENDING), eq(OperationStatus.SCHEDULING), any(), any()))
+        .thenReturn(1);
+    stubFindClaimed(List.of(schedulingRow(row)));
+    when(operationsRepo.updateBatch(
+            anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.SCHEDULED), any(), any()))
+        .thenReturn(1);
+    when(jobsClient.launch(anyString(), anyString(), anyList(), anyList(), anyString()))
+        .thenReturn(Optional.of("job-123"));
+
+    runner.schedule(ORPHAN_FILES_DELETION);
+
+    verify(operationsRepo)
+        .updateBatch(
+            eq(List.of(row.getId())),
+            eq(OperationStatus.SCHEDULING),
+            eq(OperationStatus.SCHEDULED),
+            eq(Optional.empty()),
+            eq(Optional.of("job-123")));
+    verify(operationsRepo, never())
+        .updateBatch(
+            anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.PENDING), any(), any());
+
+    // forClass(List.class) cannot carry the element type, hence the unchecked assignment.
+    @SuppressWarnings("unchecked")
+    ArgumentCaptor<List<String>> tableNames = ArgumentCaptor.forClass(List.class);
+    verify(jobsClient)
+        .launch(
+            anyString(),
+            eq(ORPHAN_FILES_DELETION.name()),
+            tableNames.capture(),
+            anyList(),
+            anyString());
+    assertThat(tableNames.getValue()).containsExactly("db1.tbl1");
+  }
+
+  /** A failed submission (empty job id) sends the claimed row back to PENDING. */
+  @Test
+  void schedule_jobLaunchFails_marksPendingForRetry() {
+    String uuid = UUID.randomUUID().toString();
+    TableOperationsRow row = pendingRow(uuid, "db1", "tbl1");
+
+    stubFindPending(List.of(row));
+    when(statsRepo.findAllById(any())).thenReturn(List.of(statsRow(uuid, 100L)));
+    when(operationsRepo.updateBatch(
+            anyList(), eq(OperationStatus.PENDING), eq(OperationStatus.SCHEDULING), any(), any()))
+        .thenReturn(1);
+    stubFindClaimed(List.of(schedulingRow(row)));
+    when(jobsClient.launch(anyString(), anyString(), anyList(), anyList(), anyString()))
+        .thenReturn(Optional.empty());
+    when(operationsRepo.updateBatch(
+            anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.PENDING), any(), any()))
+        .thenReturn(1);
+
+    runner.schedule(ORPHAN_FILES_DELETION);
+
+    verify(operationsRepo)
+        .updateBatch(
+            eq(List.of(row.getId())),
+            eq(OperationStatus.SCHEDULING),
+            eq(OperationStatus.PENDING),
+            eq(Optional.empty()),
+            eq(Optional.empty()));
+    verify(operationsRepo, never())
+        .updateBatch(
+            anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.SCHEDULED), any(), any());
+  }
+
+  /** When the read-back shows no claimed rows, nothing is launched, marked or reverted. */
+  @Test
+  void schedule_rowsAlreadyClaimed_skipsSubmit() {
+    String uuid = UUID.randomUUID().toString();
+    TableOperationsRow row = pendingRow(uuid, "db1", "tbl1");
+
+    stubFindPending(List.of(row));
+    when(statsRepo.findAllById(any())).thenReturn(List.of(statsRow(uuid, 100L)));
+    when(operationsRepo.updateBatch(
+            anyList(), eq(OperationStatus.PENDING), eq(OperationStatus.SCHEDULING), any(), any()))
+        .thenReturn(0);
+    stubFindClaimed(List.of());
+
+    runner.schedule(ORPHAN_FILES_DELETION);
+
+    verify(jobsClient, never()).launch(anyString(), anyString(), anyList(), anyList(), anyString());
+    verify(operationsRepo, never())
+        .updateBatch(
+            anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.SCHEDULED), any(), any());
+    verify(operationsRepo, never())
+        .updateBatch(
+            anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.PENDING), any(), any());
+  }
+
+  /** Two PENDING rows for the same table: exactly one entry is handed to {@code cancel}. */
+  @Test
+  void schedule_cancelsDuplicatePendingPerCycle() {
+    String uuid = UUID.randomUUID().toString();
+    TableOperationsRow row1 = pendingRow(uuid, "db1", "tbl1");
+    TableOperationsRow row2 = pendingRow(uuid, "db1", "tbl1");
+
+    stubFindPending(List.of(row1, row2));
+    when(operationsRepo.cancel(anyList())).thenReturn(1);
+    when(statsRepo.findAllById(any())).thenReturn(List.of(statsRow(uuid, 100L)));
+    when(operationsRepo.updateBatch(
+            anyList(), eq(OperationStatus.PENDING), eq(OperationStatus.SCHEDULING), any(), any()))
+        .thenReturn(1);
+    // Both rows are stamped with Instant.now(), so their order is not fixed: take the earlier
+    // createdAt as the survivor and fall back to the smaller id when the timestamps are equal.
+    TableOperationsRow survivor = row1.getCreatedAt().isBefore(row2.getCreatedAt()) ? row1 : row2;
+    if (row1.getCreatedAt().equals(row2.getCreatedAt())) {
+      survivor = row1.getId().compareTo(row2.getId()) <= 0 ? row1 : row2;
+    }
+    stubFindClaimed(List.of(schedulingRow(survivor)));
+    when(operationsRepo.updateBatch(
+            anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.SCHEDULED), any(), any()))
+        .thenReturn(1);
+    when(jobsClient.launch(anyString(), anyString(), anyList(), anyList(), anyString()))
+        .thenReturn(Optional.of("job-dedup"));
+
+    runner.schedule(ORPHAN_FILES_DELETION);
+
+    // forClass(List.class) cannot carry the element type, hence the unchecked assignment.
+    @SuppressWarnings("unchecked")
+    ArgumentCaptor<List<String>> cancelled = ArgumentCaptor.forClass(List.class);
+    verify(operationsRepo).cancel(cancelled.capture());
+    assertThat(cancelled.getValue()).hasSize(1);
+  }
+
+  /** Only the rows confirmed by the read-back are launched and marked SCHEDULED. */
+  @Test
+  void schedule_partialClaim_launchesAndMarksOnlyClaimedSubset() {
+    String uuidA = UUID.randomUUID().toString();
+    String uuidB = UUID.randomUUID().toString();
+    TableOperationsRow rowA = pendingRow(uuidA, "db1", "tblA");
+    TableOperationsRow rowB = pendingRow(uuidB, "db1", "tblB");
+
+    stubFindPending(List.of(rowA, rowB));
+    when(statsRepo.findAllById(any()))
+        .thenReturn(List.of(statsRow(uuidA, 100L), statsRow(uuidB, 100L)));
+    when(operationsRepo.updateBatch(
+            anyList(), eq(OperationStatus.PENDING), eq(OperationStatus.SCHEDULING), any(), any()))
+        .thenReturn(1);
+    // Only A actually claimed.
+    stubFindClaimed(List.of(schedulingRow(rowA)));
+    when(operationsRepo.updateBatch(
+            anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.SCHEDULED), any(), any()))
+        .thenReturn(1);
+    when(jobsClient.launch(anyString(), anyString(), anyList(), anyList(), anyString()))
+        .thenReturn(Optional.of("job-partial"));
+
+    runner.schedule(ORPHAN_FILES_DELETION);
+
+    // forClass(List.class) cannot carry the element type, hence the unchecked assignments.
+    @SuppressWarnings("unchecked")
+    ArgumentCaptor<List<String>> launchedTableNames = ArgumentCaptor.forClass(List.class);
+    @SuppressWarnings("unchecked")
+    ArgumentCaptor<List<String>> launchedOpIds = ArgumentCaptor.forClass(List.class);
+    verify(jobsClient)
+        .launch(
+            anyString(),
+            anyString(),
+            launchedTableNames.capture(),
+            launchedOpIds.capture(),
+            anyString());
+    assertThat(launchedTableNames.getValue()).containsExactly("db1.tblA");
+    assertThat(launchedOpIds.getValue()).containsExactly(rowA.getId());
+
+    verify(operationsRepo)
+        .updateBatch(
+            eq(List.of(rowA.getId())),
+            eq(OperationStatus.SCHEDULING),
+            eq(OperationStatus.SCHEDULED),
+            eq(Optional.empty()),
+            eq(Optional.of("job-partial")));
+  }
+
+  /** An operation whose table lacks stats is left out of the claim; the rest proceed. */
+  @Test
+  void schedule_opsWithoutStats_skipped() {
+    String withStats = UUID.randomUUID().toString();
+    String missing = UUID.randomUUID().toString();
+    TableOperationsRow withStatsRow = pendingRow(withStats, "db1", "tblA");
+    TableOperationsRow missingRow = pendingRow(missing, "db1", "tblB");
+
+    stubFindPending(List.of(withStatsRow, missingRow));
+    when(statsRepo.findAllById(any())).thenReturn(List.of(statsRow(withStats, 50L)));
+    when(operationsRepo.updateBatch(
+            anyList(), eq(OperationStatus.PENDING), eq(OperationStatus.SCHEDULING), any(), any()))
+        .thenReturn(1);
+    stubFindClaimed(List.of(schedulingRow(withStatsRow)));
+    when(operationsRepo.updateBatch(
+            anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.SCHEDULED), any(), any()))
+        .thenReturn(1);
+    when(jobsClient.launch(anyString(), anyString(), anyList(), anyList(), anyString()))
+        .thenReturn(Optional.of("job-skip"));
+
+    runner.schedule(ORPHAN_FILES_DELETION);
+
+    // forClass(List.class) cannot carry the element type, hence the unchecked assignment.
+    @SuppressWarnings("unchecked")
+    ArgumentCaptor<List<String>> ids = ArgumentCaptor.forClass(List.class);
+    verify(operationsRepo)
+        .updateBatch(
+            ids.capture(),
+            eq(OperationStatus.PENDING),
+            eq(OperationStatus.SCHEDULING),
+            any(),
+            any());
+    assertThat(ids.getValue()).containsExactly(withStatsRow.getId());
+  }
+}
diff --git a/services/optimizer/scheduler/src/test/resources/application-test.properties b/services/optimizer/scheduler/src/test/resources/application-test.properties
new file mode 100644
index 000000000..57354728e
--- /dev/null
+++ b/services/optimizer/scheduler/src/test/resources/application-test.properties
@@ -0,0 +1,11 @@
+spring.datasource.url=jdbc:h2:mem:schedulertestdb;DB_CLOSE_DELAY=-1;MODE=MySQL
+spring.datasource.username=sa
+spring.datasource.password=
+spring.jpa.hibernate.ddl-auto=none
+spring.sql.init.mode=always
+spring.sql.init.schema-locations=classpath:db/optimizer-schema.sql
+optimizer.scheduler.jobs.base-uri=http://localhost:9999
+optimizer.scheduler.ofd.max-files-per-bin=1000000
+optimizer.scheduler.ofd.max-tables-per-bin=50
+optimizer.scheduler.results-endpoint=http://localhost:8080/v1/optimizer/operations
+optimizer.scheduler.cluster-id=test-cluster
diff --git a/settings.gradle b/settings.gradle
index 810ecd643..25a76bba0 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -45,6 +45,7 @@ include ':iceberg:openhouse:internalcatalog'
include ':iceberg:azure'
include ':libs:datalayout'
+include ':libs:optimizer:binpack'
include ':services:common'
include ':services:housetables'
@@ -52,6 +53,8 @@ include ':services:jobs'
include ':services:optimizer'
include ':services:optimizer:analyzer'
include ':apps:optimizer:analyzerapp'
+include ':services:optimizer:scheduler'
+include ':apps:optimizer:schedulerapp'
include ':services:tables'
include ':tables-test-fixtures:tables-test-fixtures-iceberg-1.2'
include ':tables-test-fixtures:tables-test-fixtures-iceberg-1.5'