broker/query_compiler.go (327 lines of code) (raw):

// Copyright (c) 2017-2018 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package broker import ( memCom "github.com/uber/aresdb/memstore/common" metaCom "github.com/uber/aresdb/metastore/common" "github.com/uber/aresdb/query/common" "github.com/uber/aresdb/query/expr" "github.com/uber/aresdb/utils" "net/http" "github.com/uber/aresdb/query/context" ) const ( nonAggregationQueryLimit = 1000 ) // QueryContext is broker query context type QueryContext struct { AQLQuery *common.AQLQuery IsNonAggregationQuery bool ReturnHLLBinary bool Writer http.ResponseWriter Error error Tables []*memCom.TableSchema TableIDByAlias map[string]int TableSchemaByName map[string]*memCom.TableSchema NumDimsPerDimWidth common.DimCountsPerDimWidth // lookup table from enum dimension index to EnumDict, used for postprocessing DimensionEnumReverseDicts map[int][]string // this should be the same as generated by datanodes. in the future we should pass // it down to datanodes DimensionVectorIndex []int DimRowBytes int RequestID string // helper used to share common codes QCHelper *context.QueryContextHelper } // NewQueryContext creates new query context func NewQueryContext(aql *common.AQLQuery, returnHLLBinary bool, w http.ResponseWriter) *QueryContext { ctx := QueryContext{ AQLQuery: aql, ReturnHLLBinary: returnHLLBinary, Writer: w, DimensionEnumReverseDicts: make(map[int][]string), } ctx.QCHelper = &context.QueryContextHelper{ QCOptions: &ctx, } return &ctx } // GetRewrittenQuery get the rewritten query after query parsing func (qc *QueryContext) GetRewrittenQuery() common.AQLQuery { newQuery := *qc.AQLQuery for i, measure := range newQuery.Measures { if measure.ExprParsed != nil { measure.Expr = measure.ExprParsed.String() newQuery.Measures[i] = measure } } for i, join := range newQuery.Joins { for j := range join.Conditions { if j < len(join.ConditionsParsed) && join.ConditionsParsed[j] != nil { join.Conditions[j] = join.ConditionsParsed[j].String() } } newQuery.Joins[i] = join } for i, dim := range newQuery.Dimensions { if dim.ExprParsed != nil { dim.Expr = dim.ExprParsed.String() newQuery.Dimensions[i] = dim } } for i := range newQuery.Filters { if i < len(newQuery.FiltersParsed) && newQuery.FiltersParsed[i] != nil { newQuery.Filters[i] = newQuery.FiltersParsed[i].String() } } for i, measure := range newQuery.SupportingMeasures { if measure.ExprParsed != nil { measure.Expr = measure.ExprParsed.String() newQuery.SupportingMeasures[i] = measure } } for i, dim := range newQuery.SupportingDimensions { if dim.ExprParsed != nil { dim.Expr = dim.ExprParsed.String() newQuery.SupportingDimensions[i] = dim } } return newQuery } // Compile parses expressions into ast, load schema from schema reader, resolve types, // and collects meta data needed by post processing func (qc *QueryContext) Compile(tableSchemaReader memCom.TableSchemaReader) { qc.readSchema(tableSchemaReader) defer qc.releaseSchema() if qc.Error != nil { return } qc.processJoins() if qc.Error != nil { return } qc.processMeasures() if qc.Error != nil { return } qc.processDimensions() if qc.Error != nil { return } qc.processFilters() if qc.Error != nil { return } qc.sortDimensionColumns() return } func (qc *QueryContext) readSchema(tableSchemaReader memCom.TableSchemaReader) { qc.Tables = make([]*memCom.TableSchema, 1+len(qc.AQLQuery.Joins)) qc.TableIDByAlias = make(map[string]int) qc.TableSchemaByName = make(map[string]*memCom.TableSchema) tableSchemaReader.RLock() defer tableSchemaReader.RUnlock() var ( err error schema *memCom.TableSchema ) // Main table. schema, err = tableSchemaReader.GetSchema(qc.AQLQuery.Table) if err != nil { qc.Error = utils.StackError(err, "unknown main table %s", qc.AQLQuery.Table) return } qc.TableSchemaByName[qc.AQLQuery.Table] = schema schema.RLock() qc.Tables[0] = schema qc.TableIDByAlias[qc.AQLQuery.Table] = 0 // Foreign tables. for i, join := range qc.AQLQuery.Joins { schema, err = tableSchemaReader.GetSchema(join.Table) if err != nil { qc.Error = utils.StackError(err, "unknown join table %s", join.Table) return } if qc.TableSchemaByName[join.Table] == nil { qc.TableSchemaByName[join.Table] = schema // Prevent double locking. schema.RLock() } qc.Tables[1+i] = schema alias := join.Alias if alias == "" { alias = join.Table } _, exists := qc.TableIDByAlias[alias] if exists { qc.Error = utils.StackError(nil, "table alias %s is redefined", alias) return } qc.TableIDByAlias[alias] = 1 + i } } func (qc *QueryContext) releaseSchema() { for _, schema := range qc.TableSchemaByName { schema.RUnlock() } } func (qc *QueryContext) processJoins() { var err error for i, join := range qc.AQLQuery.Joins { join.ConditionsParsed = make([]expr.Expr, len(join.Conditions)) for j, cond := range join.Conditions { join.ConditionsParsed[j], err = expr.ParseExpr(cond) if err != nil { qc.Error = utils.StackError(err, "Failed to parse join condition: %s", cond) return } join.ConditionsParsed[j] = expr.Rewrite(qc, join.ConditionsParsed[j]) if qc.Error != nil { return } } qc.AQLQuery.Joins[i] = join } } func (qc *QueryContext) processFilters() { var err error qc.AQLQuery.FiltersParsed = make([]expr.Expr, len(qc.AQLQuery.Filters)) for i, filter := range qc.AQLQuery.Filters { qc.AQLQuery.FiltersParsed[i], err = expr.ParseExpr(filter) if err != nil { qc.Error = utils.StackError(err, "Failed to parse filter %s", filter) return } qc.AQLQuery.FiltersParsed[i] = expr.Rewrite(qc, qc.AQLQuery.FiltersParsed[i]) if qc.Error != nil { return } } qc.AQLQuery.FiltersParsed = qc.QCHelper.NormalizeAndFilters(qc.AQLQuery.FiltersParsed) } func (qc *QueryContext) processMeasures() { var err error for i, measure := range qc.AQLQuery.Measures { measure.ExprParsed, err = expr.ParseExpr(measure.Expr) if err != nil { qc.Error = utils.StackError(err, "Failed to parse measure: %s", measure.Expr) return } measure.ExprParsed = expr.Rewrite(qc, measure.ExprParsed) if qc.Error != nil { return } measure.FiltersParsed = make([]expr.Expr, len(measure.Filters)) for j, filter := range measure.Filters { measure.FiltersParsed[j], err = expr.ParseExpr(filter) if err != nil { qc.Error = utils.StackError(err, "Failed to parse measure filter %s", filter) return } measure.FiltersParsed[j] = expr.Rewrite(qc, measure.FiltersParsed[j]) if qc.Error != nil { return } } measure.FiltersParsed = qc.QCHelper.NormalizeAndFilters(measure.FiltersParsed) qc.AQLQuery.Measures[i] = measure } // ony support 1 measure for now if len(qc.AQLQuery.Measures) != 1 { qc.Error = utils.StackError(nil, "expect one measure per query, but got %d", len(qc.AQLQuery.Measures)) return } if _, ok := qc.AQLQuery.Measures[0].ExprParsed.(*expr.NumberLiteral); ok { qc.IsNonAggregationQuery = true // in case user forgot to provide limit if qc.AQLQuery.Limit == 0 { qc.AQLQuery.Limit = nonAggregationQueryLimit } return } aggregate, ok := qc.AQLQuery.Measures[0].ExprParsed.(*expr.Call) if !ok { qc.Error = utils.StackError(nil, "expect aggregate function, but got %s", qc.AQLQuery.Measures[0].Expr) return } if len(aggregate.Args) != 1 { qc.Error = utils.StackError(nil, "expect one parameter for aggregate function %s, but got %d", aggregate.Name, len(aggregate.Args)) return } if qc.ReturnHLLBinary && aggregate.Name != expr.HllCallName { qc.Error = utils.StackError(nil, "expect hll aggregate function as client specify 'Accept' as "+ "'application/hll', but got %s", qc.AQLQuery.Measures[0].Expr) return } } func (qc *QueryContext) processDimensions() { rawDims := qc.AQLQuery.Dimensions qc.AQLQuery.Dimensions = []common.Dimension{} qc.DimensionVectorIndex = make([]int, len(rawDims)) for _, dim := range rawDims { var err error dim.ExprParsed, err = expr.ParseExpr(dim.Expr) if err != nil { qc.Error = utils.StackError(err, "Failed to parse dimension: %s", dim.Expr) return } if _, ok := dim.ExprParsed.(*expr.Wildcard); ok && qc.IsNonAggregationQuery { qc.AQLQuery.Dimensions = append(qc.AQLQuery.Dimensions, qc.getAllColumnsDimension()...) } else { qc.AQLQuery.Dimensions = append(qc.AQLQuery.Dimensions, dim) } } for idx, dim := range qc.AQLQuery.Dimensions { dim.ExprParsed = expr.Rewrite(qc, dim.ExprParsed) if vr, ok := dim.ExprParsed.(*expr.VarRef); ok { if len(vr.EnumReverseDict) > 0 { qc.DimensionEnumReverseDicts[idx] = vr.EnumReverseDict } } qc.AQLQuery.Dimensions[idx] = dim } } func (qc *QueryContext) sortDimensionColumns() { orderedIndex := 0 numDimensions := len(qc.AQLQuery.Dimensions) qc.DimensionVectorIndex = make([]int, numDimensions) byteWidth := 1 << uint(len(qc.NumDimsPerDimWidth)-1) for byteIndex := range qc.NumDimsPerDimWidth { for originIndex, dim := range qc.AQLQuery.Dimensions { dataBytes := common.GetDimensionDataBytes(dim.ExprParsed) if dataBytes == byteWidth { // record value offset, null offset pair // null offsets will have to add total dim bytes later qc.DimensionVectorIndex[originIndex] = orderedIndex qc.NumDimsPerDimWidth[byteIndex]++ qc.DimRowBytes += dataBytes orderedIndex++ } } byteWidth >>= 1 } // plus one byte per dimension column for validity qc.DimRowBytes += numDimensions } func (qc *QueryContext) getAllColumnsDimension() (columns []common.Dimension) { // only main table columns wildcard match supported for _, column := range qc.Tables[0].Schema.Columns { if !column.Deleted && column.Type != metaCom.GeoShape { columns = append(columns, common.Dimension{ Expr: column.Name, ExprParsed: &expr.VarRef{Val: column.Name}, }) } } return } // Rewrite walks the expresison AST and resolves data types bottom up. // In addition it also translates enum strings and rewrites their predicates. func (qc *QueryContext) Rewrite(expression expr.Expr) expr.Expr { return qc.QCHelper.Rewrite(expression) } func (qc *QueryContext) InitQCHelper() { qc.QCHelper = &context.QueryContextHelper{ QCOptions: qc, } } func (qc *QueryContext) GetSchema(tableID int) *memCom.TableSchema { return qc.Tables[tableID] } func (qc *QueryContext) GetTableID(alias string) (int, bool) { id, exists := qc.TableIDByAlias[alias] return id, exists } func (qc *QueryContext) GetQuery() *common.AQLQuery { return qc.AQLQuery } func (qc *QueryContext) SetError(err error) { qc.Error = err } func (qc *QueryContext) IsDataOnly() bool { return false }