in v1storage/persistence.go [1396:1499]
// processIndexingQueue is the event loop that drains p.indexingQueue and
// applies the queued index operations, in batches, to the
// labelPairToFingerprints and labelNameToLabelValues indexes.
//
// A pending batch is committed when any of the following happens:
//   - the batch has grown to indexingMaxBatchSize operations,
//   - the batch timeout fires while the queue is empty,
//   - a flush request arrives on p.indexingFlush (only accepted while the
//     queue is empty; the request is answered with the current queue length),
//   - p.indexingQueue is closed.
//
// The loop terminates once p.indexingQueue is closed, after which
// p.indexingStopped is closed to signal completion.
func (p *persistence) processIndexingQueue() {
	batchSize := 0
	nameToValues := index.LabelNameLabelValuesMapping{}
	pairToFPs := index.LabelPairFingerprintsMapping{}
	batchTimeout := time.NewTimer(indexingBatchTimeout)
	defer batchTimeout.Stop()

	// commitBatch writes the accumulated mappings to both indexes, records
	// the batch size and duration metrics, and then starts a fresh batch with
	// a fresh timeout. Index errors are logged and mark the persistence
	// dirty; they do not abort the loop.
	commitBatch := func() {
		p.indexingBatchSizes.Observe(float64(batchSize))
		defer func(begin time.Time) {
			p.indexingBatchDuration.Observe(time.Since(begin).Seconds())
		}(time.Now())

		if err := p.labelPairToFingerprints.IndexBatch(pairToFPs); err != nil {
			log.Error("Error indexing label pair to fingerprints batch: ", err)
			p.setDirty(err)
		}
		if err := p.labelNameToLabelValues.IndexBatch(nameToValues); err != nil {
			log.Error("Error indexing label name to label values batch: ", err)
			p.setDirty(err)
		}

		batchSize = 0
		nameToValues = index.LabelNameLabelValuesMapping{}
		pairToFPs = index.LabelPairFingerprintsMapping{}

		// commitBatch is also reached from the flush and queue cases, where
		// the timer may already have expired without its channel having been
		// read. Stop the timer and discard any such stale tick before
		// resetting, so that it cannot trigger a premature commit of the next
		// batch. The drain is non-blocking because in the timeout case the
		// tick has already been consumed by the select below.
		if !batchTimeout.Stop() {
			select {
			case <-batchTimeout.C:
			default:
			}
		}
		batchTimeout.Reset(indexingBatchTimeout)
	}

	// flush is nil whenever the queue is non-empty, which disables the flush
	// case in the select below (receiving from a nil channel blocks forever).
	var flush chan chan int
loop:
	for {
		// Only process flush requests if the queue is currently empty.
		if len(p.indexingQueue) == 0 {
			flush = p.indexingFlush
		} else {
			flush = nil
		}
		select {
		case <-batchTimeout.C:
			// Only commit if we have something to commit _and_
			// nothing is waiting in the queue to be picked up. That
			// prevents a death spiral if the LookupSet calls below
			// are slow for some reason.
			if batchSize > 0 && len(p.indexingQueue) == 0 {
				commitBatch()
			} else {
				// The tick was just consumed, so a plain Reset is fine here.
				batchTimeout.Reset(indexingBatchTimeout)
			}
		case r := <-flush:
			if batchSize > 0 {
				commitBatch()
			}
			// Report how many operations are queued at this moment.
			r <- len(p.indexingQueue)
		case op, ok := <-p.indexingQueue:
			if !ok {
				// Queue closed: commit whatever is pending and shut down.
				if batchSize > 0 {
					commitBatch()
				}
				break loop
			}

			batchSize++
			for ln, lv := range op.metric {
				lp := model.LabelPair{Name: ln, Value: lv}

				// Load the current fingerprint set for this label pair into
				// the batch the first time the pair is seen.
				baseFPs, ok := pairToFPs[lp]
				if !ok {
					var err error
					baseFPs, _, err = p.labelPairToFingerprints.LookupSet(lp)
					if err != nil {
						log.Errorf("Error looking up label pair %v: %s", lp, err)
						// The operation is not applied for this label pair.
						continue
					}
					pairToFPs[lp] = baseFPs
				}

				// Likewise for the value set of this label name.
				baseValues, ok := nameToValues[ln]
				if !ok {
					var err error
					baseValues, _, err = p.labelNameToLabelValues.LookupSet(ln)
					if err != nil {
						log.Errorf("Error looking up label name %v: %s", ln, err)
						// The operation is not applied for this label pair;
						// the fingerprint set cached above stays unmodified.
						continue
					}
					nameToValues[ln] = baseValues
				}

				switch op.opType {
				case add:
					baseFPs[op.fingerprint] = struct{}{}
					baseValues[lv] = struct{}{}
				case remove:
					delete(baseFPs, op.fingerprint)
					// Drop the label value only once no fingerprint is left
					// for this label pair.
					if len(baseFPs) == 0 {
						delete(baseValues, lv)
					}
				default:
					panic("unknown op type")
				}
			}

			if batchSize >= indexingMaxBatchSize {
				commitBatch()
			}
		}
	}
	close(p.indexingStopped)
}