diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle index c8f492a901d3..b3e0e984ba57 100644 --- a/runners/flink/flink_runner.gradle +++ b/runners/flink/flink_runner.gradle @@ -172,6 +172,7 @@ dependencies { implementation library.java.slf4j_api implementation library.java.joda_time implementation library.java.args4j + implementation library.java.failsafe // Flink 1.15 shades all remaining scala dependencies and therefor does not depend on a specific version of Scala anymore if (flink_version.compareTo("1.15") >= 0) { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContextFactory.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContextFactory.java index a2e3bb3142bf..4a0fb5f7c919 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContextFactory.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContextFactory.java @@ -19,7 +19,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.apache.beam.runners.flink.translation.worker.SdkWorkerStatusServer; import org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext; +import org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory; import org.apache.beam.runners.fnexecution.control.ExecutableStageContext; import org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory; import org.apache.beam.runners.fnexecution.provisioning.JobInfo; @@ -49,6 +51,10 @@ public ExecutableStageContext get(JobInfo jobInfo) { jobFactories.computeIfAbsent( jobInfo.jobId(), k -> { + if (DefaultJobBundleFactory.getEnableWorkerStatus(jobInfo)) { + SdkWorkerStatusServer.create(); + } + return ReferenceCountingExecutableStageContextFactory.create( 
DefaultExecutableStageContext::create, // Clean up context immediately if its class is not loaded on Flink parent
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/worker/SdkWorkerStatusServer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/worker/SdkWorkerStatusServer.java
new file mode 100644
index 000000000000..ef80778e4f9e
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/worker/SdkWorkerStatusServer.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.worker;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+import dev.failsafe.Failsafe;
+import dev.failsafe.RetryPolicy;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.runners.fnexecution.status.BeamWorkerStatusGrpcService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Minimal HTTP server that exposes the live status reported by the SDK harnesses.
+ *
+ * <p>The server listens on the first free port starting at {@code BASE_PORT} and serves the
+ * status of all connected workers as plain text under {@code /workerstatus}.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class SdkWorkerStatusServer {
+  private static final Logger LOG = LoggerFactory.getLogger(SdkWorkerStatusServer.class);
+  private static final int BASE_PORT = 5100;
+  private static final AtomicBoolean started = new AtomicBoolean(false);
+
+  /**
+   * Starts the status server at most once per JVM; subsequent calls are no-ops.
+   *
+   * <p>Binding is attempted on up to five consecutive ports beginning at {@code BASE_PORT}. A
+   * failure to start is logged and otherwise ignored, so this method never throws.
+   */
+  public static void create() {
+    if (!started.compareAndSet(false, true)) {
+      return;
+    }
+
+    AtomicInteger attempt = new AtomicInteger(0);
+    RetryPolicy<HttpServer> retryPolicy =
+        RetryPolicy.<HttpServer>builder()
+            .handle(BindException.class, IOException.class)
+            .withBackoff(Duration.ofMillis(500), Duration.ofSeconds(10), 2.0)
+            .withMaxAttempts(5)
+            .onFailedAttempt(
+                e ->
+                    LOG.warn(
+                        "Failed to bind to port {}: {}",
+                        BASE_PORT + attempt.get() - 1, // counter was already advanced
+                        e.getLastException().getMessage()))
+            .onRetriesExceeded(e -> LOG.warn("Exceeded max retries. Giving up."))
+            .build();
+
+    try {
+      HttpServer server =
+          Failsafe.with(retryPolicy)
+              .get(
+                  () -> {
+                    // Each attempt probes the next port.
+                    int port = BASE_PORT + attempt.getAndIncrement();
+                    HttpServer httpServer = HttpServer.create(new InetSocketAddress(port), 0);
+                    httpServer.createContext("/workerstatus", new WorkerStatusHandler());
+                    httpServer.setExecutor(null); // creates a default executor
+                    httpServer.start();
+                    LOG.info("SdkWorkerStatusServer started at {}", httpServer.getAddress());
+                    return httpServer;
+                  });
+
+      Runtime.getRuntime()
+          .addShutdownHook(
+              new Thread(
+                  () -> {
+                    LOG.info("Shutting down SdkWorkerStatusServer...");
+                    server.stop(0);
+                  }));
+    } catch (Exception e) {
+      // Best effort: keep the caller running, but record why the server could not start.
+      LOG.warn("Failed to start SdkWorkerStatusServer.", e);
+    }
+  }
+
+  /** Serves the status of every connected SDK worker as a plain-text document. */
+  static class WorkerStatusHandler implements HttpHandler {
+    @Override
+    public void handle(HttpExchange exchange) throws IOException {
+      int statusCode;
+      String body;
+      try {
+        BeamWorkerStatusGrpcService statusGrpcService = BeamWorkerStatusGrpcService.getInstance();
+        if (statusGrpcService == null) {
+          LOG.info("BeamWorkerStatusGrpcService is not running.");
+          statusCode = 503;
+          body = "BeamWorkerStatusGrpcService is not running.\n";
+        } else {
+          Map<String, String> allStatuses =
+              statusGrpcService.getAllWorkerStatuses(10, TimeUnit.SECONDS);
+          StringBuilder sb = new StringBuilder();
+          for (Map.Entry<String, String> entry : allStatuses.entrySet()) {
+            sb.append(entry.getKey()).append("\n").append(entry.getValue()).append("\n");
+          }
+          statusCode = 200;
+          body = sb.toString();
+        }
+      } catch (Exception e) {
+        LOG.warn("Exception when handling workerStatusRequest", e);
+        statusCode = 500;
+        body = "Failed to collect worker status: " + e + "\n";
+      }
+      sendResponse(exchange, statusCode, body);
+    }
+
+    /** Writes {@code body} as UTF-8 text and always releases the exchange. */
+    private static void sendResponse(HttpExchange exchange, int statusCode, String body) {
+      byte[] payload = body.getBytes(StandardCharsets.UTF_8);
+      try {
+        exchange.getResponseHeaders().set("Content-Type", "text/plain; charset=utf-8");
+        // Content-Length must be the encoded byte count, not the number of chars; HttpServer
+        // treats -1 as "no body" (0 would switch to chunked encoding).
+        exchange.sendResponseHeaders(statusCode, payload.length == 0 ? -1 : payload.length);
+        if (payload.length > 0) {
+          try (OutputStream os = exchange.getResponseBody()) {
+            os.write(payload);
+          }
+        }
+      } catch (IOException e) {
+        LOG.warn("Failed to send worker status response", e);
+      } finally {
+        exchange.close();
+      }
+    }
+  }
+}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
index da5da6c9d603..9a65b8cbcba7 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
@@ -32,6 +32,7 @@
 import java.util.concurrent.locks.ReentrantLock;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.beam.model.fnexecution.v1.ProvisionApi;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
 import org.apache.beam.model.pipeline.v1.RunnerApi.StandardEnvironments;
 import org.apache.beam.model.pipeline.v1.RunnerApi.StandardRunnerProtocols;
@@ -52,6 +53,7 @@
 import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
 import org.apache.beam.runners.fnexecution.state.GrpcStateService;
 import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.runners.fnexecution.status.BeamWorkerStatusGrpcService;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.fn.IdGenerators;
@@ -265,6 +267,13 @@ public WrappedSdkHarnessClient load(Environment environment) throws Exception {
     return caches.build();
   }
 
+  /** Returns the {@code enableWorkerStatus} setting from the given job's pipeline options. */
+  public static boolean getEnableWorkerStatus(JobInfo jobInfo) {
+    return PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions())
+        .as(PortablePipelineOptions.class)
+        .getEnableWorkerStatus();
+  }
+
   private static int getEnvironmentExpirationMillis(JobInfo jobInfo) {
     PipelineOptions pipelineOptions =
         PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions());
