diff --git a/guacamole/src/main/java/org/apache/guacamole/tunnel/OutputStreamInterceptingFilter.java b/guacamole/src/main/java/org/apache/guacamole/tunnel/OutputStreamInterceptingFilter.java index 7fab4ff87a..27bc6c2c0a 100644 --- a/guacamole/src/main/java/org/apache/guacamole/tunnel/OutputStreamInterceptingFilter.java +++ b/guacamole/src/main/java/org/apache/guacamole/tunnel/OutputStreamInterceptingFilter.java @@ -23,6 +23,8 @@ import java.io.IOException; import java.io.OutputStream; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.guacamole.GuacamoleException; import org.apache.guacamole.net.GuacamoleTunnel; import org.apache.guacamole.protocol.GuacamoleInstruction; @@ -37,7 +39,8 @@ * sent automatically. */ public class OutputStreamInterceptingFilter - extends StreamInterceptingFilter<OutputStream> { + extends StreamInterceptingFilter<OutputStream> + implements OutputStreamWriter.ExecutionListener { /** * Logger for this class. */ private static final Logger logger = LoggerFactory.getLogger(OutputStreamInterceptingFilter.class); + /** + * File download stream writers which will send data asynchronously. + */ + private final ConcurrentMap<String, OutputStreamWriter> streamWriters = + new ConcurrentHashMap<>(); + /** * Whether this OutputStreamInterceptingFilter should respond to received * blobs with "ack" messages on behalf of the client. 
If false, blobs will @@ -95,6 +104,23 @@ private void sendAck(String index, String message, GuacamoleStatus status) { } + @Override + public void onBlobWritten(String streamIndex, boolean requiresAck) { + if (requiresAck) { + sendAck(streamIndex, "OK", GuacamoleStatus.SUCCESS); + } + } + + @Override + public void onWriteFailed(String streamIndex) { + sendAck(streamIndex, "FAIL", GuacamoleStatus.SERVER_ERROR); + } + + @Override + public void onStreamEnd(String streamIndex) { + closeInterceptedStream(streamIndex); + } + /** * Handles a single "blob" instruction, decoding its base64 data, * sending that data to the associated OutputStream, and ultimately @@ -117,10 +143,12 @@ private GuacamoleInstruction handleBlob(GuacamoleInstruction instruction) { if (args.size() < 2) return instruction; - // Pull associated stream - String index = args.get(0); - InterceptedStream<OutputStream> stream = getInterceptedStream(index); - if (stream == null) + // Get the stream index + String streamIndex = args.get(0); + + // Process the blob asynchronously if there is a worker + OutputStreamWriter streamWriter = streamWriters.get(streamIndex); + if (streamWriter == null) return instruction; // Decode blob @@ -134,31 +162,25 @@ private GuacamoleInstruction handleBlob(GuacamoleInstruction instruction) { return null; } - try { - - // Attempt to write data to stream - stream.getStream().write(blob); - - // Force client to respond with their own "ack" if we need to - // confirm that they are not falling behind with respect to the - // graphical session - if (!acknowledgeBlobs) { - acknowledgeBlobs = true; - return new GuacamoleInstruction("blob", index, ""); - } - - // Otherwise, acknowledge the blob on the client's behalf - sendAck(index, "OK", GuacamoleStatus.SUCCESS); - - } - catch (IOException e) { - sendAck(index, "FAIL", GuacamoleStatus.SERVER_ERROR); - logger.debug("Write failed for intercepted stream.", e); + // Force client to respond with their own "ack" to confirm they are not + // falling 
behind with respect to the graphical session, only if + // - There are no blobs in the queue currently + // - Previous blob required server side acknowledgement + // This may lead to more than one blob in the writer queue temporarily, + // but not more than two blobs anyway. + if (!acknowledgeBlobs && + streamWriter.getQueuedMessageCount() == 0 && + streamWriter.didPrevBlobRequireAck()) { + streamWriter.handleBlob(blob, false); + acknowledgeBlobs = true; + + // Send an empty blob to trigger client "ack" + return new GuacamoleInstruction("blob", streamIndex, ""); } - // Instruction was handled purely internally + // Put the blob to the writer queue + streamWriter.handleBlob(blob, true); return null; - } /** @@ -176,9 +198,13 @@ private void handleEnd(GuacamoleInstruction instruction) { if (args.size() < 1) return; - // Terminate stream - closeInterceptedStream(args.get(0)); - + OutputStreamWriter streamWriter = streamWriters.get(args.get(0)); + if (streamWriter == null) + return; + + // Notify the writer that the end marker has been received. + // It will terminate the stream once all blobs are written. 
+ streamWriter.handleEnd(); } /** @@ -221,9 +247,30 @@ public GuacamoleInstruction filter(GuacamoleInstruction instruction) @Override protected void handleInterceptedStream(InterceptedStream<OutputStream> stream) { + // Create the stream writer + OutputStreamWriter streamWriter = new OutputStreamWriter(stream, this); + + // Put it into the container and check if there was another writer for the index + OutputStreamWriter old = streamWriters.put(stream.getIndex(), streamWriter); + if (old != null) { + logger.debug("Found an older stream #{}; will close it", stream.getIndex()); + // Close the stream to be sure it does not get stuck on write + closeInterceptedStream(old.getStream()); + // Stop it + old.stop(); + } + // Acknowledge that the stream is ready to receive data sendAck(stream.getIndex(), "OK", GuacamoleStatus.SUCCESS); - } + // This will block the thread until the stream is closed by + // disconnection, or the end instruction is received. + streamWriter.run(); + + // Close the stream if not closed yet + closeInterceptedStream(stream); + // Remove the stream from the container + streamWriters.entrySet().removeIf(entry -> entry.getValue().equals(streamWriter)); + } } diff --git a/guacamole/src/main/java/org/apache/guacamole/tunnel/OutputStreamWriter.java b/guacamole/src/main/java/org/apache/guacamole/tunnel/OutputStreamWriter.java new file mode 100644 index 0000000000..364d1e5951 --- /dev/null +++ b/guacamole/src/main/java/org/apache/guacamole/tunnel/OutputStreamWriter.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.guacamole.tunnel; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Handles writing of data blobs to the output stream in a separate thread. + * Tracks and returns the results of the writing operation for each blob. + */ +public class OutputStreamWriter { + + /** + * Listener interface for receiving results of blob writing operations + * or anything which happens inside the writer thread. + */ + public interface ExecutionListener { + void onBlobWritten(String streamIndex, boolean requiresAck); + void onWriteFailed(String streamIndex); + void onStreamEnd(String streamIndex); + } + + /** + * Thread message interface. + */ + private interface Message {} + + /** + * Thread message to send a blob. + */ + private final class MessageBlob implements Message { + public final byte[] blob; + public final boolean requiresAck; + public MessageBlob(byte[] blob, boolean requiresAck) { + this.blob = blob; + this.requiresAck = requiresAck; + } + } + + /** + * Stream end marker thread message. + */ + private final class MessageEnd implements Message {} + + /** + * Message to stop the thread. + * Used together with setting isRunning to false. + */ + private final class MessageStop implements Message {} + + /** + * Logger for this class. 
+ */ + private static final Logger logger = + LoggerFactory.getLogger(OutputStreamWriter.class); + + /** + * Index of the output stream. + */ + private final String streamIndex; + + /** + * The stream to write blobs. + */ + private final InterceptedStream<OutputStream> stream; + + /** + * Reference to the object where to send results of blob + * writing operations and other events from the writer thread. + */ + private final ExecutionListener executionListener; + + /** + * The queue for thread messages which includes blobs to write to the stream. + */ + private final BlockingQueue<Message> messageQueue = new LinkedBlockingQueue<>(); + + /** + * Whether the writer is alive and the thread is writing. + */ + private volatile boolean isRunning = true; + + /** + * Whether the latest queued blob will trigger an acknowledge packet. + */ + private boolean prevBlobRequiresAck = false; + + /** + * Creates a new OutputStreamWriter which will write blobs + * into the output stream and return results of the writing operations + * to the execution listener. + * + * @param stream + * The stream to write blobs. + * + * @param executionListener + * Execution listener object to send results of the writing operations. + */ + public OutputStreamWriter(InterceptedStream<OutputStream> stream, + ExecutionListener executionListener) { + this.streamIndex = stream.getIndex(); + this.stream = stream; + this.executionListener = executionListener; + } + + /** + * Signals the streaming thread to stop. + */ + public void stop() { + isRunning = false; + messageQueue.offer(new MessageStop()); + } + + /** + * Return the stream where the blobs are written. + * + * @return + * The stream related to this writer. + */ + public InterceptedStream<OutputStream> getStream() { + return stream; + } + + /** + * Puts a blob into the internal queue to be written into the stream. + * + * @param blob + * Blob which has to be written into the stream. + * + * @param requiresAck + * ACK packet must be sent when the blob is written. 
+ */ + public void handleBlob(byte[] blob, boolean requiresAck) { + messageQueue.offer(new MessageBlob(blob, requiresAck)); + prevBlobRequiresAck = requiresAck; + } + + /** + * Puts the end marker message into the queue. + */ + public void handleEnd() { + messageQueue.offer(new MessageEnd()); + } + + /** + * Checks whether the previous blob required ACK to be sent to the sender. + * + * @return + * true if the previous blob required ACK. + */ + public boolean didPrevBlobRequireAck() { + return prevBlobRequiresAck; + } + + /** + * Returns current queue message count. + * + * @return + * Current queue message count. + */ + public int getQueuedMessageCount() { + int size = messageQueue.size(); + return size; + } + + /** + * Runs the loop to send queued blobs. + */ + public void run() { + + logger.debug("Started processing message queue for stream #{}", + streamIndex); + + try { + + // Run the writing loop + while (isRunning) { + + // Pull a queued message + Message message = messageQueue.take(); + + // We do not check for MessageStop here. + // isRunning check is enough. 
+ if (!isRunning) { + break; + } + + // End marker means close the stream and exit + if (message instanceof MessageEnd) { + logger.debug("Received the end marker for stream #{}", streamIndex); + executionListener.onStreamEnd(streamIndex); + isRunning = false; + break; + } + + // Write the blob + if (message instanceof MessageBlob) { + MessageBlob streamWriterBlob = (MessageBlob) message; + + // Attempt to write data to stream + stream.getStream().write(streamWriterBlob.blob); + + // Notify the listener, which acknowledges the blob on the client's behalf + executionListener.onBlobWritten(stream.getIndex(), + streamWriterBlob.requiresAck); + } + + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + catch (IOException e) { + executionListener.onWriteFailed(streamIndex); + logger.debug("Write failed for intercepted stream.", e); + } + + logger.debug("Finished processing message queue for stream #{}", + streamIndex); + } +} +