Skip to content

refactor(scheduler): make bin packing a generic utility per #534 review#626

Closed
mkuchenbecker wants to merge 13 commits into
mkuchenb/optimizer-4from
mkuchenb/optimizer-4.bin
Closed

refactor(scheduler): make bin packing a generic utility per #534 review#626
mkuchenbecker wants to merge 13 commits into
mkuchenb/optimizer-4from
mkuchenb/optimizer-4.bin

Conversation

@mkuchenbecker

Copy link
Copy Markdown
Collaborator

Summary

Addresses Abhishek's API ask on PR #534 (Bin.java:26): pull Bin/BinPacker out of the optimizer DTOs and into a fresh sub-package whose API shape matches the jobs.util.binpack he is introducing in #599. When #599 merges, the swap-out is an import rename + deleting the four binpack/*.java files here.

Shall we keep bin packing generic as common utility instead of referencing internal models and operations? That should give more flexibility and we should be able to integrate well with the optimizer flow as well existing scheduler flow. I am planning to leverage as a common lib as used in this PR — #604.

Changes

  • Internal API Changes
  • Refactoring
  • Tests

New (services/optimizer/scheduler/.../binpack/):

  • BinItem — concrete carrier struct with fqtn / operationId / tableUuid / databaseName / tableName / weight / sizeBytes. No optimizer DTO imports.
  • Bin — mutable accumulator; package-private mutators, public read-only view.
  • BinPacker — strategy interface: List<Bin> pack(List<BinItem>).
  • FirstFitDecreasingBinPacker — three independent caps (weight, sizeBytes, items); 0 disables a dimension; oversized items get their own bin rather than being dropped.

Refactored:

  • SchedulerRunner now owns all optimizer-specific orchestration: projects each (TableOperationDto, TableStatsDto) into a BinItem, dispatches to the packer, and per returned Bin does claim CAS, partial-claim narrowing, JobsServiceClient.launch, and SCHEDULED/PENDING transition. Bin.subset() / Bin.schedule() are gone — those were the optimizer-specific bits we wanted out of Bin.
  • SchedulerConfig registers FirstFitDecreasingBinPacker per operation type with configurable caps: optimizer.scheduler.ofd.{max-weight,max-size-bytes,max-items}-per-bin. Replaces the old single max-files-per-bin.

Removed: old Bin, BinPacker, FileCountBinPacker, SchedulingCandidate, FileCountBinPackerTest.

Testing Done

  • Added new tests for the changes made.
  • Updated existing tests to reflect the changes made.

./gradlew :services:optimizer:scheduler:test :apps:optimizer:schedulerapp:test — all green. New FirstFitDecreasingBinPackerTest covers all three caps, FFD ordering, oversized-item placement, and the zero-cap-disables-dimension semantics. SchedulerRunnerTest updated to stub the mock BinPacker by routing through a real FirstFitDecreasingBinPacker with unbounded caps, so the runner's op → BinItem projection is exercised without going around Bin's package-private mutators.

Additional Information

  • Breaking Changes

optimizer.scheduler.ofd.max-files-per-binoptimizer.scheduler.ofd.max-weight-per-bin, plus two new caps (max-size-bytes-per-bin, max-items-per-bin). Env vars likewise: SCHEDULER_OFD_MAX_FILES_PER_BINSCHEDULER_OFD_MAX_WEIGHT_PER_BIN + two new. Deployment configs pinning the old name need to update.

Same-shape duplication with #599 is intentional and short-lived.

🤖 Generated with Claude Code

…m optimizer types

Addresses Abhishek's feedback on #534 (Bin.java:26):
> Shall we keep bin packing generic as common utility instead of referencing
> internal models and operations? That should give more flexibility and we
> should be able to integrate well with the optimizer flow as well existing
> scheduler flow. I am planning to leverage as a common lib as used in
> PR #604.

