forked from apache/beam
Initial support of querying sdk worker status #3
Status: Open
xinyu-liu-glean wants to merge 2 commits into timmy-2.59 from sdk-worker-status
131 changes: 131 additions & 0 deletions
...src/main/java/org/apache/beam/runners/flink/translation/worker/SdkWorkerStatusServer.java
```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.flink.translation.worker;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import java.io.IOException;
import java.io.OutputStream;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.runners.fnexecution.status.BeamWorkerStatusGrpcService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** HTTP endpoint dedicated to serving live status info retrieved from the SDK harness. */
@SuppressWarnings({
  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class SdkWorkerStatusServer {
  private static final Logger LOG = LoggerFactory.getLogger(SdkWorkerStatusServer.class);
  private static final int BASE_PORT = 5100;
  private static final AtomicBoolean started = new AtomicBoolean(false);

  public static void create() {
    if (started.compareAndSet(false, true)) {
      AtomicInteger attempt = new AtomicInteger(0);

      RetryPolicy<HttpServer> retryPolicy =
          RetryPolicy.<HttpServer>builder()
              .handle(BindException.class, IOException.class)
              .withBackoff(
                  Duration.ofMillis(500), Duration.ofSeconds(10), 2.0) // exponential backoff
              .withMaxAttempts(5)
              .onFailedAttempt(
                  e -> {
                    // attempt was already incremented inside the supplier below.
                    int port = BASE_PORT + attempt.get() - 1;
                    LOG.warn(
                        "Failed to bind to port "
                            + port
                            + ": "
                            + e.getLastException().getMessage());
                  })
              .onRetriesExceeded(e -> LOG.warn("Exceeded max retries. Giving up."))
              .build();

      try {
        HttpServer server =
            Failsafe.with(retryPolicy)
                .get(
                    () -> {
                      // Each attempt tries the next port above BASE_PORT.
                      int port = BASE_PORT + attempt.getAndIncrement();
                      HttpServer httpServer = HttpServer.create(new InetSocketAddress(port), 0);
                      // Define a context (path) and handler.
                      httpServer.createContext("/workerstatus", new WorkerStatusHandler());
                      // Start the server on a default executor.
                      httpServer.setExecutor(null);
                      httpServer.start();
                      LOG.info("SdkWorkerStatusServer started at " + httpServer.getAddress());
                      return httpServer;
                    });

        // Stop the server cleanly on JVM shutdown.
        Runtime.getRuntime()
            .addShutdownHook(
                new Thread(
                    () -> {
                      LOG.info("Shutting down SdkWorkerStatusServer...");
                      server.stop(0);
                    }));
      } catch (Exception e) {
        LOG.warn("Failed to start SdkWorkerStatusServer.", e);
      }
    }
  }

  /** Serves the aggregated SDK worker statuses as plain text. */
  static class WorkerStatusHandler implements HttpHandler {
    @Override
    public void handle(HttpExchange exchange) throws IOException {
      try {
        BeamWorkerStatusGrpcService statusGrpcService = BeamWorkerStatusGrpcService.getInstance();
        if (statusGrpcService != null) {
          Map<String, String> allStatuses =
              statusGrpcService.getAllWorkerStatuses(10, TimeUnit.SECONDS);
          StringBuilder sb = new StringBuilder();
          for (Map.Entry<String, String> entry : allStatuses.entrySet()) {
            sb.append(entry.getKey()).append("\n").append(entry.getValue()).append("\n");
          }

          // Use the UTF-8 byte length, not the char count, for the Content-Length header.
          byte[] response = sb.toString().getBytes(StandardCharsets.UTF_8);
          exchange.sendResponseHeaders(200, response.length);
          try (OutputStream os = exchange.getResponseBody()) {
            os.write(response);
          }
        } else {
          LOG.info("BeamWorkerStatusGrpcService is not running.");
          // Respond rather than leaving the client waiting with no headers sent.
          exchange.sendResponseHeaders(503, -1);
        }
      } catch (Exception e) {
        LOG.warn("Exception when handling worker status request.", e);
      }
    }
  }
}
```
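The bind-with-fallback behavior above relies on the Failsafe library; the same idea can be sketched with only the JDK. The class name, base port, and attempt count below are illustrative, not part of this PR:

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.net.InetSocketAddress;

// Illustrative sketch: walk up from a base port until a bind succeeds.
public class PortFallbackDemo {
  static final int MAX_ATTEMPTS = 5;

  static HttpServer bindWithFallback(int basePort) throws IOException {
    IOException last = null;
    for (int i = 0; i < MAX_ATTEMPTS; i++) {
      try {
        // HttpServer.create binds the socket immediately, so a busy
        // port fails fast here with a BindException.
        return HttpServer.create(new InetSocketAddress(basePort + i), 0);
      } catch (IOException e) {
        last = e; // port in use; try the next one
      }
    }
    throw last;
  }

  public static void main(String[] args) throws IOException {
    HttpServer first = bindWithFallback(5100);
    first.start();
    // 5100 is now busy, so this falls through to a higher port.
    HttpServer second = bindWithFallback(5100);
    second.start();
    System.out.println(first.getAddress().getPort());
    System.out.println(second.getAddress().getPort());
    first.stop(0);
    second.stop(0);
  }
}
```

Unlike the Failsafe version, this sketch has no backoff between attempts; Failsafe adds the exponential delay and the per-attempt logging hooks on top of the same loop.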
Just for learning's sake: can you explain why we initialize the server here?
My understanding is this: FlinkExecutableStageContextFactory is called upon initialization of an ExecutableStageDoFnOperator on the task manager, i.e., each time the task manager loads an operator (which encapsulates a series of Beam transformations, i.e., an executable stage).
SdkWorkerStatusServer is a wrapper that invokes BeamWorkerStatusGrpcService, which is connected to gRPC clients on the SDK harnesses and uses them to fetch and aggregate their statuses. (Instrumenting the status gRPC service is something that needs to be done in future PRs as well.) The changes in DefaultJobBundleFactory below are to initialize the resources that FlinkExecutableStageContextFactory will rely on (since the job bundle is initialized before the executable stage operators).
Is that right?
Good question. The server is started during the creation of the ExecutableStageContext.Factory for this jobId. Based on the javadoc above ("This map should only ever have a single element..."), there should be only a single factory, so the server should be created once; I also added some logic in the server to be safe. Since this server is only used when we run the Beam Flink portable runner, I added it to FlinkExecutableStageContextFactory instead of the common portability parts such as DefaultJobBundleFactory or DefaultExecutableStageContext.
The changes in DefaultJobBundleFactory below are to create the WorkerStatus gRPC server so we can fetch the worker status from there. Once the status gRPC URL is set into the provision info, the Python worker side should automatically connect to the status server. I am going to test this out to verify we can get worker status from the server endpoint.
Thanks! One follow-up question: if the boundary between DefaultJobBundleFactory and ExecutableStageContextFactory is that the former is for all jobs while the latter is for portability-framework jobs only, then theoretically today, for non-portability jobs, we'd be creating the gRPC server but never using it. Is that right? (Non-blocking, given we don't use this custom fork for Java jobs at the moment.)
My experience is that DefaultJobBundleFactory is only used for the portability framework too. The difference is that DefaultJobBundleFactory is used across runners, while FlinkExecutableStageContextFactory is only used by Flink. For Java pipelines, Flink uses a different runner (FlinkRunner) instead of FlinkPipelineRunner (portability). All open source runners have both a Java runner and a portability runner. I am not sure about Dataflow, though. Do you know whether Google converged on a single (portable) runner?
The cost of adding the gRPC server and HTTP server without much traffic should be pretty small. I am considering adding this by default so we can do profiling anytime. Right now I use a pipeline option to gate it, but it seems pretty cumbersome to pass in the option and restart the pipeline.
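For the endpoint verification mentioned above, a self-contained sketch can exercise the plumbing end to end: it stands up a stub /workerstatus handler on an ephemeral port and fetches it over HTTP. The class name and the stub payload are illustrative; in the real server the payload comes from BeamWorkerStatusGrpcService:

```java
import com.sun.net.httpserver.HttpServer;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Illustrative sketch: stub /workerstatus handler plus a client probe.
public class WorkerStatusProbe {

  static String probe() throws Exception {
    // Ephemeral port (0) so the demo never collides with a real SdkWorkerStatusServer.
    HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
    server.createContext(
        "/workerstatus",
        exchange -> {
          // Stand-in payload; the real handler emits "<workerId>\n<status>\n" pairs.
          byte[] body = "worker-1\nRUNNING\n".getBytes(StandardCharsets.UTF_8);
          exchange.sendResponseHeaders(200, body.length);
          try (OutputStream os = exchange.getResponseBody()) {
            os.write(body);
          }
        });
    server.start();
    try {
      URI uri =
          URI.create(
              "http://localhost:" + server.getAddress().getPort() + "/workerstatus");
      HttpResponse<String> response =
          HttpClient.newHttpClient()
              .send(HttpRequest.newBuilder(uri).build(), HttpResponse.BodyHandlers.ofString());
      return response.statusCode() + "\n" + response.body();
    } finally {
      server.stop(0);
    }
  }

  public static void main(String[] args) throws Exception {
    // Prints the status code followed by the "<workerId>\n<status>\n" payload.
    System.out.print(probe());
  }
}
```

Against a running pipeline, the same request against port 5100 (or the next free port above it) should return the aggregated harness statuses instead of the stub payload.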