Integrate batched orphan files deletion with the existing schedule workflow #604

Open
abhisheknath2011 wants to merge 4 commits into linkedin:main from abhisheknath2011:batched-tables

abhisheknath2011 commented May 27, 2026

Summary

Wires the new BatchedOrphanFilesDeletionSparkApp (from #599) into the existing JobsScheduler flow so operators can amortize Spark startup over many tables today, without waiting for the optimizer-service stack to land. Batched OFD is opt-in via a new JobType — the existing single-table ORPHAN_FILES_DELETION path is untouched.

How it works

java JobsScheduler \
  --type ORPHAN_FILES_DELETION_BATCH \
  --tablesURL ... --jobsURL ... --cluster ... \
  --batchMaxItems 25
  1. JobsScheduler fetches all eligible tables (existing fetch path).
  2. New builder method groups by database (batches never cross databases) and runs FirstFitDecreasingBinPacker per group with an item-count cap from --batchMaxItems (default 25).
  3. Each bin becomes one BatchedTableOrphanFilesDeletionTask whose getArgs() returns ["--tableNames", "db.t1,db.t2,…"].
  4. The Jobs Service maps the new ORPHAN_FILES_DELETION_BATCH JobType to BatchedOrphanFilesDeletionSparkApp via jobs.yaml.
  5. The Spark app processes each table in a worker-thread pool, reusing Operations.deleteOrphanFiles(...) per table.
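
Steps 2 and 3 can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `BatchPlanSketch`, `Table`, `plan`, and `toArgs` are hypothetical names, and because only an item-count cap is active (every table counts as one item), the packing is shown as plain chunking within each database group rather than a call to the real FirstFitDecreasingBinPacker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative sketch only; not the PR's OperationTasksBuilder. */
class BatchPlanSketch {

  /** Hypothetical stand-in for TableMetadata: just a database and a table name. */
  static final class Table {
    final String dbName;
    final String tableName;

    Table(String dbName, String tableName) {
      this.dbName = dbName;
      this.tableName = tableName;
    }
  }

  /** Groups by database first, then fills bins of at most maxItems within each group. */
  static List<List<Table>> plan(List<Table> eligible, int maxItems) {
    if (maxItems < 1) {
      throw new IllegalArgumentException("maxItems must be >= 1");
    }
    // TreeMap keeps databases in sorted order so the output is deterministic.
    Map<String, List<Table>> byDb = new TreeMap<>();
    for (Table t : eligible) {
      byDb.computeIfAbsent(t.dbName, k -> new ArrayList<>()).add(t);
    }
    List<List<Table>> bins = new ArrayList<>();
    for (List<Table> group : byDb.values()) {
      // With unit weights, first-fit packing reduces to cutting the group into chunks.
      for (int i = 0; i < group.size(); i += maxItems) {
        int end = Math.min(i + maxItems, group.size());
        bins.add(new ArrayList<>(group.subList(i, end)));
      }
    }
    return bins;
  }

  /** Builds the two-element argument list described in step 3. */
  static List<String> toArgs(List<Table> bin) {
    List<String> names = new ArrayList<>();
    for (Table t : bin) {
      names.add(t.dbName + "." + t.tableName);
    }
    return List.of("--tableNames", String.join(",", names));
  }

  public static void main(String[] args) {
    List<Table> tables = new ArrayList<>();
    for (int i = 1; i <= 7; i++) {
      tables.add(new Table("d1", "t" + i));
    }
    tables.add(new Table("d2", "t1"));
    for (List<Table> bin : plan(tables, 5)) {
      System.out.println(toArgs(bin));
    }
  }
}
```

Because grouping happens before packing, no bin can contain tables from two databases, whatever the cap is.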

Key design choices

  • Additive, opt-in. Existing ORPHAN_FILES_DELETION JobType + OrphanFilesDeletionSparkApp keep working unchanged. Operators choose mode at scheduler launch via --type.
  • No optimizer service required. BatchedOrphanFilesDeletionSparkApp now treats --resultsEndpoint, --operationIds, --tableUuids as optional. When absent (the legacy scheduler path), the per-operation optimizer callback is skipped entirely; HTS StateManager still tracks the Spark job's lifecycle as before.
  • Batches never cross databases. Grouping happens before bin-packing, not after.
  • Item-count cap only for now. Weight (numCurrentFiles) and sizeBytes dimensions are disabled in this PR because the legacy scheduler doesn't have table_stats. A future PR can layer in Iceberg-snapshot-based sizing without changing the wire format.
  • MAX_BATCH_SIZE = 200 hard ceiling. Footgun stop, not the operating point. At ~120 chars per entry (3 parallel CSVs of 36-char UUIDs in the optimizer path), 200 tables ≈ 24 KB on the command line, well under Linux's per-argument limit (MAX_ARG_STRLEN, typically 128 KB; the total ARG_MAX budget on current kernels is usually larger), with headroom for the spark-submit envelope. Enforced at both the Spark app boundary (buildEntries) and the scheduler boundary (fail-fast on --batchMaxItems > 200).
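
The ceiling and the command-line arithmetic above can be illustrated with a small sketch. `BatchLimitSketch` and its method names are hypothetical; only `MAX_BATCH_SIZE = 200`, the default of 25, and the ~120-chars-per-entry estimate come from the description. The lower bound of 1 is an assumption added for the example.

```java
/** Illustrative sketch only; the real guards live in buildEntries and the scheduler. */
class BatchLimitSketch {

  static final int MAX_BATCH_SIZE = 200;          // hard ceiling from the PR description
  static final int DEFAULT_BATCH_MAX_ITEMS = 25;  // default for --batchMaxItems
  static final int APPROX_CHARS_PER_ENTRY = 120;  // the description's per-table estimate

  /** Fails fast when the requested cap is outside [1, MAX_BATCH_SIZE]. */
  static int validateBatchMaxItems(int requested) {
    // The lower bound is an assumption; the description only states the upper bound.
    if (requested < 1 || requested > MAX_BATCH_SIZE) {
      throw new IllegalArgumentException(
          "--batchMaxItems must be between 1 and " + MAX_BATCH_SIZE + ", got " + requested);
    }
    return requested;
  }

  /** Rough command-line footprint of a batch, in characters. */
  static int estimatedArgChars(int tables) {
    return tables * APPROX_CHARS_PER_ENTRY;
  }

  public static void main(String[] args) {
    int cap = validateBatchMaxItems(DEFAULT_BATCH_MAX_ITEMS);
    System.out.println("cap=" + cap + ", worst case chars=" + estimatedArgChars(MAX_BATCH_SIZE));
  }
}
```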

Changes

  • Client-facing API Changes
  • Internal API Changes
  • Bug Fixes
  • New Features
  • Performance Improvements
  • Code Style
  • Refactoring
  • Documentation
  • Tests

Service contract (services/jobs)

  • JobConf.java — added ORPHAN_FILES_DELETION_BATCH to JobType. Codegen regenerates JobConf.JobTypeEnum in client/jobsclient automatically.

Scheduler (apps/spark)

  • util/TableMetadataBatch.java (new): Metadata subclass holding dbName + List<TableMetadata>. getEntityName() returns "db[N]" for metrics granularity.
  • scheduler/tasks/BatchedTableOrphanFilesDeletionTask.java (new): OperationTask; auto-discovered by JobsScheduler's reflection registry. jobName = "ORPHAN_FILES_DELETION_BATCH__".
  • scheduler/tasks/OperationTasksBuilder.java: new prepareBatchedOrphanFilesDeletionTaskList: filter eligible → groupBy(dbName) → FFD pack → emit one task per bin. Wired into both buildOperationTaskList (sync) and buildOperationTaskListInParallel (bulk-enqueue path, because batching needs all metadata in hand first).
  • scheduler/JobsScheduler.java: --batchMaxItems CLI option threaded through getAdditionalProperties.

Spark app changes from #599

  • --resultsEndpoint, --operationIds, --tableUuids now optional. BatchEntry.operationId/tableUuid nullable. newOptimizerClient() returns null when endpoint is absent; reportResult() short-circuits.
  • MAX_BATCH_SIZE = 200 constant + guards.
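
The optional-callback behavior can be pictured with the sketch below. It is an assumption-laden illustration: `ResultReporter`, `newReporter`, and the string-based `sink` are hypothetical stand-ins for the generated optimizer client, and only the null-when-absent and short-circuit behavior is taken from the description.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch only; not the Spark app's actual client wiring. */
class OptionalCallbackSketch {

  /** Hypothetical minimal interface standing in for the optimizer client. */
  interface ResultReporter {
    void report(String operationId, String status);
  }

  /** Returns null when no endpoint is configured, as on the legacy scheduler path. */
  static ResultReporter newReporter(String resultsEndpoint, List<String> sink) {
    if (resultsEndpoint == null || resultsEndpoint.isEmpty()) {
      return null;
    }
    // A real client would issue an HTTP call; here the call is only recorded.
    return (operationId, status) -> sink.add(resultsEndpoint + " " + operationId + " " + status);
  }

  /** Short-circuits when there is no reporter or no operation id; returns true if a report was sent. */
  static boolean reportResult(ResultReporter reporter, String operationId, String status) {
    if (reporter == null || operationId == null) {
      return false;
    }
    reporter.report(operationId, status);
    return true;
  }

  public static void main(String[] args) {
    List<String> sink = new ArrayList<>();
    System.out.println(reportResult(newReporter(null, sink), null, "SUCCESS"));
    System.out.println(reportResult(newReporter("http://example.invalid", sink), "op-1", "SUCCESS"));
    System.out.println(sink);
  }
}
```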

Rollout plan

  • Phase 1 — merge this PR. ORPHAN_FILES_DELETION_BATCH is registered but no scheduler is launched with it. Zero behavior change for existing deployments.
  • Phase 2 — pick one cluster + a small database set, launch a parallel scheduler with --type ORPHAN_FILES_DELETION_BATCH --batchMaxItems 5. Compare success rate and total runtime against the single-table scheduler.
  • Phase 3 — raise --batchMaxItems (10 → 25 → 50) and broaden DB coverage. Drop the single-table scheduler for OFD once the batched one matches or beats it.
  • Rollback at any time: flip the scheduler back to --type ORPHAN_FILES_DELETION. Both paths coexist.

Testing Done

  • Manually Tested on local docker setup. Please include commands ran, and their output.
  • Added new tests for the changes made.
  • Updated existing tests to reflect the changes made.
  • No tests added or updated. Please explain why. If unsure, please feel free to ask for help.
  • Some other form of testing like staging or soak time in production. Please explain.

Tested on local docker

anath1@anath1-mn4233 oh-hadoop-spark % docker compose --profile with_jobs_scheduler run openhouse-jobs-scheduler - \
      --type ORPHAN_FILES_DELETION_BATCH --cluster local \
      --tablesURL http://openhouse-tables:8080 --jobsURL http://openhouse-jobs:8080 \
      --batchMaxItems 5 \
      --tableMinAgeThresholdHours 0 --taskPollIntervalMs 5000
WARN[0000] /Users/anath1/IdeaProjects/openhouse/infra/recipes/docker-compose/oh-hadoop-spark/docker-compose.yml: the attribute `version` is obsolete, it will be ignored, please remove it to avoid potential confusion 
[+] Creating 9/9
 ✔ Container local.jaeger                  Running                                                                                                                                                                                             0.0s 
 ✔ Container oh-hadoop-spark-prometheus-1  Running                                                                                                                                                                                             0.0s 
 ✔ Container local.datanode                Running                                                                                                                                                                                             0.0s 
 ✔ Container local.opa                     Running                                                                                                                                                                                             0.0s 
 ✔ Container local.mysql                   Running                                                                                                                                                                                             0.0s 
 ✔ Container local.namenode                Running                                                                                                                                                                                             0.0s 
 ✔ Container local.openhouse-housetables   Running                                                                                                                                                                                             0.0s 
 ✔ Container local.openhouse-tables        Running                                                                                                                                                                                             0.0s 
 ✔ Container local.openhouse-jobs          Running                                                                                                                                                                                             0.0s 
2026-06-11 02:17:38 INFO  Reflections:219 - Reflections took 67 ms to scan 1 urls, producing 8 keys and 20 values
2026-06-11 02:17:38 INFO  JobsScheduler:188 - Starting scheduler
2026-06-11 02:17:38 INFO  WebClientFactory:121 - Using connection pool strategy
2026-06-11 02:17:38 INFO  WebClientFactory:218 - Creating custom connection provider
2026-06-11 02:17:38 INFO  WebClientFactory:196 - Client session id: 787d65ac-dfe4-4937-9ac0-fc3dd263adfb
2026-06-11 02:17:38 INFO  WebClientFactory:209 - Client name: null
2026-06-11 02:17:38 INFO  WebClientFactory:121 - Using connection pool strategy
2026-06-11 02:17:38 INFO  WebClientFactory:218 - Creating custom connection provider
2026-06-11 02:17:38 INFO  WebClientFactory:196 - Client session id: 3f74cbee-4e92-4211-8edd-d49dd6bc240a
2026-06-11 02:17:38 INFO  WebClientFactory:209 - Client name: null
2026-06-11 02:17:38 INFO  DefaultOtelConfig:79 - initializing open-telemetry sdk (no agent detected)
2026-06-11 02:17:38 INFO  JobsScheduler:301 - Fetching task list based on the job type: ORPHAN_FILES_DELETION_BATCH
2026-06-11 02:17:39 INFO  OperationTasksBuilder:101 - Fetched metadata for 12 batched-OFD-eligible tables; binMaxItems=5
2026-06-11 02:17:39 INFO  FirstFitDecreasingBinPacker:81 - Packed 5 pre-projected items into 1 groupings
2026-06-11 02:17:39 INFO  FirstFitDecreasingBinPacker:81 - Packed 5 pre-projected items into 1 groupings
2026-06-11 02:17:39 INFO  FirstFitDecreasingBinPacker:81 - Packed 2 pre-projected items into 1 groupings
2026-06-11 02:17:39 INFO  OperationTasksBuilder:134 - Packed 12 eligible tables into 3 batches
2026-06-11 02:17:39 INFO  OperationTasksBuilder:238 - Found metadata TableMetadataBatch(super=Metadata(creator=), dbName=d1, tables=[TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d1, tableName=t1, creationTimeMs=1781142262532, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d1, tableName=t2, creationTimeMs=1781142286398, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d1, tableName=t3, creationTimeMs=1781142305610, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d1, tableName=t4, creationTimeMs=1781142316354, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d1, tableName=t5, creationTimeMs=1781142326737, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null)])
2026-06-11 02:17:39 INFO  OperationTasksBuilder:238 - Found metadata TableMetadataBatch(super=Metadata(creator=), dbName=d2, tables=[TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d2, tableName=t10, creationTimeMs=1781142392651, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d2, tableName=t6, creationTimeMs=1781142352669, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d2, tableName=t7, creationTimeMs=1781142364195, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d2, tableName=t8, creationTimeMs=1781142375093, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d2, tableName=t9, creationTimeMs=1781142384531, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null)])
2026-06-11 02:17:39 INFO  OperationTasksBuilder:238 - Found metadata TableMetadataBatch(super=Metadata(creator=), dbName=d3, tables=[TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d3, tableName=t2, creationTimeMs=1774646046137, isPrimary=true, isTimePartitioned=false, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d3, tableName=t3, creationTimeMs=1775245640368, isPrimary=true, isTimePartitioned=false, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null)])
2026-06-11 02:17:39 INFO  JobsScheduler:370 - Submitting and running 3 jobs based on the job type: ORPHAN_FILES_DELETION_BATCH
2026-06-11 02:17:39 INFO  OperationTask:149 - Launching job for TableMetadataBatch(super=Metadata(creator=), dbName=d2, tables=[TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d2, tableName=t10, creationTimeMs=1781142392651, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d2, tableName=t6, creationTimeMs=1781142352669, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d2, tableName=t7, creationTimeMs=1781142364195, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d2, tableName=t8, creationTimeMs=1781142375093, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d2, tableName=t9, creationTimeMs=1781142384531, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null)])
2026-06-11 02:17:39 INFO  OperationTask:149 - Launching job for TableMetadataBatch(super=Metadata(creator=), dbName=d3, tables=[TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d3, tableName=t2, creationTimeMs=1774646046137, isPrimary=true, isTimePartitioned=false, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d3, tableName=t3, creationTimeMs=1775245640368, isPrimary=true, isTimePartitioned=false, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null)])
2026-06-11 02:17:39 INFO  OperationTask:149 - Launching job for TableMetadataBatch(super=Metadata(creator=), dbName=d1, tables=[TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d1, tableName=t1, creationTimeMs=1781142262532, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d1, tableName=t2, creationTimeMs=1781142286398, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d1, tableName=t3, creationTimeMs=1781142305610, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d1, tableName=t4, creationTimeMs=1781142316354, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d1, tableName=t5, creationTimeMs=1781142326737, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null)])
2026-06-11 02:17:40 INFO  OperationTask:167 - Launched a job with id ORPHAN_FILES_DELETION_BATCH_d2_5_eae0ba6d-bff8-4f85-ad33-4a4f399e93c3 for TableMetadataBatch(super=Metadata(creator=), dbName=d2, tables=[TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d2, tableName=t10, creationTimeMs=1781142392651, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d2, tableName=t6, creationTimeMs=1781142352669, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d2, tableName=t7, creationTimeMs=1781142364195, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d2, tableName=t8, creationTimeMs=1781142375093, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d2, tableName=t9, creationTimeMs=1781142384531, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null)])
2026-06-11 02:17:40 INFO  OperationTask:167 - Launched a job with id ORPHAN_FILES_DELETION_BATCH_d3_2_082ed2fc-07da-49d5-b92e-1aa6d215e9a7 for TableMetadataBatch(super=Metadata(creator=), dbName=d3, tables=[TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d3, tableName=t2, creationTimeMs=1774646046137, isPrimary=true, isTimePartitioned=false, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d3, tableName=t3, creationTimeMs=1775245640368, isPrimary=true, isTimePartitioned=false, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null)])
2026-06-11 02:17:40 INFO  OperationTask:167 - Launched a job with id ORPHAN_FILES_DELETION_BATCH_d1_5_25dedd5e-499b-484f-b49a-32936caa2d2a for TableMetadataBatch(super=Metadata(creator=), dbName=d1, tables=[TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d1, tableName=t1, creationTimeMs=1781142262532, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d1, tableName=t2, creationTimeMs=1781142286398, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d1, tableName=t3, creationTimeMs=1781142305610, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d1, tableName=t4, creationTimeMs=1781142316354, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d1, tableName=t5, creationTimeMs=1781142326737, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null)])
---------------------------------------------------------------------------------------
2026-06-11 02:27:43 INFO  OperationTask:181 - Exiting status check for ORPHAN_FILES_DELETION_BATCH for TableMetadataBatch(super=Metadata(creator=), dbName=d2, tables=[TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d2, tableName=t10, creationTimeMs=1781142392651, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d2, tableName=t6, creationTimeMs=1781142352669, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d2, tableName=t7, creationTimeMs=1781142364195, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d2, tableName=t8, creationTimeMs=1781142375093, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d2, tableName=t9, creationTimeMs=1781142384531, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null)]) due to queued timeout
2026-06-11 02:27:43 INFO  OperationTask:181 - Exiting status check for ORPHAN_FILES_DELETION_BATCH for TableMetadataBatch(super=Metadata(creator=), dbName=d1, tables=[TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d1, tableName=t1, creationTimeMs=1781142262532, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d1, tableName=t2, creationTimeMs=1781142286398, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d1, tableName=t3, creationTimeMs=1781142305610, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d1, tableName=t4, creationTimeMs=1781142316354, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d1, tableName=t5, creationTimeMs=1781142326737, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null)]) due to queued timeout
2026-06-11 02:27:43 INFO  OperationTask:181 - Exiting status check for ORPHAN_FILES_DELETION_BATCH for TableMetadataBatch(super=Metadata(creator=), dbName=d3, tables=[TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d3, tableName=t2, creationTimeMs=1774646046137, isPrimary=true, isTimePartitioned=false, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d3, tableName=t3, creationTimeMs=1775245640368, isPrimary=true, isTimePartitioned=false, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null)]) due to queued timeout
2026-06-11 02:27:43 INFO  OperationTask:236 - Finished job for entity TableMetadataBatch(super=Metadata(creator=), dbName=d1, tables=[TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d1, tableName=t1, creationTimeMs=1781142262532, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d1, tableName=t2, creationTimeMs=1781142286398, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d1, tableName=t3, creationTimeMs=1781142305610, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d1, tableName=t4, creationTimeMs=1781142316354, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d1, tableName=t5, creationTimeMs=1781142326737, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null)]): JobId ORPHAN_FILES_DELETION_BATCH_d1_5_25dedd5e-499b-484f-b49a-32936caa2d2a, executionId 2, runTime -1781144260595, queuedTime -1781144260595, state QUEUED
2026-06-11 02:27:43 INFO  OperationTask:236 - Finished job for entity TableMetadataBatch(super=Metadata(creator=), dbName=d2, tables=[TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d2, tableName=t10, creationTimeMs=1781142392651, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d2, tableName=t6, creationTimeMs=1781142352669, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d2, tableName=t7, creationTimeMs=1781142364195, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d2, tableName=t8, creationTimeMs=1781142375093, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d2, tableName=t9, creationTimeMs=1781142384531, isPrimary=true, isTimePartitioned=true, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null)]): JobId ORPHAN_FILES_DELETION_BATCH_d2_5_eae0ba6d-bff8-4f85-ad33-4a4f399e93c3, executionId 0, runTime -1781144259215, queuedTime -1781144259215, state QUEUED
2026-06-11 02:27:43 INFO  OperationTask:236 - Finished job for entity TableMetadataBatch(super=Metadata(creator=), dbName=d3, tables=[TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d3, tableName=t2, creationTimeMs=1774646046137, isPrimary=true, isTimePartitioned=false, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null), TableMetadata(super=Metadata(creator=DUMMY_ANONYMOUS_USER), dbName=d3, tableName=t3, creationTimeMs=1775245640368, isPrimary=true, isTimePartitioned=false, isClustered=true, jobExecutionProperties={}, retentionConfig=null, historyConfig=null, replicationConfig=null)]): JobId ORPHAN_FILES_DELETION_BATCH_d3_2_082ed2fc-07da-49d5-b92e-1aa6d215e9a7, executionId 1, runTime -1781144260505, queuedTime -1781144260505, state QUEUED
2026-06-11 02:27:43 INFO  JobsScheduler:1166 - Collected job state for task ORPHAN_FILES_DELETION_BATCH_d3_2_082ed2fc-07da-49d5-b92e-1aa6d215e9a7: QUEUED
2026-06-11 02:27:43 INFO  JobsScheduler:1166 - Collected job state for task ORPHAN_FILES_DELETION_BATCH_d1_5_25dedd5e-499b-484f-b49a-32936caa2d2a: QUEUED
2026-06-11 02:27:43 INFO  JobsScheduler:1166 - Collected job state for task ORPHAN_FILES_DELETION_BATCH_d2_5_eae0ba6d-bff8-4f85-ad33-4a4f399e93c3: QUEUED
2026-06-11 02:27:43 INFO  JobsScheduler:1183 - Completed collecting jobs state on futures for job type ORPHAN_FILES_DELETION_BATCH
2026-06-11 02:27:43 INFO  JobsScheduler:387 - Finishing scheduler for job type ORPHAN_FILES_DELETION_BATCH, tasks stats: 3 created, 0 succeeded, 0 running, 3 queued 0 cancelled (scheduler timeout), 0 failed, 0 skipped (no state)
2026-06-11 02:27:43 INFO  JobsScheduler:401 - The total run duration of job scheduler for job type ORPHAN_FILES_DELETION_BATCH is : 00:10 hours (HH:mm format)
2026-06-11 02:27:43 INFO  JobsScheduler:259 - SIGTERM received, initiating graceful shutdown of scheduler...
2026-06-11 02:27:43 INFO  JobsScheduler:178 - Initiating graceful shutdown of scheduler...
2026-06-11 02:27:43 INFO  JobsScheduler:266 - Shutdown hook completed.

Additional Information

  • Breaking Changes
  • Deprecations
  • Large PR broken into smaller PRs, and PR plan linked in the description.

* <p>Construct with the {@link Config} builder to override the default timeouts.
*/
@Slf4j
public class OptimizerServiceClient implements AutoCloseable {

From the previous PR #599

abhisheknath2011 changed the title from "Batched tables" to "Integrate batched orphan files deletion with the existing schedule workflow" on May 27, 2026
.build();

Map<String, List<TableMetadata>> byDb =
eligible.stream().collect(Collectors.groupingBy(TableMetadata::getDbName));

Group the tables by DBs.

*/
@Slf4j
@Getter
public class BatchedTableOrphanFilesDeletionTask extends OperationTask<TableMetadataBatch> {

Task implementation for batch job submission.

abhisheknath2011 added a commit that referenced this pull request Jun 8, 2026
## Summary
This is a follow-up in the Optimizer PR series (previous PR: #534).
Introduces `BatchedOrphanFilesDeletionSparkApp`, the multi-table
counterpart of the existing single-table `OrphanFilesDeletionSparkApp`.
One Spark job now processes a list of `(table, operationId)` pairs that
the optimizer scheduler bin-packed into a single batch, reporting
SUCCESS/FAILED per operation back to the Optimizer Service.

Stands up the **`optimizerclient` codegen module** so consumers (this
app, and future ones) talk to the Optimizer Service via an
auto-generated client rather than hand-rolled HTTP — mirrors the
existing
  `:client:jobsclient`/`:client:tableclient` pattern.

Lands a **first-fit-decreasing bin packer** that the follow-up PR (#604)
uses to assemble tables into batches. Also consolidates the existing bin
packer into it.

### Key design choices

- **Per-table failure isolation** — exceptions in one table are caught,
FAILED is posted for that operationId, and remaining tables continue.
The job exits 0 if at least one table succeeds.
- **Recoverable result reporting** — if `POST
/v1/optimizer/operations/{id}/update` fails after retries, the row stays
`SCHEDULED` and the Analyzer's stale-timeout will re-queue it. No retry
storms in the Spark driver.
- **Optional optimizer-service callbacks** —
`--resultsEndpoint`/`--operationIds`/`--tableUuids` are all optional, so
the legacy `JobsScheduler` can launch the app without optimizer
integration. When absent, the per-operation callback is
  skipped; HTS `StateManager` still tracks per-job lifecycle.
- **Scheduler decides parallelism, not the app** — `--driverParallelism`
is honoured verbatim; the app never picks its own thread count.
- **`MAX_BATCH_SIZE = 200`** — hard ceiling enforced at `buildEntries`
(Spark side) so a misconfigured scheduler can't blow past `ARG_MAX`.
Operating point stays `--batchMaxItems 25`.
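
The failure-isolation and parallelism choices above could look roughly like the sketch below. It is an illustration under stated assumptions, not the app's code: `FailureIsolationSketch`, `runBatch`, and `exitCode` are hypothetical names, the per-table work is passed in as a `Consumer` instead of calling `Operations.deleteOrphanFiles(...)`, and the non-zero exit value when no table succeeds is an assumption (the description only states that the job exits 0 if at least one table succeeds).

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Hypothetical sketch of per-table failure isolation on a fixed thread pool.
class FailureIsolationSketch {
  static final int MAX_BATCH_SIZE = 200; // hard ceiling named in the description

  /** Runs perTableWork for every table; one table's exception never stops the others. */
  static Map<String, String> runBatch(
      List<String> tables, int driverParallelism, Consumer<String> perTableWork) {
    if (tables.size() > MAX_BATCH_SIZE) {
      throw new IllegalArgumentException("batch exceeds MAX_BATCH_SIZE: " + tables.size());
    }
    // The thread count comes from the caller; the sketch never picks its own.
    ExecutorService pool = Executors.newFixedThreadPool(driverParallelism);
    try {
      Map<String, Future<String>> futures = new LinkedHashMap<>();
      for (String table : tables) {
        futures.put(
            table,
            pool.submit(
                () -> {
                  try {
                    perTableWork.accept(table);
                    return "SUCCESS";
                  } catch (Exception e) {
                    return "FAILED"; // isolate the failure to this table
                  }
                }));
      }
      Map<String, String> statuses = new LinkedHashMap<>();
      for (Map.Entry<String, Future<String>> entry : futures.entrySet()) {
        try {
          statuses.put(entry.getKey(), entry.getValue().get());
        } catch (ExecutionException e) {
          statuses.put(entry.getKey(), "FAILED");
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          statuses.put(entry.getKey(), "FAILED");
        }
      }
      return statuses;
    } finally {
      pool.shutdown();
    }
  }

  /** 0 if at least one table succeeded; the value 1 otherwise is an assumption. */
  static int exitCode(Map<String, String> statuses) {
    return statuses.containsValue("SUCCESS") ? 0 : 1;
  }

  public static void main(String[] args) {
    Map<String, String> statuses =
        runBatch(
            Arrays.asList("db.t1", "db.t2", "db.t3"),
            2,
            table -> {
              if (table.equals("db.t2")) {
                throw new IllegalStateException("simulated failure");
              }
            });
    System.out.println(statuses + " exit=" + exitCode(statuses));
  }
}
```

In the real app each status would additionally be posted back per operationId when the optimizer callback arguments are present; that part is omitted here.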

### Optimizer client (codegen)

- **`services:optimizer`** wired with the same
`service-specgen-convention` + springdoc plugins as
`services:jobs`/`services:tables`/`services:housetables`. Port 8003
(slots into the existing 8000/8001/8002 allocation).
- **`client:optimizerclient`** — three-line `build.gradle` mirroring
`client:jobsclient`; generates
`com.linkedin.openhouse.optimizer.client.{api,model,invoker}` from the
spec.
- **`client:secureclient`** picks up `OptimizerApiClientFactory`
(parallels `JobsApiClientFactory`) for SSL/auth-aware `ApiClient`
construction.
- **`OptimizerServiceClient`** in `apps:spark` is a thin wrapper around
the generated `TableOperationsControllerApi`, mirroring `JobsClient`'s
shape — `RetryTemplate` (new
`RetryUtil.getOptimizerApiRetryTemplate()`), surface is
`Optional<TableOperationsHistory> updateOperation(operationId,
request)`. No more OkHttp, no more hand-rolled DTOs.
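
The "retry, then give up quietly" shape of `updateOperation` can be illustrated in plain Java. This sketch deliberately does not use Spring Retry's `RetryTemplate` or the generated client; `BoundedRetrySketch` and `callWithRetry` are hypothetical names, and the attempt count and backoff are placeholder parameters rather than the values `RetryUtil.getOptimizerApiRetryTemplate()` actually configures.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical plain-Java stand-in for a RetryTemplate-wrapped client call.
class BoundedRetrySketch {

  /**
   * Tries the call up to maxAttempts times. Returns Optional.empty() once attempts are
   * exhausted, so the caller can log and move on instead of retrying indefinitely.
   */
  static <T> Optional<T> callWithRetry(Supplier<T> call, int maxAttempts, long backoffMillis) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return Optional.ofNullable(call.get());
      } catch (RuntimeException e) {
        if (attempt == maxAttempts) {
          break; // out of attempts: fall through to Optional.empty()
        }
        try {
          Thread.sleep(backoffMillis);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          break;
        }
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    int[] calls = {0};
    Optional<String> result =
        callWithRetry(
            () -> {
              if (++calls[0] < 3) {
                throw new IllegalStateException("simulated transient failure");
              }
              return "updated";
            },
            3,
            10L);
    System.out.println(result + " after " + calls[0] + " calls");
  }
}
```

An empty result maps to the recoverable case described earlier: the operation row stays `SCHEDULED` and is re-queued by the stale-timeout rather than by the Spark driver.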

### Bin packer
- `FirstFitDecreasingBinPacker` — FFD by weight with secondary caps on
bytes and item count; oversized items get a dedicated bin. Reuses the
existing `Bin`/`BinItem` types from #534.
- `FirstFitBinPacker.java` and `FirstFitBinPackerTest.java`: deleted
(renamed / consolidated).
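
For readers unfamiliar with first-fit decreasing, the idea can be sketched as below. This is not the PR's `FirstFitDecreasingBinPacker`: `FfdSketch`, `Item`, and `Bin` are hypothetical stand-ins for the real `Bin`/`BinItem` types, and only the weight cap and item-count cap are modelled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of first-fit decreasing with a weight cap and an item-count cap.
class FfdSketch {

  static final class Item {
    final String name;
    final long weight;

    Item(String name, long weight) {
      this.name = name;
      this.weight = weight;
    }
  }

  static final class Bin {
    final List<Item> items = new ArrayList<>();
    long weight;

    List<String> names() {
      return items.stream().map(i -> i.name).collect(Collectors.toList());
    }
  }

  static List<Bin> pack(List<Item> items, long maxWeightPerBin, int maxItemsPerBin) {
    // "Decreasing": place the heaviest items first.
    List<Item> sorted = new ArrayList<>(items);
    sorted.sort(Comparator.comparingLong((Item i) -> i.weight).reversed());

    List<Bin> bins = new ArrayList<>();
    for (Item item : sorted) {
      Bin target = null;
      // "First fit": take the first existing bin where both caps still hold.
      for (Bin bin : bins) {
        if (bin.items.size() < maxItemsPerBin && bin.weight + item.weight <= maxWeightPerBin) {
          target = bin;
          break;
        }
      }
      if (target == null) {
        // No bin fits. This also covers oversized items, which end up alone because
        // their bin already exceeds the weight cap and can accept nothing else.
        target = new Bin();
        bins.add(target);
      }
      target.items.add(item);
      target.weight += item.weight;
    }
    return bins;
  }

  public static void main(String[] args) {
    List<Item> items =
        Arrays.asList(
            new Item("a", 7), new Item("b", 5), new Item("c", 4),
            new Item("d", 3), new Item("e", 12));
    // With maxWeightPerBin = 10 and maxItemsPerBin = 2 this prints:
    // [e]
    // [a, d]
    // [b, c]
    for (Bin bin : pack(items, 10, 2)) {
      System.out.println(bin.names());
    }
  }
}
```

Item `e` exceeds the weight cap on its own, so it gets a dedicated bin; the remaining items fill the first bin that still has room under both caps.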

### Batched Spark app
- `BatchedOrphanFilesDeletionSparkApp` — extends `BaseSparkApp`;
iterates entries via a fixed thread pool, reuses
`Operations.deleteOrphanFiles(...)` per table, posts per-operation
status, runs the existing `TableStateValidator` per table.

### Additional changes
  **Service-side spec generation (`services/optimizer`)**
- `build.gradle` — added `service-specgen-convention` + springdoc +
processes plugins. `openApi.customBootRun` overrides the production
MySQL DataSource with in-memory H2 at build time (overrides live in
`build.gradle`, not a profile file, so
   they don't ship inside the fat jar). Port 8003.
- `developmentOnly 'com.h2database:h2:2.1.210'` — H2 reaches
`bootRun`/specgen but is excluded from `bootJar` and from
`api`/`runtimeElements` propagation. Pinned to 2.1.210 to match the
version `:iceberg:openhouse:htscatalog` already pulls
in repo-wide and avoid the slf4j-2.x bump that 2.2.x would have
introduced.

  **Client codegen (`client/optimizerclient`)**
- New module (3-line `build.gradle`) generating
`TableOperationsControllerApi`, `TableStatsControllerApi`,
`TableOperationsHistoryControllerApi` + their DTOs.
- `client/secureclient/OptimizerApiClientFactory.java` — SSL/auth-aware
factory mirroring `JobsApiClientFactory`.
  - `settings.gradle` — registers `:client:optimizerclient`.

  **Bin packer (`libs/optimizer/binpack`)**
- `FirstFitDecreasingBinPacker.java` (moved from apps/spark).
Pre-projected `BinItem` inputs, weight + item-count caps, oversized
items get their own bin.
- `FirstFitDecreasingBinPackerTest.java` (moved + adapted to the libs
`BinItem` interface).

  **Spark app (`apps/spark`)**
- `spark/BatchedOrphanFilesDeletionSparkApp.java` — multi-table OFD with
worker-thread pool, per-table failure isolation,
`Iterables.size()`-based orphan count (bounded driver heap),
`MAX_BATCH_SIZE=200` guard.
- `spark/optimizer/OptimizerServiceClient.java` — wraps generated
`TableOperationsControllerApi`.
  - `util/RetryUtil.java` — added `getOptimizerApiRetryTemplate()`.
- `build.gradle` — depends on `:libs:optimizer:binpack` with a targeted
`exclude` on `log4j-slf4j2-impl` (transitively brought by
services:optimizer; conflicts with apps:spark's bundled
`log4j-slf4j-impl` 1.x bridge).

**Test using CLI:**
  --tableNames db.t1,db.t2,db.t3
  --operationIds op-uuid-1,op-uuid-2,op-uuid-3
  --tableUuids tab-uuid-1,tab-uuid-2,tab-uuid-3
  --resultsEndpoint http://optimizer.svc:8080
  --driverParallelism 4
plus existing OFD knobs (`--ttl`, `--backupDir`, `--concurrentDeletes`,
`--streamResults`, `--maxOrphanFileSampleSize`).
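
The three comma-separated arguments line up positionally, so turning them into per-table entries might look like the sketch below. The validation rules shown (non-empty `--tableNames`, equal list lengths when the optional lists are present) are assumptions about what `buildEntries` checks; only the `MAX_BATCH_SIZE = 200` ceiling and the optional nature of `--operationIds`/`--tableUuids` come from the description. `BuildEntriesSketch` and `Entry` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of zipping the parallel CLI lists into per-table entries.
class BuildEntriesSketch {
  static final int MAX_BATCH_SIZE = 200;

  static final class Entry {
    final String tableName;
    final String operationId; // null when --operationIds is absent
    final String tableUuid; // null when --tableUuids is absent

    Entry(String tableName, String operationId, String tableUuid) {
      this.tableName = tableName;
      this.operationId = operationId;
      this.tableUuid = tableUuid;
    }
  }

  static List<String> splitCsv(String csv) {
    if (csv == null || csv.trim().isEmpty()) {
      return Collections.emptyList();
    }
    List<String> out = new ArrayList<>();
    for (String part : csv.split(",")) {
      out.add(part.trim());
    }
    return out;
  }

  static List<Entry> buildEntries(String tableNames, String operationIds, String tableUuids) {
    List<String> names = splitCsv(tableNames);
    List<String> ops = splitCsv(operationIds);
    List<String> uuids = splitCsv(tableUuids);
    if (names.isEmpty()) {
      throw new IllegalArgumentException("--tableNames is required");
    }
    if (names.size() > MAX_BATCH_SIZE) {
      throw new IllegalArgumentException("batch of " + names.size() + " exceeds " + MAX_BATCH_SIZE);
    }
    // Assumed rule: optional lists, when given, must match --tableNames in length.
    if (!ops.isEmpty() && ops.size() != names.size()) {
      throw new IllegalArgumentException("--operationIds length must match --tableNames");
    }
    if (!uuids.isEmpty() && uuids.size() != names.size()) {
      throw new IllegalArgumentException("--tableUuids length must match --tableNames");
    }
    List<Entry> entries = new ArrayList<>();
    for (int i = 0; i < names.size(); i++) {
      entries.add(
          new Entry(
              names.get(i),
              ops.isEmpty() ? null : ops.get(i),
              uuids.isEmpty() ? null : uuids.get(i)));
    }
    return entries;
  }

  public static void main(String[] args) {
    List<Entry> entries =
        buildEntries(
            "db.t1,db.t2,db.t3",
            "op-uuid-1,op-uuid-2,op-uuid-3",
            "tab-uuid-1,tab-uuid-2,tab-uuid-3");
    for (Entry e : entries) {
      System.out.println(e.tableName + " -> " + e.operationId + " / " + e.tableUuid);
    }
  }
}
```

Passing `null` for the last two arguments models the legacy `JobsScheduler` path, where no optimizer callback is made.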

<!--- HINT: Replace #nnn with corresponding Issue number, if you are
fixing an existing issue -->

[Issue](https://github.com/linkedin/openhouse/issues/#nnn)] Briefly
discuss the summary of the changes made in this
pull request in 2-3 lines.

## Changes

- [ ] Client-facing API Changes
- [ ] Internal API Changes
- [ ] Bug Fixes
- [x] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [x] Tests

For all the boxes checked, please include additional details of the
changes made in this pull request.

## Testing Done
<!--- Check any relevant boxes with "x" -->

- [ ] Manually Tested on local docker setup. Please include commands
ran, and their output.
- [x] Added new tests for the changes made.
- [ ] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please
feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in
production. Please explain.

For all the boxes checked, include a detailed description of the testing
done for the changes made in this pull request.

# Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [ ] Large PR broken into smaller PRs, and PR plan linked in the
description.

  **Open items for reviewers:** 
- `OptimizerServiceClient` is added on top of the generated
`TableOperationsControllerApi` — matches `JobsClient`'s pattern. Fine,
or should it live inside `:client:optimizerclient` as a higher-level
facade?
- `apps:spark` currently has to `exclude log4j-slf4j2-impl` when depending
on `:libs:optimizer:binpack` because `services:optimizer` transitively
brings the slf4j-2.x bridge while `apps:spark` ships the 1.x bridge. A
repo-wide bridge-alignment cleanup would let us drop this exclude; happy
to file a separate issue.
- The bin packer would otherwise end up as two siblings in libs
(`FirstFitBinPacker`, optimizer-coupled, and `FirstFitDecreasingBinPacker`,
agnostic), so both are consolidated into `FirstFitDecreasingBinPacker`.
- Builder defaults (`maxWeightPerBin = 1_000_000L`, `maxItemsPerBin = 50`)
are sized for OFD. If we expect other operation types with materially
different cost shapes, we may want to move the defaults out of the
packer class and into the per-operation config (Spring `@Value` already
provides this in `SchedulerConfig`).
 
For all the boxes checked, include additional details of the changes
made in this pull request.
@abhisheknath2011 abhisheknath2011 marked this pull request as ready for review June 10, 2026 23:40