in pkg/index/inverted/inverted.go [292:357]
// run starts the background goroutine that drains s.ch and writes the
// received documents to s.writer in batches.
//
// The buffered batch is written when any of the following happens:
//   - batchSize documents have been buffered,
//   - the interval timer (s.batchInterval) fires,
//   - a flushEvent is received (its onComplete channel is closed afterwards),
//   - the goroutine exits, either because s.closer signaled shutdown or
//     because s.ch was closed.
//
// A failed write is logged and the batch is discarded. s.closer.Done() is
// called as the goroutine's last action.
func (s *store) run() {
	go func() {
		defer s.closer.Done()
		size := 0
		batch := bluge.NewBatch()
		flush := func() {
			if size < 1 {
				return
			}
			if err := s.writer.Batch(batch); err != nil {
				s.l.Error().Err(err).Msg("write to the inverted index")
			}
			batch.Reset()
			size = 0
		}
		// Registered after closer.Done, so it runs before it: documents that
		// are still buffered at shutdown are written instead of being dropped.
		// NOTE(review): assumes s.writer is still open while this goroutine is
		// exiting (i.e. the owner waits on s.closer before closing the writer)
		// — confirm against Close.
		defer flush()

		// A single timer, re-armed only after it fires. Incoming events do not
		// postpone it, so a steady trickle of documents below batchSize is
		// still flushed within roughly one batchInterval.
		timer := time.NewTimer(s.batchInterval)
		defer timer.Stop()

		var docIDBuffer bytes.Buffer
		for {
			select {
			case <-s.closer.CloseNotify():
				return
			case event, more := <-s.ch:
				if !more {
					return
				}
				switch d := event.(type) {
				case flushEvent:
					flush()
					close(d.onComplete)
				case doc:
					// TODO: generate a segment directly.
					// NOTE(review): assumes producers never enqueue a doc
					// without fields; d.fields[0] panics otherwise — confirm.
					fk := d.fields[0].Key
					// The document ID is the optional series ID followed by
					// the numeric doc ID.
					docIDBuffer.Reset()
					if fk.HasSeriesID() {
						docIDBuffer.Write(fk.SeriesID.Marshal())
					}
					docIDBuffer.Write(convert.Uint64ToBytes(d.docID))
					blugeDoc := bluge.NewDocument(docIDBuffer.String())
					toAddSeriesIDField := false
					for _, f := range d.fields {
						if f.Key.Analyzer == databasev1.IndexRule_ANALYZER_UNSPECIFIED {
							blugeDoc.AddField(bluge.NewKeywordFieldBytes(f.Key.MarshalIndexRule(), f.Marshal()).StoreValue().Sortable())
						} else {
							// Analyzed fields index the raw term rather than
							// the marshaled field, so the series ID is added
							// as a separate field below.
							toAddSeriesIDField = true
							blugeDoc.AddField(bluge.NewKeywordFieldBytes(f.Key.MarshalIndexRule(), f.Term).StoreValue().Sortable().
								WithAnalyzer(analyzers[f.Key.Analyzer]))
						}
					}
					if toAddSeriesIDField && fk.HasSeriesID() {
						blugeDoc.AddField(bluge.NewKeywordFieldBytes(seriesIDField, fk.SeriesID.Marshal()))
					}
					size++
					batch.Update(blugeDoc.ID(), blugeDoc)
					if size >= batchSize {
						flush()
					}
				}
			case <-timer.C:
				flush()
				// The channel was just drained by the receive above, so the
				// timer can be re-armed directly.
				timer.Reset(s.batchInterval)
			}
		}
	}()
}