This refactor pulls Bin / BinPacker out of the optimizer types and into a
fresh sub-package that mirrors the API shape Abhishek is introducing in
#599 as `jobs.util.binpack` (BinItem + Bin + FirstFitDecreasingBinPacker
with weight + sizeBytes + items caps). When #599 merges we should be able
to swap the import to his shared lib and delete this copy with a one-line
change.

What changed:
- New sub-package `services/optimizer/scheduler/.../binpack/`:
  - BinItem — concrete struct with fqtn, operationId, tableUuid,
    databaseName, tableName, weight, sizeBytes. Carries everything the
    batched Spark app needs to do work and report back, without importing
    any optimizer DTO type.
  - Bin — mutable accumulator with totalWeight + totalSizeBytes running
    totals; package-private mutators, public read-only view.
  - BinPacker — strategy interface: `List<Bin> pack(List<BinItem>)`.
  - FirstFitDecreasingBinPacker — three-cap FFD algorithm
    (maxWeightPerBin, maxSizeBytesPerBin, maxItemsPerBin; 0 disables).
    An item over any single cap is placed in its own bin rather than
    dropped.
- SchedulerRunner now owns ALL optimizer-specific orchestration. Builds
  the BinItem list from (TableOperationDto, TableStatsDto) pairs,
  dispatches to the registered BinPacker, and per returned Bin: claim CAS
  (PENDING → SCHEDULING), partial-claim narrowing on the BinItem list,
  JobsServiceClient.launch, mark SCHEDULED with jobId / revert to PENDING.
  The old Bin.subset() / Bin.schedule() entry points are gone — the runner
  does it directly, since they were the optimizer-specific bits we wanted
  out of Bin.
- SchedulerConfig wires one FirstFitDecreasingBinPacker per operation
  type (currently OFD) with configurable caps:
  optimizer.scheduler.ofd.max-weight-per-bin (default 1_000_000)
  optimizer.scheduler.ofd.max-size-bytes-per-bin (default 5 TiB)
  optimizer.scheduler.ofd.max-items-per-bin (default 50)
  Replaces the old single-cap `max-files-per-bin` property; corresponding
  env vars renamed (SCHEDULER_OFD_MAX_FILES_PER_BIN →
  SCHEDULER_OFD_MAX_WEIGHT_PER_BIN, plus two new ones).
- SchedulerApplication imports BinPacker from the new sub-package.
- Removed: old Bin, BinPacker, FileCountBinPacker, SchedulingCandidate,
  FileCountBinPackerTest.
- New FirstFitDecreasingBinPackerTest covers all three caps, FFD
  ordering, oversized-item placement, and zero-cap-disables-dimension.
- Updated SchedulerRunnerTest: stubs the mock BinPacker by routing
  through a real FirstFitDecreasingBinPacker with unbounded caps, so the
  runner's op → BinItem projection is exercised without going around
  Bin's package-private mutators.

