func()

in banyand/tsdb/index/writer.go [136:193]


func (s *Writer) writeGlobalIndex(scope tsdb.Entry, ref tsdb.GlobalItemID, value Value) error {
	collect := func(ruleIndexes []*partition.IndexRuleLocator, fn func(indexWriter tsdb.IndexWriter, fields []index.Field) error) error {
		fields := make(map[uint][]index.Field)
		for _, ruleIndex := range ruleIndexes {
			values, err := getIndexValue(ruleIndex, value)
			if err != nil {
				return err
			}
			if values == nil {
				continue
			}
			for _, val := range values {
				indexShardID, err := partition.ShardID(val, s.shardNum)
				if err != nil {
					return err
				}
				rule := ruleIndex.Rule
				rr := fields[indexShardID]
				rr = append(rr, index.Field{
					Key: index.FieldKey{
						IndexRuleID: rule.GetMetadata().GetId(),
						Analyzer:    rule.Analyzer,
					},
					Term: val,
				})
				fields[indexShardID] = rr
			}
		}
		for shardID, rules := range fields {
			shard, err := s.db.SupplyTSDB().CreateShardsAndGetByID(common.ShardID(shardID))
			if err != nil {
				return err
			}
			builder := shard.Index().WriterBuilder()
			indexWriter, err := builder.
				Scope(scope).
				GlobalItemID(ref).
				Time(value.Timestamp).
				Build()
			if err != nil {
				return err
			}
			err = fn(indexWriter, rules)
			if err != nil {
				return err
			}
		}
		return nil
	}
	return multierr.Combine(
		collect(s.invertRuleIndex[global|inverted], func(indexWriter tsdb.IndexWriter, fields []index.Field) error {
			return indexWriter.WriteInvertedIndex(fields)
		}),
		collect(s.invertRuleIndex[global|tree], func(indexWriter tsdb.IndexWriter, fields []index.Field) error {
			return indexWriter.WriteLSMIndex(fields)
		}),
	)
}