diff --git a/apps/spark/build.gradle b/apps/spark/build.gradle index a3f981b98..6caac9459 100644 --- a/apps/spark/build.gradle +++ b/apps/spark/build.gradle @@ -25,6 +25,10 @@ dependencies { implementation "org.apache.logging.log4j:log4j-slf4j-impl:${log4jVersion}" implementation project(':libs:datalayout') + // Exclude log4j-slf4j2-impl: incompatible with the log4j-slf4j-impl (1.x) bridge this app ships. + implementation(project(':libs:optimizer:binpack')) { + exclude group: 'org.apache.logging.log4j', module: 'log4j-slf4j2-impl' + } implementation ('org.apache.spark:spark-core_2.12:' + sparkVersion) { exclude group: 'io.netty' exclude group: 'org.apache.hadoop', module: 'hadoop-common' diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkApp.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkApp.java new file mode 100644 index 000000000..ea577710f --- /dev/null +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkApp.java @@ -0,0 +1,454 @@ +package com.linkedin.openhouse.jobs.spark; + +import com.google.common.collect.Iterables; +import com.linkedin.openhouse.common.metrics.DefaultOtelConfig; +import com.linkedin.openhouse.common.metrics.OtelEmitter; +import com.linkedin.openhouse.jobs.exception.TableValidationException; +import com.linkedin.openhouse.jobs.spark.optimizer.OptimizerServiceClient; +import com.linkedin.openhouse.jobs.spark.state.StateManager; +import com.linkedin.openhouse.jobs.util.AppConstants; +import com.linkedin.openhouse.jobs.util.AppsOtelEmitter; +import com.linkedin.openhouse.jobs.util.TableStateValidator; +import com.linkedin.openhouse.optimizer.client.model.UpdateOperationRequest; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import java.time.Duration; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.lang3.math.NumberUtils; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.DeleteOrphanFiles; + +/** + * Batched orphan-files-deletion Spark app. One Spark job processes a list of {@code (table, + * operationId)} pairs that the optimizer scheduler bin-packed into a single batch. Each table is + * handled by a worker thread; per-table failures are caught and reported back independently — the + * job continues for the remaining tables and exits 0 if at least one table succeeds. + * + *

This is the multi-table counterpart of {@link OrphanFilesDeletionSparkApp}. The single-table + * app remains the deployment unit when bin size is 1, and stays the canonical reference for the + * actual deletion logic. + * + *

Example invocation: + * + *

{@code
+ * com.linkedin.openhouse.jobs.spark.BatchedOrphanFilesDeletionSparkApp \
+ *   --tableNames db.t1,db.t2,db.t3 \
+ *   --operationIds op-uuid-1,op-uuid-2,op-uuid-3 \
+ *   --tableUuids tab-uuid-1,tab-uuid-2,tab-uuid-3 \
+ *   --resultsEndpoint http://optimizer.svc:8080 \
+ *   --driverParallelism 4
+ * }
+ */ +@Slf4j +public class BatchedOrphanFilesDeletionSparkApp extends BaseSparkApp { + + private static final int DEFAULT_MAX_ORPHAN_FILE_SAMPLE_SIZE = 20000; + private static final int DEFAULT_MIN_OFD_TTL_IN_DAYS = 3; + + private final List entries; + private final String resultsEndpoint; + private final int driverParallelism; + private final long ttlSeconds; + private final String backupDir; + private final int concurrentDeletes; + private final boolean streamResults; + private final int maxOrphanFileSampleSize; + + public BatchedOrphanFilesDeletionSparkApp( + String jobId, + StateManager stateManager, + OtelEmitter otelEmitter, + List entries, + String resultsEndpoint, + int driverParallelism, + long ttlSeconds, + String backupDir, + int concurrentDeletes, + boolean streamResults, + int maxOrphanFileSampleSize) { + super(jobId, stateManager, otelEmitter); + this.entries = entries; + this.resultsEndpoint = resultsEndpoint; + this.driverParallelism = Math.max(1, driverParallelism); + this.ttlSeconds = ttlSeconds; + this.backupDir = backupDir; + this.concurrentDeletes = concurrentDeletes; + this.streamResults = streamResults; + this.maxOrphanFileSampleSize = maxOrphanFileSampleSize; + } + + @Override + protected void runInner(Operations ops) { + log.info( + "Batched OFD start: entries={} driverParallelism={} resultsEndpoint={}", + entries.size(), + driverParallelism, + resultsEndpoint); + + if (entries.isEmpty()) { + log.warn("Batched OFD invoked with no entries; nothing to do"); + return; + } + + OptimizerServiceClient client = newOptimizerClient(); + int successCount = runBatch(ops, client); + + int failureCount = entries.size() - successCount; + log.info( + "Batched OFD finished: total={} success={} failed={}", + entries.size(), + successCount, + failureCount); + + if (successCount == 0) { + throw new RuntimeException( + String.format("All %d operations in batch failed", entries.size())); + } + } + + private int runBatch(Operations ops, OptimizerServiceClient client) { + ExecutorService pool = Executors.newFixedThreadPool(driverParallelism); + try { + // Two-phase pipeline: submit every worker first (so they run concurrently), then await each. + // Pairing each Future with its BatchEntry via AbstractMap.SimpleImmutableEntry. + List>> submissions = + entries.stream() + .map( + entry -> + new AbstractMap.SimpleImmutableEntry<>( + entry, pool.submit(new TableWorker(ops, entry, client)))) + .collect(Collectors.toList()); + return submissions.stream() + .mapToInt(submission -> awaitOne(submission.getKey(), submission.getValue(), client)) + .sum(); + } finally { + shutdownPool(pool); + } + } + + private int awaitOne(BatchEntry entry, Future future, OptimizerServiceClient client) { + try { + return Boolean.TRUE.equals(future.get()) ? 1 : 0; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("Worker interrupted: fqtn={}", entry.getFqtn(), e); + otelEmitter.count( + METRICS_SCOPE, + "optimizer_batch_interrupted", + 1, + Attributes.of(AttributeKey.stringKey(AppConstants.TABLE_NAME), entry.getFqtn())); + return 0; + } catch (ExecutionException e) { + // The worker catches Throwable internally and always reports its own result, so reaching + // here means the worker itself leaked an exception. Be defensive: post FAILED so the + // operation row doesn't sit SCHEDULED until the stale-timeout. + log.error( + "Worker threw outside its own catch for fqtn={} — reporting FAILED", + entry.getFqtn(), + e.getCause()); + reportResult(entry, UpdateOperationRequest.StatusEnum.FAILED, client); + return 0; + } + } + + private void shutdownPool(ExecutorService pool) { + pool.shutdown(); + try { + if (!pool.awaitTermination(30, TimeUnit.SECONDS)) { + pool.shutdownNow(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + pool.shutdownNow(); + } + } + + protected OptimizerServiceClient newOptimizerClient() { + return new OptimizerServiceClient(resultsEndpoint); + } + + /** + * POST the per-operation outcome to the Optimizer Service via the generated client. The wrapper + * returns {@link java.util.Optional#empty()} when the call exhausts retries — we log + count and + * leave the operation row at SCHEDULED so the Analyzer's stale-timeout can re-queue it. + * + *

{@code status} is passed in as a {@link UpdateOperationRequest.StatusEnum} rather than a + * boolean so the caller's intent is unambiguous and new terminal states (e.g. CANCELED) can be + * plumbed in without changing the signature. + */ + private void reportResult( + BatchEntry entry, UpdateOperationRequest.StatusEnum status, OptimizerServiceClient client) { + UpdateOperationRequest body = + new UpdateOperationRequest() + .operationId(entry.getOperationId()) + .status(status) + .tableUuid(entry.getTableUuid()) + .databaseName(entry.getDatabaseName()) + .tableName(entry.getTableName()) + .operationType(UpdateOperationRequest.OperationTypeEnum.ORPHAN_FILES_DELETION); + if (!client.updateOperation(entry.getOperationId(), body).isPresent()) { + log.error( + "Failed to report operation result after retries; row will stay SCHEDULED until stale-timeout: operationId={} fqtn={}", + entry.getOperationId(), + entry.getFqtn()); + otelEmitter.count( + METRICS_SCOPE, + "optimizer_update_failed", + 1, + Attributes.of(AttributeKey.stringKey(AppConstants.TABLE_NAME), entry.getFqtn())); + } + } + + /** One unit of work in a batched OFD job. */ + private final class TableWorker implements Callable { + private final Operations ops; + private final BatchEntry entry; + private final OptimizerServiceClient client; + + TableWorker(Operations ops, BatchEntry entry, OptimizerServiceClient client) { + this.ops = ops; + this.entry = entry; + this.client = client; + } + + @Override + public Boolean call() { + String fqtn = entry.getFqtn(); + UpdateOperationRequest.StatusEnum status = UpdateOperationRequest.StatusEnum.FAILED; + try { + log.info("OFD start: fqtn={} operationId={}", fqtn, entry.getOperationId()); + Table table = ops.getTable(fqtn); + long olderThanTimestampMillis = + System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(resolveTtlSeconds(table)); + DeleteOrphanFiles.Result result = + ops.deleteOrphanFiles( + table, + olderThanTimestampMillis, + Boolean.parseBoolean( + table.properties().getOrDefault(AppConstants.BACKUP_ENABLED_KEY, "false")), + backupDir, + concurrentDeletes, + streamResults, + maxOrphanFileSampleSize); + // Count via iteration rather than materializing the full path list: a table with millions + // of orphan files would otherwise OOM the driver, and that risk multiplies with + // driverParallelism workers running concurrently. + int orphanCount = Iterables.size(result.orphanFileLocations()); + otelEmitter.count( + METRICS_SCOPE, + AppConstants.ORPHAN_FILE_COUNT, + orphanCount, + Attributes.of(AttributeKey.stringKey(AppConstants.TABLE_NAME), fqtn)); + validate(fqtn); + status = UpdateOperationRequest.StatusEnum.SUCCESS; + log.info("OFD success: fqtn={} orphansDetected={}", fqtn, orphanCount); + } catch (Throwable t) { + log.error("OFD failed: fqtn={} operationId={}", fqtn, entry.getOperationId(), t); + } finally { + // Defensive: reportResult must not throw out of the finally block, since that would mask + // the original failure and propagate up to awaitOne, which would then report FAILED again. + try { + reportResult(entry, status, client); + } catch (Throwable t) { + log.error( + "reportResult itself threw; operation row will stay SCHEDULED until stale-timeout: fqtn={}", + fqtn, + t); + } + } + return status == UpdateOperationRequest.StatusEnum.SUCCESS; + } + + /** + * Re-runs {@link TableStateValidator} — the same post-job consistency check the single-table + * {@link OrphanFilesDeletionSparkApp} uses — to confirm the table's manifests and metadata are + * intact after deletion. A failure here is treated as a failed operation: it's logged, counted, + * and re-thrown so the outer {@link #call()} marks {@code success=false}. + */ + private void validate(String fqtn) { + try { + TableStateValidator.run(ops.spark(), fqtn); + } catch (TableValidationException e) { + log.error("Post-job validation failed: fqtn={}", fqtn, e); + otelEmitter.count( + METRICS_SCOPE, + "post_run_validation_error", + 1, + Attributes.of( + AttributeKey.stringKey(AppConstants.TABLE_NAME), + fqtn, + AttributeKey.stringKey(AppConstants.JOB_NAME), + BatchedOrphanFilesDeletionSparkApp.class.getSimpleName())); + throw e; + } + } + + private long resolveTtlSeconds(Table table) { + long resolved = ttlSeconds; + if (Boolean.parseBoolean( + table.properties().getOrDefault(AppConstants.OFD_ONE_DAY_TTL_ENABLED_KEY, "false"))) { + resolved = TimeUnit.DAYS.toSeconds(1); + } + String tableType = + table + .properties() + .getOrDefault(AppConstants.OPENHOUSE_TABLE_TYPE_KEY, AppConstants.TABLE_TYPE_PRIMARY); + if (AppConstants.TABLE_TYPE_REPLICA.equals(tableType) + && Duration.ofSeconds(resolved).toDays() < DEFAULT_MIN_OFD_TTL_IN_DAYS) { + resolved = TimeUnit.DAYS.toSeconds(DEFAULT_MIN_OFD_TTL_IN_DAYS); + } + return resolved; + } + } + + /** Per-table inputs for one operation row inside a bin. */ + @lombok.AllArgsConstructor + @lombok.Builder + @lombok.Getter + @lombok.ToString + public static class BatchEntry { + private final String fqtn; + private final String operationId; + private final String tableUuid; + private final String databaseName; + private final String tableName; + } + + public static void main(String[] args) { + OtelEmitter otelEmitter = + new AppsOtelEmitter(Collections.singletonList(DefaultOtelConfig.getOpenTelemetry())); + createApp(args, otelEmitter).run(); + } + + public static BatchedOrphanFilesDeletionSparkApp createApp( + String[] args, OtelEmitter otelEmitter) { + List

The batched Spark app calls {@link #updateOperation(String, UpdateOperationRequest)} once per + * finished operation to record SUCCESS or FAILED. Per the design, a missed update is recoverable — + * the operation row stays {@code SCHEDULED} and the Analyzer's stale-timeout will re-queue it — so + * this client surfaces but does not swallow failures. + */ +@Slf4j +public class OptimizerServiceClient { + + private static final int REQUEST_TIMEOUT_SECONDS = 5; + + private final RetryTemplate retryTemplate; + private final TableOperationsControllerApi api; + + public OptimizerServiceClient(String baseUrl) { + this(baseUrl, null); + } + + public OptimizerServiceClient(String baseUrl, String truststoreLocation) { + this(RetryUtil.getOptimizerApiRetryTemplate(), buildApi(baseUrl, truststoreLocation)); + } + + OptimizerServiceClient(RetryTemplate retryTemplate, TableOperationsControllerApi api) { + this.retryTemplate = retryTemplate; + this.api = api; + } + + /** + * Reports a terminal status for {@code operationId}. The path id and the body's {@code + * operationId} must match — callers should set both via the same value. + * + * @return the created history record, or {@link Optional#empty()} if the call failed after + * retries (logged; the caller decides what to do). + */ + public Optional updateOperation( + String operationId, UpdateOperationRequest request) { + Objects.requireNonNull(operationId, "operationId"); + Objects.requireNonNull(request, "request"); + return Optional.ofNullable( + RetryUtil.executeWithRetry( + retryTemplate, + (RetryCallback) + context -> + api.updateOperation(operationId, request) + .block(Duration.ofSeconds(REQUEST_TIMEOUT_SECONDS)), + null)); + } + + private static TableOperationsControllerApi buildApi(String baseUrl, String truststoreLocation) { + Objects.requireNonNull(baseUrl, "baseUrl"); + try { + ApiClient apiClient = + OptimizerApiClientFactory.getInstance() + .createApiClient(baseUrl, null, truststoreLocation); + return new TableOperationsControllerApi(apiClient); + } catch (MalformedURLException | SSLException e) { + throw new RuntimeException( + "Failed to construct optimizer service client for baseUrl=" + baseUrl, e); + } + } +} diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/RetryUtil.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/RetryUtil.java index 29a25e962..357b09eaf 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/RetryUtil.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/RetryUtil.java @@ -51,6 +51,15 @@ public static RetryTemplate getTablesApiRetryTemplate() { .build(); } + public static RetryTemplate getOptimizerApiRetryTemplate() { + RetryTemplateBuilder builder = new RetryTemplateBuilder(); + return builder + .maxAttempts(5) + .customBackoff(DEFAULT_JOBS_BACKOFF_POLICY) + .retryOn(WebClientResponseException.InternalServerError.class) + .build(); + } + public static RetryTemplate getTrinoClientRetryTemplate() { RetryTemplateBuilder builder = new RetryTemplateBuilder(); return builder diff --git a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkAppArgsTest.java b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkAppArgsTest.java new file mode 100644 index 000000000..7a32e503f --- /dev/null +++ b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkAppArgsTest.java @@ -0,0 +1,74 @@ +package com.linkedin.openhouse.jobs.spark; + +import java.util.List; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +/** + * Pure-Java unit tests for {@link BatchedOrphanFilesDeletionSparkApp#buildEntries}. No Spark + * session, no HTTP — exercises the CLI-parsing edges that decide whether the app can even start. + */ +public class BatchedOrphanFilesDeletionSparkAppArgsTest { + + @Test + public void buildEntriesParsesParallelLists() { + List entries = + BatchedOrphanFilesDeletionSparkApp.buildEntries( + "db1.t1,db2.t2", "op-1,op-2", "uuid-1,uuid-2"); + + Assertions.assertEquals(2, entries.size()); + Assertions.assertEquals("db1.t1", entries.get(0).getFqtn()); + Assertions.assertEquals("db1", entries.get(0).getDatabaseName()); + Assertions.assertEquals("t1", entries.get(0).getTableName()); + Assertions.assertEquals("op-1", entries.get(0).getOperationId()); + Assertions.assertEquals("uuid-1", entries.get(0).getTableUuid()); + Assertions.assertEquals("db2.t2", entries.get(1).getFqtn()); + Assertions.assertEquals("op-2", entries.get(1).getOperationId()); + } + + @Test + public void buildEntriesTrimsWhitespaceInEachEntry() { + List entries = + BatchedOrphanFilesDeletionSparkApp.buildEntries( + " db1.t1 , db2.t2 ", " op-1 , op-2 ", " uuid-1 , uuid-2 "); + + Assertions.assertEquals("db1.t1", entries.get(0).getFqtn()); + Assertions.assertEquals("op-1", entries.get(0).getOperationId()); + Assertions.assertEquals("uuid-1", entries.get(0).getTableUuid()); + } + + @Test + public void buildEntriesRejectsMismatchedLengths() { + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + BatchedOrphanFilesDeletionSparkApp.buildEntries("db.a,db.b", "op-1", "uuid-1,uuid-2")); + } + + @Test + public void buildEntriesRejectsNullArguments() { + Assertions.assertThrows( + IllegalArgumentException.class, + () -> BatchedOrphanFilesDeletionSparkApp.buildEntries(null, "op-1", "uuid-1")); + Assertions.assertThrows( + IllegalArgumentException.class, + () -> BatchedOrphanFilesDeletionSparkApp.buildEntries("db.a", null, "uuid-1")); + Assertions.assertThrows( + IllegalArgumentException.class, + () -> BatchedOrphanFilesDeletionSparkApp.buildEntries("db.a", "op-1", null)); + } + + @Test + public void buildEntriesRejectsEmptyStrings() { + Assertions.assertThrows( + IllegalArgumentException.class, + () -> BatchedOrphanFilesDeletionSparkApp.buildEntries("", "op-1", "uuid-1")); + } + + @Test + public void buildEntriesRejectsNonFqtn() { + Assertions.assertThrows( + IllegalArgumentException.class, + () -> BatchedOrphanFilesDeletionSparkApp.buildEntries("just_a_table", "op-1", "uuid-1")); + } +} diff --git a/client/optimizerclient/build.gradle b/client/optimizerclient/build.gradle new file mode 100644 index 000000000..1a3a0ce17 --- /dev/null +++ b/client/optimizerclient/build.gradle @@ -0,0 +1,11 @@ +plugins { + id 'openhouse.java-minimal-conventions' + id 'openhouse.client-codegen-convention' + id 'openhouse.maven-publish' +} + +ext { + codeGenForService = ":services:optimizer" +} + +apply from: "${project(':client:common').file("codegen.build.gradle")}" diff --git a/client/secureclient/build.gradle b/client/secureclient/build.gradle index 93b0f256f..e31146fe4 100644 --- a/client/secureclient/build.gradle +++ b/client/secureclient/build.gradle @@ -8,6 +8,7 @@ plugins { dependencies { api project(':client:tableclient') api project(':client:jobsclient') + api project(':client:optimizerclient') api project(':client:hts') api 'org.springframework.boot:spring-boot-starter-webflux:2.7.8' testImplementation 'io.netty:netty-resolver-dns-native-macos:4.1.70.Final:osx-x86_64' diff --git a/client/secureclient/src/main/java/com/linkedin/openhouse/client/ssl/OptimizerApiClientFactory.java b/client/secureclient/src/main/java/com/linkedin/openhouse/client/ssl/OptimizerApiClientFactory.java new file mode 100644 index 000000000..2594ce408 --- /dev/null +++ b/client/secureclient/src/main/java/com/linkedin/openhouse/client/ssl/OptimizerApiClientFactory.java @@ -0,0 +1,49 @@ +package com.linkedin.openhouse.client.ssl; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.linkedin.openhouse.optimizer.client.invoker.ApiClient; +import java.net.MalformedURLException; +import java.text.DateFormat; +import javax.net.ssl.SSLException; +import lombok.NonNull; +import org.springframework.web.reactive.function.client.WebClient; + +/** Factory to create optimizer-specific {@link ApiClient}. Mirrors {@link JobsApiClientFactory}. */ +public final class OptimizerApiClientFactory extends WebClientFactory { + + private static OptimizerApiClientFactory instance; + + private OptimizerApiClientFactory() { + super(); + } + + public static synchronized OptimizerApiClientFactory getInstance() { + if (null == instance) { + instance = new OptimizerApiClientFactory(); + } + return instance; + } + + @Override + protected WebClient.Builder createWebClientBuilder() { + DateFormat defaultDateFormat = ApiClient.createDefaultDateFormat(); + ObjectMapper defaultObjectMapper = + ApiClient.createDefaultObjectMapper(defaultDateFormat) + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + return ApiClient.buildWebClientBuilder(defaultObjectMapper); + } + + /** + * Creates the optimizer-specific {@link ApiClient} that the generated {@code + * TableOperationsControllerApi} / {@code TableStatsControllerApi} / {@code + * TableOperationsHistoryControllerApi} wrap. + */ + public ApiClient createApiClient(@NonNull String baseUrl, String token, String truststoreLocation) + throws MalformedURLException, SSLException { + WebClient webClient = createWebClient(baseUrl, token, truststoreLocation); + ApiClient apiClient = new ApiClient(webClient); + apiClient.setBasePath(baseUrl); + return apiClient; + } +} diff --git a/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/BinItem.java b/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/BinItem.java index 6736ea5dc..1e7b5968a 100644 --- a/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/BinItem.java +++ b/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/BinItem.java @@ -8,7 +8,7 @@ * when it launches a Spark job (fully-qualified table name, operation id). * *

Implementations have a public no-arg constructor — instantiated transiently inside {@link - * FirstFitBinPacker#pack} via a {@code Supplier} (typically a {@code + * FirstFitDecreasingBinPacker#pack} via a {@code Supplier} (typically a {@code * MyItem::new} method reference) — on which {@link #fromOpAndStats} is called to return the * populated item. Getters on the empty instance are not meaningful; it exists for the lifetime of a * single projection call. diff --git a/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/FirstFitBinPacker.java b/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/FirstFitBinPacker.java deleted file mode 100644 index 9cf452709..000000000 --- a/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/FirstFitBinPacker.java +++ /dev/null @@ -1,92 +0,0 @@ -package com.linkedin.openhouse.optimizer.binpack; - -import com.linkedin.openhouse.optimizer.model.TableOperationDto; -import com.linkedin.openhouse.optimizer.model.TableStatsDto; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.function.Supplier; -import java.util.stream.Collectors; -import lombok.extern.slf4j.Slf4j; - -/** - * First-fit-decreasing packing, generic over the concrete {@link BinItem} subtype {@code T}. - * Construction takes a {@code Supplier} — typically a {@code MyItem::new} method reference — - * which the packer invokes per operation to get an empty instance, then calls {@link - * BinItem#fromOpAndStats} on it to project the (operation, stats) pair into a populated item. - * - *

Sorts items by weight descending, then places each into the first group whose totals stay at - * or below {@code maxWeightPerBin} and {@code maxItemsPerBin}. An item whose weight exceeds the cap - * on its own goes into a group by itself. Operations whose {@code tableUuid} has no entry in {@code - * statsByTableUuid} are dropped. - * - *

Stateless: the constructor takes only the BinItem supplier and the cap configuration; {@link - * #pack} is a pure function over its arguments. The packer is operation-agnostic — the scheduler - * wraps each grouping into a {@link Bin} with the registered operation type. - */ -@Slf4j -public class FirstFitBinPacker implements BinPacker { - - private final Supplier binItemSupplier; - private final long maxWeightPerBin; - private final int maxItemsPerBin; - - public FirstFitBinPacker(Supplier binItemSupplier, long maxWeightPerBin, int maxItemsPerBin) { - this.binItemSupplier = binItemSupplier; - this.maxWeightPerBin = maxWeightPerBin; - this.maxItemsPerBin = maxItemsPerBin; - } - - @Override - public List> pack( - List operations, Map statsByTableUuid) { - List items = - operations.stream() - .filter(op -> statsByTableUuid.containsKey(op.getTableUuid())) - .map( - op -> - binItemSupplier - .get() - .fromOpAndStats(op, statsByTableUuid.get(op.getTableUuid()))) - .collect(Collectors.toList()); - List packingBins = - items.stream() - .sorted(Comparator.comparingLong(BinItem::getWeight).reversed()) - .collect(ArrayList::new, this::placeItem, List::addAll); - log.info( - "Packed {} operations ({} items after projection) into {} groupings", - operations.size(), - items.size(), - packingBins.size()); - return packingBins.stream().map(pb -> pb.items).collect(Collectors.toList()); - } - - private void placeItem(List bins, BinItem item) { - bins.stream() - .filter(b -> b.fits(item, maxWeightPerBin, maxItemsPerBin)) - .findFirst() - .ifPresentOrElse( - b -> b.add(item), - () -> { - PackingBin fresh = new PackingBin(); - fresh.add(item); - bins.add(fresh); - }); - } - - /** Running-totals helper used during the fold. */ - private static class PackingBin { - final List items = new ArrayList<>(); - long totalWeight; - - boolean fits(BinItem item, long maxWeight, int maxItems) { - return items.size() < maxItems && totalWeight + item.getWeight() <= maxWeight; - } - - void add(BinItem item) { - items.add(item); - totalWeight += item.getWeight(); - } - } -} diff --git a/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/FirstFitDecreasingBinPacker.java b/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/FirstFitDecreasingBinPacker.java new file mode 100644 index 000000000..689188ed5 --- /dev/null +++ b/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/FirstFitDecreasingBinPacker.java @@ -0,0 +1,133 @@ +package com.linkedin.openhouse.optimizer.binpack; + +import com.linkedin.openhouse.optimizer.model.OperationTypeDto; +import com.linkedin.openhouse.optimizer.model.TableOperationDto; +import com.linkedin.openhouse.optimizer.model.TableStatsDto; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import lombok.Builder; +import lombok.extern.slf4j.Slf4j; + +/** + * First-fit-decreasing packing. Sorts items by weight descending, then places each into the first + * group whose totals stay at or below {@code maxWeightPerBin} and {@code maxItemsPerBin}. An item + * whose weight exceeds the cap on its own goes into a group by itself. + * + *

Construct via the generated builder. Defaults are sized for the orphan-files-deletion case + * (1M-file weight cap, 50 tables per bin) but every field is overridable. Omit {@code + * binItemSupplier} when you are passing pre-projected {@link BinItem}s and don't need the + * projection path. + * + *

Three callable shapes, all sharing the same placement core: + * + *

    + *
  1. Projection path ({@link #pack(List, Map)}, the {@link BinPacker} contract) — takes + * raw operations + stats, calls {@code binItemSupplier.get().fromOpAndStats(op, stats)} per + * row, returns the resulting groupings. Operations whose {@code tableUuid} has no entry in + * {@code statsByTableUuid} are dropped. Requires {@code binItemSupplier}. + *
  2. Pre-projected raw groupings ({@link #pack(List)}) — caller has already projected + * items; packer returns {@code List>}. + *
  3. Pre-projected tagged bins ({@link #pack(List, OperationTypeDto)}) — caller has + * already projected items; packer wraps each grouping in a {@link Bin} tagged with the + * supplied operation type. Use this when the bins flow directly to a scheduler that expects + * the operation type alongside the items. + *
+ * + *

Stateless: {@code pack} is a pure function over its arguments. + */ +@Slf4j +@Builder +public class FirstFitDecreasingBinPacker implements BinPacker { + + /** Required only for the {@link #pack(List, Map)} projection path; {@code null} otherwise. */ + private final Supplier binItemSupplier; + + @Builder.Default private final long maxWeightPerBin = 1_000_000L; + + @Builder.Default private final int maxItemsPerBin = 50; + + @Override + public List> pack( + List operations, Map statsByTableUuid) { + if (binItemSupplier == null) { + throw new IllegalStateException( + "FirstFitDecreasingBinPacker built without a binItemSupplier; use pack(List) or pack(List, OperationTypeDto) instead"); + } + List items = + operations.stream() + .filter(op -> statsByTableUuid.containsKey(op.getTableUuid())) + .map( + op -> + binItemSupplier + .get() + .fromOpAndStats(op, statsByTableUuid.get(op.getTableUuid()))) + .collect(Collectors.toList()); + List> groupings = packCore(items); + log.info( + "Packed {} operations ({} items after projection) into {} groupings", + operations.size(), + items.size(), + groupings.size()); + return groupings; + } + + /** Pre-projected items; returns raw groupings. */ + public List> pack(List items) { + List> groupings = packCore(items); + log.info( + "Packed {} pre-projected items into {} groupings", + items == null ? 0 : items.size(), + groupings.size()); + return groupings; + } + + /** Pre-projected items; returns {@link Bin}s each tagged with {@code operationType}. */ + public List pack(List items, OperationTypeDto operationType) { + return pack(items).stream() + .map(group -> new Bin(operationType, group)) + .collect(Collectors.toList()); + } + + private List> packCore(List items) { + if (items == null || items.isEmpty()) { + return new ArrayList<>(); + } + List packingBins = + items.stream() + .sorted(Comparator.comparingLong(BinItem::getWeight).reversed()) + .collect(ArrayList::new, this::placeItem, List::addAll); + return packingBins.stream().map(pb -> pb.items).collect(Collectors.toList()); + } + + private void placeItem(List bins, BinItem item) { + bins.stream() + .filter(b -> b.fits(item, maxWeightPerBin, maxItemsPerBin)) + .findFirst() + .ifPresentOrElse( + b -> b.add(item), + () -> { + PackingBin fresh = new PackingBin(); + fresh.add(item); + bins.add(fresh); + }); + } + + /** Running-totals helper used during the fold. */ + private static class PackingBin { + final List items = new ArrayList<>(); + long totalWeight; + + boolean fits(BinItem item, long maxWeight, int maxItems) { + return items.size() < maxItems && totalWeight + item.getWeight() <= maxWeight; + } + + void add(BinItem item) { + items.add(item); + totalWeight += item.getWeight(); + } + } +} diff --git a/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/TotalFilesBinItem.java b/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/TotalFilesBinItem.java index 70d63bb39..3ad5200de 100644 --- a/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/TotalFilesBinItem.java +++ b/libs/optimizer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/TotalFilesBinItem.java @@ -12,8 +12,8 @@ * implementation knows nothing about which operation type it is wired up to. * *

Construction: callers pass {@code TotalFilesBinItem::new} as the {@code Supplier} to {@link - * FirstFitBinPacker}; the packer calls the supplier per operation to get an empty instance, then - * {@link #fromOpAndStats} on it to get a populated copy. + * FirstFitDecreasingBinPacker}; the packer calls the supplier per operation to get an empty + * instance, then {@link #fromOpAndStats} on it to get a populated copy. */ @Getter @ToString diff --git a/libs/optimizer/binpack/src/test/java/com/linkedin/openhouse/optimizer/binpack/FirstFitBinPackerTest.java b/libs/optimizer/binpack/src/test/java/com/linkedin/openhouse/optimizer/binpack/FirstFitBinPackerTest.java deleted file mode 100644 index 602f41711..000000000 --- a/libs/optimizer/binpack/src/test/java/com/linkedin/openhouse/optimizer/binpack/FirstFitBinPackerTest.java +++ /dev/null @@ -1,150 +0,0 @@ -package com.linkedin.openhouse.optimizer.binpack; - -import static org.assertj.core.api.Assertions.assertThat; - -import com.linkedin.openhouse.optimizer.model.TableOperationDto; -import com.linkedin.openhouse.optimizer.model.TableStatsDto; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import lombok.Getter; -import org.junit.jupiter.api.Test; - -/** - * Tests the {@link FirstFitBinPacker} bucketing logic in isolation via a {@link TestItem} whose - * weight comes from a {@code "weight"} entry in {@code tableProperties}. The supplier-then-{@code - * fromOpAndStats} pattern is exercised end-to-end through the public {@code pack} entry point. - * Projection logic for production BinItems (e.g. {@link TotalFilesBinItem}) is covered by their own - * tests. - */ -class FirstFitBinPackerTest { - - @Getter - static class TestItem implements BinItem { - private final String operationId; - private final long weight; - - public TestItem() { - this("", 0L); - } - - private TestItem(String operationId, long weight) { - this.operationId = operationId; - this.weight = weight; - } - - @Override - public String getFullyQualifiedTableName() { - return "db.tbl_" + operationId; - } - - @Override - public BinItem fromOpAndStats(TableOperationDto op, TableStatsDto stats) { - long w = Long.parseLong(stats.getTableProperties().get("weight")); - return new TestItem(op.getId(), w); - } - } - - private static TableOperationDto op(String id) { - return TableOperationDto.builder().id(id).tableUuid(id).build(); - } - - private static TableStatsDto statsWithWeight(String uuid, long weight) { - return TableStatsDto.builder() - .tableUuid(uuid) - .tableProperties(Map.of("weight", Long.toString(weight))) - .build(); - } - - private static List opsList(String... ids) { - return java.util.Arrays.stream(ids).map(FirstFitBinPackerTest::op).collect(Collectors.toList()); - } - - private static Map statsMap(Object... uuidWeightPairs) { - Map map = new HashMap<>(); - for (int i = 0; i < uuidWeightPairs.length; i += 2) { - String uuid = (String) uuidWeightPairs[i]; - long weight = (long) uuidWeightPairs[i + 1]; - map.put(uuid, statsWithWeight(uuid, weight)); - } - return map; - } - - private static FirstFitBinPacker packer(long maxWeight, int maxItems) { - return new FirstFitBinPacker<>(TestItem::new, maxWeight, maxItems); - } - - @Test - void emptyInput_returnsEmptyGroupings() { - assertThat(packer(100L, 10).pack(List.of(), Map.of())).isEmpty(); - } - - @Test - void singleItem_oneGrouping() { - List> groupings = packer(1_000_000L, 10).pack(opsList("a"), statsMap("a", 100L)); - assertThat(groupings).hasSize(1); - assertThat(groupings.get(0)).hasSize(1); - } - - @Test - void underWeightLimit_oneGrouping() { - List> groupings = - packer(1_000_000L, 10) - .pack(opsList("a", "b", "c"), statsMap("a", 300_000L, "b", 300_000L, "c", 300_000L)); - assertThat(groupings).hasSize(1); - assertThat(groupings.get(0)).hasSize(3); - } - - @Test - void overWeightLimit_twoGroupings() { - List> groupings = - packer(1_000_000L, 10) - .pack(opsList("a", "b", "c"), statsMap("a", 600_000L, "b", 600_000L, "c", 400_000L)); - assertThat(groupings).hasSize(2); - // FFD: sort desc → 600, 600, 400. Place 600 → bin0; next 600 doesn't fit bin0, → bin1; 400 - // fits bin0 (total 1_000_000). - long b0 = groupings.get(0).stream().mapToLong(BinItem::getWeight).sum(); - long b1 = groupings.get(1).stream().mapToLong(BinItem::getWeight).sum(); - assertThat(b0).isEqualTo(1_000_000L); - assertThat(b1).isEqualTo(600_000L); - } - - @Test - void itemLargerThanCap_getsOwnGrouping() { - List> groupings = - packer(1_000L, 10).pack(opsList("big"), statsMap("big", 5_000L)); - assertThat(groupings).hasSize(1); - assertThat(groupings.get(0)).hasSize(1); - } - - @Test - void sortedDescending_largestFirst() { - List> groupings = - packer(2_000_000L, 10) - .pack(opsList("small", "large"), statsMap("small", 100L, "large", 900_000L)); - assertThat(groupings).hasSize(1); - List ids = - groupings.get(0).stream().map(BinItem::getOperationId).collect(Collectors.toList()); - assertThat(ids).containsExactly("large", "small"); - } - - @Test - void maxItemsCap_splitsGroupings() { - List> groupings = - packer(1_000_000L, 2) - .pack(opsList("a", "b", "c", "d"), statsMap("a", 1L, "b", 1L, "c", 1L, "d", 1L)); - assertThat(groupings).hasSize(2); - assertThat(groupings.get(0)).hasSize(2); - assertThat(groupings.get(1)).hasSize(2); - } - - @Test - void operationsWithoutStats_dropped() { - List> groupings = - packer(1_000_000L, 10).pack(opsList("a", "missing"), statsMap("a", 100L)); - assertThat(groupings).hasSize(1); - assertThat(groupings.get(0)).hasSize(1); - assertThat(groupings.get(0).get(0).getOperationId()).isEqualTo("a"); - } -} diff --git a/libs/optimizer/binpack/src/test/java/com/linkedin/openhouse/optimizer/binpack/FirstFitDecreasingBinPackerTest.java b/libs/optimizer/binpack/src/test/java/com/linkedin/openhouse/optimizer/binpack/FirstFitDecreasingBinPackerTest.java new file mode 100644 index 000000000..e11c23fd9 --- /dev/null +++ b/libs/optimizer/binpack/src/test/java/com/linkedin/openhouse/optimizer/binpack/FirstFitDecreasingBinPackerTest.java @@ -0,0 +1,317 @@ +package com.linkedin.openhouse.optimizer.binpack; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.linkedin.openhouse.optimizer.model.OperationTypeDto; +import com.linkedin.openhouse.optimizer.model.TableOperationDto; +import com.linkedin.openhouse.optimizer.model.TableStatsDto; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import lombok.AllArgsConstructor; +import lombok.Getter; +import org.junit.jupiter.api.Test; + +/** + * Tests the three callable shapes of {@link FirstFitDecreasingBinPacker}: + * + *

    + *
  • {@code pack(ops, stats)} — projection path via {@link ProjectingTestItem}'s {@code + * fromOpAndStats}. + *
  • {@code pack(items)} — pre-projected raw groupings. + *
  • {@code pack(items, operationType)} — pre-projected tagged {@link Bin}s. + *
+ * + * Projection logic for production BinItems (e.g. {@link TotalFilesBinItem}) is covered by their own + * tests. + */ +class FirstFitDecreasingBinPackerTest { + + private static final OperationTypeDto OFD = OperationTypeDto.ORPHAN_FILES_DELETION; + + // -------------------- Projection path: pack(ops, stats) -------------------- + + @Test + void emptyInput_returnsEmptyGroupings() { + assertThat(projectionPacker(100L, 10).pack(Collections.emptyList(), Collections.emptyMap())) + .isEmpty(); + } + + @Test + void singleItem_oneGrouping() { + List> groupings = + projectionPacker(1_000_000L, 10).pack(opsList("a"), statsMap("a", 100L)); + assertThat(groupings).hasSize(1); + assertThat(groupings.get(0)).hasSize(1); + } + + @Test + void underWeightLimit_oneGrouping() { + List> groupings = + projectionPacker(1_000_000L, 10) + .pack(opsList("a", "b", "c"), statsMap("a", 300_000L, "b", 300_000L, "c", 300_000L)); + assertThat(groupings).hasSize(1); + assertThat(groupings.get(0)).hasSize(3); + } + + @Test + void overWeightLimit_twoGroupings() { + List> groupings = + projectionPacker(1_000_000L, 10) + .pack(opsList("a", "b", "c"), statsMap("a", 600_000L, "b", 600_000L, "c", 400_000L)); + assertThat(groupings).hasSize(2); + // FFD: sort desc → 600, 600, 400. Place 600 → bin0; next 600 doesn't fit bin0, → bin1; 400 + // fits bin0 (total 1_000_000). + long b0 = groupings.get(0).stream().mapToLong(BinItem::getWeight).sum(); + long b1 = groupings.get(1).stream().mapToLong(BinItem::getWeight).sum(); + assertThat(b0).isEqualTo(1_000_000L); + assertThat(b1).isEqualTo(600_000L); + } + + @Test + void itemLargerThanCap_getsOwnGrouping() { + List> groupings = + projectionPacker(1_000L, 10).pack(opsList("big"), statsMap("big", 5_000L)); + assertThat(groupings).hasSize(1); + assertThat(groupings.get(0)).hasSize(1); + } + + @Test + void sortedDescending_largestFirst() { + List> groupings = + projectionPacker(2_000_000L, 10) + .pack(opsList("small", "large"), statsMap("small", 100L, "large", 900_000L)); + assertThat(groupings).hasSize(1); + List ids = + groupings.get(0).stream().map(BinItem::getOperationId).collect(Collectors.toList()); + assertThat(ids).containsExactly("large", "small"); + } + + @Test + void maxItemsCap_splitsGroupings() { + List> groupings = + projectionPacker(1_000_000L, 2) + .pack(opsList("a", "b", "c", "d"), statsMap("a", 1L, "b", 1L, "c", 1L, "d", 1L)); + assertThat(groupings).hasSize(2); + assertThat(groupings.get(0)).hasSize(2); + assertThat(groupings.get(1)).hasSize(2); + } + + @Test + void operationsWithoutStats_dropped() { + List> groupings = + projectionPacker(1_000_000L, 10).pack(opsList("a", "missing"), statsMap("a", 100L)); + assertThat(groupings).hasSize(1); + assertThat(groupings.get(0)).hasSize(1); + assertThat(groupings.get(0).get(0).getOperationId()).isEqualTo("a"); + } + + // -------------------- Pre-projected path: pack(items) and pack(items, OFD) -------------------- + + @Test + void preProjected_emptyInput_returnsEmpty() { + assertThat(preProjectedPacker(100L, 50).pack(Collections.emptyList(), OFD)).isEmpty(); + assertThat(preProjectedPacker(100L, 50).pack((List) null, OFD)).isEmpty(); + } + + @Test + void preProjected_itemsSortDescendingBeforePlacing() { + List items = + Arrays.asList(item("db.t_small", 10), item("db.t_big", 100), item("db.t_mid", 50)); + + List bins = preProjectedPacker(1000L, 50).pack(items, OFD); + + assertThat(bins).hasSize(1); + Bin only = bins.get(0); + assertThat(only.getOperationType()).isEqualTo(OFD); + assertThat(only.getItems()) + .extracting(BinItem::getFullyQualifiedTableName) + .containsExactly("db.t_big", "db.t_mid", "db.t_small"); + } + + @Test + void preProjected_weightCapForcesMultipleBins() { + List items = + Arrays.asList(item("db.a", 60), item("db.b", 50), item("db.c", 40), item("db.d", 30)); + + List bins = preProjectedPacker(100L, 50).pack(items, OFD); + + // FFD on [60, 50, 40, 30] with weight cap 100: + // bin0: 60 → remaining 40 + // bin0 tries 50 → doesn't fit, new bin1: 50 + // bin0 tries 40 → fits, bin0: 60 + 40 = 100 + // bin1 tries 30 → fits, bin1: 50 + 30 = 80 + assertThat(bins).hasSize(2); + assertThat(bins.get(0).getItems().stream().mapToLong(BinItem::getWeight).sum()).isEqualTo(100L); + assertThat(bins.get(1).getItems().stream().mapToLong(BinItem::getWeight).sum()).isEqualTo(80L); + } + + @Test + void preProjected_maxItemsPerBinCapHonored() { + List items = + IntStream.range(0, 5).mapToObj(i -> item("db.t" + i, 1)).collect(Collectors.toList()); + + List bins = preProjectedPacker(1000L, 2).pack(items, OFD); + + assertThat(bins).hasSize(3); + assertThat(bins.get(0).getItems()).hasSize(2); + assertThat(bins.get(1).getItems()).hasSize(2); + assertThat(bins.get(2).getItems()).hasSize(1); + } + + @Test + void preProjected_oversizedItemGetsItsOwnBin() { + List items = + Arrays.asList(item("db.tiny1", 10), item("db.giant", 500), item("db.tiny2", 10)); + + List bins = preProjectedPacker(100L, 50).pack(items, OFD); + + long total = + bins.stream().flatMap(b -> b.getItems().stream()).mapToLong(BinItem::getWeight).sum(); + assertThat(total).isEqualTo(520L); + boolean giantPresent = + bins.stream() + .flatMap(b -> b.getItems().stream()) + .anyMatch(i -> i.getFullyQualifiedTableName().equals("db.giant")); + assertThat(giantPresent).isTrue(); + } + + @Test + void preProjected_operationTypeIsPropagatedToEachBin() { + List items = Arrays.asList(item("db.a", 1), item("db.b", 1), item("db.c", 1)); + List bins = preProjectedPacker(1000L, 2).pack(items, OFD); + assertThat(bins).hasSize(2); + assertThat(bins).allMatch(b -> b.getOperationType().equals(OFD)); + } + + @Test + void preProjected_rawGroupingsOverload_omitsOperationType() { + List items = Arrays.asList(item("db.a", 60), item("db.b", 50)); + List> groupings = preProjectedPacker(100L, 50).pack(items); + assertThat(groupings).hasSize(2); + assertThat(groupings.get(0)).hasSize(1); + assertThat(groupings.get(1)).hasSize(1); + } + + // -------------------- Builder / contract checks -------------------- + + @Test + void omittingBinItemSupplier_rejectsProjectionPath() { + FirstFitDecreasingBinPacker packer = + FirstFitDecreasingBinPacker.builder() + .maxWeightPerBin(100L) + .maxItemsPerBin(10) + .build(); + assertThatThrownBy(() -> packer.pack(Collections.emptyList(), Collections.emptyMap())) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("binItemSupplier"); + } + + @Test + void builderDefaults_appliedWhenCapsOmitted() { + // The defaults are 1_000_000 weight / 50 items — sized for OFD. With 60 items of weight 100, + // the 50-items cap kicks in before the weight cap. + List items = + IntStream.range(0, 60).mapToObj(i -> item("db.t" + i, 100)).collect(Collectors.toList()); + + List bins = FirstFitDecreasingBinPacker.builder().build().pack(items, OFD); + + assertThat(bins).hasSize(2); + assertThat(bins.get(0).getItems()).hasSize(50); + assertThat(bins.get(1).getItems()).hasSize(10); + } + + // -------------------- Fixtures -------------------- + + private static FirstFitDecreasingBinPacker projectionPacker( + long maxWeight, int maxItems) { + return FirstFitDecreasingBinPacker.builder() + .binItemSupplier(ProjectingTestItem::new) + .maxWeightPerBin(maxWeight) + .maxItemsPerBin(maxItems) + .build(); + } + + private static FirstFitDecreasingBinPacker preProjectedPacker( + long maxWeight, int maxItems) { + return FirstFitDecreasingBinPacker.builder() + .maxWeightPerBin(maxWeight) + .maxItemsPerBin(maxItems) + .build(); + } + + private static TableOperationDto op(String id) { + return TableOperationDto.builder().id(id).tableUuid(id).build(); + } + + private static TableStatsDto statsWithWeight(String uuid, long weight) { + return TableStatsDto.builder() + .tableUuid(uuid) + .tableProperties(Map.of("weight", Long.toString(weight))) + .build(); + } + + private static List opsList(String... ids) { + return Arrays.stream(ids).map(FirstFitDecreasingBinPackerTest::op).collect(Collectors.toList()); + } + + private static Map statsMap(Object... uuidWeightPairs) { + Map map = new HashMap<>(); + for (int i = 0; i < uuidWeightPairs.length; i += 2) { + String uuid = (String) uuidWeightPairs[i]; + long weight = (long) uuidWeightPairs[i + 1]; + map.put(uuid, statsWithWeight(uuid, weight)); + } + return map; + } + + private static BinItem item(String fqtn, long weight) { + return new PreProjectedTestItem(fqtn, "op-" + fqtn, weight); + } + + /** Exercises the {@code fromOpAndStats} projection callback. */ + @Getter + static class ProjectingTestItem implements BinItem { + private final String operationId; + private final long weight; + + public ProjectingTestItem() { + this("", 0L); + } + + private ProjectingTestItem(String operationId, long weight) { + this.operationId = operationId; + this.weight = weight; + } + + @Override + public String getFullyQualifiedTableName() { + return "db.tbl_" + operationId; + } + + @Override + public BinItem fromOpAndStats(TableOperationDto op, TableStatsDto stats) { + long w = Long.parseLong(stats.getTableProperties().get("weight")); + return new ProjectingTestItem(op.getId(), w); + } + } + + /** Pre-projected: packer treats items as opaque so {@code fromOpAndStats} is a no-op. */ + @AllArgsConstructor + @Getter + static class PreProjectedTestItem implements BinItem { + private final String fullyQualifiedTableName; + private final String operationId; + private final long weight; + + @Override + public BinItem fromOpAndStats(TableOperationDto op, TableStatsDto stats) { + return this; + } + } +} diff --git a/services/optimizer/build.gradle b/services/optimizer/build.gradle index c05c7f9c3..f5875b734 100644 --- a/services/optimizer/build.gradle +++ b/services/optimizer/build.gradle @@ -1,6 +1,27 @@ plugins { id 'openhouse.springboot-ext-conventions' id 'org.springframework.boot' version '2.7.8' + // OpenAPI spec generation for :client:optimizerclient. + id 'com.github.johnrengelman.processes' version '0.5.0' + id 'org.springdoc.openapi-gradle-plugin' version '1.6.0' + id 'openhouse.service-specgen-convention' +} + +// Port allocation: tables 8000, housetables 8001, jobs 8002, optimizer 8003. +// H2 overrides let the Spring context finish startup so springdoc can extract the spec. +openApi { + customBootRun { + args = [ + "--server.port=8003", + "--spring.datasource.url=jdbc:h2:mem:codegen;MODE=MYSQL;DB_CLOSE_DELAY=-1", + "--spring.datasource.driver-class-name=org.h2.Driver", + "--spring.datasource.username=sa", + "--spring.datasource.password=", + "--spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.H2Dialect", + "--spring.sql.init.mode=never" + ] + } + apiDocsUrl.set("http://localhost:8003/v3/api-docs") } dependencies { @@ -8,7 +29,10 @@ dependencies { implementation 'com.vladmihalcea:hibernate-types-55:2.21.1' implementation 'org.springframework.boot:spring-boot-starter-web:2.7.8' implementation 'mysql:mysql-connector-java:8.+' - testImplementation 'com.h2database:h2:2.2.224' + // H2 only for spec-gen bootRun; excluded from bootJar. Pinned to 2.1.210 (matches + // :iceberg:openhouse:htscatalog) — 2.2.x switches to slf4j-api 2.x and breaks downstream bridges. + developmentOnly 'com.h2database:h2:2.1.210' + testImplementation 'com.h2database:h2:2.1.210' testImplementation 'org.springframework.boot:spring-boot-starter-test:2.7.8' } diff --git a/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/config/SchedulerConfig.java b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/config/SchedulerConfig.java index 82af33275..cc7244115 100644 --- a/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/config/SchedulerConfig.java +++ b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/config/SchedulerConfig.java @@ -1,6 +1,6 @@ package com.linkedin.openhouse.optimizer.scheduler.config; -import com.linkedin.openhouse.optimizer.binpack.FirstFitBinPacker; +import com.linkedin.openhouse.optimizer.binpack.FirstFitDecreasingBinPacker; import com.linkedin.openhouse.optimizer.binpack.TotalFilesBinItem; import com.linkedin.openhouse.optimizer.model.OperationTypeDto; import com.linkedin.openhouse.optimizer.repository.TableOperationsRepository; @@ -37,9 +37,9 @@ public JobsServiceClient jobsServiceClient(WebClient jobsWebClient) { } /** - * Orphan files deletion: a {@link FirstFitBinPacker} over {@link TotalFilesBinItem}. Cost scales - * with file count — per-file list, manifest joins, and delete calls dominate independent of file - * size. + * Orphan files deletion: a {@link FirstFitDecreasingBinPacker} over {@link TotalFilesBinItem}. + * Cost scales with file count — per-file list, manifest joins, and delete calls dominate + * independent of file size. */ @Bean public SchedulerRunner schedulerRunner( @@ -52,6 +52,10 @@ public SchedulerRunner schedulerRunner( return new SchedulerRunner(operationsRepo, statsRepo, jobsClient, resultsEndpoint) .registerOperation( OperationTypeDto.ORPHAN_FILES_DELETION, - new FirstFitBinPacker<>(TotalFilesBinItem::new, ofdMaxFilesPerBin, ofdMaxTablesPerBin)); + FirstFitDecreasingBinPacker.builder() + .binItemSupplier(TotalFilesBinItem::new) + .maxWeightPerBin(ofdMaxFilesPerBin) + .maxItemsPerBin(ofdMaxTablesPerBin) + .build()); } } diff --git a/services/optimizer/scheduler/src/test/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerRunnerTest.java b/services/optimizer/scheduler/src/test/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerRunnerTest.java index 92df3287d..45f342ec7 100644 --- a/services/optimizer/scheduler/src/test/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerRunnerTest.java +++ b/services/optimizer/scheduler/src/test/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerRunnerTest.java @@ -10,7 +10,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.linkedin.openhouse.optimizer.binpack.FirstFitBinPacker; +import com.linkedin.openhouse.optimizer.binpack.FirstFitDecreasingBinPacker; import com.linkedin.openhouse.optimizer.binpack.TotalFilesBinItem; import com.linkedin.openhouse.optimizer.db.OperationStatus; import com.linkedin.openhouse.optimizer.db.SnapshotMetrics; @@ -49,7 +49,11 @@ void setUp() { new SchedulerRunner(operationsRepo, statsRepo, jobsClient, RESULTS_ENDPOINT) .registerOperation( ORPHAN_FILES_DELETION, - new FirstFitBinPacker<>(TotalFilesBinItem::new, 1_000_000L, 50)); + FirstFitDecreasingBinPacker.builder() + .binItemSupplier(TotalFilesBinItem::new) + .maxWeightPerBin(1_000_000L) + .maxItemsPerBin(50) + .build()); } // ---- Stubbing helpers ---- diff --git a/settings.gradle b/settings.gradle index ebfce9060..9acc07d67 100644 --- a/settings.gradle +++ b/settings.gradle @@ -29,6 +29,7 @@ include ':client:common' include ':client:hts' include ':client:jobsclient' include ':client:tableclient' +include ':client:optimizerclient' include ':client:secureclient' include ':integrations:java:iceberg-1.2:openhouse-java-runtime'