query/stats.go (180 lines of code) (raw):
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package query
import (
"encoding/json"
"fmt"
"github.com/uber/aresdb/cgoutils"
"math"
"sort"
"time"
"unsafe"
"github.com/uber/aresdb/utils"
)
// stageName represents each query stage.
type stageName string
const (
prepareForeignTableTiming stageName = "prepareForeignTable"
transferTiming = "transfer"
prepareForFilteringTiming = "prepareForFiltering"
initIndexVectorTiming = "initIndexVector"
filterEvalTiming = "filterEval"
prepareForeignRecordIDsTiming = "prepareForeignRecordIDs"
foreignTableFilterEvalTiming = "foreignTableFilterEval"
geoIntersectEvalTiming = "geoIntersectEval"
prepareForDimAndMeasureTiming = "prepareForDimAndMeasure"
dimEvalTiming = "dimEval"
measureEvalTiming = "measureEval"
hllEvalTiming = "hllEval"
sortEvalTiming = "sortEval"
reduceEvalTiming = "reduceEval"
hashReduceEvalTiming = "hashReduceEval"
expandEvalTiming = "expandEval"
cleanupTiming = "cleanUpEval"
resultTransferTiming = "resultTransfer"
resultFlushTiming = "resultFlush"
finalCleanupTiming = "finalCleanUp"
)
// oopkBatchStats stores stats for a single batch execution.
type oopkBatchStats struct {
// Store timings for each stage of a single batch.
timings map[stageName]float64
// totalTiming for this batch.
totalTiming float64
batchID int32
batchSize int
bytesTransferred int
numTransferCalls int
}
// oopkStageSummaryStats stores running info for each stage.
type oopkStageSummaryStats struct {
name stageName
max float64
min float64
avg float64
count int
total float64
percentage float64
}
// MarshalJSON marshals the message to JSON in a custom way.
func (s *oopkStageSummaryStats) MarshalJSON() ([]byte, error) {
return json.Marshal(s.total)
}
type stageSummaryStatsSlice []*oopkStageSummaryStats
// Len implements sort.Sort interface for stageSummaryStatsSlice.
func (s stageSummaryStatsSlice) Len() int {
return len(s)
}
// Swap implements sort.Sort interface for stageSummaryStatsSlice.
func (s stageSummaryStatsSlice) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
// Less implements sort.Sort interface for stageSummaryStatsSlice.
func (s stageSummaryStatsSlice) Less(i, j int) bool {
return s[i].total < s[j].total
}
// oopkQueryStats stores the overall stats for a query.
type oopkQueryStats struct {
// stats for each stage. Sorted by total time.
stageStats []*oopkStageSummaryStats
// mapping from stage name to stage stats.
Name2Stage map[stageName]*oopkStageSummaryStats `json:"stages"`
// Total timing for all query stages **including transfer**.
TotalTiming float64 `json:"latency"`
// Total number of batches.
NumBatches int `json:"batches"`
// Total number of records processed on GPU.
// A record could represent multiple data record if firstColumn is compressed.
NumRecords int `json:"records"`
// For archive batch, we skip process empty batch. For live batch, we will skip it
// if its min or max value does not pass main table filters or time filters.
NumBatchSkipped int `json:"numBatchSkipped"`
// Stats for input data transferred via PCIe.
BytesTransferred int `json:"tranBytes"`
NumTransferCalls int `json:"tranCalls"`
}
// NumRows implements the utils.TableDataSource for stats.
func (stats oopkQueryStats) NumRows() int {
return len(stats.stageStats)
}
// GetValue implements the utils.TableDataSource for stats. **Notes** row boundary
// are not checked!
func (stats oopkQueryStats) GetValue(row, col int) interface{} {
rowValue := stats.stageStats[row]
switch col {
case 0:
return rowValue.name
case 1:
return rowValue.avg
case 2:
return rowValue.max
case 3:
return rowValue.min
case 4:
return rowValue.count
case 5:
return rowValue.total
case 6:
return fmt.Sprintf("%.2f%%", rowValue.percentage*100)
}
return nil
}
// ColumnHeaders implements the utils.TableDataSource for stats.
func (stats oopkQueryStats) ColumnHeaders() []string {
return []string{"stage", "avg", "max", "minCallName", "count", "total", "percentage"}
}
// reportTimingForCurrentBatch will first wait for current cuda stream if the debug mode is set and change the timing stat accordingly.
// It will add to the total timing as well. Therefore this function should only be called one time for each stage.
func (qc *AQLQueryContext) reportTimingForCurrentBatch(stream unsafe.Pointer, start *time.Time, name stageName) {
if qc.Debug {
cgoutils.WaitForCudaStream(stream, qc.Device)
now := utils.Now()
value := now.Sub(*start).Seconds() * 1000
qc.OOPK.currentBatch.stats.timings[name] = value
qc.OOPK.currentBatch.stats.totalTiming += value
*start = now
}
}
// reportTiming is similar to reportTimingForCurrentBatch except that it modifies the query stats for the
// whole query. It's usually should be called once for each stage
func (qc *AQLQueryContext) reportTiming(stream unsafe.Pointer, start *time.Time, name stageName) {
if qc.Debug {
if stream != nil {
cgoutils.WaitForCudaStream(stream, qc.Device)
}
now := utils.Now()
value := now.Sub(*start).Seconds() * 1000
queryStats := &qc.OOPK.LiveBatchStats
queryStats.applyStageStats(name, value)
*start = now
}
}
// applyStageStats applies the stage stats to the overall query stats and compute max,minCallName and total for that
// stage.
func (stats *oopkQueryStats) applyStageStats(name stageName, value float64) {
if _, ok := stats.Name2Stage[name]; !ok {
stats.Name2Stage[name] = &oopkStageSummaryStats{name: name, max: -1, min: math.MaxFloat64}
}
stageStats := stats.Name2Stage[name]
stageStats.max = math.Max(stageStats.max, value)
stageStats.min = math.Min(stageStats.max, value)
stageStats.count++
stageStats.total += value
stats.TotalTiming += value
}
// applyBatchStats applies the current batch stats onto the overall query stats. It computes information
// like max, minCallName, average for each stage as well as the percentage.
func (stats *oopkQueryStats) applyBatchStats(batchStats oopkBatchStats) {
for name, value := range batchStats.timings {
stats.applyStageStats(name, value)
}
stats.NumBatches++
stats.NumRecords += batchStats.batchSize
stats.BytesTransferred += batchStats.bytesTransferred
stats.NumTransferCalls += batchStats.numTransferCalls
}
// writeToLog writes the summary stats for this query in a tabular format to logger.
func (stats *oopkQueryStats) writeToLog() {
if stats.NumBatches+stats.NumBatchSkipped > 0 {
// Compute average and percentage.
stats.stageStats = make([]*oopkStageSummaryStats, 0, len(stats.Name2Stage))
for _, oopkStageStats := range stats.Name2Stage {
oopkStageStats.avg = oopkStageStats.total / float64(stats.NumBatches)
oopkStageStats.percentage = oopkStageStats.total / stats.TotalTiming
stats.stageStats = append(stats.stageStats, oopkStageStats)
}
sort.Sort(sort.Reverse(stageSummaryStatsSlice(stats.stageStats)))
utils.GetQueryLogger().Infof("Total timing: %f", stats.TotalTiming)
utils.GetQueryLogger().Infof("Num batches: %d", stats.NumBatches)
utils.GetQueryLogger().Infof("Num batches skipped: %d", stats.NumBatchSkipped)
// Create tabular output.
summary := utils.WriteTable(stats)
utils.GetQueryLogger().Info("\n" + summary)
}
}
// reportBatch will report OOPK batch related stats to the query logger.
func (qc *AQLQueryContext) reportBatch(isArchiveBatch bool) {
if qc.Debug {
batchType := "live batch"
if isArchiveBatch {
batchType = "archive batch"
}
stats := qc.OOPK.currentBatch.stats
utils.GetQueryLogger().
With(
"timings", stats.timings,
"total", stats.totalTiming,
"batchID", stats.batchID,
"batchSize", stats.batchSize,
"batchType", batchType,
).Infof("Query stats")
if isArchiveBatch {
qc.OOPK.ArchiveBatchStats.applyBatchStats(stats)
} else {
qc.OOPK.LiveBatchStats.applyBatchStats(stats)
}
}
}