Integrate batched orphan files deletion with the existing schedule workflow #604
Open
abhisheknath2011 wants to merge 4 commits into
 * <p>Construct with the {@link Config} builder to override the default timeouts.
 */
@Slf4j
public class OptimizerServiceClient implements AutoCloseable {

    .build();

Map<String, List<TableMetadata>> byDb =
    eligible.stream().collect(Collectors.groupingBy(TableMetadata::getDbName));
Group the tables by DBs.
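The grouping step in the snippet above can be sketched in isolation. This is a minimal, hedged illustration: `TableRef` is a hypothetical stand-in for the PR's `TableMetadata` (only `getDbName()` appears in the snippet), and the `TreeMap` is used only to make iteration order deterministic here; the PR itself uses the default map.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical stand-in for TableMetadata; only the fields this sketch needs.
final class TableRef {
  private final String dbName;
  private final String tableName;

  TableRef(String dbName, String tableName) {
    this.dbName = dbName;
    this.tableName = tableName;
  }

  String getDbName() {
    return dbName;
  }

  String getTableName() {
    return tableName;
  }
}

final class GroupByDbSketch {
  // Groups eligible tables by database name, the same shape as the reviewed snippet.
  static Map<String, List<TableRef>> groupByDb(List<TableRef> eligible) {
    // TreeMap is an assumption of this sketch, chosen for a stable iteration order.
    return eligible.stream()
        .collect(Collectors.groupingBy(TableRef::getDbName, TreeMap::new, Collectors.toList()));
  }

  public static void main(String[] args) {
    List<TableRef> eligible =
        Arrays.asList(
            new TableRef("db1", "t1"), new TableRef("db2", "t2"), new TableRef("db1", "t3"));
    groupByDb(eligible)
        .forEach((db, tables) -> System.out.println(db + " -> " + tables.size() + " table(s)"));
  }
}
```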
 */
@Slf4j
@Getter
public class BatchedTableOrphanFilesDeletionTask extends OperationTask<TableMetadataBatch> {
Task implementation for batch job submission.
abhisheknath2011 added a commit that referenced this pull request on Jun 8, 2026
## Summary

This is a follow-up to the Optimizer series of PRs (previous PR: #534).

Introduces `BatchedOrphanFilesDeletionSparkApp`, the multi-table counterpart of the existing single-table `OrphanFilesDeletionSparkApp`. One Spark job now processes a list of `(table, operationId)` pairs that the optimizer scheduler bin-packed into a single batch, reporting SUCCESS/FAILED per operation back to the Optimizer Service.

Stands up the **`optimizerclient` codegen module** so consumers (this app, and future ones) talk to the Optimizer Service via an auto-generated client rather than hand-rolled HTTP; this mirrors the existing `:client:jobsclient`/`:client:tableclient` pattern.

Lands a **first-fit-decreasing bin packer** that is used in the follow-up PR (#604) to assemble tables into batches. The existing bin packer is also consolidated under it.

### Key design choices

- **Per-table failure isolation**: exceptions in one table are caught, FAILED is posted for that operationId, and the remaining tables continue. The job exits 0 if at least one table succeeds.
- **Recoverable result reporting**: if `POST /v1/optimizer/operations/{id}/update` fails after retries, the row stays `SCHEDULED` and the Analyzer's stale-timeout will re-queue it. No retry storms in the Spark driver.
- **Optional optimizer-service callbacks**: `--resultsEndpoint`/`--operationIds`/`--tableUuids` are all optional, so the legacy `JobsScheduler` can launch the app without optimizer integration. When they are absent, the per-operation callback is skipped; HTS `StateManager` still tracks per-job lifecycle.
- **Scheduler decides parallelism, not the app**: `--driverParallelism` is honoured verbatim; the app never picks its own thread count.
- **`MAX_BATCH_SIZE = 200`**: hard ceiling enforced at `buildEntries` (Spark side) so a misconfigured scheduler can't blow past `ARG_MAX`. The operating point stays `--batchMaxItems 25`.
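The per-table failure isolation described above can be illustrated with a small sketch. It is not the app's actual code: `runAll`, `exitCode`, and the `Status` enum are hypothetical names, the per-table work is passed in as a plain callback, and the non-zero exit value of 1 is an assumption (the description only says the job exits 0 if at least one table succeeds).

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

final class FailureIsolationSketch {
  enum Status { SUCCESS, FAILED }

  // Runs `work` once per table on a fixed pool; one table's exception never aborts the others.
  static Map<String, Status> runAll(List<String> tables, int parallelism, Consumer<String> work) {
    ExecutorService pool = Executors.newFixedThreadPool(parallelism);
    try {
      Map<String, Future<Status>> futures = new LinkedHashMap<>();
      for (String table : tables) {
        Callable<Status> job =
            () -> {
              try {
                work.accept(table);
                return Status.SUCCESS;
              } catch (RuntimeException e) {
                // Failure is recorded for this table only; other tables keep running.
                return Status.FAILED;
              }
            };
        futures.put(table, pool.submit(job));
      }
      Map<String, Status> results = new LinkedHashMap<>();
      for (Map.Entry<String, Future<Status>> entry : futures.entrySet()) {
        try {
          results.put(entry.getKey(), entry.getValue().get());
        } catch (ExecutionException e) {
          results.put(entry.getKey(), Status.FAILED);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          results.put(entry.getKey(), Status.FAILED);
        }
      }
      return results;
    } finally {
      pool.shutdown();
    }
  }

  // 0 if at least one table succeeded; the value 1 for "all failed" is an assumption.
  static int exitCode(Map<String, Status> results) {
    return results.containsValue(Status.SUCCESS) ? 0 : 1;
  }

  public static void main(String[] args) {
    Map<String, Status> results =
        runAll(
            Arrays.asList("db.t1", "db.t2", "db.t3"),
            2,
            table -> {
              if (table.equals("db.t2")) {
                throw new IllegalStateException("simulated failure for " + table);
              }
            });
    System.out.println(results + " exit=" + exitCode(results));
  }
}
```

In the real app the place where this sketch returns `FAILED` is where the per-operation status would be posted back, when the optional callback arguments are present.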
### Optimizer client (codegen)

- **`services:optimizer`** wired with the same `service-specgen-convention` + springdoc plugins as `services:jobs`/`services:tables`/`services:housetables`. Port 8003 (slots into the existing 8000/8001/8002 allocation).
- **`client:optimizerclient`**: three-line `build.gradle` mirroring `client:jobsclient`; generates `com.linkedin.openhouse.optimizer.client.{api,model,invoker}` from the spec.
- **`client:secureclient`** picks up `OptimizerApiClientFactory` (parallels `JobsApiClientFactory`) for SSL/auth-aware `ApiClient` construction.
- **`OptimizerServiceClient`** in `apps:spark` is a thin wrapper around the generated `TableOperationsControllerApi`, mirroring `JobsClient`'s shape: it uses a `RetryTemplate` (new `RetryUtil.getOptimizerApiRetryTemplate()`), and its surface is `Optional<TableOperationsHistory> updateOperation(operationId, request)`. No more OkHttp, no more hand-rolled DTOs.

### Bin packer

- `FirstFitDecreasingBinPacker`: FFD by weight with secondary caps on bytes and item count; oversized items get a dedicated bin. Reuses the existing `Bin`/`BinItem` types from #534.
- `FirstFitBinPacker.java` and `FirstFitBinPackerTest.java`: deleted (renamed / consolidated).

### Batched Spark app

- `BatchedOrphanFilesDeletionSparkApp`: extends `BaseSparkApp`; iterates entries via a fixed thread pool, reuses `Operations.deleteOrphanFiles(...)` per table, posts per-operation status, and runs the existing `TableStateValidator` per table.

### Additional changes

**Service-side spec generation (`services/optimizer`)**

- `build.gradle`: added `service-specgen-convention` + springdoc + processes plugins. `openApi.customBootRun` overrides the production MySQL DataSource with in-memory H2 at build time (overrides live in `build.gradle`, not a profile file, so they don't ship inside the fat jar). Port 8003.
- `developmentOnly 'com.h2database:h2:2.1.210'`: H2 reaches `bootRun`/specgen but is excluded from `bootJar` and from `api`/`runtimeElements` propagation.
  Pinned to 2.1.210 to match the version `:iceberg:openhouse:htscatalog` already pulls in repo-wide and avoid the slf4j-2.x bump that 2.2.x would have introduced.

**Client codegen (`client/optimizerclient`)**

- New module (3-line `build.gradle`) generating `TableOperationsControllerApi`, `TableStatsControllerApi`, `TableOperationsHistoryControllerApi` + their DTOs.
- `client/secureclient/OptimizerApiClientFactory.java`: SSL/auth-aware factory mirroring `JobsApiClientFactory`.
- `settings.gradle`: registers `:client:optimizerclient`.

**Bin packer (`libs/optimizer/binpack`)**

- `FirstFitDecreasingBinPacker.java` (moved from apps/spark). Pre-projected `BinItem` inputs, weight + item-count caps, oversized items get their own bin.
- `FirstFitDecreasingBinPackerTest.java` (moved + adapted to the libs `BinItem` interface).

**Spark app (`apps/spark`)**

- `spark/BatchedOrphanFilesDeletionSparkApp.java`: multi-table OFD with worker-thread pool, per-table failure isolation, `Iterables.size()`-based orphan count (bounded driver heap), `MAX_BATCH_SIZE=200` guard.
- `spark/optimizer/OptimizerServiceClient.java`: wraps the generated `TableOperationsControllerApi`.
- `util/RetryUtil.java`: added `getOptimizerApiRetryTemplate()`.
- `build.gradle`: depends on `:libs:optimizer:binpack` with a targeted `exclude` on `log4j-slf4j2-impl` (transitively brought by services:optimizer; conflicts with apps:spark's bundled `log4j-slf4j-impl` 1.x bridge).

**Test using CLI:**

`--tableNames db.t1,db.t2,db.t3 --operationIds op-uuid-1,op-uuid-2,op-uuid-3 --tableUuids tab-uuid-1,tab-uuid-2,tab-uuid-3 --resultsEndpoint http://optimizer.svc:8080 --driverParallelism 4`

plus the existing OFD knobs (`--ttl`, `--backupDir`, `--concurrentDeletes`, `--streamResults`, `--maxOrphanFileSampleSize`).
## Changes

- [ ] Client-facing API Changes
- [ ] Internal API Changes
- [ ] Bug Fixes
- [x] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [x] Tests

## Testing Done

- [ ] Manually Tested on local docker setup. Please include commands ran, and their output.
- [x] Added new tests for the changes made.
- [ ] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in production. Please explain.

# Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [ ] Large PR broken into smaller PRs, and PR plan linked in the description.

**Open items for reviewers:**

- `OptimizerServiceClient` is added on top of the generated `TableOperationsControllerApi`, matching `JobsClient`'s pattern. Fine, or should it live inside `:client:optimizerclient` as a higher-level facade?
- `apps:spark` currently has to `exclude log4j-slf4j2-impl` when depending on `:libs:optimizer:binpack` because services:optimizer transitively brings the slf4j-2.x bridge while apps:spark ships the 1.x bridge. A repo-wide bridge-alignment cleanup would let us drop this exclude; happy to file a separate issue.
- The bin packer ends up as two siblings in libs (`FirstFitBinPacker`: optimizer-coupled; `FirstFitDecreasingBinPacker`: agnostic). Consolidation is done in `FirstFitDecreasingBinPacker`.
- Builder defaults (`maxWeightPerBin = 1_000_000L`, `maxItemsPerBin = 50`) are sized for OFD. If we expect other operation types with materially different cost shapes, we may want to move the defaults out of the packer class and into the per-operation config (Spring `@Value` already provides this in `SchedulerConfig`).
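The first-fit-decreasing packing described in the commit message above can be sketched as follows. This is a simplified illustration under stated assumptions, not the repository's `FirstFitDecreasingBinPacker`: the `Item` and `Bin` classes here are hypothetical stand-ins for the real `BinItem`/`Bin` types, weights are assumed non-negative, `maxItemsPerBin` is assumed to be at least 1, and only the weight and item-count caps are modelled (the bytes cap is omitted).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class FfdSketch {
  // Hypothetical item type: an identifier plus a non-negative weight.
  static final class Item {
    final String id;
    final long weight;

    Item(String id, long weight) {
      this.id = id;
      this.weight = weight;
    }
  }

  // Hypothetical bin type: the items placed so far and their total weight.
  static final class Bin {
    final List<Item> items = new ArrayList<>();
    long weight;
  }

  static List<Bin> pack(List<Item> input, long maxWeightPerBin, int maxItemsPerBin) {
    // "Decreasing": consider the heaviest items first.
    List<Item> sorted = new ArrayList<>(input);
    sorted.sort(Comparator.comparingLong((Item i) -> i.weight).reversed());

    List<Bin> bins = new ArrayList<>();
    for (Item item : sorted) {
      Bin target = null;
      // "First fit": take the first existing bin with room under both caps.
      // An oversized item fits nowhere, so it falls through to a dedicated new bin,
      // and that bin is already over the weight cap, so nothing else joins it.
      for (Bin bin : bins) {
        boolean hasSlot = bin.items.size() < maxItemsPerBin;
        boolean hasWeight = bin.weight + item.weight <= maxWeightPerBin;
        if (hasSlot && hasWeight) {
          target = bin;
          break;
        }
      }
      if (target == null) {
        target = new Bin();
        bins.add(target);
      }
      target.items.add(item);
      target.weight += item.weight;
    }
    return bins;
  }

  public static void main(String[] args) {
    List<Item> items =
        Arrays.asList(
            new Item("a", 70), new Item("b", 50), new Item("c", 30),
            new Item("d", 20), new Item("e", 120));
    for (Bin bin : pack(items, 100, 3)) {
      System.out.println(bin.items.size() + " item(s), weight " + bin.weight);
    }
  }
}
```

With a weight cap of 100 and an item cap of 3, the five items above land in three bins: the oversized item (120) alone, then 70 + 30, then 50 + 20.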
Summary
Wires the new BatchedOrphanFilesDeletionSparkApp (from #599) into the existing JobsScheduler flow so operators can amortize Spark startup over many tables today, without waiting for the optimizer-service stack to land. Batched OFD is opt-in via a new JobType; the existing single-table ORPHAN_FILES_DELETION path is untouched.
How it works
Key design choices
headroom for the spark-submit envelope. Enforced at both the Spark app boundary (buildEntries) and the scheduler boundary (fail-fast on --batchMaxItems > 200).
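The scheduler-side fail-fast check mentioned above might look like the following sketch. The helper name `validateBatchMaxItems` is hypothetical, and the lower-bound check is an addition of this sketch; only the ceiling of 200 and the `--batchMaxItems` flag come from the description.

```java
final class BatchSizeGuardSketch {
  // Ceiling taken from the PR description.
  static final int MAX_BATCH_SIZE = 200;

  // Hypothetical helper: rejects an out-of-range --batchMaxItems before any batch is built.
  static int validateBatchMaxItems(int batchMaxItems) {
    // The lower bound is an assumption of this sketch, not something the PR states.
    if (batchMaxItems < 1 || batchMaxItems > MAX_BATCH_SIZE) {
      throw new IllegalArgumentException(
          "--batchMaxItems must be between 1 and " + MAX_BATCH_SIZE + ", got " + batchMaxItems);
    }
    return batchMaxItems;
  }

  public static void main(String[] args) {
    System.out.println(validateBatchMaxItems(25));
    try {
      validateBatchMaxItems(201);
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```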
Changes
Service contract (services/jobs)
Scheduler (apps/spark)
buildOperationTaskListInParallel (bulk-enqueue path because batching needs all metadata in hand first).
Spark app changes from #599
Rollout plan
Testing Done
Tested on local docker
Additional Information