1 change: 1 addition & 0 deletions runners/flink/flink_runner.gradle
@@ -172,6 +172,7 @@ dependencies {
implementation library.java.slf4j_api
implementation library.java.joda_time
implementation library.java.args4j
implementation library.java.failsafe

// Flink 1.15 shades all remaining Scala dependencies and therefore does not depend on a specific version of Scala anymore
if (flink_version.compareTo("1.15") >= 0) {
@@ -19,7 +19,9 @@

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.beam.runners.flink.translation.worker.SdkWorkerStatusServer;
import org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext;
import org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory;
import org.apache.beam.runners.fnexecution.control.ExecutableStageContext;
import org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
@@ -49,6 +51,10 @@ public ExecutableStageContext get(JobInfo jobInfo) {
jobFactories.computeIfAbsent(
jobInfo.jobId(),
k -> {
if (DefaultJobBundleFactory.getEnableWorkerStatus(jobInfo)) {
SdkWorkerStatusServer.create();
}

Comment on lines +54 to +57

Just for learning's sake: can you explain why we initialize the server here?

My understanding is this: FlinkExecutableStageContextFactory is called upon initialization of an ExecutableStageDoFnOperator on the taskmanager. Ie, each time the taskmanager loads an operator (which encapsulates a series of beam transformations ie. executable stage)

SdkWorkerStatusServer is a wrapper that invokes BeamWorkerStatusGrpcService which is connected to grpc clients on the sdk harnesses and will use that to fetch + aggregate their statuses. (Instrumenting the status grpc service is something that needs to be done in future PRs as well)

The changes in DefaultJobBundleFactory below are to initialize the resources that the FlinkExecutableStageContextFactory will rely on (as job bundle is initialized before executable stage operators).

Is that right?

@xinyu-liu-glean (Author) Jun 4, 2025

Good question. The server is started during the creation of the ExecutableStageContext.Factory for this jobId. Based on the javadoc above ("This map should only ever have a single element..."), there should be only a single factory, so the server should be created once. I also added some logic in the server to be safe. Since this server is only used when we run the Beam Flink portable runner, I added it to the FlinkExecutableStageContextFactory instead of the common portability parts such as DefaultJobBundleFactory or DefaultExecutableStageContext.

The changes in DefaultJobBundleFactory below are to create the WorkerStatus grpc server so we can fetch the worker status from there. Once the status grpc url is set in the provision info, the Python worker side should automatically connect to the status server. I am going to test this out to verify we can get worker status from the server endpoint.

@steve-scio Jun 4, 2025

Thanks! One follow-up question: if the boundary between DefaultJobBundleFactory and ExecutableStageContextFactory is that the former is for all jobs while the latter is for portability-framework jobs only, then theoretically today for non-portability jobs we'd be creating the gRPC server but never using it. Is that right? (Non-blocking, given we don't use this custom fork for Java jobs at the moment.)

@xinyu-liu-glean (Author) Jun 4, 2025

My experience is that DefaultJobBundleFactory is only used for the portability framework too. The difference is that DefaultJobBundleFactory is used across runners while FlinkExecutableStageContextFactory is only used by Flink. For Java pipelines, Flink uses a different runner (FlinkRunner) instead of FlinkPipelineRunner (portability). All open source runners have both a Java runner and a portability runner. I am not sure about Dataflow though. Do you know whether Google converged on a single (portable) runner?

The cost of adding the grpc server and http server without much traffic should be pretty small. I am wondering whether we can add this by default so we can do profiling anytime. Right now I use a PipelineOption to gate it, but it seems pretty cumbersome to pass in the option and restart the pipeline.
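The "created once" guarantee discussed in this thread can be sketched with a synchronized lazy initializer. This is a minimal JDK-only sketch of the pattern, not the PR's actual SdkWorkerStatusServer; the class and method names are illustrative, and it binds to port 0 (any free port) rather than retrying fixed ports:

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;

public class OnceOnlyStatusServer {
  private static HttpServer server; // guarded by the class lock

  /** Starts the server on the first call; later calls return the same instance. */
  public static synchronized HttpServer createOnce() {
    if (server == null) {
      try {
        // Port 0 asks the OS for any free port, sidestepping bind conflicts
        server = HttpServer.create(new InetSocketAddress(0), 0);
        server.start();
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
    return server;
  }

  public static void main(String[] args) {
    HttpServer first = createOnce();
    HttpServer second = createOnce(); // no second server is started
    System.out.println(first == second);
    first.stop(0);
  }
}
```

Because the method is synchronized and checks for null, even concurrent factory initializations observe a single server instance.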

return ReferenceCountingExecutableStageContextFactory.create(
DefaultExecutableStageContext::create,
// Clean up context immediately if its class is not loaded on Flink parent
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink.translation.worker;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import java.io.IOException;
import java.io.OutputStream;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.runners.fnexecution.status.BeamWorkerStatusGrpcService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** HTTP server that provides live status info retrieved from the SDK harness. */
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class SdkWorkerStatusServer {
private static final Logger LOG = LoggerFactory.getLogger(SdkWorkerStatusServer.class);
private static final int BASE_PORT = 5100;
private static final AtomicBoolean started = new AtomicBoolean(false);

public static void create() {
if (started.compareAndSet(false, true)) {
AtomicInteger attempt = new AtomicInteger(0);

RetryPolicy<HttpServer> retryPolicy =
RetryPolicy.<HttpServer>builder()
.handle(BindException.class, IOException.class)
.withBackoff(
Duration.ofMillis(500), Duration.ofSeconds(10), 2.0) // exponential backoff
.withMaxAttempts(5)
.onFailedAttempt(
e -> {
int port = BASE_PORT + attempt.get() - 1;
LOG.warn(
"Failed to bind to port "
+ port
+ ": "
+ e.getLastException().getMessage());
})
.onRetriesExceeded(e -> LOG.warn("Exceeded max retries. Giving up."))
.build();

try {
HttpServer server =
Failsafe.with(retryPolicy)
.get(
() -> {
int port = BASE_PORT + attempt.getAndIncrement();
HttpServer httpServer = HttpServer.create(new InetSocketAddress(port), 0);
// Define a context (path) and handler
httpServer.createContext("/workerstatus", new WorkerStatusHandler());
// Start the server
httpServer.setExecutor(null); // creates a default executor
httpServer.start();
LOG.info(
"SdkWorkerStatusServer started at " + httpServer.getAddress().toString());
return httpServer;
});

// Add shutdown hook
Runtime.getRuntime()
.addShutdownHook(
new Thread(
() -> {
LOG.info("Shutting down SdkWorkerStatusServer...");
server.stop(0);
}));
} catch (Exception e) {
LOG.warn("Failed to start SdkWorkerStatusServer.", e);
}
}
}

// Simple handler
static class WorkerStatusHandler implements HttpHandler {
@Override
public void handle(HttpExchange exchange) throws IOException {
try {
BeamWorkerStatusGrpcService statusGrpcService = BeamWorkerStatusGrpcService.getInstance();
if (statusGrpcService != null) {
Map<String, String> allStatuses =
statusGrpcService.getAllWorkerStatuses(10, TimeUnit.SECONDS);
StringBuilder sb = new StringBuilder();
for (Map.Entry<String, String> entry : allStatuses.entrySet()) {
sb.append(entry.getKey());
sb.append("\n");
sb.append(entry.getValue());
sb.append("\n");
}

String response = sb.toString();
byte[] bytes = response.getBytes(StandardCharsets.UTF_8);
// Content length must be the UTF-8 byte count, not String.length()
exchange.sendResponseHeaders(200, bytes.length);
try (OutputStream os = exchange.getResponseBody()) {
os.write(bytes);
}
} else {
LOG.info("BeamWorkerStatusGrpcService is not running.");
exchange.sendResponseHeaders(503, -1); // no body; avoids leaving the client hanging
}
} catch (Exception e) {
LOG.warn("Exception when handling worker status request.", e);
}
}
}
}
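One subtlety in handlers like the one in this new file: `sendResponseHeaders` takes a byte count, and `String.length()` undercounts whenever the status text contains non-ASCII characters. The round trip can be sketched with only the JDK's built-in HTTP server; this is an illustrative stand-alone sketch with hypothetical names, not the PR's handler:

```java
import com.sun.net.httpserver.HttpServer;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class Utf8ResponseSketch {
  /** The correct content length: UTF-8 bytes, not chars ("h\u00e9llo" is 5 chars, 6 bytes). */
  static int contentLength(String body) {
    return body.getBytes(StandardCharsets.UTF_8).length;
  }

  public static void main(String[] args) throws Exception {
    HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
    server.createContext("/workerstatus", exchange -> {
      byte[] body = "worker-1\nOK\n".getBytes(StandardCharsets.UTF_8);
      exchange.sendResponseHeaders(200, body.length); // byte length, not String.length()
      try (OutputStream os = exchange.getResponseBody()) {
        os.write(body);
      }
    });
    server.start();
    URL url = new URL("http://localhost:" + server.getAddress().getPort() + "/workerstatus");
    try (InputStream in = url.openStream()) {
      System.out.print(new String(in.readAllBytes(), StandardCharsets.UTF_8));
    }
    server.stop(0);
  }
}
```

Passing the character count instead would truncate the body at the client whenever a worker status contained multi-byte characters.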
@@ -32,6 +32,7 @@
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.model.fnexecution.v1.ProvisionApi;
import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardEnvironments;
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardRunnerProtocols;
@@ -52,6 +53,7 @@
import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
import org.apache.beam.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.runners.fnexecution.status.BeamWorkerStatusGrpcService;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;
@@ -265,6 +267,12 @@ public WrappedSdkHarnessClient load(Environment environment) throws Exception {
return caches.build();
}

public static boolean getEnableWorkerStatus(JobInfo jobInfo) {
PipelineOptions pipelineOptions =
PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions());
return pipelineOptions.as(PortablePipelineOptions.class).getEnableWorkerStatus();
}

private static int getEnvironmentExpirationMillis(JobInfo jobInfo) {
PipelineOptions pipelineOptions =
PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions());
@@ -692,6 +700,19 @@ private ServerInfo createServerInfo(JobInfo jobInfo, ServerFactory serverFactory
provisionInfo.setControlEndpoint(controlServer.getApiServiceDescriptor());
provisionInfo.addRunnerCapabilities(
BeamUrns.getUrn(StandardRunnerProtocols.Enum.CONTROL_RESPONSE_ELEMENTS_EMBEDDING));

if (getEnableWorkerStatus(jobInfo)) {
GrpcFnServer<BeamWorkerStatusGrpcService> workerStatusServer =
GrpcFnServer.allocatePortAndCreateFor(
BeamWorkerStatusGrpcService.create(
ApiServiceDescriptor.getDefaultInstance(),
GrpcContextHeaderAccessorProvider.getHeaderAccessor()),
serverFactory);

provisionInfo.setStatusEndpoint(workerStatusServer.getApiServiceDescriptor());
}

LOG.info("FnServer provision info: " + provisionInfo.build());
GrpcFnServer<StaticGrpcProvisionService> provisioningServer =
GrpcFnServer.allocatePortAndCreateFor(
StaticGrpcProvisionService.create(
@@ -39,6 +39,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -57,6 +58,8 @@ public class BeamWorkerStatusGrpcService extends BeamFnWorkerStatusImplBase impl
Collections.synchronizedMap(new HashMap<>());
private final AtomicBoolean isClosed = new AtomicBoolean();

private static @Nullable BeamWorkerStatusGrpcService CURRENT_INSTANCE = null;

private BeamWorkerStatusGrpcService(
ApiServiceDescriptor apiServiceDescriptor, HeaderAccessor headerAccessor) {
this.headerAccessor = headerAccessor;
@@ -73,7 +76,13 @@ private BeamWorkerStatusGrpcService(
*/
public static BeamWorkerStatusGrpcService create(
ApiServiceDescriptor apiServiceDescriptor, HeaderAccessor headerAccessor) {
return new BeamWorkerStatusGrpcService(apiServiceDescriptor, headerAccessor);
CURRENT_INSTANCE = new BeamWorkerStatusGrpcService(apiServiceDescriptor, headerAccessor);
return CURRENT_INSTANCE;
}

/** Glean: return the current instance of {@link BeamWorkerStatusGrpcService}. */
public static @Nullable BeamWorkerStatusGrpcService getInstance() {
return CURRENT_INSTANCE;
}

@Override
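The static nullable `CURRENT_INSTANCE` field added in this file is a plain mutable singleton. An `AtomicReference` gives safe publication across threads without locking; the following is an illustrative sketch of that alternative, using a stand-in class rather than the real BeamWorkerStatusGrpcService:

```java
import java.util.concurrent.atomic.AtomicReference;

public class InstanceHolderSketch {
  // Hypothetical stand-in for the real gRPC service class
  static class StatusService {}

  private static final AtomicReference<StatusService> CURRENT = new AtomicReference<>();

  /** Creates a new instance and publishes it for later lookup. */
  static StatusService create() {
    StatusService s = new StatusService();
    CURRENT.set(s); // atomic write: readers see either null or a fully constructed instance
    return s;
  }

  /** Returns the current instance, or null before the first create(). */
  static StatusService getInstance() {
    return CURRENT.get();
  }
}
```

With a plain static field, a reader on another thread could in principle observe a partially published reference; the atomic holder (or a volatile field) removes that hazard.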
@@ -140,4 +140,11 @@ static String getEnvironmentOption(
boolean getEnableHeapDumps();

void setEnableHeapDumps(boolean enableHeapDumps);

@Description(
"If {@literal true}, start the SDK worker status server so live status can be fetched from SDK harnesses")
@Default.Boolean(false)
boolean getEnableWorkerStatus();

void setEnableWorkerStatus(boolean enableWorkerStatus);
}