in banyand/tsdb/index/writer.go [136:193]
func (s *Writer) writeGlobalIndex(scope tsdb.Entry, ref tsdb.GlobalItemID, value Value) error {
collect := func(ruleIndexes []*partition.IndexRuleLocator, fn func(indexWriter tsdb.IndexWriter, fields []index.Field) error) error {
fields := make(map[uint][]index.Field)
for _, ruleIndex := range ruleIndexes {
values, err := getIndexValue(ruleIndex, value)
if err != nil {
return err
}
if values == nil {
continue
}
for _, val := range values {
indexShardID, err := partition.ShardID(val, s.shardNum)
if err != nil {
return err
}
rule := ruleIndex.Rule
rr := fields[indexShardID]
rr = append(rr, index.Field{
Key: index.FieldKey{
IndexRuleID: rule.GetMetadata().GetId(),
Analyzer: rule.Analyzer,
},
Term: val,
})
fields[indexShardID] = rr
}
}
for shardID, rules := range fields {
shard, err := s.db.SupplyTSDB().CreateShardsAndGetByID(common.ShardID(shardID))
if err != nil {
return err
}
builder := shard.Index().WriterBuilder()
indexWriter, err := builder.
Scope(scope).
GlobalItemID(ref).
Time(value.Timestamp).
Build()
if err != nil {
return err
}
err = fn(indexWriter, rules)
if err != nil {
return err
}
}
return nil
}
return multierr.Combine(
collect(s.invertRuleIndex[global|inverted], func(indexWriter tsdb.IndexWriter, fields []index.Field) error {
return indexWriter.WriteInvertedIndex(fields)
}),
collect(s.invertRuleIndex[global|tree], func(indexWriter tsdb.IndexWriter, fields []index.Field) error {
return indexWriter.WriteLSMIndex(fields)
}),
)
}