Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions runners/flink/flink_runner.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ dependencies {
implementation library.java.slf4j_api
implementation library.java.joda_time
implementation library.java.args4j
implementation library.java.failsafe

// Flink 1.15 shades all remaining scala dependencies and therefore does not depend on a specific version of Scala anymore
if (flink_version.compareTo("1.15") >= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.types.KvKeySelector;
import org.apache.beam.runners.flink.translation.utils.LargeRecordFilterFunction;
import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.wire.WireCoders;
Expand Down Expand Up @@ -92,6 +93,8 @@
import org.apache.flink.api.java.operators.SingleInputUdfOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A translator that translates bounded portable pipelines into executable Flink pipelines.
Expand Down Expand Up @@ -119,6 +122,8 @@
public class FlinkBatchPortablePipelineTranslator
implements FlinkPortablePipelineTranslator<
FlinkBatchPortablePipelineTranslator.BatchTranslationContext> {
private static final Logger LOG =
LoggerFactory.getLogger(FlinkBatchPortablePipelineTranslator.class);

/**
* Creates a batch translation context. The resulting Flink execution dag will live in a new
Expand Down Expand Up @@ -206,6 +211,7 @@ public FlinkPipelineOptions getPipelineOptions() {

@Override
public JobExecutionResult execute(String jobName) throws Exception {
LOG.info("Executing Flink batch job with name: {}", jobName);
return getExecutionEnvironment().execute(jobName);
}

Expand Down Expand Up @@ -515,8 +521,14 @@ private static <K, V> void translateGroupByKey(
TypeInformation<WindowedValue<KV<K, List<V>>>> partialReduceTypeInfo =
new CoderTypeInformation<>(outputCoder, context.getPipelineOptions());

///////////////////////// BEGIN GLEAN MODIFICATION ///////////////////////////////
LOG.info("Add step to filter large records before GroupBy");
DataSet<WindowedValue<KV<K, V>>> filteredDataSet =
inputDataSet.filter(new LargeRecordFilterFunction<>());

Grouping<WindowedValue<KV<K, V>>> inputGrouping =
inputDataSet.groupBy(new KvKeySelector<>(inputElementCoder.getKeyCoder()));
filteredDataSet.groupBy(new KvKeySelector<>(inputElementCoder.getKeyCoder()));
///////////////////////// END GLEAN MODIFICATION /////////////////////////////////

FlinkPartialReduceFunction<K, V, List<V>, ?> partialReduceFunction =
new FlinkPartialReduceFunction<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.beam.runners.flink.translation.worker.SdkWorkerStatusServer;
import org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext;
import org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory;
import org.apache.beam.runners.fnexecution.control.ExecutableStageContext;
import org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
Expand Down Expand Up @@ -49,6 +51,10 @@ public ExecutableStageContext get(JobInfo jobInfo) {
jobFactories.computeIfAbsent(
jobInfo.jobId(),
k -> {
if (DefaultJobBundleFactory.getEnableWorkerStatus(jobInfo)) {
SdkWorkerStatusServer.create();
}

return ReferenceCountingExecutableStageContextFactory.create(
DefaultExecutableStageContext::create,
// Clean up context immediately if its class is not loaded on Flink parent
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.apache.beam.runners.flink.translation.utils;

import java.util.List;
import org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.flink.api.common.functions.FilterFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * [Glean]
 * FilterFunction that filters out large records based on a size threshold.
 *
 * <p>A record is kept only when the combined estimated size of its key and value is below
 * {@code MAX_RECORD_SIZE}. The size estimate is intentionally shallow: only {@code byte[]}
 * payloads (and lists containing them, recursively) contribute bytes; every other type is
 * counted as zero.
 */
public class LargeRecordFilterFunction<K, V>
    implements FilterFunction<WindowedValue<KV<K, V>>> {
  private static final Logger LOG =
      LoggerFactory.getLogger(LargeRecordFilterFunction.class);

  // Records whose estimated size reaches this many bytes are dropped.
  private static final long MAX_RECORD_SIZE = 5000000; // 5 MB

  @Override
  public boolean filter(WindowedValue<KV<K, V>> windowedValue) throws Exception {
    KV<K, V> record = windowedValue.getValue();
    long totalBytes = getObjectSize(record.getKey()) + getObjectSize(record.getValue());
    boolean keep = totalBytes < MAX_RECORD_SIZE;
    if (!keep) {
      LOG.warn("Dropping large record with size: {}", totalBytes);
    }
    return keep;
  }

  /**
   * Calculate the size of an object in bytes.
   * This is a simplified version for objects used in portability.
   */
  private static <T> long getObjectSize(T o) {
    if (o instanceof byte[]) {
      return ((byte[]) o).length;
    }
    if (o instanceof List) {
      long sum = 0;
      for (Object element : (List<?>) o) {
        sum += getObjectSize(element);
      }
      return sum;
    }
    return 0; // for other types, we don't calculate size
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink.translation.worker;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import java.io.IOException;
import java.io.OutputStream;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.runners.fnexecution.status.BeamWorkerStatusGrpcService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Servlet dedicated to providing live status info retrieved from the SDK Harness. */
@SuppressWarnings({
  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class SdkWorkerStatusServer {
  private static final Logger LOG = LoggerFactory.getLogger(SdkWorkerStatusServer.class);
  private static final int BASE_PORT = 5100;
  private static final AtomicBoolean started = new AtomicBoolean(false);

  /**
   * Starts the worker status HTTP server at most once per JVM.
   *
   * <p>Binds to {@code BASE_PORT}, falling back to successive ports with exponential backoff when
   * a bind fails. If every attempt fails, the {@code started} flag is reset so a later call may
   * try again (previously a failed startup left the flag set and the server could never start).
   */
  public static void create() {
    if (started.compareAndSet(false, true)) {
      AtomicInteger attempt = new AtomicInteger(0);

      RetryPolicy<HttpServer> retryPolicy =
          RetryPolicy.<HttpServer>builder()
              .handle(BindException.class, IOException.class)
              .withBackoff(
                  Duration.ofMillis(500), Duration.ofSeconds(10), 2.0) // exponential backoff
              .withMaxAttempts(5)
              .onFailedAttempt(
                  e -> {
                    // attempt was already incremented for the failed try, hence the -1.
                    int port = BASE_PORT + attempt.get() - 1;
                    LOG.warn(
                        "Failed to bind to port "
                            + port
                            + ": "
                            + e.getLastException().getMessage());
                  })
              .onRetriesExceeded(e -> LOG.warn("Exceeded max retries. Giving up."))
              .build();

      try {
        HttpServer server =
            Failsafe.with(retryPolicy)
                .get(
                    () -> {
                      int port = BASE_PORT + attempt.getAndIncrement();
                      HttpServer httpServer = HttpServer.create(new InetSocketAddress(port), 0);
                      // Define a context (path) and handler
                      httpServer.createContext("/workerstatus", new WorkerStatusHandler());
                      // Start the server
                      httpServer.setExecutor(null); // creates a default executor
                      httpServer.start();
                      LOG.info(
                          "SdkWorkerStatusServer started at " + httpServer.getAddress().toString());
                      return httpServer;
                    });

        // Add shutdown hook so the port is released cleanly on JVM exit.
        Runtime.getRuntime()
            .addShutdownHook(
                new Thread(
                    () -> {
                      LOG.info("Shutting down SdkWorkerStatusServer...");
                      server.stop(0);
                    }));
      } catch (Exception e) {
        // Keep the throwable for diagnostics and allow a future create() call to retry.
        LOG.warn("Fail to start SdkWorkerStatusServer.", e);
        started.set(false);
      }
    }
  }

  /** Handler that renders all known SDK worker statuses as a plain-text HTTP response. */
  static class WorkerStatusHandler implements HttpHandler {
    @Override
    public void handle(HttpExchange exchange) throws IOException {
      try {
        BeamWorkerStatusGrpcService statusGrpcService = BeamWorkerStatusGrpcService.getInstance();
        if (statusGrpcService != null) {
          Map<String, String> allStatuses =
              statusGrpcService.getAllWorkerStatuses(10, TimeUnit.SECONDS);
          StringBuilder sb = new StringBuilder();
          for (Map.Entry<String, String> entry : allStatuses.entrySet()) {
            sb.append(entry.getKey());
            sb.append("\n");
            sb.append(entry.getValue());
            sb.append("\n");
          }
          respond(exchange, 200, sb.toString());
        } else {
          LOG.info("BeamWorkerStatusGrpcService is not running.");
          // Previously no response was written on this path, leaving the HTTP client hanging.
          respond(exchange, 503, "BeamWorkerStatusGrpcService is not running.\n");
        }
      } catch (Exception e) {
        // Log the full stack trace, not just the message, so failures are diagnosable.
        LOG.warn("Exception when handling workerStatusRequest " + e.getMessage(), e);
      }
    }

    /** Writes {@code body} with the given status, using the UTF-8 byte count as content length. */
    private static void respond(HttpExchange exchange, int status, String body) throws IOException {
      byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
      // Content length must be the encoded byte count; String#length() under-counts whenever the
      // status text contains non-ASCII characters, truncating the response.
      exchange.sendResponseHeaders(status, bytes.length);
      try (OutputStream os = exchange.getResponseBody()) {
        os.write(bytes);
      }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.model.fnexecution.v1.ProvisionApi;
import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardEnvironments;
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardRunnerProtocols;
Expand All @@ -52,6 +53,7 @@
import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
import org.apache.beam.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.runners.fnexecution.status.BeamWorkerStatusGrpcService;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;
Expand Down Expand Up @@ -265,6 +267,12 @@ public WrappedSdkHarnessClient load(Environment environment) throws Exception {
return caches.build();
}

public static boolean getEnableWorkerStatus(JobInfo jobInfo) {
PipelineOptions pipelineOptions =
PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions());
return pipelineOptions.as(PortablePipelineOptions.class).getEnableWorkerStatus();
}

private static int getEnvironmentExpirationMillis(JobInfo jobInfo) {
PipelineOptions pipelineOptions =
PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions());
Expand Down Expand Up @@ -692,6 +700,19 @@ private ServerInfo createServerInfo(JobInfo jobInfo, ServerFactory serverFactory
provisionInfo.setControlEndpoint(controlServer.getApiServiceDescriptor());
provisionInfo.addRunnerCapabilities(
BeamUrns.getUrn(StandardRunnerProtocols.Enum.CONTROL_RESPONSE_ELEMENTS_EMBEDDING));

if (getEnableWorkerStatus(jobInfo)) {
GrpcFnServer<BeamWorkerStatusGrpcService> workerStatusServer =
GrpcFnServer.allocatePortAndCreateFor(
BeamWorkerStatusGrpcService.create(
ApiServiceDescriptor.getDefaultInstance(),
GrpcContextHeaderAccessorProvider.getHeaderAccessor()),
serverFactory);

provisionInfo.setStatusEndpoint(workerStatusServer.getApiServiceDescriptor());
}

LOG.info("FnServer provision info: " + provisionInfo.build());
GrpcFnServer<StaticGrpcProvisionService> provisioningServer =
GrpcFnServer.allocatePortAndCreateFor(
StaticGrpcProvisionService.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -57,6 +58,8 @@ public class BeamWorkerStatusGrpcService extends BeamFnWorkerStatusImplBase impl
Collections.synchronizedMap(new HashMap<>());
private final AtomicBoolean isClosed = new AtomicBoolean();

private static @Nullable BeamWorkerStatusGrpcService CURRENT_INSTANCE = null;

private BeamWorkerStatusGrpcService(
ApiServiceDescriptor apiServiceDescriptor, HeaderAccessor headerAccessor) {
this.headerAccessor = headerAccessor;
Expand All @@ -73,7 +76,13 @@ private BeamWorkerStatusGrpcService(
*/
public static BeamWorkerStatusGrpcService create(
ApiServiceDescriptor apiServiceDescriptor, HeaderAccessor headerAccessor) {
return new BeamWorkerStatusGrpcService(apiServiceDescriptor, headerAccessor);
CURRENT_INSTANCE = new BeamWorkerStatusGrpcService(apiServiceDescriptor, headerAccessor);
return CURRENT_INSTANCE;
}

/** Glean: return the current instance of {@link BeamWorkerStatusGrpcService}. */
public static @Nullable BeamWorkerStatusGrpcService getInstance() {
return CURRENT_INSTANCE;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,4 +140,11 @@ static String getEnvironmentOption(
boolean getEnableHeapDumps();

void setEnableHeapDumps(boolean enableHeapDumps);

@Description(
"If {@literal true} and PipelineOption tempLocation is set, show worker status before shutting")
@Default.Boolean(false)
boolean getEnableWorkerStatus();

void setEnableWorkerStatus(boolean enableWorkerStatus);
}
Loading
Loading