def taskSummary()

in core/src/main/scala/org/apache/spark/status/AppStatusStore.scala [255:533]


  def taskSummary(
      stageId: Int,
      stageAttemptId: Int,
      unsortedQuantiles: Array[Double]): Option[v1.TaskMetricDistributions] = {
    val stageKey = Array(stageId, stageAttemptId)
    val quantiles = unsortedQuantiles.sorted.toImmutableArraySeq

    // We don't know how many tasks remain in the store that actually have metrics. So scan one
    // metric and count how many valid tasks there are. Use skip() instead of next() since it's
    // cheaper for disk stores (avoids deserialization).
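    // (Tasks that have no metrics are stored with negative values in the metric indices, so
    // starting the iteration at first(0L) naturally excludes them from the count.)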
    val count = {
      Utils.tryWithResource(
        store.view(classOf[TaskDataWrapper])
          .parent(stageKey)
          .index(TaskIndexNames.EXEC_RUN_TIME)
          .first(0L)
          .closeableIterator()
      ) { it =>
        var _count = 0L
        while (it.hasNext()) {
          _count += 1
          it.skip(1)
        }
        _count
      }
    }

    if (count <= 0) {
      return None
    }

    // Find out which quantiles are already cached. A cached entry is only used if its task
    // count matches the current count; otherwise the quantile is recomputed below and the
    // stale cached entry is overwritten.
    val cachedQuantiles = quantiles.filter(shouldCacheQuantile).flatMap { q =>
      val qkey = Array(stageId, stageAttemptId, quantileToString(q))
      asOption(store.read(classOf[CachedQuantile], qkey)).filter(_.taskCount == count)
    }
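    // (shouldCacheQuantile and quantileToString are private helpers defined elsewhere in
    // this class; they are not shown in this excerpt.)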

    // If there are no missing quantiles, return the data. Otherwise, just compute everything
    // to make the code simpler.
    if (cachedQuantiles.size == quantiles.size) {
      def toValues(fn: CachedQuantile => Double): IndexedSeq[Double] = cachedQuantiles.map(fn)

      val distributions = new v1.TaskMetricDistributions(
        quantiles = quantiles,
        duration = toValues(_.duration),
        executorDeserializeTime = toValues(_.executorDeserializeTime),
        executorDeserializeCpuTime = toValues(_.executorDeserializeCpuTime),
        executorRunTime = toValues(_.executorRunTime),
        executorCpuTime = toValues(_.executorCpuTime),
        resultSize = toValues(_.resultSize),
        jvmGcTime = toValues(_.jvmGcTime),
        resultSerializationTime = toValues(_.resultSerializationTime),
        gettingResultTime = toValues(_.gettingResultTime),
        schedulerDelay = toValues(_.schedulerDelay),
        peakExecutionMemory = toValues(_.peakExecutionMemory),
        memoryBytesSpilled = toValues(_.memoryBytesSpilled),
        diskBytesSpilled = toValues(_.diskBytesSpilled),
        inputMetrics = new v1.InputMetricDistributions(
          toValues(_.bytesRead),
          toValues(_.recordsRead)),
        outputMetrics = new v1.OutputMetricDistributions(
          toValues(_.bytesWritten),
          toValues(_.recordsWritten)),
        shuffleReadMetrics = new v1.ShuffleReadMetricDistributions(
          toValues(_.shuffleReadBytes),
          toValues(_.shuffleRecordsRead),
          toValues(_.shuffleRemoteBlocksFetched),
          toValues(_.shuffleLocalBlocksFetched),
          toValues(_.shuffleFetchWaitTime),
          toValues(_.shuffleRemoteBytesRead),
          toValues(_.shuffleRemoteBytesReadToDisk),
          toValues(_.shuffleTotalBlocksFetched),
          toValues(_.shuffleRemoteReqsDuration),
          new v1.ShufflePushReadMetricDistributions(
            toValues(_.shuffleCorruptMergedBlockChunks),
            toValues(_.shuffleMergedFetchFallbackCount),
            toValues(_.shuffleMergedRemoteBlocksFetched),
            toValues(_.shuffleMergedLocalBlocksFetched),
            toValues(_.shuffleMergedRemoteChunksFetched),
            toValues(_.shuffleMergedLocalChunksFetched),
            toValues(_.shuffleMergedRemoteBytesRead),
            toValues(_.shuffleMergedLocalBytesRead),
            toValues(_.shuffleMergedRemoteReqsDuration))),
        shuffleWriteMetrics = new v1.ShuffleWriteMetricDistributions(
          toValues(_.shuffleWriteBytes),
          toValues(_.shuffleWriteRecords),
          toValues(_.shuffleWriteTime)))

      return Some(distributions)
    }

    // Compute quantiles by scanning the tasks in the store. This is not really stable for live
    // stages (e.g. the number of recorded tasks may change while this code is running), but should
    // stabilize once the stage finishes. It's also slow, especially with disk stores.
    val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) }
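    // For example, with count = 200 and quantiles Seq(0.25, 0.5, 1.0), indices is
    // Seq(50, 100, 199): (1.0 * 200).toLong = 200 gets clamped to count - 1 = 199.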

    def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = {
      Utils.tryWithResource(
        store.view(classOf[TaskDataWrapper])
          .parent(stageKey)
          .index(index)
          .first(0L)
          .closeableIterator()
      ) { it =>
        var last = Double.NaN
        var currentIdx = -1L
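        // Invariant: `indices` is non-decreasing (the quantiles were sorted above) and the
        // iterator has already consumed entries 0..currentIdx, so skip(diff - 1) followed by
        // next() consumes exactly `diff` entries and lands on entry `idx`, deserializing
        // only the entry that is actually read.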
        indices.map { idx =>
          if (idx == currentIdx) {
            last
          } else {
            val diff = idx - currentIdx
            currentIdx = idx
            if (it.skip(diff - 1)) {
              last = fn(it.next()).toDouble
              last
            } else {
              Double.NaN
            }
          }
        }
      }
    }

    val computedQuantiles = new v1.TaskMetricDistributions(
      quantiles = quantiles,
      duration = scanTasks(TaskIndexNames.DURATION) { t =>
        t.duration
      },
      executorDeserializeTime = scanTasks(TaskIndexNames.DESER_TIME) { t =>
        t.executorDeserializeTime
      },
      executorDeserializeCpuTime = scanTasks(TaskIndexNames.DESER_CPU_TIME) { t =>
        t.executorDeserializeCpuTime
      },
      executorRunTime = scanTasks(TaskIndexNames.EXEC_RUN_TIME) { t => t.executorRunTime },
      executorCpuTime = scanTasks(TaskIndexNames.EXEC_CPU_TIME) { t => t.executorCpuTime },
      resultSize = scanTasks(TaskIndexNames.RESULT_SIZE) { t => t.resultSize },
      jvmGcTime = scanTasks(TaskIndexNames.GC_TIME) { t => t.jvmGcTime },
      resultSerializationTime = scanTasks(TaskIndexNames.SER_TIME) { t =>
        t.resultSerializationTime
      },
      gettingResultTime = scanTasks(TaskIndexNames.GETTING_RESULT_TIME) { t =>
        t.gettingResultTime
      },
      schedulerDelay = scanTasks(TaskIndexNames.SCHEDULER_DELAY) { t => t.schedulerDelay },
      peakExecutionMemory = scanTasks(TaskIndexNames.PEAK_MEM) { t => t.peakExecutionMemory },
      memoryBytesSpilled = scanTasks(TaskIndexNames.MEM_SPILL) { t => t.memoryBytesSpilled },
      diskBytesSpilled = scanTasks(TaskIndexNames.DISK_SPILL) { t => t.diskBytesSpilled },
      inputMetrics = new v1.InputMetricDistributions(
        scanTasks(TaskIndexNames.INPUT_SIZE) { t => t.inputBytesRead },
        scanTasks(TaskIndexNames.INPUT_RECORDS) { t => t.inputRecordsRead }),
      outputMetrics = new v1.OutputMetricDistributions(
        scanTasks(TaskIndexNames.OUTPUT_SIZE) { t => t.outputBytesWritten },
        scanTasks(TaskIndexNames.OUTPUT_RECORDS) { t => t.outputRecordsWritten }),
      shuffleReadMetrics = new v1.ShuffleReadMetricDistributions(
        scanTasks(TaskIndexNames.SHUFFLE_TOTAL_READS) { t =>
          t.shuffleLocalBytesRead + t.shuffleRemoteBytesRead
        },
        scanTasks(TaskIndexNames.SHUFFLE_READ_RECORDS) { t => t.shuffleRecordsRead },
        scanTasks(TaskIndexNames.SHUFFLE_REMOTE_BLOCKS) { t => t.shuffleRemoteBlocksFetched },
        scanTasks(TaskIndexNames.SHUFFLE_LOCAL_BLOCKS) { t => t.shuffleLocalBlocksFetched },
        scanTasks(TaskIndexNames.SHUFFLE_READ_FETCH_WAIT_TIME) { t => t.shuffleFetchWaitTime },
        scanTasks(TaskIndexNames.SHUFFLE_REMOTE_READS) { t => t.shuffleRemoteBytesRead },
        scanTasks(TaskIndexNames.SHUFFLE_REMOTE_READS_TO_DISK) { t =>
          t.shuffleRemoteBytesReadToDisk
        },
        scanTasks(TaskIndexNames.SHUFFLE_TOTAL_BLOCKS) { t =>
          t.shuffleLocalBlocksFetched + t.shuffleRemoteBlocksFetched
        },
        scanTasks(TaskIndexNames.SHUFFLE_REMOTE_REQS_DURATION) { t =>
          t.shuffleRemoteReqsDuration
        },
        new v1.ShufflePushReadMetricDistributions(
          scanTasks(TaskIndexNames.SHUFFLE_PUSH_CORRUPT_MERGED_BLOCK_CHUNKS) { t =>
            t.shuffleCorruptMergedBlockChunks
          },
          scanTasks(TaskIndexNames.SHUFFLE_PUSH_MERGED_FETCH_FALLBACK_COUNT) { t =>
            t.shuffleMergedFetchFallbackCount
          },
          scanTasks(TaskIndexNames.SHUFFLE_PUSH_MERGED_REMOTE_BLOCKS) { t =>
            t.shuffleMergedRemoteBlocksFetched
          },
          scanTasks(TaskIndexNames.SHUFFLE_PUSH_MERGED_LOCAL_BLOCKS) { t =>
            t.shuffleMergedLocalBlocksFetched
          },
          scanTasks(TaskIndexNames.SHUFFLE_PUSH_MERGED_REMOTE_CHUNKS) { t =>
            t.shuffleMergedRemoteChunksFetched
          },
          scanTasks(TaskIndexNames.SHUFFLE_PUSH_MERGED_LOCAL_CHUNKS) { t =>
            t.shuffleMergedLocalChunksFetched
          },
          scanTasks(TaskIndexNames.SHUFFLE_PUSH_MERGED_REMOTE_READS) { t =>
            t.shuffleMergedRemoteBytesRead
          },
          scanTasks(TaskIndexNames.SHUFFLE_PUSH_MERGED_LOCAL_READS) { t =>
            t.shuffleMergedLocalBytesRead
          },
          scanTasks(TaskIndexNames.SHUFFLE_PUSH_MERGED_REMOTE_REQS_DURATION) { t =>
            t.shuffleMergedRemoteReqDuration
          })),
      shuffleWriteMetrics = new v1.ShuffleWriteMetricDistributions(
        scanTasks(TaskIndexNames.SHUFFLE_WRITE_SIZE) { t => t.shuffleBytesWritten },
        scanTasks(TaskIndexNames.SHUFFLE_WRITE_RECORDS) { t => t.shuffleRecordsWritten },
        scanTasks(TaskIndexNames.SHUFFLE_WRITE_TIME) { t => t.shuffleWriteTime }))

    // Go through the computed quantiles and cache the values that match the caching criteria.
    computedQuantiles.quantiles.zipWithIndex
      .filter { case (q, _) => quantiles.contains(q) && shouldCacheQuantile(q) }
      .foreach { case (q, idx) =>
        val cached = new CachedQuantile(stageId, stageAttemptId, quantileToString(q), count,
          duration = computedQuantiles.duration(idx),
          executorDeserializeTime = computedQuantiles.executorDeserializeTime(idx),
          executorDeserializeCpuTime = computedQuantiles.executorDeserializeCpuTime(idx),
          executorRunTime = computedQuantiles.executorRunTime(idx),
          executorCpuTime = computedQuantiles.executorCpuTime(idx),
          resultSize = computedQuantiles.resultSize(idx),
          jvmGcTime = computedQuantiles.jvmGcTime(idx),
          resultSerializationTime = computedQuantiles.resultSerializationTime(idx),
          gettingResultTime = computedQuantiles.gettingResultTime(idx),
          schedulerDelay = computedQuantiles.schedulerDelay(idx),
          peakExecutionMemory = computedQuantiles.peakExecutionMemory(idx),
          memoryBytesSpilled = computedQuantiles.memoryBytesSpilled(idx),
          diskBytesSpilled = computedQuantiles.diskBytesSpilled(idx),

          bytesRead = computedQuantiles.inputMetrics.bytesRead(idx),
          recordsRead = computedQuantiles.inputMetrics.recordsRead(idx),

          bytesWritten = computedQuantiles.outputMetrics.bytesWritten(idx),
          recordsWritten = computedQuantiles.outputMetrics.recordsWritten(idx),

          shuffleReadBytes = computedQuantiles.shuffleReadMetrics.readBytes(idx),
          shuffleRecordsRead = computedQuantiles.shuffleReadMetrics.readRecords(idx),
          shuffleRemoteBlocksFetched =
            computedQuantiles.shuffleReadMetrics.remoteBlocksFetched(idx),
          shuffleLocalBlocksFetched = computedQuantiles.shuffleReadMetrics.localBlocksFetched(idx),
          shuffleFetchWaitTime = computedQuantiles.shuffleReadMetrics.fetchWaitTime(idx),
          shuffleRemoteBytesRead = computedQuantiles.shuffleReadMetrics.remoteBytesRead(idx),
          shuffleRemoteBytesReadToDisk =
            computedQuantiles.shuffleReadMetrics.remoteBytesReadToDisk(idx),
          shuffleTotalBlocksFetched = computedQuantiles.shuffleReadMetrics.totalBlocksFetched(idx),
          shuffleCorruptMergedBlockChunks =
            computedQuantiles.shuffleReadMetrics.shufflePushReadMetricsDist
              .corruptMergedBlockChunks(idx),
          shuffleMergedFetchFallbackCount =
            computedQuantiles.shuffleReadMetrics.shufflePushReadMetricsDist
              .mergedFetchFallbackCount(idx),
          shuffleMergedRemoteBlocksFetched =
            computedQuantiles.shuffleReadMetrics.shufflePushReadMetricsDist
              .remoteMergedBlocksFetched(idx),
          shuffleMergedLocalBlocksFetched =
            computedQuantiles.shuffleReadMetrics.shufflePushReadMetricsDist
              .localMergedBlocksFetched(idx),
          shuffleMergedRemoteChunksFetched =
            computedQuantiles.shuffleReadMetrics.shufflePushReadMetricsDist
              .remoteMergedChunksFetched(idx),
          shuffleMergedLocalChunksFetched =
            computedQuantiles.shuffleReadMetrics.shufflePushReadMetricsDist
              .localMergedChunksFetched(idx),
          shuffleMergedRemoteBytesRead =
            computedQuantiles.shuffleReadMetrics.shufflePushReadMetricsDist
              .remoteMergedBytesRead(idx),
          shuffleMergedLocalBytesRead =
            computedQuantiles.shuffleReadMetrics.shufflePushReadMetricsDist
              .localMergedBytesRead(idx),
          shuffleRemoteReqsDuration =
            computedQuantiles.shuffleReadMetrics.remoteReqsDuration(idx),
          shuffleMergedRemoteReqsDuration =
            computedQuantiles.shuffleReadMetrics.shufflePushReadMetricsDist
              .remoteMergedReqsDuration(idx),

          shuffleWriteBytes = computedQuantiles.shuffleWriteMetrics.writeBytes(idx),
          shuffleWriteRecords = computedQuantiles.shuffleWriteMetrics.writeRecords(idx),
          shuffleWriteTime = computedQuantiles.shuffleWriteMetrics.writeTime(idx))
        store.write(cached)
      }

    Some(computedQuantiles)
  }
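
For illustration, a minimal self-contained sketch of the index selection that scanTasks
performs, applied to a plain in-memory sequence instead of a KVStore iterator. The helper
name quantileValues is hypothetical and not part of AppStatusStore; it only mirrors the
math.min((q * count).toLong, count - 1) index math used above, under the assumption that
the input sequence is already sorted and non-empty (taskSummary returns None before this
point when count <= 0).

  // Hypothetical helper, for illustration only: picks the same positions out of a sorted
  // in-memory sequence that taskSummary picks out of a sorted task index.
  // Assumes sortedValues is non-empty and sorted ascending.
  def quantileValues(sortedValues: IndexedSeq[Long], quantiles: Seq[Double]): Seq[Double] = {
    val count = sortedValues.length
    quantiles.sorted.map { q =>
      // Clamp so that q = 1.0 maps to the last element instead of one past the end.
      val idx = math.min((q * count).toLong, count - 1L)
      sortedValues(idx.toInt).toDouble
    }
  }

  // Example: quantileValues(IndexedSeq(10L, 20L, 30L, 40L), Seq(0.0, 0.5, 1.0))
  //   returns Seq(10.0, 30.0, 40.0).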