// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Package index implements transferring data to indices.
package index

import (
	"context"
	"io"
	"time"

	"github.com/pkg/errors"
	"go.uber.org/multierr"

	"github.com/apache/skywalking-banyandb/api/common"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
)

// Message wraps a value and the related information needed to generate its indices.
type Message struct {
IndexWriter tsdb.IndexWriter
BlockCloser io.Closer
Value Value
Scope tsdb.Entry
GlobalItemID tsdb.GlobalItemID
}

// Value represents the input data for generating indices.
type Value struct {
Timestamp time.Time
TagFamilies []*modelv1.TagFamilyForWrite
}

// WriterOptions wraps all options to create an index writer.
type WriterOptions struct {
DB tsdb.Supplier
Families []*databasev1.TagFamilySpec
IndexRules []*databasev1.IndexRule
ShardNum uint32
EnableGlobalIndex bool
}
const (
local = 1 << iota
global
inverted
tree
)

// Writer generates indices based on index rules.
type Writer struct {
db tsdb.Supplier
l *logger.Logger
invertRuleIndex map[byte][]*partition.IndexRuleLocator
shardNum uint32
enableGlobalIndex bool
}

// NewWriter returns a new Writer with WriterOptions.
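//
// The context is expected to carry a *logger.Logger under logger.ContextKey,
// from which the writer's logger is derived. A minimal construction sketch;
// the supplier, tag family specs, and index rules below are placeholders that
// come from the caller's schema:
//
//	w := index.NewWriter(ctx, index.WriterOptions{
//		DB:                supplier,
//		Families:          familySpecs,
//		IndexRules:        indexRules,
//		ShardNum:          2,
//		EnableGlobalIndex: true,
//	})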
func NewWriter(ctx context.Context, options WriterOptions) *Writer {
w := new(Writer)
parentLogger := ctx.Value(logger.ContextKey)
if parentLogger != nil {
if pl, ok := parentLogger.(*logger.Logger); ok {
w.l = pl.Named("index-writer")
}
}
w.shardNum = options.ShardNum
w.db = options.DB
w.enableGlobalIndex = options.EnableGlobalIndex
w.invertRuleIndex = make(map[byte][]*partition.IndexRuleLocator, 4)
for _, ruleIndex := range partition.ParseIndexRuleLocators(options.Families, options.IndexRules) {
rule := ruleIndex.Rule
var key byte
switch rule.GetLocation() {
case databasev1.IndexRule_LOCATION_SERIES:
key |= local
case databasev1.IndexRule_LOCATION_GLOBAL:
if !w.enableGlobalIndex {
w.l.Warn().RawJSON("index-rule", logger.Proto(ruleIndex.Rule)).Msg("global index is disabled")
continue
}
key |= global
case databasev1.IndexRule_LOCATION_UNSPECIFIED:
			w.l.Warn().RawJSON("index-rule", logger.Proto(ruleIndex.Rule)).Msg("invalid: unspecified location")
continue
}
switch rule.Type {
case databasev1.IndexRule_TYPE_INVERTED:
key |= inverted
case databasev1.IndexRule_TYPE_TREE:
key |= tree
case databasev1.IndexRule_TYPE_UNSPECIFIED:
			w.l.Warn().RawJSON("index-rule", logger.Proto(ruleIndex.Rule)).Msg("invalid: unspecified type")
continue
}
rules := w.invertRuleIndex[key]
rules = append(rules, ruleIndex)
w.invertRuleIndex[key] = rules
}
return w
}
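
// Write generates the local and global indices declared by the index rules for
// the given message and then closes the message's block; any errors are logged
// rather than returned.
//
// A hedged usage sketch; the writer, closer, and identifiers below are
// placeholders supplied by the caller:
//
//	w.Write(index.Message{
//		IndexWriter:  seriesWriter,
//		BlockCloser:  block,
//		Value:        index.Value{Timestamp: ts, TagFamilies: families},
//		Scope:        scopeEntry,
//		GlobalItemID: itemID,
//	})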
func (s *Writer) Write(m Message) {
err := multierr.Combine(
s.writeLocalIndex(m.IndexWriter, m.Value),
s.writeGlobalIndex(m.Scope, m.GlobalItemID, m.Value),
m.BlockCloser.Close(),
)
if err != nil {
		s.l.Error().Err(err).Msg("encountered errors while generating indices")
}
}
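
// writeGlobalIndex writes the fields of the global index rules. Each term is
// routed to a shard derived from its value via partition.ShardID, and a
// per-shard index writer scoped to the given entry and global item ID receives
// the inverted and LSM (tree) fields.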
// TODO: should listen to pipeline in a distributed cluster.
func (s *Writer) writeGlobalIndex(scope tsdb.Entry, ref tsdb.GlobalItemID, value Value) error {
collect := func(ruleIndexes []*partition.IndexRuleLocator, fn func(indexWriter tsdb.IndexWriter, fields []index.Field) error) error {
fields := make(map[uint][]index.Field)
for _, ruleIndex := range ruleIndexes {
values, err := getIndexValue(ruleIndex, value)
if err != nil {
return err
}
if values == nil {
continue
}
for _, val := range values {
indexShardID, err := partition.ShardID(val, s.shardNum)
if err != nil {
return err
}
rule := ruleIndex.Rule
rr := fields[indexShardID]
rr = append(rr, index.Field{
Key: index.FieldKey{
IndexRuleID: rule.GetMetadata().GetId(),
Analyzer: rule.Analyzer,
},
Term: val,
})
fields[indexShardID] = rr
}
}
for shardID, rules := range fields {
shard, err := s.db.SupplyTSDB().CreateShardsAndGetByID(common.ShardID(shardID))
if err != nil {
return err
}
builder := shard.Index().WriterBuilder()
indexWriter, err := builder.
Scope(scope).
GlobalItemID(ref).
Time(value.Timestamp).
Build()
if err != nil {
return err
}
err = fn(indexWriter, rules)
if err != nil {
return err
}
}
return nil
}
return multierr.Combine(
collect(s.invertRuleIndex[global|inverted], func(indexWriter tsdb.IndexWriter, fields []index.Field) error {
return indexWriter.WriteInvertedIndex(fields)
}),
collect(s.invertRuleIndex[global|tree], func(indexWriter tsdb.IndexWriter, fields []index.Field) error {
return indexWriter.WriteLSMIndex(fields)
}),
)
}
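
// writeLocalIndex writes the fields of the series-local index rules through
// the provided writer, once as inverted-index fields and once as LSM (tree)
// fields.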
func (s *Writer) writeLocalIndex(writer tsdb.IndexWriter, value Value) (err error) {
collect := func(ruleIndexes []*partition.IndexRuleLocator, fn func(fields []index.Field) error) error {
fields := make([]index.Field, 0)
for _, ruleIndex := range ruleIndexes {
values, err := getIndexValue(ruleIndex, value)
if err != nil {
return err
}
if values == nil {
continue
}
for _, val := range values {
rule := ruleIndex.Rule
fields = append(fields, index.Field{
Key: index.FieldKey{
IndexRuleID: rule.GetMetadata().GetId(),
Analyzer: rule.Analyzer,
},
Term: val,
})
}
}
if len(fields) == 0 {
return nil
}
return fn(fields)
}
return multierr.Combine(
collect(s.invertRuleIndex[local|inverted], func(fields []index.Field) error {
return writer.WriteInvertedIndex(fields)
}),
collect(s.invertRuleIndex[local|tree], func(fields []index.Field) error {
return writer.WriteLSMIndex(fields)
}),
)
}

var errUnsupportedIndexType = errors.New("unsupported index type")
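
// getIndexValue extracts the raw index terms for a single-tag rule from the
// value's tag families. Rules over multiple tags are rejected, a malformed
// element yields no terms, and an array tag contributes one term per element.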
func getIndexValue(ruleIndex *partition.IndexRuleLocator, value Value) (val [][]byte, err error) {
val = make([][]byte, 0)
if len(ruleIndex.TagIndices) != 1 {
		return nil, errors.WithMessagef(errUnsupportedIndexType,
			"the index rule %s(%v) doesn't support composite tags",
			ruleIndex.Rule.Metadata.Name, ruleIndex.Rule.Tags)
}
tIndex := ruleIndex.TagIndices[0]
tag, err := partition.GetTagByOffset(value.TagFamilies, tIndex.FamilyOffset, tIndex.TagOffset)
if errors.Is(err, partition.ErrMalformedElement) {
return val, nil
}
if err != nil {
return nil, errors.WithMessagef(err, "index rule:%v", ruleIndex.Rule.Metadata)
}
fv, err := pbv1.ParseTagValue(tag)
if err != nil {
return nil, err
}
if fv.GetValue() == nil && fv.GetArr() == nil {
return nil, nil
}
v := fv.GetValue()
if v != nil {
val = append(val, v)
return val, nil
}
arr := fv.GetArr()
if arr != nil {
val = append(val, arr...)
}
return val, nil
}