refactor(scheduler): make bin packing a generic utility per #534 review#626
refactor(scheduler): make bin packing a generic utility per #534 review#626mkuchenbecker wants to merge 13 commits into
Conversation
…m optimizer types Addresses Abhishek's feedback on #534 (Bin.java:26): > Shall we keep bin packing generic as common utility instead of referencing > internal models and operations? That should give more flexibility and we > should be able to integrate well with the optimizer flow as well existing > scheduler flow. I am planning to leverage as a common lib as used in > PR #604. This refactor pulls Bin / BinPacker out of the optimizer types and into a fresh sub-package that mirrors the API shape Abhishek is introducing in #599 as `jobs.util.binpack` (BinItem + Bin + FirstFitDecreasingBinPacker with weight + sizeBytes + items caps). When #599 merges we should be able to swap the import to his shared lib and delete this copy with a one-line change. What changed: - New sub-package `services/optimizer/scheduler/.../binpack/`: - BinItem — concrete struct with fqtn, operationId, tableUuid, databaseName, tableName, weight, sizeBytes. Carries everything the batched Spark app needs to do work and report back, without importing any optimizer DTO type. - Bin — mutable accumulator with totalWeight + totalSizeBytes running totals; package-private mutators, public read-only view. - BinPacker — strategy interface: `List<Bin> pack(List<BinItem>)`. - FirstFitDecreasingBinPacker — three-cap FFD algorithm (maxWeightPerBin, maxSizeBytesPerBin, maxItemsPerBin; 0 disables). An item over any single cap is placed in its own bin rather than dropped. - SchedulerRunner now owns ALL optimizer-specific orchestration. Builds the BinItem list from (TableOperationDto, TableStatsDto) pairs, dispatches to the registered BinPacker, and per returned Bin: claim CAS (PENDING → SCHEDULING), partial-claim narrowing on the BinItem list, JobsServiceClient.launch, mark SCHEDULED with jobId / revert to PENDING. The old Bin.subset() / Bin.schedule() entry points are gone — the runner does it directly, since they were the optimizer-specific bits we wanted out of Bin. - SchedulerConfig wires one FirstFitDecreasingBinPacker per operation type (currently OFD) with configurable caps: optimizer.scheduler.ofd.max-weight-per-bin (default 1_000_000) optimizer.scheduler.ofd.max-size-bytes-per-bin (default 5 TiB) optimizer.scheduler.ofd.max-items-per-bin (default 50) Replaces the old single-cap `max-files-per-bin` property; corresponding env vars renamed (SCHEDULER_OFD_MAX_FILES_PER_BIN → SCHEDULER_OFD_MAX_WEIGHT_PER_BIN, plus two new ones). - SchedulerApplication imports BinPacker from the new sub-package. - Removed: old Bin, BinPacker, FileCountBinPacker, SchedulingCandidate, FileCountBinPackerTest. - New FirstFitDecreasingBinPackerTest covers all three caps, FFD ordering, oversized-item placement, and zero-cap-disables-dimension. - Updated SchedulerRunnerTest: stubs the mock BinPacker by routing through a real FirstFitDecreasingBinPacker with unbounded caps, so the runner's op → BinItem projection is exercised without going around Bin's package-private mutators. Notes for the merge: - Same-shape duplication with #599 is intentional and short-lived; the swap-out is an import rename plus deleting the four binpack/*.java files here. - Existing FileCountBinPacker semantics carry over: file count is the weight dimension. sizeBytes is now populated from TableStatsDto.snapshot.tableSizeBytes when available, otherwise 0 (the packer just ignores the dimension for items that don't carry it). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
| List<Bin> bins = new ArrayList<>(); | ||
| for (BinItem item : sorted) { | ||
| Bin target = null; | ||
| for (Bin bin : bins) { |
There was a problem hiding this comment.
prefer bins.foreach
| List<Bin> bins = new ArrayList<>(); | ||
| for (BinItem item : sorted) { |
There was a problem hiding this comment.
this whole section is a map function.
| optimizer.scheduler.ofd.max-weight-per-bin=${SCHEDULER_OFD_MAX_WEIGHT_PER_BIN:1000000} | ||
| optimizer.scheduler.ofd.max-size-bytes-per-bin=${SCHEDULER_OFD_MAX_SIZE_BYTES_PER_BIN:5497558138880} |
There was a problem hiding this comment.
this seems arbitrary. I don;t like this as a config as its unclear what the unit its. If we are configuring OFD, we should be configuring the binpacker with human-readable options and not system limits. i.e. we should set a # file limit, not a weight limit.
Max bytes per bin does not make sense for OFD because it scales with number of files and not number of bytes.
| * (the packing dimension OFD cares about); sizeBytes is the on-disk footprint when stats expose | ||
| * it, else 0. | ||
| */ | ||
| private static BinItem toBinItem(TableOperationDto op, TableStatsDto stats) { |
There was a problem hiding this comment.
This is the opposite of flexible. We should have an OFD binitem that weights itself based on file size stats. BinItem should be an interface that has implementations.
…m self-weights, functional FFD
Addresses the four review comments on the prior bin-refactor commit:
1. BinItem becomes an interface (`long getWeight()`). Each operation type
ships its own impl that encodes its own cost model. The packer never
imports operation DTOs; the impls do.
2. Bin/BinPacker/FirstFitDecreasingBinPacker are generic over T extends
BinItem. Heterogeneous packers register in a
`Map<OperationTypeDto, BinPacker<? extends BinItem>>` and the
scheduler narrows the wildcard with one cast at the per-op-type
dispatch boundary. Compile-time `T`-consistency end-to-end through
the packer pipeline.
3. New `operations/ofd/OfdBinItem` (package parallel to scheduler) holds
only what the dispatch needs: fqtn, operationId, weight. The
weighting logic — file count from `TableStatsDto.snapshot.
numCurrentFiles` — lives in a private static `currentFileCount` on
the impl, fed by a static factory `OfdBinItem.from(op, stats)` so
callers do
`withStats.stream().map(op -> OfdBinItem.from(op, statsByUuid.
get(op.getTableUuid())))`.
4. FirstFitDecreasingBinPacker.pack() is one stream pipeline:
`items.stream().sorted(...).collect(ArrayList::new, this::placeItem,
List::addAll)`. The inner first-fit search is
`bins.stream().filter(b -> b.fits(...)).findFirst().
ifPresentOrElse(...)`. No imperative for-loops; the fold maintains
the running list of bins as its accumulator. Compiler enforces
T-consistency across the pipeline.
5. Dropped `maxSizeBytesPerBin` entirely. OFD cost is per-file (list +
manifest joins + delete calls); bytes don't add information. A 10 GB
table with 100k files is more expensive to OFD than a 1 TB table
with 2k files. Bin/Packer now carry just `maxWeightPerBin` +
`maxItemsPerBin`. Other op types encode their own dimension in
`getWeight()`; the packer needn't know.
6. OFD config keys back to human-readable per-op vocabulary:
`optimizer.scheduler.ofd.max-files-per-bin` (file count) +
`optimizer.scheduler.ofd.max-tables-per-bin` (table count). Env vars
`SCHEDULER_OFD_MAX_FILES_PER_BIN` + `SCHEDULER_OFD_MAX_TABLES_PER_BIN`.
SchedulerConfig translates these into the packer's
`maxWeightPerBin` + `maxItemsPerBin`.
7. Refactored SchedulerRunner:
- `Map<OperationTypeDto, BinPacker<? extends BinItem>>` registration
- Switch by operation type narrows to BinPacker<OfdBinItem> with one
suppressed unchecked cast (safe by registration invariant; comment
calls out the OperationScheduler<T> handler factoring once a
second op type lands)
- `scheduleOfd(...)` builds `OfdBinItem` via the factory and dispatches
- `submitOfdBin(Bin<OfdBinItem>)` claims, narrows to claimed-only via
OfdBinItem.getOperationId, launches, marks SCHEDULED/PENDING — same
orchestration as before, but typed `Bin<OfdBinItem>` end-to-end
Tests:
- FirstFitDecreasingBinPackerTest uses a local `TestItem implements
BinItem` (no optimizer-domain imports in the binpack test).
Byte-cap test removed; max-items, max-weight, FFD order, oversized,
and zero-cap-disables-dimension all preserved.
- SchedulerRunnerTest mocks `BinPacker<OfdBinItem>` and stubs through
a real FFD packer with unbounded caps so the runner's projection
(op + stats → OfdBinItem) is exercised without bypassing Bin's
package-private mutators.
Divergence from #599: Abhishek's `jobs.util.binpack.BinItem` is a
concrete struct with optimizer-aware identity fields baked in. Ours is
a contract (`long getWeight()`) with per-op impls. The "swap to his
lib by import rename" gimmick no longer applies — instead this PR
proposes the interface-based shape as the common lib, and #599 would
rebase to adopt it (or at minimum offer an interface alongside his
concrete struct). Discussed in PR #626 thread.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
| * @param <T> concrete {@link BinItem} implementation carried by this bin | ||
| */ | ||
| @ToString | ||
| public class Bin<T extends BinItem> { |
There was a problem hiding this comment.
implements vs exends?
…inItem directly Follow-up to the prior commit: drop <T extends BinItem> from Bin, BinPacker, FirstFitDecreasingBinPacker. The interface alone is enough for the polymorphism we wanted — per-op-type impls (OfdBinItem and future siblings) implement BinItem; the packer never knows the concrete type. The dispatcher narrows once at access time. What changed: - Bin → non-generic; items() returns List<BinItem>; fits/add take BinItem. - BinPacker → non-generic interface, `List<Bin> pack(List<? extends BinItem>)`. The wildcard on the input is the standard PECS shape so callers can pass `List<OfdBinItem>` directly without fighting invariance at the call site. - FirstFitDecreasingBinPacker → non-generic, otherwise unchanged (functional stream pack body preserved). - SchedulerConfig → `Map<OperationTypeDto, BinPacker>` (no wildcard); `FirstFitDecreasingBinPacker.builder()` no longer needs a type witness. - SchedulerApplication → `Map<OperationTypeDto, BinPacker>` likewise. - SchedulerRunner → drop generics on the map and on submitOfdBin. The per-op-type switch no longer needs a `BinPacker<OfdBinItem>` cast; the downcast to OfdBinItem happens once at the top of submitOfdBin via `bin.items().stream().map(OfdBinItem.class::cast).collect(toList())`, then everything downstream uses OfdBinItem directly. Same safety invariant — SchedulerConfig only feeds OfdBinItem instances to the OFD packer — just expressed through a runtime cast instead of an unchecked-suppression at the generic boundary. - Tests updated; FirstFitDecreasingBinPackerTest's TestItem stays the same shape but the packer/bins are typed plainly. Why: discussed in PR #626. The type parameter on Bin/BinPacker bought compile-time `T`-consistency through the packer pipeline, but at the cost of `Map<OperationTypeDto, BinPacker<? extends BinItem>>` and an `@SuppressWarnings("unchecked")` cast at the switch boundary. With one op type today, the cleaner shape is no generics on the type and one explicit cast at the access site — the cast's locality makes the invariant obvious. If a future op type adds its own subtle access pattern, we can revisit per-handler abstraction (OperationScheduler). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
| @Builder | ||
| public class FirstFitDecreasingBinPacker implements BinPacker { | ||
|
|
||
| @Builder.Default private final long maxWeightPerBin = 1_000_000L; |
There was a problem hiding this comment.
Do not add abstract knobs. What is a weight in this context that you need to limit the max as a constant? Tell me what 1m unitless weight means and why that is the proper threashold if you want this to remain a constant rather than a parameter.
| @Builder.Default private final int maxItemsPerBin = 50; | ||
|
|
||
| @Override | ||
| public List<Bin> pack(List<? extends BinItem> items) { |
There was a problem hiding this comment.
Any of the <? extents Foo> was in-scope to fix as part of the previous iteration.
Two follow-ups from #626 review: - Drop `@Builder.Default` on `FirstFitDecreasingBinPacker.maxWeightPerBin` (was 1_000_000L) and `.maxItemsPerBin` (was 50). "Weight" has no domain meaning inside the packer — picking a constant there was an arbitrary knob with no story attached. Callers (SchedulerConfig) supply the cap with units and a justification visible at the config site. Primitive defaults (0) carry the "disabled" sentinel meaning if a caller doesn't set the field, so explicit-not-required stays expressible. - Change `BinPacker.pack(List<? extends BinItem>)` to `pack(List<BinItem>)`. The wildcard was a half-measure left from the prior pass — type parameters are gone but the variance marker on the method signature still leaked the generics shape. Callers widen at the call site via a stream type witness: `.<BinItem>map(op -> OfdBinItem .from(...))`. Docstring updated to drop the now-invalid "pass a typed list of a concrete impl" note. - SchedulerRunner.scheduleOfd builds `List<BinItem>` directly via the type witness. Comment names the invariance reason so a future reader doesn't undo it. - SchedulerRunnerTest's stubOneBinForAllItems answer cast updated to `inv.<List<BinItem>>getArgument(0)` to match the new signature. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
| } | ||
|
|
||
| private static long currentFileCount(TableStatsDto stats) { | ||
| if (stats == null || stats.getSnapshot() == null) { |
There was a problem hiding this comment.
why are we allowing nullable? Those functions should return optionals.
| * <p>Both caps are explicit on construction. Neither has a default — "weight" has no domain meaning | ||
| * at this layer, so picking a constant here would be an arbitrary knob; callers (e.g. {@link | ||
| * com.linkedin.openhouse.optimizer.scheduler.config.SchedulerConfig}) supply the per-op- type cap | ||
| * with the unit attached and a justification at the config site. Pass {@code 0} or a negative value | ||
| * for either cap to disable that dimension. |
There was a problem hiding this comment.
Adding rationalle for not adding a default is stupid. You are explaini9ng what you didn't do rather than what it does do. Apply this lesson to the other ocmments as well. Negatives do not make sense and the realistic minimum min size is 1. These need not be constants nor explained. Explain what this does, not what it doesn't do.
|
|
||
| /** Max number of tables per batched OFD Spark job. 0 disables. */ | ||
| @Value("${optimizer.scheduler.ofd.max-tables-per-bin:50}") | ||
| private int ofdMaxTablesPerBin; |
There was a problem hiding this comment.
This should be defined on a binpacker. Scheduler does not care except to configure OFD. Its the difference between a monolith scheduler that grows in complexity with each oepration and one with arbitrary pluggable operations.
…ro op references Bigger structural turn off #626 review. The runner is now a registration map only; OFD lives entirely under operations/ofd/ and shows up as a BinPacker bean. Adding a future operation type is one new @component; the scheduler module is untouched. What changed: - binpack/Bin → interface with `void schedule()`. Bins own their own scheduling (claim CAS, narrow to claimed, launch, mark SCHEDULED/PENDING). - binpack/BinPacker → orchestration interface: `getOperationType()` plus `prepare(db, tableName) → List<Bin>`. Per-op-type orchestrator. - binpack/FirstFitDecreasingBinPacker → standalone algorithm class (not a BinPacker in this vocabulary). Returns flat groupings `List<List<BinItem>>` that the per-op-type packer wraps into its own Bin impl. Internal `PackingBin` helper holds items + totalWeight during the fold so the public `Bin` interface stays minimal. - scheduler/SchedulerRunner → rewritten as a thin dispatcher. Constructor takes `List<BinPacker>` via Spring injection; builds an immutable `Map<OperationTypeDto, BinPacker>` with `Map.copyOf`. `schedule(type)` calls `bp.prepare(...).forEach(Bin::schedule)`. Imports only `binpack.BinPacker`, `binpack.Bin`, and the model enum — no operations.* anywhere. Grep-able invariant. - scheduler/config/SchedulerConfig → stripped to the shared infra (`WebClient`, `JobsServiceClient`, cluster id). No more OFD @value fields, no more binPackers @bean — those moved to OfdBinPacker's ctor. - operations/ofd/OfdBinPacker → new @component implementing BinPacker. Holds @Value-bound caps, an FFD instance, repos, jobs client, results endpoint. `prepare(...)` does load PENDING (OFD-filtered) → dedup → stats lookup → project to OfdBinItem → FFD pack → wrap each grouping in a new OfdBin. - operations/ofd/OfdBin → new class implementing Bin. Holds the bin's OfdBinItems plus refs to repo/jobs client/endpoint. `@Transactional schedule()` does the claim CAS, partial-claim narrow, launch, mark SCHEDULED/PENDING. The OFD-specific job name and arg shape live here. - operations/ofd/OfdBinItem → `currentFileCount` wraps nulls in Optional locally (`Optional.ofNullable(stats).map(...).map(...) .orElse(0L)`). DTO getters stay nullable; converting them to return Optional is deferred to a follow-up PR per the null-is-a-code-smell lesson. - schedulerapp/SchedulerApplication → injects SchedulerRunner only; loops via `runner.getRegisteredOperationTypes()`. Tests reorganized along the same lines: - FirstFitDecreasingBinPackerTest asserts groupings (`List<List<BinItem>>`) instead of bins. zero-cap-disables test gone — caps are required positive now. maxItemsCap test uses a real positive weight cap. - SchedulerRunnerTest slim: three tests for the dispatcher (unknown type throws, delegates to the right packer + schedules each returned bin, passes scope args through). Mock BinPacker, mock Bin, mock the delegation invariant. - OfdBinPackerTest new: covers load-PENDING + dedup + stats-filter + groupings-to-OfdBins. Direct-constructor (no Spring). - OfdBinTest new: covers OfdBin.schedule()'s claim + narrow + launch + mark paths (success, launch fails reverts to PENDING, rows already claimed skips, partial claim launches only the claimed subset). Doc cleanup pass per "describe what code does, not what it doesn't do" lesson: dropped the "no defaults" rationale and "0 disables a dimension" sentinel from FFD's javadoc; Bin.fits (now a private PackingBin method inside FFD) lost its `> 0` guards. Class doc reads positively now. Global side: created ~/.claude/code-lessons.md with the null/Optional lesson, the comment-style lesson, the no-abstract-knobs lesson, the generics-vs-interface lesson, and the scheduler-pluggability principle. Linked from ~/.claude/CLAUDE.md so future sessions surface them proactively. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… tuple The full restructuring from the PR #626 review. Operations layer deleted; OFD's only footprint is one @bean in SchedulerConfig. binpack/ (op-agnostic): - Bin: pure data class — operationType + items. The scheduler reads from a bin to schedule it; the bin does no IO. - BinItem: interface — getWeight + getFullyQualifiedTableName + getOperationId + withOpAndStats(op, stats). Implementations self-weight from a (pending operation, stats) pair via withOpAndStats on a seat instance (no-arg constructor). - BinPacker: interface — getOperationType + pack(List<BinItem>) → List<Bin>. Stateless, no IO. - FirstFitBinPacker: pure FFD algorithm with file count + item count caps. Constructor takes only immutable configuration (operationType, maxWeightPerBin, maxItemsPerBin); pack() is a pure function. No BinItem instances, no repos. - TotalFilesBinItem: BinItem that weights by current file count. Knows nothing about which operation type uses it — usable by any per-table-fanout job whose Spark cost scales with file count (OFD, stats collection, etc). scheduler/: - BinPackerRegistration: tuple bundling (operationType, packer, prototype). The one place each operation's full identity is composed. - SchedulerRunner: rewritten as the generic dispatcher that owns all IO. Constructor injects List<BinPackerRegistration> via Spring, indexes by operation type into an immutable Map.copyOf. schedule() reads PENDING rows, dedups, fetches stats, projects via the registration's prototype, packs via the registration's packer, then calls scheduleBin(bin) for each result. scheduleBin() is generic: claim CAS (PENDING → SCHEDULING with watermark), re-query for claimed rows, narrow to claimed items, launch a batched Spark job (jobName = "batched-<optype>-<ts>", with claimed tableNames + operationIds + opType + resultsEndpoint), mark SCHEDULED on success or revert to PENDING on launch failure. The runner imports only binpack.* and the model enum — no operations.* anywhere. - SchedulerConfig: cross-cutting beans (WebClient, JobsServiceClient) plus one @bean per operation type. ofdRegistration() wires a FirstFitBinPacker(ORPHAN_FILES_DELETION, maxFiles, maxTables) with a new TotalFilesBinItem() prototype. This is the only file in the scheduler module that references OFD by name. operations/ofd/ deleted entirely. OfdBin, OfdBinItem, OfdBinPacker — gone. The behavior they encoded now lives generically across Bin, TotalFilesBinItem, FirstFitBinPacker, and the @bean wiring. Other small polish in line with the review's lessons: - BinItem.getFullyQualifiedTableName() spelled out, no FQTN abbreviation. - FirstFitBinPacker caps are required positive (no @Builder.Default, no "0 disables" semantic, no arbitrary 1_000_000 constant). - No <? extends Foo> wildcards anywhere — invariance handled with concrete types in the binpack interface (List<BinItem>) and an immutable Map<OperationTypeDto, BinPackerRegistration> for the registry. - BinItem.withOpAndStats returns a new instance; no mutable state on the seat prototype. - OFD's null-chain (stats → snapshot → numCurrentFiles) wrapped in an Optional chain inside TotalFilesBinItem.currentFileCount; DTO Optional conversion deferred to a follow-up PR per the null-is-a-code-smell lesson. Tests rewritten: - FirstFitBinPackerTest: pure algorithm tests with a local TestItem implementing BinItem; covers empty, single, under/over weight cap, oversized-on-its-own, FFD-decreasing order, max-items cap, and that produced bins carry the configured operation type. No optimizer-domain imports. - TotalFilesBinItemTest: covers withOpAndStats projection of fqtn + operationId + weight, and the Optional chain on null stats / null snapshot / null file count. Asserts seat prototype state is not shared with the populated copy. - SchedulerRunnerTest: full pipeline tests with mocked repos and jobs client, real FirstFitBinPacker + TotalFilesBinItem registration. Covers unknown-type-throws, no-pending-ops, ops-without-stats, single-bin claim+launch+mark, launch-fails-reverts, already-claimed skip, dedup-per-cycle, partial-claim launches only claimed subset, and ops-without-stats skipped from the projection. - OfdBinPackerTest, OfdBinTest, FirstFitDecreasingBinPackerTest deleted. SchedulerApplication keeps its prior shape — injects SchedulerRunner, loops runner.getRegisteredOperationTypes().forEach(runner::schedule). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…e.md Sweep for the two recurring style violations the user flagged on the prior commit: - FirstFitBinPacker.pack(): drop the `items == null` guard (callers contract for non-null) and the trailing PackingBin → Bin for-loop. The wrap-into-bins step is now `packingBins.stream().map(pb -> new Bin(operationType, pb.items)).collect(toList())`. The `isEmpty()` early-return goes too — the stream pipeline handles empty input cleanly and the log line is information-neutral. - SchedulerRunner.schedule(): replace `if (reg == null) throw new IllegalStateException(...)` with `Optional.ofNullable(registry.get(type)).orElseThrow(() -> ...)`. The null comes from `Map.get` (stdlib boundary); wrap and dispatch. No behavior change; tests unchanged. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…tBinPacker<T> - Delete BinPackerRegistration; replace with SchedulerRunner.registerOperation(type, packer) - BinPacker.pack(ops, statsByUuid) → List<List<BinItem>>; op type lives only in Bin - FirstFitBinPacker<T extends BinItem> abstract; subclass owns T create(op, stats) - TotalFilesFirstFitBinPacker extends FirstFitBinPacker<TotalFilesBinItem> - TotalFilesBinItem immutable (drop seat ctor + withOpAndStats); BinItem is pure data - SchedulerConfig registers via @PostConstruct on autowired runner Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
| List<BinItem> items = | ||
| operations.stream() | ||
| .filter(op -> statsByTableUuid.containsKey(op.getTableUuid())) | ||
| .map(op -> (BinItem) create(op, statsByTableUuid.get(op.getTableUuid()))) |
There was a problem hiding this comment.
This is cleaner if something like it works. I don't like the abstract function and explicitly reject the creation of TotalFilesFirstFitBinBacker.java
| .map(op -> (BinItem) create(op, statsByTableUuid.get(op.getTableUuid()))) | |
| .map(op -> { | |
| (new T()).fromStats(statsByTableUuid.get(op.getTableUuid())) | |
| }) |
| schedulerRunner.registerOperation( | ||
| OperationTypeDto.ORPHAN_FILES_DELETION, | ||
| new FileCountBinPacker(OperationTypeDto.ORPHAN_FILES_DELETION, ofdMaxFilesPerBin)); | ||
| new TotalFilesFirstFitBinPacker(ofdMaxFilesPerBin, ofdMaxTablesPerBin)); |
There was a problem hiding this comment.
new FirstFitBinPacker<TotalFilesBinItem>(maxWeight = ofdMaxFilesPerBin, maxTables = ofdMaxTablesPerBin)
… factory Drops the abstract method and named subclass per PR feedback. The packer takes a Supplier<T> (typically MyItem::new); pack invokes it per operation to get a seat, then calls fromOpAndStats(op, stats) on the seat to project. - BinItem regains fromOpAndStats(op, stats); contract documents the seat pattern - FirstFitBinPacker<T extends BinItem> concrete; Supplier<T> + caps; pack is final - TotalFilesBinItem keeps no-arg seat + private all-args ctor; fromOpAndStats returns populated copy - Delete TotalFilesFirstFitBinPacker - SchedulerConfig registers new FirstFitBinPacker<>(TotalFilesBinItem::new, max, maxItems) Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
| private final JobsServiceClient jobsClient; | ||
| private final Map<OperationTypeDto, BinPacker> binPackers; | ||
| private final String resultsEndpoint; | ||
| private final Map<OperationTypeDto, BinPacker> registry = new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
concurrent is stupid. make it immutable and make RegisterOperation return a copy of SchedulerRunner
'seat' is not a Java idiom and shows up in zero other codebases. Rename the field to binItemSupplier and rewrite the surrounding javadoc + test name to describe the empty no-arg-constructed instance plainly. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…new runner Drops ConcurrentHashMap per PR feedback. The registry is a Map.copyOf final field; registerOperation(type, packer) returns a new SchedulerRunner with the additional entry — the receiver is unchanged. SchedulerRunner loses @component; SchedulerConfig produces it via @bean and chains the registration so the bean Spring publishes is the fully-registered runner. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Aliasing enum values to abbreviated constants is the worst of both: abbreviated *and* an unnecessary indirection. Static-import OperationTypeDto.ORPHAN_FILES_DELETION and use OperationType.ORPHAN_FILES_DELETION for the DB enum directly. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…er sites The DB OperationType is an internal mapping; the test should reference only OperationTypeDto and call .toDb() where the repo matcher needs the DB enum. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Summary
Addresses Abhishek's API ask on PR #534 (Bin.java:26): pull
Bin/BinPackerout of the optimizer DTOs and into a fresh sub-package whose API shape matches thejobs.util.binpackhe is introducing in #599. When #599 merges, the swap-out is an import rename + deleting the fourbinpack/*.javafiles here.Changes
New (
services/optimizer/scheduler/.../binpack/):BinItem— concrete carrier struct with fqtn / operationId / tableUuid / databaseName / tableName / weight / sizeBytes. No optimizer DTO imports.Bin— mutable accumulator; package-private mutators, public read-only view.BinPacker— strategy interface:List<Bin> pack(List<BinItem>).FirstFitDecreasingBinPacker— three independent caps (weight, sizeBytes, items); 0 disables a dimension; oversized items get their own bin rather than being dropped.Refactored:
SchedulerRunnernow owns all optimizer-specific orchestration: projects each (TableOperationDto,TableStatsDto) into aBinItem, dispatches to the packer, and per returnedBindoes claim CAS, partial-claim narrowing,JobsServiceClient.launch, and SCHEDULED/PENDING transition.Bin.subset()/Bin.schedule()are gone — those were the optimizer-specific bits we wanted out ofBin.SchedulerConfigregistersFirstFitDecreasingBinPackerper operation type with configurable caps:optimizer.scheduler.ofd.{max-weight,max-size-bytes,max-items}-per-bin. Replaces the old singlemax-files-per-bin.Removed: old
Bin,BinPacker,FileCountBinPacker,SchedulingCandidate,FileCountBinPackerTest.Testing Done
./gradlew :services:optimizer:scheduler:test :apps:optimizer:schedulerapp:test— all green. NewFirstFitDecreasingBinPackerTestcovers all three caps, FFD ordering, oversized-item placement, and the zero-cap-disables-dimension semantics.SchedulerRunnerTestupdated to stub the mockBinPackerby routing through a realFirstFitDecreasingBinPackerwith unbounded caps, so the runner's op →BinItemprojection is exercised without going aroundBin's package-private mutators.Additional Information
optimizer.scheduler.ofd.max-files-per-bin→optimizer.scheduler.ofd.max-weight-per-bin, plus two new caps (max-size-bytes-per-bin,max-items-per-bin). Env vars likewise:SCHEDULER_OFD_MAX_FILES_PER_BIN→SCHEDULER_OFD_MAX_WEIGHT_PER_BIN+ two new. Deployment configs pinning the old name need to update.Same-shape duplication with #599 is intentional and short-lived.
🤖 Generated with Claude Code