feat(optimizer): [4/N] Scheduler app #534
The empty @Configuration class did nothing. @SpringBootApplication on AnalyzerApplication already triggers @ComponentScan, which discovers all @Component-annotated beans without help.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The circuit breaker was hardcoded (threshold=5, no reset, no operator visibility) and forced the AnalyzerRunner to materialize the full history of every (table, operation_type) just to check the last N rows. Cadence policy only needs the single latest history entry; pulling everything was wasted I/O.
Changes:
- Remove getCircuitBreakerThreshold and isCircuitBroken from OperationAnalyzer.
- Add a TODO documenting requirements for the eventual replacement (configurable threshold, exponential-backoff reset, operator-visible signal).
- In AnalyzerRunner, fold history loading into a per-(uuid, type) map holding only the most-recent entry; drop the per-table history list and the isCircuitBroken call.
- Add a TODO to switch the history scan to a windowed query that returns at most one row per (uuid, type).
- Drop the two circuit-breaker tests from AnalyzerRunnerTest.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
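For readers following the change, here is a minimal sketch of folding history rows into a per-(uuid, type) map that keeps only the most recent entry. `HistoryRow`, its fields, and `latestPerKey` are hypothetical stand-ins invented for illustration; they are not the project's entity or the AnalyzerRunner code.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Collectors;

final class LatestHistorySketch {

  /** Hypothetical stand-in for a history row; field names are assumptions. */
  static final class HistoryRow {
    final String tableUuid;
    final String operationType;
    final Instant completedAt;
    final String status;

    HistoryRow(String tableUuid, String operationType, Instant completedAt, String status) {
      this.tableUuid = tableUuid;
      this.operationType = operationType;
      this.completedAt = completedAt;
      this.status = status;
    }
  }

  /** Folds rows into a map keyed by (tableUuid, operationType), keeping the newest row per key. */
  static Map<List<String>, HistoryRow> latestPerKey(List<HistoryRow> rows) {
    return rows.stream()
        .collect(
            Collectors.toMap(
                row -> Arrays.asList(row.tableUuid, row.operationType),
                Function.identity(),
                // On a key collision, keep whichever row completed later.
                BinaryOperator.maxBy(Comparator.comparing((HistoryRow row) -> row.completedAt))));
  }

  public static void main(String[] args) {
    List<HistoryRow> rows =
        Arrays.asList(
            new HistoryRow("t1", "OFD", Instant.parse("2024-01-01T00:00:00Z"), "SUCCESS"),
            new HistoryRow("t1", "OFD", Instant.parse("2024-01-03T00:00:00Z"), "FAILED"),
            new HistoryRow("t2", "OFD", Instant.parse("2024-01-02T00:00:00Z"), "SUCCESS"));
    latestPerKey(rows).forEach((key, row) -> System.out.println(key + " -> " + row.status));
  }
}
```

The fold still reads every row once, which is why the commit also leaves a TODO for a query that returns at most one row per key.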
The Analyzer evaluates cadence using only the most-recent history row per (table_uuid, operation_type); pulling the full history scan per analyzer pass is wasted I/O. Add a dedicated query that returns at most one row per (table_uuid, operation_type), restricted to a single operation type. The query uses a correlated MAX subquery for portability across MySQL and H2. For large history volume, a (operation_type, table_uuid, completed_at) index on the schema would make the subquery index-only; TODO noted in javadoc. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Switch the AnalyzerRunner from scanning every history row per analyzer pass to the dedicated findLatestPerTable query (added in apps/optimizer). The analyzer only consumes the latest entry per (table_uuid, operation_type); the previous full-history scan was bounded but unnecessary I/O. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…nce + TableOperation
The analyzer was using raw Strings everywhere for operation type and status. Per-layer types: introduce analyzer-internal OperationType and OperationStatus enums in apps/optimizer-analyzer/model and convert at the entity boundary. The wire API (services/optimizer/api/model) and DB columns (apps/optimizer entity rows) keep their own representations and are unaffected.
Changes:
- New enums OperationType and OperationStatus in the analyzer model package.
- TableOperation: operationType and status become enums. from(row) parses the String columns; toRow() emits .name() back. from() and pending() share a private build() factory.
- TableOperation javadocs: drop "denormalized for display" wording.
- OperationAnalyzer.getOperationType returns OperationType.
- AnalyzerRunner: filter parameter and per-type maps are keyed on OperationType; calls to repos still pass the String .name().
- CadencePolicy.shouldSchedule: switch on OperationStatus is exhaustive (now including CANCELED), unknown values throw IllegalStateException, and the SCHEDULED branch has an inline comment explaining the two cases.
- OrphanFilesDeletionAnalyzer: returns the enum.
- Tests updated to construct enum values; OFD test helper takes the enum.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
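A rough sketch of the two ideas in this commit: parsing String columns into an enum at the entity boundary, and an exhaustive switch that throws on unknown values. The status names are the ones mentioned in this PR thread, but the full production set, the `fromColumn` helper, and the scheduling rules inside `shouldSchedule` (including the stale-timeout handling of SCHEDULED) are assumptions for illustration, not the actual CadencePolicy.

```java
import java.time.Duration;

final class EnumBoundarySketch {

  /** Status values named in this PR thread; the complete production set is an assumption. */
  enum OperationStatus {
    PENDING, SCHEDULING, SCHEDULED, SUCCESS, FAILED, CANCELED;

    /** Parses a DB column value (hypothetical helper), failing loudly on anything unrecognized. */
    static OperationStatus fromColumn(String raw) {
      try {
        return valueOf(raw);
      } catch (IllegalArgumentException | NullPointerException e) {
        throw new IllegalStateException("Unknown operation status: " + raw, e);
      }
    }

    /** Emits the String form for the DB column. */
    String toColumn() {
      return name();
    }
  }

  /** Illustrative policy only: decides whether a new operation should be queued. */
  static boolean shouldSchedule(
      OperationStatus latest, Duration sinceLatest, Duration cadence, Duration staleTimeout) {
    switch (latest) {
      case PENDING:
      case SCHEDULING:
        return false; // already queued or being claimed
      case SCHEDULED:
        // Assumed two cases: still in flight (skip) or stale past the timeout (re-queue).
        return sinceLatest.compareTo(staleTimeout) >= 0;
      case SUCCESS:
      case FAILED:
      case CANCELED:
        return sinceLatest.compareTo(cadence) >= 0; // terminal: wait out the cadence interval
      default:
        throw new IllegalStateException("Unhandled status: " + latest);
    }
  }

  public static void main(String[] args) {
    OperationStatus status = OperationStatus.fromColumn("CANCELED");
    System.out.println(
        shouldSchedule(status, Duration.ofHours(30), Duration.ofHours(24), Duration.ofHours(6)));
  }
}
```

Keeping the `default` branch even though every constant is listed means a status added later fails fast instead of silently falling through.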
…denceBasedOrphanFilesDeletionAnalyzer The class composes CadencePolicy and is one of potentially many strategies (volume-based, schema-aware, etc.) we could write later for the same operation type. Encode the scheduling driver in the class name so the distinction is visible at registration. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The rename in the previous commit moved the files but did not change the class identifiers inside. Update both class declarations and the constructor calls in the test to match the new file name. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add idx_toph_optype_uuid_completed (operation_type, table_uuid, completed_at) on table_operations_history. TableOperationHistoryRepository.findLatestPerTable uses a correlated MAX(completed_at) subquery; without this index it degenerates to O(N²) and does not complete at 1M-row history scale. With it the inner subquery becomes an index-only lookup per outer row. Update the repo method's javadoc to point at the new index by name and drop the resolved TODO. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Capture optimizations (a)–(m) discussed for scaling AnalyzerRunner past ~100k tables, the failure modes that trigger each, and which items have already landed. (d) — the table_operations_history index on (operation_type, table_uuid, completed_at) — landed on optimizer-1 and is noted inline. The remaining items stay queued. Also tighten the class javadoc: drop the misleading "safe at any realistic scale" wording and the two prior inline TODOs (now subsumed by the block comment), point readers at the roadmap. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The (a)–(m) optimization roadmap was a ~75-line block comment on top of
AnalyzerRunner. The content now lives in Jira BDP-102182 ("Optimizer
analyzer: scale past 100k tables"), where it can be tracked, assigned, and
broken into work without churning the source. Replace the block comment
with a short class-javadoc pointer.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Enables per-database iteration in the analyzer. Returns the bounded set of database_name values present in table_stats; the analyzer uses it to drive the outer loop when no specific databaseName filter is supplied. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…e layer
OptimizerDataService's filter-style methods (listTableOperations, listTableStats, getStatsHistory, listHistory) accepted nullable strings/enums to mean "no filter". Switch to Optional<T> at the service boundary; controllers wrap their nullable @RequestParam values via Optional.ofNullable. The implementation unwraps via .orElse(null) at the JPA repo call site; the @Query "IS NULL OR ..." pattern is idiomatic with nullable parameters and stays unchanged. No behavior change. No tests required updating.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Apply the reviewer's full suggestion (#534): SchedulerApplication implements CommandLineRunner + ExitCodeGenerator directly, wraps the work in try/catch, tracks exitCode, and reports it via getExitCode(). SpringApplication.exit propagates that to System.exit so the k8s CronJob pod status reflects batch outcome. Removes the prior @Bean CommandLineRunner. Also adds spring.main.banner-mode=off per the suggestion.
Verified with the boot jar:
- empty H2 schema (runner throws) -> caught, JPA pool drained, exit 1
- schema preloaded, no PENDING ops -> exit 0
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Layout: bin packing now lives in scheduler.binpack as a self-contained utility; the scheduler depends on the abstraction, not OFD specifics.
- BinItem: data interface (weight, fqtn, operationId) + fromOpAndStats factory; immutable populated instances
- BinPacker: pack(ops, statsByUuid) -> List<List<BinItem>>; op-type agnostic; the scheduler wraps each grouping into a Bin
- FirstFitBinPacker<T extends BinItem>: concrete generic; Supplier<T> constructs each empty instance; FFD bucketing internal
- TotalFilesBinItem: file-count-weighted BinItem; one all-args constructor + no-arg constructor for Supplier use
- SchedulerRunner: immutable registry (Map.copyOf); registerOperation returns a new runner; no @Component; SchedulerConfig produces the bean and chains the registration so the published instance is fully registered
- Tests: drive op-type from the model (.toDb() at matcher sites), drop abbreviation aliases, exercise FFD via TestItem + production projection via TotalFilesBinItemTest
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
I thought through this deeply. The scheduler operates as follows: we register a FirstFitBinPacker for OFD or any operation that scales with file size. There might be a singleton bin packer for operations we can't safely pack. Another could be based on table size, or take the database into account. The BinPacker interface only cares that you can convert a TableStat to a BinItem. By making FirstFitBinPacker generic, we delegate the conversion to the generic type. The pattern is basically "a bin item is valid if it can be constructed from stats", and we can evolve that definition over time.
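To make the first-fit-decreasing idea concrete, here is a small generic sketch. `Weighted`, `Item`, and `pack` are hypothetical names chosen for this example; the real BinItem/BinPacker types in the PR carry more fields and a stats-based conversion that is not reproduced here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class FirstFitDecreasingSketch {

  /** Hypothetical minimal contract: anything with a weight can be packed. */
  interface Weighted {
    long weight();
  }

  /** Hypothetical item, e.g. a table weighted by its file count. */
  static final class Item implements Weighted {
    final String name;
    final long weight;

    Item(String name, long weight) {
      this.name = name;
      this.weight = weight;
    }

    @Override
    public long weight() {
      return weight;
    }
  }

  /** Sorts heaviest first, then places each item into the first bin with room. */
  static <T extends Weighted> List<List<T>> pack(List<T> items, long maxWeightPerBin) {
    List<T> sorted = new ArrayList<>(items);
    sorted.sort(Comparator.comparingLong((T item) -> item.weight()).reversed());

    List<List<T>> bins = new ArrayList<>();
    List<Long> loads = new ArrayList<>();
    for (T item : sorted) {
      int target = -1;
      for (int i = 0; i < bins.size(); i++) {
        if (loads.get(i) + item.weight() <= maxWeightPerBin) {
          target = i;
          break;
        }
      }
      if (target < 0) {
        // No bin has room. An oversized item also lands here and keeps a bin to itself,
        // because its load already exceeds the cap so nothing else will ever fit.
        bins.add(new ArrayList<>());
        loads.add(0L);
        target = bins.size() - 1;
      }
      bins.get(target).add(item);
      loads.set(target, loads.get(target) + item.weight());
    }
    return bins;
  }

  public static void main(String[] args) {
    List<Item> items =
        Arrays.asList(
            new Item("a", 4), new Item("b", 7), new Item("c", 1),
            new Item("d", 5), new Item("e", 3));
    for (List<Item> bin : pack(items, 10L)) {
      StringBuilder line = new StringBuilder();
      for (Item item : bin) {
        line.append(item.name).append('(').append(item.weight).append(") ");
      }
      System.out.println(line.toString().trim());
    }
  }
}
```

Swapping in a different strategy (singleton bins, table-size weights) would only need another implementation behind the same packing signature, which is the evolution path the comment describes.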
- Inline loadAndDedupPending, cancelDuplicates, loadStatsByUuid into schedule(). Single-use helpers fragment the read; the flow is short enough to live linearly.
- Split toSchedule and toCancel as two independent derivations of the same per-tableUuid grouping. Scheduling is no longer expressed as "what's left after canceling"; it's the oldest per group, full stop. Cancellation runs as its own side effect, unread by the scheduling path.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
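The two independent derivations can be sketched as follows. `PendingOp`, `groupOldestFirst`, `toScheduleIds`, and `toCancelIds` are hypothetical names for illustration, and ordering by `createdAt` is an assumption about what "oldest" means here.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class DedupSketch {

  /** Hypothetical pending operation; field names are assumptions. */
  static final class PendingOp {
    final String id;
    final String tableUuid;
    final Instant createdAt;

    PendingOp(String id, String tableUuid, Instant createdAt) {
      this.id = id;
      this.tableUuid = tableUuid;
      this.createdAt = createdAt;
    }
  }

  /** Groups pending ops per table, each group ordered oldest first. */
  static Map<String, List<PendingOp>> groupOldestFirst(List<PendingOp> pending) {
    return pending.stream()
        .sorted(Comparator.comparing((PendingOp op) -> op.createdAt))
        .collect(
            Collectors.groupingBy(op -> op.tableUuid, LinkedHashMap::new, Collectors.toList()));
  }

  /** Derivation 1: the oldest op of every group is scheduled. */
  static List<String> toScheduleIds(Map<String, List<PendingOp>> byTableUuid) {
    return byTableUuid.values().stream()
        .map(group -> group.get(0).id)
        .collect(Collectors.toList());
  }

  /** Derivation 2: everything after the oldest op in a group is a duplicate to cancel. */
  static List<String> toCancelIds(Map<String, List<PendingOp>> byTableUuid) {
    return byTableUuid.values().stream()
        .flatMap(group -> group.stream().skip(1))
        .map(op -> op.id)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<PendingOp> pending =
        Arrays.asList(
            new PendingOp("a", "t1", Instant.ofEpochSecond(1)),
            new PendingOp("b", "t1", Instant.ofEpochSecond(2)),
            new PendingOp("c", "t2", Instant.ofEpochSecond(3)),
            new PendingOp("d", "t1", Instant.ofEpochSecond(0)));
    Map<String, List<PendingOp>> byTableUuid = groupOldestFirst(pending);
    System.out.println("schedule: " + toScheduleIds(byTableUuid));
    System.out.println("cancel:   " + toCancelIds(byTableUuid));
  }
}
```

Neither derivation reads the other's result, so the scheduling path stays correct even if the cancel side effect is skipped or fails.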
Drop List<BinItem> items = bin.getItems() and OperationTypeDto type = bin.getOperationType(); use bin.getItems() / bin.getOperationType() at the call sites. The aliases saved ~10 chars but split context across the function and would diverge if bin or a shadow local ever moved. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
The comment claimed paging would drop work past page 0 — false, the next cycle reads what page 0 missed. It also referenced the paged alternative the code does not take, which the project style file calls out as the first comment anti-pattern. Pageable.unpaged() reads as written. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…oSchedule
- TableOperationsRepository.cancel(): default wrapper short-circuits on empty ids -> 0. cancelInternal carries the @Query.
- SchedulerRunner: inline the toCancel derivation directly into cancel(...); drop the if-non-empty guard. Drop the uuidsToSchedule local; statsRepo.findAllById(byTableUuid.keySet()) is the direct expression.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Per PR feedback: stop caching values used once.
- pending: chain operationsRepo.find(...) straight into .groupingBy.
- cancelled / log: call operationsRepo.cancel(...) for its side effect; no int capture, no warn line. Empty list is now a no-op at the repo.
- toSchedule: inline the derivation into packer.pack(...).
- bins: inline; pipe straight to forEach(this::scheduleBin).
- "no PENDING" and "Packed N into M" log lines: drop. Empty inputs flow through the pipelines naturally; observability comes from the packer's own log and the per-bin scheduleBin log.
- Unknown op type now silently skips scheduling via Optional.ofNullable(registry.get(type)).ifPresent(...); cleanup still runs. Test renamed and rewritten to verify the no-launch contract.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
- Revert TableOperationsRepository: cancel goes back to a single @Query method, no default wrapper. Empty-list guard is a scheduler concern, not a repo concern.
- SchedulerRunner: local BiConsumer<TableOperationsRepository, List<String>> named cancelIfNonEmpty, defined inside schedule(). No return.
- Inline statsByUuid into packer.pack(...); no intermediate locals beyond byTableUuid (used 3x) and oldestFirst (used 2x).
- Test stub for statsRepo.findAllById removed from the unknown-op-type case (no packer means ifPresent body never runs).
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Drop the BiConsumer + .accept(...) wrapping. The dedup pipeline ends in a single collectingAndThen whose finisher runs the empty-guarded cancel as a side effect and returns null. Whole thing is one expression — no helper local, no explicit if-check at the call site. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
- New private claim(Bin, Instant) helper: CAS-claims via updateBatch,
finds owned rows by watermark, returns the surviving BinItems. Captures
the IO-boundary unit; ids/claimedSet locals live inside it.
- scheduleBin: jobsClient.launch(...).ifPresentOrElse(...) replaces the
if/else on Optional. Success branch marks SCHEDULED + logs; absent
branch reverts to PENDING + warns.
- Inline jobName as String.format("batched-%s-%d", ...); inline tableNames
into the launch arg list; replace operationIds with claimedIds (one
list, used in launch + both branches).
- Drop HashSet import; claim uses Collectors.toSet().
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
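The claim-then-launch flow above can be illustrated with an in-memory stand-in. This sketch swaps the repository's batch update and watermark lookup for `ConcurrentHashMap.replace(key, expected, updated)`, which is a per-row compare-and-set; `ClaimSketch`, `statusById`, and the `launcher` function are hypothetical names, and the status strings follow the PENDING, SCHEDULING, SCHEDULED sequence named in the PR description.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;

final class ClaimSketch {

  /** In-memory stand-in for the operations table: operation id -> status. */
  final ConcurrentHashMap<String, String> statusById = new ConcurrentHashMap<>();

  /** CAS-claims each row PENDING -> SCHEDULING and returns only the ids this caller won. */
  List<String> claim(List<String> ids) {
    return ids.stream()
        .filter(id -> statusById.replace(id, "PENDING", "SCHEDULING"))
        .collect(Collectors.toList());
  }

  /** Launches one job for the claimed ids; marks SCHEDULED on success, reverts on absence. */
  void scheduleBin(List<String> ids, Function<List<String>, Optional<String>> launcher) {
    List<String> claimedIds = claim(ids);
    if (claimedIds.isEmpty()) {
      return; // another scheduler instance owns every row in this bin
    }
    launcher
        .apply(claimedIds)
        .ifPresentOrElse(
            jobId -> claimedIds.forEach(id -> statusById.replace(id, "SCHEDULING", "SCHEDULED")),
            () -> claimedIds.forEach(id -> statusById.replace(id, "SCHEDULING", "PENDING")));
  }

  public static void main(String[] args) {
    ClaimSketch sketch = new ClaimSketch();
    sketch.statusById.put("op-1", "PENDING");
    sketch.statusById.put("op-2", "SCHEDULING"); // already claimed elsewhere
    sketch.scheduleBin(Arrays.asList("op-1", "op-2"), claimed -> Optional.of("job-1"));
    System.out.println(sketch.statusById);
  }
}
```

Because both follow-up transitions also compare against SCHEDULING, a row that was changed by someone else in the meantime is left alone rather than overwritten.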
Per PR #534 review (abhisheknath2011): bin-packing utilities are common infrastructure, not scheduler-internal. Moving them under libs/ lets other modules consume them without depending on the scheduler service.
- New module :libs:optimizer:binpack (libs/optimizer/binpack/) with java-conventions + maven-publish; api on :services:optimizer for the TableOperationDto / TableStatsDto types the BinPacker interface uses.
- Move Bin, BinItem, BinPacker, FirstFitBinPacker, TotalFilesBinItem (and their two tests) from services/optimizer/scheduler/.../scheduler/binpack/ to libs/optimizer/binpack/.../optimizer/binpack/.
- Package rename: com.linkedin.openhouse.optimizer.scheduler.binpack -> com.linkedin.openhouse.optimizer.binpack. Updated all imports in SchedulerRunner, SchedulerConfig, SchedulerRunnerTest.
- :services:optimizer:scheduler depends on the new module via api.
Also addresses the PR #628 pattern for the scheduler half: apply 'openhouse.maven-publish' to :services:optimizer:scheduler, :apps:optimizer:schedulerapp, and the new :libs:optimizer:binpack so all three publish to JFrog alongside the all-modules aggregate.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
abhisheknath2011
left a comment
Thanks @mkuchenbecker for addressing the comments!
## Summary
This is a follow-up to the Optimizer series PRs (previous PR #534).

Introduces `BatchedOrphanFilesDeletionSparkApp`, the multi-table counterpart of the existing single-table `OrphanFilesDeletionSparkApp`. One Spark job now processes a list of `(table, operationId)` pairs that the optimizer scheduler bin-packed into a single batch, reporting SUCCESS/FAILED per operation back to the Optimizer Service.

Stands up the **`optimizerclient` codegen module** so consumers (this app, and future ones) talk to the Optimizer Service via an auto-generated client rather than hand-rolled HTTP; this mirrors the existing `:client:jobsclient`/`:client:tableclient` pattern.

Lands a **first-fit-decreasing bin packer** that is used in the follow-up PR (#604) to assemble tables in batches. Also consolidates the existing bin packer under it.

### Key design choices
- **Per-table failure isolation**: exceptions in one table are caught, FAILED is posted for that operationId, and remaining tables continue. The job exits 0 if at least one table succeeds.
- **Recoverable result reporting**: if `POST /v1/optimizer/operations/{id}/update` fails after retries, the row stays `SCHEDULED` and the Analyzer's stale-timeout will re-queue it. No retry storms in the Spark driver.
- **Optional optimizer-service callbacks**: `--resultsEndpoint`/`--operationIds`/`--tableUuids` are all optional, so the legacy `JobsScheduler` can launch the app without optimizer integration. When absent, the per-operation callback is skipped; HTS `StateManager` still tracks per-job lifecycle.
- **Scheduler decides parallelism, not the app**: `--driverParallelism` is honoured verbatim; the app never picks its own thread count.
- **`MAX_BATCH_SIZE = 200`**: hard ceiling enforced at `buildEntries` (Spark side) so a misconfigured scheduler can't blow past `ARG_MAX`. Operating point stays `--batchMaxItems 25`.
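As an illustration of the per-table failure isolation and exit-code rule in the design choices above, here is a minimal sketch using a fixed thread pool. `FailureIsolationSketch`, `TableWork`, `runBatch`, and `exitCode` are hypothetical names, the status callback is reduced to writing into a map, and returning 1 for an empty batch is an assumption; none of this is the actual `BatchedOrphanFilesDeletionSparkApp` code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class FailureIsolationSketch {

  /** Hypothetical unit of per-table work that may fail. */
  interface TableWork {
    void run(String tableName) throws Exception;
  }

  /** Runs every table on a fixed pool; one table's exception never stops the others. */
  static Map<String, String> runBatch(
      List<String> tableNames, int driverParallelism, TableWork work) {
    Map<String, String> outcomes = new ConcurrentHashMap<>();
    ExecutorService pool = Executors.newFixedThreadPool(driverParallelism);
    try {
      List<Callable<Void>> tasks = new ArrayList<>();
      for (String tableName : tableNames) {
        tasks.add(
            () -> {
              try {
                work.run(tableName);
                outcomes.put(tableName, "SUCCESS");
              } catch (Exception e) {
                // Isolate the failure: record it and let the remaining tables continue.
                outcomes.put(tableName, "FAILED");
              }
              return null;
            });
      }
      pool.invokeAll(tasks); // blocks until every task has finished
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for the batch", e);
    } finally {
      pool.shutdown();
    }
    return outcomes;
  }

  /** Exit 0 if at least one table succeeded, otherwise 1 (empty batch -> 1 is an assumption). */
  static int exitCode(Map<String, String> outcomes) {
    return outcomes.containsValue("SUCCESS") ? 0 : 1;
  }

  public static void main(String[] args) {
    Map<String, String> outcomes =
        runBatch(
            Arrays.asList("db.t1", "db.t2", "db.t3"),
            2,
            tableName -> {
              if (tableName.equals("db.t2")) {
                throw new Exception("simulated failure");
              }
            });
    System.out.println(outcomes + " exit=" + exitCode(outcomes));
  }
}
```

The pool size is taken from the caller unchanged, which matches the stated rule that the scheduler, not the app, decides parallelism.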
### Optimizer client (codegen)
- **`services:optimizer`** wired with the same `service-specgen-convention` + springdoc plugins as `services:jobs`/`services:tables`/`services:housetables`. Port 8003 (slots into the existing 8000/8001/8002 allocation).
- **`client:optimizerclient`**: three-line `build.gradle` mirroring `client:jobsclient`; generates `com.linkedin.openhouse.optimizer.client.{api,model,invoker}` from the spec.
- **`client:secureclient`** picks up `OptimizerApiClientFactory` (parallels `JobsApiClientFactory`) for SSL/auth-aware `ApiClient` construction.
- **`OptimizerServiceClient`** in `apps:spark` is a thin wrapper around the generated `TableOperationsControllerApi`, mirroring `JobsClient`'s shape: `RetryTemplate` (new `RetryUtil.getOptimizerApiRetryTemplate()`), surface is `Optional<TableOperationsHistory> updateOperation(operationId, request)`. No more OkHttp, no more hand-rolled DTOs.

### Bin packer
- `FirstFitDecreasingBinPacker`: FFD by weight with secondary caps on bytes and item count; oversized items get a dedicated bin. Reuses the existing `Bin`/`BinItem` types from #534.
- `FirstFitBinPacker.java` and `FirstFitBinPackerTest.java`: deleted (renamed / consolidated).

### Batched Spark app
- `BatchedOrphanFilesDeletionSparkApp`: extends `BaseSparkApp`; iterates entries via a fixed thread pool, reuses `Operations.deleteOrphanFiles(...)` per table, posts per-operation status, runs the existing `TableStateValidator` per table.

### Additional changes
**Service-side spec generation (`services/optimizer`)**
- `build.gradle`: added `service-specgen-convention` + springdoc + processes plugins. `openApi.customBootRun` overrides the production MySQL DataSource with in-memory H2 at build time (overrides live in `build.gradle`, not a profile file, so they don't ship inside the fat jar). Port 8003.
- `developmentOnly 'com.h2database:h2:2.1.210'`: H2 reaches `bootRun`/specgen but is excluded from `bootJar` and from `api`/`runtimeElements` propagation. Pinned to 2.1.210 to match the version `:iceberg:openhouse:htscatalog` already pulls in repo-wide and avoid the slf4j-2.x bump that 2.2.x would have introduced.

**Client codegen (`client/optimizerclient`)**
- New module (3-line `build.gradle`) generating `TableOperationsControllerApi`, `TableStatsControllerApi`, `TableOperationsHistoryControllerApi` + their DTOs.
- `client/secureclient/OptimizerApiClientFactory.java`: SSL/auth-aware factory mirroring `JobsApiClientFactory`.
- `settings.gradle`: registers `:client:optimizerclient`.

**Bin packer (`libs/optimizer/binpack`)**
- `FirstFitDecreasingBinPacker.java` (moved from apps/spark). Pre-projected `BinItem` inputs, weight + item-count caps, oversized items get their own bin.
- `FirstFitDecreasingBinPackerTest.java` (moved + adapted to the libs `BinItem` interface).

**Spark app (`apps/spark`)**
- `spark/BatchedOrphanFilesDeletionSparkApp.java`: multi-table OFD with worker-thread pool, per-table failure isolation, `Iterables.size()`-based orphan count (bounded driver heap), `MAX_BATCH_SIZE=200` guard.
- `spark/optimizer/OptimizerServiceClient.java`: wraps generated `TableOperationsControllerApi`.
- `util/RetryUtil.java`: added `getOptimizerApiRetryTemplate()`.
- `build.gradle`: depends on `:libs:optimizer:binpack` with a targeted `exclude` on `log4j-slf4j2-impl` (transitively brought by services:optimizer; conflicts with apps:spark's bundled `log4j-slf4j-impl` 1.x bridge).

**Test using CLI:**
`--tableNames db.t1,db.t2,db.t3 --operationIds op-uuid-1,op-uuid-2,op-uuid-3 --tableUuids tab-uuid-1,tab-uuid-2,tab-uuid-3 --resultsEndpoint http://optimizer.svc:8080 --driverParallelism 4`
plus existing OFD knobs (`--ttl`, `--backupDir`, `--concurrentDeletes`, `--streamResults`, `--maxOrphanFileSampleSize`).
[Issue](https://github.com/linkedin/openhouse/issues/#nnn) Briefly discuss the summary of the changes made in this pull request in 2-3 lines.

## Changes
- [ ] Client-facing API Changes
- [ ] Internal API Changes
- [ ] Bug Fixes
- [x] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [x] Tests

For all the boxes checked, please include additional details of the changes made in this pull request.

## Testing Done
- [ ] Manually Tested on local docker setup. Please include commands ran, and their output.
- [x] Added new tests for the changes made.
- [ ] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in production. Please explain.

For all the boxes checked, include a detailed description of the testing done for the changes made in this pull request.

# Additional Information
- [ ] Breaking Changes
- [ ] Deprecations
- [ ] Large PR broken into smaller PRs, and PR plan linked in the description.

**Open items for reviewers:**
- `OptimizerServiceClient` is added on top of the generated `TableOperationsControllerApi`, matching `JobsClient`'s pattern. Fine, or should it live inside `:client:optimizerclient` as a higher-level facade?
- apps:spark currently has to `exclude log4j-slf4j2-impl` when depending on `:libs:optimizer:binpack` because services:optimizer transitively brings the slf4j-2.x bridge while apps:spark ships the 1.x bridge. A repo-wide bridge-alignment cleanup would let us drop this exclude; happy to file a separate issue.
- The bin packer ends up as two siblings in libs (`FirstFitBinPacker`, optimizer-coupled; `FirstFitDecreasingBinPacker`, agnostic). Consolidation is done in FirstFitDecreasingBinPacker.
- Builder defaults (maxWeightPerBin = 1_000_000L, maxItemsPerBin = 50) are sized for OFD. If we expect other operation types with materially different cost shapes, we may want to move the defaults out of the packer class and into the per-operation config (Spring @Value already provides this in SchedulerConfig).

For all the boxes checked, include additional details of the changes made in this pull request.
Optimizer Stack
Summary
PR 4 of N in the optimizer stack.
Introduces apps/optimizer-scheduler, a Spring Boot CommandLineRunner that claims PENDING operations and submits batched Spark jobs via the Jobs Service.
State machine:
Analyzer creates all Operations as PENDING
Changes
- Scheduler runner: Loads PENDING ops, bin-packs by file count, claims via two-step CAS (PENDING → SCHEDULING → SCHEDULED), submits one Spark job per bin.
- Bin packer: Greedy first-fit-decreasing algorithm. Oversized tables get their own bin (never dropped). Tables with no stats default to cost 0.
- Jobs client: WebClient-based REST client submitting POST /jobs to the Jobs Service with table names, operation IDs, and results endpoint.
- Repository additions: Three @Modifying CAS methods on TableOperationsRepository (cancelDuplicatePending, markScheduling, markScheduled), required for safe concurrent scheduling.
Testing Done
13 unit tests:
- BinPackerTest (7 tests): empty input, single table, under/over limit, oversized table, no stats, descending sort
- SchedulerRunnerTest (6 tests): no pending ops, two-step claim + schedule, launch failure, already-claimed skip, duplicate cancellation, multi-row bin claim
Additional Information