query/time_series_aggregate.go (667 lines of code) (raw):
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package query
// #cgo LDFLAGS: -L${SRCDIR}/../lib -lalgorithm
// #include "time_series_aggregate.h"
import "C"
import (
"github.com/uber/aresdb/utils"
"strconv"
"unsafe"
"github.com/uber/aresdb/cgoutils"
"github.com/uber/aresdb/memstore"
memCom "github.com/uber/aresdb/memstore/common"
"github.com/uber/aresdb/query/common"
"github.com/uber/aresdb/query/expr"
)
// DataTypeToCDataType maps a memstore data type to the enum_DataType understood by the C
// algorithm library. Enum column types are handed to C as their underlying unsigned storage
// width. Looking up a type that is not listed yields the zero enum value.
var DataTypeToCDataType = map[memCom.DataType]C.enum_DataType{
	// boolean
	memCom.Bool: C.Bool,

	// signed integers
	memCom.Int8:  C.Int8,
	memCom.Int16: C.Int16,
	memCom.Int32: C.Int32,
	memCom.Int64: C.Int64,

	// unsigned integers
	memCom.Uint8:  C.Uint8,
	memCom.Uint16: C.Uint16,
	memCom.Uint32: C.Uint32,

	// floating point
	memCom.Float32: C.Float32,

	// enums travel as their unsigned storage type
	memCom.SmallEnum: C.Uint8,
	memCom.BigEnum:   C.Uint16,

	// fixed-width composite types
	memCom.GeoPoint: C.GeoPoint,
	memCom.UUID:     C.UUID,
}
// UnaryExprTypeToCFunctorType maps a unary AQL operator token to the C UnaryFunctorType that
// implements it. processExpression falls back to C.Noop for tokens that are not listed here.
var UnaryExprTypeToCFunctorType = map[expr.Token]C.enum_UnaryFunctorType{
	// logical, arithmetic and bitwise operators
	expr.NOT:         C.Not,
	expr.UNARY_MINUS: C.Negate,
	expr.BITWISE_NOT: C.BitwiseNot,

	// null checks
	expr.IS_NULL:     C.IsNull,
	expr.IS_NOT_NULL: C.IsNotNull,

	// truncation to the start of a calendar period
	expr.GET_WEEK_START:    C.GetWeekStart,
	expr.GET_MONTH_START:   C.GetMonthStart,
	expr.GET_QUARTER_START: C.GetQuarterStart,
	expr.GET_YEAR_START:    C.GetYearStart,

	// calendar components
	expr.GET_DAY_OF_MONTH:    C.GetDayOfMonth,
	expr.GET_DAY_OF_YEAR:     C.GetDayOfYear,
	expr.GET_MONTH_OF_YEAR:   C.GetMonthOfYear,
	expr.GET_QUARTER_OF_YEAR: C.GetQuarterOfYear,

	// special-purpose functions
	expr.GET_HLL_VALUE: C.GetHLLValue,
	expr.ARRAY_LENGTH:  C.ArrayLength,
}
// BinaryExprTypeToCFunctorType maps a binary AQL operator token to the C BinaryFunctorType
// that implements it. For tokens that are not listed here, processExpression returns an
// invalid (null) constant input.
var BinaryExprTypeToCFunctorType = map[expr.Token]C.enum_BinaryFunctorType{
	// logical
	expr.AND: C.And,
	expr.OR:  C.Or,

	// comparison
	expr.EQ:  C.Equal,
	expr.NEQ: C.NotEqual,
	expr.LT:  C.LessThan,
	expr.LTE: C.LessThanOrEqual,
	expr.GT:  C.GreaterThan,
	expr.GTE: C.GreaterThanOrEqual,

	// arithmetic
	expr.ADD:   C.Plus,
	expr.SUB:   C.Minus,
	expr.MUL:   C.Multiply,
	expr.DIV:   C.Divide,
	expr.MOD:   C.Mod,
	expr.FLOOR: C.Floor,

	// bitwise
	expr.BITWISE_AND: C.BitwiseAnd,
	expr.BITWISE_OR:  C.BitwiseOr,
	expr.BITWISE_XOR: C.BitwiseXor,
	// TODO: expr.BITWISE_LEFT_SHIFT ?
	// TODO: expr.BITWISE_RIGHT_SHIFT ?

	// CONVERT_TZ runs as a Plus functor. When its operand is a foreign-table column,
	// processExpression attaches the timezone lookup table to that column's input vector.
	expr.CONVERT_TZ: C.Plus,

	// arrays
	expr.ARRAY_CONTAINS:   C.ArrayContains,
	expr.ARRAY_ELEMENT_AT: C.ArrayElementAt,
}
// rootAction is the callback that processExpression invokes at the root of an expression
// tree, in place of materializing the root's result into scratch space. It receives:
//   - functorType: the C functor to apply (C.Noop for bare columns and literals),
//   - stream and device: forwarded to the C calls,
//   - inputs: one operand for unary/leaf roots, two for binary roots,
//   - exp: the root expression itself.
type rootAction func(functorType uint32, stream unsafe.Pointer, device int, inputs []C.InputVector, exp expr.Expr)
// makeForeignColumnInput wraps column columnIndex of a joined (foreign) table as a
// ForeignColumnInput vector for the C kernels.
//
// It builds one VectorPartySlice descriptor per batch of the foreign table. The C side gets
// the address of that descriptor array together with recordIDs, the device vector of foreign
// record IDs to read through. The column's data type and default value come from the last
// batch iterated; presumably every batch agrees (verify).
//
// timezoneLookup and timezoneLookupSize are forwarded unchanged. Callers in this file pass
// nil/0 when no timezone conversion applies.
func makeForeignColumnInput(columnIndex int, recordIDs unsafe.Pointer, table foreignTable, timezoneLookup unsafe.Pointer, timezoneLookupSize int) C.InputVector {
	var (
		valueType    memCom.DataType
		defaultValue memCom.DataValue
	)
	descriptors := make([]C.VectorPartySlice, len(table.batches))
	for i, batch := range table.batches {
		col := batch[columnIndex]
		// Each batch overwrites these, so the last batch wins.
		valueType, defaultValue = col.valueType, col.defaultValue
		descriptors[i] = makeVectorPartySlice(col)
	}

	var foreign C.ForeignColumnVector
	if len(descriptors) != 0 {
		foreign.Batches = (*C.VectorPartySlice)(unsafe.Pointer(&descriptors[0]))
	}
	foreign.RecordIDs = (*C.RecordID)(recordIDs)
	foreign.BaseBatchID = C.int32_t(memstore.BaseBatchID)
	foreign.NumBatches = C.int32_t(len(table.batches))
	foreign.NumRecordsInLastBatch = C.int32_t(table.numRecordsInLastBatch)
	foreign.DataType = DataTypeToCDataType[valueType]
	foreign.DefaultValue = makeDefaultValue(defaultValue)
	foreign.TimezoneLookup = (*C.int16_t)(timezoneLookup)
	foreign.TimezoneLookupSize = C.int16_t(timezoneLookupSize)

	var input C.InputVector
	input.Type = C.ForeignColumnInput
	*(*C.ForeignColumnVector)(unsafe.Pointer(&input.Vector)) = foreign
	return input
}
// makeDefaultValue converts a memstore default value into the C DefaultValue representation.
//
// Narrow integer types are widened before they are written into the Value slot. Signed types
// become int32_t; unsigned and enum types become uint32_t. Bool, Float32, Int64, GeoPoint and
// UUID are written at their own width.
//
// An invalid value, or a data type that is not handled below, produces HasDefault == false.
func makeDefaultValue(value memCom.DataValue) C.DefaultValue {
	var result C.DefaultValue
	if !value.Valid {
		return result
	}

	result.HasDefault = true
	slot := unsafe.Pointer(&result.Value)
	raw := value.OtherVal

	switch value.DataType {
	case memCom.Bool:
		*(*C.bool)(slot) = C.bool(value.BoolVal)
	case memCom.Int8:
		*(*C.int32_t)(slot) = C.int32_t(*(*int8)(raw))
	case memCom.Int16:
		*(*C.int32_t)(slot) = C.int32_t(*(*int16)(raw))
	case memCom.Int32:
		*(*C.int32_t)(slot) = C.int32_t(*(*int32)(raw))
	case memCom.SmallEnum, memCom.Uint8:
		*(*C.uint32_t)(slot) = C.uint32_t(*(*uint8)(raw))
	case memCom.BigEnum, memCom.Uint16:
		*(*C.uint32_t)(slot) = C.uint32_t(*(*uint16)(raw))
	case memCom.Uint32:
		*(*C.uint32_t)(slot) = C.uint32_t(*(*uint32)(raw))
	case memCom.Float32:
		*(*C.float)(slot) = C.float(*(*float32)(raw))
	case memCom.Int64:
		*(*C.int64_t)(slot) = C.int64_t(*(*int64)(raw))
	case memCom.GeoPoint:
		*(*C.GeoPointT)(slot) = *(*C.GeoPointT)(raw)
	case memCom.UUID:
		*(*C.UUIDT)(slot) = *(*C.UUIDT)(raw)
	default:
		// Unsupported default value type: report "no default" to be safe.
		result.HasDefault = false
	}
	return result
}
// makeVectorPartySlice builds the C VectorPartySlice descriptor for a scalar (non-array)
// column slice that lives in device memory.
//
// The counts, nulls and values buffers are all described relative to a single BasePtr:
//   - BasePtr is the first present buffer, checked in the order counts, nulls, values.
//   - NullsOffset and ValuesOffset hold the utils.MemDist distance from BasePtr to the nulls
//     and values buffers.
//   - An offset stays 0 when its buffer is itself the base, or is absent.
//
// NOTE(review): the offsets are truncated to uint32, which assumes the nulls/values buffers
// sit at or after BasePtr. Confirm MemDist's sign convention and the buffer layout.
func makeVectorPartySlice(column deviceVectorPartySlice) C.VectorPartySlice {
	var vpSlice C.VectorPartySlice
	var basePtr unsafe.Pointer
	// Bit offset of the first row inside the first nulls byte; 0 when there is no nulls buffer.
	var startingIndex int
	var nullsOffset uint32
	var valuesOffset uint32
	if !column.counts.isNull() {
		// countStartIndex is scaled by 4, i.e. counts are addressed as 4-byte entries.
		basePtr = utils.MemAccess(column.counts.getPointer(), column.countStartIndex*4)
	}
	if !column.nulls.isNull() {
		// Nulls are addressed bitwise: split the start row into a byte address plus a bit offset.
		startingIndex = column.nullStartIndex % 8
		nulls := utils.MemAccess(column.nulls.getPointer(), column.nullStartIndex/8)
		if basePtr == nil {
			basePtr = nulls
		} else {
			nullsOffset = uint32(utils.MemDist(nulls, basePtr))
		}
	}
	if !column.values.isNull() {
		// Byte address of the first value. For types narrower than 8 bits, the integer division
		// drops any remainder bits of valueStartIndex.
		// NOTE(review): the only bit offset passed to C is StartingIndex, which comes from the
		// nulls buffer. Bit-packed values therefore rely on valueStartIndex and nullStartIndex
		// agreeing mod 8, and get no bit offset at all when nulls is absent. Confirm this.
		values := utils.MemAccess(column.values.getPointer(),
			column.valueStartIndex*memCom.DataTypeBits(column.valueType)/8)
		if basePtr == nil {
			basePtr = values
		} else {
			valuesOffset = uint32(utils.MemDist(values, basePtr))
		}
	}
	vpSlice.BasePtr = (*C.uint8_t)(basePtr)
	vpSlice.NullsOffset = (C.uint32_t)(nullsOffset)
	vpSlice.ValuesOffset = (C.uint32_t)(valuesOffset)
	vpSlice.StartingIndex = (C.uint8_t)(startingIndex)
	vpSlice.Length = (C.uint32_t)(column.length)
	vpSlice.DataType = DataTypeToCDataType[column.valueType]
	vpSlice.DefaultValue = makeDefaultValue(column.defaultValue)
	return vpSlice
}
// makeArrayVectorPartySlice describes an array column slice to C. Unlike scalar columns, an
// array column is exposed through a single offset/length vector starting at basePtr, adjusted
// by valueOffsetAdjust. The C data type is that of the array's element type.
func makeArrayVectorPartySlice(column deviceVectorPartySlice) C.ArrayVectorPartySlice {
	elementType := memCom.GetElementDataType(column.valueType)
	return C.ArrayVectorPartySlice{
		OffsetLengthVector: (*C.uint8_t)(column.basePtr.getPointer()),
		ValueOffsetAdj:     C.uint32_t(column.valueOffsetAdjust),
		Length:             C.uint32_t(column.length),
		DataType:           DataTypeToCDataType[elementType],
	}
}
// makeVectorPartySliceInput wraps a main-table column slice as a C.InputVector.
//
// Array columns are delegated to makeArrayVectorPartySliceInput. All other columns become a
// VectorPartyInput carrying the descriptor from makeVectorPartySlice.
func makeVectorPartySliceInput(column deviceVectorPartySlice) C.InputVector {
	switch {
	case memCom.IsArrayType(column.valueType):
		return makeArrayVectorPartySliceInput(column)
	default:
		var input C.InputVector
		input.Type = C.VectorPartyInput
		*(*C.VectorPartySlice)(unsafe.Pointer(&input.Vector)) = makeVectorPartySlice(column)
		return input
	}
}
// makeArrayVectorPartySliceInput wraps an array column slice as an ArrayVectorPartyInput.
func makeArrayVectorPartySliceInput(column deviceVectorPartySlice) C.InputVector {
	arraySlice := makeArrayVectorPartySlice(column)
	var input C.InputVector
	input.Type = C.ArrayVectorPartyInput
	*(*C.ArrayVectorPartySlice)(unsafe.Pointer(&input.Vector)) = arraySlice
	return input
}
// makeConstantInput builds a ConstantInput vector for a literal value.
//
// Supported values:
//   - float64, float32: stored as a C float (ConstFloat).
//   - *expr.GeopointLiteral: stored as GeoPointT (ConstGeoPoint).
//   - *expr.UUIDLiteral: stored as UUIDT (ConstUUID).
//   - *expr.NumberLiteral: ConstFloat for Float literals; otherwise its Int field is stored
//     as int32 (ConstInt).
//   - int: stored as int32 (ConstInt).
//
// Any other type panics on the int type assertion. isValid becomes the vector's IsValid flag;
// callers pass false to produce a null constant.
func makeConstantInput(val interface{}, isValid bool) C.InputVector {
	var constVector C.ConstantVector
	constVector.IsValid = C.bool(isValid)
	slot := unsafe.Pointer(&constVector.Value)

	switch v := val.(type) {
	case float64:
		*(*C.float)(slot) = C.float(v)
		constVector.DataType = C.ConstFloat
	case float32:
		// float32 needs its own case. A combined `case float64, float32` leaves the value
		// untyped, and the float64 assertion would panic for a float32.
		*(*C.float)(slot) = C.float(v)
		constVector.DataType = C.ConstFloat
	case *expr.GeopointLiteral:
		geopoint := v.Val
		*(*C.GeoPointT)(slot) = *(*C.GeoPointT)(unsafe.Pointer(&geopoint[0]))
		constVector.DataType = C.ConstGeoPoint
	case *expr.UUIDLiteral:
		uuidVal := v.Val
		*(*C.UUIDT)(slot) = *(*C.UUIDT)(unsafe.Pointer(&uuidVal[0]))
		constVector.DataType = C.ConstUUID
	case *expr.NumberLiteral:
		if v.Type() == expr.Float {
			*(*C.float)(slot) = C.float(v.Val)
			constVector.DataType = C.ConstFloat
		} else {
			*(*C.int32_t)(slot) = C.int32_t(v.Int)
			constVector.DataType = C.ConstInt
		}
	default:
		// Plain ints are the only other supported input; anything else panics here.
		*(*C.int32_t)(slot) = C.int32_t(val.(int))
		constVector.DataType = C.ConstInt
	}

	var vector C.InputVector
	*(*C.ConstantVector)(unsafe.Pointer(&vector.Vector)) = constVector
	vector.Type = C.ConstantInput
	return vector
}
// makeScratchSpaceInput exposes a scratch-space stack frame as a C input vector. The nulls
// buffer is described by its distance (utils.MemDist) from the values buffer.
func makeScratchSpaceInput(values unsafe.Pointer, nulls unsafe.Pointer, dataType C.enum_DataType) C.InputVector {
	frame := C.ScratchSpaceVector{
		Values:      (*C.uint8_t)(values),
		NullsOffset: C.uint32_t(utils.MemDist(nulls, values)),
		DataType:    dataType,
	}
	var input C.InputVector
	input.Type = C.ScratchSpaceInput
	*(*C.ScratchSpaceVector)(unsafe.Pointer(&input.Vector)) = frame
	return input
}
// makeMeasureVectorOutput builds a MeasureOutput vector that writes values of outputDataType
// to measureVector, tagged with the aggregate function aggFunc.
func makeMeasureVectorOutput(measureVector unsafe.Pointer, outputDataType C.enum_DataType, aggFunc C.enum_AggregateFunction) C.OutputVector {
	measure := C.MeasureOutputVector{
		Values:   (*C.uint32_t)(measureVector),
		DataType: outputDataType,
		AggFunc:  aggFunc,
	}
	var output C.OutputVector
	output.Type = C.MeasureOutput
	*(*C.MeasureOutputVector)(unsafe.Pointer(&output.Vector)) = measure
	return output
}
// makeDimensionVectorOutput builds a DimensionOutput vector. Values are written at byte
// offset valueOffset and null flags at byte offset nullOffset, both relative to
// dimensionVector.
func makeDimensionVectorOutput(dimensionVector unsafe.Pointer, valueOffset, nullOffset int, dataType C.enum_DataType) C.OutputVector {
	dim := C.DimensionOutputVector{
		DimValues: (*C.uint8_t)(utils.MemAccess(dimensionVector, valueOffset)),
		DimNulls:  (*C.uint8_t)(utils.MemAccess(dimensionVector, nullOffset)),
		DataType:  dataType,
	}
	var output C.OutputVector
	output.Type = C.DimensionOutput
	*(*C.DimensionOutputVector)(unsafe.Pointer(&output.Vector)) = dim
	return output
}
// makeScratchSpaceOutput exposes a scratch-space stack frame as a C output vector. The nulls
// buffer is described by its distance (utils.MemDist) from the values buffer.
func makeScratchSpaceOutput(values unsafe.Pointer, nulls unsafe.Pointer, dataType C.enum_DataType) C.OutputVector {
	frame := C.ScratchSpaceVector{
		Values:      (*C.uint8_t)(values),
		NullsOffset: C.uint32_t(utils.MemDist(nulls, values)),
		DataType:    dataType,
	}
	var output C.OutputVector
	output.Type = C.ScratchSpaceOutput
	*(*C.ScratchSpaceVector)(unsafe.Pointer(&output.Vector)) = frame
	return output
}
// makeDimensionVector builds the C DimensionVector for one slot of the dimension, hash and
// index buffers. numDims gives the number of dimensions per dimension width, and is copied
// element by element into NumDimsPerDimWidth.
func makeDimensionVector(valueVector, hashVector, indexVector unsafe.Pointer, numDims common.DimCountsPerDimWidth, vectorCapacity int) C.DimensionVector {
	dims := C.DimensionVector{
		DimValues:      (*C.uint8_t)(valueVector),
		HashValues:     (*C.uint64_t)(hashVector),
		IndexVector:    (*C.uint32_t)(indexVector),
		VectorCapacity: C.int(vectorCapacity),
	}
	for i := range numDims {
		dims.NumDimsPerDimWidth[i] = C.uint8_t(numDims[i])
	}
	return dims
}
// getOutputDataType chooses the C data type used to write an expression result of exprType.
//
// UUID and GeoPoint keep their own types regardless of width. Otherwise a 4-byte output
// yields Float32 / Uint32 / Int32. Any other width yields Float64 for floats and Int64 for
// everything else, including unsigned values.
func getOutputDataType(exprType expr.Type, outputWidthInByte int) C.enum_DataType {
	narrow := outputWidthInByte == 4
	switch exprType {
	case expr.UUID:
		return C.UUID
	case expr.GeoPoint:
		return C.GeoPoint
	case expr.Float:
		if narrow {
			return C.Float32
		}
		return C.Float64
	case expr.Unsigned:
		if narrow {
			return C.Uint32
		}
		// Wide unsigned outputs share Int64 to reduce the measure output iterator cardinality.
		return C.Int64
	}
	if narrow {
		return C.Int32
	}
	return C.Int64
}
// initIndexVector initializes size entries of the uint32 index vector at vector through
// C.InitIndexVector, beginning from start, on the given stream and device. Presumably it
// writes start, start+1, ...; confirm against the C implementation.
func initIndexVector(vector unsafe.Pointer, start, size int, stream unsafe.Pointer, device int) {
	indexVector := (*C.uint32_t)(vector)
	C.InitIndexVector(indexVector, C.uint32_t(start), C.int(size), stream, C.int(device))
}
// filterAction is a rootAction that applies a one- or two-operand predicate to the current
// batch through C.UnaryFilter or C.BinaryFilter.
//
// The index vector, predicate vector, base counts and foreign-table record-ID vectors are
// handed to C. bc.size is replaced with the row count that C returns. Calls with any other
// number of inputs are ignored.
//
// NOTE(review): the record-ID argument is the address of the `pointer` field of the first
// element of bc.foreignTableRecordIDsD, reinterpreted as a RecordID* array. With more than one
// foreign table, that is only a valid array if the element type is exactly one pointer wide.
// Confirm against the devicePointer definition.
func (bc *oopkBatchContext) filterAction(functorType uint32, stream unsafe.Pointer, device int, inputs []C.InputVector, exp expr.Expr) {
	// Nothing left in the batch: skip issuing a no-op CUDA call.
	if bc.size <= 0 {
		return
	}

	numForeignTables := len(bc.foreignTableRecordIDsD)
	var foreignTableRecordIDs unsafe.Pointer
	if numForeignTables > 0 {
		foreignTableRecordIDs = unsafe.Pointer(&bc.foreignTableRecordIDsD[0].pointer)
	}

	indexVector := (*C.uint32_t)(bc.indexVectorD.getPointer())
	predicateVector := (*C.uint8_t)(bc.predicateVectorD.getPointer())
	baseCounts := (*C.uint32_t)(bc.baseCountD.getPointer())
	recordIDVectors := (**C.RecordID)(foreignTableRecordIDs)

	switch len(inputs) {
	case 1:
		bc.size = int(doCGoCall(func() C.CGoCallResHandle {
			return C.UnaryFilter(inputs[0], indexVector, predicateVector,
				C.int(bc.size), recordIDVectors, C.int(numForeignTables),
				baseCounts, C.uint32_t(bc.startRow), functorType, stream, C.int(device))
		}))
	case 2:
		bc.size = int(doCGoCall(func() C.CGoCallResHandle {
			return C.BinaryFilter(inputs[0], inputs[1], indexVector, predicateVector,
				C.int(bc.size), recordIDVectors, C.int(numForeignTables),
				baseCounts, C.uint32_t(bc.startRow), functorType, stream, C.int(device))
		}))
	}
}
// makeWriteToMeasureVectorAction returns a rootAction that evaluates the measure expression for
// the current batch and writes the result into a measure output vector.
//
// For AGGR_HLL the output goes to the start of measureVectorD[1]. For every other aggregation
// it goes into measureVectorD[0], right after the bc.resultSize values already stored there;
// each value is outputWidthInByte bytes wide.
func (bc *oopkBatchContext) makeWriteToMeasureVectorAction(aggFunc C.enum_AggregateFunction, outputWidthInByte int) rootAction {
	return func(functorType uint32, stream unsafe.Pointer, device int, inputs []C.InputVector, exp expr.Expr) {
		// Nothing left in the batch: skip issuing a no-op CUDA call.
		if bc.size <= 0 {
			return
		}

		var target unsafe.Pointer
		if aggFunc == C.AGGR_HLL {
			target = bc.measureVectorD[1].getPointer()
		} else {
			target = utils.MemAccess(bc.measureVectorD[0].getPointer(), bc.resultSize*outputWidthInByte)
		}
		output := makeMeasureVectorOutput(target, getOutputDataType(exp.Type(), outputWidthInByte), aggFunc)
		indexVector := (*C.uint32_t)(bc.indexVectorD.getPointer())
		baseCounts := (*C.uint32_t)(bc.baseCountD.getPointer())

		switch len(inputs) {
		case 1:
			doCGoCall(func() C.CGoCallResHandle {
				return C.UnaryTransform(inputs[0], output, indexVector, C.int(bc.size),
					baseCounts, C.uint32_t(bc.startRow), functorType, stream, C.int(device))
			})
		case 2:
			doCGoCall(func() C.CGoCallResHandle {
				return C.BinaryTransform(inputs[0], inputs[1], output, indexVector, C.int(bc.size),
					baseCounts, C.uint32_t(bc.startRow), functorType, stream, C.int(device))
			})
		}
	}
}
// makeWriteToDimensionVectorAction returns a rootAction that evaluates a dimension expression
// for the current batch and writes its values and null flags into dimensionVectorD[0].
//
// valueOffset and nullOffset are byte offsets into dimensionVectorD[0], presumably the start of
// this dimension's value and null sections. The output is written after the prevResultSize rows
// already present: prevResultSize*dataBytes bytes into the value section and prevResultSize
// bytes into the null section.
func (bc *oopkBatchContext) makeWriteToDimensionVectorAction(valueOffset, nullOffset, prevResultSize int) rootAction {
	return func(functorType uint32, stream unsafe.Pointer, device int, inputs []C.InputVector, exp expr.Expr) {
		// Nothing left in the batch: skip issuing a no-op CUDA call.
		if bc.size <= 0 {
			return
		}

		width := common.GetDimensionDataBytes(exp)
		cType := DataTypeToCDataType[common.GetDimensionDataType(exp)]
		output := makeDimensionVectorOutput(
			bc.dimensionVectorD[0].getPointer(),
			valueOffset+width*prevResultSize,
			nullOffset+prevResultSize,
			cType)
		indexVector := (*C.uint32_t)(bc.indexVectorD.getPointer())
		baseCounts := (*C.uint32_t)(bc.baseCountD.getPointer())

		switch len(inputs) {
		case 1:
			doCGoCall(func() C.CGoCallResHandle {
				return C.UnaryTransform(inputs[0], output, indexVector, C.int(bc.size),
					baseCounts, C.uint32_t(bc.startRow), functorType, stream, C.int(device))
			})
		case 2:
			doCGoCall(func() C.CGoCallResHandle {
				return C.BinaryTransform(inputs[0], inputs[1], output, indexVector, C.int(bc.size),
					baseCounts, C.uint32_t(bc.startRow), functorType, stream, C.int(device))
			})
		}
	}
}
// makeCuckooHashIndex describes a primary-key cuckoo hash index to C.
//
// deviceData points at the bucket storage. The hash seeds, key width and bucket count come
// from the host-side primaryKeyData, and the number of hash functions equals the number of
// seeds.
func makeCuckooHashIndex(primaryKeyData memCom.PrimaryKeyData, deviceData unsafe.Pointer) C.CuckooHashIndex {
	index := C.CuckooHashIndex{
		buckets:    (*C.uint8_t)(deviceData),
		keyBytes:   C.int(primaryKeyData.KeyBytes),
		numHashes:  C.int(len(primaryKeyData.Seeds)),
		numBuckets: C.int(primaryKeyData.NumBuckets),
	}
	for i := range primaryKeyData.Seeds {
		index.seeds[i] = C.uint32_t(primaryKeyData.Seeds[i])
	}
	return index
}
// prepareForeignRecordIDs looks up the main-table join column of the current batch in the
// primary-key cuckoo hash index of the joined table. It calls C.HashLookup, which writes the
// resulting record IDs into bc.foreignTableRecordIDsD[joinTableID].
func (bc *oopkBatchContext) prepareForeignRecordIDs(mainTableJoinColumnIndex int, joinTableID int, table foreignTable,
	stream unsafe.Pointer, device int) {
	// Nothing left in the batch: skip issuing a no-op CUDA call.
	if bc.size <= 0 {
		return
	}

	joinKeys := makeVectorPartySliceInput(bc.columns[mainTableJoinColumnIndex])
	primaryKeyIndex := makeCuckooHashIndex(table.hostPrimaryKeyData, table.devicePrimaryKeyPtr.getPointer())
	recordIDsOut := (*C.RecordID)(bc.foreignTableRecordIDsD[joinTableID].getPointer())

	doCGoCall(func() C.CGoCallResHandle {
		return C.HashLookup(joinKeys, recordIDsOut,
			(*C.uint32_t)(bc.indexVectorD.getPointer()), C.int(bc.size),
			(*C.uint32_t)(bc.baseCountD.getPointer()), C.uint32_t(bc.startRow),
			primaryKeyIndex, stream, C.int(device))
	})
}
// processExpression evaluates exp on the device with a depth-first traversal of its AST.
//
// When action is non-nil, exp is treated as the root. The root's operands are evaluated with a
// nil action into input vectors, and action is invoked with the root's C functor type instead
// of materializing the root result. The zero C.InputVector is then returned. The rootActions in
// this file are filterAction and the closures built by makeWriteToDimensionVectorAction and
// makeWriteToMeasureVectorAction.
//
// When action is nil, the result is returned as an input vector for the caller:
//   - Column references become vector-party inputs (main table) or foreign-column inputs.
//   - Literals become constant inputs.
//   - Unary and binary expressions are computed into a newly allocated scratch-space stack
//     frame, returned as a scratch-space input that references that frame.
//
// Scratch frames follow a stack discipline. After a node is computed, the frames of its
// scratch-space operands are released with shrinkStackFrame, which leaves the node's own
// result frame on top. Frames held by the root's operands are not released here.
//
// parentExp is the immediate parent of exp. It is used to detect a foreign column that is a
// direct operand of CONVERT_TZ.
func (bc *oopkBatchContext) processExpression(exp, parentExp expr.Expr, tableScanners []*TableScanner, foreignTables []*foreignTable,
	stream unsafe.Pointer, device int, action rootAction) C.InputVector {
	switch e := exp.(type) {
	case *expr.ParenExpr:
		// Parentheses are transparent, but the ParenExpr becomes the parent of its child.
		return bc.processExpression(e.Expr, e, tableScanners, foreignTables, stream, device, action)
	case *expr.VarRef:
		// Translate the column ID into the column's index among the scanned columns.
		columnIndex := tableScanners[e.TableID].ColumnsByIDs[e.ColumnID]
		var inputVector C.InputVector
		// main table
		if e.TableID == 0 {
			column := bc.columns[columnIndex]
			inputVector = makeVectorPartySliceInput(column)
		} else {
			// Foreign table: TableID is 1-based here, while foreignTableRecordIDsD and
			// foreignTables are indexed from 0.
			var timezoneLookup unsafe.Pointer
			var timezoneLookupDSize int
			// Attach the timezone lookup table only when this column is a direct operand of
			// CONVERT_TZ.
			// NOTE(review): a parenthesized operand has a ParenExpr parent, so CONVERT_TZ((col))
			// would not receive the lookup table. Confirm whether that case can occur.
			switch pe := parentExp.(type) {
			case *expr.BinaryExpr:
				if pe.Op == expr.CONVERT_TZ {
					timezoneLookup = bc.timezoneLookupD.getPointer()
					timezoneLookupDSize = bc.timezoneLookupDSize
				}
			default:
			}
			inputVector = makeForeignColumnInput(columnIndex, bc.foreignTableRecordIDsD[e.TableID-1].getPointer(), *foreignTables[e.TableID-1], timezoneLookup, timezoneLookupDSize)
		}
		if action != nil {
			// A bare column at the root is passed to the action with the Noop functor.
			action(C.Noop, stream, device, []C.InputVector{inputVector}, e)
			return C.InputVector{}
		}
		return inputVector
	case *expr.NumberLiteral, *expr.GeopointLiteral, *expr.UUIDLiteral:
		var inputVector C.InputVector
		inputVector = makeConstantInput(e, true)
		if action != nil {
			// A bare literal at the root is passed to the action with the Noop functor.
			action(C.Noop, stream, device, []C.InputVector{inputVector}, e)
			return C.InputVector{}
		}
		return inputVector
	case *expr.UnaryExpr:
		// Operands are always evaluated without an action, so their results come back as inputs.
		inputVector := bc.processExpression(e.Expr, e, tableScanners, foreignTables, stream, device, nil)
		functorType, exist := UnaryExprTypeToCFunctorType[e.Op]
		if !exist {
			// Unknown unary operators fall back to the Noop functor.
			functorType = C.Noop
		}
		if action != nil {
			action(functorType, stream, device, []C.InputVector{inputVector}, e)
			return C.InputVector{}
		}
		// Not the root: materialize the result into a new scratch frame. Intermediate results
		// use the 4-byte output data types chosen by getOutputDataType.
		dataType := getOutputDataType(e.Type(), 4)
		values, nulls := bc.allocateStackFrame(dataType)
		var outputVector = makeScratchSpaceOutput(values.getPointer(), nulls.getPointer(), dataType)
		doCGoCall(func() C.CGoCallResHandle {
			return C.UnaryTransform(inputVector, outputVector, (*C.uint32_t)(bc.indexVectorD.getPointer()),
				(C.int)(bc.size), (*C.uint32_t)(bc.baseCountD.getPointer()), (C.uint32_t)(bc.startRow), functorType, stream, C.int(device))
		})
		// Release the operand's scratch frame; the result frame stays on top of the stack.
		if inputVector.Type == C.ScratchSpaceInput {
			bc.shrinkStackFrame()
		}
		return makeScratchSpaceInput(values.getPointer(), nulls.getPointer(), dataType)
	case *expr.BinaryExpr:
		lhsInputVector := bc.processExpression(e.LHS, e, tableScanners, foreignTables, stream, device, nil)
		rhsInputVector := bc.processExpression(e.RHS, e, tableScanners, foreignTables, stream, device, nil)
		functorType, exist := BinaryExprTypeToCFunctorType[e.Op]
		if !exist {
			// Unsupported operator: yield an invalid (null) constant without invoking the action.
			// NOTE(review): scratch frames allocated above for lhs/rhs are not released on this
			// path.
			return makeConstantInput(0.0, false)
		}
		if action != nil {
			action(functorType, stream, device, []C.InputVector{lhsInputVector, rhsInputVector}, e)
			return C.InputVector{}
		}
		// Not the root: materialize the result into a new scratch frame.
		outputDataType := getOutputDataType(e.Type(), 4)
		values, nulls := bc.allocateStackFrame(outputDataType)
		var outputVector = makeScratchSpaceOutput(values.getPointer(), nulls.getPointer(), outputDataType)
		doCGoCall(func() C.CGoCallResHandle {
			return C.BinaryTransform(lhsInputVector, rhsInputVector, outputVector,
				(*C.uint32_t)(bc.indexVectorD.getPointer()), (C.int)(bc.size), (*C.uint32_t)(bc.baseCountD.getPointer()),
				(C.uint32_t)(bc.startRow),
				functorType, stream, C.int(device))
		})
		// Release operand frames in reverse allocation order (rhs was pushed last). Each shrink
		// keeps the current top frame (the result) and frees the frame directly beneath it.
		if rhsInputVector.Type == C.ScratchSpaceInput {
			bc.shrinkStackFrame()
		}
		if lhsInputVector.Type == C.ScratchSpaceInput {
			bc.shrinkStackFrame()
		}
		return makeScratchSpaceInput(values.getPointer(), nulls.getPointer(), outputDataType)
	default:
		// Unsupported expression kinds yield the zero InputVector.
		return C.InputVector{}
	}
}
// makeGeoShapeBatch describes a batch of geo shapes to C. TotalWords is the number of 32-bit
// words needed to hold one bit per shape, i.e. numShapes/32 rounded up.
func makeGeoShapeBatch(shapesLatLongs devicePointer, numShapes, totalNumPoints int) C.GeoShapeBatch {
	wordsForShapes := (numShapes + 31) / 32
	return C.GeoShapeBatch{
		LatLongs:       (*C.uint8_t)(shapesLatLongs.getPointer()),
		TotalNumPoints: C.int32_t(totalNumPoints),
		TotalWords:     C.uint8_t(wordsForShapes),
	}
}
// makeGeoPointInputVector returns the input vector for the geo point column of a geo
// intersection. For the main table (pointTableID 0) this is a vector-party input. Otherwise it
// is a foreign-column input read through that table's record IDs, with no timezone lookup.
func (bc *oopkBatchContext) makeGeoPointInputVector(pointTableID int, pointColumnIndex int, foreignTables []*foreignTable) C.InputVector {
	if pointTableID != 0 {
		foreignIdx := pointTableID - 1
		return makeForeignColumnInput(pointColumnIndex,
			bc.foreignTableRecordIDsD[foreignIdx].getPointer(),
			*foreignTables[foreignIdx], nil, 0)
	}
	return makeVectorPartySliceInput(bc.columns[pointColumnIndex])
}
// writeGeoShapeDim writes the geo-shape dimension for the current batch through
// C.WriteGeoShapeDim.
//
// The dimension is a single uint8 per row; the compiler is expected to keep the number of
// joined shapes below 256. It is written into dimensionVectorD[0] after the prevResultSize rows
// already present. outputPredicate is read by C as uint32 words, with (numShapes+31)/32 words
// in play (presumably per row) over sizeBeforeGeoFilter rows.
func (bc *oopkBatchContext) writeGeoShapeDim(geo *geoIntersection,
	outputPredicate devicePointer, dimValueOffset, dimNullOffset int, sizeBeforeGeoFilter, prevResultSize int, stream unsafe.Pointer, device int) {
	if bc.size <= 0 || geo.shapeLatLongs.isNull() {
		return
	}

	base := bc.dimensionVectorD[0].getPointer()
	// Geo dimensions are 1 byte wide, so this batch starts prevResultSize bytes into both the
	// value and the null section.
	output := C.DimensionOutputVector{
		DimValues: (*C.uint8_t)(utils.MemAccess(base, dimValueOffset+prevResultSize)),
		DimNulls:  (*C.uint8_t)(utils.MemAccess(base, dimNullOffset+prevResultSize)),
		DataType:  C.Uint8,
	}
	numWords := (geo.numShapes + 31) / 32

	doCGoCall(func() C.CGoCallResHandle {
		return C.WriteGeoShapeDim(C.int(numWords), output, C.int(sizeBeforeGeoFilter),
			(*C.uint32_t)(outputPredicate.getPointer()), stream, C.int(device))
	})
}
// geoIntersect filters the current batch to the rows whose geo point does (inOrOut == true) or
// does not intersect the shapes in geo. It calls C.GeoBatchIntersects and sets bc.size to the
// row count that C returns.
//
// Per-shape intersection results are written to outputPredicate, which writeGeoShapeDim later
// consumes. pointColumnIndex selects the geo point column; it is read from the main table or
// from a joined table depending on geo.pointTableID.
//
// NOTE(review): as in filterAction, the foreign record-ID argument reinterprets
// &bc.foreignTableRecordIDsD[0].pointer as a RecordID* array. With more than one foreign
// table, that is only valid if the element type is exactly one pointer wide. Confirm.
func (bc *oopkBatchContext) geoIntersect(geo *geoIntersection, pointColumnIndex int,
	foreignTables []*foreignTable,
	outputPredicate devicePointer, stream unsafe.Pointer, device int) {
	// If current batch size is already 0, short circuit to avoid issuing a noop cuda call.
	if bc.size <= 0 || geo.shapeLatLongs.isNull() {
		return
	}

	numForeignTables := len(bc.foreignTableRecordIDsD)
	var foreignTableRecordIDs unsafe.Pointer
	if numForeignTables > 0 {
		foreignTableRecordIDs = unsafe.Pointer(&bc.foreignTableRecordIDsD[0].pointer)
	}

	geoShapes := makeGeoShapeBatch(geo.shapeLatLongs, geo.numShapes, geo.totalNumPoints)
	points := bc.makeGeoPointInputVector(geo.pointTableID, pointColumnIndex, foreignTables)

	bc.size = int(doCGoCall(func() C.CGoCallResHandle {
		return C.GeoBatchIntersects(
			geoShapes, points, (*C.uint32_t)(bc.indexVectorD.getPointer()),
			C.int(bc.size), C.uint32_t(bc.startRow), (**C.RecordID)(foreignTableRecordIDs),
			C.int(numForeignTables), (*C.uint32_t)(outputPredicate.getPointer()),
			C.bool(geo.inOrOut), stream, C.int(device))
	}))
}
// hll runs the C HyperLogLog routine over the accumulated results (bc.resultSize rows) plus the
// current batch (bc.size rows).
//
// Slot 0 of the dimension/hash/index/measure buffers is passed as the "previous" side and
// slot 1 as the "current" side. bc.resultSize is replaced with the count that C returns.
//
// hllVector, dimRegCount and hllVectorSize are filled in by the C call, which presumably
// allocates the two device buffers itself (see the TODO below). Both are marked as allocated
// on device.
func (bc *oopkBatchContext) hll(numDims common.DimCountsPerDimWidth, isLastBatch bool, stream unsafe.Pointer, device int) (
	hllVector, dimRegCount devicePointer, hllVectorSize int64) {
	var dims [2]C.DimensionVector
	for slot := range dims {
		dims[slot] = makeDimensionVector(bc.dimensionVectorD[slot].getPointer(), bc.hashVectorD[slot].getPointer(),
			bc.dimIndexVectorD[slot].getPointer(), numDims, bc.resultCapacity)
	}
	prevValues := (*C.uint32_t)(bc.measureVectorD[0].getPointer())
	curValues := (*C.uint32_t)(bc.measureVectorD[1].getPointer())

	bc.resultSize = int(doCGoCall(func() C.CGoCallResHandle {
		return C.HyperLogLog(dims[0], dims[1],
			prevValues, curValues,
			C.int(bc.resultSize), C.int(bc.size), C.bool(isLastBatch),
			(**C.uint8_t)(unsafe.Pointer(&hllVector.pointer)), (*C.size_t)(unsafe.Pointer(&hllVectorSize)),
			(**C.uint16_t)(unsafe.Pointer(&dimRegCount.pointer)), stream, C.int(device))
	}))

	// TODO: we also need a way to report this allocation in C++ code. Maybe can be done via calling a golang function from c++
	hllVector.device, hllVector.allocated = device, true
	dimRegCount.device, dimRegCount.allocated = device, true
	return hllVector, dimRegCount, hllVectorSize
}
// sortByKey calls C.Sort on slot 0 of the dimension/hash/index buffers. It covers the previous
// results and the current batch together, i.e. bc.resultSize + bc.size rows.
func (bc *oopkBatchContext) sortByKey(numDims common.DimCountsPerDimWidth, stream unsafe.Pointer, device int) {
	totalRows := bc.resultSize + bc.size
	keys := makeDimensionVector(bc.dimensionVectorD[0].getPointer(), bc.hashVectorD[0].getPointer(),
		bc.dimIndexVectorD[0].getPointer(), numDims, bc.resultCapacity)
	doCGoCall(func() C.CGoCallResHandle {
		return C.Sort(keys, C.int(totalRows), stream, C.int(device))
	})
}
// reduceByKey aggregates rows with equal dimension keys using C.Reduce with aggFunc.
//
// It reads bc.resultSize + bc.size rows from slot 0 of the dimension and measure buffers and
// writes the reduced rows to slot 1. bc.resultSize becomes the output row count that C returns.
// valueWidth is forwarded as the measure value width, presumably in bytes.
func (bc *oopkBatchContext) reduceByKey(numDims common.DimCountsPerDimWidth, valueWidth int, aggFunc C.enum_AggregateFunction, stream unsafe.Pointer,
	device int) {
	var keys [2]C.DimensionVector
	for slot := range keys {
		keys[slot] = makeDimensionVector(bc.dimensionVectorD[slot].getPointer(), bc.hashVectorD[slot].getPointer(),
			bc.dimIndexVectorD[slot].getPointer(), numDims, bc.resultCapacity)
	}
	valuesIn := (*C.uint8_t)(bc.measureVectorD[0].getPointer())
	valuesOut := (*C.uint8_t)(bc.measureVectorD[1].getPointer())

	bc.resultSize = int(doCGoCall(func() C.CGoCallResHandle {
		return C.Reduce(keys[0], valuesIn, keys[1], valuesOut, C.int(valueWidth),
			C.int(bc.resultSize+bc.size), aggFunc, stream, C.int(device))
	}))
}
// hashReduce is the hash-based counterpart of reduceByKey and calls C.HashReduce.
//
// It reads bc.resultSize + bc.size rows from slot 0 of the dimension and measure buffers and
// writes the aggregated rows to slot 1. bc.resultSize becomes the output row count that C
// returns.
func (bc *oopkBatchContext) hashReduce(numDims common.DimCountsPerDimWidth, valueWidth int, aggFunc C.enum_AggregateFunction, stream unsafe.Pointer,
	device int) {
	src := makeDimensionVector(bc.dimensionVectorD[0].getPointer(), bc.hashVectorD[0].getPointer(),
		bc.dimIndexVectorD[0].getPointer(), numDims, bc.resultCapacity)
	dst := makeDimensionVector(bc.dimensionVectorD[1].getPointer(), bc.hashVectorD[1].getPointer(),
		bc.dimIndexVectorD[1].getPointer(), numDims, bc.resultCapacity)
	srcValues := (*C.uint8_t)(bc.measureVectorD[0].getPointer())
	dstValues := (*C.uint8_t)(bc.measureVectorD[1].getPointer())

	bc.resultSize = int(doCGoCall(func() C.CGoCallResHandle {
		return C.HashReduce(src, srcValues, dst, dstValues, C.int(valueWidth),
			C.int(bc.resultSize+bc.size), aggFunc, stream, C.int(device))
	}))
}
// expand calls C.Expand for the current batch, reading dimensions from slot 0 and writing the
// output to slot 1. bc.resultSize is set to the count that C returns.
//
// Afterwards the two dimension value buffers are swapped so the expanded output becomes slot 0.
// The hash and index buffers are not swapped.
func (bc *oopkBatchContext) expand(numDims common.DimCountsPerDimWidth, stream unsafe.Pointer, device int) {
	src := makeDimensionVector(bc.dimensionVectorD[0].getPointer(), bc.hashVectorD[0].getPointer(),
		bc.dimIndexVectorD[0].getPointer(), numDims, bc.resultCapacity)
	dst := makeDimensionVector(bc.dimensionVectorD[1].getPointer(), bc.hashVectorD[1].getPointer(),
		bc.dimIndexVectorD[1].getPointer(), numDims, bc.resultCapacity)
	baseCounts := (*C.uint32_t)(bc.baseCountD.getPointer())
	indexVector := (*C.uint32_t)(bc.indexVectorD.getPointer())

	bc.resultSize = int(doCGoCall(func() C.CGoCallResHandle {
		return C.Expand(src, dst, baseCounts, indexVector, C.int(bc.size), 0, stream, C.int(device))
	}))

	bc.dimensionVectorD[0], bc.dimensionVectorD[1] = bc.dimensionVectorD[1], bc.dimensionVectorD[0]
}
// getDataTypeLength returns the per-value byte width used for a scratch-space buffer of
// dataType: 16 for UUID, 8 for 64-bit numeric types and GeoPoint, and 4 for everything else.
func (bc *oopkBatchContext) getDataTypeLength(dataType C.enum_DataType) int {
	width := 4
	switch dataType {
	case C.UUID:
		width = 16
	case C.Int64, C.Uint64, C.Float64, C.GeoPoint:
		width = 8
	}
	return width
}
// allocateStackFrame pushes a new scratch-space frame for bc.size rows of dataType onto
// bc.exprStackD and returns its values and nulls pointers.
//
// A single device allocation on bc.device holds both buffers: values first (width*bc.size
// bytes), then one null byte per row. Because nulls points into the same allocation,
// releasing a frame only needs to free values.
func (bc *oopkBatchContext) allocateStackFrame(dataType C.enum_DataType) (values, nulls devicePointer) {
	valueBytes := bc.getDataTypeLength(dataType) * bc.size
	values = deviceAllocate(valueBytes+bc.size, bc.device)
	nulls = values.offset(valueBytes)
	bc.exprStackD = append(bc.exprStackD, [2]devicePointer{values, nulls})
	return values, nulls
}
// shrinkStackFrame removes the second-from-top frame of bc.exprStackD and keeps the top frame
// in place. The top frame slides down one slot, and the displaced frame's device allocation is
// freed through its values pointer. The stack must hold at least two frames.
func (bc *oopkBatchContext) shrinkStackFrame() {
	top := len(bc.exprStackD) - 1
	bc.exprStackD[top-1], bc.exprStackD[top] = bc.exprStackD[top], bc.exprStackD[top-1]
	released := bc.exprStackD[top]
	bc.exprStackD = bc.exprStackD[:top]
	deviceFreeAndSetNil(&released[0])
}
// createCutoffTimeFilter returns the Boolean predicate `<time column> >= cutoff` on the main
// table. The time column is always column 0 of table 0 and is typed as Unsigned.
func (qc *AQLQueryContext) createCutoffTimeFilter(cutoff uint32) expr.Expr {
	timeColumn := &expr.VarRef{
		Val:      qc.Query.TimeFilter.Column,
		ExprType: expr.Unsigned,
		TableID:  0,
		ColumnID: 0, // the time column is always column 0
	}
	cutoffLiteral := &expr.NumberLiteral{
		Int:      int(cutoff),
		Expr:     strconv.FormatUint(uint64(cutoff), 10),
		ExprType: expr.Unsigned,
	}
	return &expr.BinaryExpr{
		ExprType: expr.Boolean,
		Op:       expr.GTE,
		LHS:      timeColumn,
		RHS:      cutoffLiteral,
	}
}
// doCGoCall runs f and hands its result to cgoutils.DoCGoCall. Before handing it over, it
// converts the package-local C.CGoCallResHandle into a (uintptr, unsafe.Pointer) pair. The
// wrapper is needed because cgo types are bound to the package that declares them: even C.int
// here is a different type from C.int in cgoutils.
func doCGoCall(f func() C.CGoCallResHandle) uintptr {
	call := func() (uintptr, unsafe.Pointer) {
		handle := f()
		return uintptr(handle.res), unsafe.Pointer(handle.pStrErr)
	}
	return cgoutils.DoCGoCall(call)
}
// bootstrapDevice is the go wrapper of BootstrapDevice. It will panic and crash the server if any exceptions are thrown
// in this function.
func bootstrapDevice() {
doCGoCall(func() C.CGoCallResHandle {
return C.BootstrapDevice()
})
}