Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.UnsafeByteOperations;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* An {@link OutputStream} that produces {@link ByteString}s.
Expand All @@ -46,6 +47,8 @@ public final class ByteStringOutputStream extends OutputStream {
// This number should be tuned periodically as hardware changes.
private static final int MAX_CHUNK_SIZE = 256 * 1024;

// Shared by all instances: the logger depends only on the class, so it is looked up once
// instead of on every stream construction.
private static final Logger logger = LoggerFactory.getLogger(ByteStringOutputStream.class);

// ByteString to be concatenated to create the result
private ByteString result;

Expand Down Expand Up @@ -87,21 +90,33 @@ public void write(int b) {
/**
 * Appends {@code length} bytes from {@code b}, starting at {@code offset}, to this stream.
 *
 * <p>Bytes are copied into the current buffer until it is full. Each full buffer is concatenated
 * onto {@code result} and replaced by a fresh buffer sized to the larger of the bytes still to be
 * written and the size of {@code result}, capped at {@code MAX_CHUNK_SIZE}.
 *
 * @param b the source array
 * @param offset index in {@code b} of the first byte to write
 * @param length number of bytes to write; zero is a no-op
 * @throws IndexOutOfBoundsException if {@code length} is negative, or if a non-empty range does
 *     not lie within {@code b}
 */
@Override
public void write(byte[] b, int offset, int length) {
  if (length < 0) {
    throw new IndexOutOfBoundsException("Glean mod: write called with length < 0: " + length);
  }
  if (length == 0) {
    // Zero-length writes are legal for an OutputStream and leave the stream untouched.
    return;
  }
  // Validate the whole range up front so a bad call cannot fail after partially mutating state.
  if (offset < 0 || length > b.length - offset) {
    throw new IndexOutOfBoundsException(
        "Glean mod: write called with invalid range, offset: "
            + offset
            + ", length: "
            + length
            + ", array length: "
            + b.length);
  }

  int remainingSpaceInBuffer = buffer.length - bufferPos;
  while (length > remainingSpaceInBuffer) {
    // Use up the current buffer.
    System.arraycopy(b, offset, buffer, bufferPos, remainingSpaceInBuffer);
    offset += remainingSpaceInBuffer;
    length -= remainingSpaceInBuffer;

    // The buffer is now completely full, so it can be wrapped without copying.
    result = result.concat(UnsafeByteOperations.unsafeWrap(buffer));
    // We want to increase our total capacity but not larger than the max chunk size.
    remainingSpaceInBuffer = Math.min(Math.max(length, result.size()), MAX_CHUNK_SIZE);
    buffer = new byte[remainingSpaceInBuffer];
    bufferPos = 0;
  }

  // Copy the remainder into the buffer; the loop above guarantees that it fits.
  System.arraycopy(b, offset, buffer, bufferPos, length);
  bufferPos += length;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* {@link DataStreamDecoder} treats multiple {@link ByteString}s as a single input stream decoding
Expand All @@ -40,7 +42,7 @@
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class DataStreams {
// 1 MiB (1024 * 1024 bytes). Declared exactly once; a second declaration of the same name
// would not compile.
public static final int DEFAULT_OUTBOUND_BUFFER_LIMIT_BYTES = 1_048_576;

/**
* Converts a single element delimited {@link OutputStream} into multiple {@link ByteString
Expand Down Expand Up @@ -80,6 +82,9 @@ public static ElementDelimitedOutputStream outbound(
* <p>Note that users must invoke {@link #delimitElement} at each element boundary.
*/
public static final class ElementDelimitedOutputStream extends OutputStream {

// Shared by all instances: the logger depends only on the class, so it is looked up once
// instead of on every stream construction.
private static final Logger logger = LoggerFactory.getLogger(ElementDelimitedOutputStream.class);

private final OutputChunkConsumer<ByteString> consumer;
private final ByteStringOutputStream output;
private final int maximumChunkSize;
Expand Down Expand Up @@ -111,23 +116,30 @@ public void write(int i) throws IOException {

@Override
public void write(byte[] b, int offset, int length) throws IOException {
int spaceRemaining = maximumChunkSize - output.size();
int spaceRemaining = Math.max(maximumChunkSize - output.size(), 0);
// Fill the first partially filled buffer.
if (length > spaceRemaining) {
output.write(b, offset, spaceRemaining);
offset += spaceRemaining;
length -= spaceRemaining;
internalFlush();
}
// Fill buffers completely.
while (length > maximumChunkSize) {
output.write(b, offset, maximumChunkSize);
offset += maximumChunkSize;
length -= maximumChunkSize;
internalFlush();
try {
if (length > spaceRemaining) {
output.write(b, offset, spaceRemaining);
offset += spaceRemaining;
length -= spaceRemaining;
internalFlush();
}
// Fill buffers completely.
while (length > maximumChunkSize) {
output.write(b, offset, maximumChunkSize);
offset += maximumChunkSize;
length -= maximumChunkSize;
internalFlush();
}
if (length > 0) {
// Fill any remainder.
output.write(b, offset, length);
}
} catch (RuntimeException e) {
logger.warn("Glean mod: DataStreams exception: {}, spaceRemaining: {}, offset: {}, length: {}, output size: {}", e, spaceRemaining, offset, length, output.size());
throw e;
}
// Fill any remainder.
output.write(b, offset, length);
}

@Override
Expand Down
14 changes: 8 additions & 6 deletions sdks/python/container/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,11 @@ func launchSDKProcess() error {
if err != nil {
return errors.New(
"failed to create a virtual environment. If running on Ubuntu systems, " +
"you might need to install `python3-venv` package. " +
"To run the SDK process in default environment instead, " +
"set the environment variable `RUN_PYTHON_SDK_IN_DEFAULT_ENVIRONMENT=1`. " +
"In custom Docker images, you can do that with an `ENV` statement. " +
fmt.Sprintf("Encountered error: %v", err))
"you might need to install `python3-venv` package. " +
"To run the SDK process in default environment instead, " +
"set the environment variable `RUN_PYTHON_SDK_IN_DEFAULT_ENVIRONMENT=1`. " +
"In custom Docker images, you can do that with an `ENV` statement. " +
fmt.Sprintf("Encountered error: %v", err))
}
cleanupFunc := func() {
os.RemoveAll(venvDir)
Expand Down Expand Up @@ -238,9 +238,11 @@ func launchSDKProcess() error {
childPids.canceled = true
for _, pid := range childPids.v {
go func(pid int) {
logger.Printf(ctx, "Sending SIGTERM to worker process %v with 120 timeout", pid)
// This goroutine will be canceled if the main process exits before the grace period
// has elapsed, i.e., as soon as all subprocesses have returned from Wait().
time.Sleep(5 * time.Second)
time.Sleep(120 * time.Second)
logger.Printf(ctx, "Finished SIGTERM to worker process %v with 120 timeout, killing", pid)
if err := syscall.Kill(-pid, syscall.SIGKILL); err == nil {
logger.Printf(ctx, "Worker process %v did not respond, killed it.", pid)
}
Expand Down