diff --git a/SETUP.md b/SETUP.md
index e3988e9b8..3de74a4e7 100644
--- a/SETUP.md
+++ b/SETUP.md
@@ -618,6 +618,40 @@ docker compose --profile with_jobs_scheduler run openhouse-jobs-scheduler - \
> Try HTTP plugin in IntelliJ to trigger /jobs service local endpoint in local mode by running HTTP scripts in
services/jobs/src/test/http/.
+### Test batched orphan file deletion through job-scheduler
+
+The batched orphan-files-deletion (OFD) scheduler runs OFD across multiple tables in a single Spark job, with tables bin-packed per database. This walkthrough builds on the table you created in [Test through Spark-shell](#test-through-spark-shell).
+
+1. **Manufacture an orphan file** that's older than the default OFD TTL (7 days). From the spark-shell session:
+ ```scala
+ scala> val fs = org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration)
+ scala> val orphan = new org.apache.hadoop.fs.Path("/data/openhouse/db/tb/data/test_orphan.orc")
+ scala> fs.createNewFile(orphan)
+ scala> fs.setTimes(orphan, System.currentTimeMillis() - 8L*24L*3600L*1000L, -1) // 8 days old
+ ```
+
+2. **Build and run the batched scheduler.**
+ ```
+ docker compose --profile with_jobs_scheduler build
+ docker compose --profile with_jobs_scheduler run openhouse-jobs-scheduler - \
+ --type ORPHAN_FILES_DELETION_BATCH --cluster local \
+ --tablesURL http://openhouse-tables:8080 --jobsURL http://openhouse-jobs:8080 \
+ --batchMaxItems 5 \
+ --tableMinAgeThresholdHours 0 --taskPollIntervalMs 5000
+ ```
+
+> [!NOTE]
+> The orphan file at `/data/openhouse/db/tb/data/test_orphan.orc` should be gone after the job completes. Check the scheduler logs for `Packed N eligible tables into M batches` and the Spark app logs for `OFD success: fqtn=db.tb orphansDetected=1`.
+
+> [!NOTE]
+> The scheduler groups tables by database before bin-packing — no batch ever crosses a database. `--batchMaxItems` caps tables per batch (default 25; the Spark app enforces a hard ceiling of `MAX_BATCH_SIZE=200`).
+
+> [!NOTE]
+> To list files under the table from the HDFS namenode container:
+> ```
+> docker exec -it local.namenode hdfs dfs -ls -R /data/openhouse/db/tb
+> ```
+
## FAQs
### Q. My docker setup fails to create LLB definition.
diff --git a/apps/spark-3.5/build.gradle b/apps/spark-3.5/build.gradle
index 6fa81cfb4..d904079a2 100644
--- a/apps/spark-3.5/build.gradle
+++ b/apps/spark-3.5/build.gradle
@@ -30,6 +30,10 @@ dependencies {
implementation(project(':libs:datalayout')) {
exclude group: 'com.linkedin.iceberg', module: 'iceberg-spark-runtime-3.1_2.12'
}
+ // Exclude log4j-slf4j2-impl: incompatible with the log4j-slf4j-impl (1.x) bridge this app ships.
+ implementation(project(':libs:optimizer:binpack')) {
+ exclude group: 'org.apache.logging.log4j', module: 'log4j-slf4j2-impl'
+ }
implementation ('org.apache.spark:spark-core_2.12:' + sparkVersion) {
exclude group: 'io.netty'
exclude group: 'org.apache.hadoop', module: 'hadoop-common'
diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/JobsScheduler.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/JobsScheduler.java
index 2067fa014..361116288 100644
--- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/JobsScheduler.java
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/JobsScheduler.java
@@ -583,6 +583,13 @@ protected static CommandLine parseArgs(String[] args) {
.longOpt(OperationTasksBuilder.MAX_COST_BUDGET_GB_HRS)
.desc("Maximum compute cost budget in GB hours")
.build());
+ options.addOption(
+ Option.builder(null)
+ .required(false)
+ .hasArg()
+ .longOpt(OperationTasksBuilder.BATCH_MAX_ITEMS)
+ .desc("Max tables per batched OFD job (ORPHAN_FILES_DELETION_BATCH only)")
+ .build());
options.addOption(
Option.builder(null)
.required(false)
@@ -744,6 +751,11 @@ protected static Properties getAdditionalProperties(CommandLine cmdLine) {
OperationTasksBuilder.MAX_STRATEGIES_COUNT,
cmdLine.getOptionValue(OperationTasksBuilder.MAX_STRATEGIES_COUNT));
}
+ if (cmdLine.hasOption(OperationTasksBuilder.BATCH_MAX_ITEMS)) {
+ result.setProperty(
+ OperationTasksBuilder.BATCH_MAX_ITEMS,
+ cmdLine.getOptionValue(OperationTasksBuilder.BATCH_MAX_ITEMS));
+ }
return result;
}
diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/BatchedTableOrphanFilesDeletionTask.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/BatchedTableOrphanFilesDeletionTask.java
new file mode 100644
index 000000000..7c2e13c1b
--- /dev/null
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/BatchedTableOrphanFilesDeletionTask.java
@@ -0,0 +1,79 @@
+package com.linkedin.openhouse.jobs.scheduler.tasks;
+
+import com.linkedin.openhouse.jobs.client.JobsClient;
+import com.linkedin.openhouse.jobs.client.TablesClient;
+import com.linkedin.openhouse.jobs.client.model.JobConf;
+import com.linkedin.openhouse.jobs.util.TableMetadata;
+import com.linkedin.openhouse.jobs.util.TableMetadataBatch;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * A task to remove orphan files from a batch of tables in a single Spark job. Pairs with {@code
+ * com.linkedin.openhouse.jobs.spark.BatchedOrphanFilesDeletionSparkApp} via the {@link
+ * JobConf.JobTypeEnum#ORPHAN_FILES_DELETION_BATCH} JobType.
+ *
+ * <p>The legacy {@link com.linkedin.openhouse.jobs.scheduler.JobsScheduler} pre-dates the optimizer
+ * service, so this task omits the optimizer-only CLI flags ({@code --resultsEndpoint}, {@code
+ * --operationIds}, {@code --tableUuids}). The Spark app treats them as optional and falls back to
+ * HTS-only lifecycle tracking when they are absent.
+ *
+ * @see <a href="https://iceberg.apache.org/docs/latest/maintenance/#delete-orphan-files">Delete
+ *     orphan files</a>
+ */
+@Slf4j
+@Getter
+public class BatchedTableOrphanFilesDeletionTask extends OperationTask<TableMetadataBatch> {
+  public static final JobConf.JobTypeEnum OPERATION_TYPE =
+      JobConf.JobTypeEnum.ORPHAN_FILES_DELETION_BATCH;
+
+  /** Creates a task with explicit poll interval and queued/task timeouts (milliseconds). */
+  public BatchedTableOrphanFilesDeletionTask(
+      JobsClient jobsClient,
+      TablesClient tablesClient,
+      TableMetadataBatch metadata,
+      long pollIntervalMs,
+      long queuedTimeoutMs,
+      long taskTimeoutMs) {
+    super(jobsClient, tablesClient, metadata, pollIntervalMs, queuedTimeoutMs, taskTimeoutMs);
+  }
+
+  /** Creates a task that uses the superclass's default polling and timeout settings. */
+  public BatchedTableOrphanFilesDeletionTask(
+      JobsClient jobsClient, TablesClient tablesClient, TableMetadataBatch metadata) {
+    super(jobsClient, tablesClient, metadata);
+  }
+
+  @Override
+  public JobConf.JobTypeEnum getType() {
+    return OPERATION_TYPE;
+  }
+
+  /**
+   * Builds the Spark-app arguments for this batch.
+   *
+   * @return {@code ["--tableNames", "<fqtn1>,<fqtn2>,..."]} in batch order
+   */
+  @Override
+  protected List<String> getArgs() {
+    String tableNames =
+        metadata.getTables().stream().map(TableMetadata::fqtn).collect(Collectors.joining(","));
+    return Arrays.asList("--tableNames", tableNames);
+  }
+
+  /** A batch is worth running only when it carries at least one table. */
+  @Override
+  protected boolean shouldRun() {
+    return !metadata.getTables().isEmpty();
+  }
+
+  /**
+   * Submits one job for the whole batch and records the returned job id.
+   *
+   * @return {@code true} if the Jobs Service returned a job id; {@code false} if the batch is empty
+   *     or the launch produced no id
+   */
+  @Override
+  protected boolean launchJob() {
+    List<TableMetadata> tables = metadata.getTables();
+    if (tables.isEmpty()) {
+      // Guard the get(0) below: an empty batch has no creator to act as proxy user.
+      log.warn("Not launching {} for database {}: batch is empty", getType(), metadata.getDbName());
+      return false;
+    }
+    // Job name carries only db + batch size so its length stays bounded regardless of fan-out.
+    String jobName = String.format("%s_%s_%d", getType(), metadata.getDbName(), tables.size());
+    Map<String, String> executionProperties = Collections.emptyMap();
+    // The whole batch runs as the creator of the first table.
+    // NOTE(review): assumes that user can operate on every table in the batch — confirm.
+    String proxyUser = tables.get(0).getCreator();
+    jobId =
+        jobsClient
+            .launch(jobName, getType(), proxyUser, executionProperties, getArgs())
+            .orElse(null);
+    return jobId != null;
+  }
+}
diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTasksBuilder.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTasksBuilder.java
index 37e3df399..cba88bcf7 100644
--- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTasksBuilder.java
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTasksBuilder.java
@@ -9,6 +9,7 @@
import com.linkedin.openhouse.jobs.client.TablesClient;
import com.linkedin.openhouse.jobs.client.model.JobConf;
import com.linkedin.openhouse.jobs.scheduler.JobsScheduler;
+import com.linkedin.openhouse.jobs.spark.BatchedOrphanFilesDeletionSparkApp;
import com.linkedin.openhouse.jobs.util.AppConstants;
import com.linkedin.openhouse.jobs.util.DataLayoutUtil;
import com.linkedin.openhouse.jobs.util.DatabaseMetadata;
@@ -16,10 +17,16 @@
import com.linkedin.openhouse.jobs.util.Metadata;
import com.linkedin.openhouse.jobs.util.TableDataLayoutMetadata;
import com.linkedin.openhouse.jobs.util.TableMetadata;
+import com.linkedin.openhouse.jobs.util.TableMetadataBatch;
+import com.linkedin.openhouse.optimizer.binpack.BinItem;
+import com.linkedin.openhouse.optimizer.binpack.FirstFitDecreasingBinPacker;
+import com.linkedin.openhouse.optimizer.model.TableOperationDto;
+import com.linkedin.openhouse.optimizer.model.TableStatsDto;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
@@ -40,10 +47,12 @@
public class OperationTasksBuilder {
public static final String MAX_COST_BUDGET_GB_HRS = "maxCostBudgetGbHrs";
public static final String MAX_STRATEGIES_COUNT = "maxStrategiesCount";
+ public static final String BATCH_MAX_ITEMS = "batchMaxItems";
private static final double COMPUTE_COST_WEIGHT_DEFAULT = 0.3;
private static final double COMPACTION_GAIN_WEIGHT_DEFAULT = 0.7;
private static final double MAX_COST_BUDGET_GB_HRS_DEFAULT = 1000.0;
private static final int MAX_STRATEGIES_COUNT_DEFAULT = 10;
+ private static final int BATCH_MAX_ITEMS_DEFAULT = 25;
private static final String METRICS_SCOPE = JobsScheduler.class.getName();
private final OperationTaskFactory extends OperationTask>> taskFactory;
@@ -65,6 +74,67 @@ private List> prepareTableOperationTaskList(
return processMetadataList(tableMetadataList, jobType, operationMode, otelEmitter);
}
+  /**
+   * Builds one {@link BatchedTableOrphanFilesDeletionTask} per database-scoped bin. Groups eligible
+   * tables by database (batches never cross databases), then applies the first-fit-decreasing bin
+   * packer with a per-bin item cap from {@code properties} (defaults to {@value
+   * #BATCH_MAX_ITEMS_DEFAULT}). Tables with the maintenance op disabled are filtered out before
+   * grouping.
+   *
+   * @param jobType job type used for the maintenance-disabled filter and task creation
+   * @param properties scheduler properties; {@link #BATCH_MAX_ITEMS} is read from here
+   * @param operationMode forwarded to task creation
+   * @param otelEmitter forwarded to task creation
+   * @return the tasks produced for the packed batches
+   * @throws IllegalArgumentException if the configured cap is below 1 or above {@code
+   *     BatchedOrphanFilesDeletionSparkApp.MAX_BATCH_SIZE}
+   * @throws IllegalStateException if a packed item does not map back to a fetched table
+   */
+  private List<OperationTask<?>> prepareBatchedOrphanFilesDeletionTaskList(
+      JobConf.JobTypeEnum jobType,
+      Properties properties,
+      OperationMode operationMode,
+      OtelEmitter otelEmitter) {
+    int maxItemsPerBin =
+        NumberUtils.toInt(properties.getProperty(BATCH_MAX_ITEMS), BATCH_MAX_ITEMS_DEFAULT);
+    // Reject a non-positive cap up front with a clear message instead of handing it to the packer.
+    if (maxItemsPerBin < 1) {
+      throw new IllegalArgumentException(
+          String.format("--%s=%d must be at least 1", BATCH_MAX_ITEMS, maxItemsPerBin));
+    }
+    if (maxItemsPerBin > BatchedOrphanFilesDeletionSparkApp.MAX_BATCH_SIZE) {
+      throw new IllegalArgumentException(
+          String.format(
+              "--%s=%d exceeds Spark-app ceiling MAX_BATCH_SIZE=%d",
+              BATCH_MAX_ITEMS, maxItemsPerBin, BatchedOrphanFilesDeletionSparkApp.MAX_BATCH_SIZE));
+    }
+    List<TableMetadata> eligible =
+        tablesClient.getTableMetadataList().stream()
+            .filter(t -> !t.isMaintenanceJobDisabled(jobType))
+            .collect(Collectors.toList());
+    log.info(
+        "Fetched metadata for {} batched-OFD-eligible tables; binMaxItems={}",
+        eligible.size(),
+        maxItemsPerBin);
+
+    // Item-count cap only; the libs default maxWeightPerBin (1_000_000) is effectively unbounded
+    // for weight=1 items, so item-count is the active constraint until table_stats is wired in.
+    FirstFitDecreasingBinPacker packer =
+        FirstFitDecreasingBinPacker.builder().maxItemsPerBin(maxItemsPerBin).build();
+
+    Map<String, List<TableMetadata>> byDb =
+        eligible.stream().collect(Collectors.groupingBy(TableMetadata::getDbName));
+
+    List<TableMetadataBatch> batches = new ArrayList<>();
+    for (Map.Entry<String, List<TableMetadata>> dbGroup : byDb.entrySet()) {
+      String dbName = dbGroup.getKey();
+      List<TableMetadata> dbTables = dbGroup.getValue();
+      // Index once per database so each packed item resolves in O(1) instead of rescanning the
+      // list. On a duplicate fqtn the first table wins, matching the previous findFirst lookup.
+      Map<String, TableMetadata> tableByFqtn =
+          dbTables.stream()
+              .collect(Collectors.toMap(TableMetadata::fqtn, t -> t, (first, dup) -> first));
+      List<BinItem> items =
+          dbTables.stream()
+              .map(t -> (BinItem) new FqtnBinItem(t.fqtn()))
+              .collect(Collectors.toList());
+      for (List<BinItem> group : packer.pack(items)) {
+        List<TableMetadata> tablesForBin = new ArrayList<>(group.size());
+        for (BinItem item : group) {
+          TableMetadata table = tableByFqtn.get(item.getFullyQualifiedTableName());
+          if (table == null) {
+            throw new IllegalStateException("missing table for bin");
+          }
+          tablesForBin.add(table);
+        }
+        batches.add(TableMetadataBatch.builder().dbName(dbName).tables(tablesForBin).build());
+      }
+    }
+    log.info("Packed {} eligible tables into {} batches", eligible.size(), batches.size());
+    return processMetadataList(batches, jobType, operationMode, otelEmitter);
+  }
+
private List> prepareReplicationOperationTaskList(
JobConf.JobTypeEnum jobType, OperationMode operationMode, OtelEmitter otelEmitter) {
List replicationSetupTableMetadataList = tablesClient.getTableMetadataList();
@@ -272,6 +342,9 @@ public List> buildOperationTaskList(
case DATA_LAYOUT_STRATEGY_GENERATION:
case SORT_STATS_COLLECTION:
return prepareTableOperationTaskList(jobType, operationMode, otelEmitter);
+ case ORPHAN_FILES_DELETION_BATCH:
+ return prepareBatchedOrphanFilesDeletionTaskList(
+ jobType, properties, operationMode, otelEmitter);
case REPLICATION:
return prepareReplicationOperationTaskList(jobType, operationMode, otelEmitter);
case DATA_LAYOUT_STRATEGY_EXECUTION:
@@ -300,6 +373,22 @@ public void buildOperationTaskListInParallel(
buildDataLayoutOperationTaskListInParallel(jobType, properties, operationMode, otelEmitter);
} else if (jobType == JobConf.JobTypeEnum.TABLE_DIRECTORY_DELETION) {
buildDatabaseLevelOperationTasksInParallel(jobType, operationMode, otelEmitter);
+ } else if (jobType == JobConf.JobTypeEnum.ORPHAN_FILES_DELETION_BATCH) {
+ // Batched OFD needs the full table set in hand before it can group-by-db and bin-pack,
+ // so we use the synchronous fetch path then enqueue the tasks in bulk.
+ List> tasks =
+ prepareBatchedOrphanFilesDeletionTaskList(
+ jobType, properties, operationMode, otelEmitter);
+ for (OperationTask> task : tasks) {
+ try {
+ operationTaskManager.addData(task);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.warn("Interrupted while enqueueing batched OFD task", e);
+ }
+ }
+ operationTaskManager.updateDataGenerationCompletion();
+ log.info("Enqueued {} batched OFD tasks for job type: {}", tasks.size(), jobType);
} else {
buildOperationTaskListInParallelInternal(jobType, operationMode, otelEmitter);
}
@@ -438,4 +527,37 @@ private void buildDatabaseLevelOperationTasksInParallel(
})
.subscribe();
}
+
+  /**
+   * Unit-weight {@link BinItem} used by the legacy scheduler path. Because every item weighs 1, the
+   * packer's {@code maxItemsPerBin} is the only constraint that shapes the bins. The scheduler
+   * hands the packer an already-built item list via {@code packer.pack(List)}, so the projection
+   * hook {@code fromOpAndStats} is not expected to be invoked on this type.
+   */
+  private static final class FqtnBinItem implements BinItem {
+    private final String tableName;
+
+    FqtnBinItem(String fullyQualifiedTableName) {
+      tableName = fullyQualifiedTableName;
+    }
+
+    /** Every table counts as one unit. */
+    @Override
+    public long getWeight() {
+      return 1L;
+    }
+
+    @Override
+    public String getFullyQualifiedTableName() {
+      return tableName;
+    }
+
+    /** The legacy path has no optimizer operation, so the id is the empty string. */
+    @Override
+    public String getOperationId() {
+      return "";
+    }
+
+    /** Ignores both arguments and returns this instance unchanged. */
+    @Override
+    public BinItem fromOpAndStats(TableOperationDto op, TableStatsDto stats) {
+      return this;
+    }
+  }
}
diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkApp.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkApp.java
index ea577710f..fd92e7373 100644
--- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkApp.java
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkApp.java
@@ -60,6 +60,17 @@ public class BatchedOrphanFilesDeletionSparkApp extends BaseSparkApp {
private static final int DEFAULT_MAX_ORPHAN_FILE_SAMPLE_SIZE = 20000;
private static final int DEFAULT_MIN_OFD_TTL_IN_DAYS = 3;
+ /**
+ * Hard ceiling on the number of tables a single batched job can carry. The wire path is parallel
+ * CSV CLI args (see {@link #buildEntries}); at ~120 chars per entry (36-char UUID × 3 lists) this
+ * gives ~24 KB on the command line, well under the typical Linux {@code ARG_MAX} of 128 KB, so it
+ * leaves headroom for the {@code spark-submit} envelope and JVM flags. The scheduler-driven path
+ * uses a smaller per-entry footprint but inherits the same cap for defense in depth. Operators
+ * tune the per-job batch size with {@code --batchMaxItems} (default {@code 25}); this constant is
+ * a footgun stop, not the operating point.
+ */
+ public static final int MAX_BATCH_SIZE = 200;
+
private final List entries;
private final String resultsEndpoint;
private final int driverParallelism;
@@ -178,8 +189,14 @@ private void shutdownPool(ExecutorService pool) {
}
}
+ /**
+ * Returns a client bound to {@link #resultsEndpoint}, or {@code null} when the endpoint was not
+ * configured — in that case the legacy {@link
+ * com.linkedin.openhouse.jobs.scheduler.JobsScheduler} is the caller and reports lifecycle via
+ * HTS; the per-operation optimizer callback is skipped.
+ *
+ * @return a new {@link OptimizerServiceClient}, or {@code null} if {@link #resultsEndpoint} is
+ * {@code null}; callers must null-check the result before using it
+ */
protected OptimizerServiceClient newOptimizerClient() {
- return new OptimizerServiceClient(resultsEndpoint);
+ return resultsEndpoint == null ? null : new OptimizerServiceClient(resultsEndpoint);
}
/**
@@ -381,23 +398,29 @@ public static BatchedOrphanFilesDeletionSparkApp createApp(
}
static List buildEntries(String tableNames, String operationIds, String tableUuids) {
- if (tableNames == null
- || operationIds == null
- || tableUuids == null
- || tableNames.isEmpty()
- || operationIds.isEmpty()
- || tableUuids.isEmpty()) {
- throw new IllegalArgumentException(
- "--tableNames, --operationIds, and --tableUuids are all required and must be non-empty");
+ if (tableNames == null || tableNames.isEmpty()) {
+ throw new IllegalArgumentException("--tableNames is required and must be non-empty");
}
String[] tables = tableNames.split(",");
- String[] ops = operationIds.split(",");
- String[] uuids = tableUuids.split(",");
- if (tables.length != ops.length || tables.length != uuids.length) {
+ if (tables.length > MAX_BATCH_SIZE) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Batch size %d exceeds MAX_BATCH_SIZE=%d; reduce --batchMaxItems on the scheduler",
+ tables.length, MAX_BATCH_SIZE));
+ }
+ String[] ops = isBlank(operationIds) ? null : operationIds.split(",");
+ String[] uuids = isBlank(tableUuids) ? null : tableUuids.split(",");
+ if (ops != null && ops.length != tables.length) {
throw new IllegalArgumentException(
String.format(
- "Parallel-list length mismatch: tableNames=%d operationIds=%d tableUuids=%d",
- tables.length, ops.length, uuids.length));
+ "Parallel-list length mismatch: tableNames=%d operationIds=%d",
+ tables.length, ops.length));
+ }
+ if (uuids != null && uuids.length != tables.length) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Parallel-list length mismatch: tableNames=%d tableUuids=%d",
+ tables.length, uuids.length));
}
List entries = new ArrayList<>(tables.length);
for (int i = 0; i < tables.length; i++) {
@@ -410,8 +433,8 @@ static List buildEntries(String tableNames, String operationIds, Str
entries.add(
BatchEntry.builder()
.fqtn(fqtn)
- .operationId(ops[i].trim())
- .tableUuid(uuids[i].trim())
+ .operationId(ops == null ? null : ops[i].trim())
+ .tableUuid(uuids == null ? null : uuids[i].trim())
.databaseName(dbAndTable[0])
.tableName(dbAndTable[1])
.build());
@@ -419,6 +442,10 @@ static List buildEntries(String tableNames, String operationIds, Str
return entries;
}
+  /**
+   * Tells whether an optional CSV argument should be treated as absent.
+   *
+   * @param s the raw option value, possibly {@code null}
+   * @return {@code true} if {@code s} is {@code null}, empty, or whitespace-only
+   */
+  private static boolean isBlank(String s) {
+    // trim() so a whitespace-only value (e.g. from a templated launcher) counts as "not supplied"
+    // rather than as a one-element list containing an empty id.
+    return s == null || s.trim().isEmpty();
+  }
+
private static String requireOption(CommandLine cmdLine, String name) {
String value = cmdLine.getOptionValue(name);
if (value == null || value.isEmpty()) {
diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableMetadataBatch.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableMetadataBatch.java
new file mode 100644
index 000000000..b388dac9d
--- /dev/null
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableMetadataBatch.java
@@ -0,0 +1,42 @@
+package com.linkedin.openhouse.jobs.util;
+
+import java.util.Collections;
+import java.util.List;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+
+/**
+ * A group of {@link TableMetadata} that share a database and will be processed in a single batched
+ * Spark job (e.g. {@code BatchedOrphanFilesDeletionSparkApp}).
+ *
+ * <p>By design the scheduler never crosses database boundaries when bin-packing — every table in
+ * {@link #tables} has the same {@link #dbName}. The {@link
+ * com.linkedin.openhouse.optimizer.binpack.FirstFitDecreasingBinPacker} is invoked per-database;
+ * each emitted bin becomes one {@code TableMetadataBatch}.
+ */
+@Getter
+@SuperBuilder
+@EqualsAndHashCode(callSuper = true)
+@ToString(callSuper = true)
+public class TableMetadataBatch extends Metadata {
+  // Database shared by every table in this batch.
+  @NonNull protected String dbName;
+  // Tables carried by this batch; exposed to callers only through the read-only getTables() view.
+  @NonNull protected List<TableMetadata> tables;
+
+  /**
+   * Short identifier for this batch in the form {@code dbName[size]}, e.g. {@code db[3]}. It
+   * combines the database with the bin size so bins of different fan-outs can be told apart
+   * without listing every fqtn.
+   *
+   * @return the database name followed by the table count in square brackets
+   */
+  @Override
+  public String getEntityName() {
+    return String.format("%s[%d]", dbName, tables.size());
+  }
+
+  /**
+   * Unmodifiable view of the underlying tables list.
+   *
+   * @return a read-only view; mutating calls throw {@link UnsupportedOperationException}
+   */
+  public List<TableMetadata> getTables() {
+    return Collections.unmodifiableList(tables);
+  }
+}
diff --git a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/scheduler/tasks/BatchedTableOrphanFilesDeletionTaskTest.java b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/scheduler/tasks/BatchedTableOrphanFilesDeletionTaskTest.java
new file mode 100644
index 000000000..134720c49
--- /dev/null
+++ b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/scheduler/tasks/BatchedTableOrphanFilesDeletionTaskTest.java
@@ -0,0 +1,91 @@
+package com.linkedin.openhouse.jobs.scheduler.tasks;
+
+import com.linkedin.openhouse.jobs.client.JobsClient;
+import com.linkedin.openhouse.jobs.client.TablesClient;
+import com.linkedin.openhouse.jobs.client.model.JobConf;
+import com.linkedin.openhouse.jobs.util.TableMetadata;
+import com.linkedin.openhouse.jobs.util.TableMetadataBatch;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+/** Unit tests for {@link BatchedTableOrphanFilesDeletionTask} using mocked service clients. */
+public class BatchedTableOrphanFilesDeletionTaskTest {
+
+  @Test
+  public void operationTypeIsBatchedOfd() {
+    Assertions.assertEquals(
+        JobConf.JobTypeEnum.ORPHAN_FILES_DELETION_BATCH,
+        BatchedTableOrphanFilesDeletionTask.OPERATION_TYPE);
+  }
+
+  @Test
+  public void getArgsBuildsCsvOfFqtns() {
+    TableMetadataBatch batch =
+        TableMetadataBatch.builder()
+            .dbName("db")
+            .tables(Arrays.asList(table("db", "t1"), table("db", "t2"), table("db", "t3")))
+            .build();
+    BatchedTableOrphanFilesDeletionTask task =
+        new BatchedTableOrphanFilesDeletionTask(
+            Mockito.mock(JobsClient.class), Mockito.mock(TablesClient.class), batch);
+
+    Assertions.assertEquals(Arrays.asList("--tableNames", "db.t1,db.t2,db.t3"), task.getArgs());
+  }
+
+  @Test
+  public void shouldRunFalseForEmptyBatch() {
+    TableMetadataBatch batch =
+        TableMetadataBatch.builder().dbName("db").tables(Collections.emptyList()).build();
+    BatchedTableOrphanFilesDeletionTask task =
+        new BatchedTableOrphanFilesDeletionTask(
+            Mockito.mock(JobsClient.class), Mockito.mock(TablesClient.class), batch);
+
+    Assertions.assertFalse(task.shouldRun());
+  }
+
+  @Test
+  public void shouldRunTrueForNonEmptyBatch() {
+    TableMetadataBatch batch =
+        TableMetadataBatch.builder()
+            .dbName("db")
+            .tables(Collections.singletonList(table("db", "t1")))
+            .build();
+    BatchedTableOrphanFilesDeletionTask task =
+        new BatchedTableOrphanFilesDeletionTask(
+            Mockito.mock(JobsClient.class), Mockito.mock(TablesClient.class), batch);
+
+    Assertions.assertTrue(task.shouldRun());
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void launchJobUsesBatchScopedJobName() {
+    JobsClient jobsClient = Mockito.mock(JobsClient.class);
+    Mockito.when(
+            jobsClient.launch(
+                Mockito.anyString(),
+                Mockito.eq(JobConf.JobTypeEnum.ORPHAN_FILES_DELETION_BATCH),
+                Mockito.anyString(),
+                Mockito.anyMap(),
+                Mockito.anyList()))
+        .thenReturn(Optional.of("job-42"));
+
+    TableMetadataBatch batch =
+        TableMetadataBatch.builder()
+            .dbName("warehouse")
+            .tables(Arrays.asList(table("warehouse", "a"), table("warehouse", "b")))
+            .build();
+    BatchedTableOrphanFilesDeletionTask task =
+        new BatchedTableOrphanFilesDeletionTask(
+            jobsClient, Mockito.mock(TablesClient.class), batch);
+
+    boolean launched = task.launchJob();
+
+    Assertions.assertTrue(launched);
+    Assertions.assertEquals("job-42", task.getJobId());
+    ArgumentCaptor<String> jobNameCaptor = ArgumentCaptor.forClass(String.class);
+    // Pin the full launch contract, not just the name: the proxy user is the first table's
+    // creator, no execution properties are sent, and the args carry the fqtn CSV.
+    Mockito.verify(jobsClient)
+        .launch(
+            jobNameCaptor.capture(),
+            Mockito.eq(JobConf.JobTypeEnum.ORPHAN_FILES_DELETION_BATCH),
+            Mockito.eq("test-user"),
+            Mockito.eq(Collections.emptyMap()),
+            Mockito.eq(Arrays.asList("--tableNames", "warehouse.a,warehouse.b")));
+    // Job name carries db + batch size; nothing per-table is embedded so the string stays bounded.
+    Assertions.assertEquals("ORPHAN_FILES_DELETION_BATCH_warehouse_2", jobNameCaptor.getValue());
+  }
+
+  /** Builds a minimal table fixture owned by {@code test-user}. */
+  private static TableMetadata table(String db, String name) {
+    return TableMetadata.builder().dbName(db).tableName(name).creator("test-user").build();
+  }
+}
diff --git a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkAppArgsTest.java b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkAppArgsTest.java
index 7a32e503f..73a02017f 100644
--- a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkAppArgsTest.java
+++ b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkAppArgsTest.java
@@ -46,23 +46,34 @@ public void buildEntriesRejectsMismatchedLengths() {
}
@Test
- public void buildEntriesRejectsNullArguments() {
+ public void buildEntriesRejectsNullOrEmptyTableNames() {
Assertions.assertThrows(
IllegalArgumentException.class,
() -> BatchedOrphanFilesDeletionSparkApp.buildEntries(null, "op-1", "uuid-1"));
Assertions.assertThrows(
IllegalArgumentException.class,
- () -> BatchedOrphanFilesDeletionSparkApp.buildEntries("db.a", null, "uuid-1"));
- Assertions.assertThrows(
- IllegalArgumentException.class,
- () -> BatchedOrphanFilesDeletionSparkApp.buildEntries("db.a", "op-1", null));
+ () -> BatchedOrphanFilesDeletionSparkApp.buildEntries("", "op-1", "uuid-1"));
}
@Test
- public void buildEntriesRejectsEmptyStrings() {
- Assertions.assertThrows(
- IllegalArgumentException.class,
- () -> BatchedOrphanFilesDeletionSparkApp.buildEntries("", "op-1", "uuid-1"));
+  public void buildEntriesAllowsAbsentOperationIdsAndTableUuids() {
+    // The legacy JobsScheduler path doesn't know about optimizer-service operationIds or table
+    // UUIDs — it just passes the tables. Both null and empty should produce entries with null
+    // optional fields, no exception.
+    List<BatchedOrphanFilesDeletionSparkApp.BatchEntry> entriesNull =
+        BatchedOrphanFilesDeletionSparkApp.buildEntries("db.a,db.b", null, null);
+    List<BatchedOrphanFilesDeletionSparkApp.BatchEntry> entriesEmpty =
+        BatchedOrphanFilesDeletionSparkApp.buildEntries("db.a,db.b", "", "");
+
+    for (List<BatchedOrphanFilesDeletionSparkApp.BatchEntry> entries :
+        java.util.Arrays.asList(entriesNull, entriesEmpty)) {
+      Assertions.assertEquals(2, entries.size());
+      Assertions.assertEquals("db.a", entries.get(0).getFqtn());
+      Assertions.assertNull(entries.get(0).getOperationId());
+      Assertions.assertNull(entries.get(0).getTableUuid());
+      Assertions.assertEquals("db.b", entries.get(1).getFqtn());
+      Assertions.assertNull(entries.get(1).getOperationId());
+      // Check both optional fields on every entry, not only the first one.
+      Assertions.assertNull(entries.get(1).getTableUuid());
+    }
   }
@Test
@@ -71,4 +82,32 @@ public void buildEntriesRejectsNonFqtn() {
IllegalArgumentException.class,
() -> BatchedOrphanFilesDeletionSparkApp.buildEntries("just_a_table", "op-1", "uuid-1"));
}
+
+ @Test
+ public void buildEntriesAcceptsAtMaxBatchSize() {
+ String tableNames = generateFqtnCsv(BatchedOrphanFilesDeletionSparkApp.MAX_BATCH_SIZE);
+ List entries =
+ BatchedOrphanFilesDeletionSparkApp.buildEntries(tableNames, null, null);
+ Assertions.assertEquals(BatchedOrphanFilesDeletionSparkApp.MAX_BATCH_SIZE, entries.size());
+ }
+
+  @Test
+  public void buildEntriesRejectsAboveMaxBatchSize() {
+    // One table over the ceiling must be rejected, and the message must name the constant.
+    String oversizedCsv = generateFqtnCsv(BatchedOrphanFilesDeletionSparkApp.MAX_BATCH_SIZE + 1);
+    IllegalArgumentException thrown =
+        Assertions.assertThrows(
+            IllegalArgumentException.class,
+            () -> BatchedOrphanFilesDeletionSparkApp.buildEntries(oversizedCsv, null, null));
+    String message = thrown.getMessage();
+    Assertions.assertTrue(
+        message.contains("MAX_BATCH_SIZE"), "error should reference the constant name");
+  }
+
+  /** Returns {@code "db.t0,db.t1,...,db.t(n-1)"}; the empty string when {@code n <= 0}. */
+  private static String generateFqtnCsv(int n) {
+    return java.util.stream.IntStream.range(0, n)
+        .mapToObj(index -> "db.t" + index)
+        .collect(java.util.stream.Collectors.joining(","));
+  }
}
diff --git a/infra/recipes/docker-compose/oh-hadoop-spark/jobs.yaml b/infra/recipes/docker-compose/oh-hadoop-spark/jobs.yaml
index 3e795d4db..58cce65ce 100644
--- a/infra/recipes/docker-compose/oh-hadoop-spark/jobs.yaml
+++ b/infra/recipes/docker-compose/oh-hadoop-spark/jobs.yaml
@@ -58,6 +58,14 @@ jobs:
<<: *spark-defaults
"spark.driver.memory": "2g"
<< : *livy-engine
+ - type: ORPHAN_FILES_DELETION_BATCH
+ class-name: com.linkedin.openhouse.jobs.spark.BatchedOrphanFilesDeletionSparkApp
+ args: ["--backupDir", ".backup"]
+ <<: *apps-defaults
+ spark-properties:
+ <<: *spark-defaults
+ "spark.driver.memory": "2g"
+ << : *livy-engine
- type: STAGED_FILES_DELETION
class-name: com.linkedin.openhouse.jobs.spark.StagedFilesDeletionSparkApp
args: ["--trashDir", ".trash", "--daysOld", "10", "--recursive", "true"]
diff --git a/services/jobs/src/main/java/com/linkedin/openhouse/jobs/model/JobConf.java b/services/jobs/src/main/java/com/linkedin/openhouse/jobs/model/JobConf.java
index 4c2d2b3da..36764bb84 100644
--- a/services/jobs/src/main/java/com/linkedin/openhouse/jobs/model/JobConf.java
+++ b/services/jobs/src/main/java/com/linkedin/openhouse/jobs/model/JobConf.java
@@ -27,6 +27,12 @@ public enum JobType {
SQL_TEST,
RETENTION,
ORPHAN_FILES_DELETION,
+ /**
+ * Multi-table orphan-files-deletion. One Spark job processes a list of tables grouped by
+ * database — bin-packing happens scheduler-side. See {@code
+ * BatchedOrphanFilesDeletionSparkApp}.
+ */
+ ORPHAN_FILES_DELETION_BATCH,
SNAPSHOTS_EXPIRATION,
STAGED_FILES_DELETION,
DATA_COMPACTION,