Skip to content
Merged
4 changes: 4 additions & 0 deletions apps/spark/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ dependencies {

implementation "org.apache.logging.log4j:log4j-slf4j-impl:${log4jVersion}"
implementation project(':libs:datalayout')
// Exclude log4j-slf4j2-impl: incompatible with the log4j-slf4j-impl (1.x) bridge this app ships.
implementation(project(':libs:optimizer:binpack')) {
exclude group: 'org.apache.logging.log4j', module: 'log4j-slf4j2-impl'
}
implementation ('org.apache.spark:spark-core_2.12:' + sparkVersion) {
exclude group: 'io.netty'
exclude group: 'org.apache.hadoop', module: 'hadoop-common'
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package com.linkedin.openhouse.jobs.spark.optimizer;

import com.linkedin.openhouse.client.ssl.OptimizerApiClientFactory;
import com.linkedin.openhouse.jobs.util.RetryUtil;
import com.linkedin.openhouse.optimizer.client.api.TableOperationsControllerApi;
import com.linkedin.openhouse.optimizer.client.invoker.ApiClient;
import com.linkedin.openhouse.optimizer.client.model.TableOperationsHistory;
import com.linkedin.openhouse.optimizer.client.model.UpdateOperationRequest;
import java.net.MalformedURLException;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import javax.net.ssl.SSLException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.support.RetryTemplate;

/**
* Thin wrapper around the generated optimizer-service {@link TableOperationsControllerApi}. Mirrors
* the structure of {@link com.linkedin.openhouse.jobs.client.JobsClient}: a hand-written facade
* over the auto-generated client, wired with a {@link RetryTemplate} that retries 5xx with
* exponential backoff.
*
* <p>The batched Spark app calls {@link #updateOperation(String, UpdateOperationRequest)} once per
* finished operation to record SUCCESS or FAILED. Per the design, a missed update is recoverable —
* the operation row stays {@code SCHEDULED} and the Analyzer's stale-timeout will re-queue it — so
* this client surfaces but does not swallow failures.
*/
@Slf4j
public class OptimizerServiceClient {
Comment thread
abhisheknath2011 marked this conversation as resolved.

private static final int REQUEST_TIMEOUT_SECONDS = 5;

private final RetryTemplate retryTemplate;
private final TableOperationsControllerApi api;

public OptimizerServiceClient(String baseUrl) {
this(baseUrl, null);
}

public OptimizerServiceClient(String baseUrl, String truststoreLocation) {
this(RetryUtil.getOptimizerApiRetryTemplate(), buildApi(baseUrl, truststoreLocation));
}

OptimizerServiceClient(RetryTemplate retryTemplate, TableOperationsControllerApi api) {
this.retryTemplate = retryTemplate;
this.api = api;
}

/**
* Reports a terminal status for {@code operationId}. The path id and the body's {@code
* operationId} must match — callers should set both via the same value.
*
* @return the created history record, or {@link Optional#empty()} if the call failed after
* retries (logged; the caller decides what to do).
*/
public Optional<TableOperationsHistory> updateOperation(
String operationId, UpdateOperationRequest request) {
Objects.requireNonNull(operationId, "operationId");
Objects.requireNonNull(request, "request");
return Optional.ofNullable(
RetryUtil.executeWithRetry(
retryTemplate,
(RetryCallback<TableOperationsHistory, Exception>)
context ->
api.updateOperation(operationId, request)
.block(Duration.ofSeconds(REQUEST_TIMEOUT_SECONDS)),
null));
}

private static TableOperationsControllerApi buildApi(String baseUrl, String truststoreLocation) {
Objects.requireNonNull(baseUrl, "baseUrl");
try {
ApiClient apiClient =
OptimizerApiClientFactory.getInstance()
.createApiClient(baseUrl, null, truststoreLocation);
return new TableOperationsControllerApi(apiClient);
} catch (MalformedURLException | SSLException e) {
throw new RuntimeException(
"Failed to construct optimizer service client for baseUrl=" + baseUrl, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ public static RetryTemplate getTablesApiRetryTemplate() {
.build();
}

public static RetryTemplate getOptimizerApiRetryTemplate() {
RetryTemplateBuilder builder = new RetryTemplateBuilder();
return builder
.maxAttempts(5)
Comment thread
abhisheknath2011 marked this conversation as resolved.
.customBackoff(DEFAULT_JOBS_BACKOFF_POLICY)
.retryOn(WebClientResponseException.InternalServerError.class)
.build();
}

public static RetryTemplate getTrinoClientRetryTemplate() {
RetryTemplateBuilder builder = new RetryTemplateBuilder();
return builder
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package com.linkedin.openhouse.jobs.spark;

import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Plain-Java unit tests for {@link BatchedOrphanFilesDeletionSparkApp#buildEntries}. Neither a
 * Spark session nor HTTP is involved — only the parsing of the three comma-separated argument
 * lists into batch entries, including the inputs that must be rejected.
 */
public class BatchedOrphanFilesDeletionSparkAppArgsTest {

  /** Three aligned lists produce one entry per position, with the FQTN split into db and table. */
  @Test
  public void buildEntriesParsesParallelLists() {
    List<BatchedOrphanFilesDeletionSparkApp.BatchEntry> parsed =
        BatchedOrphanFilesDeletionSparkApp.buildEntries(
            "db1.t1,db2.t2", "op-1,op-2", "uuid-1,uuid-2");

    Assertions.assertEquals(2, parsed.size());

    BatchedOrphanFilesDeletionSparkApp.BatchEntry first = parsed.get(0);
    Assertions.assertEquals("db1.t1", first.getFqtn());
    Assertions.assertEquals("db1", first.getDatabaseName());
    Assertions.assertEquals("t1", first.getTableName());
    Assertions.assertEquals("op-1", first.getOperationId());
    Assertions.assertEquals("uuid-1", first.getTableUuid());

    BatchedOrphanFilesDeletionSparkApp.BatchEntry second = parsed.get(1);
    Assertions.assertEquals("db2.t2", second.getFqtn());
    Assertions.assertEquals("op-2", second.getOperationId());
  }

  /** Whitespace surrounding each comma-separated element is stripped. */
  @Test
  public void buildEntriesTrimsWhitespaceInEachEntry() {
    List<BatchedOrphanFilesDeletionSparkApp.BatchEntry> parsed =
        BatchedOrphanFilesDeletionSparkApp.buildEntries(
            " db1.t1 , db2.t2 ", " op-1 , op-2 ", " uuid-1 , uuid-2 ");

    BatchedOrphanFilesDeletionSparkApp.BatchEntry first = parsed.get(0);
    Assertions.assertEquals("db1.t1", first.getFqtn());
    Assertions.assertEquals("op-1", first.getOperationId());
    Assertions.assertEquals("uuid-1", first.getTableUuid());
  }

  /** Lists of different lengths are rejected. */
  @Test
  public void buildEntriesRejectsMismatchedLengths() {
    assertRejected("db.a,db.b", "op-1", "uuid-1,uuid-2");
  }

  /** A null in any of the three positions is rejected. */
  @Test
  public void buildEntriesRejectsNullArguments() {
    assertRejected(null, "op-1", "uuid-1");
    assertRejected("db.a", null, "uuid-1");
    assertRejected("db.a", "op-1", null);
  }

  /** An empty table list is rejected. */
  @Test
  public void buildEntriesRejectsEmptyStrings() {
    assertRejected("", "op-1", "uuid-1");
  }

  /** A table name that is not a fully-qualified {@code db.table} name is rejected. */
  @Test
  public void buildEntriesRejectsNonFqtn() {
    assertRejected("just_a_table", "op-1", "uuid-1");
  }

  /** Asserts that {@code buildEntries} throws {@link IllegalArgumentException} for the inputs. */
  private static void assertRejected(String fqtns, String operationIds, String tableUuids) {
    Assertions.assertThrows(
        IllegalArgumentException.class,
        () -> BatchedOrphanFilesDeletionSparkApp.buildEntries(fqtns, operationIds, tableUuids));
  }
}
11 changes: 11 additions & 0 deletions client/optimizerclient/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Build script for the optimizer client module. It declares no sources-specific logic of its
// own: it applies the project's convention plugins and the shared client code-generation script.
plugins {
id 'openhouse.java-minimal-conventions'
id 'openhouse.client-codegen-convention'
id 'openhouse.maven-publish'
}

// Names the service project this client is generated for.
// NOTE(review): presumably read by the shared codegen script applied below — confirm.
ext {
codeGenForService = ":services:optimizer"
}

// Reuse the code-generation build logic that lives in the :client:common project.
apply from: "${project(':client:common').file("codegen.build.gradle")}"
1 change: 1 addition & 0 deletions client/secureclient/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ plugins {
dependencies {
api project(':client:tableclient')
api project(':client:jobsclient')
api project(':client:optimizerclient')
api project(':client:hts')
api 'org.springframework.boot:spring-boot-starter-webflux:2.7.8'
testImplementation 'io.netty:netty-resolver-dns-native-macos:4.1.70.Final:osx-x86_64'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.linkedin.openhouse.client.ssl;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.openhouse.optimizer.client.invoker.ApiClient;
import java.net.MalformedURLException;
import java.text.DateFormat;
import javax.net.ssl.SSLException;
import lombok.NonNull;
import org.springframework.web.reactive.function.client.WebClient;

/**
 * Lazily-created singleton factory for the optimizer-specific {@link ApiClient}. Mirrors {@link
 * JobsApiClientFactory}.
 */
public final class OptimizerApiClientFactory extends WebClientFactory {

  /** The single shared instance; created on the first {@link #getInstance()} call. */
  private static OptimizerApiClientFactory instance;

  private OptimizerApiClientFactory() {
    super();
  }

  /**
   * Returns the shared factory, creating it on first use. The method is {@code synchronized}, so
   * concurrent first calls still observe a single instance.
   */
  public static synchronized OptimizerApiClientFactory getInstance() {
    if (instance == null) {
      instance = new OptimizerApiClientFactory();
    }
    return instance;
  }

  /**
   * Supplies a {@link WebClient.Builder} whose JSON mapper is the generated client's default
   * mapper, reconfigured so that unknown properties in a response do not fail deserialization.
   */
  @Override
  protected WebClient.Builder createWebClientBuilder() {
    DateFormat dateFormat = ApiClient.createDefaultDateFormat();
    ObjectMapper mapper = ApiClient.createDefaultObjectMapper(dateFormat);
    // Tolerate response fields this client's generated models do not declare.
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    return ApiClient.buildWebClientBuilder(mapper);
  }

  /**
   * Creates the optimizer-specific {@link ApiClient} that generated APIs such as {@code
   * TableOperationsControllerApi} wrap.
   *
   * @param baseUrl base URL of the optimizer service; must not be {@code null}
   * @param token token forwarded to {@code createWebClient}; callers may pass {@code null}
   * @param truststoreLocation truststore location forwarded to {@code createWebClient}
   * @return an {@link ApiClient} backed by the created web client, with its base path set to
   *     {@code baseUrl}
   * @throws MalformedURLException propagated from {@code createWebClient}
   * @throws SSLException propagated from {@code createWebClient}
   */
  public ApiClient createApiClient(@NonNull String baseUrl, String token, String truststoreLocation)
      throws MalformedURLException, SSLException {
    ApiClient client = new ApiClient(createWebClient(baseUrl, token, truststoreLocation));
    client.setBasePath(baseUrl);
    return client;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
* when it launches a Spark job (fully-qualified table name, operation id).
*
* <p>Implementations have a public no-arg constructor — instantiated transiently inside {@link
* FirstFitBinPacker#pack} via a {@code Supplier<T extends BinItem>} (typically a {@code
* FirstFitDecreasingBinPacker#pack} via a {@code Supplier<T extends BinItem>} (typically a {@code
* MyItem::new} method reference) — on which {@link #fromOpAndStats} is called to return the
* populated item. Getters on the empty instance are not meaningful; it exists for the lifetime of a
* single projection call.
Expand Down

This file was deleted.

Loading