// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package query

// #include "time_series_aggregate.h"
import "C"
import (
    "bytes"
    "net/http"
    "strings"
    "time"
    "unsafe"

    "github.com/uber/aresdb/cgoutils"
    memCom "github.com/uber/aresdb/memstore/common"
    queryCom "github.com/uber/aresdb/query/common"
    "github.com/uber/aresdb/query/context"
    "github.com/uber/aresdb/query/expr"
    "github.com/uber/aresdb/utils"
)
// boundaryType is the type of a range prefilter boundary.
type boundaryType int
const (
noBoundary boundaryType = iota
inclusiveBoundary
exclusiveBoundary
)
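
// Illustrative sketch (assumed semantics based on the constant names, not from
// the original file): a compiled range prefilter such as 10 <= x < 20 would use
// an inclusive lower boundary and an exclusive upper boundary; see
// RangePrefilterBoundaries and RangePrefilterValues on TableScanner below.
func exampleRangeBoundaries() ([2]boundaryType, [2]uint32) {
    return [2]boundaryType{inclusiveBoundary, exclusiveBoundary}, [2]uint32{10, 20}
}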
// columnUsage is a bitmap that tracks how a column is used and whether the
// column should be pushed to device memory for different types of batches.
type columnUsage int
const (
columnUsedByAllBatches columnUsage = 1 << iota
columnUsedByLiveBatches
columnUsedByFirstArchiveBatch
columnUsedByLastArchiveBatch
columnUsedByPrefilter
columnUsedHighSentinel
)
var columnUsageNames = map[columnUsage]string{
columnUsedByAllBatches: "allBatches",
columnUsedByLiveBatches: "liveBatches",
columnUsedByFirstArchiveBatch: "firstArchiveBatch",
columnUsedByLastArchiveBatch: "lastArchiveBatch",
columnUsedByPrefilter: "prefilter",
}
// MarshalJSON marshals a columnUsage bitmap as a "+"-joined string of usage names.
func (u columnUsage) MarshalJSON() ([]byte, error) {
var usageStrings []string
for mask := columnUsedByAllBatches; mask < columnUsedHighSentinel; mask <<= 1 {
usage := u & mask
if usage != 0 {
usageStrings = append(usageStrings, columnUsageNames[usage])
}
}
buffer := bytes.NewBufferString(`"`)
buffer.WriteString(strings.Join(usageStrings, "+"))
buffer.WriteString(`"`)
return buffer.Bytes(), nil
}
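
// Illustrative sketch (hypothetical helper, not part of the original API): the
// bitmap marshals to a "+"-joined string of the names above, so a column used
// by live batches and by the prefilter encodes as "liveBatches+prefilter".
func exampleColumnUsageJSON() {
    usage := columnUsedByLiveBatches | columnUsedByPrefilter
    encoded, _ := usage.MarshalJSON()
    _ = encoded // []byte(`"liveBatches+prefilter"`)
}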
// TableScanner defines how data for a table should be fed to device memory for
// processing (a scanner in traditional terminology).
type TableScanner struct {
// Snapshot of the table schema for convenience.
Schema *memCom.TableSchema `json:"-"`
// IDs of all table shards to be scanned on this instance.
Shards []int `json:"shards"`
// IDs of columns to be used in this query, in the following order:
// 1. Columns not from ArchivingSortColumns.
// 2. Columns from ArchivingSortColumns in reverse order.
Columns []int `json:"columns"`
// Reverse mapping from column ID to column scan-order index.
ColumnsByIDs map[int]int `json:"-"`
// Map from column ID to its usage by the query.
ColumnUsages map[int]columnUsage `json:"columnUsage"`
// Fact table specifics:
// Values of equality prefilters in order. Each uint32 stores a value of any
// data type other than UUID (not supported) in its 4 bytes.
EqualityPrefilterValues []uint32 `json:"equalityPrefilterValues,omitempty"`
// Boundary types and values of the final range prefilter.
RangePrefilterBoundaries [2]boundaryType `json:"rangePrefilterBoundaries"`
RangePrefilterValues [2]uint32 `json:"rangePrefilterValues"`
// Range of archive batches to process: [start, end).
// Depending on the archiving progress of each shard, live batches may be
// skipped for processing if the archiving cutoff is after the time of
// ArchiveBatchIDEnd.
ArchiveBatchIDStart int `json:"archiveBatchIDStart"`
ArchiveBatchIDEnd int `json:"archiveBatchIDEnd"`
}
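
// Illustrative sketch (assumed usage, not from the original file): ColumnsByIDs
// is the inverse of the Columns slice, mapping a column ID back to its
// scan-order index.
func exampleBuildColumnsByIDs(scanner *TableScanner) {
    scanner.ColumnsByIDs = make(map[int]int, len(scanner.Columns))
    for scanOrder, columnID := range scanner.Columns {
        scanner.ColumnsByIDs[columnID] = scanOrder
    }
}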
// foreignTable stores the data of a foreign table used in a join.
type foreignTable struct {
// batches[batchIndex][columnIndex]
// batchIndex = batchID - BaseBatchID
// columnIndex corresponds to columnIndex in TableScanner columns order
batches [][]deviceVectorPartySlice
numRecordsInLastBatch int
// stores the remote join column in the main table
remoteJoinColumn *expr.VarRef
// primary key data at host.
hostPrimaryKeyData memCom.PrimaryKeyData
devicePrimaryKeyPtr devicePointer
}
// deviceVectorPartySlice stores pointers to data for a column in device memory.
type deviceVectorPartySlice struct {
values devicePointer
nulls devicePointer
// The length of the count vector is Length+1, similar to memstore.VectorParty
counts devicePointer
// Used only by device columns. We allocate device memory once for the counts,
// nulls, and values vectors, and on free we free only the base pointer. The memory
// layout is counts, nulls, values; for the counts vector, the 64-byte padding is not copied.
basePtr devicePointer
length int
valueType memCom.DataType
// pointer to default value from schema
defaultValue memCom.DataValue
valueStartIndex int
nullStartIndex int
countStartIndex int
// offsets is used for array-type vector parties; it is the same as basePtr
// but points to the start of the offset/length data
offsets devicePointer
// valueOffsetAdjust is the value pointer adjustment for array archive VP
valueOffsetAdjust int
}
// oopkBatchContext stores context for the current batch being processed by
// one-operator-per-kernel execution. For simplicity, OOPK only supports data
// widths up to 32 bits.
type oopkBatchContext struct {
// For convenience.
device int
// Input data according to TableScanner.Columns order.
columns []deviceVectorPartySlice
// pointer to Columns[firstColumn]'s count vector
baseCountD devicePointer
// startRow when firstColumn has no count vector
startRow int
// Index vector for permuting elements in raw column values. Filters are
// applied to the index vector instead of the value vectors.
indexVectorD devicePointer
// Space for storing filter values. True value means we will keep the row.
// We will reuse this space for all filter processing.
predicateVectorD devicePointer
// geo predicate vector
geoPredicateVectorD devicePointer
// foreignTableRecordIDsD holds record IDs for each foreign table; to address
// the recordID vector for the foreign table with tableID x, use foreignTableRecordIDsD[x-1]
foreignTableRecordIDsD []devicePointer
// timezoneLookupD points to an array of timezone offsets in seconds indexed by timezone string enum
timezoneLookupD devicePointer
timezoneLookupDSize int
// Remaining number of inputs in indexVectorD after filtering.
// Notice that this size is not necessarily the number of database rows
// when columns[0] is compressed.
size int
sizeAfterPreFilter int
// Scratch vectors for evaluating the current AST expr in device memory.
// [0] stores the values and [1] stores the validities (NULLs).
// The data width of each value is always 4 bytes.
// The data width of each validity (NULL) is always 1 byte.
// Values and validities of each stack frame are allocated together,
// with the validity array following the value array.
// The length of each vector is size (same as indexVectorD).
// Note: users should always append the new output stack frame onto the stack
// before shrinking the input stack frames.
exprStackD [][2]devicePointer
// Input and output storage in device memory before and after sort-reduce-by-key.
// The capacity of the dimension and measure vector should be at least
// resultSize+size.
// The first resultSize records store results from processing prior batches.
// Followed by size records from the current batch.
// Sort and reduce by key will operate on all resultSize+size records.
//
// Because reduce_by_key outputs to separate buffers, we need to alternate two
// sets of dimension and measure buffers for input and output.
// We store the input buffer in [0], and the output buffer in [1], for the
// following dimensionVectorD and measureVectorD.
// One giant dimension vector that contains each dimension's columnar vector,
// in the following order:
// 4-byte dimensions -> 2-byte dimensions -> 1-byte dimensions (including the validity vector).
dimensionVectorD [2]devicePointer
// hash vector stores the 64-bit hash value computed from each dimension row
// (all dimension values combined into one byte array); it is generated and
// used in sort, and used again in reduce
hashVectorD [2]devicePointer
// dimIndexVectorD is different from the index vector: it indexes into the
// dimension vector. Its length is the resultSize from previous batches plus
// the size of the current batch.
dimIndexVectorD [2]devicePointer
// Each element stores a 4-byte measure value, except for SUM, which uses 8 bytes.
measureVectorD [2]devicePointer
// For aggregate queries: size of the results from prior batches.
// For non-aggregate queries: result size for the current batch, after decompression.
resultSize int
// Capacity of the result dimension and measure vector, should be at least
// resultSize+size.
resultCapacity int
// Query execution stats for current batch.
stats oopkBatchStats
}
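
// Illustrative sketch (hypothetical helper, not in the original file): because
// reduce_by_key outputs to separate buffers, the [0] (input) and [1] (output)
// buffer sets above would be swapped after each batch so that the previous
// output becomes the next input.
func (bc *oopkBatchContext) swapResultBuffers() {
    bc.dimensionVectorD[0], bc.dimensionVectorD[1] = bc.dimensionVectorD[1], bc.dimensionVectorD[0]
    bc.hashVectorD[0], bc.hashVectorD[1] = bc.hashVectorD[1], bc.hashVectorD[0]
    bc.dimIndexVectorD[0], bc.dimIndexVectorD[1] = bc.dimIndexVectorD[1], bc.dimIndexVectorD[0]
    bc.measureVectorD[0], bc.measureVectorD[1] = bc.measureVectorD[1], bc.measureVectorD[0]
}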
// OOPKContext defines additional query context for one-operator-per-kernel
// execution.
type OOPKContext struct {
// Compiled and annotated filters.
// The filters are converted to an equivalent CNF so that AND no longer exists
// in any underlying expr.Expr.
// Filters that apply to all archive and live batches.
// MainTableCommonFilters are common filters that involve only main table columns.
MainTableCommonFilters []expr.Expr `json:"mainTableCommonFilters,omitempty"`
// ForeignTableCommonFilters are common filters that involve foreign table columns.
ForeignTableCommonFilters []expr.Expr `json:"foreignTableCommonFilters,omitempty"`
// Lower bound [0] and upper bound [1] time filter. nil if not applicable.
// [0] should be applied to the first archive batch and all live batches.
// [1] should be applied to the last archive batch and all live batches.
TimeFilters [2]expr.Expr `json:"timeFilters"`
// Prefilters that only apply to live batches.
// Archiving cutoff filtering is processed directly by the query engine and not
// included here (different shards may have different cutoffs).
Prefilters []expr.Expr `json:"prefilters,omitempty"`
// Compiled and annotated ASTs for dimensions and measure.
Dimensions []expr.Expr `json:"dimensions"`
// Index of each single dimension vector inside the global dimension vector,
// following the sorted order based on byte width.
DimensionVectorIndex []int `json:"dimensionVectorIndex"`
// Number of dimensions per dim width
NumDimsPerDimWidth queryCom.DimCountsPerDimWidth `json:"numDims"`
// DimRowBytes is the total number of bytes of all dimension values
// plus validity bytes, for memory-allocation convenience.
DimRowBytes int `json:"dimRowBytes"`
// For one-operator-per-kernel we only support one measure per query.
Measure expr.Expr `json:"measure"`
MeasureBytes int `json:"measureBytes"`
AggregateType C.enum_AggregateFunction `json:"aggregate"`
// Storage for current batch.
currentBatch oopkBatchContext
// foreignTables holds the batches for each foreign table;
// to address the foreignTable with tableID x, use foreignTables[x-1].
// A nil foreignTable means it is not an actual foreign table join.
foreignTables []*foreignTable
// nil means no geo intersection
geoIntersection *geoIntersection
// Result storage in host memory. The format is the same as the dimension and
// measure vector in oopkBatchContext.
dimensionVectorH unsafe.Pointer
measureVectorH unsafe.Pointer
// hllVectorD stores hll dense or sparse vector in device memory.
hllVectorD devicePointer
// size of hll vector
hllVectorSize int64
// hllDimRegIDCountD stores regID count for each dim in device memory.
hllDimRegIDCountD devicePointer
ResultSize int `json:"resultSize"`
// For reporting purposes only.
DeviceMemoryRequirement int `json:"deviceMem"`
DurationWaitedForDevice time.Duration `json:"durationWaitedForDevice"`
// Stores the overall query stats for live batches and archive batches.
LiveBatchStats oopkQueryStats `json:"liveStats"`
ArchiveBatchStats oopkQueryStats `json:"archiveStats"`
// Indicates the query can return early without processing all batches;
// this is usually for a non-aggregation query with a limit condition.
done bool
}
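
// Illustrative sketch (hypothetical helper, not in the original file): per the
// foreignTables comment above, the foreign table with tableID x lives at index
// x-1, since index 0 of TableScanners is reserved for the main table.
func (ctx *OOPKContext) foreignTableByID(tableID int) *foreignTable {
    return ctx.foreignTables[tableID-1]
}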
// timezoneTableContext stores context for timezone column queries
type timezoneTableContext struct {
tableAlias string
tableColumn string
}
// resultFlushContext stores context for processing dimensions and flushing results
type resultFlushContext struct {
// caches formatted time dimension values
dimensionValueCache []map[queryCom.TimeDimensionMeta]map[int64]string
dimensionDataTypes []memCom.DataType
reverseDicts map[int][]string
// for eagerly flushed non-aggregation query results
rowsFlushed int
}
// geoIntersection stores geo-intersection-related fields.
type geoIntersection struct {
// The following fields are generated by the compiler.
// Geo tableID (scanner id)
shapeTableID int
// ID of the shape column.
shapeColumnID int
// Table ID of geo point.
pointTableID int
// ID of the point column in main table.
pointColumnID int
// List of shape uuids.
shapeUUIDs []string
// whether to check that the point is inside (true) or outside (false) the shape
inOrOut bool
// dimIndex is the geo dimension index; if < 0, there is no geo dimension
// and this query only has a geo filter
dimIndex int
// The following fields are generated by the processor.
shapeLatLongs devicePointer
shapeIndexs devicePointer
// validShapeUUIDs maps a shape index to its shape UUID
validShapeUUIDs []string
numShapes int
totalNumPoints int
}
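
// Illustrative sketch (hypothetical helper, not in the original file): a
// negative dimIndex means the geo intersection acts purely as a filter and
// contributes no output dimension.
func (g *geoIntersection) hasGeoDimension() bool {
    return g.dimIndex >= 0
}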
// AQLQueryContext stores all contextual data for handling an AQL query.
type AQLQueryContext struct {
// The query input.
Query *queryCom.AQLQuery `json:"query"`
// Context for one-operator-per-kernel execution.
OOPK OOPKContext `json:"oopk"`
//// Compiled time series aggregate query structure.
//// TODO: TSAggregate is only used for VM based query engine.
//TSAggregate C.TimeSeriesAggregate `json:"-"`
// Scanners for all tables. [0] is for the main table; [1:] are for tables in joins.
TableScanners []*TableScanner `json:"scanners"`
// Map from table alias to ID (index to TableScanners).
TableIDByAlias map[string]int `json:"tableIDs"`
// Map from table name to schema for convenience. In case of self join,
// only one entry is referenced here by the name of the table.
TableSchemaByName map[string]*memCom.TableSchema `json:"-"`
// Index to filters in Query.Filters that are identified as prefilters.
Prefilters []int `json:"prefilters,omitempty"`
Error error `json:"error,omitempty"`
Device int `json:"device"`
Debug bool `json:"debug,omitempty"`
Profiling string `json:"profiling,omitempty"`
// We alternate between two CUDA streams across batches for pipelining.
// [0] stores the current stream, and [1] stores the other stream.
cudaStreams [2]unsafe.Pointer
Results queryCom.AQLQueryResult `json:"-"`
resultFlushContext resultFlushContext
// whether it's a DataOnly request from broker
DataOnly bool `json:"DataOnly"`
// whether to serialize the query result as HLLData. If ReturnHLLData is true, we will not release dimension
// vector and measure vector until serialization is done.
ReturnHLLData bool `json:"ReturnHLLData"`
HLLQueryResult []byte `json:"-"`
// for time filter
fixedTimezone *time.Location
fromTime *queryCom.AlignedTime
toTime *queryCom.AlignedTime
dstswitch int64
// timezone column and time filter related
timezoneTable timezoneTableContext
// Fields for non-aggregation queries.
// IsNonAggregationQuery indicates whether this query is a non-aggregation query.
IsNonAggregationQuery bool
numberOfRowsWritten int
maxBatchSizeAfterPrefilter int
// for eagerly flushing the query result
ResponseWriter http.ResponseWriter
// helper used to share common code
QCHelper *context.QueryContextHelper `json:"-"`
}
// IsHLL returns whether the aggregate function is HLL.
func (ctx *OOPKContext) IsHLL() bool {
return ctx.AggregateType == C.AGGR_HLL
}
// isAtomicAggType returns whether the aggregation type is eligible for hash
// reduction (see UseHashReduction).
func isAtomicAggType(aggType C.enum_AggregateFunction) bool {
return aggType == C.AGGR_SUM_SIGNED || aggType == C.AGGR_SUM_FLOAT
}
// UseHashReduction returns whether hash reduction should be used.
func (ctx *OOPKContext) UseHashReduction() bool {
return utils.GetConfig().Query.EnableHashReduction &&
cgoutils.SupportHashReduction() && isAtomicAggType(ctx.AggregateType)
}
// InitQCHelper initializes QCHelper. TODO: move to a specialized constructor.
func (qc *AQLQueryContext) InitQCHelper() {
qc.QCHelper = &context.QueryContextHelper{
QCOptions: qc,
}
}
// GetSchema returns the table schema for the given table ID.
func (qc *AQLQueryContext) GetSchema(tableID int) *memCom.TableSchema {
    return qc.TableScanners[tableID].Schema
}

// GetTableID looks up a table ID (index into TableScanners) by table alias.
func (qc *AQLQueryContext) GetTableID(alias string) (int, bool) {
    id, exists := qc.TableIDByAlias[alias]
    return id, exists
}

// GetQuery returns the query input.
func (qc *AQLQueryContext) GetQuery() *queryCom.AQLQuery {
    return qc.Query
}

// SetError records an error on the query context.
func (qc *AQLQueryContext) SetError(err error) {
    qc.Error = err
}

// IsDataOnly returns whether this is a DataOnly request from the broker.
func (qc *AQLQueryContext) IsDataOnly() bool {
    return qc.DataOnly
}
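
// Illustrative usage sketch (hypothetical; the alias "trips" and the call flow
// are assumptions, not from the original file): InitQCHelper is called before
// QCHelper reaches back into the context through the accessors above.
func exampleContextSetup(qc *AQLQueryContext) {
    qc.InitQCHelper()
    if id, ok := qc.GetTableID("trips"); ok {
        _ = qc.GetSchema(id)
    }
}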