From ad2cfef190654d21992bc2ced3d35d8b95ab9743 Mon Sep 17 00:00:00 2001 From: Steve Jiang Date: Mon, 1 Apr 2024 10:39:57 -0700 Subject: [PATCH 1/2] steve/glean-sideinput-fixes --- .../beam/sdk/util/ByteStringOutputStream.java | 37 +++++++++++----- .../beam/sdk/fn/stream/DataStreams.java | 44 ++++++++++++------- 2 files changed, 54 insertions(+), 27 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java index 86f5cecce169..01d33f61c4fd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java @@ -22,6 +22,7 @@ import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.UnsafeByteOperations; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; +import org.slf4j.LoggerFactory; /** * An {@link OutputStream} that produces {@link ByteString}s. @@ -46,6 +47,8 @@ public final class ByteStringOutputStream extends OutputStream { // This number should be tuned periodically as hardware changes. private static final int MAX_CHUNK_SIZE = 256 * 1024; + private final Logger logger = LoggerFactory.getLogger(ByteStringOutputStream.class); + // ByteString to be concatenated to create the result private ByteString result; @@ -87,21 +90,33 @@ public void write(int b) { @Override public void write(byte[] b, int offset, int length) { int remainingSpaceInBuffer = buffer.length - bufferPos; + if (length < 0) { + throw new RuntimeException("Glean mod: write called with length < 0" + length); + } + while (length > remainingSpaceInBuffer) { // Use up the current buffer - System.arraycopy(b, offset, buffer, bufferPos, remainingSpaceInBuffer); - offset += remainingSpaceInBuffer; - length -= remainingSpaceInBuffer; - - result = result.concat(UnsafeByteOperations.unsafeWrap(buffer)); - // We want to increase our total capacity but not larger than the max chunk size. - remainingSpaceInBuffer = Math.min(Math.max(length, result.size()), MAX_CHUNK_SIZE); - buffer = new byte[remainingSpaceInBuffer]; - bufferPos = 0; + if (remainingSpaceInBuffer >= 0) { + System.arraycopy(b, offset, buffer, bufferPos, remainingSpaceInBuffer); + offset += remainingSpaceInBuffer; + length -= remainingSpaceInBuffer; + result = result.concat(UnsafeByteOperations.unsafeWrap(buffer)); + // We want to increase our total capacity but not larger than the max chunk size. + remainingSpaceInBuffer = Math.min(Math.max(length, result.size()), MAX_CHUNK_SIZE); + buffer = new byte[remainingSpaceInBuffer]; + bufferPos = 0; + } else { + throw new RuntimeException("Glean mod: remainingSpaceInBuffer < 0: " + remainingSpaceInBuffer); + } } - System.arraycopy(b, offset, buffer, bufferPos, length); - bufferPos += length; + if (length > 0) { + // Copy the remainder into the buffer + System.arraycopy(b, offset, buffer, bufferPos, length); + bufferPos += length; + } else { + logger.warn("Glean mod: arraycopy called with length <= 0: {}, offset: {}, buffer length: {}, bufferPos: {}", length, offset, buffer.length, bufferPos); + } } /** diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java index 95f41291f6c7..5a1e93c032c7 100644 --- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java +++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java @@ -29,6 +29,8 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.util.ByteStringOutputStream; import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * {@link DataStreamDecoder} treats multiple {@link ByteString}s as a single input stream decoding @@ -40,7 +42,7 @@ "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) public class DataStreams { - public static final int DEFAULT_OUTBOUND_BUFFER_LIMIT_BYTES = 1_000_000; + public static final int DEFAULT_OUTBOUND_BUFFER_LIMIT_BYTES = 1_048_576; /** * Converts a single element delimited {@link OutputStream} into multiple {@link ByteString @@ -80,6 +82,9 @@ public static ElementDelimitedOutputStream outbound( *

Note that users must invoke {@link #delimitElement} at each element boundary. */ public static final class ElementDelimitedOutputStream extends OutputStream { + + private final Logger logger = LoggerFactory.getLogger(ElementDelimitedOutputStream.class); + private final OutputChunkConsumer consumer; private final ByteStringOutputStream output; private final int maximumChunkSize; @@ -111,23 +116,30 @@ public void write(int i) throws IOException { @Override public void write(byte[] b, int offset, int length) throws IOException { - int spaceRemaining = maximumChunkSize - output.size(); + int spaceRemaining = Math.max(maximumChunkSize - output.size(), 0); // Fill the first partially filled buffer. - if (length > spaceRemaining) { - output.write(b, offset, spaceRemaining); - offset += spaceRemaining; - length -= spaceRemaining; - internalFlush(); - } - // Fill buffers completely. - while (length > maximumChunkSize) { - output.write(b, offset, maximumChunkSize); - offset += maximumChunkSize; - length -= maximumChunkSize; - internalFlush(); + try { + if (length > spaceRemaining) { + output.write(b, offset, spaceRemaining); + offset += spaceRemaining; + length -= spaceRemaining; + internalFlush(); + } + // Fill buffers completely. + while (length > maximumChunkSize) { + output.write(b, offset, maximumChunkSize); + offset += maximumChunkSize; + length -= maximumChunkSize; + internalFlush(); + } + if (length > 0) { + // Fill any remainder. + output.write(b, offset, length); + } + } catch (RuntimeException e) { + logger.warn("Glean mod: DataStreams exception: {}, spaceRemaining: {}, offset: {}, length: {}, output size: {}", e, spaceRemaining, offset, length, output.size()); + throw e; } - // Fill any remainder. - output.write(b, offset, length); } @Override From 72c848b0b87b100958b86691f67102a6a412b5b0 Mon Sep 17 00:00:00 2001 From: Steve Jiang Date: Mon, 1 Apr 2024 10:44:22 -0700 Subject: [PATCH 2/2] steve/glean-boot-fork --- sdks/python/container/boot.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go index 1e70e0db1513..ab645c0ff104 100644 --- a/sdks/python/container/boot.go +++ b/sdks/python/container/boot.go @@ -164,11 +164,11 @@ func launchSDKProcess() error { if err != nil { return errors.New( "failed to create a virtual environment. If running on Ubuntu systems, " + - "you might need to install `python3-venv` package. " + - "To run the SDK process in default environment instead, " + - "set the environment variable `RUN_PYTHON_SDK_IN_DEFAULT_ENVIRONMENT=1`. " + - "In custom Docker images, you can do that with an `ENV` statement. " + - fmt.Sprintf("Encountered error: %v", err)) + "you might need to install `python3-venv` package. " + + "To run the SDK process in default environment instead, " + + "set the environment variable `RUN_PYTHON_SDK_IN_DEFAULT_ENVIRONMENT=1`. " + + "In custom Docker images, you can do that with an `ENV` statement. " + + fmt.Sprintf("Encountered error: %v", err)) } cleanupFunc := func() { os.RemoveAll(venvDir) @@ -238,9 +238,11 @@ func launchSDKProcess() error { childPids.canceled = true for _, pid := range childPids.v { go func(pid int) { + logger.Printf(ctx, "Sending SIGTERM to worker process %v with 120 timeout", pid) // This goroutine will be canceled if the main process exits before the 5 seconds // have elapsed, i.e., as soon as all subprocesses have returned from Wait(). - time.Sleep(5 * time.Second) + time.Sleep(120 * time.Second) + logger.Printf(ctx, "Finished SIGTERM to worker process %v with 120 timeout, killing", pid) if err := syscall.Kill(-pid, syscall.SIGKILL); err == nil { logger.Printf(ctx, "Worker process %v did not respond, killed it.", pid) }