in core/src/main/scala/org/apache/spark/status/AppStatusStore.scala [255:533]
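  /**
   * Calculates a summary of the task metrics for the given stage attempt, returning the
   * requested quantiles for the recorded metrics. Requests for quantiles that are not
   * already cached require scanning the task data in the store, which can be slow.
   */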
  def taskSummary(
      stageId: Int,
      stageAttemptId: Int,
      unsortedQuantiles: Array[Double]): Option[v1.TaskMetricDistributions] = {
    val stageKey = Array(stageId, stageAttemptId)
    val quantiles = unsortedQuantiles.sorted.toImmutableArraySeq
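    // Sorting matters: scanTasks() below computes each quantile in a single forward pass over
    // an indexed view, so the target positions derived from these quantiles must be
    // monotonically non-decreasing.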

    // We don't know how many tasks remain in the store that actually have metrics. So scan one
    // metric and count how many valid tasks there are. Use skip() instead of next() since it's
    // cheaper for disk stores (avoids deserialization).
    val count = {
      Utils.tryWithResource(
        store.view(classOf[TaskDataWrapper])
          .parent(stageKey)
          .index(TaskIndexNames.EXEC_RUN_TIME)
          .first(0L)
          .closeableIterator()
      ) { it =>
        var _count = 0L
        while (it.hasNext()) {
          _count += 1
          it.skip(1)
        }
        _count
      }
    }

    if (count <= 0) {
      return None
    }

    // Find out which quantiles are already cached. The data in the store must match the expected
    // task count to be considered, otherwise it will be re-scanned and overwritten.
    val cachedQuantiles = quantiles.filter(shouldCacheQuantile).flatMap { q =>
      val qkey = Array(stageId, stageAttemptId, quantileToString(q))
      asOption(store.read(classOf[CachedQuantile], qkey)).filter(_.taskCount == count)
    }
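    // For q = 0.5, for example, the lookup above reads the key
    // Array(stageId, stageAttemptId, quantileToString(0.5)); entries stored with a stale
    // task count are dropped here and rewritten at the end of this method.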

    // If there are no missing quantiles, return the data. Otherwise, just compute everything
    // to make the code simpler.
    if (cachedQuantiles.size == quantiles.size) {
      def toValues(fn: CachedQuantile => Double): IndexedSeq[Double] = cachedQuantiles.map(fn)

      val distributions = new v1.TaskMetricDistributions(
        quantiles = quantiles,
        duration = toValues(_.duration),
        executorDeserializeTime = toValues(_.executorDeserializeTime),
        executorDeserializeCpuTime = toValues(_.executorDeserializeCpuTime),
        executorRunTime = toValues(_.executorRunTime),
        executorCpuTime = toValues(_.executorCpuTime),
        resultSize = toValues(_.resultSize),
        jvmGcTime = toValues(_.jvmGcTime),
        resultSerializationTime = toValues(_.resultSerializationTime),
        gettingResultTime = toValues(_.gettingResultTime),
        schedulerDelay = toValues(_.schedulerDelay),
        peakExecutionMemory = toValues(_.peakExecutionMemory),
        memoryBytesSpilled = toValues(_.memoryBytesSpilled),
        diskBytesSpilled = toValues(_.diskBytesSpilled),
        inputMetrics = new v1.InputMetricDistributions(
          toValues(_.bytesRead),
          toValues(_.recordsRead)),
        outputMetrics = new v1.OutputMetricDistributions(
          toValues(_.bytesWritten),
          toValues(_.recordsWritten)),
        shuffleReadMetrics = new v1.ShuffleReadMetricDistributions(
          toValues(_.shuffleReadBytes),
          toValues(_.shuffleRecordsRead),
          toValues(_.shuffleRemoteBlocksFetched),
          toValues(_.shuffleLocalBlocksFetched),
          toValues(_.shuffleFetchWaitTime),
          toValues(_.shuffleRemoteBytesRead),
          toValues(_.shuffleRemoteBytesReadToDisk),
          toValues(_.shuffleTotalBlocksFetched),
          toValues(_.shuffleRemoteReqsDuration),
          new v1.ShufflePushReadMetricDistributions(
            toValues(_.shuffleCorruptMergedBlockChunks),
            toValues(_.shuffleMergedFetchFallbackCount),
            toValues(_.shuffleMergedRemoteBlocksFetched),
            toValues(_.shuffleMergedLocalBlocksFetched),
            toValues(_.shuffleMergedRemoteChunksFetched),
            toValues(_.shuffleMergedLocalChunksFetched),
            toValues(_.shuffleMergedRemoteBytesRead),
            toValues(_.shuffleMergedLocalBytesRead),
            toValues(_.shuffleMergedRemoteReqsDuration))),
        shuffleWriteMetrics = new v1.ShuffleWriteMetricDistributions(
          toValues(_.shuffleWriteBytes),
          toValues(_.shuffleWriteRecords),
          toValues(_.shuffleWriteTime)))

      return Some(distributions)
    }

    // Compute quantiles by scanning the tasks in the store. This is not really stable for live
    // stages (e.g. the number of recorded tasks may change while this code is running), but
    // should stabilize once the stage finishes. It's also slow, especially with disk stores.
    val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) }
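    // For example, with count = 200 and quantiles 0.05, 0.5 and 0.95, the target positions
    // are 10, 100 and 190. The min() clamp keeps q = 1.0 at the last valid position
    // (count - 1) rather than one past the end.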

    def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = {
      Utils.tryWithResource(
        store.view(classOf[TaskDataWrapper])
          .parent(stageKey)
          .index(index)
          .first(0L)
          .closeableIterator()
      ) { it =>
        var last = Double.NaN
        var currentIdx = -1L
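        // `currentIdx` and `last` remember the position and value of the most recently read
        // element, so a repeated target position reuses the value without touching the
        // iterator; otherwise skip(diff - 1) plus next() advances exactly `diff` elements.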
        indices.map { idx =>
          if (idx == currentIdx) {
            last
          } else {
            val diff = idx - currentIdx
            currentIdx = idx
            if (it.skip(diff - 1)) {
              last = fn(it.next()).toDouble
              last
            } else {
              Double.NaN
            }
          }
        }
      }
    }
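    // Each scanTasks() call below performs its own indexed scan over the stage's tasks, so
    // an uncached request pays roughly one scan per metric (close to 40 in total).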

    val computedQuantiles = new v1.TaskMetricDistributions(
      quantiles = quantiles,
      duration = scanTasks(TaskIndexNames.DURATION) { t =>
        t.duration
      },
      executorDeserializeTime = scanTasks(TaskIndexNames.DESER_TIME) { t =>
        t.executorDeserializeTime
      },
      executorDeserializeCpuTime = scanTasks(TaskIndexNames.DESER_CPU_TIME) { t =>
        t.executorDeserializeCpuTime
      },
      executorRunTime = scanTasks(TaskIndexNames.EXEC_RUN_TIME) { t => t.executorRunTime },
      executorCpuTime = scanTasks(TaskIndexNames.EXEC_CPU_TIME) { t => t.executorCpuTime },
      resultSize = scanTasks(TaskIndexNames.RESULT_SIZE) { t => t.resultSize },
      jvmGcTime = scanTasks(TaskIndexNames.GC_TIME) { t => t.jvmGcTime },
      resultSerializationTime = scanTasks(TaskIndexNames.SER_TIME) { t =>
        t.resultSerializationTime
      },
      gettingResultTime = scanTasks(TaskIndexNames.GETTING_RESULT_TIME) { t =>
        t.gettingResultTime
      },
      schedulerDelay = scanTasks(TaskIndexNames.SCHEDULER_DELAY) { t => t.schedulerDelay },
      peakExecutionMemory = scanTasks(TaskIndexNames.PEAK_MEM) { t => t.peakExecutionMemory },
      memoryBytesSpilled = scanTasks(TaskIndexNames.MEM_SPILL) { t => t.memoryBytesSpilled },
      diskBytesSpilled = scanTasks(TaskIndexNames.DISK_SPILL) { t => t.diskBytesSpilled },
      inputMetrics = new v1.InputMetricDistributions(
        scanTasks(TaskIndexNames.INPUT_SIZE) { t => t.inputBytesRead },
        scanTasks(TaskIndexNames.INPUT_RECORDS) { t => t.inputRecordsRead }),
      outputMetrics = new v1.OutputMetricDistributions(
        scanTasks(TaskIndexNames.OUTPUT_SIZE) { t => t.outputBytesWritten },
        scanTasks(TaskIndexNames.OUTPUT_RECORDS) { t => t.outputRecordsWritten }),
      shuffleReadMetrics = new v1.ShuffleReadMetricDistributions(
        scanTasks(TaskIndexNames.SHUFFLE_TOTAL_READS) { m =>
          m.shuffleLocalBytesRead + m.shuffleRemoteBytesRead
        },
        scanTasks(TaskIndexNames.SHUFFLE_READ_RECORDS) { t => t.shuffleRecordsRead },
        scanTasks(TaskIndexNames.SHUFFLE_REMOTE_BLOCKS) { t => t.shuffleRemoteBlocksFetched },
        scanTasks(TaskIndexNames.SHUFFLE_LOCAL_BLOCKS) { t => t.shuffleLocalBlocksFetched },
        scanTasks(TaskIndexNames.SHUFFLE_READ_FETCH_WAIT_TIME) { t => t.shuffleFetchWaitTime },
        scanTasks(TaskIndexNames.SHUFFLE_REMOTE_READS) { t => t.shuffleRemoteBytesRead },
        scanTasks(TaskIndexNames.SHUFFLE_REMOTE_READS_TO_DISK) { t =>
          t.shuffleRemoteBytesReadToDisk
        },
        scanTasks(TaskIndexNames.SHUFFLE_TOTAL_BLOCKS) { m =>
          m.shuffleLocalBlocksFetched + m.shuffleRemoteBlocksFetched
        },
        scanTasks(TaskIndexNames.SHUFFLE_REMOTE_REQS_DURATION) { t =>
          t.shuffleRemoteReqsDuration
        },
        new v1.ShufflePushReadMetricDistributions(
          scanTasks(TaskIndexNames.SHUFFLE_PUSH_CORRUPT_MERGED_BLOCK_CHUNKS) { t =>
            t.shuffleCorruptMergedBlockChunks
          },
          scanTasks(TaskIndexNames.SHUFFLE_PUSH_MERGED_FETCH_FALLBACK_COUNT) { t =>
            t.shuffleMergedFetchFallbackCount
          },
          scanTasks(TaskIndexNames.SHUFFLE_PUSH_MERGED_REMOTE_BLOCKS) { t =>
            t.shuffleMergedRemoteBlocksFetched
          },
          scanTasks(TaskIndexNames.SHUFFLE_PUSH_MERGED_LOCAL_BLOCKS) { t =>
            t.shuffleMergedLocalBlocksFetched
          },
          scanTasks(TaskIndexNames.SHUFFLE_PUSH_MERGED_REMOTE_CHUNKS) { t =>
            t.shuffleMergedRemoteChunksFetched
          },
          scanTasks(TaskIndexNames.SHUFFLE_PUSH_MERGED_LOCAL_CHUNKS) { t =>
            t.shuffleMergedLocalChunksFetched
          },
          scanTasks(TaskIndexNames.SHUFFLE_PUSH_MERGED_REMOTE_READS) { t =>
            t.shuffleMergedRemoteBytesRead
          },
          scanTasks(TaskIndexNames.SHUFFLE_PUSH_MERGED_LOCAL_READS) { t =>
            t.shuffleMergedLocalBytesRead
          },
          scanTasks(TaskIndexNames.SHUFFLE_PUSH_MERGED_REMOTE_REQS_DURATION) { t =>
            t.shuffleMergedRemoteReqsDuration
          })),
      shuffleWriteMetrics = new v1.ShuffleWriteMetricDistributions(
        scanTasks(TaskIndexNames.SHUFFLE_WRITE_SIZE) { t => t.shuffleBytesWritten },
        scanTasks(TaskIndexNames.SHUFFLE_WRITE_RECORDS) { t => t.shuffleRecordsWritten },
        scanTasks(TaskIndexNames.SHUFFLE_WRITE_TIME) { t => t.shuffleWriteTime }))

    // Go through the computed quantiles and cache the values that match the caching criteria.
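    // Only quantiles accepted by shouldCacheQuantile() are persisted, and each CachedQuantile
    // records the task count it was computed from, letting the read path above discard
    // entries that have gone stale.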
    computedQuantiles.quantiles.zipWithIndex
      .filter { case (q, _) => quantiles.contains(q) && shouldCacheQuantile(q) }
      .foreach { case (q, idx) =>
        val cached = new CachedQuantile(stageId, stageAttemptId, quantileToString(q), count,
          duration = computedQuantiles.duration(idx),
          executorDeserializeTime = computedQuantiles.executorDeserializeTime(idx),
          executorDeserializeCpuTime = computedQuantiles.executorDeserializeCpuTime(idx),
          executorRunTime = computedQuantiles.executorRunTime(idx),
          executorCpuTime = computedQuantiles.executorCpuTime(idx),
          resultSize = computedQuantiles.resultSize(idx),
          jvmGcTime = computedQuantiles.jvmGcTime(idx),
          resultSerializationTime = computedQuantiles.resultSerializationTime(idx),
          gettingResultTime = computedQuantiles.gettingResultTime(idx),
          schedulerDelay = computedQuantiles.schedulerDelay(idx),
          peakExecutionMemory = computedQuantiles.peakExecutionMemory(idx),
          memoryBytesSpilled = computedQuantiles.memoryBytesSpilled(idx),
          diskBytesSpilled = computedQuantiles.diskBytesSpilled(idx),
          bytesRead = computedQuantiles.inputMetrics.bytesRead(idx),
          recordsRead = computedQuantiles.inputMetrics.recordsRead(idx),
          bytesWritten = computedQuantiles.outputMetrics.bytesWritten(idx),
          recordsWritten = computedQuantiles.outputMetrics.recordsWritten(idx),
          shuffleReadBytes = computedQuantiles.shuffleReadMetrics.readBytes(idx),
          shuffleRecordsRead = computedQuantiles.shuffleReadMetrics.readRecords(idx),
          shuffleRemoteBlocksFetched =
            computedQuantiles.shuffleReadMetrics.remoteBlocksFetched(idx),
          shuffleLocalBlocksFetched = computedQuantiles.shuffleReadMetrics.localBlocksFetched(idx),
          shuffleFetchWaitTime = computedQuantiles.shuffleReadMetrics.fetchWaitTime(idx),
          shuffleRemoteBytesRead = computedQuantiles.shuffleReadMetrics.remoteBytesRead(idx),
          shuffleRemoteBytesReadToDisk =
            computedQuantiles.shuffleReadMetrics.remoteBytesReadToDisk(idx),
          shuffleTotalBlocksFetched = computedQuantiles.shuffleReadMetrics.totalBlocksFetched(idx),
          shuffleCorruptMergedBlockChunks =
            computedQuantiles.shuffleReadMetrics.shufflePushReadMetricsDist
              .corruptMergedBlockChunks(idx),
          shuffleMergedFetchFallbackCount =
            computedQuantiles.shuffleReadMetrics.shufflePushReadMetricsDist
              .mergedFetchFallbackCount(idx),
          shuffleMergedRemoteBlocksFetched =
            computedQuantiles.shuffleReadMetrics.shufflePushReadMetricsDist
              .remoteMergedBlocksFetched(idx),
          shuffleMergedLocalBlocksFetched =
            computedQuantiles.shuffleReadMetrics.shufflePushReadMetricsDist
              .localMergedBlocksFetched(idx),
          shuffleMergedRemoteChunksFetched =
            computedQuantiles.shuffleReadMetrics.shufflePushReadMetricsDist
              .remoteMergedChunksFetched(idx),
          shuffleMergedLocalChunksFetched =
            computedQuantiles.shuffleReadMetrics.shufflePushReadMetricsDist
              .localMergedChunksFetched(idx),
          shuffleMergedRemoteBytesRead =
            computedQuantiles.shuffleReadMetrics.shufflePushReadMetricsDist
              .remoteMergedBytesRead(idx),
          shuffleMergedLocalBytesRead =
            computedQuantiles.shuffleReadMetrics.shufflePushReadMetricsDist
              .localMergedBytesRead(idx),
          shuffleRemoteReqsDuration =
            computedQuantiles.shuffleReadMetrics.remoteReqsDuration(idx),
          shuffleMergedRemoteReqsDuration =
            computedQuantiles.shuffleReadMetrics.shufflePushReadMetricsDist
              .remoteMergedReqsDuration(idx),
          shuffleWriteBytes = computedQuantiles.shuffleWriteMetrics.writeBytes(idx),
          shuffleWriteRecords = computedQuantiles.shuffleWriteMetrics.writeRecords(idx),
          shuffleWriteTime = computedQuantiles.shuffleWriteMetrics.writeTime(idx))
        store.write(cached)
      }

    Some(computedQuantiles)
  }