Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.guacamole.GuacamoleException;
import org.apache.guacamole.net.GuacamoleTunnel;
import org.apache.guacamole.protocol.GuacamoleInstruction;
Expand All @@ -37,14 +39,21 @@
* sent automatically.
*/
public class OutputStreamInterceptingFilter
extends StreamInterceptingFilter<OutputStream> {
extends StreamInterceptingFilter<OutputStream>
implements OutputStreamWriter.ExecutionListener {

/**
* Logger for this class.
*/
private static final Logger logger =
LoggerFactory.getLogger(OutputStreamInterceptingFilter.class);

/**
* File download stream writers which will send data asynchronosly.
*/
private final ConcurrentMap<String, OutputStreamWriter> streamWriters =
new ConcurrentHashMap<>();

/**
* Whether this OutputStreamInterceptingFilter should respond to received
* blobs with "ack" messages on behalf of the client. If false, blobs will
Expand Down Expand Up @@ -95,6 +104,23 @@ private void sendAck(String index, String message, GuacamoleStatus status) {

}

@Override
public void onBlobWritten(String streamIndex, boolean requiresAck) {
if (requiresAck) {
sendAck(streamIndex, "OK", GuacamoleStatus.SUCCESS);
}
}

@Override
public void onWriteFailed(String streamIndex) {
sendAck(streamIndex, "FAIL", GuacamoleStatus.SERVER_ERROR);
}

@Override
public void onStreamEnd(String streamIndex) {
closeInterceptedStream(streamIndex);
}

/**
* Handles a single "blob" instruction, decoding its base64 data,
* sending that data to the associated OutputStream, and ultimately
Expand All @@ -117,10 +143,12 @@ private GuacamoleInstruction handleBlob(GuacamoleInstruction instruction) {
if (args.size() < 2)
return instruction;

// Pull associated stream
String index = args.get(0);
InterceptedStream<OutputStream> stream = getInterceptedStream(index);
if (stream == null)
// Get the stream index
String streamIndex = args.get(0);

// Process the blob asynchornously if there is a worker
OutputStreamWriter streamWriter = streamWriters.get(streamIndex);
if (streamWriter == null)
return instruction;

// Decode blob
Expand All @@ -134,31 +162,25 @@ private GuacamoleInstruction handleBlob(GuacamoleInstruction instruction) {
return null;
}

try {

// Attempt to write data to stream
stream.getStream().write(blob);

// Force client to respond with their own "ack" if we need to
// confirm that they are not falling behind with respect to the
// graphical session
if (!acknowledgeBlobs) {
acknowledgeBlobs = true;
return new GuacamoleInstruction("blob", index, "");
}

// Otherwise, acknowledge the blob on the client's behalf
sendAck(index, "OK", GuacamoleStatus.SUCCESS);

}
catch (IOException e) {
sendAck(index, "FAIL", GuacamoleStatus.SERVER_ERROR);
logger.debug("Write failed for intercepted stream.", e);
// Force client to respond with their own "ack" to confirm they are not
// falling behind with respect to the graphical session, only if
// - There are no blobs in the queue currently
// - Previous blob required server side acknowledgement
// This may lead to more than one blob in the writer queue temporarily,
// but not more than two blobs anyways.
if (!acknowledgeBlobs &&
streamWriter.getQueuedMessageCount() == 0 &&
streamWriter.didPrevBlobRequireAck()) {
streamWriter.handleBlob(blob, false);
acknowledgeBlobs = true;

// Send an empty blob to trigger client "ack"
return new GuacamoleInstruction("blob", streamIndex, "");
}

// Instruction was handled purely internally
// Put the blob to the writer queue
streamWriter.handleBlob(blob, true);
return null;

}

/**
Expand All @@ -176,9 +198,13 @@ private void handleEnd(GuacamoleInstruction instruction) {
if (args.size() < 1)
return;

// Terminate stream
closeInterceptedStream(args.get(0));

OutputStreamWriter streamWriter = streamWriters.get(args.get(0));
if (streamWriter == null)
return;

// Notify the writer that the end marker has been received.
// it will terminate the stream once all blobs are written.
streamWriter.handleEnd();
}

/**
Expand Down Expand Up @@ -221,9 +247,30 @@ public GuacamoleInstruction filter(GuacamoleInstruction instruction)
@Override
protected void handleInterceptedStream(InterceptedStream<OutputStream> stream) {

// Create the stream writer
OutputStreamWriter streamWriter = new OutputStreamWriter(stream, this);

// Put it into the container and check if there was another writer for the index
OutputStreamWriter old = streamWriters.put(stream.getIndex(), streamWriter);
if (old != null) {
logger.debug("Found an older stream #{}; will close it", stream.getIndex());
// Close the stream to be sure it does not get stuck on write
closeInterceptedStream(old.getStream());
// Stop it
old.stop();
}

// Acknowledge that the stream is ready to receive data
sendAck(stream.getIndex(), "OK", GuacamoleStatus.SUCCESS);

}
// This will block the thread until the stream is closed by
// disconnection, or the end instruction is received.
streamWriter.run();

// Close the stream if not closed yet
closeInterceptedStream(stream);

// Remove the stream from the container
streamWriters.entrySet().removeIf(entry -> entry.getValue().equals(streamWriter));
}
}
Loading