Notes for the merge:
- Same-shape duplication with #599 is intentional and short-lived; the
  swap-out is an import rename plus deleting the four binpack/*.java
  files here.
- Existing FileCountBinPacker semantics carry over: file count is the
  weight dimension. sizeBytes is now populated from
  TableStatsDto.snapshot.tableSizeBytes when available, otherwise 0 (the
  packer just ignores the dimension for items that don't carry it).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
List<Bin> bins = new ArrayList<>();
for (BinItem item : sorted) {
Bin target = null;
for (Bin bin : bins) {

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prefer bins.foreach

Comment on lines +45 to +46
List<Bin> bins = new ArrayList<>();
for (BinItem item : sorted) {

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this whole section is a map function.

Comment on lines +10 to +11
optimizer.scheduler.ofd.max-weight-per-bin=${SCHEDULER_OFD_MAX_WEIGHT_PER_BIN:1000000}
optimizer.scheduler.ofd.max-size-bytes-per-bin=${SCHEDULER_OFD_MAX_SIZE_BYTES_PER_BIN:5497558138880}

@mkuchenbecker mkuchenbecker Jun 1, 2026

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems arbitrary. I don;t like this as a config as its unclear what the unit its. If we are configuring OFD, we should be configuring the binpacker with human-readable options and not system limits. i.e. we should set a # file limit, not a weight limit.

Max bytes per bin does not make sense for OFD because it scales with number of files and not number of bytes.

* (the packing dimension OFD cares about); sizeBytes is the on-disk footprint when stats expose
* it, else 0.
*/
private static BinItem toBinItem(TableOperationDto op, TableStatsDto stats) {

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the opposite of flexible. We should have an OFD binitem that weights itself based on file size stats. BinItem should be an interface that has implementations.

…m self-weights, functional FFD

Addresses the four review comments on the prior bin-refactor commit:

1. BinItem becomes an interface (`long getWeight()`). Each operation type
   ships its own impl that encodes its own cost model. The packer never
   imports operation DTOs; the impls do.

2. Bin/BinPacker/FirstFitDecreasingBinPacker are generic over T extends
   BinItem. Heterogeneous packers register in a
   `Map<OperationTypeDto, BinPacker<? extends BinItem>>` and the
   scheduler narrows the wildcard with one cast at the per-op-type
   dispatch boundary. Compile-time `T`-consistency end-to-end through
   the packer pipeline.

3. New `operations/ofd/OfdBinItem` (package parallel to scheduler) holds
   only what the dispatch needs: fqtn, operationId, weight. The
   weighting logic — file count from `TableStatsDto.snapshot.
   numCurrentFiles` — lives in a private static `currentFileCount` on
   the impl, fed by a static factory `OfdBinItem.from(op, stats)` so
   callers do
   `withStats.stream().map(op -> OfdBinItem.from(op, statsByUuid.
   get(op.getTableUuid())))`.

4. FirstFitDecreasingBinPacker.pack() is one stream pipeline:
   `items.stream().sorted(...).collect(ArrayList::new, this::placeItem,
   List::addAll)`. The inner first-fit search is
   `bins.stream().filter(b -> b.fits(...)).findFirst().
   ifPresentOrElse(...)`. No imperative for-loops; the fold maintains
   the running list of bins as its accumulator. Compiler enforces
   T-consistency across the pipeline.

5. Dropped `maxSizeBytesPerBin` entirely. OFD cost is per-file (list +
   manifest joins + delete calls); bytes don't add information. A 10 GB
   table with 100k files is more expensive to OFD than a 1 TB table
   with 2k files. Bin/Packer now carry just `maxWeightPerBin` +
   `maxItemsPerBin`. Other op types encode their own dimension in
   `getWeight()`; the packer needn't know.

6. OFD config keys back to human-readable per-op vocabulary:
   `optimizer.scheduler.ofd.max-files-per-bin` (file count) +
   `optimizer.scheduler.ofd.max-tables-per-bin` (table count). Env vars
   `SCHEDULER_OFD_MAX_FILES_PER_BIN` + `SCHEDULER_OFD_MAX_TABLES_PER_BIN`.
   SchedulerConfig translates these into the packer's
   `maxWeightPerBin` + `maxItemsPerBin`.

7. Refactored SchedulerRunner:
   - `Map<OperationTypeDto, BinPacker<? extends BinItem>>` registration
   - Switch by operation type narrows to BinPacker<OfdBinItem> with one
     suppressed unchecked cast (safe by registration invariant; comment
     calls out the OperationScheduler<T> handler factoring once a
     second op type lands)
   - `scheduleOfd(...)` builds `OfdBinItem` via the factory and dispatches
   - `submitOfdBin(Bin<OfdBinItem>)` claims, narrows to claimed-only via
     OfdBinItem.getOperationId, launches, marks SCHEDULED/PENDING — same
     orchestration as before, but typed `Bin<OfdBinItem>` end-to-end

Tests:
- FirstFitDecreasingBinPackerTest uses a local `TestItem implements
  BinItem` (no optimizer-domain imports in the binpack test).
  Byte-cap test removed; max-items, max-weight, FFD order, oversized,
  and zero-cap-disables-dimension all preserved.
- SchedulerRunnerTest mocks `BinPacker<OfdBinItem>` and stubs through
  a real FFD packer with unbounded caps so the runner's projection
  (op + stats → OfdBinItem) is exercised without bypassing Bin's
  package-private mutators.

Divergence from #599: Abhishek's `jobs.util.binpack.BinItem` is a
concrete struct with optimizer-aware identity fields baked in. Ours is
a contract (`long getWeight()`) with per-op impls. The "swap to his
lib by import rename" gimmick no longer applies — instead this PR
proposes the interface-based shape as the common lib, and #599 would
rebase to adopt it (or at minimum offer an interface alongside his
concrete struct). Discussed in PR #626 thread.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* @param <T> concrete {@link BinItem} implementation carried by this bin
*/
@ToString
public class Bin<T extends BinItem> {

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

implements vs exends?

…inItem directly

Follow-up to the prior commit: drop <T extends BinItem> from Bin,
BinPacker, FirstFitDecreasingBinPacker. The interface alone is enough
for the polymorphism we wanted — per-op-type impls (OfdBinItem and
future siblings) implement BinItem; the packer never knows the concrete
type. The dispatcher narrows once at access time.

What changed:

- Bin → non-generic; items() returns List<BinItem>; fits/add take BinItem.
- BinPacker → non-generic interface, `List<Bin> pack(List<? extends BinItem>)`.
  The wildcard on the input is the standard PECS shape so callers can
  pass `List<OfdBinItem>` directly without fighting invariance at the
  call site.
- FirstFitDecreasingBinPacker → non-generic, otherwise unchanged
  (functional stream pack body preserved).
- SchedulerConfig → `Map<OperationTypeDto, BinPacker>` (no wildcard);
  `FirstFitDecreasingBinPacker.builder()` no longer needs a type witness.
- SchedulerApplication → `Map<OperationTypeDto, BinPacker>` likewise.
- SchedulerRunner → drop generics on the map and on submitOfdBin. The
  per-op-type switch no longer needs a `BinPacker<OfdBinItem>` cast; the
  downcast to OfdBinItem happens once at the top of submitOfdBin via
  `bin.items().stream().map(OfdBinItem.class::cast).collect(toList())`,
  then everything downstream uses OfdBinItem directly. Same safety
  invariant — SchedulerConfig only feeds OfdBinItem instances to the
  OFD packer — just expressed through a runtime cast instead of an
  unchecked-suppression at the generic boundary.
- Tests updated; FirstFitDecreasingBinPackerTest's TestItem stays the
  same shape but the packer/bins are typed plainly.

Why: discussed in PR #626. The type parameter on Bin/BinPacker bought
compile-time `T`-consistency through the packer pipeline, but at the
cost of `Map<OperationTypeDto, BinPacker<? extends BinItem>>` and an
`@SuppressWarnings("unchecked")` cast at the switch boundary. With one
op type today, the cleaner shape is no generics on the type and one
explicit cast at the access site — the cast's locality makes the
invariant obvious. If a future op type adds its own subtle access
pattern, we can revisit per-handler abstraction (OperationScheduler).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@Builder
public class FirstFitDecreasingBinPacker implements BinPacker {

@Builder.Default private final long maxWeightPerBin = 1_000_000L;

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not add abstract knobs. What is a weight in this context that you need to limit the max as a constant? Tell me what 1m unitless weight means and why that is the proper threashold if you want this to remain a constant rather than a parameter.

@Builder.Default private final int maxItemsPerBin = 50;

@Override
public List<Bin> pack(List<? extends BinItem> items) {

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any of the <? extents Foo> was in-scope to fix as part of the previous iteration.

Two follow-ups from #626 review:

- Drop `@Builder.Default` on `FirstFitDecreasingBinPacker.maxWeightPerBin`
  (was 1_000_000L) and `.maxItemsPerBin` (was 50). "Weight" has no
  domain meaning inside the packer — picking a constant there was an
  arbitrary knob with no story attached. Callers (SchedulerConfig)
  supply the cap with units and a justification visible at the config
  site. Primitive defaults (0) carry the "disabled" sentinel meaning if
  a caller doesn't set the field, so explicit-not-required stays
  expressible.

- Change `BinPacker.pack(List<? extends BinItem>)` to
  `pack(List<BinItem>)`. The wildcard was a half-measure left from the
  prior pass — type parameters are gone but the variance marker on the
  method signature still leaked the generics shape. Callers widen at the
  call site via a stream type witness: `.<BinItem>map(op -> OfdBinItem
  .from(...))`. Docstring updated to drop the now-invalid "pass a typed
  list of a concrete impl" note.

- SchedulerRunner.scheduleOfd builds `List<BinItem>` directly via the
  type witness. Comment names the invariance reason so a future reader
  doesn't undo it.

- SchedulerRunnerTest's stubOneBinForAllItems answer cast updated to
  `inv.<List<BinItem>>getArgument(0)` to match the new signature.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
}

private static long currentFileCount(TableStatsDto stats) {
if (stats == null || stats.getSnapshot() == null) {

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we allowing nullable? Those functions should return optionals.

Comment on lines +17 to +21
* <p>Both caps are explicit on construction. Neither has a default — "weight" has no domain meaning
* at this layer, so picking a constant here would be an arbitrary knob; callers (e.g. {@link
* com.linkedin.openhouse.optimizer.scheduler.config.SchedulerConfig}) supply the per-op- type cap
* with the unit attached and a justification at the config site. Pass {@code 0} or a negative value
* for either cap to disable that dimension.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding rationalle for not adding a default is stupid. You are explaini9ng what you didn't do rather than what it does do. Apply this lesson to the other ocmments as well. Negatives do not make sense and the realistic minimum min size is 1. These need not be constants nor explained. Explain what this does, not what it doesn't do.


/** Max number of tables per batched OFD Spark job. 0 disables. */
@Value("${optimizer.scheduler.ofd.max-tables-per-bin:50}")
private int ofdMaxTablesPerBin;

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be defined on a binpacker. Scheduler does not care except to configure OFD. Its the difference between a monolith scheduler that grows in complexity with each oepration and one with arbitrary pluggable operations.

mkuchenbecker and others added 2 commits June 2, 2026 08:20
…ro op references

Bigger structural turn off #626 review. The runner is now a registration
map only; OFD lives entirely under operations/ofd/ and shows up as a
BinPacker bean. Adding a future operation type is one new @component;
the scheduler module is untouched.

What changed:

- binpack/Bin → interface with `void schedule()`. Bins own their own
  scheduling (claim CAS, narrow to claimed, launch, mark
  SCHEDULED/PENDING).
- binpack/BinPacker → orchestration interface: `getOperationType()` plus
  `prepare(db, tableName) → List<Bin>`. Per-op-type orchestrator.
- binpack/FirstFitDecreasingBinPacker → standalone algorithm class (not
  a BinPacker in this vocabulary). Returns flat groupings
  `List<List<BinItem>>` that the per-op-type packer wraps into its own
  Bin impl. Internal `PackingBin` helper holds items + totalWeight
  during the fold so the public `Bin` interface stays minimal.
- scheduler/SchedulerRunner → rewritten as a thin dispatcher.
  Constructor takes `List<BinPacker>` via Spring injection; builds an
  immutable `Map<OperationTypeDto, BinPacker>` with `Map.copyOf`.
  `schedule(type)` calls `bp.prepare(...).forEach(Bin::schedule)`.
  Imports only `binpack.BinPacker`, `binpack.Bin`, and the model enum —
  no operations.* anywhere. Grep-able invariant.
- scheduler/config/SchedulerConfig → stripped to the shared infra
  (`WebClient`, `JobsServiceClient`, cluster id). No more OFD @value
  fields, no more binPackers @bean — those moved to OfdBinPacker's ctor.
- operations/ofd/OfdBinPacker → new @component implementing BinPacker.
  Holds @Value-bound caps, an FFD instance, repos, jobs client, results
  endpoint. `prepare(...)` does load PENDING (OFD-filtered) → dedup →
  stats lookup → project to OfdBinItem → FFD pack → wrap each grouping
  in a new OfdBin.
- operations/ofd/OfdBin → new class implementing Bin. Holds the bin's
  OfdBinItems plus refs to repo/jobs client/endpoint. `@Transactional
  schedule()` does the claim CAS, partial-claim narrow, launch, mark
  SCHEDULED/PENDING. The OFD-specific job name and arg shape live here.
- operations/ofd/OfdBinItem → `currentFileCount` wraps nulls in
  Optional locally (`Optional.ofNullable(stats).map(...).map(...)
  .orElse(0L)`). DTO getters stay nullable; converting them to return
  Optional is deferred to a follow-up PR per the null-is-a-code-smell
  lesson.
- schedulerapp/SchedulerApplication → injects SchedulerRunner only;
  loops via `runner.getRegisteredOperationTypes()`.

Tests reorganized along the same lines:
- FirstFitDecreasingBinPackerTest asserts groupings (`List<List<BinItem>>`)
  instead of bins. zero-cap-disables test gone — caps are required
  positive now. maxItemsCap test uses a real positive weight cap.
- SchedulerRunnerTest slim: three tests for the dispatcher (unknown type
  throws, delegates to the right packer + schedules each returned bin,
  passes scope args through). Mock BinPacker, mock Bin, mock the
  delegation invariant.
- OfdBinPackerTest new: covers load-PENDING + dedup + stats-filter +
  groupings-to-OfdBins. Direct-constructor (no Spring).
- OfdBinTest new: covers OfdBin.schedule()'s claim + narrow + launch +
  mark paths (success, launch fails reverts to PENDING, rows already
  claimed skips, partial claim launches only the claimed subset).

Doc cleanup pass per "describe what code does, not what it doesn't do"
lesson: dropped the "no defaults" rationale and "0 disables a dimension"
sentinel from FFD's javadoc; Bin.fits (now a private PackingBin method
inside FFD) lost its `> 0` guards. Class doc reads positively now.

Global side: created ~/.claude/code-lessons.md with the null/Optional
lesson, the comment-style lesson, the no-abstract-knobs lesson, the
generics-vs-interface lesson, and the scheduler-pluggability principle.
Linked from ~/.claude/CLAUDE.md so future sessions surface them
proactively.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… tuple

The full restructuring from the PR #626 review. Operations layer
deleted; OFD's only footprint is one @bean in SchedulerConfig.

binpack/ (op-agnostic):
- Bin: pure data class — operationType + items. The scheduler reads
  from a bin to schedule it; the bin does no IO.
- BinItem: interface — getWeight + getFullyQualifiedTableName +
  getOperationId + withOpAndStats(op, stats). Implementations
  self-weight from a (pending operation, stats) pair via withOpAndStats
  on a seat instance (no-arg constructor).
- BinPacker: interface — getOperationType + pack(List<BinItem>) →
  List<Bin>. Stateless, no IO.
- FirstFitBinPacker: pure FFD algorithm with file count + item count
  caps. Constructor takes only immutable configuration (operationType,
  maxWeightPerBin, maxItemsPerBin); pack() is a pure function. No
  BinItem instances, no repos.
- TotalFilesBinItem: BinItem that weights by current file count. Knows
  nothing about which operation type uses it — usable by any
  per-table-fanout job whose Spark cost scales with file count (OFD,
  stats collection, etc).

scheduler/:
- BinPackerRegistration: tuple bundling (operationType, packer,
  prototype). The one place each operation's full identity is composed.
- SchedulerRunner: rewritten as the generic dispatcher that owns all
  IO. Constructor injects List<BinPackerRegistration> via Spring,
  indexes by operation type into an immutable Map.copyOf. schedule()
  reads PENDING rows, dedups, fetches stats, projects via the
  registration's prototype, packs via the registration's packer, then
  calls scheduleBin(bin) for each result. scheduleBin() is generic:
  claim CAS (PENDING → SCHEDULING with watermark), re-query for
  claimed rows, narrow to claimed items, launch a batched Spark job
  (jobName = "batched-<optype>-<ts>", with claimed tableNames +
  operationIds + opType + resultsEndpoint), mark SCHEDULED on
  success or revert to PENDING on launch failure. The runner imports
  only binpack.* and the model enum — no operations.* anywhere.
- SchedulerConfig: cross-cutting beans (WebClient, JobsServiceClient)
  plus one @bean per operation type. ofdRegistration() wires a
  FirstFitBinPacker(ORPHAN_FILES_DELETION, maxFiles, maxTables) with a
  new TotalFilesBinItem() prototype. This is the only file in the
  scheduler module that references OFD by name.

operations/ofd/ deleted entirely. OfdBin, OfdBinItem, OfdBinPacker —
gone. The behavior they encoded now lives generically across Bin,
TotalFilesBinItem, FirstFitBinPacker, and the @bean wiring.

Other small polish in line with the review's lessons:
- BinItem.getFullyQualifiedTableName() spelled out, no FQTN
  abbreviation.
- FirstFitBinPacker caps are required positive (no @Builder.Default,
  no "0 disables" semantic, no arbitrary 1_000_000 constant).
- No <? extends Foo> wildcards anywhere — invariance handled with
  concrete types in the binpack interface (List<BinItem>) and an
  immutable Map<OperationTypeDto, BinPackerRegistration> for the
  registry.
- BinItem.withOpAndStats returns a new instance; no mutable state on
  the seat prototype.
- OFD's null-chain (stats → snapshot → numCurrentFiles) wrapped in an
  Optional chain inside TotalFilesBinItem.currentFileCount; DTO Optional
  conversion deferred to a follow-up PR per the null-is-a-code-smell
  lesson.

Tests rewritten:
- FirstFitBinPackerTest: pure algorithm tests with a local TestItem
  implementing BinItem; covers empty, single, under/over weight cap,
  oversized-on-its-own, FFD-decreasing order, max-items cap, and that
  produced bins carry the configured operation type. No optimizer-domain
  imports.
- TotalFilesBinItemTest: covers withOpAndStats projection of fqtn +
  operationId + weight, and the Optional chain on null stats / null
  snapshot / null file count. Asserts seat prototype state is not
  shared with the populated copy.
- SchedulerRunnerTest: full pipeline tests with mocked repos and jobs
  client, real FirstFitBinPacker + TotalFilesBinItem registration.
  Covers unknown-type-throws, no-pending-ops, ops-without-stats,
  single-bin claim+launch+mark, launch-fails-reverts, already-claimed
  skip, dedup-per-cycle, partial-claim launches only claimed subset,
  and ops-without-stats skipped from the projection.
- OfdBinPackerTest, OfdBinTest, FirstFitDecreasingBinPackerTest
  deleted.

SchedulerApplication keeps its prior shape — injects SchedulerRunner,
loops runner.getRegisteredOperationTypes().forEach(runner::schedule).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…e.md

Sweep for the two recurring style violations the user flagged on the prior
commit:

- FirstFitBinPacker.pack(): drop the `items == null` guard (callers
  contract for non-null) and the trailing PackingBin → Bin for-loop.
  The wrap-into-bins step is now `packingBins.stream().map(pb -> new
  Bin(operationType, pb.items)).collect(toList())`. The `isEmpty()`
  early-return goes too — the stream pipeline handles empty input
  cleanly and the log line is information-neutral.
- SchedulerRunner.schedule(): replace
  `if (reg == null) throw new IllegalStateException(...)` with
  `Optional.ofNullable(registry.get(type)).orElseThrow(() -> ...)`. The
  null comes from `Map.get` (stdlib boundary); wrap and dispatch.

No behavior change; tests unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…tBinPacker<T>

- Delete BinPackerRegistration; replace with SchedulerRunner.registerOperation(type, packer)
- BinPacker.pack(ops, statsByUuid) → List<List<BinItem>>; op type lives only in Bin
- FirstFitBinPacker<T extends BinItem> abstract; subclass owns T create(op, stats)
- TotalFilesFirstFitBinPacker extends FirstFitBinPacker<TotalFilesBinItem>
- TotalFilesBinItem immutable (drop seat ctor + withOpAndStats); BinItem is pure data
- SchedulerConfig registers via @PostConstruct on autowired runner

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
List<BinItem> items =
operations.stream()
.filter(op -> statsByTableUuid.containsKey(op.getTableUuid()))
.map(op -> (BinItem) create(op, statsByTableUuid.get(op.getTableUuid())))

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is cleaner if something like it works. I don't like the abstract function and explicitly reject the creation of TotalFilesFirstFitBinBacker.java

Suggested change
.map(op -> (BinItem) create(op, statsByTableUuid.get(op.getTableUuid())))
.map(op -> {
(new T()).fromStats(statsByTableUuid.get(op.getTableUuid()))
})

schedulerRunner.registerOperation(
OperationTypeDto.ORPHAN_FILES_DELETION,
new FileCountBinPacker(OperationTypeDto.ORPHAN_FILES_DELETION, ofdMaxFilesPerBin));
new TotalFilesFirstFitBinPacker(ofdMaxFilesPerBin, ofdMaxTablesPerBin));

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new FirstFitBinPacker<TotalFilesBinItem>(maxWeight = ofdMaxFilesPerBin, maxTables = ofdMaxTablesPerBin)

… factory

Drops the abstract method and named subclass per PR feedback. The packer
takes a Supplier<T> (typically MyItem::new); pack invokes it per operation
to get a seat, then calls fromOpAndStats(op, stats) on the seat to project.

- BinItem regains fromOpAndStats(op, stats); contract documents the seat pattern
- FirstFitBinPacker<T extends BinItem> concrete; Supplier<T> + caps; pack is final
- TotalFilesBinItem keeps no-arg seat + private all-args ctor; fromOpAndStats returns populated copy
- Delete TotalFilesFirstFitBinPacker
- SchedulerConfig registers new FirstFitBinPacker<>(TotalFilesBinItem::new, max, maxItems)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
private final JobsServiceClient jobsClient;
private final Map<OperationTypeDto, BinPacker> binPackers;
private final String resultsEndpoint;
private final Map<OperationTypeDto, BinPacker> registry = new ConcurrentHashMap<>();

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

concurrent is stupid. make it immutable and make RegisterOperation return a copy of SchedulerRunner

mkuchenbecker and others added 4 commits June 2, 2026 13:14
'seat' is not a Java idiom and shows up in zero other codebases. Rename
the field to binItemSupplier and rewrite the surrounding javadoc + test
name to describe the empty no-arg-constructed instance plainly.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…new runner

Drops ConcurrentHashMap per PR feedback. The registry is a Map.copyOf
final field; registerOperation(type, packer) returns a new SchedulerRunner
with the additional entry — the receiver is unchanged. SchedulerRunner
loses @component; SchedulerConfig produces it via @bean and chains the
registration so the bean Spring publishes is the fully-registered runner.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Aliasing enum values to abbreviated constants is the worst of both:
abbreviated *and* an unnecessary indirection. Static-import
OperationTypeDto.ORPHAN_FILES_DELETION and use OperationType.ORPHAN_FILES_DELETION
for the DB enum directly.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…er sites

The DB OperationType is an internal mapping; the test should reference
only OperationTypeDto and call .toDb() where the repo matcher needs the
DB enum.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant