diff --git a/apps/optimizer/schedulerapp/src/main/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerApplication.java b/apps/optimizer/schedulerapp/src/main/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerApplication.java index d83db7524..b1f06e5d3 100644 --- a/apps/optimizer/schedulerapp/src/main/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerApplication.java +++ b/apps/optimizer/schedulerapp/src/main/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerApplication.java @@ -1,7 +1,5 @@ package com.linkedin.openhouse.optimizer.scheduler; -import com.linkedin.openhouse.optimizer.model.OperationTypeDto; -import java.util.Map; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; @@ -26,13 +24,11 @@ public class SchedulerApplication implements CommandLineRunner, ExitCodeGenerator { private final SchedulerRunner runner; - private final Map binPackers; private int exitCode = 0; @Autowired - public SchedulerApplication(SchedulerRunner runner, Map binPackers) { + public SchedulerApplication(SchedulerRunner runner) { this.runner = runner; - this.binPackers = binPackers; } public static void main(String[] args) { @@ -40,15 +36,15 @@ public static void main(String[] args) { } /** - * Runs the scheduler once per registered {@link BinPacker} per process invocation. Each call is - * scoped to one operation type. Any thrown exception is logged and surfaces as a non-zero exit - * code via {@link #getExitCode()} after the context is shut down cleanly. + * Runs the scheduler once per registered operation type per process invocation. Any thrown + * exception is logged and surfaces as a non-zero exit code via {@link #getExitCode()} after the + * context is shut down cleanly. */ @Override public void run(String... args) { try { - log.info("Scheduler starting; operation types: {}", binPackers.keySet()); - binPackers.keySet().forEach(runner::schedule); + log.info("Scheduler starting; operation types: {}", runner.getRegisteredOperationTypes()); + runner.getRegisteredOperationTypes().forEach(runner::schedule); log.info("Scheduler completed successfully"); } catch (Exception e) { log.error("Scheduler failed", e); diff --git a/apps/optimizer/schedulerapp/src/main/resources/application.properties b/apps/optimizer/schedulerapp/src/main/resources/application.properties index 5184cf1bc..b43a66459 100644 --- a/apps/optimizer/schedulerapp/src/main/resources/application.properties +++ b/apps/optimizer/schedulerapp/src/main/resources/application.properties @@ -6,6 +6,9 @@ spring.datasource.username=${OPTIMIZER_DB_USER:sa} spring.datasource.password=${OPTIMIZER_DB_PASSWORD:} spring.jpa.hibernate.ddl-auto=none optimizer.scheduler.jobs.base-uri=${JOBS_BASE_URI:http://localhost:8002} +# Per-bin caps for ORPHAN_FILES_DELETION; 0 disables a dimension. File count is the OFD cost +# driver — per-file list, manifest joins, and delete calls dominate, independent of file size. optimizer.scheduler.ofd.max-files-per-bin=${SCHEDULER_OFD_MAX_FILES_PER_BIN:1000000} +optimizer.scheduler.ofd.max-tables-per-bin=${SCHEDULER_OFD_MAX_TABLES_PER_BIN:50} optimizer.scheduler.results-endpoint=${SCHEDULER_RESULTS_ENDPOINT:http://openhouse-optimizer:8080/v1/optimizer/operations} optimizer.scheduler.cluster-id=${SCHEDULER_CLUSTER_ID:LocalHadoopCluster} diff --git a/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/Bin.java b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/Bin.java deleted file mode 100644 index 082a3bbd7..000000000 --- a/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/Bin.java +++ /dev/null @@ -1,61 +0,0 @@ -package com.linkedin.openhouse.optimizer.scheduler; - -import com.linkedin.openhouse.optimizer.model.OperationTypeDto; -import com.linkedin.openhouse.optimizer.model.TableOperationDto; -import com.linkedin.openhouse.optimizer.scheduler.client.JobsServiceClient; -import java.time.Instant; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Optional; -import java.util.Set; -import java.util.stream.Collectors; -import lombok.Getter; -import lombok.RequiredArgsConstructor; - -/** - * A set of operations the scheduler will submit together as a single Spark job. A bin owns its own - * launch — callers ask it to schedule itself and react to the returned job id. The surrounding - * status-update machinery (claim, mark-scheduled, revert-to-pending) lives in the scheduler because - * it is shared across all bins regardless of operation type. - */ -@RequiredArgsConstructor -public class Bin { - - @Getter private final OperationTypeDto operationType; - @Getter private final List operations; - - /** Operation UUIDs in this bin, parallel to {@link #getTableNames()}. */ - public List getOperationIds() { - return operations.stream().map(TableOperationDto::getId).collect(Collectors.toList()); - } - - /** Fully-qualified {@code database.table} identifiers for the operations in this bin. */ - public List getTableNames() { - return operations.stream() - .map(op -> op.getDatabaseName() + "." + op.getTableName()) - .collect(Collectors.toList()); - } - - /** - * Return a new {@link Bin} containing only the operations whose IDs are in {@code keepIds}. Used - * by the scheduler to narrow the bin to the rows it actually claimed before launching the job. - */ - public Bin subset(Collection keepIds) { - Set keep = new HashSet<>(keepIds); - List filtered = - operations.stream().filter(op -> keep.contains(op.getId())).collect(Collectors.toList()); - return new Bin(operationType, filtered); - } - - /** - * Submit this bin as a single Spark job. Returns the job id on success, or empty on submission - * failure — the caller is responsible for the surrounding status updates. - */ - public Optional schedule(JobsServiceClient client, String resultsEndpoint) { - String jobName = - "batched-" + operationType.name().toLowerCase() + "-" + Instant.now().toEpochMilli(); - return client.launch( - jobName, operationType.name(), getTableNames(), getOperationIds(), resultsEndpoint); - } -} diff --git a/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/BinPacker.java b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/BinPacker.java deleted file mode 100644 index 509c37b75..000000000 --- a/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/BinPacker.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.linkedin.openhouse.optimizer.scheduler; - -import com.linkedin.openhouse.optimizer.model.TableStatsDto; -import java.util.List; - -/** - * Strategy for packing a set of operations into bins for batched job submission. Implementations - * encode the constraints of a particular packing dimension (file count, partition count, etc.); - * binding to an operation type is the responsibility of the scheduler configuration, not the - * strategy class. - * - *

{@link TableStatsDto} is the cost source at the interface boundary, carried alongside each - * operation in a {@link SchedulingCandidate}. Implementations project the stats down to the minimal - * data needed to make their packing decision (e.g. file count for OFD) and do not retain the full - * stats payload in the returned bins. - */ -public interface BinPacker { - - /** - * Pack {@code pending} into one or more {@link Bin}s. Each returned bin is non-empty; the - * scheduler dispatches one Spark job per bin. - */ - List pack(List pending); -} diff --git a/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/FileCountBinPacker.java b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/FileCountBinPacker.java deleted file mode 100644 index b62e1bf9b..000000000 --- a/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/FileCountBinPacker.java +++ /dev/null @@ -1,84 +0,0 @@ -package com.linkedin.openhouse.optimizer.scheduler; - -import com.linkedin.openhouse.optimizer.model.OperationTypeDto; -import com.linkedin.openhouse.optimizer.model.TableOperationDto; -import com.linkedin.openhouse.optimizer.model.TableStatsDto; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.OptionalInt; -import java.util.stream.Collectors; -import java.util.stream.IntStream; -import lombok.RequiredArgsConstructor; - -/** - * Greedy first-fit-descending bin-packer keyed on per-table file count, projected from each - * candidate's {@link TableStatsDto}. - * - *

Candidates are sorted by descending file count, then assigned to the first bin whose running - * total stays at or below {@code maxFilesPerBin}. An operation larger than the limit gets its own - * bin (oversized bins are allowed — we never drop an operation). - */ -@RequiredArgsConstructor -public class FileCountBinPacker implements BinPacker { - - private final OperationTypeDto operationType; - private final long maxFilesPerBin; - - @Override - public List pack(List pending) { - if (pending.isEmpty()) { - return List.of(); - } - - // Project once: each candidate's packing cost is just a long, keyed by operation id. - Map costByOperationId = - pending.stream() - .collect(Collectors.toMap(c -> c.getOperation().getId(), c -> cost(c.getStats()))); - - List sorted = - pending.stream() - .map(SchedulingCandidate::getOperation) - .sorted( - Comparator.comparingLong( - (TableOperationDto op) -> costByOperationId.get(op.getId())) - .reversed()) - .collect(Collectors.toList()); - - // First-fit-descending is inherently stateful — each placement depends on the running totals - // for bins assembled so far. - List> binContents = new ArrayList<>(); - List binTotals = new ArrayList<>(); - sorted.forEach( - op -> { - long c = costByOperationId.get(op.getId()); - OptionalInt placed = - IntStream.range(0, binContents.size()) - .filter(i -> binTotals.get(i) + c <= maxFilesPerBin || binTotals.get(i) == 0) - .findFirst(); - if (placed.isPresent()) { - int idx = placed.getAsInt(); - binContents.get(idx).add(op); - binTotals.set(idx, binTotals.get(idx) + c); - } else { - List newBin = new ArrayList<>(); - newBin.add(op); - binContents.add(newBin); - binTotals.add(c); - } - }); - - return binContents.stream() - .map(ops -> new Bin(operationType, ops)) - .collect(Collectors.toList()); - } - - private static long cost(TableStatsDto stats) { - if (stats == null || stats.getSnapshot() == null) { - return 0L; - } - Long n = stats.getSnapshot().getNumCurrentFiles(); - return n != null ? n : 0L; - } -} diff --git a/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerRunner.java b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerRunner.java index 7b4f7594b..e10853f7a 100644 --- a/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerRunner.java +++ b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerRunner.java @@ -8,75 +8,125 @@ import com.linkedin.openhouse.optimizer.model.TableStatsDto; import com.linkedin.openhouse.optimizer.repository.TableOperationsRepository; import com.linkedin.openhouse.optimizer.repository.TableStatsRepository; +import com.linkedin.openhouse.optimizer.scheduler.binpack.Bin; +import com.linkedin.openhouse.optimizer.scheduler.binpack.BinItem; +import com.linkedin.openhouse.optimizer.scheduler.binpack.BinPacker; import com.linkedin.openhouse.optimizer.scheduler.client.JobsServiceClient; import java.time.Instant; import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; import org.springframework.data.domain.Pageable; -import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; /** - * For one operation type per call, reads PENDING rows, looks up per-table stats, dispatches to the - * registered {@link BinPacker}, and submits one Spark job per returned {@link Bin}. The {@link - * com.linkedin.openhouse.optimizer.scheduler.SchedulerApplication}'s CommandLineRunner loops over - * the registered packers and invokes {@code schedule(opType)} for each. + * Generic scheduler. Operation types are registered at construction via {@link #registerOperation}, + * which returns a new instance with the additional entry — the registry is immutable, so the bean + * Spring publishes is the fully-registered runner produced in {@link + * com.linkedin.openhouse.optimizer.scheduler.config.SchedulerConfig}. For each registered type the + * runner: + * + *

    + *
  1. Reads PENDING rows from MySQL. + *
  2. Deduplicates duplicate PENDING rows for the same {@code tableUuid}. + *
  3. Loads the stats row for every survivor. + *
  4. Hands the (operations, stats) pair to the {@link BinPacker} and receives one grouping per + * batch. + *
  5. Wraps each grouping into a {@link Bin} tagged with the operation type and schedules it + * (claim CAS, narrow to claimed, launch, record). + *
+ * + *

The runner is operation-agnostic. All IO and the claim/launch/mark lifecycle live here; the + * only per-operation knowledge in the module is the {@link BinPacker} the caller registers. */ @Slf4j -@Component public class SchedulerRunner { + private final TableOperationsRepository operationsRepo; private final TableStatsRepository statsRepo; private final JobsServiceClient jobsClient; - private final Map binPackers; private final String resultsEndpoint; + private final Map registry; public SchedulerRunner( TableOperationsRepository operationsRepo, TableStatsRepository statsRepo, JobsServiceClient jobsClient, - Map binPackers, - @Value("${optimizer.scheduler.results-endpoint}") String resultsEndpoint) { + String resultsEndpoint) { + this(operationsRepo, statsRepo, jobsClient, resultsEndpoint, Map.of()); + } + + private SchedulerRunner( + TableOperationsRepository operationsRepo, + TableStatsRepository statsRepo, + JobsServiceClient jobsClient, + String resultsEndpoint, + Map registry) { this.operationsRepo = operationsRepo; this.statsRepo = statsRepo; this.jobsClient = jobsClient; - this.binPackers = binPackers; this.resultsEndpoint = resultsEndpoint; - } - - /** Schedule all PENDING operations of the given type across all databases. */ - @Transactional - public void schedule(OperationTypeDto operationType) { - schedule(operationType, Optional.empty(), Optional.empty()); + this.registry = registry; } /** - * Schedule PENDING operations for {@code operationType}, optionally scoped to a single database - * or table name. + * Return a new {@link SchedulerRunner} whose registry is this one's plus {@code (type, packer)}. + * If {@code type} was already registered, the new entry replaces the prior one. Pure: the + * receiver is unchanged. */ - @Transactional + public SchedulerRunner registerOperation(OperationTypeDto type, BinPacker packer) { + HashMap next = new HashMap<>(registry); + next.put(type, packer); + return new SchedulerRunner( + operationsRepo, statsRepo, jobsClient, resultsEndpoint, Map.copyOf(next)); + } + + public Set getRegisteredOperationTypes() { + return registry.keySet(); + } + + public void schedule(OperationTypeDto type) { + schedule(type, Optional.empty(), Optional.empty()); + } + public void schedule( - OperationTypeDto operationType, Optional databaseName, Optional tableName) { + OperationTypeDto type, Optional databaseName, Optional tableName) { + BinPacker packer = + Optional.ofNullable(registry.get(type)) + .orElseThrow( + () -> + new IllegalStateException( + "No BinPacker registered for operation type " + type)); - BinPacker packer = binPackers.get(operationType); - if (packer == null) { - throw new IllegalStateException( - "No BinPacker registered for operation type " + operationType); + List pending = loadAndDedupPending(type, databaseName, tableName); + if (pending.isEmpty()) { + return; } + Map statsByUuid = loadStatsByUuid(pending); - // Unpaged: a single-page truncation would silently drop work past page 0 (next cycle would - // re-load the same first page in MySQL row order, leaving the tail unscheduled until the - // ordering shifts). Correctness here requires the full PENDING set in one cycle; the working - // set is bounded by count(PENDING for this op type). + List bins = + packer.pack(pending, statsByUuid).stream() + .map(grouping -> new Bin(type, grouping)) + .collect(Collectors.toList()); + log.info("Packed {} PENDING {} operations into {} bins", pending.size(), type, bins.size()); + + bins.forEach(this::scheduleBin); + } + + private List loadAndDedupPending( + OperationTypeDto type, Optional databaseName, Optional tableName) { + // Unpaged: correctness requires the full PENDING set in one cycle; the working set is bounded + // by count(PENDING for this op type). Single-page truncation would silently drop work past + // page 0. List pendingRows = operationsRepo.find( - Optional.of(operationType.toDb()), + Optional.of(type.toDb()), Optional.of(OperationStatus.PENDING), Optional.empty(), databaseName, @@ -85,65 +135,16 @@ public void schedule( Optional.empty(), Pageable.unpaged()); if (pendingRows.isEmpty()) { - log.info("No PENDING operations of type {}; nothing to schedule", operationType); - return; + log.info("No PENDING operations of type {}; nothing to schedule", type); + return List.of(); } - - // Deduplicate before claiming: if multiple PENDING rows exist for the same tableUuid, keep - // the oldest (lex-tiebreak on id) and cancel the rest. Per-cycle, not per-bin — running this - // inside the bin loop nuked rows belonging to other bins of the same cycle. List survivors = cancelDuplicates(pendingRows); - if (survivors.isEmpty()) { - return; - } - - List pending = - survivors.stream().map(TableOperationDto::fromRow).collect(Collectors.toList()); - - // Tradeoff: we fetch fresh table_stats per scheduling cycle (one batched query) rather than - // denormalizing the relevant fields onto TableOperationDto. The denormalized alternative would - // remove the per-cycle lookup but widen the TableOperationDto row and serve staler data; the - // current shape favors smaller operations + freshness over fewer queries. - Set uuids = - pending.stream().map(TableOperationDto::getTableUuid).collect(Collectors.toSet()); - Map statsByUuid = - statsRepo.findAllById(uuids).stream() - .collect(Collectors.toMap(TableStatsRow::getTableUuid, TableStatsDto::fromRow)); - - // Filter at the boundary so SchedulingCandidate.stats is guaranteed non-null. A table without - // a stats row gets skipped this cycle and reconsidered after stats land. - List withStats = - pending.stream() - .filter(op -> statsByUuid.containsKey(op.getTableUuid())) - .collect(Collectors.toList()); - if (withStats.size() < pending.size()) { - log.warn( - "Skipped {} {} operations with no table_stats row", - pending.size() - withStats.size(), - operationType); - } - if (withStats.isEmpty()) { - return; - } - - List candidates = - withStats.stream() - .map(op -> new SchedulingCandidate(op, statsByUuid.get(op.getTableUuid()))) - .collect(Collectors.toList()); - - List bins = packer.pack(candidates); - log.info( - "Packed {} PENDING {} operations into {} bins", - candidates.size(), - operationType, - bins.size()); - - bins.forEach(this::submitBin); + return survivors.stream().map(TableOperationDto::fromRow).collect(Collectors.toList()); } /** * Group {@code pendingRows} by {@code tableUuid}; for any group with more than one row, cancel - * all but the oldest (lex-tiebreak on id). Returns the survivors in input order. Deterministic. + * all but the oldest (lex-tiebreak on id). Returns survivors in input order. Deterministic. */ private List cancelDuplicates(List pendingRows) { Map> byTableUuid = @@ -175,13 +176,23 @@ private List cancelDuplicates(List pendi .collect(Collectors.toList()); } - private void submitBin(Bin bin) { - List ids = bin.getOperationIds(); + private Map loadStatsByUuid(List ops) { + Set uuids = + ops.stream().map(TableOperationDto::getTableUuid).collect(Collectors.toSet()); + return statsRepo.findAllById(uuids).stream() + .collect(Collectors.toMap(TableStatsRow::getTableUuid, TableStatsDto::fromRow)); + } + + /** + * Claim the bin's operations, narrow to the rows actually owned, launch one batched Spark job for + * the claimed subset, and mark SCHEDULED — or revert to PENDING if launch failed. + */ + @Transactional + void scheduleBin(Bin bin) { + List items = bin.getItems(); + OperationTypeDto type = bin.getOperationType(); + List ids = items.stream().map(BinItem::getOperationId).collect(Collectors.toList()); - // Claim the rows in one batched UPDATE: PENDING → SCHEDULING. The UPDATE's row count is just - // an aggregate — to know *which* rows we own, re-query for SCHEDULING rows tagged with our - // scheduledAt watermark. Anything not in that subset belongs to another instance or was - // canceled, and must not be submitted or marked SCHEDULED. Instant claimedAt = Instant.now(); operationsRepo.updateBatch( ids, @@ -189,8 +200,6 @@ private void submitBin(Bin bin) { OperationStatus.SCHEDULING, Optional.of(claimedAt), Optional.empty()); - // Unpaged: the result set is already bounded by ids.size() (the bin we just claimed); no - // need to cap it further. List claimedIds = operationsRepo .find( @@ -216,8 +225,21 @@ private void submitBin(Bin bin) { ids.size()); } - Bin claimedBin = bin.subset(claimedIds); - Optional jobId = claimedBin.schedule(jobsClient, resultsEndpoint); + Set claimedSet = new HashSet<>(claimedIds); + List claimedItems = + items.stream() + .filter(item -> claimedSet.contains(item.getOperationId())) + .collect(Collectors.toList()); + List tableNames = + claimedItems.stream().map(BinItem::getFullyQualifiedTableName).collect(Collectors.toList()); + List operationIds = + claimedItems.stream().map(BinItem::getOperationId).collect(Collectors.toList()); + + String jobTypeName = type.name(); + String jobName = "batched-" + jobTypeName.toLowerCase() + "-" + claimedAt.toEpochMilli(); + Optional jobId = + jobsClient.launch(jobName, jobTypeName, tableNames, operationIds, resultsEndpoint); + if (jobId.isPresent()) { int updated = operationsRepo.updateBatch( @@ -229,7 +251,7 @@ private void submitBin(Bin bin) { log.info( "Submitted job {} for {} tables ({} rows marked SCHEDULED)", jobId.get(), - claimedBin.getOperations().size(), + claimedItems.size(), updated); } else { int reverted = diff --git a/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/SchedulingCandidate.java b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/SchedulingCandidate.java deleted file mode 100644 index b031ae6b7..000000000 --- a/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/SchedulingCandidate.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.linkedin.openhouse.optimizer.scheduler; - -import com.linkedin.openhouse.optimizer.model.TableOperationDto; -import com.linkedin.openhouse.optimizer.model.TableStatsDto; -import lombok.NonNull; -import lombok.Value; - -/** - * A pending operation paired with the stats the bin packer will use as its cost source. Built by - * the scheduler at scheduling time and handed to the {@link BinPacker} as the unit of packing. - * - *

Both fields are non-null. The scheduler filters out operations whose tables have no stats row - * before constructing candidates. - */ -@Value -public class SchedulingCandidate { - @NonNull TableOperationDto operation; - @NonNull TableStatsDto stats; -} diff --git a/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/binpack/Bin.java b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/binpack/Bin.java new file mode 100644 index 000000000..7105aae23 --- /dev/null +++ b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/binpack/Bin.java @@ -0,0 +1,20 @@ +package com.linkedin.openhouse.optimizer.scheduler.binpack; + +import com.linkedin.openhouse.optimizer.model.OperationTypeDto; +import java.util.List; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.ToString; + +/** + * One scheduling unit: the operation type the bin will run as, and the items the scheduler will + * claim, narrow to claimed, and launch a single Spark job for. Pure data — the scheduler reads from + * a bin to do the work; the bin does no IO itself. + */ +@AllArgsConstructor +@Getter +@ToString +public class Bin { + private final OperationTypeDto operationType; + private final List items; +} diff --git a/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/binpack/BinItem.java b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/binpack/BinItem.java new file mode 100644 index 000000000..4dc9be00e --- /dev/null +++ b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/binpack/BinItem.java @@ -0,0 +1,24 @@ +package com.linkedin.openhouse.optimizer.scheduler.binpack; + +import com.linkedin.openhouse.optimizer.model.TableOperationDto; +import com.linkedin.openhouse.optimizer.model.TableStatsDto; + +/** + * One packable unit. Exposes the weight a packer keys on, plus the identity the scheduler reads + * when it launches a Spark job (fully-qualified table name, operation id). + * + *

Implementations have a public no-arg constructor — instantiated transiently inside {@link + * FirstFitBinPacker#pack} via a {@code Supplier} (typically a {@code + * MyItem::new} method reference) — on which {@link #fromOpAndStats} is called to return the + * populated item. Getters on the empty instance are not meaningful; it exists for the lifetime of a + * single projection call. + */ +public interface BinItem { + long getWeight(); + + String getFullyQualifiedTableName(); + + String getOperationId(); + + BinItem fromOpAndStats(TableOperationDto op, TableStatsDto stats); +} diff --git a/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/binpack/BinPacker.java b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/binpack/BinPacker.java new file mode 100644 index 000000000..aed87f762 --- /dev/null +++ b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/binpack/BinPacker.java @@ -0,0 +1,18 @@ +package com.linkedin.openhouse.optimizer.scheduler.binpack; + +import com.linkedin.openhouse.optimizer.model.TableOperationDto; +import com.linkedin.openhouse.optimizer.model.TableStatsDto; +import java.util.List; +import java.util.Map; + +/** + * Per-operation-type strategy. Given a flat list of operations and the corresponding stats, returns + * one grouping per batch the scheduler should submit. The scheduler wraps each grouping into a + * {@link Bin} with the registered operation type. Implementations do no IO and hold no mutable + * state; the projection from {@code (op, stats)} to {@link BinItem} and the bucketing strategy both + * live in the implementation. + */ +public interface BinPacker { + List> pack( + List operations, Map statsByTableUuid); +} diff --git a/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/binpack/FirstFitBinPacker.java b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/binpack/FirstFitBinPacker.java new file mode 100644 index 000000000..be94158f8 --- /dev/null +++ b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/binpack/FirstFitBinPacker.java @@ -0,0 +1,92 @@ +package com.linkedin.openhouse.optimizer.scheduler.binpack; + +import com.linkedin.openhouse.optimizer.model.TableOperationDto; +import com.linkedin.openhouse.optimizer.model.TableStatsDto; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; + +/** + * First-fit-decreasing packing, generic over the concrete {@link BinItem} subtype {@code T}. + * Construction takes a {@code Supplier} — typically a {@code MyItem::new} method reference — + * which the packer invokes per operation to get an empty instance, then calls {@link + * BinItem#fromOpAndStats} on it to project the (operation, stats) pair into a populated item. + * + *

Sorts items by weight descending, then places each into the first group whose totals stay at + * or below {@code maxWeightPerBin} and {@code maxItemsPerBin}. An item whose weight exceeds the cap + * on its own goes into a group by itself. Operations whose {@code tableUuid} has no entry in {@code + * statsByTableUuid} are dropped. + * + *

Stateless: the constructor takes only the BinItem supplier and the cap configuration; {@link + * #pack} is a pure function over its arguments. The packer is operation-agnostic — the scheduler + * wraps each grouping into a {@link Bin} with the registered operation type. + */ +@Slf4j +public class FirstFitBinPacker implements BinPacker { + + private final Supplier binItemSupplier; + private final long maxWeightPerBin; + private final int maxItemsPerBin; + + public FirstFitBinPacker(Supplier binItemSupplier, long maxWeightPerBin, int maxItemsPerBin) { + this.binItemSupplier = binItemSupplier; + this.maxWeightPerBin = maxWeightPerBin; + this.maxItemsPerBin = maxItemsPerBin; + } + + @Override + public List> pack( + List operations, Map statsByTableUuid) { + List items = + operations.stream() + .filter(op -> statsByTableUuid.containsKey(op.getTableUuid())) + .map( + op -> + binItemSupplier + .get() + .fromOpAndStats(op, statsByTableUuid.get(op.getTableUuid()))) + .collect(Collectors.toList()); + List packingBins = + items.stream() + .sorted(Comparator.comparingLong(BinItem::getWeight).reversed()) + .collect(ArrayList::new, this::placeItem, List::addAll); + log.info( + "Packed {} operations ({} items after projection) into {} groupings", + operations.size(), + items.size(), + packingBins.size()); + return packingBins.stream().map(pb -> pb.items).collect(Collectors.toList()); + } + + private void placeItem(List bins, BinItem item) { + bins.stream() + .filter(b -> b.fits(item, maxWeightPerBin, maxItemsPerBin)) + .findFirst() + .ifPresentOrElse( + b -> b.add(item), + () -> { + PackingBin fresh = new PackingBin(); + fresh.add(item); + bins.add(fresh); + }); + } + + /** Running-totals helper used during the fold. */ + private static class PackingBin { + final List items = new ArrayList<>(); + long totalWeight; + + boolean fits(BinItem item, long maxWeight, int maxItems) { + return items.size() < maxItems && totalWeight + item.getWeight() <= maxWeight; + } + + void add(BinItem item) { + items.add(item); + totalWeight += item.getWeight(); + } + } +} diff --git a/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/binpack/TotalFilesBinItem.java b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/binpack/TotalFilesBinItem.java new file mode 100644 index 000000000..d9bdf135f --- /dev/null +++ b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/binpack/TotalFilesBinItem.java @@ -0,0 +1,49 @@ +package com.linkedin.openhouse.optimizer.scheduler.binpack; + +import com.linkedin.openhouse.optimizer.model.TableOperationDto; +import com.linkedin.openhouse.optimizer.model.TableStatsDto; +import java.util.Optional; +import lombok.Getter; +import lombok.ToString; + +/** + * {@link BinItem} that weights by the table's current file count. Suitable for any operation whose + * Spark cost scales with file count — orphan files deletion, stats collection, etc. The + * implementation knows nothing about which operation type it is wired up to. + * + *

Construction: callers pass {@code TotalFilesBinItem::new} as the {@code Supplier} to {@link + * FirstFitBinPacker}; the packer calls the supplier per operation to get an empty instance, then + * {@link #fromOpAndStats} on it to get a populated copy. + */ +@Getter +@ToString +public class TotalFilesBinItem implements BinItem { + + private final String fullyQualifiedTableName; + private final String operationId; + private final long weight; + + /** Empty constructor: call {@link #fromOpAndStats} on the result to get a populated instance. */ + public TotalFilesBinItem() { + this("", "", 0L); + } + + private TotalFilesBinItem(String fullyQualifiedTableName, String operationId, long weight) { + this.fullyQualifiedTableName = fullyQualifiedTableName; + this.operationId = operationId; + this.weight = weight; + } + + @Override + public BinItem fromOpAndStats(TableOperationDto op, TableStatsDto stats) { + return new TotalFilesBinItem( + op.getDatabaseName() + "." + op.getTableName(), op.getId(), currentFileCount(stats)); + } + + private static long currentFileCount(TableStatsDto stats) { + return Optional.ofNullable(stats) + .map(TableStatsDto::getSnapshot) + .map(TableStatsDto.SnapshotMetrics::getNumCurrentFiles) + .orElse(0L); + } +} diff --git a/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/config/SchedulerConfig.java b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/config/SchedulerConfig.java index 796e707f4..124860943 100644 --- a/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/config/SchedulerConfig.java +++ b/services/optimizer/scheduler/src/main/java/com/linkedin/openhouse/optimizer/scheduler/config/SchedulerConfig.java @@ -1,15 +1,22 @@ package com.linkedin.openhouse.optimizer.scheduler.config; import com.linkedin.openhouse.optimizer.model.OperationTypeDto; -import com.linkedin.openhouse.optimizer.scheduler.BinPacker; -import com.linkedin.openhouse.optimizer.scheduler.FileCountBinPacker; +import com.linkedin.openhouse.optimizer.repository.TableOperationsRepository; +import com.linkedin.openhouse.optimizer.repository.TableStatsRepository; +import com.linkedin.openhouse.optimizer.scheduler.SchedulerRunner; +import com.linkedin.openhouse.optimizer.scheduler.binpack.FirstFitBinPacker; +import com.linkedin.openhouse.optimizer.scheduler.binpack.TotalFilesBinItem; import com.linkedin.openhouse.optimizer.scheduler.client.JobsServiceClient; -import java.util.Map; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.reactive.function.client.WebClient; +/** + * Cross-cutting wiring (jobs-service client) plus the {@link SchedulerRunner} bean. Each operation + * type's identity (type, packing strategy, item supplier) is composed in {@link #schedulerRunner}; + * the runner itself never names an operation type beyond the keys in its registry. + */ @Configuration public class SchedulerConfig { @@ -19,9 +26,6 @@ public class SchedulerConfig { @Value("${optimizer.scheduler.cluster-id}") private String clusterId; - @Value("${optimizer.scheduler.ofd.max-files-per-bin}") - private long ofdMaxFilesPerBin; - @Bean public WebClient jobsWebClient() { return WebClient.builder().baseUrl(jobsBaseUri).build(); @@ -33,14 +37,21 @@ public JobsServiceClient jobsServiceClient(WebClient jobsWebClient) { } /** - * Map of {@link OperationTypeDto} to the {@link BinPacker} strategy that handles it. Adding a new - * operation type means adding an entry here and configuring its packer; the strategy class itself - * stays generic. + * Orphan files deletion: a {@link FirstFitBinPacker} over {@link TotalFilesBinItem}. Cost scales + * with file count — per-file list, manifest joins, and delete calls dominate independent of file + * size. */ @Bean - public Map binPackers() { - return Map.of( - OperationTypeDto.ORPHAN_FILES_DELETION, - new FileCountBinPacker(OperationTypeDto.ORPHAN_FILES_DELETION, ofdMaxFilesPerBin)); + public SchedulerRunner schedulerRunner( + TableOperationsRepository operationsRepo, + TableStatsRepository statsRepo, + JobsServiceClient jobsClient, + @Value("${optimizer.scheduler.results-endpoint}") String resultsEndpoint, + @Value("${optimizer.scheduler.ofd.max-files-per-bin}") long ofdMaxFilesPerBin, + @Value("${optimizer.scheduler.ofd.max-tables-per-bin}") int ofdMaxTablesPerBin) { + return new SchedulerRunner(operationsRepo, statsRepo, jobsClient, resultsEndpoint) + .registerOperation( + OperationTypeDto.ORPHAN_FILES_DELETION, + new FirstFitBinPacker<>(TotalFilesBinItem::new, ofdMaxFilesPerBin, ofdMaxTablesPerBin)); } } diff --git a/services/optimizer/scheduler/src/test/java/com/linkedin/openhouse/optimizer/scheduler/FileCountBinPackerTest.java b/services/optimizer/scheduler/src/test/java/com/linkedin/openhouse/optimizer/scheduler/FileCountBinPackerTest.java deleted file mode 100644 index dc3b96b5c..000000000 --- a/services/optimizer/scheduler/src/test/java/com/linkedin/openhouse/optimizer/scheduler/FileCountBinPackerTest.java +++ /dev/null @@ -1,104 +0,0 @@ -package com.linkedin.openhouse.optimizer.scheduler; - -import static org.assertj.core.api.Assertions.assertThat; - -import com.linkedin.openhouse.optimizer.model.OperationTypeDto; -import com.linkedin.openhouse.optimizer.model.TableOperationDto; -import com.linkedin.openhouse.optimizer.model.TableStatsDto; -import java.util.List; -import java.util.UUID; -import java.util.stream.Collectors; -import org.junit.jupiter.api.Test; - -class FileCountBinPackerTest { - - private static final long MAX = 1_000_000L; - private final FileCountBinPacker packer = - new FileCountBinPacker(OperationTypeDto.ORPHAN_FILES_DELETION, MAX); - - private static TableOperationDto op(String uuid) { - return TableOperationDto.builder() - .id(UUID.randomUUID().toString()) - .tableUuid(uuid) - .databaseName("db") - .tableName("tbl_" + uuid) - .operationType(OperationTypeDto.ORPHAN_FILES_DELETION) - .build(); - } - - private static TableStatsDto stats(Long fileCount) { - return TableStatsDto.builder() - .snapshot(TableStatsDto.SnapshotMetrics.builder().numCurrentFiles(fileCount).build()) - .build(); - } - - private static SchedulingCandidate candidate(String uuid, Long fileCount) { - return new SchedulingCandidate(op(uuid), stats(fileCount)); - } - - @Test - void emptyInput_returnsEmptyBins() { - assertThat(packer.pack(List.of())).isEmpty(); - } - - @Test - void singleTable_oneBin() { - SchedulingCandidate c = candidate("uuid-1", 100L); - List bins = packer.pack(List.of(c)); - assertThat(bins).hasSize(1); - assertThat(bins.get(0).getOperations()).containsExactly(c.getOperation()); - } - - @Test - void tablesUnderLimit_oneBin() { - List bins = - packer.pack( - List.of(candidate("a", 300_000L), candidate("b", 300_000L), candidate("c", 300_000L))); - assertThat(bins).hasSize(1); - assertThat(bins.get(0).getOperations()).hasSize(3); - } - - @Test - void tablesOverLimit_twoBins() { - List bins = - packer.pack( - List.of(candidate("a", 600_000L), candidate("b", 600_000L), candidate("c", 400_000L))); - assertThat(bins).hasSize(2); - assertThat(bins.get(0).getOperations()).hasSize(2); // 600k + 400k - assertThat(bins.get(1).getOperations()).hasSize(1); // 600k alone - } - - @Test - void largeTableAlone_exceedsLimitSingleBin() { - SchedulingCandidate big = candidate("big", 5_000_000L); - List bins = packer.pack(List.of(big)); - assertThat(bins).hasSize(1); - assertThat(bins.get(0).getOperations()).containsExactly(big.getOperation()); - } - - @Test - void nullFileCount_treatedAsZero() { - List bins = packer.pack(List.of(candidate("x", null), candidate("y", null))); - assertThat(bins).hasSize(1); - assertThat(bins.get(0).getOperations()).hasSize(2); - } - - @Test - void sortedDescending_largestFirst() { - SchedulingCandidate small = candidate("small", 100L); - SchedulingCandidate large = candidate("large", 900_000L); - List bins = packer.pack(List.of(small, large)); - assertThat(bins).hasSize(1); - List ordered = - bins.get(0).getOperations().stream() - .map(TableOperationDto::getTableUuid) - .collect(Collectors.toList()); - assertThat(ordered).containsExactly("large", "small"); - } - - @Test - void binCarriesOperationType() { - List bins = packer.pack(List.of(candidate("u", 1L))); - assertThat(bins.get(0).getOperationType()).isEqualTo(OperationTypeDto.ORPHAN_FILES_DELETION); - } -} diff --git a/services/optimizer/scheduler/src/test/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerRunnerTest.java b/services/optimizer/scheduler/src/test/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerRunnerTest.java index aa4abce8f..dcd7ec975 100644 --- a/services/optimizer/scheduler/src/test/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerRunnerTest.java +++ b/services/optimizer/scheduler/src/test/java/com/linkedin/openhouse/optimizer/scheduler/SchedulerRunnerTest.java @@ -1,5 +1,6 @@ package com.linkedin.openhouse.optimizer.scheduler; +import static com.linkedin.openhouse.optimizer.model.OperationTypeDto.ORPHAN_FILES_DELETION; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; @@ -14,16 +15,15 @@ import com.linkedin.openhouse.optimizer.db.SnapshotMetrics; import com.linkedin.openhouse.optimizer.db.TableOperationsRow; import com.linkedin.openhouse.optimizer.db.TableStatsRow; -import com.linkedin.openhouse.optimizer.model.OperationTypeDto; import com.linkedin.openhouse.optimizer.repository.TableOperationsRepository; import com.linkedin.openhouse.optimizer.repository.TableStatsRepository; +import com.linkedin.openhouse.optimizer.scheduler.binpack.FirstFitBinPacker; +import com.linkedin.openhouse.optimizer.scheduler.binpack.TotalFilesBinItem; import com.linkedin.openhouse.optimizer.scheduler.client.JobsServiceClient; import java.time.Instant; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.UUID; -import java.util.stream.Collectors; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -34,32 +34,30 @@ @ExtendWith(MockitoExtension.class) class SchedulerRunnerTest { - private static final OperationTypeDto OFD = OperationTypeDto.ORPHAN_FILES_DELETION; - private static final com.linkedin.openhouse.optimizer.db.OperationType OFD_DB = - com.linkedin.openhouse.optimizer.db.OperationType.ORPHAN_FILES_DELETION; - private static final String OFD_STR = OFD.name(); private static final String RESULTS_ENDPOINT = "http://localhost:8080/v1/optimizer/operations"; @Mock private TableOperationsRepository operationsRepo; @Mock private TableStatsRepository statsRepo; @Mock private JobsServiceClient jobsClient; - @Mock private BinPacker binPacker; private SchedulerRunner runner; @BeforeEach void setUp() { + // A real packer — the runner exercises the full pipeline against actual bucketing and the + // packer's projection logic, while the IO is mocked. runner = - new SchedulerRunner( - operationsRepo, statsRepo, jobsClient, Map.of(OFD, binPacker), RESULTS_ENDPOINT); + new SchedulerRunner(operationsRepo, statsRepo, jobsClient, RESULTS_ENDPOINT) + .registerOperation( + ORPHAN_FILES_DELETION, + new FirstFitBinPacker<>(TotalFilesBinItem::new, 1_000_000L, 50)); } // ---- Stubbing helpers ---- - /** Stubs the initial "find PENDING" call. */ private void stubFindPending(List rows) { when(operationsRepo.find( - eq(Optional.of(OFD_DB)), + eq(Optional.of(ORPHAN_FILES_DELETION.toDb())), eq(Optional.of(OperationStatus.PENDING)), eq(Optional.empty()), eq(Optional.empty()), @@ -70,7 +68,6 @@ private void stubFindPending(List rows) { .thenReturn(rows); } - /** Stubs the post-claim "find SCHEDULING" call. */ private void stubFindClaimed(List rows) { when(operationsRepo.find( eq(Optional.empty()), @@ -84,26 +81,13 @@ private void stubFindClaimed(List rows) { .thenReturn(rows); } - /** Stubs the bin packer to return one bin containing every candidate. */ - private void stubOneBinForAllCandidates() { - when(binPacker.pack(anyList())) - .thenAnswer( - inv -> - List.of( - new Bin( - OFD, - inv.>getArgument(0).stream() - .map(SchedulingCandidate::getOperation) - .collect(Collectors.toList())))); - } - private TableOperationsRow pendingRow(String uuid, String db, String table) { return TableOperationsRow.builder() .id(UUID.randomUUID().toString()) .tableUuid(uuid) .databaseName(db) .tableName(table) - .operationType(OFD_DB) + .operationType(ORPHAN_FILES_DELETION.toDb()) .status(OperationStatus.PENDING) .createdAt(Instant.now()) .build(); @@ -122,26 +106,38 @@ private TableStatsRow statsRow(String uuid, long numCurrentFiles) { // ---- Tests ---- + @Test + void schedule_unknownOperationType_throws() { + SchedulerRunner empty = + new SchedulerRunner(operationsRepo, statsRepo, jobsClient, RESULTS_ENDPOINT); + + assertThatThrownBy(() -> empty.schedule(ORPHAN_FILES_DELETION)) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("No BinPacker registered"); + } + + @Test + void getRegisteredOperationTypes_returnsRegisteredSet() { + assertThat(runner.getRegisteredOperationTypes()).containsExactly(ORPHAN_FILES_DELETION); + } + @Test void schedule_noPendingOps_noJobSubmitted() { stubFindPending(List.of()); - runner.schedule(OFD); + runner.schedule(ORPHAN_FILES_DELETION); verify(jobsClient, never()).launch(anyString(), anyString(), anyList(), anyList(), anyString()); - verify(binPacker, never()).pack(anyList()); } @Test - void schedule_unknownOperationType_throws() { - SchedulerRunner emptyRunner = - new SchedulerRunner(operationsRepo, statsRepo, jobsClient, Map.of(), RESULTS_ENDPOINT); + void schedule_allOpsWithoutStats_noJobSubmitted() { + TableOperationsRow row = pendingRow(UUID.randomUUID().toString(), "db1", "tbl1"); + stubFindPending(List.of(row)); + when(statsRepo.findAllById(any())).thenReturn(List.of()); - assertThatThrownBy(() -> emptyRunner.schedule(OFD)) - .isInstanceOf(IllegalStateException.class) - .hasMessageContaining("No BinPacker registered"); + runner.schedule(ORPHAN_FILES_DELETION); - verify(operationsRepo, never()).find(any(), any(), any(), any(), any(), any(), any(), any()); verify(jobsClient, never()).launch(anyString(), anyString(), anyList(), anyList(), anyString()); } @@ -152,7 +148,6 @@ void schedule_singleBin_claimsAndMarksScheduled() { stubFindPending(List.of(row)); when(statsRepo.findAllById(any())).thenReturn(List.of(statsRow(uuid, 100_000L))); - stubOneBinForAllCandidates(); when(operationsRepo.updateBatch( anyList(), eq(OperationStatus.PENDING), eq(OperationStatus.SCHEDULING), any(), any())) .thenReturn(1); @@ -163,7 +158,7 @@ void schedule_singleBin_claimsAndMarksScheduled() { when(jobsClient.launch(anyString(), anyString(), anyList(), anyList(), anyString())) .thenReturn(Optional.of("job-123")); - runner.schedule(OFD); + runner.schedule(ORPHAN_FILES_DELETION); verify(operationsRepo) .updateBatch( @@ -178,7 +173,12 @@ void schedule_singleBin_claimsAndMarksScheduled() { ArgumentCaptor> tableNames = ArgumentCaptor.forClass(List.class); verify(jobsClient) - .launch(anyString(), eq(OFD_STR), tableNames.capture(), anyList(), anyString()); + .launch( + anyString(), + eq(ORPHAN_FILES_DELETION.name()), + tableNames.capture(), + anyList(), + anyString()); assertThat(tableNames.getValue()).containsExactly("db1.tbl1"); } @@ -189,7 +189,6 @@ void schedule_jobLaunchFails_marksPendingForRetry() { stubFindPending(List.of(row)); when(statsRepo.findAllById(any())).thenReturn(List.of(statsRow(uuid, 100L))); - stubOneBinForAllCandidates(); when(operationsRepo.updateBatch( anyList(), eq(OperationStatus.PENDING), eq(OperationStatus.SCHEDULING), any(), any())) .thenReturn(1); @@ -200,7 +199,7 @@ void schedule_jobLaunchFails_marksPendingForRetry() { anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.PENDING), any(), any())) .thenReturn(1); - runner.schedule(OFD); + runner.schedule(ORPHAN_FILES_DELETION); verify(operationsRepo) .updateBatch( @@ -221,13 +220,12 @@ void schedule_rowsAlreadyClaimed_skipsSubmit() { stubFindPending(List.of(row)); when(statsRepo.findAllById(any())).thenReturn(List.of(statsRow(uuid, 100L))); - stubOneBinForAllCandidates(); when(operationsRepo.updateBatch( anyList(), eq(OperationStatus.PENDING), eq(OperationStatus.SCHEDULING), any(), any())) .thenReturn(0); stubFindClaimed(List.of()); - runner.schedule(OFD); + runner.schedule(ORPHAN_FILES_DELETION); verify(jobsClient, never()).launch(anyString(), anyString(), anyList(), anyList(), anyString()); verify(operationsRepo, never()) @@ -247,11 +245,9 @@ void schedule_cancelsDuplicatePendingPerCycle() { stubFindPending(List.of(row1, row2)); when(operationsRepo.cancel(anyList())).thenReturn(1); when(statsRepo.findAllById(any())).thenReturn(List.of(statsRow(uuid, 100L))); - stubOneBinForAllCandidates(); when(operationsRepo.updateBatch( anyList(), eq(OperationStatus.PENDING), eq(OperationStatus.SCHEDULING), any(), any())) .thenReturn(1); - // After dedup, only row1 (oldest by createdAt then id) survives. TableOperationsRow survivor = row1.getCreatedAt().isBefore(row2.getCreatedAt()) ? row1 : row2; if (row1.getCreatedAt().equals(row2.getCreatedAt())) { survivor = row1.getId().compareTo(row2.getId()) <= 0 ? row1 : row2; @@ -263,9 +259,8 @@ void schedule_cancelsDuplicatePendingPerCycle() { when(jobsClient.launch(anyString(), anyString(), anyList(), anyList(), anyString())) .thenReturn(Optional.of("job-dedup")); - runner.schedule(OFD); + runner.schedule(ORPHAN_FILES_DELETION); - // Exactly one ID was cancelled (the duplicate). ArgumentCaptor> cancelled = ArgumentCaptor.forClass(List.class); verify(operationsRepo).cancel(cancelled.capture()); assertThat(cancelled.getValue()).hasSize(1); @@ -281,11 +276,10 @@ void schedule_partialClaim_launchesAndMarksOnlyClaimedSubset() { stubFindPending(List.of(rowA, rowB)); when(statsRepo.findAllById(any())) .thenReturn(List.of(statsRow(uuidA, 100L), statsRow(uuidB, 100L))); - stubOneBinForAllCandidates(); when(operationsRepo.updateBatch( anyList(), eq(OperationStatus.PENDING), eq(OperationStatus.SCHEDULING), any(), any())) .thenReturn(1); - // Only A actually claimed (B owned by another instance). + // Only A actually claimed. stubFindClaimed(List.of(schedulingRow(rowA))); when(operationsRepo.updateBatch( anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.SCHEDULED), any(), any())) @@ -293,7 +287,7 @@ void schedule_partialClaim_launchesAndMarksOnlyClaimedSubset() { when(jobsClient.launch(anyString(), anyString(), anyList(), anyList(), anyString())) .thenReturn(Optional.of("job-partial")); - runner.schedule(OFD); + runner.schedule(ORPHAN_FILES_DELETION); ArgumentCaptor> launchedTableNames = ArgumentCaptor.forClass(List.class); ArgumentCaptor> launchedOpIds = ArgumentCaptor.forClass(List.class); @@ -325,7 +319,6 @@ void schedule_opsWithoutStats_skipped() { stubFindPending(List.of(withStatsRow, missingRow)); when(statsRepo.findAllById(any())).thenReturn(List.of(statsRow(withStats, 50L))); - stubOneBinForAllCandidates(); when(operationsRepo.updateBatch( anyList(), eq(OperationStatus.PENDING), eq(OperationStatus.SCHEDULING), any(), any())) .thenReturn(1); @@ -336,7 +329,7 @@ void schedule_opsWithoutStats_skipped() { when(jobsClient.launch(anyString(), anyString(), anyList(), anyList(), anyString())) .thenReturn(Optional.of("job-skip")); - runner.schedule(OFD); + runner.schedule(ORPHAN_FILES_DELETION); ArgumentCaptor> ids = ArgumentCaptor.forClass(List.class); verify(operationsRepo) @@ -348,17 +341,4 @@ void schedule_opsWithoutStats_skipped() { any()); assertThat(ids.getValue()).containsExactly(withStatsRow.getId()); } - - @Test - void schedule_allOpsWithoutStats_noJobSubmitted() { - TableOperationsRow row = pendingRow(UUID.randomUUID().toString(), "db1", "tbl1"); - - stubFindPending(List.of(row)); - when(statsRepo.findAllById(any())).thenReturn(List.of()); - - runner.schedule(OFD); - - verify(binPacker, never()).pack(anyList()); - verify(jobsClient, never()).launch(anyString(), anyString(), anyList(), anyList(), anyString()); - } } diff --git a/services/optimizer/scheduler/src/test/java/com/linkedin/openhouse/optimizer/scheduler/binpack/FirstFitBinPackerTest.java b/services/optimizer/scheduler/src/test/java/com/linkedin/openhouse/optimizer/scheduler/binpack/FirstFitBinPackerTest.java new file mode 100644 index 000000000..fb77d3963 --- /dev/null +++ b/services/optimizer/scheduler/src/test/java/com/linkedin/openhouse/optimizer/scheduler/binpack/FirstFitBinPackerTest.java @@ -0,0 +1,150 @@ +package com.linkedin.openhouse.optimizer.scheduler.binpack; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.linkedin.openhouse.optimizer.model.TableOperationDto; +import com.linkedin.openhouse.optimizer.model.TableStatsDto; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import lombok.Getter; +import org.junit.jupiter.api.Test; + +/** + * Tests the {@link FirstFitBinPacker} bucketing logic in isolation via a {@link TestItem} whose + * weight comes from a {@code "weight"} entry in {@code tableProperties}. The supplier-then-{@code + * fromOpAndStats} pattern is exercised end-to-end through the public {@code pack} entry point. + * Projection logic for production BinItems (e.g. {@link TotalFilesBinItem}) is covered by their own + * tests. + */ +class FirstFitBinPackerTest { + + @Getter + static class TestItem implements BinItem { + private final String operationId; + private final long weight; + + public TestItem() { + this("", 0L); + } + + private TestItem(String operationId, long weight) { + this.operationId = operationId; + this.weight = weight; + } + + @Override + public String getFullyQualifiedTableName() { + return "db.tbl_" + operationId; + } + + @Override + public BinItem fromOpAndStats(TableOperationDto op, TableStatsDto stats) { + long w = Long.parseLong(stats.getTableProperties().get("weight")); + return new TestItem(op.getId(), w); + } + } + + private static TableOperationDto op(String id) { + return TableOperationDto.builder().id(id).tableUuid(id).build(); + } + + private static TableStatsDto statsWithWeight(String uuid, long weight) { + return TableStatsDto.builder() + .tableUuid(uuid) + .tableProperties(Map.of("weight", Long.toString(weight))) + .build(); + } + + private static List opsList(String... ids) { + return java.util.Arrays.stream(ids).map(FirstFitBinPackerTest::op).collect(Collectors.toList()); + } + + private static Map statsMap(Object... uuidWeightPairs) { + Map map = new HashMap<>(); + for (int i = 0; i < uuidWeightPairs.length; i += 2) { + String uuid = (String) uuidWeightPairs[i]; + long weight = (long) uuidWeightPairs[i + 1]; + map.put(uuid, statsWithWeight(uuid, weight)); + } + return map; + } + + private static FirstFitBinPacker packer(long maxWeight, int maxItems) { + return new FirstFitBinPacker<>(TestItem::new, maxWeight, maxItems); + } + + @Test + void emptyInput_returnsEmptyGroupings() { + assertThat(packer(100L, 10).pack(List.of(), Map.of())).isEmpty(); + } + + @Test + void singleItem_oneGrouping() { + List> groupings = packer(1_000_000L, 10).pack(opsList("a"), statsMap("a", 100L)); + assertThat(groupings).hasSize(1); + assertThat(groupings.get(0)).hasSize(1); + } + + @Test + void underWeightLimit_oneGrouping() { + List> groupings = + packer(1_000_000L, 10) + .pack(opsList("a", "b", "c"), statsMap("a", 300_000L, "b", 300_000L, "c", 300_000L)); + assertThat(groupings).hasSize(1); + assertThat(groupings.get(0)).hasSize(3); + } + + @Test + void overWeightLimit_twoGroupings() { + List> groupings = + packer(1_000_000L, 10) + .pack(opsList("a", "b", "c"), statsMap("a", 600_000L, "b", 600_000L, "c", 400_000L)); + assertThat(groupings).hasSize(2); + // FFD: sort desc → 600, 600, 400. Place 600 → bin0; next 600 doesn't fit bin0, → bin1; 400 + // fits bin0 (total 1_000_000). + long b0 = groupings.get(0).stream().mapToLong(BinItem::getWeight).sum(); + long b1 = groupings.get(1).stream().mapToLong(BinItem::getWeight).sum(); + assertThat(b0).isEqualTo(1_000_000L); + assertThat(b1).isEqualTo(600_000L); + } + + @Test + void itemLargerThanCap_getsOwnGrouping() { + List> groupings = + packer(1_000L, 10).pack(opsList("big"), statsMap("big", 5_000L)); + assertThat(groupings).hasSize(1); + assertThat(groupings.get(0)).hasSize(1); + } + + @Test + void sortedDescending_largestFirst() { + List> groupings = + packer(2_000_000L, 10) + .pack(opsList("small", "large"), statsMap("small", 100L, "large", 900_000L)); + assertThat(groupings).hasSize(1); + List ids = + groupings.get(0).stream().map(BinItem::getOperationId).collect(Collectors.toList()); + assertThat(ids).containsExactly("large", "small"); + } + + @Test + void maxItemsCap_splitsGroupings() { + List> groupings = + packer(1_000_000L, 2) + .pack(opsList("a", "b", "c", "d"), statsMap("a", 1L, "b", 1L, "c", 1L, "d", 1L)); + assertThat(groupings).hasSize(2); + assertThat(groupings.get(0)).hasSize(2); + assertThat(groupings.get(1)).hasSize(2); + } + + @Test + void operationsWithoutStats_dropped() { + List> groupings = + packer(1_000_000L, 10).pack(opsList("a", "missing"), statsMap("a", 100L)); + assertThat(groupings).hasSize(1); + assertThat(groupings.get(0)).hasSize(1); + assertThat(groupings.get(0).get(0).getOperationId()).isEqualTo("a"); + } +} diff --git a/services/optimizer/scheduler/src/test/java/com/linkedin/openhouse/optimizer/scheduler/binpack/TotalFilesBinItemTest.java b/services/optimizer/scheduler/src/test/java/com/linkedin/openhouse/optimizer/scheduler/binpack/TotalFilesBinItemTest.java new file mode 100644 index 000000000..bdbab91d6 --- /dev/null +++ b/services/optimizer/scheduler/src/test/java/com/linkedin/openhouse/optimizer/scheduler/binpack/TotalFilesBinItemTest.java @@ -0,0 +1,70 @@ +package com.linkedin.openhouse.optimizer.scheduler.binpack; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.linkedin.openhouse.optimizer.model.OperationTypeDto; +import com.linkedin.openhouse.optimizer.model.TableOperationDto; +import com.linkedin.openhouse.optimizer.model.TableStatsDto; +import java.util.UUID; +import org.junit.jupiter.api.Test; + +class TotalFilesBinItemTest { + + private static TableOperationDto op() { + return TableOperationDto.builder() + .id(UUID.randomUUID().toString()) + .tableUuid(UUID.randomUUID().toString()) + .databaseName("db1") + .tableName("tbl1") + .operationType(OperationTypeDto.ORPHAN_FILES_DELETION) + .build(); + } + + private static TableStatsDto statsWithFiles(Long fileCount) { + return TableStatsDto.builder() + .snapshot(TableStatsDto.SnapshotMetrics.builder().numCurrentFiles(fileCount).build()) + .build(); + } + + @Test + void fromOpAndStats_buildsFullyQualifiedNameAndOperationId() { + TableOperationDto op = op(); + BinItem item = new TotalFilesBinItem().fromOpAndStats(op, statsWithFiles(42L)); + + assertThat(item.getFullyQualifiedTableName()).isEqualTo("db1.tbl1"); + assertThat(item.getOperationId()).isEqualTo(op.getId()); + } + + @Test + void fromOpAndStats_weightIsCurrentFileCount() { + BinItem item = new TotalFilesBinItem().fromOpAndStats(op(), statsWithFiles(123_456L)); + assertThat(item.getWeight()).isEqualTo(123_456L); + } + + @Test + void fromOpAndStats_nullStats_weightIsZero() { + BinItem item = new TotalFilesBinItem().fromOpAndStats(op(), null); + assertThat(item.getWeight()).isEqualTo(0L); + } + + @Test + void fromOpAndStats_nullSnapshot_weightIsZero() { + BinItem item = new TotalFilesBinItem().fromOpAndStats(op(), TableStatsDto.builder().build()); + assertThat(item.getWeight()).isEqualTo(0L); + } + + @Test + void fromOpAndStats_nullFileCount_weightIsZero() { + BinItem item = new TotalFilesBinItem().fromOpAndStats(op(), statsWithFiles(null)); + assertThat(item.getWeight()).isEqualTo(0L); + } + + @Test + void emptyInstance_doesNotShareStateWithPopulated() { + TotalFilesBinItem empty = new TotalFilesBinItem(); + BinItem populated = empty.fromOpAndStats(op(), statsWithFiles(7L)); + + assertThat(empty.getWeight()).isEqualTo(0L); + assertThat(populated.getWeight()).isEqualTo(7L); + } +} diff --git a/services/optimizer/scheduler/src/test/resources/application-test.properties b/services/optimizer/scheduler/src/test/resources/application-test.properties index b0609fa34..57354728e 100644 --- a/services/optimizer/scheduler/src/test/resources/application-test.properties +++ b/services/optimizer/scheduler/src/test/resources/application-test.properties @@ -6,5 +6,6 @@ spring.sql.init.mode=always spring.sql.init.schema-locations=classpath:db/optimizer-schema.sql optimizer.scheduler.jobs.base-uri=http://localhost:9999 optimizer.scheduler.ofd.max-files-per-bin=1000000 +optimizer.scheduler.ofd.max-tables-per-bin=50 optimizer.scheduler.results-endpoint=http://localhost:8080/v1/optimizer/operations optimizer.scheduler.cluster-id=test-cluster