Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -313,18 +313,18 @@ private class StdChannelListener
final Object requestId;

/**
 * Records a start timestamp and the id of the request this listener is attached to.
 *
 * @param requestId identifier of the request; it is included in the messages that
 *                  {@code operationComplete} logs or formats
 */
StdChannelListener(Object requestId) {
// NOTE(review): startTime is assigned twice in a row, so only the System.nanoTime()
// value survives. The currentTimeMillis() line looks like the pre-change side of the
// diff -- confirm which one is intended before relying on the unit (ms vs ns).
this.startTime = System.currentTimeMillis();
this.startTime = System.nanoTime();
this.requestId = requestId;
}

@Override
public void operationComplete(Future<? super Void> future) throws Exception {
if (future.isSuccess()) {
if (logger.isTraceEnabled()) {
long timeTaken = System.currentTimeMillis() - startTime;
logger.trace("Sending request {} to {} took {} ms", requestId,
// if (logger.isTraceEnabled()) {
long timeTaken = System.nanoTime() - startTime;
logger.info("Sending request {} to {} took {} ns", requestId,
getRemoteAddress(channel), timeTaken);
}
// }
} else {
String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
getRemoteAddress(channel), future.cause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ private TransportClient createClient(InetSocketAddress address)
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
logger.info("Initializing channel");
TransportChannelHandler clientHandler = context.initializePipeline(ch);
clientRef.set(clientHandler.getClient());
channelRef.set(ch);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.apache.spark.network.shuffle;

import org.apache.spark.network.client.RpcResponseCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -39,21 +40,27 @@ public String toString() {
private final File file;
private final FileType fileType;
private WritableByteChannel fileOutputChannel = null;
private long startTime;
private final RpcResponseCallback callback;

public FileWriterStreamCallback(
String appId,
int shuffleId,
int mapId,
File file,
FileWriterStreamCallback.FileType fileType) {
String appId,
int shuffleId,
int mapId,
File file,
FileWriterStreamCallback.FileType fileType,
RpcResponseCallback callback) {
this.appId = appId;
this.shuffleId = shuffleId;
this.mapId = mapId;
this.file = file;
this.fileType = fileType;
this.startTime = 0;
this.callback = callback;
}

public void open() {
this.startTime = System.nanoTime();
logger.info(
"Opening {} for remote writing. File type: {}", file.getAbsolutePath(), fileType);
if (fileOutputChannel != null) {
Expand Down Expand Up @@ -118,9 +125,19 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {

/**
 * Finishes an upload after the whole stream has been received: closes the output
 * channel and then reports success to the RPC callback with an empty payload.
 *
 * <p>If {@code startTime} is non-zero, three "METRICS:" lines are logged along the way.
 * Each reports the nanoseconds elapsed since {@code startTime}, so the values are
 * cumulative rather than per-step durations.
 *
 * @param streamId identifier of the completed stream; not used by this method
 * @throws IOException if closing the output channel fails
 */
@Override
public void onComplete(String streamId) throws IOException {
  logElapsedSinceStart("METRICS: Streaming file {} took {}");
  logger.info(
      "Finished writing {}. File type: {}", file.getAbsolutePath(), fileType);
  // NOTE(review): if close() throws, the callback below is never invoked from here --
  // confirm the caller reports the failure to the remote side.
  fileOutputChannel.close();
  logElapsedSinceStart("METRICS: Writing file {} took {}");
  callback.onSuccess(ByteBuffer.allocate(0));
  logElapsedSinceStart("METRICS: Sending write file callback for {} took {}");
}

/**
 * Logs the time elapsed since {@code startTime} using a parameterized message, so the
 * text is only assembled when INFO logging is enabled. Does nothing while
 * {@code startTime} is still zero.
 *
 * @param format SLF4J pattern with two placeholders: file name, then elapsed nanoseconds
 */
private void logElapsedSinceStart(String format) {
  if (startTime != 0) {
    logger.info(format, file.getName(), System.nanoTime() - startTime);
  }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package org.apache.spark.shuffle.api;

import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;

/**
 * Declares how a {@link ShuffleMapOutputWriter} is obtained for a given map task output.
 */
public interface ShuffleWriteSupport {

/**
 * Returns a map output writer for the given identifiers.
 *
 * @param appId     application identifier
 * @param shuffleId shuffle identifier
 * @param mapId     map identifier
 * @return a writer for this map output
 */
ShuffleMapOutputWriter newMapOutputWriter(String appId, int shuffleId, int mapId);
/**
 * Returns a map output writer for the given identifiers, additionally receiving a
 * metrics reporter.
 *
 * @param appId        application identifier
 * @param shuffleId    shuffle identifier
 * @param mapId        map identifier
 * @param writeMetrics reporter handed to the writer; presumably used to record
 *                     write-side timings -- verify against implementations
 * @return a writer for this map output
 */
ShuffleMapOutputWriter newMapOutputWriter(String appId, int shuffleId, int mapId,
ShuffleWriteMetricsReporter writeMetrics);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.apache.spark.shuffle.external;

import org.apache.spark.MapOutputTracker;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
import org.apache.spark.network.TransportContext;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package org.apache.spark.shuffle.external;

import org.apache.hadoop.mapreduce.task.reduce.Shuffle;
import org.apache.spark.network.protocol.Encoders;
import org.apache.spark.storage.ShuffleLocation;

import java.io.*;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;

public class ExternalShuffleLocation implements ShuffleLocation {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.shuffle.protocol.RegisterShuffleIndex;
import org.apache.spark.network.shuffle.protocol.UploadShuffleIndex;
import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
import org.apache.spark.shuffle.api.ShuffleMapOutputWriter;
import org.apache.spark.shuffle.api.ShufflePartitionWriter;
import org.slf4j.Logger;
Expand All @@ -20,33 +21,39 @@ public class ExternalShuffleMapOutputWriter implements ShuffleMapOutputWriter {
private final String appId;
private final int shuffleId;
private final int mapId;
private final ShuffleWriteMetricsReporter writeMetrics;
private final TransportClient client;

public ExternalShuffleMapOutputWriter(
TransportClientFactory clientFactory,
String hostName,
int port,
String appId,
int shuffleId,
int mapId) {
int mapId,
ShuffleWriteMetricsReporter writeMetrics) {
this.clientFactory = clientFactory;
this.hostName = hostName;
this.port = port;
this.appId = appId;
this.shuffleId = shuffleId;
this.mapId = mapId;
this.writeMetrics = writeMetrics;

TransportClient client = null;
try {
final long startClientCreationTime = System.nanoTime();
client = clientFactory.createUnmanagedClient(hostName, port);
ByteBuffer registerShuffleIndex = new RegisterShuffleIndex(
appId, shuffleId, mapId).toByteBuffer();
String requestID = String.format(
"index-register-%s-%d-%d", appId, shuffleId, mapId);
client.setClientId(requestID);
logger.info("clientid: " + client.getClientId() + " " + client.isActive());
final long endClientCreationTime = System.nanoTime();
writeMetrics.incCreateClientTime(endClientCreationTime - startClientCreationTime);
client.sendRpcSync(registerShuffleIndex, 60000);
writeMetrics.incSendRegisterShuffleRequestTime(System.nanoTime() - endClientCreationTime);
} catch (Exception e) {
client.close();
logger.error("Encountered error while creating transport client", e);
throw new RuntimeException(e);
}
Expand All @@ -58,8 +65,8 @@ public ExternalShuffleMapOutputWriter(
@Override
public ShufflePartitionWriter newPartitionWriter(int partitionId) {
try {
return new ExternalShufflePartitionWriter(clientFactory,
hostName, port, appId, shuffleId, mapId, partitionId);
return new ExternalShufflePartitionWriter(client,
hostName, port, appId, shuffleId, mapId, partitionId, writeMetrics);
} catch (Exception e) {
clientFactory.close();
logger.error("Encountered error while creating transport client", e);
Expand All @@ -69,16 +76,16 @@ public ShufflePartitionWriter newPartitionWriter(int partitionId) {

@Override
public void commitAllPartitions() {
TransportClient client = null;
try {
client = clientFactory.createUnmanagedClient(hostName, port);
ByteBuffer uploadShuffleIndex = new UploadShuffleIndex(
appId, shuffleId, mapId).toByteBuffer();
String requestID = String.format(
"index-upload-%s-%d-%d", appId, shuffleId, mapId);
client.setClientId(requestID);
logger.info("clientid: " + client.getClientId() + " " + client.isActive());
final long startTime = System.nanoTime();
client.sendRpcSync(uploadShuffleIndex, 60000);
final long nanoSeconds = System.nanoTime() - startTime;
logger.info("METRICS: UploadIndexParam upload time: " + nanoSeconds);
} catch (Exception e) {
logger.error("Encountered error while creating transport client", e);
client.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import org.apache.spark.util.ByteBufferInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.compat.java8.OptionConverters;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
Expand Down Expand Up @@ -39,8 +38,10 @@ public ExternalShufflePartitionReader(

@Override
public InputStream fetchPartition(int reduceId, Optional<ShuffleLocation> shuffleLocation) {
assert shuffleLocation.isPresent() && shuffleLocation.get() instanceof ExternalShuffleLocation;
ExternalShuffleLocation externalShuffleLocation = (ExternalShuffleLocation) shuffleLocation.get();
assert shuffleLocation.isPresent() &&
shuffleLocation.get() instanceof ExternalShuffleLocation;
ExternalShuffleLocation externalShuffleLocation =
(ExternalShuffleLocation) shuffleLocation.get();
logger.info(String.format("Found external shuffle location on node: %s:%d",
externalShuffleLocation.getShuffleHostname(),
externalShuffleLocation.getShufflePort()));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,87 +1,94 @@
package org.apache.spark.shuffle.external;

import com.google.common.util.concurrent.SettableFuture;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.shuffle.protocol.UploadShufflePartitionStream;
import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
import org.apache.spark.shuffle.api.CommittedPartition;
import org.apache.spark.shuffle.api.ShufflePartitionWriter;
import org.apache.spark.storage.ShuffleLocation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Optional;

public class ExternalShufflePartitionWriter implements ShufflePartitionWriter {

private static final Logger logger =
LoggerFactory.getLogger(ExternalShufflePartitionWriter.class);

private final TransportClientFactory clientFactory;
private final TransportClient client;
private final String hostName;
private final int port;
private final String appId;
private final int shuffleId;
private final int mapId;
private final int partitionId;
private final ShuffleWriteMetricsReporter writeMetrics;

private long totalLength = 0;
private final ByteArrayOutputStream partitionBuffer = new ByteArrayOutputStream();

public ExternalShufflePartitionWriter(
TransportClientFactory clientFactory,
TransportClient client,
String hostName,
int port,
String appId,
int shuffleId,
int mapId,
int partitionId) {
this.clientFactory = clientFactory;
int partitionId,
ShuffleWriteMetricsReporter writeMetrics) {
this.client = client;
this.hostName = hostName;
this.port = port;
this.appId = appId;
this.shuffleId = shuffleId;
this.mapId = mapId;
this.partitionId = partitionId;
this.writeMetrics = writeMetrics;
}

/**
 * Hands out the stream that partition bytes are written to.
 *
 * @return this writer's {@code partitionBuffer}; every call returns the same instance
 */
@Override
public OutputStream openPartitionStream() {
  return partitionBuffer;
}

@Override
public CommittedPartition commitPartition() {
RpcResponseCallback callback = new RpcResponseCallback() {
@Override
public void onSuccess(ByteBuffer response) {
logger.info("Successfully uploaded partition");
}

@Override
public void onFailure(Throwable e) {
logger.error("Encountered an error uploading partition", e);
}
};
TransportClient client = null;
try {
byte[] buf = partitionBuffer.toByteArray();
int size = buf.length;
ByteBuffer streamHeader = new UploadShufflePartitionStream(appId, shuffleId, mapId,
partitionId, size).toByteBuffer();
ManagedBuffer managedBuffer = new NioManagedBuffer(ByteBuffer.wrap(buf));
client = clientFactory.createUnmanagedClient(hostName, port);
client.setClientId(String.format("data-%s-%d-%d-%d",
appId, shuffleId, mapId, partitionId));
logger.info("clientid: " + client.getClientId() + " " + client.isActive());
logger.info("THE BUFFER HASH CODE IS: " + Arrays.hashCode(buf));

SettableFuture future = SettableFuture.create();
final long startTime = System.nanoTime();
RpcResponseCallback callback = new RpcResponseCallback() {
@Override
public void onSuccess(ByteBuffer response) {
logger.info("Successfully uploaded partition: " + System.nanoTime());
logger.info("StreamFileWriteTime: " + (System.nanoTime() - startTime));
writeMetrics.incStreamFileWriteTime(System.nanoTime() - startTime);
future.set(null);
}

@Override
public void onFailure(Throwable e) {
logger.error("Encountered an error uploading partition", e);
}
};
client.uploadStream(new NioManagedBuffer(streamHeader), managedBuffer, callback);
final long nanoSeconds = System.nanoTime() - startTime;
logger.info("METRICS: UploadStream upload time: " + nanoSeconds);
totalLength += size;
logger.info("Partition Length: " + totalLength);
logger.info("Size: " + size);
future.get();
} catch (Exception e) {
if (client != null) {
client.close();
Expand All @@ -91,7 +98,8 @@ public void onFailure(Throwable e) {
} finally {
logger.info("Successfully sent partition to ESS");
}
return new ExternalCommittedPartition(totalLength, new ExternalShuffleLocation(hostName, port));
return new ExternalCommittedPartition(totalLength,
new ExternalShuffleLocation(hostName, port));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.apache.spark.shuffle.external;

import com.google.common.collect.Lists;
import org.apache.spark.MapOutputTracker;
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.client.TransportClientBootstrap;
import org.apache.spark.network.client.TransportClientFactory;
Expand All @@ -10,13 +9,10 @@
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.shuffle.api.ShufflePartitionReader;
import org.apache.spark.shuffle.api.ShuffleReadSupport;
import org.apache.spark.storage.ShuffleLocation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.compat.java8.OptionConverters;

import java.util.List;
import java.util.Optional;

public class ExternalShuffleReadSupport implements ShuffleReadSupport {

Expand Down
Loading