query/aql_batchexecutor.go
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package query

import (
"fmt"
"github.com/uber/aresdb/cgoutils"
queryCom "github.com/uber/aresdb/query/common"
"time"
"unsafe"
)

// BatchExecutor is the batch executor interface shared by non-aggregation and aggregation queries
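// Implementations are expected to be driven once per batch, roughly in the order
// preExec, filter, join, project, reduce and postExec; the driving code lives outside this file.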
type BatchExecutor interface {
// filter operation
filter()
// join operation
join()
// projection of measure/select columns
project()
// reduce to sort and aggregate result
reduce()
// prepare work before execution
preExec(lastBatch bool, start time.Time)
// cleanup work after execution
postExec(start time.Time)
}

// DummyBatchExecutorImpl is a dummy executor that does nothing
type DummyBatchExecutorImpl struct {
}

// NewDummyBatchExecutor creates a dummy BatchExecutor
func NewDummyBatchExecutor() BatchExecutor {
return &DummyBatchExecutorImpl{}
}

func (e *DummyBatchExecutorImpl) filter() {
}

func (e *DummyBatchExecutorImpl) join() {
}

func (e *DummyBatchExecutorImpl) project() {
}

func (e *DummyBatchExecutorImpl) reduce() {
}

func (e *DummyBatchExecutorImpl) preExec(lastBatch bool, start time.Time) {
}

func (e *DummyBatchExecutorImpl) postExec(start time.Time) {
}

// BatchExecutorImpl is the batch executor implementation for aggregation queries
type BatchExecutorImpl struct {
qc *AQLQueryContext
batchID int32
isLastBatch bool
customFilterFunc customFilterExecutor
stream unsafe.Pointer
start time.Time
sizeBeforeGeoFilter int
}

// NewBatchExecutor creates a BatchExecutor. For non-aggregation queries it returns a
// NonAggrBatchExecutorImpl embedding BatchExecutorImpl; otherwise it returns a plain BatchExecutorImpl.
func NewBatchExecutor(qc *AQLQueryContext, batchID int32, customFilterFunc customFilterExecutor, stream unsafe.Pointer, start time.Time) BatchExecutor {
if qc.IsNonAggregationQuery {
return &NonAggrBatchExecutorImpl{
BatchExecutorImpl: &BatchExecutorImpl{
qc: qc,
batchID: batchID,
customFilterFunc: customFilterFunc,
stream: stream,
start: start,
},
}
}
return &BatchExecutorImpl{
qc: qc,
batchID: batchID,
customFilterFunc: customFilterFunc,
stream: stream,
start: start,
}
}

// filter evaluates the main table common filters and the custom filter on the current batch
func (e *BatchExecutorImpl) filter() {
// process main table common filter
e.qc.doProfile(func() {
for _, filter := range e.qc.OOPK.MainTableCommonFilters {
e.qc.OOPK.currentBatch.processExpression(filter, nil,
e.qc.TableScanners, e.qc.OOPK.foreignTables, e.stream, e.qc.Device, e.qc.OOPK.currentBatch.filterAction)
}
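// apply the caller-supplied custom filter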
e.customFilterFunc(e.stream)
e.qc.reportTimingForCurrentBatch(e.stream, &e.start, filterEvalTiming)
}, "filters", e.stream)
}

// join performs hash lookups against foreign tables, evaluates filters that reference foreign
// table columns, and runs geo intersection when configured
func (e *BatchExecutorImpl) join() {
e.qc.doProfile(func() {
// join foreign tables
for joinTableID, foreignTable := range e.qc.OOPK.foreignTables {
if foreignTable != nil {
// prepare foreign table recordIDs
// Note:
// RecordID {
// int32_t batchID
// uint32_t index
// }
// takes up 8 bytes
e.qc.OOPK.currentBatch.foreignTableRecordIDsD = append(e.qc.OOPK.currentBatch.foreignTableRecordIDsD, deviceAllocate(8*e.qc.OOPK.currentBatch.size, e.qc.Device))
mainTableJoinColumnIndex := e.qc.TableScanners[0].ColumnsByIDs[foreignTable.remoteJoinColumn.ColumnID]
// perform hash lookup
e.qc.OOPK.currentBatch.prepareForeignRecordIDs(mainTableJoinColumnIndex, joinTableID, *foreignTable, e.stream, e.qc.Device)
}
}
e.qc.reportTimingForCurrentBatch(e.stream, &e.start, prepareForeignRecordIDsTiming)
}, "joins", e.stream)
e.qc.doProfile(func() {
// process filters that involves foreign table columns if any
for _, filter := range e.qc.OOPK.ForeignTableCommonFilters {
e.qc.OOPK.currentBatch.processExpression(filter, nil,
e.qc.TableScanners, e.qc.OOPK.foreignTables, e.stream, e.qc.Device, e.qc.OOPK.currentBatch.filterAction)
}
e.qc.reportTimingForCurrentBatch(e.stream, &e.start, foreignTableFilterEvalTiming)
}, "filters", e.stream)
if e.qc.OOPK.geoIntersection != nil {
// allocate a predicate vector for geo intersection: one bit per shape per row, packed into 32-bit words
numWords := (e.qc.OOPK.geoIntersection.numShapes + 31) / 32
e.qc.OOPK.currentBatch.geoPredicateVectorD = deviceAllocate(e.qc.OOPK.currentBatch.size*4*numWords, e.qc.Device)
}
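// remember the row count before geo filtering; evalDimensions needs it when writing the geo shape dimension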
e.sizeBeforeGeoFilter = e.qc.OOPK.currentBatch.size
e.qc.doProfile(func() {
if e.qc.OOPK.geoIntersection != nil {
pointColumnIndex := e.qc.TableScanners[e.qc.OOPK.geoIntersection.pointTableID].
ColumnsByIDs[e.qc.OOPK.geoIntersection.pointColumnID]
e.qc.OOPK.currentBatch.geoIntersect(
e.qc.OOPK.geoIntersection,
pointColumnIndex,
e.qc.OOPK.foreignTables,
e.qc.OOPK.currentBatch.geoPredicateVectorD,
e.stream, e.qc.Device)
}
e.qc.reportTimingForCurrentBatch(e.stream, &e.start, geoIntersectEvalTiming)
}, "geo_intersect", e.stream)
}

// evalMeasures evaluates the measure expression and writes the values into the measure vector
func (e *BatchExecutorImpl) evalMeasures() {
// measure evaluation.
e.qc.doProfile(func() {
measureExprRootAction := e.qc.OOPK.currentBatch.makeWriteToMeasureVectorAction(e.qc.OOPK.AggregateType, e.qc.OOPK.MeasureBytes)
e.qc.OOPK.currentBatch.processExpression(e.qc.OOPK.Measure, nil, e.qc.TableScanners, e.qc.OOPK.foreignTables, e.stream, e.qc.Device, measureExprRootAction)
e.qc.reportTimingForCurrentBatch(e.stream, &e.start, measureEvalTiming)
}, "measure", e.stream)
}

// evalDimensions evaluates the dimension expressions and writes the values into the dimension vectors
func (e *BatchExecutorImpl) evalDimensions(prevResultSize int) {
// dimension expression evaluation.
for dimIndex, dimension := range e.qc.OOPK.Dimensions {
e.qc.doProfile(func() {
dimVectorIndex := e.qc.OOPK.DimensionVectorIndex[dimIndex]
dimValueOffset, dimNullOffset := queryCom.GetDimensionStartOffsets(e.qc.OOPK.NumDimsPerDimWidth, dimVectorIndex, e.qc.OOPK.currentBatch.resultCapacity)
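// the geo shape dimension is written from the geo predicate vector rather than evaluated as an expression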
if e.qc.OOPK.geoIntersection != nil && e.qc.OOPK.geoIntersection.dimIndex == dimIndex {
e.qc.OOPK.currentBatch.writeGeoShapeDim(
e.qc.OOPK.geoIntersection, e.qc.OOPK.currentBatch.geoPredicateVectorD,
dimValueOffset, dimNullOffset, e.sizeBeforeGeoFilter, prevResultSize, e.stream, e.qc.Device)
} else {
dimensionExprRootAction := e.qc.OOPK.currentBatch.makeWriteToDimensionVectorAction(dimValueOffset, dimNullOffset, prevResultSize)
e.qc.OOPK.currentBatch.processExpression(dimension, nil,
e.qc.TableScanners, e.qc.OOPK.foreignTables, e.stream, e.qc.Device, dimensionExprRootAction)
}
}, fmt.Sprintf("dim%d", dimIndex), e.stream)
}
e.qc.reportTimingForCurrentBatch(e.stream, &e.start, dimEvalTiming)
}

// project generates dimension and measure values for the rows that passed filtering
func (e *BatchExecutorImpl) project() {
// Prepare for dimension and measure evaluation.
e.qc.OOPK.currentBatch.prepareForDimAndMeasureEval(
e.qc.OOPK.DimRowBytes, e.qc.OOPK.MeasureBytes, e.qc.OOPK.NumDimsPerDimWidth, e.qc.OOPK.IsHLL(),
e.qc.OOPK.UseHashReduction(), e.stream)
e.qc.reportTimingForCurrentBatch(e.stream, &e.start, prepareForDimAndMeasureTiming)
e.evalDimensions(e.qc.OOPK.currentBatch.resultSize)
e.evalMeasures()
// wait for the stream so that buffers no longer needed can be cleaned up before the final aggregation
cgoutils.WaitForCudaStream(e.stream, e.qc.Device)
e.qc.OOPK.currentBatch.cleanupBeforeAggregation()
}

// reduce aggregates measures by dimensions using the configured aggregation function
func (e *BatchExecutorImpl) reduce() {
// init dimIndexVectorD for sorting and reducing
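// HLL keeps two index vectors, one over the existing results and one over the new batch rows;
// hash reduction skips index vector initialization since it aggregates through a hash table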
if e.qc.OOPK.IsHLL() {
initIndexVector(e.qc.OOPK.currentBatch.dimIndexVectorD[0].getPointer(), 0, e.qc.OOPK.currentBatch.resultSize, e.stream, e.qc.Device)
initIndexVector(e.qc.OOPK.currentBatch.dimIndexVectorD[1].getPointer(), e.qc.OOPK.currentBatch.resultSize, e.qc.OOPK.currentBatch.resultSize+e.qc.OOPK.currentBatch.size, e.stream, e.qc.Device)
} else if !e.qc.OOPK.UseHashReduction() {
initIndexVector(e.qc.OOPK.currentBatch.dimIndexVectorD[0].getPointer(), 0, e.qc.OOPK.currentBatch.resultSize+e.qc.OOPK.currentBatch.size, e.stream, e.qc.Device)
}
if e.qc.OOPK.IsHLL() {
e.qc.doProfile(func() {
e.qc.OOPK.hllVectorD, e.qc.OOPK.hllDimRegIDCountD, e.qc.OOPK.hllVectorSize =
e.qc.OOPK.currentBatch.hll(e.qc.OOPK.NumDimsPerDimWidth, e.isLastBatch, e.stream, e.qc.Device)
e.qc.reportTimingForCurrentBatch(e.stream, &e.start, hllEvalTiming)
}, "hll", e.stream)
} else if e.qc.OOPK.UseHashReduction() {
e.qc.doProfile(func() {
e.qc.OOPK.currentBatch.hashReduce(
e.qc.OOPK.NumDimsPerDimWidth, e.qc.OOPK.MeasureBytes, e.qc.OOPK.AggregateType, e.stream, e.qc.Device)
e.qc.reportTimingForCurrentBatch(e.stream, &e.start, hashReduceEvalTiming)
}, "hash_reduce", e.stream)
} else {
// sort by key.
e.qc.doProfile(func() {
e.qc.OOPK.currentBatch.sortByKey(e.qc.OOPK.NumDimsPerDimWidth, e.stream, e.qc.Device)
e.qc.reportTimingForCurrentBatch(e.stream, &e.start, sortEvalTiming)
}, "sort", e.stream)
// reduce by key.
e.qc.doProfile(func() {
e.qc.OOPK.currentBatch.reduceByKey(e.qc.OOPK.NumDimsPerDimWidth, e.qc.OOPK.MeasureBytes, e.qc.OOPK.AggregateType, e.stream, e.qc.Device)
e.qc.reportTimingForCurrentBatch(e.stream, &e.start, reduceEvalTiming)
}, "reduce", e.stream)
}
cgoutils.WaitForCudaStream(e.stream, e.qc.Device)
}
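
// preExec initializes the index vector for the current batch before the other phases run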
func (e *BatchExecutorImpl) preExec(isLastBatch bool, start time.Time) {
e.isLastBatch = isLastBatch
// initialize index vector.
if !e.qc.OOPK.currentBatch.indexVectorD.isNull() {
initIndexVector(e.qc.OOPK.currentBatch.indexVectorD.getPointer(), 0, e.qc.OOPK.currentBatch.size, e.stream, e.qc.Device)
}
e.qc.reportTimingForCurrentBatch(e.stream, &start, initIndexVectorTiming)
}
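
// postExec swaps the result buffer for the next batch and reports per-batch timings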
func (e *BatchExecutorImpl) postExec(start time.Time) {
// swap result buffer before next batch
e.qc.OOPK.currentBatch.swapResultBufferForNextBatch()
e.qc.reportTimingForCurrentBatch(e.stream, &start, cleanupTiming)
e.qc.reportBatch(e.batchID > 0)
// Only profile one batch.
e.qc.Profiling = ""
}