// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package query
// #include "time_series_aggregate.h"
import "C"
import (
"fmt"
"sort"
"strings"
"unsafe"

"github.com/uber/aresdb/cluster/topology"
memCom "github.com/uber/aresdb/memstore/common"
metaCom "github.com/uber/aresdb/metastore/common"
"github.com/uber/aresdb/query/common"
"github.com/uber/aresdb/query/expr"
"github.com/uber/aresdb/utils"
)
const (
unsupportedInputType = "unsupported input type for %s: %s"
defaultTimezoneTableAlias = "__timezone_lookup"
geoShapeLimit = 100
nonAggregationQueryLimit = 1000
)
// Compile compiles AQLQueryContext for data feeding and query
// execution. Caller should check for AQLQueryContext.Error.
func (qc *AQLQueryContext) Compile(tableSchemaReader memCom.TableSchemaReader, shardOwner topology.ShardOwner) {
// processTimezone might append additional joins
qc.processTimezone()
if qc.Error != nil {
return
}
// Read schema for every table used.
qc.readSchema(tableSchemaReader, shardOwner)
defer qc.releaseSchema()
if qc.Error != nil {
return
}
// Parse all other SQL expressions to ASTs.
qc.parseExprs()
if qc.Error != nil {
return
}
// Resolve data types in the ASTs against schema, also translate enum values.
qc.resolveTypes()
if qc.Error != nil {
return
}
// Process join conditions first to collect information about geo join.
qc.processJoinConditions()
if qc.Error != nil {
return
}
// Identify prefilters.
qc.matchPrefilters()
// Process filters.
qc.processFilters()
if qc.Error != nil {
return
}
// Process measure and dimensions.
qc.processMeasure()
if qc.Error != nil {
return
}
qc.processDimensions()
if qc.Error != nil {
return
}
qc.sortUsedColumns()
qc.sortDimensionColumns()
if qc.Error != nil {
return
}
// TODO: VM instruction generation
}
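// A hedged usage sketch of the pipeline above (schemaReader and shardOwner are
// assumptions standing in for whatever the caller provides):
//
//	qc := &AQLQueryContext{Query: aqlQuery}
//	qc.Compile(schemaReader, shardOwner)
//	if qc.Error != nil {
//		// handle the compilation error
//	}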
// adjustFilterToTimeFilter tries to promote one row filter to the time filter when a fact table query has no time filter
func (qc *AQLQueryContext) adjustFilterToTimeFilter() {
toBeRemovedFilters := []int{}
timeFilter := common.TimeFilter{}
for i, filter := range qc.Query.FiltersParsed {
if e, ok := filter.(*expr.BinaryExpr); ok {
lhs, isCol := e.LHS.(*expr.VarRef)
if !isCol {
continue
}
// check if this filter is on the main table's event time column
tableID, columnID, err := qc.QCHelper.ResolveColumn(lhs.Val)
if err != nil || tableID != 0 || columnID != 0 {
continue
}
val := ""
// only number and string literals are supported
switch rhs := e.RHS.(type) {
case *expr.NumberLiteral:
val = rhs.String()
case *expr.StringLiteral:
val = rhs.Val
}
if val == "" {
continue
}
switch e.Op {
case expr.LT:
if timeFilter.To == "" {
// only convert first LT
timeFilter.To = val
toBeRemovedFilters = append(toBeRemovedFilters, i)
} else {
qc.Error = utils.StackError(nil, "Only one '<' filter allowed for event time column")
return
}
case expr.GTE:
if timeFilter.From == "" {
// only convert first GTE
timeFilter.From = val
toBeRemovedFilters = append(toBeRemovedFilters, i)
} else {
qc.Error = utils.StackError(nil, "Only one '>=' filter allowed for event time column")
return
}
}
}
}
if timeFilter.From != "" || timeFilter.To != "" {
// common.ParseTimeFilter handles the case where 'from' is nil
if qc.fromTime, qc.toTime, qc.Error = common.ParseTimeFilter(timeFilter, qc.fixedTimezone, utils.Now()); qc.Error != nil {
return
}
// remove the promoted filters from the original query filters
for i := len(toBeRemovedFilters) - 1; i >= 0; i-- {
index := toBeRemovedFilters[i]
qc.Query.FiltersParsed = append(qc.Query.FiltersParsed[:index], qc.Query.FiltersParsed[index+1:]...)
}
}
}
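// A hedged illustration of the promotion above, with hypothetical filters on a
// fact table whose event time column is request_at (table 0, column 0):
//
//	// before: Filters = ["status = 3", "request_at >= 1500000000"]
//	// after:  fromTime is parsed from "1500000000" and
//	//         Filters = ["status = 3"]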
func (qc *AQLQueryContext) processJoinConditions() {
if len(qc.Query.Joins) > 8 {
qc.Error = utils.StackError(nil, "At most %d foreign tables allowed, got: %d", 8, len(qc.Query.Joins))
return
}
qc.OOPK.foreignTables = make([]*foreignTable, len(qc.Query.Joins))
mainTableSchema := qc.TableSchemaByName[qc.Query.Table]
for joinTableID, join := range qc.Query.Joins {
joinSchema := qc.TableSchemaByName[join.Table]
if isGeoJoin(join) {
if qc.OOPK.geoIntersection != nil {
qc.Error = utils.StackError(nil, "At most one geo join allowed")
return
}
qc.matchGeoJoin(joinTableID, mainTableSchema, joinSchema, join.ConditionsParsed)
if qc.Error != nil {
return
}
} else {
// geo joins are handled separately above since geo intersects are processed
// as filters rather than equal joins.
qc.OOPK.foreignTables[joinTableID] = &foreignTable{}
qc.matchEqualJoin(joinTableID, joinSchema, join.ConditionsParsed)
if qc.Error != nil {
return
}
}
}
}
// matchGeoJoin initializes the GeoIntersection struct for later query processing. For now only one geo join is
// allowed per query. If users want to intersect with multiple geo join conditions, they should specify multiple geo
// shapeLatLongs in the geo filter.
// The following constraints apply:
// 1. At most one geo join condition.
// 2. Geo table must be dimension table.
// 3. The join condition must include exactly one shape column and one point column.
// 4. Exactly one geo filter should be specified.
// 5. Geo filter column must be the primary key of the geo table.
// 6. Geo UUIDs must be string in query.
// 7. Geo filter operator must be EQ or IN
// 8. Geo table's fields are not allowed in measures.
// 9. Only one geo dimension allowed.
func (qc *AQLQueryContext) matchGeoJoin(joinTableID int, mainTableSchema *memCom.TableSchema,
joinSchema *memCom.TableSchema, conditions []expr.Expr) {
if len(conditions) != 1 {
qc.Error = utils.StackError(nil, "At most one join condition allowed per geo join")
return
}
if joinSchema.Schema.IsFactTable {
qc.Error = utils.StackError(nil, "Only dimension table is allowed in geo join")
return
}
// the geo table may have only one primary key column.
if len(joinSchema.Schema.PrimaryKeyColumns) > 1 {
qc.Error = utils.StackError(nil, "Composite primary key for geo table is not allowed")
return
}
c, _ := conditions[0].(*expr.Call)
// guaranteed by query rewrite.
shape, _ := c.Args[0].(*expr.VarRef)
point, _ := c.Args[1].(*expr.VarRef)
if shape.TableID != joinTableID+1 {
qc.Error = utils.StackError(nil, "Only shape in geo table can be referenced as join condition")
return
}
qc.OOPK.geoIntersection = &geoIntersection{
shapeTableID: shape.TableID,
shapeColumnID: shape.ColumnID,
pointTableID: point.TableID,
pointColumnID: point.ColumnID,
dimIndex: -1,
inOrOut: true,
}
// Set column usage for geo points.
expr.Walk(columnUsageCollector{
tableScanners: qc.TableScanners,
usages: columnUsedByAllBatches,
}, point)
}
func isGeoJoin(j common.Join) bool {
if len(j.ConditionsParsed) >= 1 {
c, ok := j.ConditionsParsed[0].(*expr.Call)
if !ok {
return false
}
return c.Name == expr.GeographyIntersectsCallName
}
return false
}
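// exampleIsGeoJoin is a hedged, standalone sketch (not used by the compiler):
// a join whose first parsed condition is a geography_intersects call is
// classified as a geo join. The table name is hypothetical.
func exampleIsGeoJoin() {
join := common.Join{
Table: "geofence",
ConditionsParsed: []expr.Expr{&expr.Call{Name: expr.GeographyIntersectsCallName}},
}
fmt.Println(isGeoJoin(join)) // true
}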
// matchEqualJoin enforces the following join constraints for now:
// 1. equi-join only
// 2. many-to-one join only
// 3. foreign table must be a dimension table
// 4. foreign table primary key can have only one column
// 5. every foreign table must be joined directly to the main table, i.e., no bridges
// 6. up to 8 foreign tables
func (qc *AQLQueryContext) matchEqualJoin(joinTableID int, joinSchema *memCom.TableSchema, conditions []expr.Expr) {
if len(conditions) != 1 {
qc.Error = utils.StackError(nil, "%d join conditions expected, got %d", 1, len(conditions))
return
}
// foreign table must be a dimension table
if joinSchema.Schema.IsFactTable {
qc.Error = utils.StackError(nil, "join table %s is fact table, only dimension table supported", qc.Query.Table)
return
}
// one foreign table primary key columns only
if len(joinSchema.Schema.PrimaryKeyColumns) > 1 {
qc.Error = utils.StackError(nil, "composite key not supported")
return
}
// equi-join only
e, ok := conditions[0].(*expr.BinaryExpr)
if !ok {
qc.Error = utils.StackError(nil, "binary expression expected, got %s", conditions[0].String())
return
}
if e.Op != expr.EQ {
qc.Error = utils.StackError(nil, "equal join expected, got %s", e.Op.String())
return
}
left, ok := e.LHS.(*expr.VarRef)
if !ok {
qc.Error = utils.StackError(nil, "column in join condition expected, got %s", e.LHS.String())
return
}
right, ok := e.RHS.(*expr.VarRef)
if !ok {
qc.Error = utils.StackError(nil, "column in join condition expected, got %s", e.RHS.String())
return
}
// main table at left and foreign table at right
if left.TableID != 0 {
left, right = right, left
}
// every foreign table must be joined directly to the main table
if left.TableID != 0 || right.TableID != joinTableID+1 {
qc.Error = utils.StackError(nil, "foreign table must be joined directly to the main table, join condition: %s", e.String())
return
}
// many-to-one join only (join with foreign table's primary key)
if joinSchema.Schema.PrimaryKeyColumns[0] != right.ColumnID {
qc.Error = utils.StackError(nil, "join column is not primary key of foreign table")
return
}
qc.OOPK.foreignTables[joinTableID].remoteJoinColumn = left
// set column usage for join column in main table
// no need to set usage for remote join column in foreign table since
// we only use primary key of foreign table to join
expr.Walk(columnUsageCollector{
tableScanners: qc.TableScanners,
usages: columnUsedByAllBatches,
}, left)
}
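// A hedged example of a condition accepted by matchEqualJoin, assuming trips is
// the main fact table and cities is a dimension table with primary key id:
//
//	trips.city_id = cities.id
//
// Either operand order is accepted; the compiler swaps the main table column
// to the left before validation.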
func (qc *AQLQueryContext) parseExprs() {
var err error
// Join conditions.
for i, join := range qc.Query.Joins {
join.ConditionsParsed = make([]expr.Expr, len(join.Conditions))
for j, cond := range join.Conditions {
join.ConditionsParsed[j], err = expr.ParseExpr(cond)
if err != nil {
qc.Error = utils.StackError(err, "Failed to parse join condition: %s", cond)
return
}
}
qc.Query.Joins[i] = join
}
qc.fromTime, qc.toTime, qc.Error = common.ParseTimeFilter(qc.Query.TimeFilter, qc.fixedTimezone, utils.Now())
if qc.Error != nil {
return
}
// Filters.
qc.Query.FiltersParsed = make([]expr.Expr, len(qc.Query.Filters))
for i, filter := range qc.Query.Filters {
qc.Query.FiltersParsed[i], err = expr.ParseExpr(filter)
if err != nil {
qc.Error = utils.StackError(err, "Failed to parse filter %s", filter)
return
}
}
if qc.fromTime == nil && qc.toTime == nil && len(qc.TableScanners) > 0 && qc.TableScanners[0].Schema.Schema.IsFactTable {
qc.adjustFilterToTimeFilter()
if qc.Error != nil {
return
}
}
// Dimensions.
rawDimensions := qc.Query.Dimensions
qc.Query.Dimensions = []common.Dimension{}
for _, dim := range rawDimensions {
dim.TimeBucketizer = strings.Trim(dim.TimeBucketizer, " ")
if dim.TimeBucketizer != "" {
// make sure time column is defined
if dim.Expr == "" {
qc.Error = utils.StackError(err, "Failed to parse TimeSeriesBucketizer '%s' since time column is empty ", dim.TimeBucketizer)
return
}
timeColumnExpr, err := expr.ParseExpr(dim.Expr)
if err != nil {
qc.Error = utils.StackError(err, "Failed to parse timeColumn '%s'", dim.Expr)
return
}
dim.ExprParsed, err = qc.buildTimeDimensionExpr(dim.TimeBucketizer, timeColumnExpr)
if err != nil {
qc.Error = utils.StackError(err, "Failed to parse dimension: %s", dim.TimeBucketizer)
return
}
qc.Query.Dimensions = append(qc.Query.Dimensions, dim)
} else {
// dimension is defined as a SQL expression
dim.ExprParsed, err = expr.ParseExpr(dim.Expr)
if err != nil {
qc.Error = utils.StackError(err, "Failed to parse dimension: %s", dim.Expr)
return
}
if _, ok := dim.ExprParsed.(*expr.Wildcard); ok {
qc.Query.Dimensions = append(qc.Query.Dimensions, qc.getAllColumnsDimension()...)
} else {
qc.Query.Dimensions = append(qc.Query.Dimensions, dim)
}
}
}
// Measures.
for i, measure := range qc.Query.Measures {
measure.ExprParsed, err = expr.ParseExpr(measure.Expr)
if err != nil {
qc.Error = utils.StackError(err, "Failed to parse measure: %s", measure.Expr)
return
}
measure.FiltersParsed = make([]expr.Expr, len(measure.Filters))
for j, filter := range measure.Filters {
measure.FiltersParsed[j], err = expr.ParseExpr(filter)
if err != nil {
qc.Error = utils.StackError(err, "Failed to parse measure filter %s", filter)
return
}
}
qc.Query.Measures[i] = measure
}
}
func (qc *AQLQueryContext) processTimezone() {
if timezoneColumn, joinKey, success := parseTimezoneColumnString(qc.Query.Timezone); success {
timezoneTable := utils.GetConfig().Query.TimezoneTable.TableName
qc.timezoneTable.tableColumn = timezoneColumn
for _, join := range qc.Query.Joins {
if join.Table == timezoneTable {
qc.timezoneTable.tableAlias = join.Alias
}
}
// append the timezone table to joins when it is not already joined
if qc.timezoneTable.tableAlias == "" {
qc.timezoneTable.tableAlias = defaultTimezoneTableAlias
qc.Query.Joins = append(qc.Query.Joins, common.Join{
Table: timezoneTable,
Alias: defaultTimezoneTableAlias,
Conditions: []string{fmt.Sprintf("%s=%s.id", joinKey, defaultTimezoneTableAlias)},
})
}
} else {
loc, err := common.ParseTimezone(qc.Query.Timezone)
if err != nil {
qc.Error = utils.StackError(err, "timezone Failed to parse: %s", qc.Query.Timezone)
return
}
qc.fixedTimezone = loc
}
}
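// A hedged illustration of the rewrite above: a query with Timezone set to
// "timezone(city_id)" gains a join equivalent to
//
//	common.Join{
//		Table:      timezoneTable, // utils.GetConfig().Query.TimezoneTable.TableName
//		Alias:      "__timezone_lookup",
//		Conditions: []string{"city_id=__timezone_lookup.id"},
//	}
//
// unless the timezone table is already joined under another alias.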
func (qc *AQLQueryContext) readSchema(tableSchemaReader memCom.TableSchemaReader, shardOwner topology.ShardOwner) {
qc.TableScanners = make([]*TableScanner, 1+len(qc.Query.Joins))
qc.TableIDByAlias = make(map[string]int)
qc.TableSchemaByName = make(map[string]*memCom.TableSchema)
tableSchemaReader.RLock()
defer tableSchemaReader.RUnlock()
// Main table.
schema := tableSchemaReader.GetSchemas()[qc.Query.Table]
if schema == nil {
qc.Error = utils.StackError(nil, "unknown main table %s", qc.Query.Table)
return
}
qc.TableSchemaByName[qc.Query.Table] = schema
schema.RLock()
qc.TableScanners[0] = &TableScanner{}
qc.TableScanners[0].Schema = schema
// use the shards specified in the query,
// or all owned shards when the user did not specify any
if len(qc.Query.Shards) == 0 {
qc.TableScanners[0].Shards = shardOwner.GetOwnedShards()
} else {
qc.TableScanners[0].Shards = qc.Query.Shards
}
qc.TableScanners[0].ColumnUsages = make(map[int]columnUsage)
if schema.Schema.IsFactTable {
// Archiving cutoff filter usage for fact table.
qc.TableScanners[0].ColumnUsages[0] = columnUsedByLiveBatches
}
qc.TableIDByAlias[qc.Query.Table] = 0
// Foreign tables.
for i, join := range qc.Query.Joins {
schema = tableSchemaReader.GetSchemas()[join.Table]
if schema == nil {
qc.Error = utils.StackError(nil, "unknown join table %s", join.Table)
return
}
if qc.TableSchemaByName[join.Table] == nil {
qc.TableSchemaByName[join.Table] = schema
// Prevent double locking.
schema.RLock()
}
qc.TableScanners[1+i] = &TableScanner{}
qc.TableScanners[1+i].Schema = schema
qc.TableScanners[1+i].ColumnUsages = make(map[int]columnUsage)
if schema.Schema.IsFactTable {
// fact-to-fact joins are only supported within the same shard
qc.TableScanners[1+i].Shards = qc.TableScanners[0].Shards
// Archiving cutoff filter usage for fact table.
qc.TableScanners[1+i].ColumnUsages[0] = columnUsedByLiveBatches
} else {
// for fact to dimension table join
// we can assume shard zero for dimension table
// since dimension table is not sharded
qc.TableScanners[1+i].Shards = []int{0}
}
alias := join.Alias
if alias == "" {
alias = join.Table
}
_, exists := qc.TableIDByAlias[alias]
if exists {
qc.Error = utils.StackError(nil, "table alias %s is redefined", alias)
return
}
qc.TableIDByAlias[alias] = 1 + i
}
}
func (qc *AQLQueryContext) releaseSchema() {
for _, schema := range qc.TableSchemaByName {
schema.RUnlock()
}
}
// Rewrite walks the expression AST and resolves data types bottom up.
// In addition it also translates enum strings and rewrites their predicates.
func (qc *AQLQueryContext) Rewrite(expression expr.Expr) expr.Expr {
return qc.QCHelper.Rewrite(expression)
}
// resolveTypes walks all expression ASTs and resolves data types bottom up.
// In addition it also translates enum strings and rewrites their predicates.
func (qc *AQLQueryContext) resolveTypes() {
// Join conditions.
for i, join := range qc.Query.Joins {
for j, cond := range join.ConditionsParsed {
join.ConditionsParsed[j] = expr.Rewrite(qc, cond)
if qc.Error != nil {
return
}
}
qc.Query.Joins[i] = join
}
// Dimensions.
for i, dim := range qc.Query.Dimensions {
dim.ExprParsed = expr.Rewrite(qc, dim.ExprParsed)
if qc.Error != nil {
return
}
qc.Query.Dimensions[i] = dim
}
// Measures.
for i, measure := range qc.Query.Measures {
measure.ExprParsed = expr.Rewrite(qc, measure.ExprParsed)
if qc.Error != nil {
return
}
for j, filter := range measure.FiltersParsed {
measure.FiltersParsed[j] = expr.Rewrite(qc, filter)
if qc.Error != nil {
return
}
}
measure.FiltersParsed = qc.QCHelper.NormalizeAndFilters(measure.FiltersParsed)
qc.Query.Measures[i] = measure
}
// Filters.
for i, filter := range qc.Query.FiltersParsed {
qc.Query.FiltersParsed[i] = expr.Rewrite(qc, filter)
if qc.Error != nil {
return
}
}
qc.Query.FiltersParsed = qc.QCHelper.NormalizeAndFilters(qc.Query.FiltersParsed)
}
// extractFilter processes the specified query level filter and matches it
// against the following formats:
// column = value
// column > value
// column >= value
// column < value
// column <= value
// column
// not column
// It returns the numeric constant value associated with the filter in a uint32
// space (for all types including float32).
// In addition it also returns the boundaryType for >, >=, <, <= operators.
// Note that since the candidate filters have already been preselected against
// some criteria, this function does not perform full format validation.
func (qc *AQLQueryContext) extractFilter(filterID int) (
value uint32, boundary boundaryType, success bool) {
switch f := qc.Query.FiltersParsed[filterID].(type) {
case *expr.VarRef:
// Match `column` format
value = 1
success = true
case *expr.UnaryExpr:
// Match `not column` format
success = true
case *expr.BinaryExpr:
// Match `column op value` format
rhs, _ := f.RHS.(*expr.NumberLiteral)
if rhs == nil {
return
}
switch rhs.ExprType {
case expr.Float:
*(*float32)(unsafe.Pointer(&value)) = float32(rhs.Val)
case expr.Signed:
*(*int32)(unsafe.Pointer(&value)) = int32(rhs.Int)
case expr.Unsigned:
value = uint32(rhs.Int)
default:
return
}
switch f.Op {
case expr.GTE, expr.LTE:
boundary = inclusiveBoundary
case expr.GT, expr.LT:
boundary = exclusiveBoundary
}
success = true
}
return
}
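// examplePackFloatFilterValue is a hedged, standalone sketch of how
// extractFilter reinterprets a float32 constant into the shared uint32 value
// space used for prefilter slicing.
func examplePackFloatFilterValue() {
var value uint32
*(*float32)(unsafe.Pointer(&value)) = float32(3.5)
fmt.Printf("0x%08x\n", value) // 0x40600000, the IEEE-754 bits of 3.5
}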
// matchPrefilters identifies all prefilters from query level filters,
// stores them in AQLQueryContext.Prefilters,
// and stores their values in TableScanner for future prefilter vector slicing.
func (qc *AQLQueryContext) matchPrefilters() {
// Format of candidateFilters:
// [tableID]map[columnID]{filterIDs for lower bound, upper bound, equality}
// tableID is query scoped, while columnID is schema scoped.
candidateFilters := make([]map[int][3]int, len(qc.TableScanners))
for tableID := range qc.TableScanners {
candidateFilters[tableID] = make(map[int][3]int)
}
// Index candidate filters by table/column
for filterID, filter := range qc.Query.FiltersParsed {
f, _ := filter.(*expr.BinaryExpr)
if f == nil {
switch f := filter.(type) {
case *expr.VarRef:
// Match `column` format
if f.ExprType == expr.Boolean {
candidateFilters[f.TableID][f.ColumnID] = [3]int{-1, -1, filterID}
}
case *expr.UnaryExpr:
// Match `not column` format
if f.Op == expr.NOT {
f, _ := f.Expr.(*expr.VarRef)
if f != nil && f.ExprType == expr.Boolean {
candidateFilters[f.TableID][f.ColumnID] = [3]int{-1, -1, filterID}
}
}
// TODO: IS_NULL can be matched as an equality filter.
// TODO: IS_NOT_NULL can be matched as the final range filter.
}
continue
}
// Match `column op value` format, where op can be =, <, <=, >, >=.
if f.Op < expr.EQ || f.Op > expr.GTE {
continue
}
lhs, _ := f.LHS.(*expr.VarRef)
if lhs == nil {
continue
}
columnToFilterMap := candidateFilters[lhs.TableID]
filters, exists := columnToFilterMap[lhs.ColumnID]
if !exists {
filters = [3]int{-1, -1, -1}
}
switch f.Op {
case expr.GT, expr.GTE:
filters[0] = filterID
case expr.LT, expr.LTE:
filters[1] = filterID
case expr.EQ:
filters[2] = filterID
}
columnToFilterMap[lhs.ColumnID] = filters
}
// Prefilter matching
for tableID, scanner := range qc.TableScanners {
// Match in archiving sort column order
for _, columnID := range scanner.Schema.Schema.ArchivingSortColumns {
filterIndex, exists := candidateFilters[tableID][columnID]
if !exists {
// Stop on first missing column
break
}
// Equality
if filterIndex[2] >= 0 {
value, _, success := qc.extractFilter(filterIndex[2])
if !success {
// Stop if the value fails to be extracted
break
}
scanner.EqualityPrefilterValues = append(
scanner.EqualityPrefilterValues, value)
qc.Prefilters = append(qc.Prefilters, filterIndex[2])
scanner.ColumnUsages[columnID] |= columnUsedByPrefilter
// Continue matching the next column
continue
}
// Lower bound
if filterIndex[0] >= 0 {
value, boundaryType, success := qc.extractFilter(filterIndex[0])
if success {
scanner.RangePrefilterValues[0] = value
scanner.RangePrefilterBoundaries[0] = boundaryType
qc.Prefilters = append(qc.Prefilters, filterIndex[0])
scanner.ColumnUsages[columnID] |= columnUsedByPrefilter
}
}
// Upper bound
if filterIndex[1] >= 0 {
value, boundaryType, success := qc.extractFilter(filterIndex[1])
if success {
scanner.RangePrefilterValues[1] = value
scanner.RangePrefilterBoundaries[1] = boundaryType
qc.Prefilters = append(qc.Prefilters, filterIndex[1])
scanner.ColumnUsages[columnID] |= columnUsedByPrefilter
}
}
// Stop after the first range filter
break
}
}
sort.Ints(qc.Prefilters)
}
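// A hedged example of the matching above: for a table archived with sort
// columns [city_id, status], the filters "city_id = 12" and "status >= 2"
// match as an equality prefilter followed by a range prefilter; matching stops
// at the first range filter, or at the first sort column with no candidate.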
// columnUsageCollector is a visitor that traverses an AST, finds VarRef columns,
// and sets their usage bits in tableScanners. The VarRef nodes must have already
// been resolved and annotated with TableID and ColumnID.
type columnUsageCollector struct {
tableScanners []*TableScanner
usages columnUsage
}
func (c columnUsageCollector) Visit(expression expr.Expr) expr.Visitor {
switch e := expression.(type) {
case *expr.VarRef:
c.tableScanners[e.TableID].ColumnUsages[e.ColumnID] |= c.usages
}
return c
}
// foreignTableColumnDetector detects whether an AST references any foreign table column
type foreignTableColumnDetector struct {
hasForeignTableColumn bool
}
func (c *foreignTableColumnDetector) Visit(expression expr.Expr) expr.Visitor {
switch e := expression.(type) {
case *expr.VarRef:
c.hasForeignTableColumn = c.hasForeignTableColumn || (e.TableID > 0)
}
return c
}
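// exampleForeignTableColumnDetector is a hedged sketch (assuming expr.Walk
// visits the root node): any VarRef with TableID > 0 marks the expression as
// referencing a foreign table.
func exampleForeignTableColumnDetector() {
detector := &foreignTableColumnDetector{}
expr.Walk(detector, &expr.VarRef{Val: "cities.id", TableID: 1})
fmt.Println(detector.hasForeignTableColumn) // true
}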
// processFilters processes all filters and categorizes them into common filters,
// prefilters, and time filters. It also collects column usages from the filters.
func (qc *AQLQueryContext) processFilters() {
// OOPK engine only supports one measure per query.
if len(qc.Query.Measures) != 1 {
qc.Error = utils.StackError(nil, "expect one measure per query, but got %d",
len(qc.Query.Measures))
return
}
// Categorize common filters and prefilters based on matched prefilters.
commonFilters := qc.Query.Measures[0].FiltersParsed
prefilters := qc.Prefilters
for index, filter := range qc.Query.FiltersParsed {
if len(prefilters) == 0 || prefilters[0] > index {
// common filters
commonFilters = append(commonFilters, filter)
} else {
qc.OOPK.Prefilters = append(qc.OOPK.Prefilters, filter)
prefilters = prefilters[1:]
}
}
var geoFilterFound bool
for _, filter := range commonFilters {
foreignTableColumnDetector := foreignTableColumnDetector{}
expr.Walk(&foreignTableColumnDetector, filter)
if foreignTableColumnDetector.hasForeignTableColumn {
var isGeoFilter bool
if qc.OOPK.geoIntersection != nil {
geoTableID := qc.OOPK.geoIntersection.shapeTableID
joinSchema := qc.TableSchemaByName[qc.Query.Joins[geoTableID-1].Table]
isGeoFilter = qc.matchGeoFilter(filter, geoTableID, joinSchema, geoFilterFound)
if qc.Error != nil {
return
}
}
if !isGeoFilter {
qc.OOPK.ForeignTableCommonFilters = append(qc.OOPK.ForeignTableCommonFilters, filter)
} else {
geoFilterFound = true
}
} else {
qc.OOPK.MainTableCommonFilters = append(qc.OOPK.MainTableCommonFilters, filter)
}
}
if qc.OOPK.geoIntersection != nil && !geoFilterFound {
qc.Error = utils.StackError(nil, "Exact one geo filter is needed if geo intersection"+
" is used during join")
return
}
// Process time filter.
qc.processTimeFilter()
if qc.Error != nil {
return
}
// Collect column usages from the filters.
for _, f := range qc.OOPK.MainTableCommonFilters {
expr.Walk(columnUsageCollector{
tableScanners: qc.TableScanners,
usages: columnUsedByAllBatches,
}, f)
}
for _, f := range qc.OOPK.ForeignTableCommonFilters {
expr.Walk(columnUsageCollector{
tableScanners: qc.TableScanners,
usages: columnUsedByAllBatches,
}, f)
}
for _, f := range qc.OOPK.Prefilters {
expr.Walk(columnUsageCollector{
tableScanners: qc.TableScanners,
usages: columnUsedByLiveBatches,
}, f)
}
if qc.OOPK.TimeFilters[0] != nil {
expr.Walk(columnUsageCollector{
tableScanners: qc.TableScanners,
usages: columnUsedByFirstArchiveBatch | columnUsedByLiveBatches,
}, qc.OOPK.TimeFilters[0])
}
if qc.OOPK.TimeFilters[1] != nil {
expr.Walk(columnUsageCollector{
tableScanners: qc.TableScanners,
usages: columnUsedByLastArchiveBatch | columnUsedByLiveBatches,
}, qc.OOPK.TimeFilters[1])
}
}
func getStrFromNumericalOrStrLiteral(e expr.Expr) (string, error) {
if strExpr, ok := e.(*expr.StringLiteral); ok {
return strExpr.Val, nil
}
if numExpr, ok := e.(*expr.NumberLiteral); ok {
return numExpr.String(), nil
}
return "", utils.StackError(nil,
"Unable to extract string from %s", e.String())
}
// matchGeoFilter tries to match the filter as a geo filter and prepares shapeUUIDs for the AQL processor.
// It reports whether filterExpr is a geo filter.
func (qc *AQLQueryContext) matchGeoFilter(filterExpr expr.Expr, joinTableID int,
joinSchema *memCom.TableSchema, geoFilterFound bool) (geoFilterFoundInCurrentExpr bool) {
var shapeUUIDs []string
invalidOpsFound, geoFilterFoundInCurrentExpr := qc.matchGeoFilterHelper(filterExpr, joinTableID, joinSchema, &shapeUUIDs)
if qc.Error != nil {
return
}
if geoFilterFoundInCurrentExpr && invalidOpsFound {
qc.Error = utils.StackError(nil, "Only EQ and IN allowed for geo filters")
return
}
if geoFilterFoundInCurrentExpr && geoFilterFound {
qc.Error = utils.StackError(nil, "Only one geo filter is allowed")
return
}
if len(shapeUUIDs) > geoShapeLimit {
qc.Error = utils.StackError(nil, "At most %d gep shapes supported, got %d", geoShapeLimit, len(shapeUUIDs))
return
}
if geoFilterFoundInCurrentExpr {
qc.OOPK.geoIntersection.shapeUUIDs = shapeUUIDs
}
return
}
func (qc *AQLQueryContext) matchGeoFilterHelper(filterExpr expr.Expr, joinTableID int,
joinSchema *memCom.TableSchema, shapeUUIDs *[]string) (inValidOpFound, foundGeoFilter bool) {
switch e := filterExpr.(type) {
case *expr.BinaryExpr:
if e.Op == expr.OR {
inValidOpFoundL, foundGeoFilterL := qc.matchGeoFilterHelper(e.LHS, joinTableID, joinSchema, shapeUUIDs)
inValidOpFoundR, foundGeoFilterR := qc.matchGeoFilterHelper(e.RHS, joinTableID, joinSchema, shapeUUIDs)
inValidOpFound = inValidOpFoundL || inValidOpFoundR
foundGeoFilter = foundGeoFilterL || foundGeoFilterR
} else if e.Op == expr.EQ {
columnExpr := e.LHS
if paren, ok := columnExpr.(*expr.ParenExpr); ok {
columnExpr = paren.Expr
}
if column, ok := columnExpr.(*expr.VarRef); ok && column.TableID == joinTableID {
// geo filter's column must be primary key.
if joinSchema.Schema.PrimaryKeyColumns[0] != column.ColumnID {
qc.Error = utils.StackError(nil, "Geo filter column is not the primary key")
return
}
uuidStr, err := getStrFromNumericalOrStrLiteral(e.RHS)
if err != nil {
qc.Error = utils.StackError(err,
"Unable to extract uuid from expression %s", e.RHS.String())
return
}
normalizedUUID, err := utils.NormalizeUUIDString(uuidStr)
if err != nil {
qc.Error = err
return
}
foundGeoFilter = true
*shapeUUIDs = append(*shapeUUIDs, normalizedUUID)
}
} else {
inValidOpFound = true
// keep traversing to find geo fields
_, foundGeoFilterL := qc.matchGeoFilterHelper(e.LHS, joinTableID, joinSchema, shapeUUIDs)
_, foundGeoFilterR := qc.matchGeoFilterHelper(e.RHS, joinTableID, joinSchema, shapeUUIDs)
foundGeoFilter = foundGeoFilterL || foundGeoFilterR
}
case *expr.UnaryExpr:
inValidOpFound = true
_, foundGeoFilter = qc.matchGeoFilterHelper(e.Expr, joinTableID, joinSchema, shapeUUIDs)
}
return
}
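// Hedged examples of filters that the helper above accepts, assuming geofence
// is the geo join table and geo_uuid is its primary key:
//
//	geofence.geo_uuid = '4c3226b4-0086-4270-9ba2-4d3b5a1d3784'
//	geofence.geo_uuid IN ('uuid1', 'uuid2') // expanded into ORs of EQ by expandINop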
// processTimeFilter processes the time filter by matching it against the time
// column of the main fact table. The time filter is treated as a common
// filter if it does not match the designated time column.
func (qc *AQLQueryContext) processTimeFilter() {
from, to := qc.fromTime, qc.toTime
// Match against time column of the main fact table.
var timeColumnMatched bool
tableColumnPair := strings.SplitN(qc.Query.TimeFilter.Column, ".", 2)
if len(tableColumnPair) < 2 {
qc.Query.TimeFilter.Column = tableColumnPair[0]
} else {
qc.Query.TimeFilter.Column = tableColumnPair[1]
if tableColumnPair[0] != qc.Query.Table {
qc.Error = utils.StackError(nil, "timeFilter only supports main table: %s, got: %s", qc.Query.Table, tableColumnPair[0])
return
}
}
if qc.TableScanners[0].Schema.Schema.IsFactTable {
if from == nil {
qc.Error = utils.StackError(nil, "'from' of time filter is missing")
return
}
timeColumn := qc.TableScanners[0].Schema.Schema.Columns[0].Name
if qc.Query.TimeFilter.Column == "" || qc.Query.TimeFilter.Column == timeColumn {
timeColumnMatched = true
qc.Query.TimeFilter.Column = timeColumn
}
}
// TODO: resolve time filter column against foreign tables.
timeColumnID := 0
found := false
if qc.Query.TimeFilter.Column != "" {
// Validate column existence and type.
timeColumnID, found = qc.TableScanners[0].Schema.ColumnIDs[qc.Query.TimeFilter.Column]
if !found {
qc.Error = utils.StackError(nil, "unknown time filter column %s",
qc.Query.TimeFilter.Column)
return
}
timeColumnType := qc.TableScanners[0].Schema.ValueTypeByColumn[timeColumnID]
if timeColumnType != memCom.Uint32 {
qc.Error = utils.StackError(nil,
"expect time filter column %s of type Uint32, but got %s",
qc.Query.TimeFilter.Column, memCom.DataTypeName[timeColumnType])
return
}
}
fromExpr, toExpr := common.CreateTimeFilterExpr(&expr.VarRef{
Val: qc.Query.TimeFilter.Column,
ExprType: expr.Unsigned,
TableID: 0,
ColumnID: timeColumnID,
DataType: memCom.Uint32,
}, from, to)
qc.TableScanners[0].ArchiveBatchIDEnd = int((utils.Now().Unix() + 86399) / 86400)
if timeColumnMatched {
qc.OOPK.TimeFilters[0] = fromExpr
qc.OOPK.TimeFilters[1] = toExpr
if from != nil {
qc.TableScanners[0].ArchiveBatchIDStart = int(from.Time.Unix() / 86400)
}
if to != nil {
qc.TableScanners[0].ArchiveBatchIDEnd = int((to.Time.Unix() + 86399) / 86400)
}
} else {
if fromExpr != nil {
qc.OOPK.MainTableCommonFilters = append(qc.OOPK.MainTableCommonFilters, fromExpr)
}
if toExpr != nil {
qc.OOPK.MainTableCommonFilters = append(qc.OOPK.MainTableCommonFilters, toExpr)
}
}
}
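// exampleArchiveBatchIDs is a hedged, standalone sketch of the day-based batch
// ID arithmetic used by processTimeFilter: batch IDs are days since the Unix
// epoch, and the end batch is rounded up to cover a partial day.
func exampleArchiveBatchIDs() {
from := int64(1499990400) // 2017-07-14T00:00:00Z
to := int64(1500076800)   // 2017-07-15T00:00:00Z
fmt.Println(from/86400, (to+86399)/86400) // 17361 17362
}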
// matchAndRewriteGeoDimension tells whether a dimension matches the geo join and whether it is a valid
// geo join. It returns the rewritten geo dimension and an error; a non-nil error means the geo join is invalid.
// A valid geo dimension can only be in one of the following formats:
// 1. UUID
// 2. hex(UUID)
func (qc *AQLQueryContext) matchAndRewriteGeoDimension(dimExpr expr.Expr) (expr.Expr, error) {
gc := &geoTableUsageCollector{
geoIntersection: *qc.OOPK.geoIntersection,
}
expr.Walk(gc, dimExpr)
if !gc.useGeoTable {
return nil, nil
}
if callExpr, ok := dimExpr.(*expr.Call); ok {
if callExpr.Name != expr.HexCallName {
return nil, utils.StackError(nil,
"Only hex function is supported on UUID type, but got %s", callExpr.Name)
}
if len(callExpr.Args) != 1 {
return nil, utils.StackError(nil,
"Exactly 1 argument allowed for hex, got %d", len(callExpr.Args))
}
dimExpr = callExpr.Args[0]
}
joinSchema := qc.TableSchemaByName[qc.Query.Joins[gc.geoIntersection.shapeTableID-1].Table]
if varRefExpr, ok := dimExpr.(*expr.VarRef); ok {
var err error
if varRefExpr.ColumnID != joinSchema.Schema.PrimaryKeyColumns[0] {
err = utils.StackError(nil, "Only geo uuid is allowed in dimensions")
}
varRefExpr.DataType = memCom.Uint8
return varRefExpr, err
}
return nil, utils.StackError(nil, "Only hex(uuid) or uuid supported, got %s", dimExpr.String())
}
// geoTableUsageCollector traverses an AST expression tree, finds VarRef columns,
// and checks whether the expression uses any geo table column.
type geoTableUsageCollector struct {
geoIntersection geoIntersection
useGeoTable bool
}
func (g *geoTableUsageCollector) Visit(expression expr.Expr) expr.Visitor {
switch e := expression.(type) {
case *expr.VarRef:
g.useGeoTable = g.useGeoTable || e.TableID == g.geoIntersection.shapeTableID
}
return g
}
// arrayColumnUsageCollector traverses an AST expression tree, finds VarRef columns,
// and checks whether the expression uses any array column.
type arrayColumnUsageCollector struct {
useArrayColumn bool
}
func (ac *arrayColumnUsageCollector) Visit(expression expr.Expr) expr.Visitor {
switch e := expression.(type) {
case *expr.VarRef:
ac.useArrayColumn = ac.useArrayColumn || memCom.IsArrayType(e.DataType)
}
return ac
}
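// exampleArrayColumnUsage is a hedged sketch: a VarRef whose DataType is not an
// array type leaves useArrayColumn false.
func exampleArrayColumnUsage() {
collector := &arrayColumnUsageCollector{}
expr.Walk(collector, &expr.VarRef{Val: "fare", DataType: memCom.Float32})
fmt.Println(collector.useArrayColumn) // false
}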
func (qc *AQLQueryContext) processMeasure() {
// OOPK engine only supports one measure per query.
if len(qc.Query.Measures) != 1 {
qc.Error = utils.StackError(nil, "expect one measure per query, but got %d",
len(qc.Query.Measures))
return
}
if _, ok := qc.Query.Measures[0].ExprParsed.(*expr.NumberLiteral); ok {
qc.IsNonAggregationQuery = true
// apply the default limit in case the user forgot to provide one
if qc.Query.Limit == 0 {
qc.Query.Limit = nonAggregationQueryLimit
}
return
}
// Match and strip the aggregate function.
aggregate, ok := qc.Query.Measures[0].ExprParsed.(*expr.Call)
if !ok {
qc.Error = utils.StackError(nil, "expect aggregate function, but got %s",
qc.Query.Measures[0].Expr)
return
}
if qc.ReturnHLLData && aggregate.Name != expr.HllCallName {
qc.Error = utils.StackError(nil, "expect hll aggregate function as client specify 'Accept' as "+
"'application/hll', but got %s",
qc.Query.Measures[0].Expr)
return
}
if len(aggregate.Args) != 1 {
qc.Error = utils.StackError(nil,
"expect one parameter for aggregate function %s, but got %d",
aggregate.Name, len(aggregate.Args))
return
}
qc.OOPK.Measure = aggregate.Args[0]
// check if any array column is used in measure
ac := &arrayColumnUsageCollector{}
expr.Walk(ac, qc.OOPK.Measure)
if ac.useArrayColumn {
qc.Error = utils.StackError(nil,
"Array column is not allowed to be used in measure: %s", qc.OOPK.Measure.String())
return
}
// default is 4 bytes
qc.OOPK.MeasureBytes = 4
switch strings.ToLower(aggregate.Name) {
case expr.CountCallName:
qc.OOPK.Measure = &expr.NumberLiteral{
Int: 1,
Expr: "1",
ExprType: expr.Unsigned,
}
qc.OOPK.AggregateType = C.AGGR_SUM_UNSIGNED
case expr.SumCallName:
qc.OOPK.MeasureBytes = 8
switch qc.OOPK.Measure.Type() {
case expr.Float:
qc.OOPK.AggregateType = C.AGGR_SUM_FLOAT
case expr.Signed:
qc.OOPK.AggregateType = C.AGGR_SUM_SIGNED
case expr.Unsigned:
qc.OOPK.AggregateType = C.AGGR_SUM_UNSIGNED
default:
qc.Error = utils.StackError(nil,
unsupportedInputType, expr.SumCallName, qc.OOPK.Measure.String())
return
}
case expr.AvgCallName:
// 4 bytes for storing the running average and another 4 bytes for the count
qc.OOPK.MeasureBytes = 8
// for average, we should always use float type as the agg type.
qc.OOPK.AggregateType = C.AGGR_AVG_FLOAT
case expr.MinCallName:
switch qc.OOPK.Measure.Type() {
case expr.Float:
qc.OOPK.AggregateType = C.AGGR_MIN_FLOAT
case expr.Signed:
qc.OOPK.AggregateType = C.AGGR_MIN_SIGNED
case expr.Unsigned:
qc.OOPK.AggregateType = C.AGGR_MIN_UNSIGNED
default:
qc.Error = utils.StackError(nil,
unsupportedInputType, expr.MinCallName, qc.OOPK.Measure.String())
return
}
case expr.MaxCallName:
switch qc.OOPK.Measure.Type() {
case expr.Float:
qc.OOPK.AggregateType = C.AGGR_MAX_FLOAT
case expr.Signed:
qc.OOPK.AggregateType = C.AGGR_MAX_SIGNED
case expr.Unsigned:
qc.OOPK.AggregateType = C.AGGR_MAX_UNSIGNED
default:
qc.Error = utils.StackError(nil,
unsupportedInputType, expr.MaxCallName, qc.OOPK.Measure.String())
return
}
case expr.HllCallName:
qc.OOPK.AggregateType = C.AGGR_HLL
default:
qc.Error = utils.StackError(nil,
"unsupported aggregate function: %s", aggregate.Name)
return
}
}
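// A hedged illustration of the rewrite above: count(*) becomes a sum of the
// constant 1 over unsigned values (AGGR_SUM_UNSIGNED), while sum/min/max pick
// the aggregate variant matching the argument's numeric type, e.g. sum(fare)
// over a float column selects AGGR_SUM_FLOAT with an 8-byte measure.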
func (qc *AQLQueryContext) getAllColumnsDimension() (columns []common.Dimension) {
// only wildcard matching of main table columns is supported
for _, column := range qc.TableScanners[0].Schema.Schema.Columns {
dataType := memCom.DataTypeFromString(column.Type)
// geo shape and array types are not directly supported as dimensions
if !column.Deleted && column.Type != metaCom.GeoShape && !memCom.IsArrayType(dataType) {
columns = append(columns, common.Dimension{
ExprParsed: &expr.VarRef{Val: column.Name},
Expr: column.Name,
})
}
}
return
}
func (qc *AQLQueryContext) processDimensions() {
// Copy dimension ASTs.
qc.OOPK.Dimensions = make([]expr.Expr, len(qc.Query.Dimensions))
for i, dim := range qc.Query.Dimensions {
// TODO: support numeric bucketizer.
qc.OOPK.Dimensions[i] = dim.ExprParsed
if dim.ExprParsed.Type() == expr.GeoShape {
qc.Error = utils.StackError(nil,
"GeoShape can not be used for dimension: %s", dim.Expr)
return
}
// array columns cannot be used as dimensions directly
if dimExpr, ok := dim.ExprParsed.(*expr.VarRef); ok && memCom.IsArrayType(dimExpr.DataType) {
qc.Error = utils.StackError(nil,
"Array column can not be used for dimension directly: %s", dim.Expr)
return
}
}
if qc.OOPK.geoIntersection != nil {
gc := &geoTableUsageCollector{
geoIntersection: *qc.OOPK.geoIntersection,
}
// Check whether measure and dimensions are referencing any geo table columns.
expr.Walk(gc, qc.OOPK.Measure)
if gc.useGeoTable {
qc.Error = utils.StackError(nil,
"Geo table column is not allowed to be used in measure: %s", qc.OOPK.Measure.String())
return
}
foundGeoJoin := false
for i, dimExpr := range qc.OOPK.Dimensions {
geoDimExpr, err := qc.matchAndRewriteGeoDimension(dimExpr)
if err != nil {
qc.Error = err
return
}
if geoDimExpr != nil {
if foundGeoJoin {
qc.Error = utils.StackError(nil,
"Only one geo dimension allowed: %s", dimExpr.String())
return
}
foundGeoJoin = true
qc.OOPK.Dimensions[i] = geoDimExpr
qc.OOPK.geoIntersection.dimIndex = i
}
}
}
// Collect column usage from measure and dimensions
expr.Walk(columnUsageCollector{
tableScanners: qc.TableScanners,
usages: columnUsedByAllBatches,
}, qc.OOPK.Measure)
for _, dim := range qc.OOPK.Dimensions {
expr.Walk(columnUsageCollector{
tableScanners: qc.TableScanners,
usages: columnUsedByAllBatches,
}, dim)
}
}
// sortDimensionColumns sorts dimension columns by data width in bytes.
// Dimension columns in OOPK are not reordered; instead a mapping from the
// original index to the ordered offsets (value and validity) in the dimension
// vector is stored.
// A GeoUUID dimension takes 1 byte, a VarRef expression uses its column data
// length, and everything else defaults to 4 bytes.
func (qc *AQLQueryContext) sortDimensionColumns() {
orderedIndex := 0
numDimensions := len(qc.OOPK.Dimensions)
qc.OOPK.DimensionVectorIndex = make([]int, numDimensions)
byteWidth := 1 << uint(len(qc.OOPK.NumDimsPerDimWidth)-1)
for byteIndex := range qc.OOPK.NumDimsPerDimWidth {
for originIndex, dim := range qc.OOPK.Dimensions {
dataBytes := common.GetDimensionDataBytes(dim)
if dataBytes == byteWidth {
// record value offset, null offset pair
// null offsets will have to add total dim bytes later
qc.OOPK.DimensionVectorIndex[originIndex] = orderedIndex
qc.OOPK.NumDimsPerDimWidth[byteIndex]++
qc.OOPK.DimRowBytes += dataBytes
orderedIndex++
}
}
byteWidth >>= 1
}
// plus one byte per dimension column for validity
qc.OOPK.DimRowBytes += numDimensions
if !qc.IsNonAggregationQuery {
// dimension size is only checked for aggregation queries
if qc.OOPK.DimRowBytes > C.MAX_DIMENSION_BYTES {
qc.Error = utils.StackError(nil, "maximum dimension bytes: %d, got: %d", C.MAX_DIMENSION_BYTES, qc.OOPK.DimRowBytes)
return
}
}
}
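// A hedged example of the ordering above: dimensions with byte widths
// [4, 1, 2, 4] are bucketed by descending width as [4, 4, 2, 1], so
// DimensionVectorIndex maps original indices {0, 1, 2, 3} to ordered positions
// {0, 3, 2, 1}, and DimRowBytes is 4+4+2+1 data bytes plus 4 validity bytes = 15.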
func (qc *AQLQueryContext) sortUsedColumns() {
for _, scanner := range qc.TableScanners {
scanner.Columns = make([]int, 0, len(scanner.ColumnUsages))
scanner.ColumnsByIDs = make(map[int]int)
// Unsorted/uncompressed columns
for columnID := range scanner.ColumnUsages {
if utils.IndexOfInt(scanner.Schema.Schema.ArchivingSortColumns, columnID) < 0 {
scanner.ColumnsByIDs[columnID] = len(scanner.Columns)
scanner.Columns = append(scanner.Columns, columnID)
}
}
// Sorted/compressed columns
for i := len(scanner.Schema.Schema.ArchivingSortColumns) - 1; i >= 0; i-- {
columnID := scanner.Schema.Schema.ArchivingSortColumns[i]
_, found := scanner.ColumnUsages[columnID]
if found {
scanner.ColumnsByIDs[columnID] = len(scanner.Columns)
scanner.Columns = append(scanner.Columns, columnID)
}
}
}
}
func parseTimezoneColumnString(timezoneColumnString string) (column, joinKey string, success bool) {
exp, err := expr.ParseExpr(timezoneColumnString)
if err != nil {
return
}
if c, ok := exp.(*expr.Call); ok {
if len(c.Args) == 1 {
return c.Name, c.Args[0].String(), true
}
}
return
}
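// exampleParseTimezoneColumn is a hedged usage sketch, assuming the parser
// treats "timezone(city_id)" as a call with a single argument.
func exampleParseTimezoneColumn() {
column, joinKey, ok := parseTimezoneColumnString("timezone(city_id)")
fmt.Println(column, joinKey, ok) // timezone city_id true
}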
func (qc *AQLQueryContext) expandINop(e *expr.BinaryExpr) (expandedExpr expr.Expr) {
lhs, ok := e.LHS.(*expr.VarRef)
if !ok {
qc.Error = utils.StackError(nil, "lhs of IN or NOT_IN must be a valid column")
return
}
rhs := e.RHS
switch rhsTyped := rhs.(type) {
case *expr.Call:
expandedExpr = &expr.BooleanLiteral{Val: false}
for _, value := range rhsTyped.Args {
switch expandedExpr.(type) {
case *expr.BooleanLiteral:
expandedExpr = qc.Rewrite(&expr.BinaryExpr{
Op: expr.EQ,
LHS: lhs,
RHS: value,
}).(*expr.BinaryExpr)
default:
lastExpr := expandedExpr
expandedExpr = &expr.BinaryExpr{
Op: expr.OR,
LHS: lastExpr,
RHS: qc.Rewrite(&expr.BinaryExpr{
Op: expr.EQ,
LHS: lhs,
RHS: value,
}).(*expr.BinaryExpr),
}
}
}
default:
qc.Error = utils.StackError(nil, "only EQ and IN operators are supported for geo fields")
}
return
}
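// A hedged illustration of the expansion above:
//
//	city_id IN (1, 2, 3)
//
// becomes
//
//	((city_id = 1 OR city_id = 2) OR city_id = 3)
//
// with each equality rewritten and type-resolved by qc.Rewrite.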