future, OptimizerServiceClient client) {
+ try {
+ return Boolean.TRUE.equals(future.get()) ? 1 : 0;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.error("Worker interrupted: fqtn={}", entry.getFqtn(), e);
+ otelEmitter.count(
+ METRICS_SCOPE,
+ "optimizer_batch_interrupted",
+ 1,
+ Attributes.of(AttributeKey.stringKey(AppConstants.TABLE_NAME), entry.getFqtn()));
+ return 0;
+ } catch (ExecutionException e) {
+ // The worker catches Throwable internally and always reports its own result, so reaching
+ // here means the worker itself leaked an exception. Be defensive: post FAILED so the
+ // operation row doesn't sit SCHEDULED until the stale-timeout.
+ log.error(
+ "Worker threw outside its own catch for fqtn={} — reporting FAILED",
+ entry.getFqtn(),
+ e.getCause());
+ reportResult(entry, UpdateOperationRequest.StatusEnum.FAILED, client);
+ return 0;
+ }
+ }
+
+ private void shutdownPool(ExecutorService pool) {
+ pool.shutdown();
+ try {
+ if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
+ pool.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ pool.shutdownNow();
+ }
+ }
+
+ protected OptimizerServiceClient newOptimizerClient() {
+ return new OptimizerServiceClient(resultsEndpoint);
+ }
+
+ /**
+ * POST the per-operation outcome to the Optimizer Service via the generated client. The wrapper
+ * returns {@link java.util.Optional#empty()} when the call exhausts retries — we log + count and
+ * leave the operation row at SCHEDULED so the Analyzer's stale-timeout can re-queue it.
+ *
+ * {@code status} is passed in as a {@link UpdateOperationRequest.StatusEnum} rather than a
+ * boolean so the caller's intent is unambiguous and new terminal states (e.g. CANCELED) can be
+ * plumbed in without changing the signature.
+ */
+ private void reportResult(
+ BatchEntry entry, UpdateOperationRequest.StatusEnum status, OptimizerServiceClient client) {
+ UpdateOperationRequest body =
+ new UpdateOperationRequest()
+ .operationId(entry.getOperationId())
+ .status(status)
+ .tableUuid(entry.getTableUuid())
+ .databaseName(entry.getDatabaseName())
+ .tableName(entry.getTableName())
+ .operationType(UpdateOperationRequest.OperationTypeEnum.ORPHAN_FILES_DELETION);
+ if (!client.updateOperation(entry.getOperationId(), body).isPresent()) {
+ log.error(
+ "Failed to report operation result after retries; row will stay SCHEDULED until stale-timeout: operationId={} fqtn={}",
+ entry.getOperationId(),
+ entry.getFqtn());
+ otelEmitter.count(
+ METRICS_SCOPE,
+ "optimizer_update_failed",
+ 1,
+ Attributes.of(AttributeKey.stringKey(AppConstants.TABLE_NAME), entry.getFqtn()));
+ }
+ }
+
+ /** One unit of work in a batched OFD job. */
+ private final class TableWorker implements Callable {
+ private final Operations ops;
+ private final BatchEntry entry;
+ private final OptimizerServiceClient client;
+
+ TableWorker(Operations ops, BatchEntry entry, OptimizerServiceClient client) {
+ this.ops = ops;
+ this.entry = entry;
+ this.client = client;
+ }
+
+ @Override
+ public Boolean call() {
+ String fqtn = entry.getFqtn();
+ UpdateOperationRequest.StatusEnum status = UpdateOperationRequest.StatusEnum.FAILED;
+ try {
+ log.info("OFD start: fqtn={} operationId={}", fqtn, entry.getOperationId());
+ Table table = ops.getTable(fqtn);
+ long olderThanTimestampMillis =
+ System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(resolveTtlSeconds(table));
+ DeleteOrphanFiles.Result result =
+ ops.deleteOrphanFiles(
+ table,
+ olderThanTimestampMillis,
+ Boolean.parseBoolean(
+ table.properties().getOrDefault(AppConstants.BACKUP_ENABLED_KEY, "false")),
+ backupDir,
+ concurrentDeletes,
+ streamResults,
+ maxOrphanFileSampleSize);
+ // Count via iteration rather than materializing the full path list: a table with millions
+ // of orphan files would otherwise OOM the driver, and that risk multiplies with
+ // driverParallelism workers running concurrently.
+ int orphanCount = Iterables.size(result.orphanFileLocations());
+ otelEmitter.count(
+ METRICS_SCOPE,
+ AppConstants.ORPHAN_FILE_COUNT,
+ orphanCount,
+ Attributes.of(AttributeKey.stringKey(AppConstants.TABLE_NAME), fqtn));
+ validate(fqtn);
+ status = UpdateOperationRequest.StatusEnum.SUCCESS;
+ log.info("OFD success: fqtn={} orphansDetected={}", fqtn, orphanCount);
+ } catch (Throwable t) {
+ log.error("OFD failed: fqtn={} operationId={}", fqtn, entry.getOperationId(), t);
+ } finally {
+ // Defensive: reportResult must not throw out of the finally block, since that would mask
+ // the original failure and propagate up to awaitOne, which would then report FAILED again.
+ try {
+ reportResult(entry, status, client);
+ } catch (Throwable t) {
+ log.error(
+ "reportResult itself threw; operation row will stay SCHEDULED until stale-timeout: fqtn={}",
+ fqtn,
+ t);
+ }
+ }
+ return status == UpdateOperationRequest.StatusEnum.SUCCESS;
+ }
+
+ /**
+ * Re-runs {@link TableStateValidator} — the same post-job consistency check the single-table
+ * {@link OrphanFilesDeletionSparkApp} uses — to confirm the table's manifests and metadata are
+ * intact after deletion. A failure here is treated as a failed operation: it's logged, counted,
+ * and re-thrown so the outer {@link #call()} marks {@code success=false}.
+ */
+ private void validate(String fqtn) {
+ try {
+ TableStateValidator.run(ops.spark(), fqtn);
+ } catch (TableValidationException e) {
+ log.error("Post-job validation failed: fqtn={}", fqtn, e);
+ otelEmitter.count(
+ METRICS_SCOPE,
+ "post_run_validation_error",
+ 1,
+ Attributes.of(
+ AttributeKey.stringKey(AppConstants.TABLE_NAME),
+ fqtn,
+ AttributeKey.stringKey(AppConstants.JOB_NAME),
+ BatchedOrphanFilesDeletionSparkApp.class.getSimpleName()));
+ throw e;
+ }
+ }
+
+ private long resolveTtlSeconds(Table table) {
+ long resolved = ttlSeconds;
+ if (Boolean.parseBoolean(
+ table.properties().getOrDefault(AppConstants.OFD_ONE_DAY_TTL_ENABLED_KEY, "false"))) {
+ resolved = TimeUnit.DAYS.toSeconds(1);
+ }
+ String tableType =
+ table
+ .properties()
+ .getOrDefault(AppConstants.OPENHOUSE_TABLE_TYPE_KEY, AppConstants.TABLE_TYPE_PRIMARY);
+ if (AppConstants.TABLE_TYPE_REPLICA.equals(tableType)
+ && Duration.ofSeconds(resolved).toDays() < DEFAULT_MIN_OFD_TTL_IN_DAYS) {
+ resolved = TimeUnit.DAYS.toSeconds(DEFAULT_MIN_OFD_TTL_IN_DAYS);
+ }
+ return resolved;
+ }
+ }
+
+ /** Per-table inputs for one operation row inside a bin. */
+ @lombok.AllArgsConstructor
+ @lombok.Builder
+ @lombok.Getter
+ @lombok.ToString
+ public static class BatchEntry {
+ private final String fqtn;
+ private final String operationId;
+ private final String tableUuid;
+ private final String databaseName;
+ private final String tableName;
+ }
+
+ public static void main(String[] args) {
+ OtelEmitter otelEmitter =
+ new AppsOtelEmitter(Collections.singletonList(DefaultOtelConfig.getOpenTelemetry()));
+ createApp(args, otelEmitter).run();
+ }
+
+ public static BatchedOrphanFilesDeletionSparkApp createApp(
+ String[] args, OtelEmitter otelEmitter) {
+ List extraOptions =
+ Arrays.asList(
+ valueOpt("tableNames", "Comma-separated list of fully-qualified table names"),
+ valueOpt("operationIds", "Comma-separated operation UUIDs, parallel to tableNames"),
+ valueOpt("tableUuids", "Comma-separated table UUIDs, parallel to tableNames"),
+ valueOpt("resultsEndpoint", "Base URL of the Optimizer Service"),
+ valueOpt("driverParallelism", "Worker threads in this batch (default 1)"),
+ valueOpt("trashDir", "tr", "Orphan files staging dir before deletion"),
+ valueOpt(
+ "ttl",
+ "r",
+ "How old files should be to be considered orphaned in seconds, minimum 1d is enforced"),
+ valueOpt("backupDir", "b", "Backup directory for deleted data"),
+ valueOpt("concurrentDeletes", "c", "Number of concurrent deletes per table"),
+ flagOpt("streamResults", "Stream orphan file deletions instead of collecting"),
+ valueOpt("maxOrphanFileSampleSize", "Max orphan file sample paths returned"));
+
+ CommandLine cmdLine = createCommandLine(args, extraOptions);
+
+ List entries =
+ buildEntries(
+ cmdLine.getOptionValue("tableNames"),
+ cmdLine.getOptionValue("operationIds"),
+ cmdLine.getOptionValue("tableUuids"));
+
+ return new BatchedOrphanFilesDeletionSparkApp(
+ getJobId(cmdLine),
+ createStateManager(cmdLine, otelEmitter),
+ otelEmitter,
+ entries,
+ requireOption(cmdLine, "resultsEndpoint"),
+ Integer.parseInt(cmdLine.getOptionValue("driverParallelism", "1")),
+ Math.max(
+ NumberUtils.toLong(cmdLine.getOptionValue("ttl"), TimeUnit.DAYS.toSeconds(7)),
+ TimeUnit.DAYS.toSeconds(1)),
+ cmdLine.getOptionValue("backupDir", ".backup"),
+ Integer.parseInt(cmdLine.getOptionValue("concurrentDeletes", "10")),
+ cmdLine.hasOption("streamResults"),
+ Integer.parseInt(
+ cmdLine.getOptionValue(
+ "maxOrphanFileSampleSize", String.valueOf(DEFAULT_MAX_ORPHAN_FILE_SAMPLE_SIZE))));
+ }
+
+ static List buildEntries(String tableNames, String operationIds, String tableUuids) {
+ if (tableNames == null
+ || operationIds == null
+ || tableUuids == null
+ || tableNames.isEmpty()
+ || operationIds.isEmpty()
+ || tableUuids.isEmpty()) {
+ throw new IllegalArgumentException(
+ "--tableNames, --operationIds, and --tableUuids are all required and must be non-empty");
+ }
+ String[] tables = tableNames.split(",");
+ String[] ops = operationIds.split(",");
+ String[] uuids = tableUuids.split(",");
+ if (tables.length != ops.length || tables.length != uuids.length) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Parallel-list length mismatch: tableNames=%d operationIds=%d tableUuids=%d",
+ tables.length, ops.length, uuids.length));
+ }
+ List entries = new ArrayList<>(tables.length);
+ for (int i = 0; i < tables.length; i++) {
+ String fqtn = tables[i].trim();
+ String[] dbAndTable = fqtn.split("\\.", 2);
+ if (dbAndTable.length != 2 || dbAndTable[0].isEmpty() || dbAndTable[1].isEmpty()) {
+ throw new IllegalArgumentException(
+ "tableNames entries must be fully-qualified (db.table): " + fqtn);
+ }
+ entries.add(
+ BatchEntry.builder()
+ .fqtn(fqtn)
+ .operationId(ops[i].trim())
+ .tableUuid(uuids[i].trim())
+ .databaseName(dbAndTable[0])
+ .tableName(dbAndTable[1])
+ .build());
+ }
+ return entries;
+ }
+
+ private static String requireOption(CommandLine cmdLine, String name) {
+ String value = cmdLine.getOptionValue(name);
+ if (value == null || value.isEmpty()) {
+ throw new IllegalArgumentException("--" + name + " is required");
+ }
+ return value;
+ }
+
+ /** Long-only CLI option carrying a value (read with {@code cmdLine.getOptionValue(name)}). */
+ private static Option valueOpt(String name, String description) {
+ return new Option(null, name, true, description);
+ }
+
+ /** Aliased CLI option carrying a value. {@code shortOpt} is the legacy single-letter alias. */
+ private static Option valueOpt(String name, String shortOpt, String description) {
+ return new Option(shortOpt, name, true, description);
+ }
+
+ /** Long-only boolean CLI flag (read with {@code cmdLine.hasOption(name)}). */
+ private static Option flagOpt(String name, String description) {
+ return new Option(null, name, false, description);
+ }
+
+ /** Visible for tests. */
+ List getEntries() {
+ return Collections.unmodifiableList(entries);
+ }
+
+ /** Visible for tests. */
+ int getDriverParallelism() {
+ return driverParallelism;
+ }
+}
diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/optimizer/OptimizerServiceClient.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/optimizer/OptimizerServiceClient.java
new file mode 100644
index 000000000..f350b228d
--- /dev/null
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/optimizer/OptimizerServiceClient.java
@@ -0,0 +1,83 @@
+package com.linkedin.openhouse.jobs.spark.optimizer;
+
+import com.linkedin.openhouse.client.ssl.OptimizerApiClientFactory;
+import com.linkedin.openhouse.jobs.util.RetryUtil;
+import com.linkedin.openhouse.optimizer.client.api.TableOperationsControllerApi;
+import com.linkedin.openhouse.optimizer.client.invoker.ApiClient;
+import com.linkedin.openhouse.optimizer.client.model.TableOperationsHistory;
+import com.linkedin.openhouse.optimizer.client.model.UpdateOperationRequest;
+import java.net.MalformedURLException;
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Optional;
+import javax.net.ssl.SSLException;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.retry.RetryCallback;
+import org.springframework.retry.support.RetryTemplate;
+
+/**
+ * Thin wrapper around the generated optimizer-service {@link TableOperationsControllerApi}. Mirrors
+ * the structure of {@link com.linkedin.openhouse.jobs.client.JobsClient}: a hand-written facade
+ * over the auto-generated client, wired with a {@link RetryTemplate} that retries 5xx with
+ * exponential backoff.
+ *
+ * The batched Spark app calls {@link #updateOperation(String, UpdateOperationRequest)} once per
+ * finished operation to record SUCCESS or FAILED. Per the design, a missed update is recoverable —
+ * the operation row stays {@code SCHEDULED} and the Analyzer's stale-timeout will re-queue it — so
+ * this client surfaces but does not swallow failures.
+ */
+@Slf4j
+public class OptimizerServiceClient {
+
+ private static final int REQUEST_TIMEOUT_SECONDS = 5;
+
+ private final RetryTemplate retryTemplate;
+ private final TableOperationsControllerApi api;
+
+ public OptimizerServiceClient(String baseUrl) {
+ this(baseUrl, null);
+ }
+
+ public OptimizerServiceClient(String baseUrl, String truststoreLocation) {
+ this(RetryUtil.getOptimizerApiRetryTemplate(), buildApi(baseUrl, truststoreLocation));
+ }
+
+ OptimizerServiceClient(RetryTemplate retryTemplate, TableOperationsControllerApi api) {
+ this.retryTemplate = retryTemplate;
+ this.api = api;
+ }
+
+ /**
+ * Reports a terminal status for {@code operationId}. The path id and the body's {@code
+ * operationId} must match — callers should set both via the same value.
+ *
+ * @return the created history record, or {@link Optional#empty()} if the call failed after
+ * retries (logged; the caller decides what to do).
+ */
+ public Optional updateOperation(
+ String operationId, UpdateOperationRequest request) {
+ Objects.requireNonNull(operationId, "operationId");
+ Objects.requireNonNull(request, "request");
+ return Optional.ofNullable(
+ RetryUtil.executeWithRetry(
+ retryTemplate,
+ (RetryCallback)
+ context ->
+ api.updateOperation(operationId, request)
+ .block(Duration.ofSeconds(REQUEST_TIMEOUT_SECONDS)),
+ null));
+ }
+
+ private static TableOperationsControllerApi buildApi(String baseUrl, String truststoreLocation) {
+ Objects.requireNonNull(baseUrl, "baseUrl");
+ try {
+ ApiClient apiClient =
+ OptimizerApiClientFactory.getInstance()
+ .createApiClient(baseUrl, null, truststoreLocation);
+ return new TableOperationsControllerApi(apiClient);
+ } catch (MalformedURLException | SSLException e) {
+ throw new RuntimeException(
+ "Failed to construct optimizer service client for baseUrl=" + baseUrl, e);
+ }
+ }
+}
diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/RetryUtil.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/RetryUtil.java
index 29a25e962..357b09eaf 100644
--- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/RetryUtil.java
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/RetryUtil.java
@@ -51,6 +51,15 @@ public static RetryTemplate getTablesApiRetryTemplate() {
.build();
}
+ public static RetryTemplate getOptimizerApiRetryTemplate() {
+ RetryTemplateBuilder builder = new RetryTemplateBuilder();
+ return builder
+ .maxAttempts(5)
+ .customBackoff(DEFAULT_JOBS_BACKOFF_POLICY)
+ .retryOn(WebClientResponseException.InternalServerError.class)
+ .build();
+ }
+
public static RetryTemplate getTrinoClientRetryTemplate() {
RetryTemplateBuilder builder = new RetryTemplateBuilder();
return builder
diff --git a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkAppArgsTest.java b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkAppArgsTest.java
new file mode 100644
index 000000000..7a32e503f
--- /dev/null
+++ b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkAppArgsTest.java
@@ -0,0 +1,74 @@
+package com.linkedin.openhouse.jobs.spark;
+
+import java.util.List;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Pure-Java unit tests for {@link BatchedOrphanFilesDeletionSparkApp#buildEntries}. No Spark
+ * session, no HTTP — exercises the CLI-parsing edges that decide whether the app can even start.
+ */
+public class BatchedOrphanFilesDeletionSparkAppArgsTest {
+
+ @Test
+ public void buildEntriesParsesParallelLists() {
+ List entries =
+ BatchedOrphanFilesDeletionSparkApp.buildEntries(
+ "db1.t1,db2.t2", "op-1,op-2", "uuid-1,uuid-2");
+
+ Assertions.assertEquals(2, entries.size());
+ Assertions.assertEquals("db1.t1", entries.get(0).getFqtn());
+ Assertions.assertEquals("db1", entries.get(0).getDatabaseName());
+ Assertions.assertEquals("t1", entries.get(0).getTableName());
+ Assertions.assertEquals("op-1", entries.get(0).getOperationId());
+ Assertions.assertEquals("uuid-1", entries.get(0).getTableUuid());
+ Assertions.assertEquals("db2.t2", entries.get(1).getFqtn());
+ Assertions.assertEquals("op-2", entries.get(1).getOperationId());
+ }
+
+ @Test
+ public void buildEntriesTrimsWhitespaceInEachEntry() {
+ List entries =
+ BatchedOrphanFilesDeletionSparkApp.buildEntries(
+ " db1.t1 , db2.t2 ", " op-1 , op-2 ", " uuid-1 , uuid-2 ");
+
+ Assertions.assertEquals("db1.t1", entries.get(0).getFqtn());
+ Assertions.assertEquals("op-1", entries.get(0).getOperationId());
+ Assertions.assertEquals("uuid-1", entries.get(0).getTableUuid());
+ }
+
+ @Test
+ public void buildEntriesRejectsMismatchedLengths() {
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ BatchedOrphanFilesDeletionSparkApp.buildEntries("db.a,db.b", "op-1", "uuid-1,uuid-2"));
+ }
+
+ @Test
+ public void buildEntriesRejectsNullArguments() {
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> BatchedOrphanFilesDeletionSparkApp.buildEntries(null, "op-1", "uuid-1"));
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> BatchedOrphanFilesDeletionSparkApp.buildEntries("db.a", null, "uuid-1"));
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> BatchedOrphanFilesDeletionSparkApp.buildEntries("db.a", "op-1", null));
+ }
+
+ @Test
+ public void buildEntriesRejectsEmptyStrings() {
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> BatchedOrphanFilesDeletionSparkApp.buildEntries("", "op-1", "uuid-1"));
+ }
+
+ @Test
+ public void buildEntriesRejectsNonFqtn() {
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> BatchedOrphanFilesDeletionSparkApp.buildEntries("just_a_table", "op-1", "uuid-1"));
+ }
+}
diff --git a/client/optimizerclient/build.gradle b/client/optimizerclient/build.gradle
new file mode 100644
index 000000000..1a3a0ce17
--- /dev/null
+++ b/client/optimizerclient/build.gradle
@@ -0,0 +1,11 @@
+plugins {
+ id 'openhouse.java-minimal-conventions'
+ id 'openhouse.client-codegen-convention'
+ id 'openhouse.maven-publish'
+}
+
+ext {
+ codeGenForService = ":services:optimizer"
+}
+
+apply from: "${project(':client:common').file("codegen.build.gradle")}"
diff --git a/client/secureclient/build.gradle b/client/secureclient/build.gradle
index 93b0f256f..e31146fe4 100644
--- a/client/secureclient/build.gradle
+++ b/client/secureclient/build.gradle
@@ -8,6 +8,7 @@ plugins {
dependencies {
api project(':client:tableclient')
api project(':client:jobsclient')
+ api project(':client:optimizerclient')
api project(':client:hts')
api 'org.springframework.boot:spring-boot-starter-webflux:2.7.8'
testImplementation 'io.netty:netty-resolver-dns-native-macos:4.1.70.Final:osx-x86_64'
diff --git a/client/secureclient/src/main/java/com/linkedin/openhouse/client/ssl/OptimizerApiClientFactory.java b/client/secureclient/src/main/java/com/linkedin/openhouse/client/ssl/OptimizerApiClientFactory.java
new file mode 100644
index 000000000..2594ce408
--- /dev/null
+++ b/client/secureclient/src/main/java/com/linkedin/openhouse/client/ssl/OptimizerApiClientFactory.java
@@ -0,0 +1,49 @@
+package com.linkedin.openhouse.client.ssl;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.linkedin.openhouse.optimizer.client.invoker.ApiClient;
+import java.net.MalformedURLException;
+import java.text.DateFormat;
+import javax.net.ssl.SSLException;
+import lombok.NonNull;
+import org.springframework.web.reactive.function.client.WebClient;
+
+/** Factory to create optimizer-specific {@link ApiClient}. Mirrors {@link JobsApiClientFactory}. */
+public final class OptimizerApiClientFactory extends WebClientFactory {
+
+ private static OptimizerApiClientFactory instance;
+
+ private OptimizerApiClientFactory() {
+ super();
+ }
+
+ public static synchronized OptimizerApiClientFactory getInstance() {
+ if (null == instance) {
+ instance = new OptimizerApiClientFactory();
+ }
+ return instance;
+ }
+
+ @Override
+ protected WebClient.Builder createWebClientBuilder() {
+ DateFormat defaultDateFormat = ApiClient.createDefaultDateFormat();
+ ObjectMapper defaultObjectMapper =
+ ApiClient.createDefaultObjectMapper(defaultDateFormat)
+ .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ return ApiClient.buildWebClientBuilder(defaultObjectMapper);
+ }
+
+ /**
+ * Creates the optimizer-specific {@link ApiClient} that the generated {@code
+ * TableOperationsControllerApi} / {@code TableStatsControllerApi} / {@code
+ * TableOperationsHistoryControllerApi} wrap.
+ */
+ public ApiClient createApiClient(@NonNull String baseUrl, String token, String truststoreLocation)
+ throws MalformedURLException, SSLException {
+ WebClient webClient = createWebClient(baseUrl, token, truststoreLocation);
+ ApiClient apiClient = new ApiClient(webClient);
+ apiClient.setBasePath(baseUrl);
+ return apiClient;
+ }
+}
diff --git a/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/BinItem.java b/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/BinItem.java
index 6736ea5dc..1e7b5968a 100644
--- a/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/BinItem.java
+++ b/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/BinItem.java
@@ -8,7 +8,7 @@
* when it launches a Spark job (fully-qualified table name, operation id).
*
* Implementations have a public no-arg constructor — instantiated transiently inside {@link
- * FirstFitBinPacker#pack} via a {@code Supplier} (typically a {@code
+ * FirstFitDecreasingBinPacker#pack} via a {@code Supplier} (typically a {@code
* MyItem::new} method reference) — on which {@link #fromOpAndStats} is called to return the
* populated item. Getters on the empty instance are not meaningful; it exists for the lifetime of a
* single projection call.
diff --git a/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/FirstFitBinPacker.java b/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/FirstFitBinPacker.java
deleted file mode 100644
index 9cf452709..000000000
--- a/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/FirstFitBinPacker.java
+++ /dev/null
@@ -1,92 +0,0 @@
-package com.linkedin.openhouse.optimizer.binpack;
-
-import com.linkedin.openhouse.optimizer.model.TableOperationDto;
-import com.linkedin.openhouse.optimizer.model.TableStatsDto;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-import lombok.extern.slf4j.Slf4j;
-
-/**
- * First-fit-decreasing packing, generic over the concrete {@link BinItem} subtype {@code T}.
- * Construction takes a {@code Supplier} — typically a {@code MyItem::new} method reference —
- * which the packer invokes per operation to get an empty instance, then calls {@link
- * BinItem#fromOpAndStats} on it to project the (operation, stats) pair into a populated item.
- *
- * Sorts items by weight descending, then places each into the first group whose totals stay at
- * or below {@code maxWeightPerBin} and {@code maxItemsPerBin}. An item whose weight exceeds the cap
- * on its own goes into a group by itself. Operations whose {@code tableUuid} has no entry in {@code
- * statsByTableUuid} are dropped.
- *
- *
Stateless: the constructor takes only the BinItem supplier and the cap configuration; {@link
- * #pack} is a pure function over its arguments. The packer is operation-agnostic — the scheduler
- * wraps each grouping into a {@link Bin} with the registered operation type.
- */
-@Slf4j
-public class FirstFitBinPacker implements BinPacker {
-
- private final Supplier binItemSupplier;
- private final long maxWeightPerBin;
- private final int maxItemsPerBin;
-
- public FirstFitBinPacker(Supplier binItemSupplier, long maxWeightPerBin, int maxItemsPerBin) {
- this.binItemSupplier = binItemSupplier;
- this.maxWeightPerBin = maxWeightPerBin;
- this.maxItemsPerBin = maxItemsPerBin;
- }
-
- @Override
- public List> pack(
- List operations, Map statsByTableUuid) {
- List items =
- operations.stream()
- .filter(op -> statsByTableUuid.containsKey(op.getTableUuid()))
- .map(
- op ->
- binItemSupplier
- .get()
- .fromOpAndStats(op, statsByTableUuid.get(op.getTableUuid())))
- .collect(Collectors.toList());
- List packingBins =
- items.stream()
- .sorted(Comparator.comparingLong(BinItem::getWeight).reversed())
- .collect(ArrayList::new, this::placeItem, List::addAll);
- log.info(
- "Packed {} operations ({} items after projection) into {} groupings",
- operations.size(),
- items.size(),
- packingBins.size());
- return packingBins.stream().map(pb -> pb.items).collect(Collectors.toList());
- }
-
- private void placeItem(List bins, BinItem item) {
- bins.stream()
- .filter(b -> b.fits(item, maxWeightPerBin, maxItemsPerBin))
- .findFirst()
- .ifPresentOrElse(
- b -> b.add(item),
- () -> {
- PackingBin fresh = new PackingBin();
- fresh.add(item);
- bins.add(fresh);
- });
- }
-
- /** Running-totals helper used during the fold. */
- private static class PackingBin {
- final List items = new ArrayList<>();
- long totalWeight;
-
- boolean fits(BinItem item, long maxWeight, int maxItems) {
- return items.size() < maxItems && totalWeight + item.getWeight() <= maxWeight;
- }
-
- void add(BinItem item) {
- items.add(item);
- totalWeight += item.getWeight();
- }
- }
-}
diff --git a/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/FirstFitDecreasingBinPacker.java b/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/FirstFitDecreasingBinPacker.java
new file mode 100644
index 000000000..689188ed5
--- /dev/null
+++ b/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/FirstFitDecreasingBinPacker.java
@@ -0,0 +1,133 @@
+package com.linkedin.openhouse.optimizer.binpack;
+
+import com.linkedin.openhouse.optimizer.model.OperationTypeDto;
+import com.linkedin.openhouse.optimizer.model.TableOperationDto;
+import com.linkedin.openhouse.optimizer.model.TableStatsDto;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import lombok.Builder;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * First-fit-decreasing packing. Sorts items by weight descending, then places each into the first
+ * group whose totals stay at or below {@code maxWeightPerBin} and {@code maxItemsPerBin}. An item
+ * whose weight exceeds the cap on its own goes into a group by itself.
+ *
+ * Construct via the generated builder. Defaults are sized for the orphan-files-deletion case
+ * (1M-file weight cap, 50 tables per bin) but every field is overridable. Omit {@code
+ * binItemSupplier} when you are passing pre-projected {@link BinItem}s and don't need the
+ * projection path.
+ *
+ *
Three callable shapes, all sharing the same placement core:
+ *
+ *
+ * Projection path ({@link #pack(List, Map)}, the {@link BinPacker} contract) — takes
+ * raw operations + stats, calls {@code binItemSupplier.get().fromOpAndStats(op, stats)} per
+ * row, returns the resulting groupings. Operations whose {@code tableUuid} has no entry in
+ * {@code statsByTableUuid} are dropped. Requires {@code binItemSupplier}.
+ * Pre-projected raw groupings ({@link #pack(List)}) — caller has already projected
+ * items; packer returns {@code List>}.
+ * Pre-projected tagged bins ({@link #pack(List, OperationTypeDto)}) — caller has
+ * already projected items; packer wraps each grouping in a {@link Bin} tagged with the
+ * supplied operation type. Use this when the bins flow directly to a scheduler that expects
+ * the operation type alongside the items.
+ *
+ *
+ * Stateless: {@code pack} is a pure function over its arguments.
+ */
+@Slf4j
+@Builder
+public class FirstFitDecreasingBinPacker implements BinPacker {
+
+ /** Required only for the {@link #pack(List, Map)} projection path; {@code null} otherwise. */
+ private final Supplier binItemSupplier;
+
+ @Builder.Default private final long maxWeightPerBin = 1_000_000L;
+
+ @Builder.Default private final int maxItemsPerBin = 50;
+
+ @Override
+ public List> pack(
+ List operations, Map statsByTableUuid) {
+ if (binItemSupplier == null) {
+ throw new IllegalStateException(
+ "FirstFitDecreasingBinPacker built without a binItemSupplier; use pack(List) or pack(List, OperationTypeDto) instead");
+ }
+ List items =
+ operations.stream()
+ .filter(op -> statsByTableUuid.containsKey(op.getTableUuid()))
+ .map(
+ op ->
+ binItemSupplier
+ .get()
+ .fromOpAndStats(op, statsByTableUuid.get(op.getTableUuid())))
+ .collect(Collectors.toList());
+ List> groupings = packCore(items);
+ log.info(
+ "Packed {} operations ({} items after projection) into {} groupings",
+ operations.size(),
+ items.size(),
+ groupings.size());
+ return groupings;
+ }
+
+ /** Pre-projected items; returns raw groupings. */
+ public List> pack(List items) {
+ List> groupings = packCore(items);
+ log.info(
+ "Packed {} pre-projected items into {} groupings",
+ items == null ? 0 : items.size(),
+ groupings.size());
+ return groupings;
+ }
+
+ /** Pre-projected items; returns {@link Bin}s each tagged with {@code operationType}. */
+ public List pack(List items, OperationTypeDto operationType) {
+ return pack(items).stream()
+ .map(group -> new Bin(operationType, group))
+ .collect(Collectors.toList());
+ }
+
+ private List> packCore(List items) {
+ if (items == null || items.isEmpty()) {
+ return new ArrayList<>();
+ }
+ List packingBins =
+ items.stream()
+ .sorted(Comparator.comparingLong(BinItem::getWeight).reversed())
+ .collect(ArrayList::new, this::placeItem, List::addAll);
+ return packingBins.stream().map(pb -> pb.items).collect(Collectors.toList());
+ }
+
+ private void placeItem(List bins, BinItem item) {
+ bins.stream()
+ .filter(b -> b.fits(item, maxWeightPerBin, maxItemsPerBin))
+ .findFirst()
+ .ifPresentOrElse(
+ b -> b.add(item),
+ () -> {
+ PackingBin fresh = new PackingBin();
+ fresh.add(item);
+ bins.add(fresh);
+ });
+ }
+
+ /** Running-totals helper used during the fold. */
+ private static class PackingBin {
+ final List items = new ArrayList<>();
+ long totalWeight;
+
+ boolean fits(BinItem item, long maxWeight, int maxItems) {
+ return items.size() < maxItems && totalWeight + item.getWeight() <= maxWeight;
+ }
+
+ void add(BinItem item) {
+ items.add(item);
+ totalWeight += item.getWeight();
+ }
+ }
+}
diff --git a/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/TotalFilesBinItem.java b/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/TotalFilesBinItem.java
index 70d63bb39..3ad5200de 100644
--- a/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/TotalFilesBinItem.java
+++ b/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/TotalFilesBinItem.java
@@ -12,8 +12,8 @@
* implementation knows nothing about which operation type it is wired up to.
*
* Construction: callers pass {@code TotalFilesBinItem::new} as the {@code Supplier} to {@link
- * FirstFitBinPacker}; the packer calls the supplier per operation to get an empty instance, then
- * {@link #fromOpAndStats} on it to get a populated copy.
+ * FirstFitDecreasingBinPacker}; the packer calls the supplier per operation to get an empty
+ * instance, then {@link #fromOpAndStats} on it to get a populated copy.
*/
@Getter
@ToString
diff --git a/libs/optimizer/binpack/src/test/java/com/linkedin/openhouse/optimizer/binpack/FirstFitBinPackerTest.java b/libs/optimizer/binpack/src/test/java/com/linkedin/openhouse/optimizer/binpack/FirstFitBinPackerTest.java
deleted file mode 100644
index 602f41711..000000000
--- a/libs/optimizer/binpack/src/test/java/com/linkedin/openhouse/optimizer/binpack/FirstFitBinPackerTest.java
+++ /dev/null
@@ -1,150 +0,0 @@
-package com.linkedin.openhouse.optimizer.binpack;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-import com.linkedin.openhouse.optimizer.model.TableOperationDto;
-import com.linkedin.openhouse.optimizer.model.TableStatsDto;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import lombok.Getter;
-import org.junit.jupiter.api.Test;
-
-/**
- * Tests the {@link FirstFitBinPacker} bucketing logic in isolation via a {@link TestItem} whose
- * weight comes from a {@code "weight"} entry in {@code tableProperties}. The supplier-then-{@code
- * fromOpAndStats} pattern is exercised end-to-end through the public {@code pack} entry point.
- * Projection logic for production BinItems (e.g. {@link TotalFilesBinItem}) is covered by their own
- * tests.
- */
-class FirstFitBinPackerTest {
-
- @Getter
- static class TestItem implements BinItem {
- private final String operationId;
- private final long weight;
-
- public TestItem() {
- this("", 0L);
- }
-
- private TestItem(String operationId, long weight) {
- this.operationId = operationId;
- this.weight = weight;
- }
-
- @Override
- public String getFullyQualifiedTableName() {
- return "db.tbl_" + operationId;
- }
-
- @Override
- public BinItem fromOpAndStats(TableOperationDto op, TableStatsDto stats) {
- long w = Long.parseLong(stats.getTableProperties().get("weight"));
- return new TestItem(op.getId(), w);
- }
- }
-
- private static TableOperationDto op(String id) {
- return TableOperationDto.builder().id(id).tableUuid(id).build();
- }
-
- private static TableStatsDto statsWithWeight(String uuid, long weight) {
- return TableStatsDto.builder()
- .tableUuid(uuid)
- .tableProperties(Map.of("weight", Long.toString(weight)))
- .build();
- }
-
- private static List opsList(String... ids) {
- return java.util.Arrays.stream(ids).map(FirstFitBinPackerTest::op).collect(Collectors.toList());
- }
-
- private static Map statsMap(Object... uuidWeightPairs) {
- Map map = new HashMap<>();
- for (int i = 0; i < uuidWeightPairs.length; i += 2) {
- String uuid = (String) uuidWeightPairs[i];
- long weight = (long) uuidWeightPairs[i + 1];
- map.put(uuid, statsWithWeight(uuid, weight));
- }
- return map;
- }
-
- private static FirstFitBinPacker packer(long maxWeight, int maxItems) {
- return new FirstFitBinPacker<>(TestItem::new, maxWeight, maxItems);
- }
-
- @Test
- void emptyInput_returnsEmptyGroupings() {
- assertThat(packer(100L, 10).pack(List.of(), Map.of())).isEmpty();
- }
-
- @Test
- void singleItem_oneGrouping() {
- List> groupings = packer(1_000_000L, 10).pack(opsList("a"), statsMap("a", 100L));
- assertThat(groupings).hasSize(1);
- assertThat(groupings.get(0)).hasSize(1);
- }
-
- @Test
- void underWeightLimit_oneGrouping() {
- List> groupings =
- packer(1_000_000L, 10)
- .pack(opsList("a", "b", "c"), statsMap("a", 300_000L, "b", 300_000L, "c", 300_000L));
- assertThat(groupings).hasSize(1);
- assertThat(groupings.get(0)).hasSize(3);
- }
-
- @Test
- void overWeightLimit_twoGroupings() {
- List> groupings =
- packer(1_000_000L, 10)
- .pack(opsList("a", "b", "c"), statsMap("a", 600_000L, "b", 600_000L, "c", 400_000L));
- assertThat(groupings).hasSize(2);
- // FFD: sort desc → 600, 600, 400. Place 600 → bin0; next 600 doesn't fit bin0, → bin1; 400
- // fits bin0 (total 1_000_000).
- long b0 = groupings.get(0).stream().mapToLong(BinItem::getWeight).sum();
- long b1 = groupings.get(1).stream().mapToLong(BinItem::getWeight).sum();
- assertThat(b0).isEqualTo(1_000_000L);
- assertThat(b1).isEqualTo(600_000L);
- }
-
- @Test
- void itemLargerThanCap_getsOwnGrouping() {
- List> groupings =
- packer(1_000L, 10).pack(opsList("big"), statsMap("big", 5_000L));
- assertThat(groupings).hasSize(1);
- assertThat(groupings.get(0)).hasSize(1);
- }
-
- @Test
- void sortedDescending_largestFirst() {
- List> groupings =
- packer(2_000_000L, 10)
- .pack(opsList("small", "large"), statsMap("small", 100L, "large", 900_000L));
- assertThat(groupings).hasSize(1);
- List ids =
- groupings.get(0).stream().map(BinItem::getOperationId).collect(Collectors.toList());
- assertThat(ids).containsExactly("large", "small");
- }
-
- @Test
- void maxItemsCap_splitsGroupings() {
- List> groupings =
- packer(1_000_000L, 2)
- .pack(opsList("a", "b", "c", "d"), statsMap("a", 1L, "b", 1L, "c", 1L, "d", 1L));
- assertThat(groupings).hasSize(2);
- assertThat(groupings.get(0)).hasSize(2);
- assertThat(groupings.get(1)).hasSize(2);
- }
-
- @Test
- void operationsWithoutStats_dropped() {
- List> groupings =
- packer(1_000_000L, 10).pack(opsList("a", "missing"), statsMap("a", 100L));
- assertThat(groupings).hasSize(1);
- assertThat(groupings.get(0)).hasSize(1);
- assertThat(groupings.get(0).get(0).getOperationId()).isEqualTo("a");
- }
-}
diff --git a/libs/optimizer/binpack/src/test/java/com/linkedin/openhouse/optimizer/binpack/FirstFitDecreasingBinPackerTest.java b/libs/optimizer/binpack/src/test/java/com/linkedin/openhouse/optimizer/binpack/FirstFitDecreasingBinPackerTest.java
new file mode 100644
index 000000000..e11c23fd9
--- /dev/null
+++ b/libs/optimizer/binpack/src/test/java/com/linkedin/openhouse/optimizer/binpack/FirstFitDecreasingBinPackerTest.java
@@ -0,0 +1,317 @@
+package com.linkedin.openhouse.optimizer.binpack;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.linkedin.openhouse.optimizer.model.OperationTypeDto;
+import com.linkedin.openhouse.optimizer.model.TableOperationDto;
+import com.linkedin.openhouse.optimizer.model.TableStatsDto;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests the three callable shapes of {@link FirstFitDecreasingBinPacker}:
+ *
+ *
+ * {@code pack(ops, stats)} — projection path via {@link ProjectingTestItem}'s {@code
+ * fromOpAndStats}.
+ * {@code pack(items)} — pre-projected raw groupings.
+ * {@code pack(items, operationType)} — pre-projected tagged {@link Bin}s.
+ *
+ *
+ * Projection logic for production BinItems (e.g. {@link TotalFilesBinItem}) is covered by their own
+ * tests.
+ */
+class FirstFitDecreasingBinPackerTest {
+
+ private static final OperationTypeDto OFD = OperationTypeDto.ORPHAN_FILES_DELETION;
+
+ // -------------------- Projection path: pack(ops, stats) --------------------
+
+ @Test
+ void emptyInput_returnsEmptyGroupings() {
+ assertThat(projectionPacker(100L, 10).pack(Collections.emptyList(), Collections.emptyMap()))
+ .isEmpty();
+ }
+
+ @Test
+ void singleItem_oneGrouping() {
+ List> groupings =
+ projectionPacker(1_000_000L, 10).pack(opsList("a"), statsMap("a", 100L));
+ assertThat(groupings).hasSize(1);
+ assertThat(groupings.get(0)).hasSize(1);
+ }
+
+ @Test
+ void underWeightLimit_oneGrouping() {
+ List> groupings =
+ projectionPacker(1_000_000L, 10)
+ .pack(opsList("a", "b", "c"), statsMap("a", 300_000L, "b", 300_000L, "c", 300_000L));
+ assertThat(groupings).hasSize(1);
+ assertThat(groupings.get(0)).hasSize(3);
+ }
+
+ @Test
+ void overWeightLimit_twoGroupings() {
+ List> groupings =
+ projectionPacker(1_000_000L, 10)
+ .pack(opsList("a", "b", "c"), statsMap("a", 600_000L, "b", 600_000L, "c", 400_000L));
+ assertThat(groupings).hasSize(2);
+ // FFD: sort desc → 600, 600, 400. Place 600 → bin0; next 600 doesn't fit bin0, → bin1; 400
+ // fits bin0 (total 1_000_000).
+ long b0 = groupings.get(0).stream().mapToLong(BinItem::getWeight).sum();
+ long b1 = groupings.get(1).stream().mapToLong(BinItem::getWeight).sum();
+ assertThat(b0).isEqualTo(1_000_000L);
+ assertThat(b1).isEqualTo(600_000L);
+ }
+
+ @Test
+ void itemLargerThanCap_getsOwnGrouping() {
+ List> groupings =
+ projectionPacker(1_000L, 10).pack(opsList("big"), statsMap("big", 5_000L));
+ assertThat(groupings).hasSize(1);
+ assertThat(groupings.get(0)).hasSize(1);
+ }
+
+ @Test
+ void sortedDescending_largestFirst() {
+ List> groupings =
+ projectionPacker(2_000_000L, 10)
+ .pack(opsList("small", "large"), statsMap("small", 100L, "large", 900_000L));
+ assertThat(groupings).hasSize(1);
+ List ids =
+ groupings.get(0).stream().map(BinItem::getOperationId).collect(Collectors.toList());
+ assertThat(ids).containsExactly("large", "small");
+ }
+
+ @Test
+ void maxItemsCap_splitsGroupings() {
+ List> groupings =
+ projectionPacker(1_000_000L, 2)
+ .pack(opsList("a", "b", "c", "d"), statsMap("a", 1L, "b", 1L, "c", 1L, "d", 1L));
+ assertThat(groupings).hasSize(2);
+ assertThat(groupings.get(0)).hasSize(2);
+ assertThat(groupings.get(1)).hasSize(2);
+ }
+
+ @Test
+ void operationsWithoutStats_dropped() {
+ List> groupings =
+ projectionPacker(1_000_000L, 10).pack(opsList("a", "missing"), statsMap("a", 100L));
+ assertThat(groupings).hasSize(1);
+ assertThat(groupings.get(0)).hasSize(1);
+ assertThat(groupings.get(0).get(0).getOperationId()).isEqualTo("a");
+ }
+
+ // -------------------- Pre-projected path: pack(items) and pack(items, OFD) --------------------
+
+ @Test
+ void preProjected_emptyInput_returnsEmpty() {
+ assertThat(preProjectedPacker(100L, 50).pack(Collections.emptyList(), OFD)).isEmpty();
+ assertThat(preProjectedPacker(100L, 50).pack((List) null, OFD)).isEmpty();
+ }
+
+ @Test
+ void preProjected_itemsSortDescendingBeforePlacing() {
+ List items =
+ Arrays.asList(item("db.t_small", 10), item("db.t_big", 100), item("db.t_mid", 50));
+
+ List bins = preProjectedPacker(1000L, 50).pack(items, OFD);
+
+ assertThat(bins).hasSize(1);
+ Bin only = bins.get(0);
+ assertThat(only.getOperationType()).isEqualTo(OFD);
+ assertThat(only.getItems())
+ .extracting(BinItem::getFullyQualifiedTableName)
+ .containsExactly("db.t_big", "db.t_mid", "db.t_small");
+ }
+
+ @Test
+ void preProjected_weightCapForcesMultipleBins() {
+ List items =
+ Arrays.asList(item("db.a", 60), item("db.b", 50), item("db.c", 40), item("db.d", 30));
+
+ List bins = preProjectedPacker(100L, 50).pack(items, OFD);
+
+ // FFD on [60, 50, 40, 30] with weight cap 100:
+ // bin0: 60 → remaining 40
+ // bin0 tries 50 → doesn't fit, new bin1: 50
+ // bin0 tries 40 → fits, bin0: 60 + 40 = 100
+ // bin1 tries 30 → fits, bin1: 50 + 30 = 80
+ assertThat(bins).hasSize(2);
+ assertThat(bins.get(0).getItems().stream().mapToLong(BinItem::getWeight).sum()).isEqualTo(100L);
+ assertThat(bins.get(1).getItems().stream().mapToLong(BinItem::getWeight).sum()).isEqualTo(80L);
+ }
+
+ @Test
+ void preProjected_maxItemsPerBinCapHonored() {
+ List items =
+ IntStream.range(0, 5).mapToObj(i -> item("db.t" + i, 1)).collect(Collectors.toList());
+
+ List bins = preProjectedPacker(1000L, 2).pack(items, OFD);
+
+ assertThat(bins).hasSize(3);
+ assertThat(bins.get(0).getItems()).hasSize(2);
+ assertThat(bins.get(1).getItems()).hasSize(2);
+ assertThat(bins.get(2).getItems()).hasSize(1);
+ }
+
+ @Test
+ void preProjected_oversizedItemGetsItsOwnBin() {
+ List items =
+ Arrays.asList(item("db.tiny1", 10), item("db.giant", 500), item("db.tiny2", 10));
+
+ List bins = preProjectedPacker(100L, 50).pack(items, OFD);
+
+ long total =
+ bins.stream().flatMap(b -> b.getItems().stream()).mapToLong(BinItem::getWeight).sum();
+ assertThat(total).isEqualTo(520L);
+ boolean giantPresent =
+ bins.stream()
+ .flatMap(b -> b.getItems().stream())
+ .anyMatch(i -> i.getFullyQualifiedTableName().equals("db.giant"));
+ assertThat(giantPresent).isTrue();
+ }
+
+ @Test
+ void preProjected_operationTypeIsPropagatedToEachBin() {
+ List items = Arrays.asList(item("db.a", 1), item("db.b", 1), item("db.c", 1));
+ List bins = preProjectedPacker(1000L, 2).pack(items, OFD);
+ assertThat(bins).hasSize(2);
+ assertThat(bins).allMatch(b -> b.getOperationType().equals(OFD));
+ }
+
+ @Test
+ void preProjected_rawGroupingsOverload_omitsOperationType() {
+ List items = Arrays.asList(item("db.a", 60), item("db.b", 50));
+ List> groupings = preProjectedPacker(100L, 50).pack(items);
+ assertThat(groupings).hasSize(2);
+ assertThat(groupings.get(0)).hasSize(1);
+ assertThat(groupings.get(1)).hasSize(1);
+ }
+
+ // -------------------- Builder / contract checks --------------------
+
+ @Test
+ void omittingBinItemSupplier_rejectsProjectionPath() {
+ FirstFitDecreasingBinPacker packer =
+ FirstFitDecreasingBinPacker.builder()
+ .maxWeightPerBin(100L)
+ .maxItemsPerBin(10)
+ .build();
+ assertThatThrownBy(() -> packer.pack(Collections.emptyList(), Collections.emptyMap()))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("binItemSupplier");
+ }
+
+ @Test
+ void builderDefaults_appliedWhenCapsOmitted() {
+ // The defaults are 1_000_000 weight / 50 items — sized for OFD. With 60 items of weight 100,
+ // the 50-items cap kicks in before the weight cap.
+ List items =
+ IntStream.range(0, 60).mapToObj(i -> item("db.t" + i, 100)).collect(Collectors.toList());
+
+ List bins = FirstFitDecreasingBinPacker.builder().build().pack(items, OFD);
+
+ assertThat(bins).hasSize(2);
+ assertThat(bins.get(0).getItems()).hasSize(50);
+ assertThat(bins.get(1).getItems()).hasSize(10);
+ }
+
+ // -------------------- Fixtures --------------------
+
+ private static FirstFitDecreasingBinPacker projectionPacker(
+ long maxWeight, int maxItems) {
+ return FirstFitDecreasingBinPacker.builder()
+ .binItemSupplier(ProjectingTestItem::new)
+ .maxWeightPerBin(maxWeight)
+ .maxItemsPerBin(maxItems)
+ .build();
+ }
+
+ private static FirstFitDecreasingBinPacker preProjectedPacker(
+ long maxWeight, int maxItems) {
+ return FirstFitDecreasingBinPacker.builder()
+ .maxWeightPerBin(maxWeight)
+ .maxItemsPerBin(maxItems)
+ .build();
+ }
+
+ private static TableOperationDto op(String id) {
+ return TableOperationDto.builder().id(id).tableUuid(id).build();
+ }
+
+ private static TableStatsDto statsWithWeight(String uuid, long weight) {
+ return TableStatsDto.builder()
+ .tableUuid(uuid)
+ .tableProperties(Map.of("weight", Long.toString(weight)))
+ .build();
+ }
+
+ private static List opsList(String... ids) {
+ return Arrays.stream(ids).map(FirstFitDecreasingBinPackerTest::op).collect(Collectors.toList());
+ }
+
+ private static Map statsMap(Object... uuidWeightPairs) {
+ Map map = new HashMap<>();
+ for (int i = 0; i < uuidWeightPairs.length; i += 2) {
+ String uuid = (String) uuidWeightPairs[i];
+ long weight = (long) uuidWeightPairs[i + 1];
+ map.put(uuid, statsWithWeight(uuid, weight));
+ }
+ return map;
+ }
+
+ private static BinItem item(String fqtn, long weight) {
+ return new PreProjectedTestItem(fqtn, "op-" + fqtn, weight);
+ }
+
+ /** Exercises the {@code fromOpAndStats} projection callback. */
+ @Getter
+ static class ProjectingTestItem implements BinItem {
+ private final String operationId;
+ private final long weight;
+
+ public ProjectingTestItem() {
+ this("", 0L);
+ }
+
+ private ProjectingTestItem(String operationId, long weight) {
+ this.operationId = operationId;
+ this.weight = weight;
+ }
+
+ @Override
+ public String getFullyQualifiedTableName() {
+ return "db.tbl_" + operationId;
+ }
+
+ @Override
+ public BinItem fromOpAndStats(TableOperationDto op, TableStatsDto stats) {
+ long w = Long.parseLong(stats.getTableProperties().get("weight"));
+ return new ProjectingTestItem(op.getId(), w);
+ }
+ }
+
+ /** Pre-projected: packer treats items as opaque so {@code fromOpAndStats} is a no-op. */
+ @AllArgsConstructor
+ @Getter
+ static class PreProjectedTestItem implements BinItem {
+ private final String fullyQualifiedTableName;
+ private final String operationId;
+ private final long weight;
+
+ @Override
+ public BinItem fromOpAndStats(TableOperationDto op, TableStatsDto stats) {
+ return this;
+ }
+ }
+}
diff --git a/services/optimizer/build.gradle b/services/optimizer/build.gradle
index c05c7f9c3..f5875b734 100644
--- a/services/optimizer/build.gradle
+++ b/services/optimizer/build.gradle
@@ -1,6 +1,27 @@
plugins {
id 'openhouse.springboot-ext-conventions'
id 'org.springframework.boot' version '2.7.8'
+ // OpenAPI spec generation for :client:optimizerclient.
+ id 'com.github.johnrengelman.processes' version '0.5.0'
+ id 'org.springdoc.openapi-gradle-plugin' version '1.6.0'
+ id 'openhouse.service-specgen-convention'
+}
+
+// Port allocation: tables 8000, housetables 8001, jobs 8002, optimizer 8003.
+// H2 overrides let the Spring context finish startup so springdoc can extract the spec.
+openApi {
+ customBootRun {
+ args = [
+ "--server.port=8003",
+ "--spring.datasource.url=jdbc:h2:mem:codegen;MODE=MYSQL;DB_CLOSE_DELAY=-1",
+ "--spring.datasource.driver-class-name=org.h2.Driver",
+ "--spring.datasource.username=sa",
+ "--spring.datasource.password=",
+ "--spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.H2Dialect",
+ "--spring.sql.init.mode=never"
+ ]
+ }
+ apiDocsUrl.set("http://localhost:8003/v3/api-docs")
}
dependencies {
@@ -8,7 +29,10 @@ dependencies {
implementation 'com.vladmihalcea:hibernate-types-55:2.21.1'
implementation 'org.springframework.boot:spring-boot-starter-web:2.7.8'
implementation 'mysql:mysql-connector-java:8.+'
- testImplementation 'com.h2database:h2:2.2.224'
+ // H2 only for spec-gen bootRun; excluded from bootJar. Pinned to 2.1.210 (matches
+ // :iceberg:openhouse:htscatalog) — 2.2.x switches to slf4j-api 2.x and breaks downstream bridges.
+ developmentOnly 'com.h2database:h2:2.1.210'
+ testImplementation 'com.h2database:h2:2.1.210'
testImplementation 'org.springframework.boot:spring-boot-starter-test:2.7.8'
}
diff --git a/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/config/SchedulerConfig.java b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/config/SchedulerConfig.java
index 82af33275..cc7244115 100644
--- a/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/config/SchedulerConfig.java
+++ b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/config/SchedulerConfig.java
@@ -1,6 +1,6 @@
package com.linkedin.openhouse.optimizer.scheduler.config;
-import com.linkedin.openhouse.optimizer.binpack.FirstFitBinPacker;
+import com.linkedin.openhouse.optimizer.binpack.FirstFitDecreasingBinPacker;
import com.linkedin.openhouse.optimizer.binpack.TotalFilesBinItem;
import com.linkedin.openhouse.optimizer.model.OperationTypeDto;
import com.linkedin.openhouse.optimizer.repository.TableOperationsRepository;
@@ -37,9 +37,9 @@ public JobsServiceClient jobsServiceClient(WebClient jobsWebClient) {
}
/**
- * Orphan files deletion: a {@link FirstFitBinPacker} over {@link TotalFilesBinItem}. Cost scales
- * with file count — per-file list, manifest joins, and delete calls dominate independent of file
- * size.
+ * Orphan files deletion: a {@link FirstFitDecreasingBinPacker} over {@link TotalFilesBinItem}.
+ * Cost scales with file count — per-file list, manifest joins, and delete calls dominate
+ * independent of file size.
*/
@Bean
public SchedulerRunner schedulerRunner(
@@ -52,6 +52,10 @@ public SchedulerRunner schedulerRunner(
return new SchedulerRunner(operationsRepo, statsRepo, jobsClient, resultsEndpoint)
.registerOperation(
OperationTypeDto.ORPHAN_FILES_DELETION,
- new FirstFitBinPacker<>(TotalFilesBinItem::new, ofdMaxFilesPerBin, ofdMaxTablesPerBin));
+ FirstFitDecreasingBinPacker.builder()
+ .binItemSupplier(TotalFilesBinItem::new)
+ .maxWeightPerBin(ofdMaxFilesPerBin)
+ .maxItemsPerBin(ofdMaxTablesPerBin)
+ .build());
}
}
diff --git a/services/optimizer/scheduler/src/test/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerRunnerTest.java b/services/optimizer/scheduler/src/test/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerRunnerTest.java
index 92df3287d..45f342ec7 100644
--- a/services/optimizer/scheduler/src/test/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerRunnerTest.java
+++ b/services/optimizer/scheduler/src/test/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerRunnerTest.java
@@ -10,7 +10,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import com.linkedin.openhouse.optimizer.binpack.FirstFitBinPacker;
+import com.linkedin.openhouse.optimizer.binpack.FirstFitDecreasingBinPacker;
import com.linkedin.openhouse.optimizer.binpack.TotalFilesBinItem;
import com.linkedin.openhouse.optimizer.db.OperationStatus;
import com.linkedin.openhouse.optimizer.db.SnapshotMetrics;
@@ -49,7 +49,11 @@ void setUp() {
new SchedulerRunner(operationsRepo, statsRepo, jobsClient, RESULTS_ENDPOINT)
.registerOperation(
ORPHAN_FILES_DELETION,
- new FirstFitBinPacker<>(TotalFilesBinItem::new, 1_000_000L, 50));
+ FirstFitDecreasingBinPacker.builder()
+ .binItemSupplier(TotalFilesBinItem::new)
+ .maxWeightPerBin(1_000_000L)
+ .maxItemsPerBin(50)
+ .build());
}
// ---- Stubbing helpers ----
diff --git a/settings.gradle b/settings.gradle
index ebfce9060..9acc07d67 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -29,6 +29,7 @@ include ':client:common'
include ':client:hts'
include ':client:jobsclient'
include ':client:tableclient'
+include ':client:optimizerclient'
include ':client:secureclient'
include ':integrations:java:iceberg-1.2:openhouse-java-runtime'