in processor/lsmintervalprocessor/processor.go [150:194]
func (p *Processor) Start(ctx context.Context, host component.Host) error {
p.mu.Lock()
if p.db == nil {
db, err := pebble.Open(p.dataDir, p.dbOpts)
if err != nil {
return fmt.Errorf("failed to open database: %w", err)
}
p.db = db
}
if p.exportStopped == nil {
p.exportStopped = make(chan struct{})
}
p.mu.Unlock()
go func() {
defer close(p.exportStopped)
to := p.processingTime.Add(p.intervals[0].Duration)
timer := time.NewTimer(time.Until(to))
defer timer.Stop()
for {
select {
case <-p.ctx.Done():
return
case <-timer.C:
}
p.mu.Lock()
batch := p.batch
p.batch = nil
p.processingTime = to
p.mu.Unlock()
// Export the batch
if err := p.commitAndExport(p.ctx, batch, to); err != nil {
p.logger.Warn("failed to export", zap.Error(err), zap.Time("end_time", to))
}
to = to.Add(p.intervals[0].Duration)
timer.Reset(time.Until(to))
}
}()
return nil
}