func (s *store) run()

in pkg/index/inverted/inverted.go [292:357]

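run starts a background goroutine that drains events from s.ch: documents accumulate in a bluge batch that is written when it reaches batchSize, when batchInterval elapses, or when an explicit flushEvent requests it.
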
func (s *store) run() {
	go func() {
		defer s.closer.Done()
		size := 0
		batch := bluge.NewBatch()
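		// flush writes whatever has accumulated in the batch to the index writer,
		// then resets the batch and the size counter; it is a no-op when the batch is empty.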
		flush := func() {
			if size < 1 {
				return
			}
			if err := s.writer.Batch(batch); err != nil {
				s.l.Error().Err(err).Msg("write to the inverted index")
			}
			batch.Reset()
			size = 0
		}
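		// docIDBuffer is reused across documents so IDs can be built without reallocating.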
		var docIDBuffer bytes.Buffer
		for {
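			// A fresh timer each iteration bounds how long a partially filled batch waits before being flushed.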
			timer := time.NewTimer(s.batchInterval)
			select {
			case <-s.closer.CloseNotify():
				timer.Stop()
				return
			case event, more := <-s.ch:
				if !more {
					timer.Stop()
					return
				}
				switch d := event.(type) {
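				// An explicit flush request: write the pending batch, then signal
				// the requester by closing its onComplete channel.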
				case flushEvent:
					flush()
					close(d.onComplete)
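				// A document to index: its ID is the marshaled series ID (when present)
				// followed by the encoded doc ID.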
				case doc:
					// TODO: generate a segment directly.
					fk := d.fields[0].Key
					docIDBuffer.Reset()
					if fk.HasSeriesID() {
						docIDBuffer.Write(fk.SeriesID.Marshal())
					}
					docIDBuffer.Write(convert.Uint64ToBytes(d.docID))
					doc := bluge.NewDocument(docIDBuffer.String())
					toAddSeriesIDField := false
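					// Fields without an analyzer are indexed as raw keyword terms;
					// analyzed fields index the bare term through the configured analyzer.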
					for _, f := range d.fields {
						if f.Key.Analyzer == databasev1.IndexRule_ANALYZER_UNSPECIFIED {
							doc.AddField(bluge.NewKeywordFieldBytes(f.Key.MarshalIndexRule(), f.Marshal()).StoreValue().Sortable())
						} else {
							toAddSeriesIDField = true
							doc.AddField(bluge.NewKeywordFieldBytes(f.Key.MarshalIndexRule(), f.Term).StoreValue().Sortable().
								WithAnalyzer(analyzers[f.Key.Analyzer]))
						}
					}
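					// If any field was analyzed, the series ID (when the key carries one)
					// is also stored as a separate keyword field.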
					if toAddSeriesIDField && fk.HasSeriesID() {
						doc.AddField(bluge.NewKeywordFieldBytes(seriesIDField, fk.SeriesID.Marshal()))
					}
					size++
					batch.Update(doc.ID(), doc)
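					// Write eagerly once the batch reaches the configured size.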
					if size >= batchSize {
						flush()
					}
				}
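			// The batch interval elapsed; flush whatever is pending.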
			case <-timer.C:
				flush()
			}
			timer.Stop()
		}
	}()
}
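
The loop supports on-demand flushing: a flushEvent carries an onComplete channel that the loop closes once the pending batch has been handed to the writer. A caller-side helper built on that handshake could look like the sketch below; the flushSync name, the chan struct{} type for onComplete, and the assumption that flushEvent needs no other fields are illustrative guesses, not the project's actual API.

func (s *store) flushSync() {
	// Hypothetical helper; the real store may expose flushing differently.
	done := make(chan struct{})
	s.ch <- flushEvent{onComplete: done}
	// run() closes onComplete after the pending batch is written,
	// so this blocks until the flush has taken effect.
	<-done
}

Because the same goroutine that indexes documents also handles flushEvent, a flush observed this way is ordered after every document that was sent on s.ch before it.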