broker/result_merge.go (111 lines of code) (raw):

// Copyright (c) 2017-2018 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package broker import ( "fmt" "github.com/uber/aresdb/broker/common" queryCom "github.com/uber/aresdb/query/common" "github.com/uber/aresdb/utils" "reflect" ) func newResultMergeContext(aggType common.AggType) resultMergeContext { return resultMergeContext{ agg: aggType, path: []string{}, } } // resultMergeContext is the context for merging results // caller should check for err after calling type resultMergeContext struct { agg common.AggType parent queryCom.AQLQueryResult path []string err error } // run merges results from rhs to lhs in place func (c *resultMergeContext) run(lhs, rhs queryCom.AQLQueryResult) queryCom.AQLQueryResult { c.mergeResultsRecursive(map[string]interface{}(lhs), map[string]interface{}(rhs)) return lhs } func (c *resultMergeContext) mergeResultsRecursive(lhs, rhs interface{}) { if lhs == nil && rhs == nil { return } if lhs == nil { if c.agg == common.Avg { c.err = utils.StackError(nil, "error calculating avg: some dimension has only sum. path: %v", c.path) } c.parent[c.path[len(c.path)-1]] = rhs lhs = rhs return } if rhs == nil { if c.agg == common.Avg { c.err = utils.StackError(nil, "error calculating avg: some dimension has only count. path: %v", c.path) } c.parent[c.path[len(c.path)-1]] = lhs return } lhsType := reflect.TypeOf(lhs) rhsType := reflect.TypeOf(rhs) if lhsType != rhsType { c.err = utils.StackError(nil, fmt.Sprintf("error merging: different type lhs: %s vs. rhs: %s", lhsType, rhsType)) return } switch l := lhs.(type) { case float64: r := rhs.(float64) switch c.agg { case common.Count, common.Sum: l = l + r case common.Max: if r > l { l = r } case common.Min: if r < l { l = r } case common.Avg: l = l / r } c.parent[c.path[len(c.path)-1]] = l case queryCom.HLL: r := rhs.(queryCom.HLL) if c.agg != common.Hll { c.err = utils.StackError(nil, fmt.Sprintf("error merging: HLL value found for non Hll aggregation: %d", c.agg)) } l.Merge(r) c.parent[c.path[len(c.path)-1]] = l case map[string]interface{}: r := rhs.(map[string]interface{}) for k, lv := range l { prevPath := c.path c.path = append(c.path, k) prevParent := c.parent c.parent = l c.mergeResultsRecursive(lv, r[k]) if c.err != nil { c.err = utils.StackError(c.err, "failed to merge results, path: %v", c.path) return } c.path = prevPath c.parent = prevParent } for k, rv := range r { if _, exists := l[k]; !exists { prevPath := c.path c.path = append(c.path, k) prevParent := c.parent c.parent = l c.mergeResultsRecursive(l[k], rv) if c.err != nil { c.err = utils.StackError(c.err, "failed to merge results, path: %v", c.path) return } c.path = prevPath c.parent = prevParent } } if c.parent != nil { c.parent[c.path[len(c.path)-1]] = l } default: // should not happen utils.GetLogger().Panic("unknown type ", reflect.TypeOf(lhs)) } }