pkg/index/inverted/query.go (639 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package inverted
import (
"encoding/json"
"fmt"
"math"
"strings"
"github.com/blugelabs/bluge"
"github.com/pkg/errors"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
// Labels emitted as "queryType" by termRangeInclusiveNode.MarshalJSON to tell
// plain term ranges apart from time ranges.
const (
	timeRangeQuery = "timeRangeQuery"
	termRangeQuery = "termRangeQuery"
)
var (
	// minTerm and maxTerm are the encoded int64 extremes. parseConditionToQuery
	// uses them as the open end of one-sided range queries (LT/LE and GT/GE).
	minTerm = string(convert.Int64ToBytes(math.MinInt64))
	maxTerm = string(convert.Int64ToBytes(math.MaxInt64))

	// minInf and maxInf stand in for minTerm/maxTerm in human-readable
	// query descriptions.
	minInf = "-inf"
	maxInf = "+inf"
)
// GlobalIndexError reports that a condition refers to an index rule whose
// scope is "global", which the local index filter cannot evaluate.
type GlobalIndexError struct {
	// IndexRule is the global rule that was encountered.
	IndexRule *databasev1.IndexRule
	// Expr is the literal expression of the offending condition.
	Expr logical.LiteralExpr
}

// Error implements the error interface by rendering the offending index rule.
func (g GlobalIndexError) Error() string {
	rule := g.IndexRule
	return rule.String()
}
// Compile-time check that *queryNode satisfies index.Query.
var _ index.Query = (*queryNode)(nil)

// queryNode couples an executable bluge.Query with a node that describes the
// same query for diagnostics.
type queryNode struct {
	query bluge.Query
	node
}

// String returns the description held by the embedded node.
func (q *queryNode) String() string {
	description := q.node
	return description.String()
}
// BuildQuery translates criteria into a bluge query for local indices.
//
// It returns four values:
//   - the query, which is nil when the criteria are answered purely by entity values;
//   - the entity tag-value sets that still apply;
//   - whether the returned query is a match-all query;
//   - any error encountered.
//
// A nil criteria yields no query and the unchanged entity.
func BuildQuery(criteria *modelv1.Criteria, schema logical.Schema, entityDict map[string]int,
	entity []*modelv1.TagValue,
) (index.Query, [][]*modelv1.TagValue, bool, error) {
	if criteria == nil {
		return nil, [][]*modelv1.TagValue{entity}, false, nil
	}
	switch criteria.GetExp().(type) {
	case *modelv1.Criteria_Condition:
		cond := criteria.GetCondition()
		expr, parsed, err := logical.ParseExprOrEntity(entityDict, entity, cond)
		switch {
		case err != nil:
			return nil, nil, false, err
		case parsed != nil:
			// ParseExprOrEntity resolved the condition into entity values, so no
			// index query is required for it.
			return nil, parsed, false, nil
		}
		defined, rule := schema.IndexDefined(cond.Name)
		if !defined {
			return nil, nil, false, errors.Wrapf(logical.ErrUnsupportedConditionOp, "mandatory index rule conf:%s", cond)
		}
		key := index.FieldKey{IndexRuleID: rule.Metadata.Id}
		qn, err := parseConditionToQuery(cond, rule, expr, key.Marshal())
		if err != nil {
			return nil, nil, false, err
		}
		return qn, [][]*modelv1.TagValue{entity}, false, nil

	case *modelv1.Criteria_Le:
		le := criteria.GetLe()
		lhs, rhs := le.GetLeft(), le.GetRight()
		if lhs == nil && rhs == nil {
			return nil, nil, false, errors.WithMessagef(logical.ErrInvalidLogicalExpression, "both sides(left and right) of [%v] are empty", criteria)
		}
		// A one-sided expression collapses to that side.
		if lhs == nil {
			return BuildQuery(rhs, schema, entityDict, entity)
		}
		if rhs == nil {
			return BuildQuery(lhs, schema, entityDict, entity)
		}

		leftQuery, leftEntities, leftMatchAll, err := BuildQuery(lhs, schema, entityDict, entity)
		if err != nil {
			return nil, nil, false, err
		}
		rightQuery, rightEntities, rightMatchAll, err := BuildQuery(rhs, schema, entityDict, entity)
		if err != nil {
			return nil, nil, false, err
		}

		entities := logical.ParseEntities(le.Op, entity, leftEntities, rightEntities)
		if entities == nil {
			return nil, nil, false, nil
		}
		if leftQuery == nil && rightQuery == nil {
			return nil, entities, false, nil
		}
		if leftMatchAll && rightMatchAll {
			return &queryNode{query: bluge.NewMatchAllQuery(), node: newMatchAllNode()}, entities, true, nil
		}

		// Sides that produced only entity values (nil query) are skipped below.
		sides := []index.Query{leftQuery, rightQuery}
		switch le.Op {
		case modelv1.LogicalExpression_LOGICAL_OP_AND:
			conj, tree := bluge.NewBooleanQuery(), newMustNode()
			for _, side := range sides {
				if side == nil {
					continue
				}
				qn := side.(*queryNode)
				conj.AddMust(qn.query)
				tree.Append(qn.node)
			}
			return &queryNode{query: conj, node: tree}, entities, false, nil

		case modelv1.LogicalExpression_LOGICAL_OP_OR:
			// OR with a match-all side matches everything.
			if leftMatchAll || rightMatchAll {
				return &queryNode{query: bluge.NewMatchAllQuery(), node: newMatchAllNode()}, entities, true, nil
			}
			disj, tree := bluge.NewBooleanQuery(), newShouldNode()
			disj.SetMinShould(1)
			for _, side := range sides {
				if side == nil {
					continue
				}
				qn := side.(*queryNode)
				disj.AddShould(qn.query)
				tree.Append(qn.node)
			}
			return &queryNode{query: disj, node: tree}, entities, false, nil
		}
		return nil, nil, false, logical.ErrInvalidCriteriaType
	}
	return nil, nil, false, logical.ErrInvalidCriteriaType
}
// BuildIndexModeQuery builds a bluge query for data stored in index mode.
//
// If measureName is non-empty, the query filters on index.IndexModeName == measureName.
// If criteria is non-nil, the criteria are translated by buildIndexModeCriteria,
// with the schema's entity tags available as conditions. When both are present
// they are combined with a must (AND) clause.
//
// It returns (nil, nil) when measureName is empty and criteria is nil.
func BuildIndexModeQuery(measureName string, criteria *modelv1.Criteria, schema logical.Schema) (index.Query, error) {
	var subject *queryNode
	if measureName != "" {
		subject = &queryNode{
			query: bluge.NewTermQuery(measureName).SetField(index.IndexModeName),
			node:  newTermNode(measureName, nil),
		}
	}

	if criteria == nil {
		// Guard against returning a typed-nil *queryNode inside the interface.
		if subject == nil {
			return nil, nil
		}
		return subject, nil
	}

	entityDict := make(map[string]int)
	for i, name := range schema.EntityList() {
		entityDict[name] = i
	}
	criteriaQuery, err := buildIndexModeCriteria(criteria, schema, entityDict)
	if err != nil {
		return nil, err
	}
	if subject == nil {
		return criteriaQuery, nil
	}

	cq := criteriaQuery.(*queryNode)
	combined, tree := bluge.NewBooleanQuery(), newMustNode()
	combined.AddMust(subject.query)
	combined.AddMust(cq.query)
	tree.Append(subject.node)
	tree.Append(cq.node)
	return &queryNode{query: combined, node: tree}, nil
}
// buildIndexModeCriteria recursively translates criteria into an index-mode query.
//
// For a condition, the tag is resolved in this order:
//   - an index rule defined on the tag name, if there is one;
//   - otherwise an entity tag listed in entityDict, addressed by a field key
//     prefixed with index.IndexModeEntityTagPrefix;
//   - otherwise the condition is rejected.
//
// A logical expression with only one side collapses to that side. Otherwise
// both sides are combined with must (AND) or with should and min-should 1 (OR).
func buildIndexModeCriteria(criteria *modelv1.Criteria, schema logical.Schema, entityDict map[string]int) (index.Query, error) {
	switch criteria.GetExp().(type) {
	case *modelv1.Criteria_Condition:
		cond := criteria.GetCondition()
		expr, err := logical.ParseExpr(cond)
		if err != nil {
			return nil, err
		}
		if defined, rule := schema.IndexDefined(cond.Name); defined {
			key := index.FieldKey{IndexRuleID: rule.Metadata.Id}
			return parseConditionToQuery(cond, rule, expr, key.Marshal())
		}
		if _, isEntity := entityDict[cond.Name]; !isEntity {
			return nil, errors.Wrapf(logical.ErrUnsupportedConditionOp, "mandatory index rule conf:%s", cond)
		}
		key := index.FieldKey{TagName: index.IndexModeEntityTagPrefix + cond.Name}
		return parseConditionToQuery(cond, nil, expr, key.Marshal())

	case *modelv1.Criteria_Le:
		le := criteria.GetLe()
		lhs, rhs := le.GetLeft(), le.GetRight()
		if lhs == nil && rhs == nil {
			return nil, errors.WithMessagef(logical.ErrInvalidLogicalExpression, "both sides(left and right) of [%v] are empty", criteria)
		}
		if lhs == nil {
			return buildIndexModeCriteria(rhs, schema, entityDict)
		}
		if rhs == nil {
			return buildIndexModeCriteria(lhs, schema, entityDict)
		}

		leftQuery, err := buildIndexModeCriteria(lhs, schema, entityDict)
		if err != nil {
			return nil, err
		}
		rightQuery, err := buildIndexModeCriteria(rhs, schema, entityDict)
		if err != nil {
			return nil, err
		}

		sides := []index.Query{leftQuery, rightQuery}
		switch le.Op {
		case modelv1.LogicalExpression_LOGICAL_OP_AND:
			conj, tree := bluge.NewBooleanQuery(), newMustNode()
			for _, side := range sides {
				if side == nil {
					continue
				}
				qn := side.(*queryNode)
				conj.AddMust(qn.query)
				tree.Append(qn.node)
			}
			return &queryNode{query: conj, node: tree}, nil

		case modelv1.LogicalExpression_LOGICAL_OP_OR:
			disj, tree := bluge.NewBooleanQuery(), newShouldNode()
			disj.SetMinShould(1)
			for _, side := range sides {
				if side == nil {
					continue
				}
				qn := side.(*queryNode)
				disj.AddShould(qn.query)
				tree.Append(qn.node)
			}
			return &queryNode{query: disj, node: tree}, nil
		}
		return nil, logical.ErrInvalidCriteriaType
	}
	return nil, logical.ErrInvalidCriteriaType
}
// parseConditionToQuery converts a single condition into a bluge query, paired
// with a node that describes it for String()/JSON output.
//
// fieldKey is the field the query targets. indexRule may be nil, for example
// for entity tags in index mode. It is then only omitted from the description
// nodes, except for MATCH, which needs the rule's analyzer and rejects a nil rule.
//
// GT/GE/LT/LE/EQ/MATCH/NE accept exactly one value. HAVING, NOT_HAVING, IN and
// NOT_IN use every value of expr. Unsupported operators yield an error wrapping
// logical.ErrUnsupportedConditionOp.
func parseConditionToQuery(cond *modelv1.Condition, indexRule *databasev1.IndexRule,
	expr logical.LiteralExpr, fieldKey string,
) (*queryNode, error) {
	str := expr.String()
	// singleValue returns the only encoded term of expr and rejects zero or
	// multiple values.
	singleValue := func() (string, error) {
		bb := expr.Bytes()
		if len(bb) != 1 {
			return "", errors.WithMessagef(logical.ErrUnsupportedConditionOp, "don't support multiple or null value: %s", cond)
		}
		return convert.BytesToString(bb[0]), nil
	}

	switch cond.Op {
	case modelv1.Condition_BINARY_OP_GT:
		v, err := singleValue()
		if err != nil {
			return nil, err
		}
		q := bluge.NewTermRangeInclusiveQuery(v, maxTerm, false, false).SetField(fieldKey)
		return &queryNode{query: q, node: newTermRangeInclusiveNode(str, maxInf, false, false, indexRule, false)}, nil
	case modelv1.Condition_BINARY_OP_GE:
		v, err := singleValue()
		if err != nil {
			return nil, err
		}
		q := bluge.NewTermRangeInclusiveQuery(v, maxTerm, true, false).SetField(fieldKey)
		return &queryNode{query: q, node: newTermRangeInclusiveNode(str, maxInf, true, false, indexRule, false)}, nil
	case modelv1.Condition_BINARY_OP_LT:
		v, err := singleValue()
		if err != nil {
			return nil, err
		}
		q := bluge.NewTermRangeInclusiveQuery(minTerm, v, false, false).SetField(fieldKey)
		return &queryNode{query: q, node: newTermRangeInclusiveNode(minInf, str, false, false, indexRule, false)}, nil
	case modelv1.Condition_BINARY_OP_LE:
		v, err := singleValue()
		if err != nil {
			return nil, err
		}
		q := bluge.NewTermRangeInclusiveQuery(minTerm, v, false, true).SetField(fieldKey)
		return &queryNode{query: q, node: newTermRangeInclusiveNode(minInf, str, false, true, indexRule, false)}, nil
	case modelv1.Condition_BINARY_OP_EQ:
		v, err := singleValue()
		if err != nil {
			return nil, err
		}
		q := bluge.NewTermQuery(v).SetField(fieldKey)
		return &queryNode{query: q, node: newTermNode(str, indexRule)}, nil
	case modelv1.Condition_BINARY_OP_MATCH:
		// The analyzer comes from the index rule, so it is mandatory here.
		if indexRule == nil {
			return nil, errors.WithMessagef(logical.ErrUnsupportedConditionOp, "index rule is mandatory for match operation: %s", cond)
		}
		v, err := singleValue()
		if err != nil {
			return nil, err
		}
		analyzer, operator := getMatchOptions(indexRule.Analyzer, cond.MatchOption)
		q := bluge.NewMatchQuery(v).SetField(fieldKey).SetAnalyzer(analyzer).SetOperator(operator)
		return &queryNode{query: q, node: newMatchNode(str, indexRule)}, nil
	case modelv1.Condition_BINARY_OP_NE:
		v, err := singleValue()
		if err != nil {
			return nil, err
		}
		q, n := bluge.NewBooleanQuery(), newMustNotNode()
		q.AddMustNot(bluge.NewTermQuery(v).SetField(fieldKey))
		n.SetSubNode(newTermNode(str, indexRule))
		return &queryNode{query: q, node: n}, nil
	case modelv1.Condition_BINARY_OP_HAVING:
		bb, elements := expr.Bytes(), expr.Elements()
		q, n := bluge.NewBooleanQuery(), newMustNode()
		for _, b := range bb {
			q.AddMust(bluge.NewTermQuery(string(b)).SetField(fieldKey))
		}
		for _, e := range elements {
			n.Append(newTermNode(e, indexRule))
		}
		return &queryNode{query: q, node: n}, nil
	case modelv1.Condition_BINARY_OP_NOT_HAVING:
		bb, elements := expr.Bytes(), expr.Elements()
		subQuery, subNode := bluge.NewBooleanQuery(), newMustNode()
		for _, b := range bb {
			subQuery.AddMust(bluge.NewTermQuery(string(b)).SetField(fieldKey))
		}
		for _, e := range elements {
			subNode.Append(newTermNode(e, indexRule))
		}
		q, n := bluge.NewBooleanQuery(), newMustNotNode()
		q.AddMustNot(subQuery)
		// The negation must describe subNode. Pointing the node at itself would
		// make JSON marshalling (and thus String) recurse without bound.
		n.SetSubNode(subNode)
		return &queryNode{query: q, node: n}, nil
	case modelv1.Condition_BINARY_OP_IN:
		bb, elements := expr.Bytes(), expr.Elements()
		q, n := bluge.NewBooleanQuery(), newShouldNode()
		q.SetMinShould(1)
		for _, b := range bb {
			q.AddShould(bluge.NewTermQuery(string(b)).SetField(fieldKey))
		}
		for _, e := range elements {
			n.Append(newTermNode(e, indexRule))
		}
		return &queryNode{query: q, node: n}, nil
	case modelv1.Condition_BINARY_OP_NOT_IN:
		bb, elements := expr.Bytes(), expr.Elements()
		subQuery, subNode := bluge.NewBooleanQuery(), newShouldNode()
		subQuery.SetMinShould(1)
		for _, b := range bb {
			subQuery.AddShould(bluge.NewTermQuery(string(b)).SetField(fieldKey))
		}
		for _, e := range elements {
			subNode.Append(newTermNode(e, indexRule))
		}
		q, n := bluge.NewBooleanQuery(), newMustNotNode()
		q.AddMustNot(subQuery)
		n.SetSubNode(subNode)
		return &queryNode{query: q, node: n}, nil
	}
	return nil, errors.WithMessagef(logical.ErrUnsupportedConditionOp, "index filter parses %v", cond)
}
// node is the human-readable description of a query. queryNode.String
// delegates to it; the concrete node types below render themselves as JSON.
type node interface {
	fmt.Stringer
}
// mustNode describes a conjunction: every sub-node is required.
type mustNode struct {
	subNodes []node
}

// newMustNode returns a mustNode with an empty, non-nil sub-node list, so it
// marshals as [] rather than null.
func newMustNode() *mustNode {
	return &mustNode{subNodes: []node{}}
}

// Append adds subNode as another required clause.
func (m *mustNode) Append(subNode node) {
	m.subNodes = append(m.subNodes, subNode)
}

// MarshalJSON encodes the node as {"must": [...]}.
func (m *mustNode) MarshalJSON() ([]byte, error) {
	return json.Marshal(map[string]interface{}{"must": m.subNodes})
}

// String returns the JSON form of the node.
func (m *mustNode) String() string {
	return convert.JSONToString(m)
}
// shouldNode describes a disjunction of sub-nodes.
type shouldNode struct {
	subNodes []node
}

// newShouldNode returns a shouldNode with an empty, non-nil sub-node list.
func newShouldNode() *shouldNode {
	return &shouldNode{subNodes: []node{}}
}

// Append adds subNode as another alternative.
func (s *shouldNode) Append(subNode node) {
	s.subNodes = append(s.subNodes, subNode)
}

// MarshalJSON encodes the node as {"should": [...]}.
func (s *shouldNode) MarshalJSON() ([]byte, error) {
	return json.Marshal(map[string]interface{}{"should": s.subNodes})
}

// String returns the JSON form of the node.
func (s *shouldNode) String() string {
	return convert.JSONToString(s)
}
// mustNotNode describes the negation of a single sub-node.
type mustNotNode struct {
	subNode node
}

// newMustNotNode returns a mustNotNode with no sub-node set yet.
func newMustNotNode() *mustNotNode {
	return new(mustNotNode)
}

// SetSubNode sets the node being negated.
func (m *mustNotNode) SetSubNode(subNode node) {
	m.subNode = subNode
}

// MarshalJSON encodes the node as {"mustNot": ...}.
func (m *mustNotNode) MarshalJSON() ([]byte, error) {
	return json.Marshal(map[string]interface{}{"mustNot": m.subNode})
}

// String returns the JSON form of the node.
func (m *mustNotNode) String() string {
	return convert.JSONToString(m)
}
// matchAllNode describes a query that matches every document.
type matchAllNode struct{}

// newMatchAllNode returns a matchAllNode.
func newMatchAllNode() *matchAllNode {
	return new(matchAllNode)
}

// String returns the fixed label "matchAll".
func (m *matchAllNode) String() string {
	const label = "matchAll"
	return label
}
// termRangeInclusiveNode describes a range query over terms or time.
type termRangeInclusiveNode struct {
	indexRule        *databasev1.IndexRule
	min              string
	max              string
	minInclusive     bool
	maxInclusive     bool
	isTimeRangeQuery bool
}

// newTermRangeInclusiveNode builds a range description. indexRule may be nil.
func newTermRangeInclusiveNode(minVal, maxVal string, minInclusive, maxInclusive bool, indexRule *databasev1.IndexRule, isTimeRangeQuery bool) *termRangeInclusiveNode {
	return &termRangeInclusiveNode{
		min:              minVal,
		max:              maxVal,
		minInclusive:     minInclusive,
		maxInclusive:     maxInclusive,
		indexRule:        indexRule,
		isTimeRangeQuery: isTimeRangeQuery,
	}
}

// MarshalJSON encodes the node as {"termRangeInclusive": {...}}.
//
// The range is rendered in interval notation ("[a b)", "(a b]", ...). The
// query type is reported as termRangeQuery or timeRangeQuery, and the index
// ("name:group") is included only when a rule is present.
func (t *termRangeInclusiveNode) MarshalJSON() ([]byte, error) {
	open, closing := "(", ")"
	if t.minInclusive {
		open = "["
	}
	if t.maxInclusive {
		closing = "]"
	}

	var interval strings.Builder
	interval.WriteString(open)
	interval.WriteString(t.min)
	interval.WriteByte(' ')
	interval.WriteString(t.max)
	interval.WriteString(closing)

	queryType := termRangeQuery
	if t.isTimeRangeQuery {
		queryType = timeRangeQuery
	}

	inner := map[string]interface{}{
		"range":     interval.String(),
		"queryType": queryType,
	}
	if rule := t.indexRule; rule != nil {
		inner["index"] = rule.Metadata.Name + ":" + rule.Metadata.Group
	}
	return json.Marshal(map[string]interface{}{"termRangeInclusive": inner})
}

// String returns the JSON form of the node.
func (t *termRangeInclusiveNode) String() string {
	return convert.JSONToString(t)
}
// termNode describes an exact-term query.
type termNode struct {
	indexRule *databasev1.IndexRule
	term      string
}

// newTermNode builds a term description. indexRule may be nil.
func newTermNode(term string, indexRule *databasev1.IndexRule) *termNode {
	return &termNode{term: term, indexRule: indexRule}
}

// MarshalJSON encodes the node as {"term": {"value": ..., "index": ...}}.
// The "index" entry is omitted when there is no index rule.
func (t *termNode) MarshalJSON() ([]byte, error) {
	inner := map[string]interface{}{"value": t.term}
	if rule := t.indexRule; rule != nil {
		inner["index"] = rule.Metadata.Name + ":" + rule.Metadata.Group
	}
	return json.Marshal(map[string]interface{}{"term": inner})
}

// String returns the JSON form of the node.
func (t *termNode) String() string {
	return convert.JSONToString(t)
}
// matchNode describes an analyzed full-text match query.
type matchNode struct {
	indexRule *databasev1.IndexRule
	match     string
}

// newMatchNode builds a match description for the given index rule.
func newMatchNode(match string, indexRule *databasev1.IndexRule) *matchNode {
	return &matchNode{
		indexRule: indexRule,
		match:     match,
	}
}

// MarshalJSON encodes the node as {"match": {"value", "index", "analyzer"}}.
//
// Like termNode and termRangeInclusiveNode, it tolerates a nil index rule by
// omitting the rule-derived entries instead of panicking on a nil dereference.
func (m *matchNode) MarshalJSON() ([]byte, error) {
	inner := make(map[string]interface{}, 3)
	inner["value"] = m.match
	if m.indexRule != nil {
		inner["index"] = m.indexRule.Metadata.Name + ":" + m.indexRule.Metadata.Group
		inner["analyzer"] = m.indexRule.Analyzer
	}
	data := make(map[string]interface{}, 1)
	data["match"] = inner
	return json.Marshal(data)
}

// String returns the JSON form of the node.
func (m *matchNode) String() string {
	return convert.JSONToString(m)
}
// prefixNode describes a prefix query.
type prefixNode struct {
	prefix string
}

// newPrefixNode builds a prefix description.
func newPrefixNode(prefix string) *prefixNode {
	return &prefixNode{prefix: prefix}
}

// MarshalJSON encodes the node as {"prefix": ...}.
func (p *prefixNode) MarshalJSON() ([]byte, error) {
	return json.Marshal(map[string]interface{}{"prefix": p.prefix})
}

// String returns the JSON form of the node.
func (p *prefixNode) String() string {
	return convert.JSONToString(p)
}
// wildcardNode describes a wildcard query.
type wildcardNode struct {
	wildcard string
}

// newWildcardNode builds a wildcard description.
func newWildcardNode(wildcard string) *wildcardNode {
	return &wildcardNode{wildcard: wildcard}
}

// MarshalJSON encodes the node as {"wildcard": ...}.
func (w *wildcardNode) MarshalJSON() ([]byte, error) {
	return json.Marshal(map[string]interface{}{"wildcard": w.wildcard})
}

// String returns the JSON form of the node.
func (w *wildcardNode) String() string {
	return convert.JSONToString(w)
}
// timeRangeNode describes a time-range restriction.
type timeRangeNode struct {
	timeRange *timestamp.TimeRange
}

// newTimeRangeNode builds a time-range description.
func newTimeRangeNode(timeRange *timestamp.TimeRange) *timeRangeNode {
	return &timeRangeNode{timeRange: timeRange}
}

// MarshalJSON encodes the node as {"time_range": <timeRange.String()>}.
func (t *timeRangeNode) MarshalJSON() ([]byte, error) {
	rendered := t.timeRange.String()
	return json.Marshal(map[string]interface{}{"time_range": rendered})
}

// String returns the JSON form of the node.
func (t *timeRangeNode) String() string {
	return convert.JSONToString(t)
}
// BuildPropertyQuery returns the bluge query for a property QueryRequest.
//
// The result is a must (AND) of up to three parts:
//   - the index-mode query built from req.Name and req.Criteria, if any;
//   - a filter on groupField matching any of req.Groups (required);
//   - a filter on idField matching any of req.Ids, when ids are given.
//
// A single value becomes a plain term query. Several values become a should
// query with min-should 1. It returns an error when the criteria cannot be
// translated or when req.Groups is empty.
func BuildPropertyQuery(req *propertyv1.QueryRequest, groupField, idField string) (index.Query, error) {
	iq, err := BuildIndexModeQuery(req.Name, req.Criteria, schemaInstance)
	if err != nil {
		return nil, err
	}
	// Without at least one group there is nothing to scope the query to, and
	// indexing the first group would panic.
	if len(req.Groups) == 0 {
		return nil, errors.New("property query requires at least one group")
	}

	// anyOf matches documents whose field equals one of values.
	anyOf := func(field string, values []string) (bluge.Query, node) {
		if len(values) == 1 {
			return bluge.NewTermQuery(values[0]).SetField(field), newTermNode(values[0], nil)
		}
		bq, sn := bluge.NewBooleanQuery(), newShouldNode()
		for _, v := range values {
			bq.AddShould(bluge.NewTermQuery(v).SetField(field))
			sn.Append(newTermNode(v, nil))
		}
		bq.SetMinShould(1)
		return bq, sn
	}

	query, tree := bluge.NewBooleanQuery(), newMustNode()
	if iq != nil {
		qn := iq.(*queryNode)
		query.AddMust(qn.query)
		tree.Append(qn.node)
	}

	groupQuery, groupNode := anyOf(groupField, req.Groups)
	query.AddMust(groupQuery)
	tree.Append(groupNode)

	if len(req.Ids) > 0 {
		idQuery, idNode := anyOf(idField, req.Ids)
		query.AddMust(idQuery)
		tree.Append(idNode)
	}

	return &queryNode{
		query: query,
		node:  tree,
	}, nil
}
var (
	// Compile-time check that *schema satisfies logical.Schema.
	_ logical.Schema = (*schema)(nil)
	// schemaInstance is the schema BuildPropertyQuery passes to BuildIndexModeQuery.
	schemaInstance = &schema{}
)

// schema is a minimal logical.Schema used for property queries.
//
// It reports every tag as indexed, under a rule whose ID is a hash of the tag
// name. It exposes no entity tags and reports no rule via IndexRuleDefined.
// The remaining methods are unimplemented and panic.
type schema struct{}

// IndexDefined reports every tag as indexed, with a rule ID of HashStr(tagName).
func (s *schema) IndexDefined(tagName string) (bool, *databasev1.IndexRule) {
	rule := &databasev1.IndexRule{
		Metadata: &commonv1.Metadata{Id: uint32(convert.HashStr(tagName))},
	}
	return true, rule
}

// IndexRuleDefined always reports that no rule is defined.
func (s *schema) IndexRuleDefined(string) (bool, *databasev1.IndexRule) {
	return false, nil
}

// EntityList returns no entity tags.
func (s *schema) EntityList() []string {
	return nil
}

// CreateFieldRef is not implemented.
func (s *schema) CreateFieldRef(...*logical.Field) ([]*logical.FieldRef, error) {
	panic("unimplemented")
}

// CreateTagRef is not implemented.
func (s *schema) CreateTagRef(...[]*logical.Tag) ([][]*logical.TagRef, error) {
	panic("unimplemented")
}

// Equal is not implemented.
func (s *schema) Equal(logical.Schema) bool {
	panic("unimplemented")
}

// FindTagSpecByName is not implemented.
func (s *schema) FindTagSpecByName(string) *logical.TagSpec {
	panic("unimplemented")
}

// ProjFields is not implemented.
func (s *schema) ProjFields(...*logical.FieldRef) logical.Schema {
	panic("unimplemented")
}

// ProjTags is not implemented.
func (s *schema) ProjTags(...[]*logical.TagRef) logical.Schema {
	panic("unimplemented")
}