Skip to content

Optimizer: Batched orphan file deletion using bin packing#599

Merged
abhisheknath2011 merged 9 commits into
linkedin:mainfrom
abhisheknath2011:batched-ofd
Jun 8, 2026
Merged

Optimizer: Batched orphan file deletion using bin packing#599
abhisheknath2011 merged 9 commits into
linkedin:mainfrom
abhisheknath2011:batched-ofd

Conversation

@abhisheknath2011

@abhisheknath2011 abhisheknath2011 commented May 22, 2026

Copy link
Copy Markdown
Member

Summary

This is a follow up of Optimizer series PRs (Previous PR #534). Introduces BatchedOrphanFilesDeletionSparkApp, the multi-table counterpart of the existing single-table OrphanFilesDeletionSparkApp. One Spark job now processes a list of (table, operationId) pairs that the optimizer scheduler bin-packed into a single batch, reporting SUCCESS/FAILED per operation back to the Optimizer Service.

Stands up the optimizerclient codegen module so consumers (this app, and future ones) talk to the Optimizer Service via an auto-generated client rather than hand-rolled HTTP — mirrors the existing
:client:jobsclient/:client:tableclient pattern.

Lands a first-fit-decreasing bin packer that is used in the followup PR (#604 ) to assemble tables in batches. Also consolidated existing bin packer under this.

Key design choices

  • Per-table failure isolation — exceptions in one table are caught, FAILED is posted for that operationId, and remaining tables continue. The job exits 0 if at least one table succeeds.
  • Recoverable result reporting — if POST /v1/optimizer/operations/{id}/update fails after retries, the row stays SCHEDULED and the Analyzer's stale-timeout will re-queue it. No retry storms in the Spark driver.
  • Optional optimizer-service callbacks--resultsEndpoint/--operationIds/--tableUuids are all optional, so the legacy JobsScheduler can launch the app without optimizer integration. When absent, the per-operation callback is
    skipped; HTS StateManager still tracks per-job lifecycle.
  • Scheduler decides parallelism, not the app--driverParallelism is honoured verbatim; the app never picks its own thread count.
  • MAX_BATCH_SIZE = 200 — hard ceiling enforced at buildEntries (Spark side) so a misconfigured scheduler can't blow past ARG_MAX. Operating point stays --batchMaxItems 25.

Optimizer client (codegen)

  • services:optimizer wired with the same service-specgen-convention + springdoc plugins as services:jobs/services:tables/services:housetables. Port 8003 (slots into the existing 8000/8001/8002 allocation).
  • client:optimizerclient — three-line build.gradle mirroring client:jobsclient; generates com.linkedin.openhouse.optimizer.client.{api,model,invoker} from the spec.
  • client:secureclient picks up OptimizerApiClientFactory (parallels JobsApiClientFactory) for SSL/auth-aware ApiClient construction.
  • OptimizerServiceClient in apps:spark is a thin wrapper around the generated TableOperationsControllerApi, mirroring JobsClient's shape — RetryTemplate (new RetryUtil.getOptimizerApiRetryTemplate()), surface is
    Optional<TableOperationsHistory> updateOperation(operationId, request). No more OkHttp, no more hand-rolled DTOs.

Bin packer

  • FirstFitDecreasingBinPacker — FFD by weight with secondary caps on bytes and item count; oversized items get a dedicated bin. Reuses the existing Bin/BinItem types from feat(optimizer): [4/N] Scheduler app #534.
  • FirstFitBinPacker.java and FirstFitBinPackerTest.java — deleted (renamed / consolidated).

Batched Spark app

  • BatchedOrphanFilesDeletionSparkApp — extends BaseSparkApp; iterates entries via a fixed thread pool, reuses Operations.deleteOrphanFiles(...) per table, posts per-operation status, runs the existing TableStateValidator per table

Additional changes

Service-side spec generation (services/optimizer)

  • build.gradle — added service-specgen-convention + springdoc + processes plugins. openApi.customBootRun overrides the production MySQL DataSource with in-memory H2 at build time (overrides live in build.gradle, not a profile file, so
    they don't ship inside the fat jar). Port 8003.
  • developmentOnly 'com.h2database:h2:2.1.210' — H2 reaches bootRun/specgen but is excluded from bootJar and from api/runtimeElements propagation. Pinned to 2.1.210 to match the version :iceberg:openhouse:htscatalog already pulls
    in repo-wide and avoid the slf4j-2.x bump that 2.2.x would have introduced.

Client codegen (client/optimizerclient)

  • New module (3-line build.gradle) generating TableOperationsControllerApi, TableStatsControllerApi, TableOperationsHistoryControllerApi + their DTOs.
  • client/secureclient/OptimizerApiClientFactory.java — SSL/auth-aware factory mirroring JobsApiClientFactory.
  • settings.gradle — registers :client:optimizerclient.

Bin packer (libs/optimizer/binpack)

  • FirstFitDecreasingBinPacker.java (moved from apps/spark). Pre-projected BinItem inputs, weight + item-count caps, oversized items get their own bin.
  • FirstFitDecreasingBinPackerTest.java (moved + adapted to the libs BinItem interface).

Spark app (apps/spark)

  • spark/BatchedOrphanFilesDeletionSparkApp.java — multi-table OFD with worker-thread pool, per-table failure isolation, Iterables.size()-based orphan count (bounded driver heap), MAX_BATCH_SIZE=200 guard.
  • spark/optimizer/OptimizerServiceClient.java — wraps generated TableOperationsControllerApi.
  • util/RetryUtil.java — added getOptimizerApiRetryTemplate().
  • build.gradle — depends on :libs:optimizer:binpack with a targeted exclude on log4j-slf4j2-impl (transitively brought by services:optimizer; conflicts with apps:spark's bundled log4j-slf4j-impl 1.x bridge).

Test using CLI:
--tableNames db.t1,db.t2,db.t3
--operationIds op-uuid-1,op-uuid-2,op-uuid-3
--tableUuids tab-uuid-1,tab-uuid-2,tab-uuid-3
--resultsEndpoint http://optimizer.svc:8080
--driverParallelism 4
plus existing OFD knobs (--ttl, --backupDir, --concurrentDeletes, --streamResults, --maxOrphanFileSampleSize).

Issue] Briefly discuss the summary of the changes made in this
pull request in 2-3 lines.

Changes

  • Client-facing API Changes
  • Internal API Changes
  • Bug Fixes
  • New Features
  • Performance Improvements
  • Code Style
  • Refactoring
  • Documentation
  • Tests

For all the boxes checked, please include additional details of the changes made in this pull request.

Testing Done

  • Manually Tested on local docker setup. Please include commands ran, and their output.
  • Added new tests for the changes made.
  • Updated existing tests to reflect the changes made.
  • No tests added or updated. Please explain why. If unsure, please feel free to ask for help.
  • Some other form of testing like staging or soak time in production. Please explain.

For all the boxes checked, include a detailed description of the testing done for the changes made in this pull request.

Additional Information

  • Breaking Changes

  • Deprecations

  • Large PR broken into smaller PRs, and PR plan linked in the description.

    Open items for reviewers:

  • OptimizerServiceClient is add on top of the generated TableOperationsControllerApi — matches JobsClient's pattern. Fine, or should it live inside :client:optimizerclient as a higher-level facade?

  • Apps:spark currently has to exclude log4j-slf4j2-impl when depending on :libs:optimizer:binpack because services:optimizer transitively brings the slf4j-2.x bridge while apps:spark ships the 1.x bridge. A repo-wide bridge-alignment
    cleanup would let us drop this exclude; happy to file a separate issue.

  • The bin packer ends up as two siblings in libs (FirstFitBinPacker — optimizer-coupled; FirstFitDecreasingBinPacker — agnostic). Consolidation is done in FirstFitDecreasingBinPacker.

  • Builder defaults (maxWeightPerBin = 1_000_000L, maxItemsPerBin = 50) are sized for OFD. If we expect other operation types with materially different cost shapes, we may want to move the defaults out of the packer class and into the per-operation config (Spring @value already provides this in SchedulerConfig).

For all the boxes checked, include additional details of the changes made in this pull request.

@mkuchenbecker mkuchenbecker mentioned this pull request May 22, 2026
17 tasks
mkuchenbecker added a commit that referenced this pull request May 27, 2026
## Optimizer Stack

| PR | Content |
|---|---|
| #527 | Data Model |
| #530 | Database Repos |
| #531 | REST service |
| #533 **(this)** | Analyzer
app |
| #534 | Scheduler app |
| #599 | Spark BatchedOFD app
|
| #tbd | Infra, docker-compose, smoke test |

## Summary

PR 3 of N in the optimizer stack.

Introduces `apps/optimizer-analyzer`, a Spring Boot CommandLineRunner
that evaluates every table in `table_stats` against pluggable
`OperationAnalyzer` strategies. The first strategy,
`OrphanFilesDeletionAnalyzer`, schedules OFD operations with 24h success
/ 1h failure retry cadence, a 6h SCHEDULED timeout, and a 5-strike
circuit breaker.

Key design choices:
- Bulk-loads operations and history into maps (one query per type), then
iterates the stats list — O(types) queries, not O(tables).
- Uses the existing generic `find()` repository methods with null
params.
- Pure unit tests with Mockito — no Spring context needed.

## Changes

- [ ] Client-facing API Changes
- [ ] Internal API Changes
- [ ] Bug Fixes
- [x] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [x] Tests

**Core**: `AnalyzerRunner` — loads table_stats, pre-loads operations and
history into maps, evaluates each table against all analyzers, circuit
breaker logic.

**Strategy interface**: `OperationAnalyzer` — `isEnabled(table)`,
`shouldSchedule(table, currentOp, latestHistory)`,
`getCircuitBreakerThreshold()`.

**Cadence policy**: `CadencePolicy` — encapsulates time-based retry
logic shared across operation types.

**OFD analyzer**: `OrphanFilesDeletionAnalyzer` — enabled via
`maintenance.optimizer.ofd.enabled` table property.

## Testing Done

- [ ] Manually Tested on local docker setup. Please include commands
ran, and their output.
- [x] Added new tests for the changes made.
- [ ] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please
feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in
production. Please explain.

25 unit tests:
- `AnalyzerRunnerTest` (7 tests) — eligible table insertion, cadence
skip, disabled table, shouldSchedule=false, null UUID, circuit breaker
trip, below-threshold pass
- `OrphanFilesDeletionAnalyzerTest` (18 tests) — isEnabled variants,
shouldSchedule for no-op/PENDING/SCHEDULING/SCHEDULED with history
combinations

```
./gradlew :apps:optimizer-analyzer:test
# BUILD SUCCESSFUL — 25 tests pass
```

# Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [x] Large PR broken into smaller PRs, and PR plan linked in the
description.

---------

Co-authored-by: mkuchenbecker <mkuchenbecker@users.noreply.github.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: Abhishek Nath <anath1@linkedin.com>
@abhisheknath2011 abhisheknath2011 marked this pull request as ready for review May 27, 2026 19:58
mkuchenbecker added a commit that referenced this pull request Jun 1, 2026
…m self-weights, functional FFD

Addresses the four review comments on the prior bin-refactor commit:

1. BinItem becomes an interface (`long getWeight()`). Each operation type
   ships its own impl that encodes its own cost model. The packer never
   imports operation DTOs; the impls do.

2. Bin/BinPacker/FirstFitDecreasingBinPacker are generic over T extends
   BinItem. Heterogeneous packers register in a
   `Map<OperationTypeDto, BinPacker<? extends BinItem>>` and the
   scheduler narrows the wildcard with one cast at the per-op-type
   dispatch boundary. Compile-time `T`-consistency end-to-end through
   the packer pipeline.

3. New `operations/ofd/OfdBinItem` (package parallel to scheduler) holds
   only what the dispatch needs: fqtn, operationId, weight. The
   weighting logic — file count from `TableStatsDto.snapshot.
   numCurrentFiles` — lives in a private static `currentFileCount` on
   the impl, fed by a static factory `OfdBinItem.from(op, stats)` so
   callers do
   `withStats.stream().map(op -> OfdBinItem.from(op, statsByUuid.
   get(op.getTableUuid())))`.

4. FirstFitDecreasingBinPacker.pack() is one stream pipeline:
   `items.stream().sorted(...).collect(ArrayList::new, this::placeItem,
   List::addAll)`. The inner first-fit search is
   `bins.stream().filter(b -> b.fits(...)).findFirst().
   ifPresentOrElse(...)`. No imperative for-loops; the fold maintains
   the running list of bins as its accumulator. Compiler enforces
   T-consistency across the pipeline.

5. Dropped `maxSizeBytesPerBin` entirely. OFD cost is per-file (list +
   manifest joins + delete calls); bytes don't add information. A 10 GB
   table with 100k files is more expensive to OFD than a 1 TB table
   with 2k files. Bin/Packer now carry just `maxWeightPerBin` +
   `maxItemsPerBin`. Other op types encode their own dimension in
   `getWeight()`; the packer needn't know.

6. OFD config keys back to human-readable per-op vocabulary:
   `optimizer.scheduler.ofd.max-files-per-bin` (file count) +
   `optimizer.scheduler.ofd.max-tables-per-bin` (table count). Env vars
   `SCHEDULER_OFD_MAX_FILES_PER_BIN` + `SCHEDULER_OFD_MAX_TABLES_PER_BIN`.
   SchedulerConfig translates these into the packer's
   `maxWeightPerBin` + `maxItemsPerBin`.

7. Refactored SchedulerRunner:
   - `Map<OperationTypeDto, BinPacker<? extends BinItem>>` registration
   - Switch by operation type narrows to BinPacker<OfdBinItem> with one
     suppressed unchecked cast (safe by registration invariant; comment
     calls out the OperationScheduler<T> handler factoring once a
     second op type lands)
   - `scheduleOfd(...)` builds `OfdBinItem` via the factory and dispatches
   - `submitOfdBin(Bin<OfdBinItem>)` claims, narrows to claimed-only via
     OfdBinItem.getOperationId, launches, marks SCHEDULED/PENDING — same
     orchestration as before, but typed `Bin<OfdBinItem>` end-to-end

Tests:
- FirstFitDecreasingBinPackerTest uses a local `TestItem implements
  BinItem` (no optimizer-domain imports in the binpack test).
  Byte-cap test removed; max-items, max-weight, FFD order, oversized,
  and zero-cap-disables-dimension all preserved.
- SchedulerRunnerTest mocks `BinPacker<OfdBinItem>` and stubs through
  a real FFD packer with unbounded caps so the runner's projection
  (op + stats → OfdBinItem) is exercised without bypassing Bin's
  package-private mutators.

Divergence from #599: Abhishek's `jobs.util.binpack.BinItem` is a
concrete struct with optimizer-aware identity fields baked in. Ours is
a contract (`long getWeight()`) with per-op impls. The "swap to his
lib by import rename" gimmick no longer applies — instead this PR
proposes the interface-based shape as the common lib, and #599 would
rebase to adopt it (or at minimum offer an interface alongside his
concrete struct). Discussed in PR #626 thread.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Comment thread apps/spark/build.gradle Outdated
Comment thread services/optimizer/build.gradle Outdated

@mkuchenbecker mkuchenbecker left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a couple comments. If the binpacker was just renamed and we align on reties im good to ship

mkuchenbecker
mkuchenbecker previously approved these changes Jun 8, 2026

@mkuchenbecker mkuchenbecker left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am interested in the ternary operator on boolean, but that is non-blocking.

@abhisheknath2011 abhisheknath2011 merged commit 00902b3 into linkedin:main Jun 8, 2026
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants