diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java
index 86f5cecce169..01d33f61c4fd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ByteStringOutputStream.java
@@ -22,6 +22,8 @@
 import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.UnsafeByteOperations;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An {@link OutputStream} that produces {@link ByteString}s.
@@ -46,6 +48,8 @@ public final class ByteStringOutputStream extends OutputStream {
   // This number should be tuned periodically as hardware changes.
   private static final int MAX_CHUNK_SIZE = 256 * 1024;
 
+  private static final Logger LOG = LoggerFactory.getLogger(ByteStringOutputStream.class);
+
   // ByteString to be concatenated to create the result
   private ByteString result;
 
@@ -87,21 +91,43 @@ public void write(int b) {
   @Override
   public void write(byte[] b, int offset, int length) {
     int remainingSpaceInBuffer = buffer.length - bufferPos;
+    if (length < 0) {
+      // A negative length violates the OutputStream#write(byte[], int, int) contract.
+      throw new IndexOutOfBoundsException("Glean mod: write called with length < 0: " + length);
+    }
+    if (remainingSpaceInBuffer < 0) {
+      // bufferPos must never pass the end of the buffer; report the broken invariant here
+      // rather than through an opaque arraycopy failure below.
+      throw new IllegalStateException(
+          "Glean mod: remainingSpaceInBuffer < 0: " + remainingSpaceInBuffer);
+    }
+
     while (length > remainingSpaceInBuffer) {
       // Use up the current buffer
       System.arraycopy(b, offset, buffer, bufferPos, remainingSpaceInBuffer);
       offset += remainingSpaceInBuffer;
       length -= remainingSpaceInBuffer;
 
       result = result.concat(UnsafeByteOperations.unsafeWrap(buffer));
       // We want to increase our total capacity but not larger than the max chunk size.
       remainingSpaceInBuffer = Math.min(Math.max(length, result.size()), MAX_CHUNK_SIZE);
       buffer = new byte[remainingSpaceInBuffer];
       bufferPos = 0;
     }
 
-    System.arraycopy(b, offset, buffer, bufferPos, length);
-    bufferPos += length;
+    if (length > 0) {
+      // Copy the remainder into the buffer
+      System.arraycopy(b, offset, buffer, bufferPos, length);
+      bufferPos += length;
+    } else if (LOG.isDebugEnabled()) {
+      // A zero-length write is a legal no-op, so it is only traced at debug level; the guard
+      // avoids boxing the arguments on this hot path when debug logging is off.
+      LOG.debug(
+          "Glean mod: zero-length write skipped, offset: {}, buffer length: {}, bufferPos: {}",
+          offset,
+          buffer.length,
+          bufferPos);
+    }
   }
 
   /**
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java
index 95f41291f6c7..5a1e93c032c7 100644
--- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java
@@ -29,6 +29,8 @@
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * {@link DataStreamDecoder} treats multiple {@link ByteString}s as a single input stream decoding
@@ -40,7 +42,7 @@
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
 public class
DataStreams { - public static final int DEFAULT_OUTBOUND_BUFFER_LIMIT_BYTES = 1_000_000; + public static final int DEFAULT_OUTBOUND_BUFFER_LIMIT_BYTES = 1_048_576; /** * Converts a single element delimited {@link OutputStream} into multiple {@link ByteString @@ -80,6 +82,9 @@ public static ElementDelimitedOutputStream outbound( *
Note that users must invoke {@link #delimitElement} at each element boundary.
*/
public static final class ElementDelimitedOutputStream extends OutputStream {
+
+ private static final Logger logger = LoggerFactory.getLogger(ElementDelimitedOutputStream.class);
+
private final OutputChunkConsumer