From d430dca4acfc16b8f16368d38644cba24269e306 Mon Sep 17 00:00:00 2001 From: Wojciech Biela Date: Tue, 15 Nov 2016 19:26:27 +0100 Subject: [PATCH 1/4] add spilled data size to JSON Add tracking of spilled data size on various levels (from operator to query) and expose it in JSON. --- .../presto/jdbc/TestProgressMonitor.java | 2 +- .../presto/execution/QueryStateMachine.java | 6 ++++- .../facebook/presto/execution/QueryStats.java | 14 +++++++++++- .../presto/execution/StageStateMachine.java | 6 ++++- .../facebook/presto/execution/StageStats.java | 14 +++++++++++- .../presto/operator/DriverContext.java | 5 ++++- .../facebook/presto/operator/DriverStats.java | 16 +++++++++++++- .../presto/operator/OperatorContext.java | 13 ++++++++++- .../presto/operator/OperatorStats.java | 22 ++++++++++++++++--- .../presto/operator/PipelineContext.java | 6 ++++- .../presto/operator/PipelineStats.java | 16 ++++++++++++-- .../facebook/presto/operator/TaskContext.java | 6 ++++- .../facebook/presto/operator/TaskStats.java | 20 ++++++++++++++--- .../presto/execution/TestQueryStats.java | 5 ++++- .../presto/execution/TestStageStats.java | 4 +++- .../presto/operator/TestDriverStats.java | 4 +++- .../presto/operator/TestOperatorStats.java | 9 ++++++-- .../presto/operator/TestPipelineStats.java | 4 +++- .../presto/operator/TestTaskStats.java | 4 +++- .../presto/server/TestBasicQueryInfo.java | 3 ++- 20 files changed, 153 insertions(+), 26 deletions(-) diff --git a/presto-jdbc/src/test/java/com/facebook/presto/jdbc/TestProgressMonitor.java b/presto-jdbc/src/test/java/com/facebook/presto/jdbc/TestProgressMonitor.java index 66c21737a46fa..e012ba5fd4332 100644 --- a/presto-jdbc/src/test/java/com/facebook/presto/jdbc/TestProgressMonitor.java +++ b/presto-jdbc/src/test/java/com/facebook/presto/jdbc/TestProgressMonitor.java @@ -75,7 +75,7 @@ private static String newQueryResults(Integer partialCancelId, Integer nextUriId nextUriId == null ? 
null : URI.create(format(NEXT_URI, nextUriId)), responseColumns, data, - new StatementStats(state, state.equals("QUEUED"), true, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, null), + new StatementStats(state, state.equals("QUEUED"), true, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, null, null), null, null, null); diff --git a/presto-main/src/main/java/com/facebook/presto/execution/QueryStateMachine.java b/presto-main/src/main/java/com/facebook/presto/execution/QueryStateMachine.java index f7381354a6219..e587259006499 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/QueryStateMachine.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/QueryStateMachine.java @@ -303,6 +303,8 @@ public QueryInfo getQueryInfo(Optional rootStage) boolean fullyBlocked = rootStage.isPresent(); Set blockedReasons = new HashSet<>(); + long spilledBytes = 0; + boolean completeInfo = true; for (StageInfo stageInfo : getAllStages(rootStage)) { StageStats stageStats = stageInfo.getStageStats(); @@ -337,6 +339,7 @@ public QueryInfo getQueryInfo(Optional rootStage) processedInputPositions += stageStats.getProcessedInputPositions(); } completeInfo = completeInfo && stageInfo.isCompleteInfo(); + spilledBytes += stageStats.getSpilledDataSize().toBytes(); } if (rootStage.isPresent()) { @@ -382,7 +385,8 @@ public QueryInfo getQueryInfo(Optional rootStage) succinctBytes(processedInputDataSize), processedInputPositions, succinctBytes(outputDataSize), - outputPositions); + outputPositions, + succinctBytes(spilledBytes)); return new QueryInfo(queryId, session.toSessionRepresentation(), diff --git a/presto-main/src/main/java/com/facebook/presto/execution/QueryStats.java b/presto-main/src/main/java/com/facebook/presto/execution/QueryStats.java index 7fe73b25d3a0d..e256c99fd2970 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/QueryStats.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/QueryStats.java @@ -73,6 +73,8 @@ public class QueryStats private final DataSize 
outputDataSize; private final long outputPositions; + private final DataSize spilledDataSize; + @VisibleForTesting public QueryStats() { @@ -108,6 +110,7 @@ public QueryStats() this.processedInputPositions = 0; this.outputDataSize = null; this.outputPositions = 0; + this.spilledDataSize = null; } @JsonCreator @@ -151,7 +154,8 @@ public QueryStats( @JsonProperty("processedInputPositions") long processedInputPositions, @JsonProperty("outputDataSize") DataSize outputDataSize, - @JsonProperty("outputPositions") long outputPositions) + @JsonProperty("outputPositions") long outputPositions, + @JsonProperty("spilledDataSize") DataSize spilledDataSize) { this.createTime = requireNonNull(createTime, "createTime is null"); this.executionStartTime = executionStartTime; @@ -202,6 +206,8 @@ public QueryStats( this.outputDataSize = requireNonNull(outputDataSize, "outputDataSize is null"); checkArgument(outputPositions >= 0, "outputPositions is negative"); this.outputPositions = outputPositions; + + this.spilledDataSize = spilledDataSize; } @JsonProperty @@ -405,4 +411,10 @@ public long getOutputPositions() { return outputPositions; } + + @JsonProperty + public DataSize getSpilledDataSize() + { + return spilledDataSize; + } } diff --git a/presto-main/src/main/java/com/facebook/presto/execution/StageStateMachine.java b/presto-main/src/main/java/com/facebook/presto/execution/StageStateMachine.java index 9198c642a9cd3..1fe5072eee25a 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/StageStateMachine.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/StageStateMachine.java @@ -223,6 +223,8 @@ public StageInfo getStageInfo(Supplier> taskInfosSupplier, Su boolean fullyBlocked = true; Set blockedReasons = new HashSet<>(); + long spilledBytes = 0; + for (TaskInfo taskInfo : taskInfos) { TaskState taskState = taskInfo.getTaskStatus().getState(); if (taskState.isDone()) { @@ -259,6 +261,7 @@ public StageInfo getStageInfo(Supplier> taskInfosSupplier, Su 
outputDataSize += taskStats.getOutputDataSize().toBytes(); outputPositions += taskStats.getOutputPositions(); + spilledBytes += taskStats.getSpilledDataSize().toBytes(); } StageStats stageStats = new StageStats( @@ -291,7 +294,8 @@ public StageInfo getStageInfo(Supplier> taskInfosSupplier, Su succinctBytes(processedInputDataSize), processedInputPositions, succinctBytes(outputDataSize), - outputPositions); + outputPositions, + succinctBytes(spilledBytes)); ExecutionFailureInfo failureInfo = null; if (state == FAILED) { diff --git a/presto-main/src/main/java/com/facebook/presto/execution/StageStats.java b/presto-main/src/main/java/com/facebook/presto/execution/StageStats.java index a89a9b036ef3e..63320947d2db6 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/StageStats.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/StageStats.java @@ -68,6 +68,8 @@ public class StageStats private final DataSize outputDataSize; private final long outputPositions; + private final DataSize spilledDataSize; + @VisibleForTesting public StageStats() { @@ -97,6 +99,7 @@ public StageStats() this.processedInputPositions = 0; this.outputDataSize = null; this.outputPositions = 0; + this.spilledDataSize = null; } @JsonCreator @@ -134,7 +137,8 @@ public StageStats( @JsonProperty("processedInputPositions") long processedInputPositions, @JsonProperty("outputDataSize") DataSize outputDataSize, - @JsonProperty("outputPositions") long outputPositions) + @JsonProperty("outputPositions") long outputPositions, + @JsonProperty("spilledDataSize") DataSize spilledDataSize) { this.schedulingComplete = schedulingComplete; this.getSplitDistribution = requireNonNull(getSplitDistribution, "getSplitDistribution is null"); @@ -179,6 +183,8 @@ public StageStats( this.outputDataSize = requireNonNull(outputDataSize, "outputDataSize is null"); checkArgument(outputPositions >= 0, "outputPositions is negative"); this.outputPositions = outputPositions; + + this.spilledDataSize = 
spilledDataSize; } @JsonProperty @@ -336,4 +342,10 @@ public long getOutputPositions() { return outputPositions; } + + @JsonProperty + public DataSize getSpilledDataSize() + { + return spilledDataSize; + } } diff --git a/presto-main/src/main/java/com/facebook/presto/operator/DriverContext.java b/presto-main/src/main/java/com/facebook/presto/operator/DriverContext.java index cd6eef676fa04..001d8878d5483 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/DriverContext.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/DriverContext.java @@ -82,6 +82,7 @@ public class DriverContext private final List operatorContexts = new CopyOnWriteArrayList<>(); private final boolean partitioned; + private final AtomicLong spilledBytes = new AtomicLong(); public DriverContext(PipelineContext pipelineContext, Executor executor, boolean partitioned) { @@ -207,6 +208,7 @@ public ListenableFuture reserveSystemMemory(long bytes) public ListenableFuture reserveSpill(long bytes) { + spilledBytes.getAndAdd(bytes); return pipelineContext.reserveSpill(bytes); } @@ -404,7 +406,8 @@ public DriverStats getDriverStats() processedInputPositions, outputDataSize.convertToMostSuccinctDataSize(), outputPositions, - ImmutableList.copyOf(transform(operatorContexts, OperatorContext::getOperatorStats))); + ImmutableList.copyOf(transform(operatorContexts, OperatorContext::getOperatorStats)), + succinctBytes(spilledBytes.get())); } public boolean isPartitioned() diff --git a/presto-main/src/main/java/com/facebook/presto/operator/DriverStats.java b/presto-main/src/main/java/com/facebook/presto/operator/DriverStats.java index 55aa29f1fbf85..74232116b2308 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/DriverStats.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/DriverStats.java @@ -64,6 +64,8 @@ public class DriverStats private final List operatorStats; + private final DataSize spilledDataSize; + public DriverStats() { this.createTime = 
DateTime.now(); @@ -93,6 +95,8 @@ public DriverStats() this.outputPositions = 0; this.operatorStats = ImmutableList.of(); + + this.spilledDataSize = new DataSize(0, BYTE); } @JsonCreator @@ -123,7 +127,9 @@ public DriverStats( @JsonProperty("outputDataSize") DataSize outputDataSize, @JsonProperty("outputPositions") long outputPositions, - @JsonProperty("operatorStats") List operatorStats) + @JsonProperty("operatorStats") List operatorStats, + + @JsonProperty("spilledDataSize") DataSize spilledDataSize) { this.createTime = requireNonNull(createTime, "createTime is null"); this.startTime = startTime; @@ -155,6 +161,8 @@ public DriverStats( this.outputPositions = outputPositions; this.operatorStats = ImmutableList.copyOf(requireNonNull(operatorStats, "operatorStats is null")); + + this.spilledDataSize = spilledDataSize; } @JsonProperty @@ -284,4 +292,10 @@ public List getOperatorStats() { return operatorStats; } + + @JsonProperty + public DataSize getSpilledDataSize() + { + return spilledDataSize; + } } diff --git a/presto-main/src/main/java/com/facebook/presto/operator/OperatorContext.java b/presto-main/src/main/java/com/facebook/presto/operator/OperatorContext.java index c0ff761abd176..1d207309c47aa 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/OperatorContext.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/OperatorContext.java @@ -395,7 +395,10 @@ public OperatorStats getOperatorStats() succinctBytes(memoryReservation.get()), succinctBytes(systemMemoryContext.getReservedBytes()), memoryFuture.get().isDone() ? 
Optional.empty() : Optional.of(WAITING_FOR_MEMORY), - info); + + info, + + succinctBytes(spillContext.getSpilledBytes())); } private long currentThreadUserTime() @@ -502,6 +505,7 @@ private class OperatorSpillContext private final DriverContext driverContext; private long reservedBytes; + private long spilledBytes; public OperatorSpillContext(DriverContext driverContext) { @@ -513,6 +517,7 @@ public void updateBytes(long bytes) { if (bytes > 0) { driverContext.reserveSpill(bytes); + spilledBytes += bytes; } else { checkArgument(reservedBytes + bytes >= 0, "tried to free %s spilled bytes from %s bytes reserved", -bytes, reservedBytes); @@ -526,7 +531,13 @@ public String toString() { return toStringHelper(this) .add("usedBytes", reservedBytes) + .add("spilledBytes", spilledBytes) .toString(); } + + public long getSpilledBytes() + { + return spilledBytes; + } } } diff --git a/presto-main/src/main/java/com/facebook/presto/operator/OperatorStats.java b/presto-main/src/main/java/com/facebook/presto/operator/OperatorStats.java index 64d21a4cc6d4a..ef6375c030762 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/OperatorStats.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/OperatorStats.java @@ -62,6 +62,8 @@ public class OperatorStats private final DataSize systemMemoryReservation; private final Optional blockedReason; + private final DataSize spilledDataSize; + private final Object info; @JsonCreator @@ -95,7 +97,9 @@ public OperatorStats( @JsonProperty("systemMemoryReservation") DataSize systemMemoryReservation, @JsonProperty("blockedReason") Optional blockedReason, - @JsonProperty("info") Object info) + @JsonProperty("info") Object info, + + @JsonProperty("spilledDataSize") DataSize spilledDataSize) { checkArgument(operatorId >= 0, "operatorId is negative"); this.operatorId = operatorId; @@ -129,6 +133,8 @@ public OperatorStats( this.systemMemoryReservation = requireNonNull(systemMemoryReservation, "systemMemoryReservation is null"); 
this.blockedReason = blockedReason; + this.spilledDataSize = spilledDataSize; + this.info = info; } @@ -270,6 +276,12 @@ public Optional getBlockedReason() return blockedReason; } + @JsonProperty + public DataSize getSpilledDataSize() + { + return spilledDataSize; + } + @Nullable @JsonProperty public Object getInfo() @@ -308,7 +320,7 @@ public OperatorStats add(Iterable operators) long memoryReservation = this.memoryReservation.toBytes(); long systemMemoryReservation = this.systemMemoryReservation.toBytes(); Optional blockedReason = this.blockedReason; - + long spilledBytes = this.spilledDataSize.toBytes(); Mergeable base = null; if (info instanceof Mergeable) { base = (Mergeable) info; @@ -343,6 +355,8 @@ public OperatorStats add(Iterable operators) blockedReason = operator.getBlockedReason(); } + spilledBytes += operator.getSpilledDataSize().toBytes(); + Object info = operator.getInfo(); if (base != null && info != null && base.getClass() == info.getClass()) { base = mergeInfo(base, info); @@ -379,7 +393,9 @@ public OperatorStats add(Iterable operators) succinctBytes(systemMemoryReservation), blockedReason, - base); + base, + + succinctBytes(spilledBytes)); } public static > Mergeable mergeInfo(Object base, Object other) diff --git a/presto-main/src/main/java/com/facebook/presto/operator/PipelineContext.java b/presto-main/src/main/java/com/facebook/presto/operator/PipelineContext.java index 53505b97a45b6..39cb0b6314c8e 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/PipelineContext.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/PipelineContext.java @@ -84,6 +84,7 @@ public class PipelineContext private final CounterStat outputPositions = new CounterStat(); private final ConcurrentMap operatorSummaries = new ConcurrentHashMap<>(); + private final AtomicLong spilledBytes = new AtomicLong(); public PipelineContext(TaskContext taskContext, Executor executor, boolean inputPipeline, boolean outputPipeline) { @@ -225,6 +226,7 @@ 
public synchronized ListenableFuture reserveSystemMemory(long bytes) public synchronized ListenableFuture reserveSpill(long bytes) { + spilledBytes.getAndAdd(bytes); return taskContext.reserveSpill(bytes); } @@ -451,7 +453,9 @@ public PipelineStats getPipelineStats() outputPositions, ImmutableList.copyOf(operatorSummaries.values()), - drivers); + drivers, + + succinctBytes(spilledBytes.get())); } private static boolean compareAndSet(ConcurrentMap map, K key, V oldValue, V newValue) diff --git a/presto-main/src/main/java/com/facebook/presto/operator/PipelineStats.java b/presto-main/src/main/java/com/facebook/presto/operator/PipelineStats.java index 768ddd5aac1f3..6229a46082fa6 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/PipelineStats.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/PipelineStats.java @@ -73,6 +73,8 @@ public class PipelineStats private final List operatorSummaries; private final List drivers; + private final DataSize spilledDataSize; + @JsonCreator public PipelineStats( @JsonProperty("firstStartTime") DateTime firstStartTime, @@ -112,7 +114,8 @@ public PipelineStats( @JsonProperty("outputPositions") long outputPositions, @JsonProperty("operatorSummaries") List operatorSummaries, - @JsonProperty("drivers") List drivers) + @JsonProperty("drivers") List drivers, + @JsonProperty("spilledDataSize") DataSize spilledDataSize) { this.firstStartTime = firstStartTime; this.lastStartTime = lastStartTime; @@ -161,6 +164,8 @@ public PipelineStats( this.operatorSummaries = ImmutableList.copyOf(requireNonNull(operatorSummaries, "operatorSummaries is null")); this.drivers = ImmutableList.copyOf(requireNonNull(drivers, "drivers is null")); + + this.spilledDataSize = spilledDataSize; } @Nullable @@ -340,6 +345,12 @@ public List getDrivers() return drivers; } + @JsonProperty + public DataSize getSpilledDataSize() + { + return spilledDataSize; + } + public PipelineStats summarize() { return new PipelineStats( @@ -371,6 +382,7 @@ 
public PipelineStats summarize() outputDataSize, outputPositions, operatorSummaries, - ImmutableList.of()); + ImmutableList.of(), + spilledDataSize); } } diff --git a/presto-main/src/main/java/com/facebook/presto/operator/TaskContext.java b/presto-main/src/main/java/com/facebook/presto/operator/TaskContext.java index d2e00c7285d13..71cad9388be80 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/TaskContext.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/TaskContext.java @@ -78,6 +78,8 @@ public class TaskContext @GuardedBy("cumulativeMemoryLock") private long lastTaskStatCallNanos = 0; + private final AtomicLong spilledBytes = new AtomicLong(); + public TaskContext(QueryContext queryContext, TaskStateMachine taskStateMachine, Executor executor, @@ -167,6 +169,7 @@ public synchronized ListenableFuture reserveSystemMemory(long bytes) public synchronized ListenableFuture reserveSpill(long bytes) { checkArgument(bytes >= 0, "bytes is negative"); + spilledBytes.getAndAdd(bytes); return queryContext.reserveSpill(bytes); } @@ -390,6 +393,7 @@ public TaskStats getTaskStats() processedInputPositions, succinctBytes(outputDataSize), outputPositions, - pipelineStats); + pipelineStats, + succinctBytes(spilledBytes.get())); } } diff --git a/presto-main/src/main/java/com/facebook/presto/operator/TaskStats.java b/presto-main/src/main/java/com/facebook/presto/operator/TaskStats.java index 15373ff37ad91..b1c690b6fed87 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/TaskStats.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/TaskStats.java @@ -71,6 +71,8 @@ public class TaskStats private final List pipelines; + private final DataSize spilledDataSize; + public TaskStats(DateTime createTime, DateTime endTime) { this(createTime, @@ -101,7 +103,8 @@ public TaskStats(DateTime createTime, DateTime endTime) 0, new DataSize(0, BYTE), 0, - ImmutableList.of()); + ImmutableList.of(), + new DataSize(0, BYTE)); } @JsonCreator 
@@ -141,7 +144,9 @@ public TaskStats( @JsonProperty("outputDataSize") DataSize outputDataSize, @JsonProperty("outputPositions") long outputPositions, - @JsonProperty("pipelines") List pipelines) + @JsonProperty("pipelines") List pipelines, + + @JsonProperty("spilledDataSize") DataSize spilledDataSize) { this.createTime = requireNonNull(createTime, "createTime is null"); this.firstStartTime = firstStartTime; @@ -190,6 +195,8 @@ public TaskStats( this.outputPositions = outputPositions; this.pipelines = ImmutableList.copyOf(requireNonNull(pipelines, "pipelines is null")); + + this.spilledDataSize = spilledDataSize; } @JsonProperty @@ -370,6 +377,12 @@ public int getRunningPartitionedDrivers() return runningPartitionedDrivers; } + @JsonProperty + public DataSize getSpilledDataSize() + { + return spilledDataSize; + } + public TaskStats summarize() { return new TaskStats( @@ -401,6 +414,7 @@ public TaskStats summarize() processedInputPositions, outputDataSize, outputPositions, - ImmutableList.of()); + ImmutableList.of(), + spilledDataSize); } } diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestQueryStats.java b/presto-main/src/test/java/com/facebook/presto/execution/TestQueryStats.java index 7a09834abedf3..ad4fe9c2c6994 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TestQueryStats.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TestQueryStats.java @@ -67,7 +67,8 @@ public class TestQueryStats 27, new DataSize(28, BYTE), - 29); + 29, + new DataSize(30, BYTE)); @Test public void testJson() @@ -122,5 +123,7 @@ public static void assertExpectedQueryStats(QueryStats actual) assertEquals(actual.getOutputDataSize(), new DataSize(28, BYTE)); assertEquals(actual.getOutputPositions(), 29); + + assertEquals(actual.getSpilledDataSize(), new DataSize(30, BYTE)); } } diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestStageStats.java 
b/presto-main/src/test/java/com/facebook/presto/execution/TestStageStats.java index e32bf12cfbbe9..02c781223dbba 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TestStageStats.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TestStageStats.java @@ -62,7 +62,8 @@ public class TestStageStats 22, new DataSize(23, BYTE), - 24); + 24, + new DataSize(25, BYTE)); @Test public void testJson() @@ -109,6 +110,7 @@ public static void assertExpectedStageStats(StageStats actual) assertEquals(actual.getOutputDataSize(), new DataSize(23, BYTE)); assertEquals(actual.getOutputPositions(), 24); + assertEquals(actual.getSpilledDataSize(), new DataSize(25, BYTE)); } private static DistributionSnapshot getTestDistribution(int count) diff --git a/presto-main/src/test/java/com/facebook/presto/operator/TestDriverStats.java b/presto-main/src/test/java/com/facebook/presto/operator/TestDriverStats.java index bbf0af7931376..ab41c89af32bd 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/TestDriverStats.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/TestDriverStats.java @@ -57,7 +57,8 @@ public class TestDriverStats new DataSize(17, BYTE), 18, - ImmutableList.of(TestOperatorStats.EXPECTED)); + ImmutableList.of(TestOperatorStats.EXPECTED), + new DataSize(19, BYTE)); @Test public void testJson() @@ -98,5 +99,6 @@ public static void assertExpectedDriverStats(DriverStats actual) assertEquals(actual.getOperatorStats().size(), 1); assertExpectedOperatorStats(actual.getOperatorStats().get(0)); + assertEquals(actual.getSpilledDataSize(), new DataSize(19, BYTE)); } } diff --git a/presto-main/src/test/java/com/facebook/presto/operator/TestOperatorStats.java b/presto-main/src/test/java/com/facebook/presto/operator/TestOperatorStats.java index 17280e57ee3ae..bb49045d17e98 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/TestOperatorStats.java +++ 
b/presto-main/src/test/java/com/facebook/presto/operator/TestOperatorStats.java @@ -58,7 +58,8 @@ public class TestOperatorStats new DataSize(18, BYTE), new DataSize(19, BYTE), Optional.empty(), - "20"); + "20", + new DataSize(21, BYTE)); public static final OperatorStats MERGEABLE = new OperatorStats( 41, @@ -89,7 +90,8 @@ public class TestOperatorStats new DataSize(18, BYTE), new DataSize(19, BYTE), Optional.empty(), - new LongMergeable(20)); + new LongMergeable(20), + new DataSize(21, BYTE)); @Test public void testJson() @@ -131,6 +133,7 @@ public static void assertExpectedOperatorStats(OperatorStats actual) assertEquals(actual.getMemoryReservation(), new DataSize(18, BYTE)); assertEquals(actual.getSystemMemoryReservation(), new DataSize(19, BYTE)); assertEquals(actual.getInfo(), "20"); + assertEquals(actual.getSpilledDataSize(), new DataSize(21, BYTE)); } @Test @@ -164,6 +167,7 @@ public void testAdd() assertEquals(actual.getMemoryReservation(), new DataSize(3 * 18, BYTE)); assertEquals(actual.getSystemMemoryReservation(), new DataSize(3 * 19, BYTE)); assertEquals(actual.getInfo(), null); + assertEquals(actual.getSpilledDataSize(), new DataSize(3 * 21, BYTE)); } @Test @@ -197,6 +201,7 @@ public void testAddMergeable() assertEquals(actual.getMemoryReservation(), new DataSize(3 * 18, BYTE)); assertEquals(actual.getSystemMemoryReservation(), new DataSize(3 * 19, BYTE)); assertEquals(actual.getInfo(), new LongMergeable(20 * 3)); + assertEquals(actual.getSpilledDataSize(), new DataSize(3 * 21, BYTE)); } private static class LongMergeable diff --git a/presto-main/src/test/java/com/facebook/presto/operator/TestPipelineStats.java b/presto-main/src/test/java/com/facebook/presto/operator/TestPipelineStats.java index e03bd7dfd32e8..650c54740c627 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/TestPipelineStats.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/TestPipelineStats.java @@ -70,7 +70,8 @@ public class TestPipelineStats 18, 
ImmutableList.of(TestOperatorStats.EXPECTED), - ImmutableList.of(TestDriverStats.EXPECTED)); + ImmutableList.of(TestDriverStats.EXPECTED), + new DataSize(19, BYTE)); @Test public void testJson() @@ -123,6 +124,7 @@ public static void assertExpectedPipelineStats(PipelineStats actual) assertEquals(actual.getDrivers().size(), 1); assertExpectedDriverStats(actual.getDrivers().get(0)); + assertEquals(actual.getSpilledDataSize(), new DataSize(19, BYTE)); } private static DistributionSnapshot getTestDistribution(int count) diff --git a/presto-main/src/test/java/com/facebook/presto/operator/TestTaskStats.java b/presto-main/src/test/java/com/facebook/presto/operator/TestTaskStats.java index 70de54aa7dbd0..d7fe128f780d2 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/TestTaskStats.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/TestTaskStats.java @@ -64,7 +64,8 @@ public class TestTaskStats new DataSize(22, BYTE), 23, - ImmutableList.of(TestPipelineStats.EXPECTED)); + ImmutableList.of(TestPipelineStats.EXPECTED), + new DataSize(24, BYTE)); @Test public void testJson() @@ -114,5 +115,6 @@ public static void assertExpectedTaskStats(TaskStats actual) assertEquals(actual.getPipelines().size(), 1); assertExpectedPipelineStats(actual.getPipelines().get(0)); + assertEquals(actual.getSpilledDataSize(), new DataSize(24, BYTE)); } } diff --git a/presto-main/src/test/java/com/facebook/presto/server/TestBasicQueryInfo.java b/presto-main/src/test/java/com/facebook/presto/server/TestBasicQueryInfo.java index 3be861c09c0e6..0461c9ca6b3e7 100644 --- a/presto-main/src/test/java/com/facebook/presto/server/TestBasicQueryInfo.java +++ b/presto-main/src/test/java/com/facebook/presto/server/TestBasicQueryInfo.java @@ -82,7 +82,8 @@ public void testConstructor() DataSize.valueOf("29GB"), 30, DataSize.valueOf("31GB"), - 32), + 32, + DataSize.valueOf("33GB")), ImmutableMap.of(), ImmutableSet.of(), ImmutableMap.of(), From 7f42b13d38fbafad77cf877a64c0d8a794b29700 
Mon Sep 17 00:00:00 2001 From: Wojciech Biela Date: Thu, 17 Nov 2016 10:08:51 +0100 Subject: [PATCH 2/4] add spilled data size to EXPLAIN ANALYZE Data size added to EXPLAIN ANALYZE output, at Fragment and Operator levels. This information is added to the "Cost" line and displayed only if the value is non-zero. --- .../presto/sql/planner/PlanPrinter.java | 44 +++++++++++++++---- 1 file changed, 35 insertions(+), 9 deletions(-) diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanPrinter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanPrinter.java index f38c3974b728f..a4daee6c0d7ae 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanPrinter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanPrinter.java @@ -207,11 +207,13 @@ private static List getPlanNodeStats(TaskStats taskStats) // are collected from the leaf stages. Map outputPositions = new HashMap<>(); Map outputBytes = new HashMap<>(); + Map spilledBytes = new HashMap<>(); Map wallMillis = new HashMap<>(); for (PipelineStats pipelineStats : taskStats.getPipelines()) { Map pipelineOutputPositions = new HashMap<>(); Map pipelineOutputBytes = new HashMap<>(); + Map pipelineSpilledBytes = new HashMap<>(); List operatorSummaries = pipelineStats.getOperatorSummaries(); for (int i = 0; i < operatorSummaries.size(); i++) { @@ -230,25 +232,27 @@ private static List getPlanNodeStats(TaskStats taskStats) pipelineOutputPositions.put(planNodeId, operatorStats.getOutputPositions()); pipelineOutputBytes.put(planNodeId, operatorStats.getOutputDataSize().toBytes()); } + pipelineSpilledBytes.put(planNodeId, operatorStats.getSpilledDataSize().toBytes()); } for (Map.Entry entry : pipelineOutputPositions.entrySet()) { outputBytes.merge(entry.getKey(), pipelineOutputBytes.get(entry.getKey()), Long::sum); outputPositions.merge(entry.getKey(), entry.getValue(), Long::sum); + spilledBytes.merge(entry.getKey(), 
pipelineSpilledBytes.get(entry.getKey()), Long::sum); } } List stats = new ArrayList<>(); for (Map.Entry entry : wallMillis.entrySet()) { if (outputPositions.containsKey(entry.getKey())) { - stats.add(new PlanNodeStats(entry.getKey(), new Duration(entry.getValue(), MILLISECONDS), outputPositions.get(entry.getKey()), succinctDataSize(outputBytes.get(entry.getKey()), BYTE))); + stats.add(new PlanNodeStats(entry.getKey(), new Duration(entry.getValue(), MILLISECONDS), succinctBytes(spilledBytes.get(entry.getKey())), outputPositions.get(entry.getKey()), succinctDataSize(outputBytes.get(entry.getKey()), BYTE))); } else { // It's possible there will be no output stats because all the pipelines that we observed were non-output. // For example in a query like SELECT * FROM a JOIN b ON c = d LIMIT 1 // It's possible to observe stats after the build starts, but before the probe does // and therefore only have wall time, but no output stats - stats.add(new PlanNodeStats(entry.getKey(), new Duration(entry.getValue(), MILLISECONDS))); + stats.add(new PlanNodeStats(entry.getKey(), new Duration(entry.getValue(), MILLISECONDS), succinctBytes(spilledBytes.get(entry.getKey())))); } } return stats; @@ -273,12 +277,16 @@ private static String formatFragment(Metadata metadata, Session session, PlanFra if (stageStats.isPresent()) { builder.append(indentString(1)) - .append(format("Cost: CPU %s, Input %d (%s), Output %d (%s)\n", + .append(format("Cost: CPU %s, Input %d (%s), Output %d (%s)", stageStats.get().getTotalCpuTime(), stageStats.get().getProcessedInputPositions(), stageStats.get().getProcessedInputDataSize(), stageStats.get().getOutputPositions(), stageStats.get().getOutputDataSize())); + if (isNonZero(stageStats.get().getSpilledDataSize())) { + builder.append(", Spilled " + stageStats.get().getSpilledDataSize()); + } + builder.append("\n"); } PartitioningScheme partitioningScheme = fragment.getPartitioningScheme(); @@ -323,6 +331,11 @@ private static String 
formatFragment(Metadata metadata, Session session, PlanFra return builder.toString(); } + private static boolean isNonZero(DataSize dataSize) + { + return dataSize != null && dataSize.getValue() != 0; + } + public static String graphvizLogicalPlan(PlanNode plan, Map types) { PlanFragment fragment = new PlanFragment( @@ -384,7 +397,11 @@ private void printStats(int indent, PlanNodeId planNodeId) outputString = "unknown"; } output.append(indentString(indent)) - .append(format("Cost: %s, Output: %s\n", fractionString, outputString)); + .append(format("Cost: %s, Output: %s", fractionString, outputString)); + if (isNonZero(stats.getSpilledDataSize())) { + output.append(", Spilled: " + stats.getSpilledDataSize()); + } + output.append("\n"); } private static String indentString(int indent) @@ -1061,23 +1078,25 @@ private static class PlanNodeStats private final Duration wallTime; private final Optional outputPositions; private final Optional outputDataSize; + private final DataSize spilledDataSize; - public PlanNodeStats(PlanNodeId planNodeId, Duration wallTime) + public PlanNodeStats(PlanNodeId planNodeId, Duration wallTime, DataSize spilledDataSize) { - this(planNodeId, wallTime, Optional.empty(), Optional.empty()); + this(planNodeId, wallTime, spilledDataSize, Optional.empty(), Optional.empty()); } - public PlanNodeStats(PlanNodeId planNodeId, Duration wallTime, long outputPositions, DataSize outputDataSize) + public PlanNodeStats(PlanNodeId planNodeId, Duration wallTime, DataSize spilledDataSize, long outputPositions, DataSize outputDataSize) { - this(planNodeId, wallTime, Optional.of(outputPositions), Optional.of(outputDataSize)); + this(planNodeId, wallTime, spilledDataSize, Optional.of(outputPositions), Optional.of(outputDataSize)); } - private PlanNodeStats(PlanNodeId planNodeId, Duration wallTime, Optional outputPositions, Optional outputDataSize) + private PlanNodeStats(PlanNodeId planNodeId, Duration wallTime, DataSize 
spilledDataSize, Optional outputPositions, Optional outputDataSize) { this.planNodeId = requireNonNull(planNodeId, "planNodeId is null"); this.wallTime = requireNonNull(wallTime, "wallTime is null"); this.outputPositions = outputPositions; this.outputDataSize = outputDataSize; + this.spilledDataSize = requireNonNull(spilledDataSize, "spilledDataSize is null"); } public PlanNodeId getPlanNodeId() @@ -1100,6 +1119,11 @@ public Optional getOutputDataSize() return outputDataSize; } + public DataSize getSpilledDataSize() + { + return spilledDataSize; + } + public static PlanNodeStats merge(PlanNodeStats planNodeStats1, PlanNodeStats planNodeStats2) { checkArgument(planNodeStats1.getPlanNodeId().equals(planNodeStats2.getPlanNodeId()), "planNodeIds do not match. %s != %s", planNodeStats1.getPlanNodeId(), planNodeStats2.getPlanNodeId()); @@ -1126,6 +1150,7 @@ else if (planNodeStats1.getOutputDataSize().isPresent()) { return new PlanNodeStats( planNodeStats1.getPlanNodeId(), new Duration(planNodeStats1.getWallTime().toMillis() + planNodeStats2.getWallTime().toMillis(), MILLISECONDS), + succinctBytes(planNodeStats1.getSpilledDataSize().toBytes() + planNodeStats2.getSpilledDataSize().toBytes()), outputPositions, outputDataSize); } From fb131de1c259a271889cefb33ca983a929aed57c Mon Sep 17 00:00:00 2001 From: Wojciech Biela Date: Thu, 17 Nov 2016 10:11:16 +0100 Subject: [PATCH 3/4] add spilled data size to query summary debug When CLI is started with --debug then spilled data size for the entire query is being displayed (both in the running total, and the final summary). The information is added in a separate line below "Parallelism". 
--- .../facebook/presto/cli/StatusPrinter.java | 2 ++ .../presto/client/StatementStats.java | 23 +++++++++++++++++-- .../presto/server/StatementResource.java | 1 + 3 files changed, 24 insertions(+), 2 deletions(-) diff --git a/presto-cli/src/main/java/com/facebook/presto/cli/StatusPrinter.java b/presto-cli/src/main/java/com/facebook/presto/cli/StatusPrinter.java index 09f1eaec94f9c..05ed9758629bb 100644 --- a/presto-cli/src/main/java/com/facebook/presto/cli/StatusPrinter.java +++ b/presto-cli/src/main/java/com/facebook/presto/cli/StatusPrinter.java @@ -193,6 +193,7 @@ public void printFinalInfo() reprintLine(perNodeSummary); out.println(String.format("Parallelism: %.1f", parallelism)); + out.println("Spilled: " + (stats.getSpilledDataSize() == null ? "unknown" : stats.getSpilledDataSize())); } // 0:32 [2.12GB, 15M rows] [67MB/s, 463K rows/s] @@ -280,6 +281,7 @@ private void printQueryInfo(QueryResults results) reprintLine(perNodeSummary); reprintLine(String.format("Parallelism: %.1f", parallelism)); + reprintLine("Spilled: " + (stats.getSpilledDataSize() == null ? "unknown" : stats.getSpilledDataSize())); } assert terminalWidth >= 75; diff --git a/presto-client/src/main/java/com/facebook/presto/client/StatementStats.java b/presto-client/src/main/java/com/facebook/presto/client/StatementStats.java index b611f9bd62841..210bcfe4401d5 100644 --- a/presto-client/src/main/java/com/facebook/presto/client/StatementStats.java +++ b/presto-client/src/main/java/com/facebook/presto/client/StatementStats.java @@ -15,6 +15,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import io.airlift.units.DataSize; import javax.annotation.Nullable; import javax.annotation.concurrent.Immutable; @@ -40,6 +41,7 @@ public class StatementStats private final long processedRows; private final long processedBytes; private final StageStats rootStage; + private final DataSize spilledDataSize; @JsonCreator public StatementStats( @@ -56,7 +58,8 @@ public StatementStats( @JsonProperty("wallTimeMillis") long wallTimeMillis, 
@JsonProperty("processedRows") long processedRows, @JsonProperty("processedBytes") long processedBytes, - @JsonProperty("rootStage") StageStats rootStage) + @JsonProperty("rootStage") StageStats rootStage, + @JsonProperty("spilledDataSize") DataSize spilledDataSize) { this.state = requireNonNull(state, "state is null"); this.queued = queued; @@ -72,6 +75,7 @@ public StatementStats( this.processedRows = processedRows; this.processedBytes = processedBytes; this.rootStage = rootStage; + this.spilledDataSize = spilledDataSize; } @NotNull @@ -160,6 +164,12 @@ public StageStats getRootStage() return rootStage; } + @JsonProperty + public DataSize getSpilledDataSize() + { + return spilledDataSize; + } + @Override public String toString() { @@ -178,6 +188,7 @@ public String toString() .add("processedRows", processedRows) .add("processedBytes", processedBytes) .add("rootStage", rootStage) + .add("spilledDataSize", spilledDataSize) .toString(); } @@ -202,6 +213,7 @@ public static class Builder private long processedRows; private long processedBytes; private StageStats rootStage; + private DataSize spilledDataSize; private Builder() {} @@ -289,6 +301,12 @@ public Builder setRootStage(StageStats rootStage) return this; } + public Builder setSpilledDataSize(DataSize spilledDataSize) + { + this.spilledDataSize = spilledDataSize; + return this; + } + public StatementStats build() { return new StatementStats( @@ -305,7 +323,8 @@ public StatementStats build() wallTimeMillis, processedRows, processedBytes, - rootStage); + rootStage, + spilledDataSize); } } } diff --git a/presto-main/src/main/java/com/facebook/presto/server/StatementResource.java b/presto-main/src/main/java/com/facebook/presto/server/StatementResource.java index a0cf0a13eaca8..f3c2a0a52d7b9 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/StatementResource.java +++ b/presto-main/src/main/java/com/facebook/presto/server/StatementResource.java @@ -615,6 +615,7 @@ private static StatementStats 
toStatementStats(QueryInfo queryInfo) .setProcessedRows(queryStats.getRawInputPositions()) .setProcessedBytes(queryStats.getRawInputDataSize().toBytes()) .setRootStage(toStageStats(outputStage)) + .setSpilledDataSize(queryStats.getSpilledDataSize()) .build(); } From 7c1d6d5b550cf16975ef12f4c5784b469e80a43a Mon Sep 17 00:00:00 2001 From: Wojciech Biela Date: Thu, 17 Nov 2016 14:43:35 +0100 Subject: [PATCH 4/4] add spilled data size to Web UI Added spilled data size to the Query Details page in the Resource Utilization Summary section and to the Live Plan page in the Web UI just below the "Splits" line (per stage). --- .../main/resources/com/facebook/presto/server/plan.html | 1 + presto-main/src/main/resources/webapp/assets/query.js | 8 ++++++++ 2 files changed, 9 insertions(+) diff --git a/presto-main/src/main/resources/com/facebook/presto/server/plan.html b/presto-main/src/main/resources/com/facebook/presto/server/plan.html index 290175ffcc45a..dc37ea7c8d412 100644 --- a/presto-main/src/main/resources/com/facebook/presto/server/plan.html +++ b/presto-main/src/main/resources/com/facebook/presto/server/plan.html @@ -181,6 +181,7 @@ ) + "
Memory: " + stats.totalMemoryReservation + "
" + "
Splits: Q:" + stats.queuedDrivers + ", R:" + stats.runningDrivers + ", F:" + stats.completedDrivers + "
" + + "
Spilled: " + stats.spilledDataSize + "
" + "
" + "
Input: " + stats.processedInputDataSize + " / " + formatCount(stats.processedInputPositions) + " rows
"; diff --git a/presto-main/src/main/resources/webapp/assets/query.js b/presto-main/src/main/resources/webapp/assets/query.js index 61548b83882d7..645de789030cc 100644 --- a/presto-main/src/main/resources/webapp/assets/query.js +++ b/presto-main/src/main/resources/webapp/assets/query.js @@ -1160,6 +1160,14 @@ var QueryDetail = React.createClass({ { formatDataSizeBytes(query.queryStats.cumulativeMemory / 1000.0, "") + " seconds" } + + + Spilled Data + + + { query.queryStats.spilledDataSize } + +