@@ -692,6 +701,19 @@ private ServerInfo createServerInfo(JobInfo jobInfo, ServerFactory serverFactory
provisionInfo.setControlEndpoint(controlServer.getApiServiceDescriptor()); provisionInfo.addRunnerCapabilities( BeamUrns.getUrn(StandardRunnerProtocols.Enum.CONTROL_RESPONSE_ELEMENTS_EMBEDDING)); + + if (getEnableWorkerStatus(jobInfo)) { + GrpcFnServer workerStatusServer = + GrpcFnServer.allocatePortAndCreateFor( + BeamWorkerStatusGrpcService.create( + ApiServiceDescriptor.getDefaultInstance(), + GrpcContextHeaderAccessorProvider.getHeaderAccessor()), + serverFactory); + + provisionInfo.setStatusEndpoint(workerStatusServer.getApiServiceDescriptor()); + } + + LOG.info("FnServer provision info: " + provisionInfo.build()); GrpcFnServer provisioningServer = GrpcFnServer.allocatePortAndCreateFor( StaticGrpcProvisionService.create(
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/status/BeamWorkerStatusGrpcService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/status/BeamWorkerStatusGrpcService.java
index 68ca36f8a369..3eb5007fce12 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/status/BeamWorkerStatusGrpcService.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/status/BeamWorkerStatusGrpcService.java
@@ -39,6 +39,7 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,6 +58,9 @@ public class BeamWorkerStatusGrpcService extends BeamFnWorkerStatusImplBase impl
       Collections.synchronizedMap(new HashMap<>());
   private final AtomicBoolean isClosed = new AtomicBoolean();
 
+  // Most recently created service; volatile so a write in create() is visible to other threads.
+  private static volatile @Nullable BeamWorkerStatusGrpcService currentInstance = null;
+
   private BeamWorkerStatusGrpcService(
       ApiServiceDescriptor apiServiceDescriptor, HeaderAccessor headerAccessor) {
     this.headerAccessor = headerAccessor;
@@ -73,7 +77,15 @@ private BeamWorkerStatusGrpcService(
    */
   public static BeamWorkerStatusGrpcService create(
       ApiServiceDescriptor apiServiceDescriptor, HeaderAccessor headerAccessor) {
-    return new BeamWorkerStatusGrpcService(apiServiceDescriptor, headerAccessor);
+    BeamWorkerStatusGrpcService service =
+        new BeamWorkerStatusGrpcService(apiServiceDescriptor, headerAccessor);
+    currentInstance = service;
+    return service;
+  }
+
+  /** Returns the most recently created service, or {@code null} if none has been created. */
+  public static @Nullable BeamWorkerStatusGrpcService getInstance() {
+    return currentInstance;
   }
 
   @Override
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
index ec82aebbef66..8d877bf2b484 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
@@ -140,4 +140,12 @@ static String getEnvironmentOption(
   boolean getEnableHeapDumps();
 
   void setEnableHeapDumps(boolean enableHeapDumps);
+
+  @Description(
+      "If {@literal true}, the runner starts a worker status service and advertises its endpoint "
+          + "to the SDK harness so that live worker status can be queried.")
+  @Default.Boolean(false)
+  boolean getEnableWorkerStatus();
+
+  void setEnableWorkerStatus(boolean enableWorkerStatus);
 }