Optimizer: Batched orphan file deletion using bin packing #599
Merged
9 commits
200508e  Optimizer: Batched orphan file deletion using bin packing (abhisheknath2011)
ff6c881  Addressed review comments (abhisheknath2011)
ec76920  Count orphan files using Iterables to reduce driver memory usage (abhisheknath2011)
9f4a7f0  Merge branch 'main' into batched-ofd (abhisheknath2011)
6bad9e1  Refactored as bin packing libs are added as part of the PR #534 (abhisheknath2011)
9e5e071  Sets up optimizerclient codegen module and use the client (abhisheknath2011)
432732c  Consolidate all the bin packer functionality to a single FirstFitDecr… (abhisheknath2011)
d976840  Addressed Mike's review comments (abhisheknath2011)
24cdff8  Fix for compilation issue (abhisheknath2011)
454 changes: 454 additions & 0 deletions
...k/src/main/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkApp.java
Large diffs are not rendered by default.
83 changes: 83 additions & 0 deletions
...ark/src/main/java/com/linkedin/openhouse/jobs/spark/optimizer/OptimizerServiceClient.java
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,83 @@ | ||
| package com.linkedin.openhouse.jobs.spark.optimizer; | ||
| | ||
| import com.linkedin.openhouse.client.ssl.OptimizerApiClientFactory; | ||
| import com.linkedin.openhouse.jobs.util.RetryUtil; | ||
| import com.linkedin.openhouse.optimizer.client.api.TableOperationsControllerApi; | ||
| import com.linkedin.openhouse.optimizer.client.invoker.ApiClient; | ||
| import com.linkedin.openhouse.optimizer.client.model.TableOperationsHistory; | ||
| import com.linkedin.openhouse.optimizer.client.model.UpdateOperationRequest; | ||
| import java.net.MalformedURLException; | ||
| import java.time.Duration; | ||
| import java.util.Objects; | ||
| import java.util.Optional; | ||
| import javax.net.ssl.SSLException; | ||
| import lombok.extern.slf4j.Slf4j; | ||
| import org.springframework.retry.RetryCallback; | ||
| import org.springframework.retry.support.RetryTemplate; | ||
| | ||
| /** | ||
| * Thin wrapper around the generated optimizer-service {@link TableOperationsControllerApi}. Mirrors | ||
| * the structure of {@link com.linkedin.openhouse.jobs.client.JobsClient}: a hand-written facade | ||
| * over the auto-generated client, wired with a {@link RetryTemplate} that retries 5xx with | ||
| * exponential backoff. | ||
| * | ||
| * <p>The batched Spark app calls {@link #updateOperation(String, UpdateOperationRequest)} once per | ||
| * finished operation to record SUCCESS or FAILED. Per the design, a missed update is recoverable: | ||
| * the operation row stays {@code SCHEDULED} and the Analyzer's stale-timeout will re-queue it. So | ||
| * this client logs a failed call and surfaces it to the caller (as an empty {@link Optional}) | ||
| * instead of swallowing it. | ||
| */ | ||
| @Slf4j | ||
| public class OptimizerServiceClient { | ||
| | ||
| private static final int REQUEST_TIMEOUT_SECONDS = 5; | ||
| | ||
| private final RetryTemplate retryTemplate; | ||
| private final TableOperationsControllerApi api; | ||
| | ||
| public OptimizerServiceClient(String baseUrl) { | ||
| this(baseUrl, null); | ||
| } | ||
| | ||
| public OptimizerServiceClient(String baseUrl, String truststoreLocation) { | ||
| this(RetryUtil.getOptimizerApiRetryTemplate(), buildApi(baseUrl, truststoreLocation)); | ||
| } | ||
| | ||
| OptimizerServiceClient(RetryTemplate retryTemplate, TableOperationsControllerApi api) { | ||
| this.retryTemplate = retryTemplate; | ||
| this.api = api; | ||
| } | ||
| | ||
| /** | ||
| * Reports a terminal status for {@code operationId}. The path id and the body's {@code | ||
| * operationId} must match — callers should set both via the same value. | ||
| * | ||
| * @return the created history record, or {@link Optional#empty()} if the call failed after | ||
| * retries (logged; the caller decides what to do). | ||
| */ | ||
| public Optional<TableOperationsHistory> updateOperation( | ||
| String operationId, UpdateOperationRequest request) { | ||
| Objects.requireNonNull(operationId, "operationId"); | ||
| Objects.requireNonNull(request, "request"); | ||
| return Optional.ofNullable( | ||
| RetryUtil.executeWithRetry( | ||
| retryTemplate, | ||
| (RetryCallback<TableOperationsHistory, Exception>) | ||
| context -> | ||
| api.updateOperation(operationId, request) | ||
| .block(Duration.ofSeconds(REQUEST_TIMEOUT_SECONDS)), | ||
| null)); | ||
| } | ||
| | ||
| private static TableOperationsControllerApi buildApi(String baseUrl, String truststoreLocation) { | ||
| Objects.requireNonNull(baseUrl, "baseUrl"); | ||
| try { | ||
| ApiClient apiClient = | ||
| OptimizerApiClientFactory.getInstance() | ||
| .createApiClient(baseUrl, null, truststoreLocation); | ||
| return new TableOperationsControllerApi(apiClient); | ||
| } catch (MalformedURLException | SSLException e) { | ||
| throw new RuntimeException( | ||
| "Failed to construct optimizer service client for baseUrl=" + baseUrl, e); | ||
| } | ||
| } | ||
| } | ||
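
The Javadoc above describes a retry-on-5xx policy with exponential backoff whose failure mode is an empty `Optional`. `RetryUtil.getOptimizerApiRetryTemplate()` is not shown in this diff, so the following is only a minimal plain-Java sketch of that policy, not the PR's implementation. `RetrySketch`, `ServerErrorException`, `callWithRetry`, and the attempt and backoff values are all hypothetical names and numbers chosen for illustration.

```java
import java.util.Optional;
import java.util.concurrent.Callable;

public class RetrySketch {

  /** Hypothetical stand-in for an HTTP 5xx failure; not a class from the PR. */
  public static class ServerErrorException extends RuntimeException {
    public ServerErrorException(String message) {
      super(message);
    }
  }

  /**
   * Runs {@code call}, retrying only on ServerErrorException and doubling the wait between
   * attempts. Returns Optional.empty() once attempts are exhausted or on a non-retryable error.
   */
  public static <T> Optional<T> callWithRetry(
      Callable<T> call, int maxAttempts, long initialBackoffMs) {
    long backoffMs = initialBackoffMs;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return Optional.ofNullable(call.call());
      } catch (ServerErrorException e) {
        System.err.println("attempt " + attempt + " failed: " + e.getMessage());
        if (attempt == maxAttempts) {
          break; // out of attempts, report failure to the caller
        }
        try {
          Thread.sleep(backoffMs);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt(); // preserve the interrupt flag
          return Optional.empty();
        }
        backoffMs *= 2; // exponential backoff
      } catch (Exception e) {
        // Anything else is treated as non-retryable in this sketch.
        System.err.println("non-retryable failure: " + e);
        return Optional.empty();
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    int[] calls = {0};
    Optional<String> result =
        callWithRetry(
            () -> {
              if (++calls[0] < 3) {
                throw new ServerErrorException("503");
              }
              return "SUCCESS";
            },
            5,
            10L);
    // prints "SUCCESS after 3 calls"
    System.out.println(result.orElse("gave up") + " after " + calls[0] + " calls");
  }
}
```

Returning an empty `Optional` rather than throwing fits the recovery story in the Javadoc: a caller can log the missed update and move on, since the stale-timeout re-queues the operation.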
74 changes: 74 additions & 0 deletions
...st/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkAppArgsTest.java
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,74 @@ | ||
| package com.linkedin.openhouse.jobs.spark; | ||
| | ||
| import java.util.List; | ||
| import org.junit.jupiter.api.Assertions; | ||
| import org.junit.jupiter.api.Test; | ||
| | ||
| /** | ||
| * Pure-Java unit tests for {@link BatchedOrphanFilesDeletionSparkApp#buildEntries}. No Spark | ||
| * session, no HTTP — exercises the CLI-parsing edges that decide whether the app can even start. | ||
| */ | ||
| public class BatchedOrphanFilesDeletionSparkAppArgsTest { | ||
| | ||
| @Test | ||
| public void buildEntriesParsesParallelLists() { | ||
| List<BatchedOrphanFilesDeletionSparkApp.BatchEntry> entries = | ||
| BatchedOrphanFilesDeletionSparkApp.buildEntries( | ||
| "db1.t1,db2.t2", "op-1,op-2", "uuid-1,uuid-2"); | ||
| | ||
| Assertions.assertEquals(2, entries.size()); | ||
| Assertions.assertEquals("db1.t1", entries.get(0).getFqtn()); | ||
| Assertions.assertEquals("db1", entries.get(0).getDatabaseName()); | ||
| Assertions.assertEquals("t1", entries.get(0).getTableName()); | ||
| Assertions.assertEquals("op-1", entries.get(0).getOperationId()); | ||
| Assertions.assertEquals("uuid-1", entries.get(0).getTableUuid()); | ||
| Assertions.assertEquals("db2.t2", entries.get(1).getFqtn()); | ||
| Assertions.assertEquals("op-2", entries.get(1).getOperationId()); | ||
| } | ||
| | ||
| @Test | ||
| public void buildEntriesTrimsWhitespaceInEachEntry() { | ||
| List<BatchedOrphanFilesDeletionSparkApp.BatchEntry> entries = | ||
| BatchedOrphanFilesDeletionSparkApp.buildEntries( | ||
| " db1.t1 , db2.t2 ", " op-1 , op-2 ", " uuid-1 , uuid-2 "); | ||
| | ||
| Assertions.assertEquals("db1.t1", entries.get(0).getFqtn()); | ||
| Assertions.assertEquals("op-1", entries.get(0).getOperationId()); | ||
| Assertions.assertEquals("uuid-1", entries.get(0).getTableUuid()); | ||
| } | ||
| | ||
| @Test | ||
| public void buildEntriesRejectsMismatchedLengths() { | ||
| Assertions.assertThrows( | ||
| IllegalArgumentException.class, | ||
| () -> | ||
| BatchedOrphanFilesDeletionSparkApp.buildEntries("db.a,db.b", "op-1", "uuid-1,uuid-2")); | ||
| } | ||
| | ||
| @Test | ||
| public void buildEntriesRejectsNullArguments() { | ||
| Assertions.assertThrows( | ||
| IllegalArgumentException.class, | ||
| () -> BatchedOrphanFilesDeletionSparkApp.buildEntries(null, "op-1", "uuid-1")); | ||
| Assertions.assertThrows( | ||
| IllegalArgumentException.class, | ||
| () -> BatchedOrphanFilesDeletionSparkApp.buildEntries("db.a", null, "uuid-1")); | ||
| Assertions.assertThrows( | ||
| IllegalArgumentException.class, | ||
| () -> BatchedOrphanFilesDeletionSparkApp.buildEntries("db.a", "op-1", null)); | ||
| } | ||
| | ||
| @Test | ||
| public void buildEntriesRejectsEmptyStrings() { | ||
| Assertions.assertThrows( | ||
| IllegalArgumentException.class, | ||
| () -> BatchedOrphanFilesDeletionSparkApp.buildEntries("", "op-1", "uuid-1")); | ||
| } | ||
| | ||
| @Test | ||
| public void buildEntriesRejectsNonFqtn() { | ||
| Assertions.assertThrows( | ||
| IllegalArgumentException.class, | ||
| () -> BatchedOrphanFilesDeletionSparkApp.buildEntries("just_a_table", "op-1", "uuid-1")); | ||
| } | ||
| } |
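
The tests above pin down the contract of `buildEntries`, but its body lives in `BatchedOrphanFilesDeletionSparkApp.java`, which is not rendered in this diff. As a rough sketch of a parser that would satisfy the same cases (parallel comma-separated lists, per-entry trimming, rejection of null, empty, mismatched, or non-`db.table` input), something like the following could work. `BuildEntriesSketch` and `Entry` are hypothetical names, and the real method may differ in every detail.

```java
import java.util.ArrayList;
import java.util.List;

public class BuildEntriesSketch {

  /** Hypothetical stand-in for the PR's BatchEntry, whose source is not shown. */
  public static final class Entry {
    public final String fqtn;
    public final String databaseName;
    public final String tableName;
    public final String operationId;
    public final String tableUuid;

    Entry(String fqtn, String operationId, String tableUuid) {
      int dot = fqtn.indexOf('.');
      // Require a non-empty database part and table part around the first dot.
      if (dot <= 0 || dot == fqtn.length() - 1) {
        throw new IllegalArgumentException("Not a fully qualified table name: " + fqtn);
      }
      this.fqtn = fqtn;
      this.databaseName = fqtn.substring(0, dot);
      this.tableName = fqtn.substring(dot + 1);
      this.operationId = operationId;
      this.tableUuid = tableUuid;
    }
  }

  /** Zips three comma-separated lists into entries, validating each input first. */
  public static List<Entry> buildEntries(String tableNames, String operationIds, String uuids) {
    String[] names = splitAndTrim(tableNames, "tableNames");
    String[] ops = splitAndTrim(operationIds, "operationIds");
    String[] ids = splitAndTrim(uuids, "tableUuids");
    if (names.length != ops.length || names.length != ids.length) {
      throw new IllegalArgumentException(
          "List lengths differ: " + names.length + ", " + ops.length + ", " + ids.length);
    }
    List<Entry> entries = new ArrayList<>();
    for (int i = 0; i < names.length; i++) {
      entries.add(new Entry(names[i], ops[i], ids[i]));
    }
    return entries;
  }

  private static String[] splitAndTrim(String csv, String argName) {
    if (csv == null || csv.trim().isEmpty()) {
      throw new IllegalArgumentException(argName + " must be non-empty");
    }
    // Limit -1 keeps trailing empty tokens so "a,b," is rejected rather than silently shortened.
    String[] parts = csv.split(",", -1);
    for (int i = 0; i < parts.length; i++) {
      parts[i] = parts[i].trim();
      if (parts[i].isEmpty()) {
        throw new IllegalArgumentException(argName + " contains an empty element");
      }
    }
    return parts;
  }

  public static void main(String[] args) {
    for (Entry e : buildEntries(" db1.t1 , db2.t2 ", "op-1,op-2", "uuid-1,uuid-2")) {
      System.out.println(
          e.databaseName + " " + e.tableName + " " + e.operationId + " " + e.tableUuid);
    }
  }
}
```

Validating all three lists before zipping them means a malformed argument fails the app at startup instead of partway through a batch.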
Build script for the optimizer client codegen module (file path not shown in the source):
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| plugins { | ||
| id 'openhouse.java-minimal-conventions' | ||
| id 'openhouse.client-codegen-convention' | ||
| id 'openhouse.maven-publish' | ||
| } | ||
| | ||
| ext { | ||
| codeGenForService = ":services:optimizer" | ||
| } | ||
| | ||
| apply from: "${project(':client:common').file("codegen.build.gradle")}" |
49 changes: 49 additions & 0 deletions
...cureclient/src/main/java/com/linkedin/openhouse/client/ssl/OptimizerApiClientFactory.java
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| package com.linkedin.openhouse.client.ssl; | ||
| | ||
| import com.fasterxml.jackson.databind.DeserializationFeature; | ||
| import com.fasterxml.jackson.databind.ObjectMapper; | ||
| import com.linkedin.openhouse.optimizer.client.invoker.ApiClient; | ||
| import java.net.MalformedURLException; | ||
| import java.text.DateFormat; | ||
| import javax.net.ssl.SSLException; | ||
| import lombok.NonNull; | ||
| import org.springframework.web.reactive.function.client.WebClient; | ||
| | ||
| /** Factory to create optimizer-specific {@link ApiClient}. Mirrors {@link JobsApiClientFactory}. */ | ||
| public final class OptimizerApiClientFactory extends WebClientFactory { | ||
| | ||
| private static OptimizerApiClientFactory instance; | ||
| | ||
| private OptimizerApiClientFactory() { | ||
| super(); | ||
| } | ||
| | ||
| public static synchronized OptimizerApiClientFactory getInstance() { | ||
| if (null == instance) { | ||
| instance = new OptimizerApiClientFactory(); | ||
| } | ||
| return instance; | ||
| } | ||
| | ||
| @Override | ||
| protected WebClient.Builder createWebClientBuilder() { | ||
| DateFormat defaultDateFormat = ApiClient.createDefaultDateFormat(); | ||
| ObjectMapper defaultObjectMapper = | ||
| ApiClient.createDefaultObjectMapper(defaultDateFormat) | ||
| .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); | ||
| return ApiClient.buildWebClientBuilder(defaultObjectMapper); | ||
| } | ||
| | ||
| /** | ||
| * Creates the optimizer-specific {@link ApiClient} that the generated {@code | ||
| * TableOperationsControllerApi} / {@code TableStatsControllerApi} / {@code | ||
| * TableOperationsHistoryControllerApi} wrap. | ||
| */ | ||
| public ApiClient createApiClient(@NonNull String baseUrl, String token, String truststoreLocation) | ||
| throws MalformedURLException, SSLException { | ||
| WebClient webClient = createWebClient(baseUrl, token, truststoreLocation); | ||
| ApiClient apiClient = new ApiClient(webClient); | ||
| apiClient.setBasePath(baseUrl); | ||
| return apiClient; | ||
| } | ||
| } |
92 changes: 0 additions & 92 deletions
...zer/binpack/src/main/java/com/linkedin/openhouse/optimizer/binpack/FirstFitBinPacker.java
This file was deleted.
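
For readers unfamiliar with the technique named in the PR title, here is a generic sketch of first-fit bin packing with items sorted in decreasing order. The commit title that mentions the consolidated packer is truncated, so reading it as first-fit decreasing is an assumption, and nothing below is taken from the deleted `FirstFitBinPacker` or its replacement. `FirstFitDecreasingSketch`, `pack`, the use of `long` weights, and the choice to give an oversized item its own bin are all illustrative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class FirstFitDecreasingSketch {

  /**
   * Packs weights into bins of the given capacity: sort descending, then place each item in the
   * first bin with enough room, opening a new bin when none fits. An item larger than the
   * capacity gets a bin of its own (an assumption made for this sketch).
   */
  public static List<List<Long>> pack(List<Long> weights, long capacity) {
    if (capacity <= 0) {
      throw new IllegalArgumentException("capacity must be positive");
    }
    List<Long> sorted = new ArrayList<>(weights);
    sorted.sort(Comparator.reverseOrder());

    List<List<Long>> bins = new ArrayList<>();
    List<Long> remaining = new ArrayList<>(); // free space per bin, parallel to bins
    for (long w : sorted) {
      if (w < 0) {
        throw new IllegalArgumentException("negative weight: " + w);
      }
      boolean placed = false;
      for (int i = 0; i < bins.size(); i++) {
        if (remaining.get(i) >= w) {
          bins.get(i).add(w);
          remaining.set(i, remaining.get(i) - w);
          placed = true;
          break; // first fit: stop at the first bin that can hold the item
        }
      }
      if (!placed) {
        List<Long> bin = new ArrayList<>();
        bin.add(w);
        bins.add(bin);
        remaining.add(Math.max(0L, capacity - w));
      }
    }
    return bins;
  }

  public static void main(String[] args) {
    // prints [[8, 2], [4, 4, 1, 1]]
    System.out.println(pack(Arrays.asList(4L, 8L, 1L, 4L, 2L, 1L), 10L));
  }
}
```

In a batching context the weights would be some per-table cost estimate and each bin one batch; which metric the PR actually uses is not visible in the rendered diff.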