Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions SETUP.md
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,40 @@ docker compose --profile with_jobs_scheduler run openhouse-jobs-scheduler - \
> Try HTTP plugin in IntelliJ to trigger /jobs service local endpoint in local mode by running HTTP scripts in
services/jobs/src/test/http/.

### Test batched orphan file deletion through job-scheduler

The batched OFD scheduler runs orphan-files-deletion across multiple tables in a single Spark job, bin-packed per database. Builds on top of the table you created in [Test through Spark-shell](#test-through-spark-shell).

1. **Manufacture an orphan file** that's older than the default OFD TTL (7 days). From the spark-shell session:
```scala
scala> val fs = org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration)
scala> val orphan = new org.apache.hadoop.fs.Path("/data/openhouse/db/tb/data/test_orphan.orc")
scala> fs.createNewFile(orphan)
scala> fs.setTimes(orphan, System.currentTimeMillis() - 8L*24L*3600L*1000L, -1) // 8 days old
```

2. **Build and run the batched scheduler.**
```
docker compose --profile with_jobs_scheduler build
docker compose --profile with_jobs_scheduler run openhouse-jobs-scheduler - \
--type ORPHAN_FILES_DELETION_BATCH --cluster local \
--tablesURL http://openhouse-tables:8080 --jobsURL http://openhouse-jobs:8080 \
--batchMaxItems 5 \
--tableMinAgeThresholdHours 0 --taskPollIntervalMs 5000
```

> [!NOTE]
> The orphan file at `/data/openhouse/db/tb/data/test_orphan.orc` should be gone after the job completes. Check the scheduler logs for `Packed N eligible tables into M batches` and the Spark app logs for `OFD success: fqtn=db.tb orphansDetected=1`.

> [!NOTE]
> The scheduler groups tables by database before bin-packing — no batch ever crosses a database. `--batchMaxItems` caps tables per batch (default 25; the Spark app enforces a hard ceiling of `MAX_BATCH_SIZE=200`).

> [!NOTE]
> To list files under the table from the HDFS namenode container:
> ```
> docker exec -it local.namenode hdfs dfs -ls -R /data/openhouse/db/tb
> ```

## FAQs

### Q. My docker setup fails to create LLB definition.
Expand Down
4 changes: 4 additions & 0 deletions apps/spark-3.5/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ dependencies {
implementation(project(':libs:datalayout')) {
exclude group: 'com.linkedin.iceberg', module: 'iceberg-spark-runtime-3.1_2.12'
}
// Exclude log4j-slf4j2-impl: incompatible with the log4j-slf4j-impl (1.x) bridge this app ships.
implementation(project(':libs:optimizer:binpack')) {
exclude group: 'org.apache.logging.log4j', module: 'log4j-slf4j2-impl'
}
implementation ('org.apache.spark:spark-core_2.12:' + sparkVersion) {
exclude group: 'io.netty'
exclude group: 'org.apache.hadoop', module: 'hadoop-common'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,13 @@ protected static CommandLine parseArgs(String[] args) {
.longOpt(OperationTasksBuilder.MAX_COST_BUDGET_GB_HRS)
.desc("Maximum compute cost budget in GB hours")
.build());
options.addOption(
Option.builder(null)
.required(false)
.hasArg()
.longOpt(OperationTasksBuilder.BATCH_MAX_ITEMS)
.desc("Max tables per batched OFD job (ORPHAN_FILES_DELETION_BATCH only)")
.build());
options.addOption(
Option.builder(null)
.required(false)
Expand Down Expand Up @@ -744,6 +751,11 @@ protected static Properties getAdditionalProperties(CommandLine cmdLine) {
OperationTasksBuilder.MAX_STRATEGIES_COUNT,
cmdLine.getOptionValue(OperationTasksBuilder.MAX_STRATEGIES_COUNT));
}
if (cmdLine.hasOption(OperationTasksBuilder.BATCH_MAX_ITEMS)) {
result.setProperty(
OperationTasksBuilder.BATCH_MAX_ITEMS,
cmdLine.getOptionValue(OperationTasksBuilder.BATCH_MAX_ITEMS));
}
return result;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package com.linkedin.openhouse.jobs.scheduler.tasks;

import com.linkedin.openhouse.jobs.client.JobsClient;
import com.linkedin.openhouse.jobs.client.TablesClient;
import com.linkedin.openhouse.jobs.client.model.JobConf;
import com.linkedin.openhouse.jobs.util.TableMetadata;
import com.linkedin.openhouse.jobs.util.TableMetadataBatch;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

/**
* A task to remove orphan files from a batch of tables in a single Spark job. Pairs with {@code
* com.linkedin.openhouse.jobs.spark.BatchedOrphanFilesDeletionSparkApp} via the {@link
* JobConf.JobTypeEnum#ORPHAN_FILES_DELETION_BATCH} JobType.
*
* <p>The legacy {@link com.linkedin.openhouse.jobs.scheduler.JobsScheduler} pre-dates the optimizer
* service, so this task omits the optimizer-only CLI flags ({@code --resultsEndpoint}, {@code
* --operationIds}, {@code --tableUuids}). The Spark app treats them as optional and falls back to
* HTS-only lifecycle tracking when they are absent.
*
* @see <a href="https://iceberg.apache.org/docs/latest/maintenance/#delete-orphan-files">Delete
* orphan files</a>
*/
@Slf4j
@Getter
public class BatchedTableOrphanFilesDeletionTask extends OperationTask<TableMetadataBatch> {

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Task implementation for batch job submission.

public static final JobConf.JobTypeEnum OPERATION_TYPE =
JobConf.JobTypeEnum.ORPHAN_FILES_DELETION_BATCH;

public BatchedTableOrphanFilesDeletionTask(
JobsClient jobsClient,
TablesClient tablesClient,
TableMetadataBatch metadata,
long pollIntervalMs,
long queuedTimeoutMs,
long taskTimeoutMs) {
super(jobsClient, tablesClient, metadata, pollIntervalMs, queuedTimeoutMs, taskTimeoutMs);
}

public BatchedTableOrphanFilesDeletionTask(
JobsClient jobsClient, TablesClient tablesClient, TableMetadataBatch metadata) {
super(jobsClient, tablesClient, metadata);
}

@Override
public JobConf.JobTypeEnum getType() {
return OPERATION_TYPE;
}

@Override
protected List<String> getArgs() {
String tableNames =
metadata.getTables().stream().map(TableMetadata::fqtn).collect(Collectors.joining(","));
return Arrays.asList("--tableNames", tableNames);
}

@Override
protected boolean shouldRun() {
return !metadata.getTables().isEmpty();
}

@Override
protected boolean launchJob() {
String jobName =
String.format("%s_%s_%d", getType(), metadata.getDbName(), metadata.getTables().size());
Map<String, String> executionProperties = Collections.emptyMap();
String proxyUser = metadata.getTables().get(0).getCreator();
jobId =
jobsClient
.launch(jobName, getType(), proxyUser, executionProperties, getArgs())
.orElse(null);
return jobId != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,24 @@
import com.linkedin.openhouse.jobs.client.TablesClient;
import com.linkedin.openhouse.jobs.client.model.JobConf;
import com.linkedin.openhouse.jobs.scheduler.JobsScheduler;
import com.linkedin.openhouse.jobs.spark.BatchedOrphanFilesDeletionSparkApp;
import com.linkedin.openhouse.jobs.util.AppConstants;
import com.linkedin.openhouse.jobs.util.DataLayoutUtil;
import com.linkedin.openhouse.jobs.util.DatabaseMetadata;
import com.linkedin.openhouse.jobs.util.DirectoryMetadata;
import com.linkedin.openhouse.jobs.util.Metadata;
import com.linkedin.openhouse.jobs.util.TableDataLayoutMetadata;
import com.linkedin.openhouse.jobs.util.TableMetadata;
import com.linkedin.openhouse.jobs.util.TableMetadataBatch;
import com.linkedin.openhouse.optimizer.binpack.BinItem;
import com.linkedin.openhouse.optimizer.binpack.FirstFitDecreasingBinPacker;
import com.linkedin.openhouse.optimizer.model.TableOperationDto;
import com.linkedin.openhouse.optimizer.model.TableStatsDto;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
Expand All @@ -40,10 +47,12 @@
public class OperationTasksBuilder {
public static final String MAX_COST_BUDGET_GB_HRS = "maxCostBudgetGbHrs";
public static final String MAX_STRATEGIES_COUNT = "maxStrategiesCount";
public static final String BATCH_MAX_ITEMS = "batchMaxItems";
private static final double COMPUTE_COST_WEIGHT_DEFAULT = 0.3;
private static final double COMPACTION_GAIN_WEIGHT_DEFAULT = 0.7;
private static final double MAX_COST_BUDGET_GB_HRS_DEFAULT = 1000.0;
private static final int MAX_STRATEGIES_COUNT_DEFAULT = 10;
private static final int BATCH_MAX_ITEMS_DEFAULT = 25;
private static final String METRICS_SCOPE = JobsScheduler.class.getName();

private final OperationTaskFactory<? extends OperationTask<?>> taskFactory;
Expand All @@ -65,6 +74,67 @@ private List<OperationTask<?>> prepareTableOperationTaskList(
return processMetadataList(tableMetadataList, jobType, operationMode, otelEmitter);
}

/**
* Builds one {@link BatchedTableOrphanFilesDeletionTask} per database-scoped bin. Groups eligible
* tables by database (batches never cross databases), then applies the first-fit-decreasing bin
* packer with a per-bin item cap from {@code properties} (defaults to {@value
* #BATCH_MAX_ITEMS_DEFAULT}). Tables with the maintenance op disabled are filtered out before
* grouping.
*/
private List<OperationTask<?>> prepareBatchedOrphanFilesDeletionTaskList(
JobConf.JobTypeEnum jobType,
Properties properties,
OperationMode operationMode,
OtelEmitter otelEmitter) {
int maxItemsPerBin =
NumberUtils.toInt(properties.getProperty(BATCH_MAX_ITEMS), BATCH_MAX_ITEMS_DEFAULT);
if (maxItemsPerBin > BatchedOrphanFilesDeletionSparkApp.MAX_BATCH_SIZE) {
throw new IllegalArgumentException(
String.format(
"--%s=%d exceeds Spark-app ceiling MAX_BATCH_SIZE=%d",
BATCH_MAX_ITEMS, maxItemsPerBin, BatchedOrphanFilesDeletionSparkApp.MAX_BATCH_SIZE));
}
List<TableMetadata> eligible =
tablesClient.getTableMetadataList().stream()
.filter(t -> !t.isMaintenanceJobDisabled(jobType))
.collect(Collectors.toList());
log.info(
"Fetched metadata for {} batched-OFD-eligible tables; binMaxItems={}",
eligible.size(),
maxItemsPerBin);

// Item-count cap only; the libs default maxWeightPerBin (1_000_000) is effectively unbounded
// for weight=1 items, so item-count is the active constraint until table_stats is wired in.
FirstFitDecreasingBinPacker<BinItem> packer =
FirstFitDecreasingBinPacker.<BinItem>builder().maxItemsPerBin(maxItemsPerBin).build();

Map<String, List<TableMetadata>> byDb =
eligible.stream().collect(Collectors.groupingBy(TableMetadata::getDbName));

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Group the tables by DBs.


List<TableMetadataBatch> batches = new ArrayList<>();
for (Map.Entry<String, List<TableMetadata>> dbGroup : byDb.entrySet()) {
String dbName = dbGroup.getKey();
List<BinItem> items =
dbGroup.getValue().stream()
.map(t -> (BinItem) new FqtnBinItem(t.fqtn()))
.collect(Collectors.toList());
for (List<BinItem> group : packer.pack(items)) {
List<TableMetadata> tablesForBin =
group.stream()
.map(
item ->
dbGroup.getValue().stream()
.filter(t -> t.fqtn().equals(item.getFullyQualifiedTableName()))
.findFirst()
.orElseThrow(() -> new IllegalStateException("missing table for bin")))
.collect(Collectors.toList());
batches.add(TableMetadataBatch.builder().dbName(dbName).tables(tablesForBin).build());
}
}
log.info("Packed {} eligible tables into {} batches", eligible.size(), batches.size());
return processMetadataList(batches, jobType, operationMode, otelEmitter);
}

private List<OperationTask<?>> prepareReplicationOperationTaskList(
JobConf.JobTypeEnum jobType, OperationMode operationMode, OtelEmitter otelEmitter) {
List<TableMetadata> replicationSetupTableMetadataList = tablesClient.getTableMetadataList();
Expand Down Expand Up @@ -272,6 +342,9 @@ public List<OperationTask<?>> buildOperationTaskList(
case DATA_LAYOUT_STRATEGY_GENERATION:
case SORT_STATS_COLLECTION:
return prepareTableOperationTaskList(jobType, operationMode, otelEmitter);
case ORPHAN_FILES_DELETION_BATCH:
return prepareBatchedOrphanFilesDeletionTaskList(
jobType, properties, operationMode, otelEmitter);
case REPLICATION:
return prepareReplicationOperationTaskList(jobType, operationMode, otelEmitter);
case DATA_LAYOUT_STRATEGY_EXECUTION:
Expand Down Expand Up @@ -300,6 +373,22 @@ public void buildOperationTaskListInParallel(
buildDataLayoutOperationTaskListInParallel(jobType, properties, operationMode, otelEmitter);
} else if (jobType == JobConf.JobTypeEnum.TABLE_DIRECTORY_DELETION) {
buildDatabaseLevelOperationTasksInParallel(jobType, operationMode, otelEmitter);
} else if (jobType == JobConf.JobTypeEnum.ORPHAN_FILES_DELETION_BATCH) {
// Batched OFD needs the full table set in hand before it can group-by-db and bin-pack,
// so we use the synchronous fetch path then enqueue the tasks in bulk.
List<OperationTask<?>> tasks =
prepareBatchedOrphanFilesDeletionTaskList(
jobType, properties, operationMode, otelEmitter);
for (OperationTask<?> task : tasks) {
try {
operationTaskManager.addData(task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Interrupted while enqueueing batched OFD task", e);
}
}
operationTaskManager.updateDataGenerationCompletion();
log.info("Enqueued {} batched OFD tasks for job type: {}", tasks.size(), jobType);
} else {
buildOperationTaskListInParallelInternal(jobType, operationMode, otelEmitter);
}
Expand Down Expand Up @@ -438,4 +527,37 @@ private void buildDatabaseLevelOperationTasksInParallel(
})
.subscribe();
}

/**
* Minimal {@link BinItem} for the legacy scheduler path: every item has weight 1, so packing is
* driven purely by {@code maxItemsPerBin}. {@code fromOpAndStats} is unreachable here because we
* call {@code packer.pack(List<BinItem>)} (the pre-projected overload), not the projection path.
*/
private static final class FqtnBinItem implements BinItem {
private final String fqtn;

FqtnBinItem(String fqtn) {
this.fqtn = fqtn;
}

@Override
public long getWeight() {
return 1L;
}

@Override
public String getFullyQualifiedTableName() {
return fqtn;
}

@Override
public String getOperationId() {
return "";
}

@Override
public BinItem fromOpAndStats(TableOperationDto op, TableStatsDto stats) {
return this;
}
}
}
Loading