From 3a760da9d290a56e398326536b43881f47fbd6e8 Mon Sep 17 00:00:00 2001 From: Abhishek Nath Date: Wed, 27 May 2026 09:59:10 -0700 Subject: [PATCH 1/4] Integrate batched orphan files deletion with the existing schedule workflow --- .../jobs/scheduler/JobsScheduler.java | 12 +++ .../BatchedTableOrphanFilesDeletionTask.java | 79 ++++++++++++++ .../tasks/OperationTasksBuilder.java | 101 ++++++++++++++++++ .../BatchedOrphanFilesDeletionSparkApp.java | 62 ++++++++--- .../jobs/util/TableMetadataBatch.java | 42 ++++++++ ...tchedTableOrphanFilesDeletionTaskTest.java | 91 ++++++++++++++++ ...edOrphanFilesDeletionSparkAppArgsTest.java | 57 ++++++++-- .../docker-compose/oh-hadoop-spark/jobs.yaml | 8 ++ .../openhouse/jobs/model/JobConf.java | 6 ++ 9 files changed, 433 insertions(+), 25 deletions(-) create mode 100644 apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/BatchedTableOrphanFilesDeletionTask.java create mode 100644 apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableMetadataBatch.java create mode 100644 apps/spark/src/test/java/com/linkedin/openhouse/jobs/scheduler/tasks/BatchedTableOrphanFilesDeletionTaskTest.java diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/JobsScheduler.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/JobsScheduler.java index 2067fa014..361116288 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/JobsScheduler.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/JobsScheduler.java @@ -583,6 +583,13 @@ protected static CommandLine parseArgs(String[] args) { .longOpt(OperationTasksBuilder.MAX_COST_BUDGET_GB_HRS) .desc("Maximum compute cost budget in GB hours") .build()); + options.addOption( + Option.builder(null) + .required(false) + .hasArg() + .longOpt(OperationTasksBuilder.BATCH_MAX_ITEMS) + .desc("Max tables per batched OFD job (ORPHAN_FILES_DELETION_BATCH only)") + .build()); options.addOption( Option.builder(null) .required(false) @@ -744,6 +751,11 @@ protected static Properties getAdditionalProperties(CommandLine cmdLine) { OperationTasksBuilder.MAX_STRATEGIES_COUNT, cmdLine.getOptionValue(OperationTasksBuilder.MAX_STRATEGIES_COUNT)); } + if (cmdLine.hasOption(OperationTasksBuilder.BATCH_MAX_ITEMS)) { + result.setProperty( + OperationTasksBuilder.BATCH_MAX_ITEMS, + cmdLine.getOptionValue(OperationTasksBuilder.BATCH_MAX_ITEMS)); + } return result; } diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/BatchedTableOrphanFilesDeletionTask.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/BatchedTableOrphanFilesDeletionTask.java new file mode 100644 index 000000000..7c2e13c1b --- /dev/null +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/BatchedTableOrphanFilesDeletionTask.java @@ -0,0 +1,79 @@ +package com.linkedin.openhouse.jobs.scheduler.tasks; + +import com.linkedin.openhouse.jobs.client.JobsClient; +import com.linkedin.openhouse.jobs.client.TablesClient; +import com.linkedin.openhouse.jobs.client.model.JobConf; +import com.linkedin.openhouse.jobs.util.TableMetadata; +import com.linkedin.openhouse.jobs.util.TableMetadataBatch; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +/** + * A task to remove orphan files from a batch of tables in a single Spark job. Pairs with {@code + * com.linkedin.openhouse.jobs.spark.BatchedOrphanFilesDeletionSparkApp} via the {@link + * JobConf.JobTypeEnum#ORPHAN_FILES_DELETION_BATCH} JobType. + * + *

The legacy {@link com.linkedin.openhouse.jobs.scheduler.JobsScheduler} pre-dates the optimizer + * service, so this task omits the optimizer-only CLI flags ({@code --resultsEndpoint}, {@code + * --operationIds}, {@code --tableUuids}). The Spark app treats them as optional and falls back to + * HTS-only lifecycle tracking when they are absent. + * + * @see Delete + * orphan files + */ +@Slf4j +@Getter +public class BatchedTableOrphanFilesDeletionTask extends OperationTask { + public static final JobConf.JobTypeEnum OPERATION_TYPE = + JobConf.JobTypeEnum.ORPHAN_FILES_DELETION_BATCH; + + public BatchedTableOrphanFilesDeletionTask( + JobsClient jobsClient, + TablesClient tablesClient, + TableMetadataBatch metadata, + long pollIntervalMs, + long queuedTimeoutMs, + long taskTimeoutMs) { + super(jobsClient, tablesClient, metadata, pollIntervalMs, queuedTimeoutMs, taskTimeoutMs); + } + + public BatchedTableOrphanFilesDeletionTask( + JobsClient jobsClient, TablesClient tablesClient, TableMetadataBatch metadata) { + super(jobsClient, tablesClient, metadata); + } + + @Override + public JobConf.JobTypeEnum getType() { + return OPERATION_TYPE; + } + + @Override + protected List getArgs() { + String tableNames = + metadata.getTables().stream().map(TableMetadata::fqtn).collect(Collectors.joining(",")); + return Arrays.asList("--tableNames", tableNames); + } + + @Override + protected boolean shouldRun() { + return !metadata.getTables().isEmpty(); + } + + @Override + protected boolean launchJob() { + String jobName = + String.format("%s_%s_%d", getType(), metadata.getDbName(), metadata.getTables().size()); + Map executionProperties = Collections.emptyMap(); + String proxyUser = metadata.getTables().get(0).getCreator(); + jobId = + jobsClient + .launch(jobName, getType(), proxyUser, executionProperties, getArgs()) + .orElse(null); + return jobId != null; + } +} diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTasksBuilder.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTasksBuilder.java index 37e3df399..7de5bf926 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTasksBuilder.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTasksBuilder.java @@ -9,6 +9,7 @@ import com.linkedin.openhouse.jobs.client.TablesClient; import com.linkedin.openhouse.jobs.client.model.JobConf; import com.linkedin.openhouse.jobs.scheduler.JobsScheduler; +import com.linkedin.openhouse.jobs.spark.BatchedOrphanFilesDeletionSparkApp; import com.linkedin.openhouse.jobs.util.AppConstants; import com.linkedin.openhouse.jobs.util.DataLayoutUtil; import com.linkedin.openhouse.jobs.util.DatabaseMetadata; @@ -16,10 +17,15 @@ import com.linkedin.openhouse.jobs.util.Metadata; import com.linkedin.openhouse.jobs.util.TableDataLayoutMetadata; import com.linkedin.openhouse.jobs.util.TableMetadata; +import com.linkedin.openhouse.jobs.util.TableMetadataBatch; +import com.linkedin.openhouse.jobs.util.binpack.Bin; +import com.linkedin.openhouse.jobs.util.binpack.BinItem; +import com.linkedin.openhouse.jobs.util.binpack.FirstFitDecreasingBinPacker; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.stream.Collectors; @@ -40,10 +46,12 @@ public class OperationTasksBuilder { public static final String MAX_COST_BUDGET_GB_HRS = "maxCostBudgetGbHrs"; public static final String MAX_STRATEGIES_COUNT = "maxStrategiesCount"; + public static final String BATCH_MAX_ITEMS = "batchMaxItems"; private static final double COMPUTE_COST_WEIGHT_DEFAULT = 0.3; private static final double COMPACTION_GAIN_WEIGHT_DEFAULT = 0.7; private static final double MAX_COST_BUDGET_GB_HRS_DEFAULT = 1000.0; private static final int MAX_STRATEGIES_COUNT_DEFAULT = 10; + private static final int BATCH_MAX_ITEMS_DEFAULT = 25; private static final String METRICS_SCOPE = JobsScheduler.class.getName(); private final OperationTaskFactory> taskFactory; @@ -65,6 +73,80 @@ private List> prepareTableOperationTaskList( return processMetadataList(tableMetadataList, jobType, operationMode, otelEmitter); } + /** + * Builds one {@link BatchedTableOrphanFilesDeletionTask} per database-scoped bin. Groups eligible + * tables by database (batches never cross databases), then applies the first-fit-decreasing bin + * packer with a per-bin item cap from {@code properties} (defaults to {@value + * #BATCH_MAX_ITEMS_DEFAULT}). Tables with the maintenance op disabled are filtered out before + * grouping. + */ + private List> prepareBatchedOrphanFilesDeletionTaskList( + JobConf.JobTypeEnum jobType, + Properties properties, + OperationMode operationMode, + OtelEmitter otelEmitter) { + int maxItemsPerBin = + NumberUtils.toInt(properties.getProperty(BATCH_MAX_ITEMS), BATCH_MAX_ITEMS_DEFAULT); + if (maxItemsPerBin > BatchedOrphanFilesDeletionSparkApp.MAX_BATCH_SIZE) { + throw new IllegalArgumentException( + String.format( + "--%s=%d exceeds Spark-app ceiling MAX_BATCH_SIZE=%d", + BATCH_MAX_ITEMS, maxItemsPerBin, BatchedOrphanFilesDeletionSparkApp.MAX_BATCH_SIZE)); + } + List eligible = + tablesClient.getTableMetadataList().stream() + .filter(t -> !t.isMaintenanceJobDisabled(jobType)) + .collect(Collectors.toList()); + log.info( + "Fetched metadata for {} batched-OFD-eligible tables; binMaxItems={}", + eligible.size(), + maxItemsPerBin); + + FirstFitDecreasingBinPacker packer = + FirstFitDecreasingBinPacker.builder() + .maxItemsPerBin(maxItemsPerBin) + // Item-count cap only; weight/size dimensions disabled until table_stats is wired in. + .maxWeightPerBin(0) + .maxSizeBytesPerBin(0) + .build(); + + Map> byDb = + eligible.stream().collect(Collectors.groupingBy(TableMetadata::getDbName)); + + List batches = new ArrayList<>(); + for (Map.Entry> dbGroup : byDb.entrySet()) { + String dbName = dbGroup.getKey(); + List items = + dbGroup.getValue().stream() + .map( + t -> + BinItem.builder() + .fqtn(t.fqtn()) + .operationId("") + .tableUuid("") + .databaseName(t.getDbName()) + .tableName(t.getTableName()) + .weight(1L) + .sizeBytes(0L) + .build()) + .collect(Collectors.toList()); + for (Bin bin : packer.pack(items)) { + List tablesForBin = + bin.items().stream() + .map( + item -> + dbGroup.getValue().stream() + .filter(t -> t.fqtn().equals(item.getFqtn())) + .findFirst() + .orElseThrow(() -> new IllegalStateException("missing table for bin"))) + .collect(Collectors.toList()); + batches.add(TableMetadataBatch.builder().dbName(dbName).tables(tablesForBin).build()); + } + } + log.info("Packed {} eligible tables into {} batches", eligible.size(), batches.size()); + return processMetadataList(batches, jobType, operationMode, otelEmitter); + } + private List> prepareReplicationOperationTaskList( JobConf.JobTypeEnum jobType, OperationMode operationMode, OtelEmitter otelEmitter) { List replicationSetupTableMetadataList = tablesClient.getTableMetadataList(); @@ -272,6 +354,9 @@ public List> buildOperationTaskList( case DATA_LAYOUT_STRATEGY_GENERATION: case SORT_STATS_COLLECTION: return prepareTableOperationTaskList(jobType, operationMode, otelEmitter); + case ORPHAN_FILES_DELETION_BATCH: + return prepareBatchedOrphanFilesDeletionTaskList( + jobType, properties, operationMode, otelEmitter); case REPLICATION: return prepareReplicationOperationTaskList(jobType, operationMode, otelEmitter); case DATA_LAYOUT_STRATEGY_EXECUTION: @@ -300,6 +385,22 @@ public void buildOperationTaskListInParallel( buildDataLayoutOperationTaskListInParallel(jobType, properties, operationMode, otelEmitter); } else if (jobType == JobConf.JobTypeEnum.TABLE_DIRECTORY_DELETION) { buildDatabaseLevelOperationTasksInParallel(jobType, operationMode, otelEmitter); + } else if (jobType == JobConf.JobTypeEnum.ORPHAN_FILES_DELETION_BATCH) { + // Batched OFD needs the full table set in hand before it can group-by-db and bin-pack, + // so we use the synchronous fetch path then enqueue the tasks in bulk. + List> tasks = + prepareBatchedOrphanFilesDeletionTaskList( + jobType, properties, operationMode, otelEmitter); + for (OperationTask task : tasks) { + try { + operationTaskManager.addData(task); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("Interrupted while enqueueing batched OFD task", e); + } + } + operationTaskManager.updateDataGenerationCompletion(); + log.info("Enqueued {} batched OFD tasks for job type: {}", tasks.size(), jobType); } else { buildOperationTaskListInParallelInternal(jobType, operationMode, otelEmitter); } diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkApp.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkApp.java index ea577710f..3dcf48cef 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkApp.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkApp.java @@ -57,9 +57,23 @@ @Slf4j public class BatchedOrphanFilesDeletionSparkApp extends BaseSparkApp { + private static final String OPERATION_TYPE = "ORPHAN_FILES_DELETION"; + private static final String STATUS_SUCCESS = "SUCCESS"; + private static final String STATUS_FAILED = "FAILED"; private static final int DEFAULT_MAX_ORPHAN_FILE_SAMPLE_SIZE = 20000; private static final int DEFAULT_MIN_OFD_TTL_IN_DAYS = 3; + /** + * Hard ceiling on the number of tables a single batched job can carry. The wire path is parallel + * CSV CLI args (see {@link #buildEntries}); at ~120 chars per entry (36-char UUID × 3 lists) this + * gives ~24 KB on the command line, well under the typical Linux {@code ARG_MAX} of 128 KB but + * leaves headroom for the {@code spark-submit} envelope and JVM flags. The scheduler-driven path + * uses a smaller per-entry footprint but inherits the same cap for defense in depth. Operators + * tune the per-job batch size with {@code --batchMaxItems} (default {@code 25}); this constant is + * a footgun stop, not the operating point. + */ + public static final int MAX_BATCH_SIZE = 200; + private final List entries; private final String resultsEndpoint; private final int driverParallelism; @@ -178,8 +192,14 @@ private void shutdownPool(ExecutorService pool) { } } + /** + * Returns a client bound to {@link #resultsEndpoint}, or {@code null} when the endpoint was not + * configured — in that case the legacy {@link + * com.linkedin.openhouse.jobs.scheduler.JobsScheduler} is the caller and reports lifecycle via + * HTS; the per-operation optimizer callback is skipped. + */ protected OptimizerServiceClient newOptimizerClient() { - return new OptimizerServiceClient(resultsEndpoint); + return resultsEndpoint == null ? null : new OptimizerServiceClient(resultsEndpoint); } /** @@ -381,23 +401,29 @@ public static BatchedOrphanFilesDeletionSparkApp createApp( } static List buildEntries(String tableNames, String operationIds, String tableUuids) { - if (tableNames == null - || operationIds == null - || tableUuids == null - || tableNames.isEmpty() - || operationIds.isEmpty() - || tableUuids.isEmpty()) { - throw new IllegalArgumentException( - "--tableNames, --operationIds, and --tableUuids are all required and must be non-empty"); + if (tableNames == null || tableNames.isEmpty()) { + throw new IllegalArgumentException("--tableNames is required and must be non-empty"); } String[] tables = tableNames.split(","); - String[] ops = operationIds.split(","); - String[] uuids = tableUuids.split(","); - if (tables.length != ops.length || tables.length != uuids.length) { + if (tables.length > MAX_BATCH_SIZE) { + throw new IllegalArgumentException( + String.format( + "Batch size %d exceeds MAX_BATCH_SIZE=%d; reduce --batchMaxItems on the scheduler", + tables.length, MAX_BATCH_SIZE)); + } + String[] ops = isBlank(operationIds) ? null : operationIds.split(","); + String[] uuids = isBlank(tableUuids) ? null : tableUuids.split(","); + if (ops != null && ops.length != tables.length) { throw new IllegalArgumentException( String.format( - "Parallel-list length mismatch: tableNames=%d operationIds=%d tableUuids=%d", - tables.length, ops.length, uuids.length)); + "Parallel-list length mismatch: tableNames=%d operationIds=%d", + tables.length, ops.length)); + } + if (uuids != null && uuids.length != tables.length) { + throw new IllegalArgumentException( + String.format( + "Parallel-list length mismatch: tableNames=%d tableUuids=%d", + tables.length, uuids.length)); } List entries = new ArrayList<>(tables.length); for (int i = 0; i < tables.length; i++) { @@ -410,8 +436,8 @@ static List buildEntries(String tableNames, String operationIds, Str entries.add( BatchEntry.builder() .fqtn(fqtn) - .operationId(ops[i].trim()) - .tableUuid(uuids[i].trim()) + .operationId(ops == null ? null : ops[i].trim()) + .tableUuid(uuids == null ? null : uuids[i].trim()) .databaseName(dbAndTable[0]) .tableName(dbAndTable[1]) .build()); @@ -419,6 +445,10 @@ static List buildEntries(String tableNames, String operationIds, Str return entries; } + private static boolean isBlank(String s) { + return s == null || s.isEmpty(); + } + private static String requireOption(CommandLine cmdLine, String name) { String value = cmdLine.getOptionValue(name); if (value == null || value.isEmpty()) { diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableMetadataBatch.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableMetadataBatch.java new file mode 100644 index 000000000..5eac1da9b --- /dev/null +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableMetadataBatch.java @@ -0,0 +1,42 @@ +package com.linkedin.openhouse.jobs.util; + +import java.util.Collections; +import java.util.List; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NonNull; +import lombok.ToString; +import lombok.experimental.SuperBuilder; + +/** + * A group of {@link TableMetadata} that share a database and will be processed in a single batched + * Spark job (e.g. {@code BatchedOrphanFilesDeletionSparkApp}). + * + *

By design the scheduler never crosses database boundaries when bin-packing — every table in + * {@link #tables} has the same {@link #dbName}. The {@link + * com.linkedin.openhouse.jobs.util.binpack.FirstFitDecreasingBinPacker} is invoked per-database; + * each emitted bin becomes one {@code TableMetadataBatch}. + */ +@Getter +@SuperBuilder +@EqualsAndHashCode(callSuper = true) +@ToString(callSuper = true) +public class TableMetadataBatch extends Metadata { + @NonNull protected String dbName; + @NonNull protected List tables; + + /** + * Identifier used in metrics and the Jobs Service {@code jobName} — combines the database with + * the bin size so logs/dashboards distinguish bins of different fan-outs without exposing every + * fqtn. + */ + @Override + public String getEntityName() { + return String.format("%s[%d]", dbName, tables.size()); + } + + /** Unmodifiable view of the underlying tables list. */ + public List getTables() { + return Collections.unmodifiableList(tables); + } +} diff --git a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/scheduler/tasks/BatchedTableOrphanFilesDeletionTaskTest.java b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/scheduler/tasks/BatchedTableOrphanFilesDeletionTaskTest.java new file mode 100644 index 000000000..134720c49 --- /dev/null +++ b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/scheduler/tasks/BatchedTableOrphanFilesDeletionTaskTest.java @@ -0,0 +1,91 @@ +package com.linkedin.openhouse.jobs.scheduler.tasks; + +import com.linkedin.openhouse.jobs.client.JobsClient; +import com.linkedin.openhouse.jobs.client.TablesClient; +import com.linkedin.openhouse.jobs.client.model.JobConf; +import com.linkedin.openhouse.jobs.util.TableMetadata; +import com.linkedin.openhouse.jobs.util.TableMetadataBatch; +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +public class BatchedTableOrphanFilesDeletionTaskTest { + + @Test + public void operationTypeIsBatchedOfd() { + Assertions.assertEquals( + JobConf.JobTypeEnum.ORPHAN_FILES_DELETION_BATCH, + BatchedTableOrphanFilesDeletionTask.OPERATION_TYPE); + } + + @Test + public void getArgsBuildsCsvOfFqtns() { + TableMetadataBatch batch = + TableMetadataBatch.builder() + .dbName("db") + .tables(Arrays.asList(table("db", "t1"), table("db", "t2"), table("db", "t3"))) + .build(); + BatchedTableOrphanFilesDeletionTask task = + new BatchedTableOrphanFilesDeletionTask( + Mockito.mock(JobsClient.class), Mockito.mock(TablesClient.class), batch); + + Assertions.assertEquals(Arrays.asList("--tableNames", "db.t1,db.t2,db.t3"), task.getArgs()); + } + + @Test + public void shouldRunFalseForEmptyBatch() { + TableMetadataBatch batch = + TableMetadataBatch.builder().dbName("db").tables(Collections.emptyList()).build(); + BatchedTableOrphanFilesDeletionTask task = + new BatchedTableOrphanFilesDeletionTask( + Mockito.mock(JobsClient.class), Mockito.mock(TablesClient.class), batch); + + Assertions.assertFalse(task.shouldRun()); + } + + @Test + @SuppressWarnings("unchecked") + public void launchJobUsesBatchScopedJobName() { + JobsClient jobsClient = Mockito.mock(JobsClient.class); + Mockito.when( + jobsClient.launch( + Mockito.anyString(), + Mockito.eq(JobConf.JobTypeEnum.ORPHAN_FILES_DELETION_BATCH), + Mockito.anyString(), + Mockito.anyMap(), + Mockito.anyList())) + .thenReturn(Optional.of("job-42")); + + TableMetadataBatch batch = + TableMetadataBatch.builder() + .dbName("warehouse") + .tables(Arrays.asList(table("warehouse", "a"), table("warehouse", "b"))) + .build(); + BatchedTableOrphanFilesDeletionTask task = + new BatchedTableOrphanFilesDeletionTask( + jobsClient, Mockito.mock(TablesClient.class), batch); + + boolean launched = task.launchJob(); + + Assertions.assertTrue(launched); + Assertions.assertEquals("job-42", task.getJobId()); + ArgumentCaptor jobNameCaptor = ArgumentCaptor.forClass(String.class); + Mockito.verify(jobsClient) + .launch( + jobNameCaptor.capture(), + Mockito.eq(JobConf.JobTypeEnum.ORPHAN_FILES_DELETION_BATCH), + Mockito.anyString(), + Mockito.anyMap(), + Mockito.anyList()); + // Job name carries db + batch size; nothing per-table is embedded so the string stays bounded. + Assertions.assertEquals("ORPHAN_FILES_DELETION_BATCH_warehouse_2", jobNameCaptor.getValue()); + } + + private static TableMetadata table(String db, String name) { + return TableMetadata.builder().dbName(db).tableName(name).creator("test-user").build(); + } +} diff --git a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkAppArgsTest.java b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkAppArgsTest.java index 7a32e503f..73a02017f 100644 --- a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkAppArgsTest.java +++ b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkAppArgsTest.java @@ -46,23 +46,34 @@ public void buildEntriesRejectsMismatchedLengths() { } @Test - public void buildEntriesRejectsNullArguments() { + public void buildEntriesRejectsNullOrEmptyTableNames() { Assertions.assertThrows( IllegalArgumentException.class, () -> BatchedOrphanFilesDeletionSparkApp.buildEntries(null, "op-1", "uuid-1")); Assertions.assertThrows( IllegalArgumentException.class, - () -> BatchedOrphanFilesDeletionSparkApp.buildEntries("db.a", null, "uuid-1")); - Assertions.assertThrows( - IllegalArgumentException.class, - () -> BatchedOrphanFilesDeletionSparkApp.buildEntries("db.a", "op-1", null)); + () -> BatchedOrphanFilesDeletionSparkApp.buildEntries("", "op-1", "uuid-1")); } @Test - public void buildEntriesRejectsEmptyStrings() { - Assertions.assertThrows( - IllegalArgumentException.class, - () -> BatchedOrphanFilesDeletionSparkApp.buildEntries("", "op-1", "uuid-1")); + public void buildEntriesAllowsAbsentOperationIdsAndTableUuids() { + // The legacy JobsScheduler path doesn't know about optimizer-service operationIds or table + // UUIDs — it just passes the tables. Both null and empty should produce entries with null + // optional fields, no exception. + List entriesNull = + BatchedOrphanFilesDeletionSparkApp.buildEntries("db.a,db.b", null, null); + List entriesEmpty = + BatchedOrphanFilesDeletionSparkApp.buildEntries("db.a,db.b", "", ""); + + for (List entries : + java.util.Arrays.asList(entriesNull, entriesEmpty)) { + Assertions.assertEquals(2, entries.size()); + Assertions.assertEquals("db.a", entries.get(0).getFqtn()); + Assertions.assertNull(entries.get(0).getOperationId()); + Assertions.assertNull(entries.get(0).getTableUuid()); + Assertions.assertEquals("db.b", entries.get(1).getFqtn()); + Assertions.assertNull(entries.get(1).getOperationId()); + } } @Test @@ -71,4 +82,32 @@ public void buildEntriesRejectsNonFqtn() { IllegalArgumentException.class, () -> BatchedOrphanFilesDeletionSparkApp.buildEntries("just_a_table", "op-1", "uuid-1")); } + + @Test + public void buildEntriesAcceptsAtMaxBatchSize() { + String tableNames = generateFqtnCsv(BatchedOrphanFilesDeletionSparkApp.MAX_BATCH_SIZE); + List entries = + BatchedOrphanFilesDeletionSparkApp.buildEntries(tableNames, null, null); + Assertions.assertEquals(BatchedOrphanFilesDeletionSparkApp.MAX_BATCH_SIZE, entries.size()); + } + + @Test + public void buildEntriesRejectsAboveMaxBatchSize() { + String tableNames = generateFqtnCsv(BatchedOrphanFilesDeletionSparkApp.MAX_BATCH_SIZE + 1); + IllegalArgumentException ex = + Assertions.assertThrows( + IllegalArgumentException.class, + () -> BatchedOrphanFilesDeletionSparkApp.buildEntries(tableNames, null, null)); + Assertions.assertTrue( + ex.getMessage().contains("MAX_BATCH_SIZE"), "error should reference the constant name"); + } + + private static String generateFqtnCsv(int n) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < n; i++) { + if (i > 0) sb.append(','); + sb.append("db.t").append(i); + } + return sb.toString(); + } } diff --git a/infra/recipes/docker-compose/oh-hadoop-spark/jobs.yaml b/infra/recipes/docker-compose/oh-hadoop-spark/jobs.yaml index 3e795d4db..58cce65ce 100644 --- a/infra/recipes/docker-compose/oh-hadoop-spark/jobs.yaml +++ b/infra/recipes/docker-compose/oh-hadoop-spark/jobs.yaml @@ -58,6 +58,14 @@ jobs: <<: *spark-defaults "spark.driver.memory": "2g" << : *livy-engine + - type: ORPHAN_FILES_DELETION_BATCH + class-name: com.linkedin.openhouse.jobs.spark.BatchedOrphanFilesDeletionSparkApp + args: ["--backupDir", ".backup"] + <<: *apps-defaults + spark-properties: + <<: *spark-defaults + "spark.driver.memory": "2g" + << : *livy-engine - type: STAGED_FILES_DELETION class-name: com.linkedin.openhouse.jobs.spark.StagedFilesDeletionSparkApp args: ["--trashDir", ".trash", "--daysOld", "10", "--recursive", "true"] diff --git a/services/jobs/src/main/java/com/linkedin/openhouse/jobs/model/JobConf.java b/services/jobs/src/main/java/com/linkedin/openhouse/jobs/model/JobConf.java index 4c2d2b3da..36764bb84 100644 --- a/services/jobs/src/main/java/com/linkedin/openhouse/jobs/model/JobConf.java +++ b/services/jobs/src/main/java/com/linkedin/openhouse/jobs/model/JobConf.java @@ -27,6 +27,12 @@ public enum JobType { SQL_TEST, RETENTION, ORPHAN_FILES_DELETION, + /** + * Multi-table orphan-files-deletion. One Spark job processes a list of tables grouped by + * database — bin-packing happens scheduler-side. See {@code + * BatchedOrphanFilesDeletionSparkApp}. + */ + ORPHAN_FILES_DELETION_BATCH, SNAPSHOTS_EXPIRATION, STAGED_FILES_DELETION, DATA_COMPACTION, From 33a4bcd81f103859d767d40732e9fbacc9e3f7b8 Mon Sep 17 00:00:00 2001 From: Abhishek Nath Date: Tue, 9 Jun 2026 20:39:55 -0700 Subject: [PATCH 2/4] Fix for import and build issues --- apps/spark-3.5/build.gradle | 4 ++ .../tasks/OperationTasksBuilder.java | 69 ++++++++++++------- .../jobs/util/TableMetadataBatch.java | 2 +- 3 files changed, 50 insertions(+), 25 deletions(-) diff --git a/apps/spark-3.5/build.gradle b/apps/spark-3.5/build.gradle index 6fa81cfb4..d904079a2 100644 --- a/apps/spark-3.5/build.gradle +++ b/apps/spark-3.5/build.gradle @@ -30,6 +30,10 @@ dependencies { implementation(project(':libs:datalayout')) { exclude group: 'com.linkedin.iceberg', module: 'iceberg-spark-runtime-3.1_2.12' } + // Exclude log4j-slf4j2-impl: incompatible with the log4j-slf4j-impl (1.x) bridge this app ships. + implementation(project(':libs:optimizer:binpack')) { + exclude group: 'org.apache.logging.log4j', module: 'log4j-slf4j2-impl' + } implementation ('org.apache.spark:spark-core_2.12:' + sparkVersion) { exclude group: 'io.netty' exclude group: 'org.apache.hadoop', module: 'hadoop-common' diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTasksBuilder.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTasksBuilder.java index 7de5bf926..cba88bcf7 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTasksBuilder.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTasksBuilder.java @@ -18,9 +18,10 @@ import com.linkedin.openhouse.jobs.util.TableDataLayoutMetadata; import com.linkedin.openhouse.jobs.util.TableMetadata; import com.linkedin.openhouse.jobs.util.TableMetadataBatch; -import com.linkedin.openhouse.jobs.util.binpack.Bin; -import com.linkedin.openhouse.jobs.util.binpack.BinItem; -import com.linkedin.openhouse.jobs.util.binpack.FirstFitDecreasingBinPacker; +import com.linkedin.openhouse.optimizer.binpack.BinItem; +import com.linkedin.openhouse.optimizer.binpack.FirstFitDecreasingBinPacker; +import com.linkedin.openhouse.optimizer.model.TableOperationDto; +import com.linkedin.openhouse.optimizer.model.TableStatsDto; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; import java.util.ArrayList; @@ -102,13 +103,10 @@ private List> prepareBatchedOrphanFilesDeletionTaskList( eligible.size(), maxItemsPerBin); - FirstFitDecreasingBinPacker packer = - FirstFitDecreasingBinPacker.builder() - .maxItemsPerBin(maxItemsPerBin) - // Item-count cap only; weight/size dimensions disabled until table_stats is wired in. - .maxWeightPerBin(0) - .maxSizeBytesPerBin(0) - .build(); + // Item-count cap only; the libs default maxWeightPerBin (1_000_000) is effectively unbounded + // for weight=1 items, so item-count is the active constraint until table_stats is wired in. + FirstFitDecreasingBinPacker packer = + FirstFitDecreasingBinPacker.builder().maxItemsPerBin(maxItemsPerBin).build(); Map> byDb = eligible.stream().collect(Collectors.groupingBy(TableMetadata::getDbName)); @@ -118,25 +116,15 @@ private List> prepareBatchedOrphanFilesDeletionTaskList( String dbName = dbGroup.getKey(); List items = dbGroup.getValue().stream() - .map( - t -> - BinItem.builder() - .fqtn(t.fqtn()) - .operationId("") - .tableUuid("") - .databaseName(t.getDbName()) - .tableName(t.getTableName()) - .weight(1L) - .sizeBytes(0L) - .build()) + .map(t -> (BinItem) new FqtnBinItem(t.fqtn())) .collect(Collectors.toList()); - for (Bin bin : packer.pack(items)) { + for (List group : packer.pack(items)) { List tablesForBin = - bin.items().stream() + group.stream() .map( item -> dbGroup.getValue().stream() - .filter(t -> t.fqtn().equals(item.getFqtn())) + .filter(t -> t.fqtn().equals(item.getFullyQualifiedTableName())) .findFirst() .orElseThrow(() -> new IllegalStateException("missing table for bin"))) .collect(Collectors.toList()); @@ -539,4 +527,37 @@ private void buildDatabaseLevelOperationTasksInParallel( }) .subscribe(); } + + /** + * Minimal {@link BinItem} for the legacy scheduler path: every item has weight 1, so packing is + * driven purely by {@code maxItemsPerBin}. {@code fromOpAndStats} is unreachable here because we + * call {@code packer.pack(List)} (the pre-projected overload), not the projection path. + */ + private static final class FqtnBinItem implements BinItem { + private final String fqtn; + + FqtnBinItem(String fqtn) { + this.fqtn = fqtn; + } + + @Override + public long getWeight() { + return 1L; + } + + @Override + public String getFullyQualifiedTableName() { + return fqtn; + } + + @Override + public String getOperationId() { + return ""; + } + + @Override + public BinItem fromOpAndStats(TableOperationDto op, TableStatsDto stats) { + return this; + } + } } diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableMetadataBatch.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableMetadataBatch.java index 5eac1da9b..b388dac9d 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableMetadataBatch.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableMetadataBatch.java @@ -14,7 +14,7 @@ * *

By design the scheduler never crosses database boundaries when bin-packing — every table in * {@link #tables} has the same {@link #dbName}. The {@link - * com.linkedin.openhouse.jobs.util.binpack.FirstFitDecreasingBinPacker} is invoked per-database; + * com.linkedin.openhouse.optimizer.binpack.FirstFitDecreasingBinPacker} is invoked per-database; * each emitted bin becomes one {@code TableMetadataBatch}. */ @Getter From 7398b9fdb1a6989449fd9e03bd1bd18e5aba02bc Mon Sep 17 00:00:00 2001 From: Abhishek Nath Date: Wed, 10 Jun 2026 16:38:28 -0700 Subject: [PATCH 3/4] Removed unused constants --- .../jobs/spark/BatchedOrphanFilesDeletionSparkApp.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkApp.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkApp.java index 3dcf48cef..fd92e7373 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkApp.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkApp.java @@ -57,9 +57,6 @@ @Slf4j public class BatchedOrphanFilesDeletionSparkApp extends BaseSparkApp { - private static final String OPERATION_TYPE = "ORPHAN_FILES_DELETION"; - private static final String STATUS_SUCCESS = "SUCCESS"; - private static final String STATUS_FAILED = "FAILED"; private static final int DEFAULT_MAX_ORPHAN_FILE_SAMPLE_SIZE = 20000; private static final int DEFAULT_MIN_OFD_TTL_IN_DAYS = 3; From 7a8624a569c7d06c9682276d12e1f55fcd13e5ce Mon Sep 17 00:00:00 2001 From: Abhishek Nath Date: Wed, 10 Jun 2026 19:40:00 -0700 Subject: [PATCH 4/4] Updated setup.md with batched ofd scheduler run instruction --- SETUP.md | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/SETUP.md b/SETUP.md index e3988e9b8..3de74a4e7 100644 --- a/SETUP.md +++ b/SETUP.md @@ -618,6 +618,40 @@ docker compose --profile with_jobs_scheduler run openhouse-jobs-scheduler - \ > Try HTTP plugin in IntelliJ to trigger /jobs service local endpoint in local mode by running HTTP scripts in services/jobs/src/test/http/. +### Test batched orphan file deletion through job-scheduler + +The batched OFD scheduler runs orphan-files-deletion across multiple tables in a single Spark job, bin-packed per database. Builds on top of the table you created in [Test through Spark-shell](#test-through-spark-shell). + +1. **Manufacture an orphan file** that's older than the default OFD TTL (7 days). From the spark-shell session: + ```scala + scala> val fs = org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration) + scala> val orphan = new org.apache.hadoop.fs.Path("/data/openhouse/db/tb/data/test_orphan.orc") + scala> fs.createNewFile(orphan) + scala> fs.setTimes(orphan, System.currentTimeMillis() - 8L*24L*3600L*1000L, -1) // 8 days old + ``` + +2. **Build and run the batched scheduler.** + ``` + docker compose --profile with_jobs_scheduler build + docker compose --profile with_jobs_scheduler run openhouse-jobs-scheduler - \ + --type ORPHAN_FILES_DELETION_BATCH --cluster local \ + --tablesURL http://openhouse-tables:8080 --jobsURL http://openhouse-jobs:8080 \ + --batchMaxItems 5 \ + --tableMinAgeThresholdHours 0 --taskPollIntervalMs 5000 + ``` + +> [!NOTE] +> The orphan file at `/data/openhouse/db/tb/data/test_orphan.orc` should be gone after the job completes. Check the scheduler logs for `Packed N eligible tables into M batches` and the Spark app logs for `OFD success: fqtn=db.tb orphansDetected=1`. + +> [!NOTE] +> The scheduler groups tables by database before bin-packing — no batch ever crosses a database. `--batchMaxItems` caps tables per batch (default 25; the Spark app enforces a hard ceiling of `MAX_BATCH_SIZE=200`). + +> [!NOTE] +> To list files under the table from the HDFS namenode container: +> ``` +> docker exec -it local.namenode hdfs dfs -ls -R /data/openhouse/db/tb +> ``` + ## FAQs ### Q. My docker setup fails to create LLB definition.