in processor/lsmintervalprocessor/processor.go [357:391]
// mergeToBatch merges mb.value into the processor's pending batch once for
// every configured interval, then commits and closes the batch when its size
// has reached dbCommitThresholdBytes.
//
// For each interval a merger.Key is built from the interval duration, the
// processor's current processing time and clientMetadata, encoded into mb.key,
// and merged together with mb.value. mb.key is overwritten on every iteration.
//
// p.mu is held for the whole call. A pending batch is created lazily if none
// exists. The returned error wraps the first failure from key encoding, merge,
// commit, or close; on a merge or commit failure the pending batch is left in
// place.
func (p *Processor) mergeToBatch(mb *mergeBuffer, clientMetadata []merger.KeyValues) (err error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.batch == nil {
		p.batch = newBatch(p.db)
	}

	for _, ivl := range p.intervals {
		key := merger.Key{
			Interval:       ivl.Duration,
			ProcessingTime: p.processingTime,
			Metadata:       clientMetadata,
		}
		// Pass the existing key buffer truncated to length zero so the
		// encoder can reuse it instead of starting from a fresh slice.
		mb.key, err = key.AppendBinary(mb.key[:0])
		if err != nil {
			return fmt.Errorf("failed to marshal key to binary for ivl %s: %w", ivl.Duration, err)
		}
		if err = p.batch.Merge(mb.key, mb.value, nil); err != nil {
			return fmt.Errorf("failed to merge to db: %w", err)
		}
	}

	// Below the threshold the batch keeps accumulating for later calls.
	if p.batch.Len() < dbCommitThresholdBytes {
		return nil
	}

	if err = p.batch.Commit(p.wOpts); err != nil {
		return fmt.Errorf("failed to commit a batch to db: %w", err)
	}

	// The batch has been committed, so detach it from the processor before
	// closing. Even if Close reports an error, the next call must start a
	// fresh batch rather than merge into one that was already committed.
	committed := p.batch
	p.batch = nil
	if err = committed.Close(); err != nil {
		return fmt.Errorf("failed to close a batch post commit: %w", err)
	}
	return nil
}