func()

in processor/lsmintervalprocessor/processor.go [357:391]


// mergeToBatch merges the encoded value held in mb into the processor's
// pending batch, once for every configured interval, and commits the batch
// when it has grown to at least dbCommitThresholdBytes.
//
// For each interval a merger.Key is built from the interval duration, the
// processor's current processing time and clientMetadata. The key is
// marshaled into mb.key (reusing its backing storage) and mb.value is merged
// into the batch under that key.
//
// p.mu is held for the whole call. The batch is created lazily via newBatch
// when there is none pending.
//
// A non-nil error is returned when marshaling a key, merging into the batch,
// committing the batch, or closing it after a commit fails.
func (p *Processor) mergeToBatch(mb *mergeBuffer, clientMetadata []merger.KeyValues) (err error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.batch == nil {
		p.batch = newBatch(p.db)
	}

	for _, ivl := range p.intervals {
		key := merger.Key{
			Interval:       ivl.Duration,
			ProcessingTime: p.processingTime,
			Metadata:       clientMetadata,
		}
		// mb.key[:0] keeps the existing capacity so the key buffer is
		// reused across intervals (and across calls) instead of reallocated.
		mb.key, err = key.AppendBinary(mb.key[:0])
		if err != nil {
			return fmt.Errorf("failed to marshal key to binary for ivl %s: %w", ivl.Duration, err)
		}
		if err = p.batch.Merge(mb.key, mb.value, nil); err != nil {
			return fmt.Errorf("failed to merge to db: %w", err)
		}
	}

	if p.batch.Len() < dbCommitThresholdBytes {
		return nil
	}

	// On a commit failure the batch is deliberately left in place, exactly
	// as before; only the error is reported.
	if err = p.batch.Commit(p.wOpts); err != nil {
		return fmt.Errorf("failed to commit a batch to db: %w", err)
	}

	// Drop the reference before inspecting the close error: the batch has
	// already been committed and Close has been invoked on it, so it must not
	// be picked up again by the next call even when Close reports a failure.
	err = p.batch.Close()
	p.batch = nil
	if err != nil {
		return fmt.Errorf("failed to close a batch post commit: %w", err)
	}
	return nil
}