diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle index c8f492a901d3..b3e0e984ba57 100644 --- a/runners/flink/flink_runner.gradle +++ b/runners/flink/flink_runner.gradle @@ -172,6 +172,7 @@ dependencies { implementation library.java.slf4j_api implementation library.java.joda_time implementation library.java.args4j + implementation library.java.failsafe // Flink 1.15 shades all remaining scala dependencies and therefor does not depend on a specific version of Scala anymore if (flink_version.compareTo("1.15") >= 0) { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java index 4d9a5a516c75..19c943b67747 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java @@ -46,6 +46,7 @@ import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.runners.flink.translation.types.KvKeySelector; +import org.apache.beam.runners.flink.translation.utils.LargeRecordFilterFunction; import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat; import org.apache.beam.runners.fnexecution.provisioning.JobInfo; import org.apache.beam.runners.fnexecution.wire.WireCoders; @@ -92,6 +93,8 @@ import org.apache.flink.api.java.operators.SingleInputUdfOperator; import org.apache.flink.api.java.operators.UnsortedGrouping; import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A translator that translates bounded portable pipelines into executable Flink pipelines. 
@@ -119,6 +122,8 @@ public class FlinkBatchPortablePipelineTranslator implements FlinkPortablePipelineTranslator< FlinkBatchPortablePipelineTranslator.BatchTranslationContext> { + private static final Logger LOG = + LoggerFactory.getLogger(FlinkBatchPortablePipelineTranslator.class); /** * Creates a batch translation context. The resulting Flink execution dag will live in a new @@ -206,6 +211,7 @@ public FlinkPipelineOptions getPipelineOptions() { @Override public JobExecutionResult execute(String jobName) throws Exception { + LOG.info("Executing Flink batch job with name: {}", jobName); return getExecutionEnvironment().execute(jobName); } @@ -515,8 +521,14 @@ private static void translateGroupByKey( TypeInformation>>> partialReduceTypeInfo = new CoderTypeInformation<>(outputCoder, context.getPipelineOptions()); + ///////////////////////// BEGIN GLEAN MODIFICATION /////////////////////////////// + LOG.info("Add step to filter large records before GroupBy"); + DataSet>> filteredDataSet = + inputDataSet.filter(new LargeRecordFilterFunction<>()); + Grouping>> inputGrouping = - inputDataSet.groupBy(new KvKeySelector<>(inputElementCoder.getKeyCoder())); + filteredDataSet.groupBy(new KvKeySelector<>(inputElementCoder.getKeyCoder())); + ///////////////////////// END GLEAN MODIFICATION ///////////////////////////////// FlinkPartialReduceFunction, ?> partialReduceFunction = new FlinkPartialReduceFunction<>( diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContextFactory.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContextFactory.java index a2e3bb3142bf..4a0fb5f7c919 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContextFactory.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContextFactory.java @@ -19,7 +19,9 @@ import 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.flink.translation.utils;

import java.util.List;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.flink.api.common.functions.FilterFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * [Glean] A {@link FilterFunction} that drops records whose estimated size reaches {@code
 * MAX_RECORD_SIZE}, protecting the downstream GroupBy from oversized records.
 *
 * <p>The size estimate only counts {@code byte[]} payloads and, recursively, {@link List}s of
 * them. Keys/values of any other type contribute size 0, so records of other types are never
 * dropped by this filter.
 *
 * @param <K> key type of the input {@link KV}
 * @param <V> value type of the input {@link KV}
 */
public class LargeRecordFilterFunction<K, V>
    implements FilterFunction<WindowedValue<KV<K, V>>> {
  private static final Logger LOG = LoggerFactory.getLogger(LargeRecordFilterFunction.class);

  // Threshold at which a record is considered "large" and dropped (~5 MB, decimal).
  private static final long MAX_RECORD_SIZE = 5_000_000L;

  /**
   * Keeps a record only when the combined estimated size of its key and value is below {@code
   * MAX_RECORD_SIZE}. Dropped records are logged at WARN level.
   */
  @Override
  public boolean filter(WindowedValue<KV<K, V>> windowedValue) throws Exception {
    KV<K, V> kv = windowedValue.getValue();
    long size = getObjectSize(kv.getKey()) + getObjectSize(kv.getValue());
    if (size >= MAX_RECORD_SIZE) {
      LOG.warn("Dropping large record with size: {}", size);
      return false;
    }
    return true;
  }

  /**
   * Estimates the size of an object in bytes. Simplified for the object shapes seen in
   * portability: byte arrays and (nested) lists of byte arrays; everything else counts as 0.
   */
  private static long getObjectSize(Object o) {
    if (o instanceof byte[]) {
      return ((byte[]) o).length;
    } else if (o instanceof List) {
      return ((List<?>) o).stream().mapToLong(LargeRecordFilterFunction::getObjectSize).sum();
    } else {
      return 0; // unknown type: size not estimated, record never dropped on its account
    }
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.flink.translation.worker;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import java.io.IOException;
import java.io.OutputStream;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.runners.fnexecution.status.BeamWorkerStatusGrpcService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * HTTP endpoint that serves live status information retrieved from the SDK harness via {@link
 * BeamWorkerStatusGrpcService}.
 *
 * <p>The server binds to the first free port in {@code [BASE_PORT, BASE_PORT + MAX_ATTEMPTS)},
 * retrying with exponential backoff, and serves plain text at {@code /workerstatus}.
 */
@SuppressWarnings({
  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class SdkWorkerStatusServer {
  private static final Logger LOG = LoggerFactory.getLogger(SdkWorkerStatusServer.class);
  private static final int BASE_PORT = 5100;
  private static final int MAX_ATTEMPTS = 5;
  // Guards against starting more than one status server per JVM.
  private static final AtomicBoolean started = new AtomicBoolean(false);

  /**
   * Starts the worker status HTTP server if it is not already running. Safe to call repeatedly;
   * only the first successful call starts a server. If startup fails, the guard is reset so a
   * later call may try again (the original latched permanently on failure).
   */
  public static void create() {
    if (!started.compareAndSet(false, true)) {
      return;
    }

    AtomicInteger attempt = new AtomicInteger(0);

    RetryPolicy<HttpServer> retryPolicy =
        RetryPolicy.<HttpServer>builder()
            .handle(BindException.class, IOException.class)
            // Exponential backoff between bind attempts.
            .withBackoff(Duration.ofMillis(500), Duration.ofSeconds(10), 2.0)
            .withMaxAttempts(MAX_ATTEMPTS)
            .onFailedAttempt(
                e -> {
                  int port = BASE_PORT + attempt.get() - 1; // last port tried
                  LOG.warn(
                      "Failed to bind to port {}: {}", port, e.getLastException().getMessage());
                })
            .onRetriesExceeded(e -> LOG.warn("Exceeded max retries. Giving up."))
            .build();

    try {
      HttpServer server =
          Failsafe.with(retryPolicy)
              .get(
                  () -> {
                    // Each attempt tries the next port in the range.
                    int port = BASE_PORT + attempt.getAndIncrement();
                    HttpServer httpServer = HttpServer.create(new InetSocketAddress(port), 0);
                    httpServer.createContext("/workerstatus", new WorkerStatusHandler());
                    httpServer.setExecutor(null); // use the default executor
                    httpServer.start();
                    LOG.info("SdkWorkerStatusServer started at {}", httpServer.getAddress());
                    return httpServer;
                  });

      // Stop the server cleanly on JVM shutdown.
      Runtime.getRuntime()
          .addShutdownHook(
              new Thread(
                  () -> {
                    LOG.info("Shutting down SdkWorkerStatusServer...");
                    server.stop(0);
                  }));
    } catch (Exception e) {
      // Keep the cause in the log, and allow a future create() call to retry.
      LOG.warn("Failed to start SdkWorkerStatusServer.", e);
      started.set(false);
    }
  }

  /** Serves a plain-text dump of all known SDK worker statuses. */
  static class WorkerStatusHandler implements HttpHandler {
    @Override
    public void handle(HttpExchange exchange) throws IOException {
      int statusCode;
      String response;
      try {
        BeamWorkerStatusGrpcService statusGrpcService = BeamWorkerStatusGrpcService.getInstance();
        if (statusGrpcService != null) {
          Map<String, String> allStatuses =
              statusGrpcService.getAllWorkerStatuses(10, TimeUnit.SECONDS);
          StringBuilder sb = new StringBuilder();
          for (Map.Entry<String, String> entry : allStatuses.entrySet()) {
            sb.append(entry.getKey()).append('\n').append(entry.getValue()).append('\n');
          }
          statusCode = 200;
          response = sb.toString();
        } else {
          // Original left the exchange unanswered here, hanging the client.
          LOG.info("BeamWorkerStatusGrpcService is not running.");
          statusCode = 503;
          response = "BeamWorkerStatusGrpcService is not running.\n";
        }
      } catch (Exception e) {
        // Original swallowed the stack trace and sent no response.
        LOG.warn("Exception when handling workerStatusRequest", e);
        statusCode = 500;
        response = "Failed to retrieve worker statuses.\n";
      }
      // Content-Length must be the encoded byte count, not String#length()
      // (they differ for non-ASCII status text).
      byte[] body = response.getBytes(StandardCharsets.UTF_8);
      exchange.sendResponseHeaders(statusCode, body.length);
      try (OutputStream os = exchange.getResponseBody()) {
        os.write(body);
      }
    }
  }
}
org.apache.beam.runners.fnexecution.status.BeamWorkerStatusGrpcService; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.fn.IdGenerator; import org.apache.beam.sdk.fn.IdGenerators; @@ -265,6 +267,12 @@ public WrappedSdkHarnessClient load(Environment environment) throws Exception { return caches.build(); } + public static boolean getEnableWorkerStatus(JobInfo jobInfo) { + PipelineOptions pipelineOptions = + PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions()); + return pipelineOptions.as(PortablePipelineOptions.class).getEnableWorkerStatus(); + } + private static int getEnvironmentExpirationMillis(JobInfo jobInfo) { PipelineOptions pipelineOptions = PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions()); @@ -692,6 +700,19 @@ private ServerInfo createServerInfo(JobInfo jobInfo, ServerFactory serverFactory provisionInfo.setControlEndpoint(controlServer.getApiServiceDescriptor()); provisionInfo.addRunnerCapabilities( BeamUrns.getUrn(StandardRunnerProtocols.Enum.CONTROL_RESPONSE_ELEMENTS_EMBEDDING)); + + if (getEnableWorkerStatus(jobInfo)) { + GrpcFnServer workerStatusServer = + GrpcFnServer.allocatePortAndCreateFor( + BeamWorkerStatusGrpcService.create( + ApiServiceDescriptor.getDefaultInstance(), + GrpcContextHeaderAccessorProvider.getHeaderAccessor()), + serverFactory); + + provisionInfo.setStatusEndpoint(workerStatusServer.getApiServiceDescriptor()); + } + + LOG.info("FnServer provision info: " + provisionInfo.build()); GrpcFnServer provisioningServer = GrpcFnServer.allocatePortAndCreateFor( StaticGrpcProvisionService.create( diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/status/BeamWorkerStatusGrpcService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/status/BeamWorkerStatusGrpcService.java index 68ca36f8a369..3eb5007fce12 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/status/BeamWorkerStatusGrpcService.java 
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/status/BeamWorkerStatusGrpcService.java @@ -39,6 +39,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,6 +58,8 @@ public class BeamWorkerStatusGrpcService extends BeamFnWorkerStatusImplBase impl Collections.synchronizedMap(new HashMap<>()); private final AtomicBoolean isClosed = new AtomicBoolean(); + private static @Nullable BeamWorkerStatusGrpcService CURRENT_INSTANCE = null; + private BeamWorkerStatusGrpcService( ApiServiceDescriptor apiServiceDescriptor, HeaderAccessor headerAccessor) { this.headerAccessor = headerAccessor; @@ -73,7 +76,13 @@ private BeamWorkerStatusGrpcService( */ public static BeamWorkerStatusGrpcService create( ApiServiceDescriptor apiServiceDescriptor, HeaderAccessor headerAccessor) { - return new BeamWorkerStatusGrpcService(apiServiceDescriptor, headerAccessor); + CURRENT_INSTANCE = new BeamWorkerStatusGrpcService(apiServiceDescriptor, headerAccessor); + return CURRENT_INSTANCE; + } + + /** Glean: return the current instance of {@link BeamWorkerStatusGrpcService}. 
*/ + public static @Nullable BeamWorkerStatusGrpcService getInstance() { + return CURRENT_INSTANCE; } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java index ec82aebbef66..8d877bf2b484 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java @@ -140,4 +140,11 @@ static String getEnvironmentOption( boolean getEnableHeapDumps(); void setEnableHeapDumps(boolean enableHeapDumps); + + @Description( + "If {@literal true} and PipelineOption tempLocation is set, show worker status before shutting") + @Default.Boolean(false) + boolean getEnableWorkerStatus(); + + void setEnableWorkerStatus(boolean enableWorkerStatus); } diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py index 2f9de24594b2..35a776c0f5cf 100644 --- a/sdks/python/apache_beam/runners/worker/data_plane.py +++ b/sdks/python/apache_beam/runners/worker/data_plane.py @@ -49,6 +49,7 @@ from apache_beam.portability.api import beam_fn_api_pb2_grpc from apache_beam.runners.worker.channel_factory import GRPCChannelFactory from apache_beam.runners.worker.worker_id_interceptor import WorkerIdInterceptor +from apache_beam.utils import retry if TYPE_CHECKING: import apache_beam.coders.slow_stream @@ -64,6 +65,7 @@ _DEFAULT_SIZE_FLUSH_THRESHOLD = 10 << 20 # 10MB _DEFAULT_TIME_FLUSH_THRESHOLD_MS = 0 # disable time-based flush by default _FLUSH_MAX_SIZE = (2 << 30) - 100 # 2GB less some overhead, protobuf/grpc limit +_SENT_TO_MAX_QSIZE = 8000 # Keep a set of completed instructions to discard late received data. The set # can have up to _MAX_CLEANED_INSTRUCTIONS items. See _GrpcDataChannel. 
_MAX_CLEANED_INSTRUCTIONS = 10000 @@ -457,7 +459,7 @@ class _GrpcDataChannel(DataChannel): def __init__(self, data_buffer_time_limit_ms=0): # type: (int) -> None self._data_buffer_time_limit_ms = data_buffer_time_limit_ms - self._to_send = queue.Queue() # type: queue.Queue[DataOrTimers] + self._to_send = queue.Queue(maxsize=_SENT_TO_MAX_QSIZE) # type: queue.Queue[DataOrTimers] self._received = collections.defaultdict( lambda: queue.Queue(maxsize=5) ) # type: DefaultDict[str, queue.Queue[DataOrTimers]] @@ -579,6 +581,13 @@ def input_elements( def output_stream(self, instruction_id, transform_id): # type: (str, str) -> ClosableOutputStream + @retry.with_exponential_backoff( + initial_delay_secs=1.0, + max_delay_secs=600.0, # 10 min + factor=4.0, + fuzz=False, + retry_filter=lambda e: isinstance(e, queue.Full) + ) def add_to_send_queue(data): # type: (bytes) -> None if data: @@ -586,8 +595,16 @@ def add_to_send_queue(data): beam_fn_api_pb2.Elements.Data( instruction_id=instruction_id, transform_id=transform_id, - data=data)) - + data=data), + timeout=1.0) + + @retry.with_exponential_backoff( + initial_delay_secs=1.0, + max_delay_secs=600.0, # 10 min + factor=4.0, + fuzz=False, + retry_filter=lambda e: isinstance(e, queue.Full) + ) def close_callback(data): # type: (bytes) -> None add_to_send_queue(data) @@ -596,7 +613,8 @@ def close_callback(data): beam_fn_api_pb2.Elements.Data( instruction_id=instruction_id, transform_id=transform_id, - is_last=True)) + is_last=True), + timeout=1.0) return ClosableOutputStream.create( close_callback, add_to_send_queue, self._data_buffer_time_limit_ms) @@ -608,6 +626,13 @@ def output_timer_stream( timer_family_id # type: str ): # type: (...) 
-> ClosableOutputStream + @retry.with_exponential_backoff( + initial_delay_secs=1.0, + max_delay_secs=600.0, # 10 min + factor=4.0, + fuzz=False, + retry_filter=lambda e: isinstance(e, queue.Full) + ) def add_to_send_queue(timer): # type: (bytes) -> None if timer: @@ -617,8 +642,16 @@ def add_to_send_queue(timer): transform_id=transform_id, timer_family_id=timer_family_id, timers=timer, - is_last=False)) - + is_last=False), + timeout=1.0) + + @retry.with_exponential_backoff( + initial_delay_secs=1.0, + max_delay_secs=600.0, # 10 min + factor=4.0, + fuzz=False, + retry_filter=lambda e: isinstance(e, queue.Full) + ) def close_callback(timer): # type: (bytes) -> None add_to_send_queue(timer) @@ -627,7 +660,8 @@ def close_callback(timer): instruction_id=instruction_id, transform_id=transform_id, timer_family_id=timer_family_id, - is_last=True)) + is_last=True), + timeout=1.0) return ClosableOutputStream.create( close_callback, add_to_send_queue, self._data_buffer_time_limit_ms) @@ -635,6 +669,8 @@ def close_callback(timer): def _write_outputs(self): # type: () -> Iterator[beam_fn_api_pb2.Elements] stream_done = False + next_size_log_time = 0 + total_size_bytes = 0 while not stream_done: streams = [self._to_send.get()] try: @@ -647,6 +683,13 @@ def _write_outputs(self): streams.append(data_or_timer) except queue.Empty: pass + + current_time = time.time() + if next_size_log_time <= current_time: + qlen = self._to_send.qsize() + _LOGGER.info(f'to_send qsize: {qlen}. 
total send size: {total_size_bytes}') + next_size_log_time = current_time + 30 + if streams[-1] is self._WRITES_FINISHED: stream_done = True streams.pop() diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go index 73f6a9dad396..44a88534a253 100644 --- a/sdks/python/container/boot.go +++ b/sdks/python/container/boot.go @@ -265,6 +265,7 @@ func launchSDKProcess() error { }() args := []string{ + "run", "-m", sdkHarnessEntrypoint, } @@ -284,7 +285,7 @@ func launchSDKProcess() error { return } logger.Printf(ctx, "Executing Python (worker %v): python %v", workerId, strings.Join(args, " ")) - cmd := StartCommandEnv(map[string]string{"WORKER_ID": workerId}, os.Stdin, bufLogger, bufLogger, "python", args...) + cmd := StartCommandEnv(map[string]string{"WORKER_ID": workerId}, os.Stdin, bufLogger, bufLogger, "memray", args...) childPids.v = append(childPids.v, cmd.Process.Pid) childPids.mu.Unlock() @@ -483,6 +484,13 @@ func logRuntimeDependencies(ctx context.Context, bufLogger *tools.BufferedLogger } else { bufLogger.FlushAtDebug(ctx) } + bufLogger.Printf(ctx, "Install memray") + args = []string{"-m", "pip", "install", "memray"} + if err := execx.ExecuteEnvWithIO(nil, os.Stdin, bufLogger, bufLogger, pythonVersion, args...); err != nil { + bufLogger.FlushAtError(ctx) + } else { + bufLogger.FlushAtDebug(ctx) + } return nil }