Optimizer: Batched orphan file deletion using bin packing#599
Merged
Conversation
17 tasks
17 tasks
mkuchenbecker
added a commit
that referenced
this pull request
May 27, 2026
## Optimizer Stack | PR | Content | |---|---| | #527 | Data Model | | #530 | Database Repos | | #531 | REST service | | #533 **(this)** | Analyzer app | | #534 | Scheduler app | | #599 | Spark BatchedOFD app | | #tbd | Infra, docker-compose, smoke test | ## Summary PR 3 of N in the optimizer stack. Introduces `apps/optimizer-analyzer`, a Spring Boot CommandLineRunner that evaluates every table in `table_stats` against pluggable `OperationAnalyzer` strategies. The first strategy, `OrphanFilesDeletionAnalyzer`, schedules OFD operations with 24h success / 1h failure retry cadence, a 6h SCHEDULED timeout, and a 5-strike circuit breaker. Key design choices: - Bulk-loads operations and history into maps (one query per type), then iterates the stats list — O(types) queries, not O(tables). - Uses the existing generic `find()` repository methods with null params. - Pure unit tests with Mockito — no Spring context needed. ## Changes - [ ] Client-facing API Changes - [ ] Internal API Changes - [ ] Bug Fixes - [x] New Features - [ ] Performance Improvements - [ ] Code Style - [ ] Refactoring - [ ] Documentation - [x] Tests **Core**: `AnalyzerRunner` — loads table_stats, pre-loads operations and history into maps, evaluates each table against all analyzers, circuit breaker logic. **Strategy interface**: `OperationAnalyzer` — `isEnabled(table)`, `shouldSchedule(table, currentOp, latestHistory)`, `getCircuitBreakerThreshold()`. **Cadence policy**: `CadencePolicy` — encapsulates time-based retry logic shared across operation types. **OFD analyzer**: `OrphanFilesDeletionAnalyzer` — enabled via `maintenance.optimizer.ofd.enabled` table property. ## Testing Done - [ ] Manually Tested on local docker setup. Please include commands ran, and their output. - [x] Added new tests for the changes made. - [ ] Updated existing tests to reflect the changes made. - [ ] No tests added or updated. Please explain why. If unsure, please feel free to ask for help. - [ ] Some other form of testing like staging or soak time in production. Please explain. 25 unit tests: - `AnalyzerRunnerTest` (7 tests) — eligible table insertion, cadence skip, disabled table, shouldSchedule=false, null UUID, circuit breaker trip, below-threshold pass - `OrphanFilesDeletionAnalyzerTest` (18 tests) — isEnabled variants, shouldSchedule for no-op/PENDING/SCHEDULING/SCHEDULED with history combinations ``` ./gradlew :apps:optimizer-analyzer:test # BUILD SUCCESSFUL — 25 tests pass ``` # Additional Information - [ ] Breaking Changes - [ ] Deprecations - [x] Large PR broken into smaller PRs, and PR plan linked in the description. --------- Co-authored-by: mkuchenbecker <mkuchenbecker@users.noreply.github.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: Abhishek Nath <anath1@linkedin.com>
6 tasks
mkuchenbecker
added a commit
that referenced
this pull request
Jun 1, 2026
…m self-weights, functional FFD
Addresses the four review comments on the prior bin-refactor commit:
1. BinItem becomes an interface (`long getWeight()`). Each operation type
ships its own impl that encodes its own cost model. The packer never
imports operation DTOs; the impls do.
2. Bin/BinPacker/FirstFitDecreasingBinPacker are generic over T extends
BinItem. Heterogeneous packers register in a
`Map<OperationTypeDto, BinPacker<? extends BinItem>>` and the
scheduler narrows the wildcard with one cast at the per-op-type
dispatch boundary. Compile-time `T`-consistency end-to-end through
the packer pipeline.
3. New `operations/ofd/OfdBinItem` (package parallel to scheduler) holds
only what the dispatch needs: fqtn, operationId, weight. The
weighting logic — file count from `TableStatsDto.snapshot.
numCurrentFiles` — lives in a private static `currentFileCount` on
the impl, fed by a static factory `OfdBinItem.from(op, stats)` so
callers do
`withStats.stream().map(op -> OfdBinItem.from(op, statsByUuid.
get(op.getTableUuid())))`.
4. FirstFitDecreasingBinPacker.pack() is one stream pipeline:
`items.stream().sorted(...).collect(ArrayList::new, this::placeItem,
List::addAll)`. The inner first-fit search is
`bins.stream().filter(b -> b.fits(...)).findFirst().
ifPresentOrElse(...)`. No imperative for-loops; the fold maintains
the running list of bins as its accumulator. Compiler enforces
T-consistency across the pipeline.
5. Dropped `maxSizeBytesPerBin` entirely. OFD cost is per-file (list +
manifest joins + delete calls); bytes don't add information. A 10 GB
table with 100k files is more expensive to OFD than a 1 TB table
with 2k files. Bin/Packer now carry just `maxWeightPerBin` +
`maxItemsPerBin`. Other op types encode their own dimension in
`getWeight()`; the packer needn't know.
6. OFD config keys back to human-readable per-op vocabulary:
`optimizer.scheduler.ofd.max-files-per-bin` (file count) +
`optimizer.scheduler.ofd.max-tables-per-bin` (table count). Env vars
`SCHEDULER_OFD_MAX_FILES_PER_BIN` + `SCHEDULER_OFD_MAX_TABLES_PER_BIN`.
SchedulerConfig translates these into the packer's
`maxWeightPerBin` + `maxItemsPerBin`.
7. Refactored SchedulerRunner:
- `Map<OperationTypeDto, BinPacker<? extends BinItem>>` registration
- Switch by operation type narrows to BinPacker<OfdBinItem> with one
suppressed unchecked cast (safe by registration invariant; comment
calls out the OperationScheduler<T> handler factoring once a
second op type lands)
- `scheduleOfd(...)` builds `OfdBinItem` via the factory and dispatches
- `submitOfdBin(Bin<OfdBinItem>)` claims, narrows to claimed-only via
OfdBinItem.getOperationId, launches, marks SCHEDULED/PENDING — same
orchestration as before, but typed `Bin<OfdBinItem>` end-to-end
Tests:
- FirstFitDecreasingBinPackerTest uses a local `TestItem implements
BinItem` (no optimizer-domain imports in the binpack test).
Byte-cap test removed; max-items, max-weight, FFD order, oversized,
and zero-cap-disables-dimension all preserved.
- SchedulerRunnerTest mocks `BinPacker<OfdBinItem>` and stubs through
a real FFD packer with unbounded caps so the runner's projection
(op + stats → OfdBinItem) is exercised without bypassing Bin's
package-private mutators.
Divergence from #599: Abhishek's `jobs.util.binpack.BinItem` is a
concrete struct with optimizer-aware identity fields baked in. Ours is
a contract (`long getWeight()`) with per-op impls. The "swap to his
lib by import rename" gimmick no longer applies — instead this PR
proposes the interface-based shape as the common lib, and #599 would
rebase to adopt it (or at minimum offer an interface alongside his
concrete struct). Discussed in PR #626 thread.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…easingBinPacker class
mkuchenbecker
left a comment
Collaborator
There was a problem hiding this comment.
Added a couple comments. If the binpacker was just renamed and we align on reties im good to ship
mkuchenbecker
previously approved these changes
Jun 8, 2026
mkuchenbecker
left a comment
Collaborator
There was a problem hiding this comment.
I am interested in the ternary operator on boolean, but that is non-blocking.
mkuchenbecker
approved these changes
Jun 8, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
This is a follow up of Optimizer series PRs (Previous PR #534). Introduces
BatchedOrphanFilesDeletionSparkApp, the multi-table counterpart of the existing single-tableOrphanFilesDeletionSparkApp. One Spark job now processes a list of(table, operationId)pairs that the optimizer scheduler bin-packed into a single batch, reporting SUCCESS/FAILED per operation back to the Optimizer Service.Stands up the
optimizerclientcodegen module so consumers (this app, and future ones) talk to the Optimizer Service via an auto-generated client rather than hand-rolled HTTP — mirrors the existing:client:jobsclient/:client:tableclientpattern.Lands a first-fit-decreasing bin packer that is used in the followup PR (#604 ) to assemble tables in batches. Also consolidated existing bin packer under this.
Key design choices
POST /v1/optimizer/operations/{id}/updatefails after retries, the row staysSCHEDULEDand the Analyzer's stale-timeout will re-queue it. No retry storms in the Spark driver.--resultsEndpoint/--operationIds/--tableUuidsare all optional, so the legacyJobsSchedulercan launch the app without optimizer integration. When absent, the per-operation callback isskipped; HTS
StateManagerstill tracks per-job lifecycle.--driverParallelismis honoured verbatim; the app never picks its own thread count.MAX_BATCH_SIZE = 200— hard ceiling enforced atbuildEntries(Spark side) so a misconfigured scheduler can't blow pastARG_MAX. Operating point stays--batchMaxItems 25.Optimizer client (codegen)
services:optimizerwired with the sameservice-specgen-convention+ springdoc plugins asservices:jobs/services:tables/services:housetables. Port 8003 (slots into the existing 8000/8001/8002 allocation).client:optimizerclient— three-linebuild.gradlemirroringclient:jobsclient; generatescom.linkedin.openhouse.optimizer.client.{api,model,invoker}from the spec.client:secureclientpicks upOptimizerApiClientFactory(parallelsJobsApiClientFactory) for SSL/auth-awareApiClientconstruction.OptimizerServiceClientinapps:sparkis a thin wrapper around the generatedTableOperationsControllerApi, mirroringJobsClient's shape —RetryTemplate(newRetryUtil.getOptimizerApiRetryTemplate()), surface isOptional<TableOperationsHistory> updateOperation(operationId, request). No more OkHttp, no more hand-rolled DTOs.Bin packer
FirstFitDecreasingBinPacker— FFD by weight with secondary caps on bytes and item count; oversized items get a dedicated bin. Reuses the existingBin/BinItemtypes from feat(optimizer): [4/N] Scheduler app #534.Batched Spark app
BatchedOrphanFilesDeletionSparkApp— extendsBaseSparkApp; iterates entries via a fixed thread pool, reusesOperations.deleteOrphanFiles(...)per table, posts per-operation status, runs the existingTableStateValidatorper tableAdditional changes
Service-side spec generation (
services/optimizer)build.gradle— addedservice-specgen-convention+ springdoc + processes plugins.openApi.customBootRunoverrides the production MySQL DataSource with in-memory H2 at build time (overrides live inbuild.gradle, not a profile file, sothey don't ship inside the fat jar). Port 8003.
developmentOnly 'com.h2database:h2:2.1.210'— H2 reachesbootRun/specgen but is excluded frombootJarand fromapi/runtimeElementspropagation. Pinned to 2.1.210 to match the version:iceberg:openhouse:htscatalogalready pullsin repo-wide and avoid the slf4j-2.x bump that 2.2.x would have introduced.
Client codegen (
client/optimizerclient)build.gradle) generatingTableOperationsControllerApi,TableStatsControllerApi,TableOperationsHistoryControllerApi+ their DTOs.client/secureclient/OptimizerApiClientFactory.java— SSL/auth-aware factory mirroringJobsApiClientFactory.settings.gradle— registers:client:optimizerclient.Bin packer (
libs/optimizer/binpack)FirstFitDecreasingBinPacker.java(moved from apps/spark). Pre-projectedBinIteminputs, weight + item-count caps, oversized items get their own bin.FirstFitDecreasingBinPackerTest.java(moved + adapted to the libsBinIteminterface).Spark app (
apps/spark)spark/BatchedOrphanFilesDeletionSparkApp.java— multi-table OFD with worker-thread pool, per-table failure isolation,Iterables.size()-based orphan count (bounded driver heap),MAX_BATCH_SIZE=200guard.spark/optimizer/OptimizerServiceClient.java— wraps generatedTableOperationsControllerApi.util/RetryUtil.java— addedgetOptimizerApiRetryTemplate().build.gradle— depends on:libs:optimizer:binpackwith a targetedexcludeonlog4j-slf4j2-impl(transitively brought by services:optimizer; conflicts with apps:spark's bundledlog4j-slf4j-impl1.x bridge).Test using CLI:
--tableNames db.t1,db.t2,db.t3
--operationIds op-uuid-1,op-uuid-2,op-uuid-3
--tableUuids tab-uuid-1,tab-uuid-2,tab-uuid-3
--resultsEndpoint http://optimizer.svc:8080
--driverParallelism 4
plus existing OFD knobs (
--ttl,--backupDir,--concurrentDeletes,--streamResults,--maxOrphanFileSampleSize).Issue] Briefly discuss the summary of the changes made in this
pull request in 2-3 lines.
Changes
For all the boxes checked, please include additional details of the changes made in this pull request.
Testing Done
For all the boxes checked, include a detailed description of the testing done for the changes made in this pull request.
Additional Information
Breaking Changes
Deprecations
Large PR broken into smaller PRs, and PR plan linked in the description.
Open items for reviewers:
OptimizerServiceClientis add on top of the generatedTableOperationsControllerApi— matchesJobsClient's pattern. Fine, or should it live inside:client:optimizerclientas a higher-level facade?Apps:spark currently has to
exclude log4j-slf4j2-implwhen depending on:libs:optimizer:binpackbecause services:optimizer transitively brings the slf4j-2.x bridge while apps:spark ships the 1.x bridge. A repo-wide bridge-alignmentcleanup would let us drop this exclude; happy to file a separate issue.
The bin packer ends up as two siblings in libs (
FirstFitBinPacker— optimizer-coupled;FirstFitDecreasingBinPacker— agnostic). Consolidation is done in FirstFitDecreasingBinPacker.Builder defaults (maxWeightPerBin = 1_000_000L, maxItemsPerBin = 50) are sized for OFD. If we expect other operation types with materially different cost shapes, we may want to move the defaults out of the packer class and into the per-operation config (Spring @value already provides this in SchedulerConfig).
For all the boxes checked, include additional details of the changes made in this pull request.