From 0561ed7b4282c0fa686f604822a5f7691d43373e Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Fri, 1 Feb 2019 11:53:53 -0800 Subject: [PATCH 1/7] adding metrics doesn't work --- .../ExternalShuffleMapOutputWriter.java | 3 + .../ExternalShufflePartitionWriter.java | 4 + .../sort/BypassMergeSortShuffleWriter.java | 3 + .../spark/executor/ShuffleWriteMetrics.scala | 14 ++++ .../org/apache/spark/shuffle/metrics.scala | 2 + .../examples/GroupByShufflePerfTest.scala | 81 +++++++++++++++++++ .../spark/examples/GroupByShuffleTest.scala | 1 + .../ShuffleMetricsOutputSparkListener.scala | 58 +++++++++++++ .../KubernetesExternalShuffleService.scala | 6 +- 9 files changed, 170 insertions(+), 2 deletions(-) create mode 100644 examples/src/main/scala/org/apache/spark/examples/GroupByShufflePerfTest.scala create mode 100644 examples/src/main/scala/org/apache/spark/examples/ShuffleMetricsOutputSparkListener.scala diff --git a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleMapOutputWriter.java index 8866d14feca53..d75d1ccbf59c2 100644 --- a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleMapOutputWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleMapOutputWriter.java @@ -78,7 +78,10 @@ public void commitAllPartitions() { "index-upload-%s-%d-%d", appId, shuffleId, mapId); client.setClientId(requestID); logger.info("clientid: " + client.getClientId() + " " + client.isActive()); + final long startTime = System.nanoTime(); client.sendRpcSync(uploadShuffleIndex, 60000); + final long nanoSeconds = System.nanoTime() - startTime; + logger.info("METRICS: UploadIndexParam upload time: " + nanoSeconds); } catch (Exception e) { logger.error("Encountered error while creating transport client", e); client.close(); diff --git a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShufflePartitionWriter.java b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShufflePartitionWriter.java index 89bfe4407e5ac..21373fd764d98 100644 --- a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShufflePartitionWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShufflePartitionWriter.java @@ -78,7 +78,11 @@ public void onFailure(Throwable e) { appId, shuffleId, mapId, partitionId)); logger.info("clientid: " + client.getClientId() + " " + client.isActive()); logger.info("THE BUFFER HASH CODE IS: " + Arrays.hashCode(buf)); + + final long startTime = System.nanoTime(); client.uploadStream(new NioManagedBuffer(streamHeader), managedBuffer, callback); + final long nanoSeconds = System.nanoTime() - startTime; + logger.info("METRICS: UploadStream upload time: " + nanoSeconds); totalLength += size; logger.info("Partition Length: " + totalLength); logger.info("Size: " + size); diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index 26b55aa70387c..9662738b55202 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -254,7 +254,10 @@ private CommittedPartition[] combineAndWritePartitionsUsingPluggableWriter() thr try (OutputStream out = writer.openPartitionStream()) { Utils.copyStream(in, out, false, false); } + final long fileWriteStartTime = System.nanoTime(); partitions[i] = writer.commitPartition(); + writeMetrics.incNumFilesWritten(); + writeMetrics.incFileWriteTime(System.nanoTime() - fileWriteStartTime); copyThrewException = false; } catch (Exception e) { try { diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala index d0b0e7da079c9..35007d8161fa0 100644 --- a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala @@ -32,6 +32,8 @@ class ShuffleWriteMetrics private[spark] () extends ShuffleWriteMetricsReporter private[executor] val _bytesWritten = new LongAccumulator private[executor] val _recordsWritten = new LongAccumulator private[executor] val _writeTime = new LongAccumulator + private[executor] val _fileWriteTime = new LongAccumulator + private[executor] val _numFilesWritten = new LongAccumulator /** * Number of bytes written for the shuffle by this task. @@ -48,9 +50,21 @@ class ShuffleWriteMetrics private[spark] () extends ShuffleWriteMetricsReporter */ def writeTime: Long = _writeTime.sum + /** + * Time it takes to write a single partition block file. + */ + def fileWriteTime: Long = _fileWriteTime.sum + + /** + * Number of files written. + */ + def numFilesWritten: Long = _numFilesWritten.sum + private[spark] override def incBytesWritten(v: Long): Unit = _bytesWritten.add(v) private[spark] override def incRecordsWritten(v: Long): Unit = _recordsWritten.add(v) private[spark] override def incWriteTime(v: Long): Unit = _writeTime.add(v) + private[spark] override def incFileWriteTime(v: Long): Unit = _fileWriteTime.add(v) + private[spark] override def incNumFilesWritten(): Unit = _numFilesWritten.add(1) private[spark] override def decBytesWritten(v: Long): Unit = { _bytesWritten.setValue(bytesWritten - v) } diff --git a/core/src/main/scala/org/apache/spark/shuffle/metrics.scala b/core/src/main/scala/org/apache/spark/shuffle/metrics.scala index 33be677bc90cb..5f376a75bae00 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/metrics.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/metrics.scala @@ -49,4 +49,6 @@ private[spark] trait ShuffleWriteMetricsReporter { private[spark] def incWriteTime(v: Long): Unit private[spark] def decBytesWritten(v: Long): Unit private[spark] def decRecordsWritten(v: Long): Unit + private[spark] def incFileWriteTime(v: Long): Unit + private[spark] def incNumFilesWritten(): Unit } diff --git a/examples/src/main/scala/org/apache/spark/examples/GroupByShufflePerfTest.scala b/examples/src/main/scala/org/apache/spark/examples/GroupByShufflePerfTest.scala new file mode 100644 index 0000000000000..104ef0cd52081 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/GroupByShufflePerfTest.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// scalastyle:off println +package org.apache.spark.examples + +import org.apache.spark.sql.SparkSession + +/** + * Usage: GroupByShufflePerfTest + */ +object GroupByShufflePerfTest { + def main(args: Array[String]) { + + var parallelization = 1 + + if (args.length != 0) { + parallelization = args(0).toInt + } + + println("Running GroupByShufflePerfTest") + + val spark = SparkSession + .builder + .appName("GroupByShuffle Test") + .getOrCreate() + + spark.sparkContext.addSparkListener(new ShuffleMetricsOutputSparkListener()) + +// val words = Array("one", "two", "two", "three", "three", "three") +// val wordPairsRDD = spark.sparkContext.parallelize(words).map(word => (word, 1)) +// +// val wordCountsWithGroup = wordPairsRDD +// .groupByKey() +// .map(t => (t._1, t._2.sum)) +// .collect() +// +// println(wordCountsWithGroup.mkString(",")) + val words = createArray(10000) + + val wordPairsRDD2 = spark.sparkContext + .parallelize(words, parallelization).map(word => (word, 1)) + + val wordCountsWithGroup2 = wordPairsRDD2 + .groupByKey() + .map(t => (t._1, t._2.sum)) + .collect() + + println(wordCountsWithGroup2.mkString(",")) + +// Thread.sleep(600000) + spark.stop() + } + + + def createArray(arraySize: Int) : Array[String] = { + val mapIntToWord: Map[Int, String] = + Map(0 -> "zero", 1 -> "one", 2 -> "two", 3 -> "three", 4 -> "four") + + val array = new Array[String](arraySize) + for (i <- 1 to arraySize) { + array(i - 1) = mapIntToWord.getOrElse(i-1, "else") + } + array + } +} +// scalastyle:on println diff --git a/examples/src/main/scala/org/apache/spark/examples/GroupByShuffleTest.scala b/examples/src/main/scala/org/apache/spark/examples/GroupByShuffleTest.scala index 883ac10718dfd..7a0a2088f8261 100644 --- a/examples/src/main/scala/org/apache/spark/examples/GroupByShuffleTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/GroupByShuffleTest.scala @@ -49,6 +49,7 @@ object GroupByShuffleTest { println(wordCountsWithGroup2.mkString(",")) + Thread.sleep(600000) spark.stop() } } diff --git a/examples/src/main/scala/org/apache/spark/examples/ShuffleMetricsOutputSparkListener.scala b/examples/src/main/scala/org/apache/spark/examples/ShuffleMetricsOutputSparkListener.scala new file mode 100644 index 0000000000000..86c187d5abc21 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/ShuffleMetricsOutputSparkListener.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// scalastyle:off println + +package org.apache.spark.examples + +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.SparkListener +import org.apache.spark.scheduler.SparkListenerTaskEnd + +class ShuffleMetricsOutputSparkListener extends SparkListener with Logging { + + + override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { + val taskMetrics = taskEnd.taskMetrics + logInfo(s"TaskMetrics: " + + s"${taskEnd.stageId} " + + s"${taskMetrics.executorRunTime} " + + s"${taskMetrics.executorCpuTime} " + + s"${taskMetrics.jvmGCTime} " + + s"${taskMetrics.resultSize} " + + s"${taskMetrics.executorDeserializeTime} " + + s"${taskMetrics.executorDeserializeCpuTime} " + + s"${taskMetrics.peakExecutionMemory} ") + + val readMetrics = taskMetrics.shuffleReadMetrics + logInfo(s"ReadMetrics: " + + s"${readMetrics.totalBytesRead} " + + s"${readMetrics.recordsRead}") + + val writeMetrics = taskMetrics.shuffleWriteMetrics + logInfo(s"WriteMetrics: " + + s"${writeMetrics.writeTime} " + + s"${writeMetrics.bytesWritten} " + + s"${writeMetrics.recordsWritten}") +// s"${writeMetrics.fileWriteTime} " + +// s"${writeMetrics.numFilesWritten}") + } + +} + + +// scalastyle:on println \ No newline at end of file diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesExternalShuffleService.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesExternalShuffleService.scala index b9d69f1bc69fb..1eae663fc56ce 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesExternalShuffleService.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesExternalShuffleService.scala @@ -128,7 +128,8 @@ private[spark] class KubernetesExternalShuffleBlockHandler( out.close() callback.onSuccess(ByteBuffer.allocate(0)) } finally { - responseDelayContext.stop() + val nanoSeconds = responseDelayContext.stop() + logInfo("METRICS: UploadIndexParam processing time: " + nanoSeconds) } case OpenParam(appId, shuffleId, mapId, partitionId) => @@ -177,7 +178,8 @@ private[spark] class KubernetesExternalShuffleBlockHandler( getFileWriterStreamCallback( appId, shuffleId, mapId, "data", FileWriterStreamCallback.FileType.DATA) } finally { - responseDelayContext.stop() + val nanoSeconds = responseDelayContext.stop() + logInfo("METRICS: OpenStream processing time: " + nanoSeconds) } case _ => super.handleStream(header, client, callback) From 1f78af480542ee3c0a456512a723398c736fc1fd Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Mon, 11 Feb 2019 17:15:26 +0000 Subject: [PATCH 2/7] adding more metrics etc --- .../spark/network/client/TransportClient.java | 10 ++--- .../shuffle/FileWriterStreamCallback.java | 21 +++++++--- .../shuffle/api/ShuffleWriteSupport.java | 5 ++- .../external/ExternalShuffleDataIO.java | 1 - .../external/ExternalShuffleLocation.java | 6 +-- .../ExternalShuffleMapOutputWriter.java | 8 +++- .../ExternalShufflePartitionReader.java | 7 ++-- .../ExternalShufflePartitionWriter.java | 35 +++++++++-------- .../external/ExternalShuffleReadSupport.java | 4 -- .../external/ExternalShuffleWriteSupport.java | 6 ++- .../sort/BypassMergeSortShuffleWriter.java | 33 ++++++++++++++-- .../shuffle/sort/UnsafeShuffleWriter.java | 15 ++++---- .../apache/spark/InternalAccumulator.scala | 7 ++++ .../spark/executor/ShuffleWriteMetrics.scala | 38 +++++++++++++++++-- .../apache/spark/executor/TaskMetrics.scala | 7 ++++ .../shuffle/BlockStoreShuffleReader.scala | 2 +- .../org/apache/spark/shuffle/metrics.scala | 5 +++ .../apache/spark/storage/BlockManager.scala | 5 ++- .../util/collection/ExternalSorter.scala | 5 ++- .../sort/UnsafeShuffleWriterSuite.java | 3 +- .../apache/spark/SplitFilesShuffleIO.scala | 5 ++- .../examples/GroupByShufflePerfTest.scala | 15 ++++++-- .../ShuffleMetricsOutputSparkListener.scala | 17 +++++++-- .../KubernetesExternalShuffleService.scala | 7 ++-- 24 files changed, 196 insertions(+), 71 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java index 20d840baeaf6c..20b4db690b5df 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java @@ -313,18 +313,18 @@ private class StdChannelListener final Object requestId; StdChannelListener(Object requestId) { - this.startTime = System.currentTimeMillis(); + this.startTime = System.nanoTime(); this.requestId = requestId; } @Override public void operationComplete(Future future) throws Exception { if (future.isSuccess()) { - if (logger.isTraceEnabled()) { - long timeTaken = System.currentTimeMillis() - startTime; - logger.trace("Sending request {} to {} took {} ms", requestId, +// if (logger.isTraceEnabled()) { + long timeTaken = System.nanoTime() - startTime; + logger.info("Sending request {} to {} took {} ns", requestId, getRemoteAddress(channel), timeTaken); - } +// } } else { String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId, getRemoteAddress(channel), future.cause()); diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/FileWriterStreamCallback.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/FileWriterStreamCallback.java index 1f44ae8b3c78c..2e8f0719fdb77 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/FileWriterStreamCallback.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/FileWriterStreamCallback.java @@ -1,5 +1,6 @@ package org.apache.spark.network.shuffle; +import org.apache.spark.network.client.RpcResponseCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,21 +40,27 @@ public String toString() { private final File file; private final FileType fileType; private WritableByteChannel fileOutputChannel = null; + private long startTime; + private final RpcResponseCallback callback; public FileWriterStreamCallback( - String appId, - int shuffleId, - int mapId, - File file, - FileWriterStreamCallback.FileType fileType) { + String appId, + int shuffleId, + int mapId, + File file, + FileWriterStreamCallback.FileType fileType, + RpcResponseCallback callback) { this.appId = appId; this.shuffleId = shuffleId; this.mapId = mapId; this.file = file; this.fileType = fileType; + this.startTime = 0; + this.callback = callback; } public void open() { + this.startTime = System.nanoTime(); logger.info( "Opening {} for remote writing. File type: {}", file.getAbsolutePath(), fileType); if (fileOutputChannel != null) { @@ -118,9 +125,13 @@ public void onData(String streamId, ByteBuffer buf) throws IOException { @Override public void onComplete(String streamId) throws IOException { + if (startTime != 0) { + logger.info("METRICS: Writing file " + file.getName() + " took " + (System.nanoTime() - startTime)); + } logger.info( "Finished writing {}. File type: {}", file.getAbsolutePath(), fileType); fileOutputChannel.close(); + callback.onSuccess(ByteBuffer.allocate(0)); } @Override diff --git a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleWriteSupport.java b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleWriteSupport.java index 2f61dbaa17c69..7cf8fe2ff5f32 100644 --- a/core/src/main/java/org/apache/spark/shuffle/api/ShuffleWriteSupport.java +++ b/core/src/main/java/org/apache/spark/shuffle/api/ShuffleWriteSupport.java @@ -17,7 +17,10 @@ package org.apache.spark.shuffle.api; +import org.apache.spark.shuffle.ShuffleWriteMetricsReporter; + public interface ShuffleWriteSupport { - ShuffleMapOutputWriter newMapOutputWriter(String appId, int shuffleId, int mapId); + ShuffleMapOutputWriter newMapOutputWriter(String appId, int shuffleId, int mapId, + ShuffleWriteMetricsReporter writeMetrics); } diff --git a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleDataIO.java b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleDataIO.java index 2a0a39e4b82e4..0b9e4bcdc4533 100644 --- a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleDataIO.java +++ b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleDataIO.java @@ -1,6 +1,5 @@ package org.apache.spark.shuffle.external; -import org.apache.spark.MapOutputTracker; import org.apache.spark.SparkConf; import org.apache.spark.SparkEnv; import org.apache.spark.network.TransportContext; diff --git a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleLocation.java b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleLocation.java index 20ae8d376050c..4d26c34fe7e06 100644 --- a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleLocation.java +++ b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleLocation.java @@ -1,10 +1,10 @@ package org.apache.spark.shuffle.external; -import org.apache.hadoop.mapreduce.task.reduce.Shuffle; -import org.apache.spark.network.protocol.Encoders; import org.apache.spark.storage.ShuffleLocation; -import java.io.*; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; public class ExternalShuffleLocation implements ShuffleLocation { diff --git a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleMapOutputWriter.java index d75d1ccbf59c2..a3c3f9351fbb6 100644 --- a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleMapOutputWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleMapOutputWriter.java @@ -4,6 +4,7 @@ import org.apache.spark.network.client.TransportClientFactory; import org.apache.spark.network.shuffle.protocol.RegisterShuffleIndex; import org.apache.spark.network.shuffle.protocol.UploadShuffleIndex; +import org.apache.spark.shuffle.ShuffleWriteMetricsReporter; import org.apache.spark.shuffle.api.ShuffleMapOutputWriter; import org.apache.spark.shuffle.api.ShufflePartitionWriter; import org.slf4j.Logger; @@ -20,6 +21,7 @@ public class ExternalShuffleMapOutputWriter implements ShuffleMapOutputWriter { private final String appId; private final int shuffleId; private final int mapId; + private final ShuffleWriteMetricsReporter writeMetrics; public ExternalShuffleMapOutputWriter( TransportClientFactory clientFactory, @@ -27,13 +29,15 @@ public ExternalShuffleMapOutputWriter( int port, String appId, int shuffleId, - int mapId) { + int mapId, + ShuffleWriteMetricsReporter writeMetrics) { this.clientFactory = clientFactory; this.hostName = hostName; this.port = port; this.appId = appId; this.shuffleId = shuffleId; this.mapId = mapId; + this.writeMetrics = writeMetrics; TransportClient client = null; try { @@ -59,7 +63,7 @@ public ExternalShuffleMapOutputWriter( public ShufflePartitionWriter newPartitionWriter(int partitionId) { try { return new ExternalShufflePartitionWriter(clientFactory, - hostName, port, appId, shuffleId, mapId, partitionId); + hostName, port, appId, shuffleId, mapId, partitionId, writeMetrics); } catch (Exception e) { clientFactory.close(); logger.error("Encountered error while creating transport client", e); diff --git a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShufflePartitionReader.java b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShufflePartitionReader.java index f027832639250..9e803a8f07948 100644 --- a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShufflePartitionReader.java +++ b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShufflePartitionReader.java @@ -8,7 +8,6 @@ import org.apache.spark.util.ByteBufferInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.compat.java8.OptionConverters; import java.io.ByteArrayInputStream; import java.io.InputStream; @@ -39,8 +38,10 @@ public ExternalShufflePartitionReader( @Override public InputStream fetchPartition(int reduceId, Optional shuffleLocation) { - assert shuffleLocation.isPresent() && shuffleLocation.get() instanceof ExternalShuffleLocation; - ExternalShuffleLocation externalShuffleLocation = (ExternalShuffleLocation) shuffleLocation.get(); + assert shuffleLocation.isPresent() && + shuffleLocation.get() instanceof ExternalShuffleLocation; + ExternalShuffleLocation externalShuffleLocation = + (ExternalShuffleLocation) shuffleLocation.get(); logger.info(String.format("Found external shuffle location on node: %s:%d", externalShuffleLocation.getShuffleHostname(), externalShuffleLocation.getShufflePort())); diff --git a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShufflePartitionWriter.java b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShufflePartitionWriter.java index 21373fd764d98..c83037c54a2a6 100644 --- a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShufflePartitionWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShufflePartitionWriter.java @@ -6,16 +6,15 @@ import org.apache.spark.network.client.TransportClient; import org.apache.spark.network.client.TransportClientFactory; import org.apache.spark.network.shuffle.protocol.UploadShufflePartitionStream; +import org.apache.spark.shuffle.ShuffleWriteMetricsReporter; import org.apache.spark.shuffle.api.CommittedPartition; import org.apache.spark.shuffle.api.ShufflePartitionWriter; -import org.apache.spark.storage.ShuffleLocation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.*; import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.Optional; public class ExternalShufflePartitionWriter implements ShufflePartitionWriter { @@ -29,6 +28,7 @@ public class ExternalShufflePartitionWriter implements ShufflePartitionWriter { private final int shuffleId; private final int mapId; private final int partitionId; + private final ShuffleWriteMetricsReporter writeMetrics; private long totalLength = 0; private final ByteArrayOutputStream partitionBuffer = new ByteArrayOutputStream(); @@ -40,7 +40,8 @@ public ExternalShufflePartitionWriter( String appId, int shuffleId, int mapId, - int partitionId) { + int partitionId, + ShuffleWriteMetricsReporter writeMetrics) { this.clientFactory = clientFactory; this.hostName = hostName; this.port = port; @@ -48,6 +49,7 @@ public ExternalShufflePartitionWriter( this.shuffleId = shuffleId; this.mapId = mapId; this.partitionId = partitionId; + this.writeMetrics = writeMetrics; } @Override @@ -55,17 +57,6 @@ public ExternalShufflePartitionWriter( @Override public CommittedPartition commitPartition() { - RpcResponseCallback callback = new RpcResponseCallback() { - @Override - public void onSuccess(ByteBuffer response) { - logger.info("Successfully uploaded partition"); - } - - @Override - public void onFailure(Throwable e) { - logger.error("Encountered an error uploading partition", e); - } - }; TransportClient client = null; try { byte[] buf = partitionBuffer.toByteArray(); @@ -80,6 +71,19 @@ public void onFailure(Throwable e) { logger.info("THE BUFFER HASH CODE IS: " + Arrays.hashCode(buf)); final long startTime = System.nanoTime(); + RpcResponseCallback callback = new RpcResponseCallback() { + @Override + public void onSuccess(ByteBuffer response) { + logger.info("Successfully uploaded partition: " + System.nanoTime()); + logger.info("StreamFileWriteTime: " + (System.nanoTime() - startTime)); + writeMetrics.incStreamFileWriteTime(System.nanoTime() - startTime); + } + + @Override + public void onFailure(Throwable e) { + logger.error("Encountered an error uploading partition", e); + } + }; client.uploadStream(new NioManagedBuffer(streamHeader), managedBuffer, callback); final long nanoSeconds = System.nanoTime() - startTime; logger.info("METRICS: UploadStream upload time: " + nanoSeconds); @@ -95,7 +99,8 @@ public void onFailure(Throwable e) { } finally { logger.info("Successfully sent partition to ESS"); } - return new ExternalCommittedPartition(totalLength, new ExternalShuffleLocation(hostName, port)); + return new ExternalCommittedPartition(totalLength, + new ExternalShuffleLocation(hostName, port)); } @Override diff --git a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleReadSupport.java b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleReadSupport.java index a671b80904ed0..0bde10a777668 100644 --- a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleReadSupport.java +++ b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleReadSupport.java @@ -1,7 +1,6 @@ package org.apache.spark.shuffle.external; import com.google.common.collect.Lists; -import org.apache.spark.MapOutputTracker; import org.apache.spark.network.TransportContext; import org.apache.spark.network.client.TransportClientBootstrap; import org.apache.spark.network.client.TransportClientFactory; @@ -10,13 +9,10 @@ import org.apache.spark.network.util.TransportConf; import org.apache.spark.shuffle.api.ShufflePartitionReader; import org.apache.spark.shuffle.api.ShuffleReadSupport; -import org.apache.spark.storage.ShuffleLocation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.compat.java8.OptionConverters; import java.util.List; -import java.util.Optional; public class ExternalShuffleReadSupport implements ShuffleReadSupport { diff --git a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleWriteSupport.java b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleWriteSupport.java index 413c2fd63f20a..37725b3dec172 100644 --- a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleWriteSupport.java +++ b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleWriteSupport.java @@ -7,6 +7,7 @@ import org.apache.spark.network.crypto.AuthClientBootstrap; import org.apache.spark.network.sasl.SecretKeyHolder; import org.apache.spark.network.util.TransportConf; +import org.apache.spark.shuffle.ShuffleWriteMetricsReporter; import org.apache.spark.shuffle.api.ShuffleMapOutputWriter; import org.apache.spark.shuffle.api.ShuffleWriteSupport; import org.slf4j.Logger; @@ -41,7 +42,8 @@ public ExternalShuffleWriteSupport( } @Override - public ShuffleMapOutputWriter newMapOutputWriter(String appId, int shuffleId, int mapId) { + public ShuffleMapOutputWriter newMapOutputWriter(String appId, int shuffleId, int mapId, + ShuffleWriteMetricsReporter writeMetrics) { List bootstraps = Lists.newArrayList(); if (authEnabled) { bootstraps.add(new AuthClientBootstrap(conf, appId, secretKeyHolder)); @@ -49,6 +51,6 @@ public ShuffleMapOutputWriter newMapOutputWriter(String appId, int shuffleId, in TransportClientFactory clientFactory = context.createClientFactory(bootstraps); logger.info("Clientfactory: " + clientFactory.toString()); return new ExternalShuffleMapOutputWriter( - clientFactory, hostname, port, appId, shuffleId, mapId); + clientFactory, hostname, port, appId, shuffleId, mapId, writeMetrics); } } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index 9662738b55202..99af53b1c0850 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -46,7 +46,6 @@ import javax.annotation.Nullable; import java.io.*; import java.util.Arrays; -import java.util.stream.Collectors; /** * This class implements sort-based shuffle's hash-style shuffle fallback path. This write path @@ -171,10 +170,13 @@ public void write(Iterator> records) throws IOException { try { committedPartitions = combineAndWritePartitions(tmp); logger.info("Successfully wrote partitions without shuffle"); + final long fileWriteStartTime = System.nanoTime(); shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, Arrays.stream(committedPartitions).mapToLong(p -> p.length()).toArray(), tmp); + writeMetrics.incIndexFileWriteTime(System.nanoTime() - fileWriteStartTime); + writeMetrics.incWriteTime(System.nanoTime() - fileWriteStartTime); } finally { if (tmp != null && tmp.exists() && !tmp.delete()) { logger.error("Error while deleting temp file {}", tmp.getAbsolutePath()); @@ -212,8 +214,16 @@ private CommittedPartition[] combineAndWritePartitions(File outputFile) throws I final FileInputStream in = new FileInputStream(file); boolean copyThrewException = true; try { + final long streamCopyStartTime = System.nanoTime(); partitions[i] = - new LocalCommittedPartition(Utils.copyStream(in, out, false, transferToEnabled)); + new LocalCommittedPartition( + Utils.copyStream(in, out, false, transferToEnabled)); + if (transferToEnabled) { + logger.info("TransferTo is enabled"); + } else { + logger.info("TransferTo is not enabled"); + } + writeMetrics.incStreamCopyWriteTime(System.nanoTime() - streamCopyStartTime); copyThrewException = false; } finally { Closeables.close(in, copyThrewException); @@ -241,23 +251,36 @@ private CommittedPartition[] combineAndWritePartitionsUsingPluggableWriter() thr assert(pluggableWriteSupport != null); final long writeStartTime = System.nanoTime(); + final long mapOutputWriterCreateStartTime = System.nanoTime(); + logger.info("TIMESTAMP: writeStartTime: " + writeStartTime); ShuffleMapOutputWriter mapOutputWriter = pluggableWriteSupport.newMapOutputWriter( - appId, shuffleId, mapId); + appId, shuffleId, mapId, writeMetrics); + writeMetrics.incCreateMapOutputWriterTime(System.nanoTime() - mapOutputWriterCreateStartTime); try { for (int i = 0; i < numPartitions; i++) { final File file = partitionWriterSegments[i].file(); if (file.exists()) { final FileInputStream in = new FileInputStream(file); boolean copyThrewException = true; + final long createPartitionWriterStartTime = System.nanoTime(); + logger.info("TIMESTAMP: createPartitionWriterStartTime: " + + createPartitionWriterStartTime); ShufflePartitionWriter writer = mapOutputWriter.newPartitionWriter(i); + writeMetrics.incCreatePartitionWriterTime( + System.nanoTime() - createPartitionWriterStartTime); try { try (OutputStream out = writer.openPartitionStream()) { + final long streamCopyStartTime = System.nanoTime(); + logger.info("TIMESTAMP: streamCopyStartTime: " + streamCopyStartTime); Utils.copyStream(in, out, false, false); + writeMetrics.incStreamCopyWriteTime(System.nanoTime() - streamCopyStartTime); } final long fileWriteStartTime = System.nanoTime(); + logger.info("TIMESTAMP: fileWriteStartTime: " + fileWriteStartTime); partitions[i] = writer.commitPartition(); writeMetrics.incNumFilesWritten(); writeMetrics.incFileWriteTime(System.nanoTime() - fileWriteStartTime); + logger.info("TIMESTAMP: sentFileWriterRequestTime: " + System.nanoTime()); copyThrewException = false; } catch (Exception e) { try { @@ -273,7 +296,10 @@ private CommittedPartition[] combineAndWritePartitionsUsingPluggableWriter() thr } } } + final long fileWriteStartTime = System.nanoTime(); + logger.info("TIMESTAMP: indexFileWriteStartTime: " + fileWriteStartTime); mapOutputWriter.commitAllPartitions(); + writeMetrics.incIndexFileWriteTime(System.nanoTime() - fileWriteStartTime); } catch (Exception e) { try { mapOutputWriter.abort(e); @@ -282,6 +308,7 @@ private CommittedPartition[] combineAndWritePartitionsUsingPluggableWriter() thr } throw e; } finally { + logger.info("TIMESTAMP: finishedFileWriteTime: " + System.nanoTime()); writeMetrics.incWriteTime(System.nanoTime() - writeStartTime); } partitionWriters = null; diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index ef086e21b04d1..7550ddf657787 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -23,10 +23,8 @@ import java.util.Arrays; import java.util.Iterator; import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.spark.shuffle.api.CommittedPartition; -import org.apache.spark.storage.ShuffleLocation; import scala.Option; import scala.Product2; import scala.collection.JavaConverters; @@ -339,14 +337,17 @@ private CommittedPartition[] mergeSpills(SpillInfo[] spills, File outputFile) th // that doesn't need to interpret the spilled bytes. if (transferToEnabled && !encryptionEnabled) { logger.debug("Using transferTo-based fast merge"); - committedPartitions = toLocalCommittedPartition(mergeSpillsWithTransferTo(spills, outputFile)); + committedPartitions = toLocalCommittedPartition( + mergeSpillsWithTransferTo(spills, outputFile)); } else { logger.debug("Using fileStream-based fast merge"); - committedPartitions = toLocalCommittedPartition(mergeSpillsWithFileStream(spills, outputFile, null)); + committedPartitions = toLocalCommittedPartition( + mergeSpillsWithFileStream(spills, outputFile, null)); } } else { logger.debug("Using slow merge"); - committedPartitions = toLocalCommittedPartition(mergeSpillsWithFileStream(spills, outputFile, compressionCodec)); + committedPartitions = toLocalCommittedPartition( + mergeSpillsWithFileStream(spills, outputFile, compressionCodec)); } // When closing an UnsafeShuffleExternalSorter that has already spilled once but also has // in-memory records, we write out the in-memory records to a file but do not count that @@ -537,7 +538,7 @@ private CommittedPartition[] mergeSpillsWithPluggableWriter( boolean threwException = true; ShuffleMapOutputWriter mapOutputWriter = pluggableWriteSupport.newMapOutputWriter( - sparkConf.getAppId(), shuffleId, mapId); + sparkConf.getAppId(), shuffleId, mapId, writeMetrics); try { for (int i = 0; i < spills.length; i++) { spillInputStreams[i] = new NioBufferedFileInputStream( @@ -607,7 +608,7 @@ private CommittedPartition[] writeSingleSpillFileUsingPluggableWriter( spillInfo.file, inputBufferSizeInBytes); ShuffleMapOutputWriter mapOutputWriter = pluggableWriteSupport.newMapOutputWriter( - sparkConf.getAppId(), shuffleId, mapId); + sparkConf.getAppId(), shuffleId, mapId, writeMetrics); try { for (int partition = 0; partition < numPartitions; partition++) { ShufflePartitionWriter writer = mapOutputWriter.newPartitionWriter(partition); diff --git a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala index 18b10d23da94c..bfd3f503e193f 100644 --- a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala +++ b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala @@ -61,6 +61,13 @@ private[spark] object InternalAccumulator { val BYTES_WRITTEN = SHUFFLE_WRITE_METRICS_PREFIX + "bytesWritten" val RECORDS_WRITTEN = SHUFFLE_WRITE_METRICS_PREFIX + "recordsWritten" val WRITE_TIME = SHUFFLE_WRITE_METRICS_PREFIX + "writeTime" + val FILE_WRITE_TIME = SHUFFLE_WRITE_METRICS_PREFIX + "fileWriteTime" + val NUM_FILES_WRITTEN = SHUFFLE_WRITE_METRICS_PREFIX + "numFilesWritten" + val INDEX_FILE_WRITE_TIME = SHUFFLE_WRITE_METRICS_PREFIX + "indexFileWriteTime" + val STREAM_COPY_WRITE_TIME = SHUFFLE_WRITE_METRICS_PREFIX + "streamCopyWriteTime" + val CREATE_PARTITION_WRITER_TIME = SHUFFLE_WRITE_METRICS_PREFIX + "createPartitionWriterTime" + val CREATE_MAP_OUTPUT_WRITER_TIME = SHUFFLE_WRITE_METRICS_PREFIX + "createMapOutputWriterTime" + val STREAM_FILE_WRITE_TIME = SHUFFLE_WRITE_METRICS_PREFIX + "streamFileWriteTime" } // Names of output metrics diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala index 35007d8161fa0..fe1d614bb8de6 100644 --- a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala @@ -34,6 +34,11 @@ class ShuffleWriteMetrics private[spark] () extends ShuffleWriteMetricsReporter private[executor] val _writeTime = new LongAccumulator private[executor] val _fileWriteTime = new LongAccumulator private[executor] val _numFilesWritten = new LongAccumulator + private[executor] val _indexFileWriteTime = new LongAccumulator + private[executor] val _streamCopyWriteTime = new LongAccumulator + private[executor] val _createPartitionWriterTime = new LongAccumulator + private[executor] val _createMapOutputWriterTime = new LongAccumulator + private[executor] val _streamFileWriteTime = new LongAccumulator /** * Number of bytes written for the shuffle by this task. @@ -51,24 +56,49 @@ class ShuffleWriteMetrics private[spark] () extends ShuffleWriteMetricsReporter def writeTime: Long = _writeTime.sum /** - * Time it takes to write a single partition block file. - */ + * Time it takes to write the file partition files. + */ def fileWriteTime: Long = _fileWriteTime.sum /** - * Number of files written. - */ + * Number of files written. + */ def numFilesWritten: Long = _numFilesWritten.sum + /** + * Time it takes to write the index file. + */ + def indexFileWriteTime: Long = _indexFileWriteTime.sum + + /** + * Time it takes to stream data. + */ + def streamCopyWriteTime: Long = _streamCopyWriteTime.sum + + def createPartitionWriterTime: Long = _createPartitionWriterTime.sum + + def createMapOutputWriterTime: Long = _createMapOutputWriterTime.sum + + def streamFileWriteTime: Long = _streamFileWriteTime.sum + private[spark] override def incBytesWritten(v: Long): Unit = _bytesWritten.add(v) private[spark] override def incRecordsWritten(v: Long): Unit = _recordsWritten.add(v) private[spark] override def incWriteTime(v: Long): Unit = _writeTime.add(v) private[spark] override def incFileWriteTime(v: Long): Unit = _fileWriteTime.add(v) private[spark] override def incNumFilesWritten(): Unit = _numFilesWritten.add(1) + private[spark] override def incIndexFileWriteTime(v: Long): Unit = _indexFileWriteTime.add(v) + private[spark] override def incStreamCopyWriteTime(v: Long): Unit = _streamCopyWriteTime.add(v) + override private[spark] def incCreatePartitionWriterTime(v: Long): Unit = + _createPartitionWriterTime.add(v) private[spark] override def decBytesWritten(v: Long): Unit = { _bytesWritten.setValue(bytesWritten - v) } private[spark] override def decRecordsWritten(v: Long): Unit = { _recordsWritten.setValue(recordsWritten - v) } + + override private[spark] def incCreateMapOutputWriterTime(v: Long): Unit = + _createMapOutputWriterTime.add(v) + + override private[spark] def incStreamFileWriteTime(v: Long): Unit = _streamFileWriteTime.add(v) } diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 85b2745a2aec4..8a2ae94f3d4a8 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -228,6 +228,13 @@ class TaskMetrics private[spark] () extends Serializable { shuffleWrite.BYTES_WRITTEN -> shuffleWriteMetrics._bytesWritten, shuffleWrite.RECORDS_WRITTEN -> shuffleWriteMetrics._recordsWritten, shuffleWrite.WRITE_TIME -> shuffleWriteMetrics._writeTime, + shuffleWrite.FILE_WRITE_TIME -> shuffleWriteMetrics._fileWriteTime, + shuffleWrite.NUM_FILES_WRITTEN -> shuffleWriteMetrics._numFilesWritten, + shuffleWrite.INDEX_FILE_WRITE_TIME -> shuffleWriteMetrics._indexFileWriteTime, + shuffleWrite.STREAM_COPY_WRITE_TIME -> shuffleWriteMetrics._streamCopyWriteTime, + shuffleWrite.CREATE_PARTITION_WRITER_TIME -> shuffleWriteMetrics._createPartitionWriterTime, + shuffleWrite.CREATE_MAP_OUTPUT_WRITER_TIME -> shuffleWriteMetrics._createMapOutputWriterTime, + shuffleWrite.STREAM_FILE_WRITE_TIME -> shuffleWriteMetrics._streamFileWriteTime, input.BYTES_READ -> inputMetrics._bytesRead, input.RECORDS_READ -> inputMetrics._recordsRead, output.BYTES_WRITTEN -> outputMetrics._bytesWritten, diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala index 70c76d5948153..caeecedc5d36e 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala @@ -20,7 +20,7 @@ package org.apache.spark.shuffle import scala.compat.java8.OptionConverters import org.apache.spark._ -import org.apache.spark.internal.{Logging, config} +import org.apache.spark.internal.{config, Logging} import org.apache.spark.serializer.SerializerManager import org.apache.spark.shuffle.api.ShuffleReadSupport import org.apache.spark.storage._ diff --git a/core/src/main/scala/org/apache/spark/shuffle/metrics.scala b/core/src/main/scala/org/apache/spark/shuffle/metrics.scala index 5f376a75bae00..3d1955adca683 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/metrics.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/metrics.scala @@ -50,5 +50,10 @@ private[spark] trait ShuffleWriteMetricsReporter { private[spark] def decBytesWritten(v: Long): Unit private[spark] def decRecordsWritten(v: Long): Unit private[spark] def incFileWriteTime(v: Long): Unit + private[spark] def incIndexFileWriteTime(v: Long): Unit + private[spark] def incStreamCopyWriteTime(v: Long): Unit private[spark] def incNumFilesWritten(): Unit + private[spark] def incCreatePartitionWriterTime(v: Long): Unit + private[spark] def incCreateMapOutputWriterTime(v: Long): Unit + private[spark] def incStreamFileWriteTime(v: Long): Unit } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 1575b076d3faf..fb1ed02c857a9 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -18,7 +18,7 @@ package org.apache.spark.storage import java.io._ -import java.lang.ref.{WeakReference, ReferenceQueue => JReferenceQueue} +import java.lang.ref.{ReferenceQueue => JReferenceQueue, WeakReference} import java.nio.ByteBuffer import java.nio.channels.Channels import java.util.Collections @@ -31,11 +31,12 @@ import scala.concurrent.duration._ import scala.reflect.ClassTag import scala.util.Random import scala.util.control.NonFatal + import com.codahale.metrics.{MetricRegistry, MetricSet} import org.apache.spark._ import org.apache.spark.executor.DataReadMethod -import org.apache.spark.internal.{Logging, config} +import org.apache.spark.internal.{config, Logging} import org.apache.spark.memory.{MemoryManager, MemoryMode} import org.apache.spark.metrics.source.Source import org.apache.spark.network._ diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 69077c644dc78..23250b1054f37 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -24,13 +24,13 @@ import com.google.common.io.ByteStreams import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import org.apache.spark.{util, _} import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.internal.Logging import org.apache.spark.serializer._ import org.apache.spark.shuffle.api.{CommittedPartition, ShuffleWriteSupport} import org.apache.spark.shuffle.sort.LocalCommittedPartition import org.apache.spark.storage.{BlockId, DiskBlockObjectWriter, PairsWriter, ShuffleLocation, ShufflePartitionObjectWriter} -import org.apache.spark.{util, _} /** * Sorts and potentially merges a number of key-value pairs of type (K, V) to produce key-combiner @@ -731,7 +731,8 @@ private[spark] class ExternalSorter[K, V, C]( // Track location of each range in the output file val committedPartitions = new Array[CommittedPartition](numPartitions) - val mapOutputWriter = writeSupport.newMapOutputWriter(conf.getAppId, shuffleId, mapId) + val mapOutputWriter = writeSupport.newMapOutputWriter(conf.getAppId, + shuffleId, mapId, context.taskMetrics().shuffleWriteMetrics) val writer = new ShufflePartitionObjectWriter( Math.min(serializerBatchSize, Integer.MAX_VALUE).toInt, serInstance, diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index 93ab301a4cb9c..15f73169fec45 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -23,6 +23,7 @@ import java.nio.file.StandardOpenOption; import java.util.*; +import org.apache.spark.shuffle.ShuffleWriteMetricsReporter; import org.apache.spark.shuffle.api.CommittedPartition; import scala.Option; import scala.Product2; @@ -646,7 +647,7 @@ private final class TestShuffleWriteSupport implements ShuffleWriteSupport { @Override public ShuffleMapOutputWriter newMapOutputWriter( - String appId, int shuffleId, int mapId) { + String appId, int shuffleId, int mapId, ShuffleWriteMetricsReporter metrics) { try { if (!mergedOutputFile.exists() && !mergedOutputFile.createNewFile()) { throw new IllegalStateException( diff --git a/core/src/test/scala/org/apache/spark/SplitFilesShuffleIO.scala b/core/src/test/scala/org/apache/spark/SplitFilesShuffleIO.scala index 579fc9a45ba9b..8792461434655 100644 --- a/core/src/test/scala/org/apache/spark/SplitFilesShuffleIO.scala +++ b/core/src/test/scala/org/apache/spark/SplitFilesShuffleIO.scala @@ -20,8 +20,10 @@ package org.apache.spark import java.io._ import java.nio.file.Paths import java.util.Optional + import javax.ws.rs.core.UriBuilder +import org.apache.spark.shuffle.ShuffleWriteMetricsReporter import org.apache.spark.shuffle.api._ import org.apache.spark.storage.ShuffleLocation import org.apache.spark.util.Utils @@ -39,7 +41,8 @@ class SplitFilesShuffleIO(conf: SparkConf) extends ShuffleDataIO { } override def writeSupport(): ShuffleWriteSupport = { - (appId: String, shuffleId: Int, mapId: Int) => new ShuffleMapOutputWriter { + (appId: String, shuffleId: Int, mapId: Int, + metrics: ShuffleWriteMetricsReporter) => new ShuffleMapOutputWriter { override def newPartitionWriter(partitionId: Int): ShufflePartitionWriter = { new ShufflePartitionWriter { override def openPartitionStream(): OutputStream = { diff --git a/examples/src/main/scala/org/apache/spark/examples/GroupByShufflePerfTest.scala b/examples/src/main/scala/org/apache/spark/examples/GroupByShufflePerfTest.scala index 104ef0cd52081..425e2b054380c 100644 --- a/examples/src/main/scala/org/apache/spark/examples/GroupByShufflePerfTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/GroupByShufflePerfTest.scala @@ -18,14 +18,23 @@ // scalastyle:off println package org.apache.spark.examples +import java.util.logging.{Level, LogManager} + +import org.slf4j.{Logger, LoggerFactory} +import scala.util.Random + +import org.apache.spark.network.client.TransportClient import org.apache.spark.sql.SparkSession /** - * Usage: GroupByShufflePerfTest + * Usage: GroupByShufflePerfTest [num records] */ object GroupByShufflePerfTest { def main(args: Array[String]) { + assert(args.length > 1) + var numRecords = args(1).toInt + var parallelization = 1 if (args.length != 0) { @@ -50,7 +59,7 @@ object GroupByShufflePerfTest { // .collect() // // println(wordCountsWithGroup.mkString(",")) - val words = createArray(10000) + val words = createArray(numRecords) val wordPairsRDD2 = spark.sparkContext .parallelize(words, parallelization).map(word => (word, 1)) @@ -73,7 +82,7 @@ object GroupByShufflePerfTest { val array = new Array[String](arraySize) for (i <- 1 to arraySize) { - array(i - 1) = mapIntToWord.getOrElse(i-1, "else") + array(i - 1) = Random.alphanumeric.take(2).mkString } array } diff --git a/examples/src/main/scala/org/apache/spark/examples/ShuffleMetricsOutputSparkListener.scala b/examples/src/main/scala/org/apache/spark/examples/ShuffleMetricsOutputSparkListener.scala index 86c187d5abc21..407020bbd8586 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ShuffleMetricsOutputSparkListener.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ShuffleMetricsOutputSparkListener.scala @@ -47,9 +47,20 @@ class ShuffleMetricsOutputSparkListener extends SparkListener with Logging { logInfo(s"WriteMetrics: " + s"${writeMetrics.writeTime} " + s"${writeMetrics.bytesWritten} " + - s"${writeMetrics.recordsWritten}") -// s"${writeMetrics.fileWriteTime} " + -// s"${writeMetrics.numFilesWritten}") + s"${writeMetrics.recordsWritten} " + + s"${writeMetrics.fileWriteTime} " + + s"${writeMetrics.numFilesWritten}") + + + logInfo(s"WriteMetrics-writeTime: " + writeMetrics.writeTime) + logInfo(s"WriteMetrics-fileWriteTime: " + writeMetrics.fileWriteTime) + logInfo(s"WriteMetrics-numFilesWritten: " + writeMetrics.numFilesWritten) + logInfo(s"WriteMetrics-indexFileWriteTime: " + writeMetrics.indexFileWriteTime) + logInfo(s"WriteMetrics-streamCopyWriteTime: " + writeMetrics.streamCopyWriteTime) + logInfo(s"WriteMetrics-bytesWritten: " + writeMetrics.bytesWritten) + logInfo(s"WriteMetrics-recordsWritten: " + writeMetrics.recordsWritten) + logInfo(s"WriteMetrics-createPartitionWriterTime: " + writeMetrics.createPartitionWriterTime) + logInfo(s"WriteMetrics-streamFileWriteTime: " + writeMetrics.streamFileWriteTime) } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesExternalShuffleService.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesExternalShuffleService.scala index 1eae663fc56ce..f722106faa373 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesExternalShuffleService.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesExternalShuffleService.scala @@ -176,7 +176,7 @@ private[spark] class KubernetesExternalShuffleBlockHandler( } }) getFileWriterStreamCallback( - appId, shuffleId, mapId, "data", FileWriterStreamCallback.FileType.DATA) + appId, shuffleId, mapId, "data", FileWriterStreamCallback.FileType.DATA, callback) } finally { val nanoSeconds = responseDelayContext.stop() logInfo("METRICS: OpenStream processing time: " + nanoSeconds) @@ -193,10 +193,11 @@ private[spark] class KubernetesExternalShuffleBlockHandler( shuffleId: Int, mapId: Int, extension: String, - fileType: FileWriterStreamCallback.FileType): StreamCallbackWithID = { + fileType: FileWriterStreamCallback.FileType, + callback: RpcResponseCallback): StreamCallbackWithID = { val file = getFile(appId, shuffleId, mapId, extension) val streamCallback = - new FileWriterStreamCallback(appId, shuffleId, mapId, file, fileType) + new FileWriterStreamCallback(appId, shuffleId, mapId, file, fileType, callback) streamCallback.open() streamCallback } From 391502a1f1e3f170ea5928d3ee1ad06b278b42c9 Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Tue, 12 Feb 2019 17:12:34 -0800 Subject: [PATCH 3/7] single client --- .../external/ExternalShuffleMapOutputWriter.java | 8 ++------ .../external/ExternalShufflePartitionWriter.java | 11 +++-------- .../shuffle/sort/BypassMergeSortShuffleWriter.java | 1 + .../spark/examples/GroupByShufflePerfTest.scala | 1 + .../examples/ShuffleMetricsOutputSparkListener.scala | 1 + 5 files changed, 8 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleMapOutputWriter.java index a3c3f9351fbb6..67256f3bd8758 100644 --- a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleMapOutputWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleMapOutputWriter.java @@ -22,6 +22,7 @@ public class ExternalShuffleMapOutputWriter implements ShuffleMapOutputWriter { private final int shuffleId; private final int mapId; private final ShuffleWriteMetricsReporter writeMetrics; + private final TransportClient client; public ExternalShuffleMapOutputWriter( TransportClientFactory clientFactory, @@ -39,7 +40,6 @@ public ExternalShuffleMapOutputWriter( this.mapId = mapId; this.writeMetrics = writeMetrics; - TransportClient client = null; try { client = clientFactory.createUnmanagedClient(hostName, port); ByteBuffer registerShuffleIndex = new RegisterShuffleIndex( @@ -50,7 +50,6 @@ public ExternalShuffleMapOutputWriter( logger.info("clientid: " + client.getClientId() + " " + client.isActive()); client.sendRpcSync(registerShuffleIndex, 60000); } catch (Exception e) { - client.close(); logger.error("Encountered error while creating transport client", e); throw new RuntimeException(e); } @@ -62,7 +61,7 @@ public ExternalShuffleMapOutputWriter( @Override public ShufflePartitionWriter newPartitionWriter(int partitionId) { try { - return new ExternalShufflePartitionWriter(clientFactory, + return new ExternalShufflePartitionWriter(client, hostName, port, appId, shuffleId, mapId, partitionId, writeMetrics); } catch (Exception e) { clientFactory.close(); @@ -73,14 +72,11 @@ public ShufflePartitionWriter newPartitionWriter(int partitionId) { @Override public void commitAllPartitions() { - TransportClient client = null; try { - client = clientFactory.createUnmanagedClient(hostName, port); ByteBuffer uploadShuffleIndex = new UploadShuffleIndex( appId, shuffleId, mapId).toByteBuffer(); String requestID = String.format( "index-upload-%s-%d-%d", appId, shuffleId, mapId); - client.setClientId(requestID); logger.info("clientid: " + client.getClientId() + " " + client.isActive()); final long startTime = System.nanoTime(); client.sendRpcSync(uploadShuffleIndex, 60000); diff --git a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShufflePartitionWriter.java b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShufflePartitionWriter.java index c83037c54a2a6..7eae80ba41a94 100644 --- a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShufflePartitionWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShufflePartitionWriter.java @@ -4,7 +4,6 @@ import org.apache.spark.network.buffer.NioManagedBuffer; import org.apache.spark.network.client.RpcResponseCallback; import org.apache.spark.network.client.TransportClient; -import org.apache.spark.network.client.TransportClientFactory; import org.apache.spark.network.shuffle.protocol.UploadShufflePartitionStream; import org.apache.spark.shuffle.ShuffleWriteMetricsReporter; import org.apache.spark.shuffle.api.CommittedPartition; @@ -21,7 +20,7 @@ public class ExternalShufflePartitionWriter implements ShufflePartitionWriter { private static final Logger logger = LoggerFactory.getLogger(ExternalShufflePartitionWriter.class); - private final TransportClientFactory clientFactory; + private final TransportClient client; private final String hostName; private final int port; private final String appId; @@ -34,7 +33,7 @@ public class ExternalShufflePartitionWriter implements ShufflePartitionWriter { private final ByteArrayOutputStream partitionBuffer = new ByteArrayOutputStream(); public ExternalShufflePartitionWriter( - TransportClientFactory clientFactory, + TransportClient client, String hostName, int port, String appId, @@ -42,7 +41,7 @@ public ExternalShufflePartitionWriter( int mapId, int partitionId, ShuffleWriteMetricsReporter writeMetrics) { - this.clientFactory = clientFactory; + this.client = client; this.hostName = hostName; this.port = port; this.appId = appId; @@ -57,16 +56,12 @@ public ExternalShufflePartitionWriter( @Override public CommittedPartition commitPartition() { - TransportClient client = null; try { byte[] buf = partitionBuffer.toByteArray(); int size = buf.length; ByteBuffer streamHeader = new UploadShufflePartitionStream(appId, shuffleId, mapId, partitionId, size).toByteBuffer(); ManagedBuffer managedBuffer = new NioManagedBuffer(ByteBuffer.wrap(buf)); - client = clientFactory.createUnmanagedClient(hostName, port); - client.setClientId(String.format("data-%s-%d-%d-%d", - appId, shuffleId, mapId, partitionId)); logger.info("clientid: " + client.getClientId() + " " + client.isActive()); logger.info("THE BUFFER HASH CODE IS: " + Arrays.hashCode(buf)); diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index 99af53b1c0850..562fc4669eec1 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -116,6 +116,7 @@ final class BypassMergeSortShuffleWriter extends ShuffleWriter { this.shuffleId = dep.shuffleId(); this.partitioner = dep.partitioner(); this.numPartitions = partitioner.numPartitions(); + logger.info("number of partitions: " + numPartitions); this.writeMetrics = writeMetrics; this.serializer = dep.serializer(); this.shuffleBlockResolver = shuffleBlockResolver; diff --git a/examples/src/main/scala/org/apache/spark/examples/GroupByShufflePerfTest.scala b/examples/src/main/scala/org/apache/spark/examples/GroupByShufflePerfTest.scala index 425e2b054380c..3fae4f21e944d 100644 --- a/examples/src/main/scala/org/apache/spark/examples/GroupByShufflePerfTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/GroupByShufflePerfTest.scala @@ -83,6 +83,7 @@ object GroupByShufflePerfTest { val array = new Array[String](arraySize) for (i <- 1 to arraySize) { array(i - 1) = Random.alphanumeric.take(2).mkString +// array(i - 1) = mapIntToWord.getOrElse(i-1, "else:q!") } array } diff --git a/examples/src/main/scala/org/apache/spark/examples/ShuffleMetricsOutputSparkListener.scala b/examples/src/main/scala/org/apache/spark/examples/ShuffleMetricsOutputSparkListener.scala index 407020bbd8586..742ef13360e2f 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ShuffleMetricsOutputSparkListener.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ShuffleMetricsOutputSparkListener.scala @@ -61,6 +61,7 @@ class ShuffleMetricsOutputSparkListener extends SparkListener with Logging { logInfo(s"WriteMetrics-recordsWritten: " + writeMetrics.recordsWritten) logInfo(s"WriteMetrics-createPartitionWriterTime: " + writeMetrics.createPartitionWriterTime) logInfo(s"WriteMetrics-streamFileWriteTime: " + writeMetrics.streamFileWriteTime) + logInfo(s"WriteMetrics-createMapOutputWriterTime: " + writeMetrics.createMapOutputWriterTime) } } From ef1d3967212ed3fea5a23833178f2c34dbb2d714 Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Wed, 13 Feb 2019 11:30:22 -0800 Subject: [PATCH 4/7] add more metrics and cleanup metrics --- .../shuffle/FileWriterStreamCallback.java | 8 ++++- .../ExternalShuffleMapOutputWriter.java | 2 ++ .../ExternalShufflePartitionWriter.java | 8 ++++- .../external/ExternalShuffleWriteSupport.java | 2 ++ .../sort/BypassMergeSortShuffleWriter.java | 32 ++++++++----------- .../apache/spark/InternalAccumulator.scala | 3 ++ .../spark/executor/ShuffleWriteMetrics.scala | 16 ++++++++++ .../apache/spark/executor/TaskMetrics.scala | 3 ++ .../org/apache/spark/shuffle/metrics.scala | 3 ++ .../ShuffleMetricsOutputSparkListener.scala | 4 ++- .../KubernetesExternalShuffleService.scala | 5 +++ 11 files changed, 65 insertions(+), 21 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/FileWriterStreamCallback.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/FileWriterStreamCallback.java index 2e8f0719fdb77..81f4d951822dd 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/FileWriterStreamCallback.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/FileWriterStreamCallback.java @@ -126,12 +126,18 @@ public void onData(String streamId, ByteBuffer buf) throws IOException { @Override public void onComplete(String streamId) throws IOException { if (startTime != 0) { - logger.info("METRICS: Writing file " + file.getName() + " took " + (System.nanoTime() - startTime)); + logger.info("METRICS: Streaming file " + file.getName() + " took " + (System.nanoTime() - startTime)); } logger.info( "Finished writing {}. File type: {}", file.getAbsolutePath(), fileType); fileOutputChannel.close(); + if (startTime != 0) { + logger.info("METRICS: Writing file " + file.getName() + " took " + (System.nanoTime() - startTime)); + } callback.onSuccess(ByteBuffer.allocate(0)); + if (startTime != 0) { + logger.info("METRICS: Sending write file callback for " + file.getName() + " took " + (System.nanoTime() - startTime)); + } } @Override diff --git a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleMapOutputWriter.java index 67256f3bd8758..d252042139ed5 100644 --- a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleMapOutputWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleMapOutputWriter.java @@ -41,6 +41,7 @@ public ExternalShuffleMapOutputWriter( this.writeMetrics = writeMetrics; try { + final long startClientCreationTime = System.nanoTime(); client = clientFactory.createUnmanagedClient(hostName, port); ByteBuffer registerShuffleIndex = new RegisterShuffleIndex( appId, shuffleId, mapId).toByteBuffer(); @@ -48,6 +49,7 @@ public ExternalShuffleMapOutputWriter( "index-register-%s-%d-%d", appId, shuffleId, mapId); client.setClientId(requestID); logger.info("clientid: " + client.getClientId() + " " + client.isActive()); + writeMetrics.incCreateClientTime(System.nanoTime() - startClientCreationTime); client.sendRpcSync(registerShuffleIndex, 60000); } catch (Exception e) { logger.error("Encountered error while creating transport client", e); diff --git a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShufflePartitionWriter.java b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShufflePartitionWriter.java index 7eae80ba41a94..f194de1356cb8 100644 --- a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShufflePartitionWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShufflePartitionWriter.java @@ -1,5 +1,6 @@ package org.apache.spark.shuffle.external; +import com.google.common.util.concurrent.SettableFuture; import org.apache.spark.network.buffer.ManagedBuffer; import org.apache.spark.network.buffer.NioManagedBuffer; import org.apache.spark.network.client.RpcResponseCallback; @@ -11,7 +12,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.*; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.Arrays; @@ -65,6 +68,7 @@ public CommittedPartition commitPartition() { logger.info("clientid: " + client.getClientId() + " " + client.isActive()); logger.info("THE BUFFER HASH CODE IS: " + Arrays.hashCode(buf)); + SettableFuture future = SettableFuture.create(); final long startTime = System.nanoTime(); RpcResponseCallback callback = new RpcResponseCallback() { @Override @@ -72,6 +76,7 @@ public void onSuccess(ByteBuffer response) { logger.info("Successfully uploaded partition: " + System.nanoTime()); logger.info("StreamFileWriteTime: " + (System.nanoTime() - startTime)); writeMetrics.incStreamFileWriteTime(System.nanoTime() - startTime); + future.set(null); } @Override @@ -85,6 +90,7 @@ public void onFailure(Throwable e) { totalLength += size; logger.info("Partition Length: " + totalLength); logger.info("Size: " + size); + future.get(); } catch (Exception e) { if (client != null) { client.close(); diff --git a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleWriteSupport.java b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleWriteSupport.java index 37725b3dec172..449acc6eb77c9 100644 --- a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleWriteSupport.java +++ b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleWriteSupport.java @@ -44,12 +44,14 @@ public ExternalShuffleWriteSupport( @Override public ShuffleMapOutputWriter newMapOutputWriter(String appId, int shuffleId, int mapId, ShuffleWriteMetricsReporter writeMetrics) { + final long startClientFactoryCreationTime = System.nanoTime(); List bootstraps = Lists.newArrayList(); if (authEnabled) { bootstraps.add(new AuthClientBootstrap(conf, appId, secretKeyHolder)); } TransportClientFactory clientFactory = context.createClientFactory(bootstraps); logger.info("Clientfactory: " + clientFactory.toString()); + writeMetrics.incCreateClientFactoryTime(System.nanoTime() - startClientFactoryCreationTime); return new ExternalShuffleMapOutputWriter( clientFactory, hostname, port, appId, shuffleId, mapId, writeMetrics); } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index 562fc4669eec1..4fa6cc8d0613f 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -252,36 +252,34 @@ private CommittedPartition[] combineAndWritePartitionsUsingPluggableWriter() thr assert(pluggableWriteSupport != null); final long writeStartTime = System.nanoTime(); - final long mapOutputWriterCreateStartTime = System.nanoTime(); - logger.info("TIMESTAMP: writeStartTime: " + writeStartTime); ShuffleMapOutputWriter mapOutputWriter = pluggableWriteSupport.newMapOutputWriter( appId, shuffleId, mapId, writeMetrics); - writeMetrics.incCreateMapOutputWriterTime(System.nanoTime() - mapOutputWriterCreateStartTime); + final long endMapOutputWriterTime = System.nanoTime(); + writeMetrics.incCreateMapOutputWriterTime(endMapOutputWriterTime - writeStartTime); + long endFileWriteTime = 0; try { for (int i = 0; i < numPartitions; i++) { final File file = partitionWriterSegments[i].file(); if (file.exists()) { final FileInputStream in = new FileInputStream(file); + final long endFileInputStreamTime = System.nanoTime(); + writeMetrics.incFileInputStreamTime(endFileInputStreamTime - endMapOutputWriterTime); boolean copyThrewException = true; - final long createPartitionWriterStartTime = System.nanoTime(); - logger.info("TIMESTAMP: createPartitionWriterStartTime: " - + createPartitionWriterStartTime); ShufflePartitionWriter writer = mapOutputWriter.newPartitionWriter(i); + final long endNewPartitionWriterTime = System.nanoTime(); writeMetrics.incCreatePartitionWriterTime( - System.nanoTime() - createPartitionWriterStartTime); + endNewPartitionWriterTime - endFileInputStreamTime); try { + long endStreamCopyTime; try (OutputStream out = writer.openPartitionStream()) { - final long streamCopyStartTime = System.nanoTime(); - logger.info("TIMESTAMP: streamCopyStartTime: " + streamCopyStartTime); Utils.copyStream(in, out, false, false); - writeMetrics.incStreamCopyWriteTime(System.nanoTime() - streamCopyStartTime); + endStreamCopyTime = System.nanoTime(); + writeMetrics.incStreamCopyWriteTime(endStreamCopyTime - endNewPartitionWriterTime); } - final long fileWriteStartTime = System.nanoTime(); - logger.info("TIMESTAMP: fileWriteStartTime: " + fileWriteStartTime); partitions[i] = writer.commitPartition(); writeMetrics.incNumFilesWritten(); - writeMetrics.incFileWriteTime(System.nanoTime() - fileWriteStartTime); - logger.info("TIMESTAMP: sentFileWriterRequestTime: " + System.nanoTime()); + endFileWriteTime = System.nanoTime(); + writeMetrics.incFileWriteTime(endFileWriteTime - endStreamCopyTime); copyThrewException = false; } catch (Exception e) { try { @@ -297,10 +295,9 @@ private CommittedPartition[] combineAndWritePartitionsUsingPluggableWriter() thr } } } - final long fileWriteStartTime = System.nanoTime(); - logger.info("TIMESTAMP: indexFileWriteStartTime: " + fileWriteStartTime); mapOutputWriter.commitAllPartitions(); - writeMetrics.incIndexFileWriteTime(System.nanoTime() - fileWriteStartTime); + final long endIndexFileWriteTime = System.nanoTime(); + writeMetrics.incIndexFileWriteTime(endIndexFileWriteTime - endFileWriteTime); } catch (Exception e) { try { mapOutputWriter.abort(e); @@ -309,7 +306,6 @@ private CommittedPartition[] combineAndWritePartitionsUsingPluggableWriter() thr } throw e; } finally { - logger.info("TIMESTAMP: finishedFileWriteTime: " + System.nanoTime()); writeMetrics.incWriteTime(System.nanoTime() - writeStartTime); } partitionWriters = null; diff --git a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala index bfd3f503e193f..fd25650c805e3 100644 --- a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala +++ b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala @@ -68,6 +68,9 @@ private[spark] object InternalAccumulator { val CREATE_PARTITION_WRITER_TIME = SHUFFLE_WRITE_METRICS_PREFIX + "createPartitionWriterTime" val CREATE_MAP_OUTPUT_WRITER_TIME = SHUFFLE_WRITE_METRICS_PREFIX + "createMapOutputWriterTime" val STREAM_FILE_WRITE_TIME = SHUFFLE_WRITE_METRICS_PREFIX + "streamFileWriteTime" + val FILE_INPUT_STREAM_TIME = SHUFFLE_WRITE_METRICS_PREFIX + "fileInputStreamTime" + val CREATE_CLIENT_FACTORY_TIME = SHUFFLE_WRITE_METRICS_PREFIX + "createClientFactoryTime" + val CREATE_CLIENT_TIME = SHUFFLE_WRITE_METRICS_PREFIX + "createClientTime" } // Names of output metrics diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala index fe1d614bb8de6..537a08fa9a777 100644 --- a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala @@ -39,6 +39,9 @@ class ShuffleWriteMetrics private[spark] () extends ShuffleWriteMetricsReporter private[executor] val _createPartitionWriterTime = new LongAccumulator private[executor] val _createMapOutputWriterTime = new LongAccumulator private[executor] val _streamFileWriteTime = new LongAccumulator + private[executor] val _fileInputStreamTime = new LongAccumulator + private[executor] val _createClientFactoryTime = new LongAccumulator + private[executor] val _createClientTime = new LongAccumulator /** * Number of bytes written for the shuffle by this task. @@ -81,6 +84,12 @@ class ShuffleWriteMetrics private[spark] () extends ShuffleWriteMetricsReporter def streamFileWriteTime: Long = _streamFileWriteTime.sum + def fileInputStreamTime: Long = _fileInputStreamTime.sum + + def createClientFactoryTime: Long = _createClientFactoryTime.sum + + def createClientTime: Long = _createClientTime.sum + private[spark] override def incBytesWritten(v: Long): Unit = _bytesWritten.add(v) private[spark] override def incRecordsWritten(v: Long): Unit = _recordsWritten.add(v) private[spark] override def incWriteTime(v: Long): Unit = _writeTime.add(v) @@ -101,4 +110,11 @@ class ShuffleWriteMetrics private[spark] () extends ShuffleWriteMetricsReporter _createMapOutputWriterTime.add(v) override private[spark] def incStreamFileWriteTime(v: Long): Unit = _streamFileWriteTime.add(v) + + override private[spark] def incFileInputStreamTime(v: Long): Unit = _fileInputStreamTime.add(v) + + override private[spark] def incCreateClientFactoryTime(v: Long): Unit = + _createClientFactoryTime.add(v) + + override private[spark] def incCreateClientTime(v: Long): Unit = _createClientTime.add(v) } diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 8a2ae94f3d4a8..d213dee18ac1d 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -235,6 +235,9 @@ class TaskMetrics private[spark] () extends Serializable { shuffleWrite.CREATE_PARTITION_WRITER_TIME -> shuffleWriteMetrics._createPartitionWriterTime, shuffleWrite.CREATE_MAP_OUTPUT_WRITER_TIME -> shuffleWriteMetrics._createMapOutputWriterTime, shuffleWrite.STREAM_FILE_WRITE_TIME -> shuffleWriteMetrics._streamFileWriteTime, + shuffleWrite.FILE_INPUT_STREAM_TIME -> shuffleWriteMetrics._fileInputStreamTime, + shuffleWrite.CREATE_CLIENT_FACTORY_TIME -> shuffleWriteMetrics._createClientFactoryTime, + shuffleWrite.CREATE_CLIENT_TIME -> shuffleWriteMetrics._createClientTime, input.BYTES_READ -> inputMetrics._bytesRead, input.RECORDS_READ -> inputMetrics._recordsRead, output.BYTES_WRITTEN -> outputMetrics._bytesWritten, diff --git a/core/src/main/scala/org/apache/spark/shuffle/metrics.scala b/core/src/main/scala/org/apache/spark/shuffle/metrics.scala index 3d1955adca683..def6c850c0c6b 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/metrics.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/metrics.scala @@ -56,4 +56,7 @@ private[spark] trait ShuffleWriteMetricsReporter { private[spark] def incCreatePartitionWriterTime(v: Long): Unit private[spark] def incCreateMapOutputWriterTime(v: Long): Unit private[spark] def incStreamFileWriteTime(v: Long): Unit + private[spark] def incFileInputStreamTime(v: Long): Unit + private[spark] def incCreateClientFactoryTime(v: Long): Unit + private[spark] def incCreateClientTime(v: Long): Unit } diff --git a/examples/src/main/scala/org/apache/spark/examples/ShuffleMetricsOutputSparkListener.scala b/examples/src/main/scala/org/apache/spark/examples/ShuffleMetricsOutputSparkListener.scala index 742ef13360e2f..380a424adb9cb 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ShuffleMetricsOutputSparkListener.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ShuffleMetricsOutputSparkListener.scala @@ -51,7 +51,6 @@ class ShuffleMetricsOutputSparkListener extends SparkListener with Logging { s"${writeMetrics.fileWriteTime} " + s"${writeMetrics.numFilesWritten}") - logInfo(s"WriteMetrics-writeTime: " + writeMetrics.writeTime) logInfo(s"WriteMetrics-fileWriteTime: " + writeMetrics.fileWriteTime) logInfo(s"WriteMetrics-numFilesWritten: " + writeMetrics.numFilesWritten) @@ -62,6 +61,9 @@ class ShuffleMetricsOutputSparkListener extends SparkListener with Logging { logInfo(s"WriteMetrics-createPartitionWriterTime: " + writeMetrics.createPartitionWriterTime) logInfo(s"WriteMetrics-streamFileWriteTime: " + writeMetrics.streamFileWriteTime) logInfo(s"WriteMetrics-createMapOutputWriterTime: " + writeMetrics.createMapOutputWriterTime) + logInfo(s"WriteMetrics-fileInputStreamTime: " + writeMetrics.fileInputStreamTime) + logInfo(s"WriteMetrics-createClientFactoryTime: " + writeMetrics.createClientFactoryTime) + logInfo(s"WriteMetrics-createClientTime: " + writeMetrics.createClientTime) } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesExternalShuffleService.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesExternalShuffleService.scala index f722106faa373..a51b394f7dcbc 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesExternalShuffleService.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesExternalShuffleService.scala @@ -112,10 +112,15 @@ private[spark] class KubernetesExternalShuffleBlockHandler( } case RegisterIndexParam(appId, shuffleId, mapId) => + val startRegisterIndexTime = System.nanoTime() logInfo(s"Received register index param from app $appId") globalPartitionLengths.putIfAbsent( (appId, shuffleId, mapId), TreeMap.empty[Int, Long]) + logInfo(s"METRICS: RegisterIndexParam processing time:" + + s" ${System.nanoTime() - startRegisterIndexTime}") callback.onSuccess(ByteBuffer.allocate(0)) + logInfo(s"METRICS: RegisterIndexParam processing callback time:" + + s" ${System.nanoTime() - startRegisterIndexTime}") case UploadIndexParam(appId, shuffleId, mapId) => val responseDelayContext = metricSet.writeIndexRequestLatencyMillis.time() From e7cffccc101a309654094ac9422635477361ad96 Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Wed, 13 Feb 2019 11:47:26 -0800 Subject: [PATCH 5/7] fix regular shuffle metrics to be additive (mostly) --- .../shuffle/external/ExternalShufflePartitionWriter.java | 2 -- .../spark/shuffle/sort/BypassMergeSortShuffleWriter.java | 9 +++++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShufflePartitionWriter.java b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShufflePartitionWriter.java index f194de1356cb8..f6d992def1434 100644 --- a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShufflePartitionWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShufflePartitionWriter.java @@ -65,8 +65,6 @@ public CommittedPartition commitPartition() { ByteBuffer streamHeader = new UploadShufflePartitionStream(appId, shuffleId, mapId, partitionId, size).toByteBuffer(); ManagedBuffer managedBuffer = new NioManagedBuffer(ByteBuffer.wrap(buf)); - logger.info("clientid: " + client.getClientId() + " " + client.isActive()); - logger.info("THE BUFFER HASH CODE IS: " + Arrays.hashCode(buf)); SettableFuture future = SettableFuture.create(); final long startTime = System.nanoTime(); diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index 4fa6cc8d0613f..aacddb2209790 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -171,7 +171,7 @@ public void write(Iterator> records) throws IOException { try { committedPartitions = combineAndWritePartitions(tmp); logger.info("Successfully wrote partitions without shuffle"); - final long fileWriteStartTime = System.nanoTime(); + final long fileWriteStartTime = System.nanoTime(); shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, Arrays.stream(committedPartitions).mapToLong(p -> p.length()).toArray(), @@ -204,9 +204,9 @@ private CommittedPartition[] combineAndWritePartitions(File outputFile) throws I // We were passed an empty iterator return partitions; } + final long writeStartTime = System.nanoTime(); assert(outputFile != null); final FileOutputStream out = new FileOutputStream(outputFile, true); - final long writeStartTime = System.nanoTime(); boolean threwException = true; try { for (int i = 0; i < numPartitions; i++) { @@ -214,8 +214,9 @@ private CommittedPartition[] combineAndWritePartitions(File outputFile) throws I if (file.exists()) { final FileInputStream in = new FileInputStream(file); boolean copyThrewException = true; + final long endFileInputStreamTime = System.nanoTime(); + writeMetrics.incFileInputStreamTime(endFileInputStreamTime - writeStartTime); try { - final long streamCopyStartTime = System.nanoTime(); partitions[i] = new LocalCommittedPartition( Utils.copyStream(in, out, false, transferToEnabled)); @@ -224,10 +225,10 @@ private CommittedPartition[] combineAndWritePartitions(File outputFile) throws I } else { logger.info("TransferTo is not enabled"); } - writeMetrics.incStreamCopyWriteTime(System.nanoTime() - streamCopyStartTime); copyThrewException = false; } finally { Closeables.close(in, copyThrewException); + writeMetrics.incStreamCopyWriteTime(System.nanoTime() - endFileInputStreamTime); } if (!file.delete()) { logger.error("Unable to delete file for partition {}", i); From c51a7b5f2185e58b140808a6d0b5a0f598d22567 Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Thu, 14 Feb 2019 15:17:42 -0800 Subject: [PATCH 6/7] fix up metrics - final counts --- .../client/TransportClientFactory.java | 1 + .../ExternalShuffleMapOutputWriter.java | 4 ++- .../sort/BypassMergeSortShuffleWriter.java | 28 +++++++++---------- .../apache/spark/InternalAccumulator.scala | 1 + .../spark/executor/ShuffleWriteMetrics.scala | 6 ++++ .../apache/spark/executor/TaskMetrics.scala | 1 + .../org/apache/spark/shuffle/metrics.scala | 1 + .../ShuffleMetricsOutputSparkListener.scala | 2 ++ 8 files changed, 28 insertions(+), 16 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java index 16d242dbb2c47..8178cb0c052c7 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java @@ -229,6 +229,7 @@ private TransportClient createClient(InetSocketAddress address) bootstrap.handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) { + logger.info("Initializing channel"); TransportChannelHandler clientHandler = context.initializePipeline(ch); clientRef.set(clientHandler.getClient()); channelRef.set(ch); diff --git a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleMapOutputWriter.java index d252042139ed5..d236cb6ebe9e6 100644 --- a/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleMapOutputWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/external/ExternalShuffleMapOutputWriter.java @@ -49,8 +49,10 @@ public ExternalShuffleMapOutputWriter( "index-register-%s-%d-%d", appId, shuffleId, mapId); client.setClientId(requestID); logger.info("clientid: " + client.getClientId() + " " + client.isActive()); - writeMetrics.incCreateClientTime(System.nanoTime() - startClientCreationTime); + final long endClientCreationTime = System.nanoTime(); + writeMetrics.incCreateClientTime(endClientCreationTime - startClientCreationTime); client.sendRpcSync(registerShuffleIndex, 60000); + writeMetrics.incSendRegisterShuffleRequestTime(System.nanoTime() - endClientCreationTime); } catch (Exception e) { logger.error("Encountered error while creating transport client", e); throw new RuntimeException(e); diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index aacddb2209790..ffafaf8f8f97a 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -212,10 +212,9 @@ private CommittedPartition[] combineAndWritePartitions(File outputFile) throws I for (int i = 0; i < numPartitions; i++) { final File file = partitionWriterSegments[i].file(); if (file.exists()) { + final long streamCopyStartTime = System.nanoTime(); final FileInputStream in = new FileInputStream(file); boolean copyThrewException = true; - final long endFileInputStreamTime = System.nanoTime(); - writeMetrics.incFileInputStreamTime(endFileInputStreamTime - writeStartTime); try { partitions[i] = new LocalCommittedPartition( @@ -228,7 +227,7 @@ private CommittedPartition[] combineAndWritePartitions(File outputFile) throws I copyThrewException = false; } finally { Closeables.close(in, copyThrewException); - writeMetrics.incStreamCopyWriteTime(System.nanoTime() - endFileInputStreamTime); + writeMetrics.incStreamCopyWriteTime(System.nanoTime() - streamCopyStartTime); } if (!file.delete()) { logger.error("Unable to delete file for partition {}", i); @@ -257,30 +256,27 @@ private CommittedPartition[] combineAndWritePartitionsUsingPluggableWriter() thr appId, shuffleId, mapId, writeMetrics); final long endMapOutputWriterTime = System.nanoTime(); writeMetrics.incCreateMapOutputWriterTime(endMapOutputWriterTime - writeStartTime); - long endFileWriteTime = 0; try { for (int i = 0; i < numPartitions; i++) { final File file = partitionWriterSegments[i].file(); if (file.exists()) { final FileInputStream in = new FileInputStream(file); - final long endFileInputStreamTime = System.nanoTime(); - writeMetrics.incFileInputStreamTime(endFileInputStreamTime - endMapOutputWriterTime); boolean copyThrewException = true; ShufflePartitionWriter writer = mapOutputWriter.newPartitionWriter(i); - final long endNewPartitionWriterTime = System.nanoTime(); - writeMetrics.incCreatePartitionWriterTime( - endNewPartitionWriterTime - endFileInputStreamTime); try { - long endStreamCopyTime; + final long startCopyStreamTime = System.nanoTime(); try (OutputStream out = writer.openPartitionStream()) { Utils.copyStream(in, out, false, false); - endStreamCopyTime = System.nanoTime(); - writeMetrics.incStreamCopyWriteTime(endStreamCopyTime - endNewPartitionWriterTime); } + final long endStreamCopyTime = System.nanoTime(); + writeMetrics.incStreamCopyWriteTime(endStreamCopyTime - startCopyStreamTime); + + final long commitPartitionStartTime = System.nanoTime(); partitions[i] = writer.commitPartition(); writeMetrics.incNumFilesWritten(); - endFileWriteTime = System.nanoTime(); - writeMetrics.incFileWriteTime(endFileWriteTime - endStreamCopyTime); + final long endFileWriteTime = System.nanoTime(); + writeMetrics.incFileWriteTime(endFileWriteTime - commitPartitionStartTime); + copyThrewException = false; } catch (Exception e) { try { @@ -296,9 +292,11 @@ private CommittedPartition[] combineAndWritePartitionsUsingPluggableWriter() thr } } } + + final long startIndexFileWriteTime = System.nanoTime(); mapOutputWriter.commitAllPartitions(); final long endIndexFileWriteTime = System.nanoTime(); - writeMetrics.incIndexFileWriteTime(endIndexFileWriteTime - endFileWriteTime); + writeMetrics.incIndexFileWriteTime(endIndexFileWriteTime - startIndexFileWriteTime); } catch (Exception e) { try { mapOutputWriter.abort(e); diff --git a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala index fd25650c805e3..adf45d12f5c79 100644 --- a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala +++ b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala @@ -71,6 +71,7 @@ private[spark] object InternalAccumulator { val FILE_INPUT_STREAM_TIME = SHUFFLE_WRITE_METRICS_PREFIX + "fileInputStreamTime" val CREATE_CLIENT_FACTORY_TIME = SHUFFLE_WRITE_METRICS_PREFIX + "createClientFactoryTime" val CREATE_CLIENT_TIME = SHUFFLE_WRITE_METRICS_PREFIX + "createClientTime" + val SEND_REGISTER_SHUFFLE_REQUEST_TIME = SHUFFLE_WRITE_METRICS_PREFIX + "sendRegisterShuffleRequestTime" } // Names of output metrics diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala index 537a08fa9a777..2eb0c100fa5d6 100644 --- a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala @@ -42,6 +42,7 @@ class ShuffleWriteMetrics private[spark] () extends ShuffleWriteMetricsReporter private[executor] val _fileInputStreamTime = new LongAccumulator private[executor] val _createClientFactoryTime = new LongAccumulator private[executor] val _createClientTime = new LongAccumulator + private[executor] val _sendRegisterShuffleRequestTime = new LongAccumulator /** * Number of bytes written for the shuffle by this task. @@ -90,6 +91,8 @@ class ShuffleWriteMetrics private[spark] () extends ShuffleWriteMetricsReporter def createClientTime: Long = _createClientTime.sum + def sendRegisterShuffleRequestTime: Long = _sendRegisterShuffleRequestTime.sum + private[spark] override def incBytesWritten(v: Long): Unit = _bytesWritten.add(v) private[spark] override def incRecordsWritten(v: Long): Unit = _recordsWritten.add(v) private[spark] override def incWriteTime(v: Long): Unit = _writeTime.add(v) @@ -117,4 +120,7 @@ class ShuffleWriteMetrics private[spark] () extends ShuffleWriteMetricsReporter _createClientFactoryTime.add(v) override private[spark] def incCreateClientTime(v: Long): Unit = _createClientTime.add(v) + + override private[spark] def incSendRegisterShuffleRequestTime(v: Long): Unit = + _sendRegisterShuffleRequestTime.add(v) } diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index d213dee18ac1d..c96f72db4cbef 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -238,6 +238,7 @@ class TaskMetrics private[spark] () extends Serializable { shuffleWrite.FILE_INPUT_STREAM_TIME -> shuffleWriteMetrics._fileInputStreamTime, shuffleWrite.CREATE_CLIENT_FACTORY_TIME -> shuffleWriteMetrics._createClientFactoryTime, shuffleWrite.CREATE_CLIENT_TIME -> shuffleWriteMetrics._createClientTime, + shuffleWrite.SEND_REGISTER_SHUFFLE_REQUEST_TIME -> shuffleWriteMetrics._sendRegisterShuffleRequestTime, input.BYTES_READ -> inputMetrics._bytesRead, input.RECORDS_READ -> inputMetrics._recordsRead, output.BYTES_WRITTEN -> outputMetrics._bytesWritten, diff --git a/core/src/main/scala/org/apache/spark/shuffle/metrics.scala b/core/src/main/scala/org/apache/spark/shuffle/metrics.scala index def6c850c0c6b..d9bc59cb1f608 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/metrics.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/metrics.scala @@ -59,4 +59,5 @@ private[spark] trait ShuffleWriteMetricsReporter { private[spark] def incFileInputStreamTime(v: Long): Unit private[spark] def incCreateClientFactoryTime(v: Long): Unit private[spark] def incCreateClientTime(v: Long): Unit + private[spark] def incSendRegisterShuffleRequestTime(v: Long): Unit } diff --git a/examples/src/main/scala/org/apache/spark/examples/ShuffleMetricsOutputSparkListener.scala b/examples/src/main/scala/org/apache/spark/examples/ShuffleMetricsOutputSparkListener.scala index 380a424adb9cb..faa34bf112f72 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ShuffleMetricsOutputSparkListener.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ShuffleMetricsOutputSparkListener.scala @@ -64,6 +64,8 @@ class ShuffleMetricsOutputSparkListener extends SparkListener with Logging { logInfo(s"WriteMetrics-fileInputStreamTime: " + writeMetrics.fileInputStreamTime) logInfo(s"WriteMetrics-createClientFactoryTime: " + writeMetrics.createClientFactoryTime) logInfo(s"WriteMetrics-createClientTime: " + writeMetrics.createClientTime) + logInfo(s"WriteMetrics-sendRegisterShuffleRequestTime: " + + writeMetrics.sendRegisterShuffleRequestTime) } } From 0bd2c12eb71202464d64d568b691cf77e9f47d8b Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Thu, 14 Feb 2019 19:06:24 -0800 Subject: [PATCH 7/7] cleanup --- .../spark/examples/GroupByShufflePerfTest.scala | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/GroupByShufflePerfTest.scala b/examples/src/main/scala/org/apache/spark/examples/GroupByShufflePerfTest.scala index 3fae4f21e944d..b8d2cc0b57a16 100644 --- a/examples/src/main/scala/org/apache/spark/examples/GroupByShufflePerfTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/GroupByShufflePerfTest.scala @@ -50,15 +50,6 @@ object GroupByShufflePerfTest { spark.sparkContext.addSparkListener(new ShuffleMetricsOutputSparkListener()) -// val words = Array("one", "two", "two", "three", "three", "three") -// val wordPairsRDD = spark.sparkContext.parallelize(words).map(word => (word, 1)) -// -// val wordCountsWithGroup = wordPairsRDD -// .groupByKey() -// .map(t => (t._1, t._2.sum)) -// .collect() -// -// println(wordCountsWithGroup.mkString(",")) val words = createArray(numRecords) val wordPairsRDD2 = spark.sparkContext @@ -71,19 +62,14 @@ object GroupByShufflePerfTest { println(wordCountsWithGroup2.mkString(",")) -// Thread.sleep(600000) spark.stop() } def createArray(arraySize: Int) : Array[String] = { - val mapIntToWord: Map[Int, String] = - Map(0 -> "zero", 1 -> "one", 2 -> "two", 3 -> "three", 4 -> "four") - val array = new Array[String](arraySize) for (i <- 1 to arraySize) { array(i - 1) = Random.alphanumeric.take(2).mkString -// array(i - 1) = mapIntToWord.getOrElse(i-1, "else:q!") } array }