in pkg/wal/wal.go [339:430]
// start launches the two background goroutines of the WAL and returns once
// both of them have registered with their closers.
//
//   - The batch task receives write requests from log.writeChannel, appends
//     them to log.buffer and calls log.triggerFlushing either when the
//     accumulated volume exceeds log.options.BufferSize, or when
//     log.options.BufferBatchInterval elapses without a new request while the
//     buffer holds data.
//   - The flush task receives batches from log.flushChannel, writes them with
//     log.flushBuffer (at most maxRetries attempts, one second apart) and
//     passes the final error, if any, to batch.notifyRequests.
//
// Each goroutine panics if its closer rejects the receiver registration, and
// exits when its channel is closed or its closer signals CloseNotify.
func (log *log) start() {
	var initialTasks sync.WaitGroup
	initialTasks.Add(2)

	go func() {
		if !log.writeCloser.AddReceiver() {
			panic("writeCloser already closed")
		}
		defer log.writeCloser.ReceiverDone()

		log.logger.Info().Msg("Start batch task...")
		initialTasks.Done()

		// A single timer is reused for the lifetime of the task instead of
		// allocating a fresh one for every received request.
		timer := time.NewTimer(log.options.BufferBatchInterval)
		defer timer.Stop()

		// bufferVolume tracks the bytes accumulated since the last flush trigger.
		bufferVolume := 0
		for {
			select {
			case request, chOpen := <-log.writeChannel:
				if !chOpen {
					log.logger.Info().Msg("Stop batch task when write-channel closed")
					return
				}
				log.buffer.write(request)
				if log.logger.Debug().Enabled() {
					log.logger.Debug().Msg("Write request to buffer. elements: " + strconv.Itoa(log.buffer.count))
				}
				bufferVolume += request.seriesID.Volume() + timestampVolumeLength + len(request.data)
				if bufferVolume > log.options.BufferSize {
					log.triggerFlushing()
					bufferVolume = 0
				}
			case <-timer.C:
				// Nothing to flush if no request arrived during the interval.
				if bufferVolume > 0 {
					log.triggerFlushing()
					bufferVolume = 0
				}
			case <-log.writeCloser.CloseNotify():
				log.logger.Info().Msg("Stop batch task when close notify")
				return
			}

			// Restart the interval after every iteration. Stop reports false
			// when the timer already fired; the non-blocking receive discards
			// a tick that may still be pending so Reset starts from a clean
			// channel.
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer.Reset(log.options.BufferBatchInterval)
		}
	}()

	go func() {
		if !log.flushCloser.AddReceiver() {
			panic("flushCloser already closed")
		}
		defer log.flushCloser.ReceiverDone()

		log.logger.Info().Msg("Start flush task...")
		initialTasks.Done()

		for {
			select {
			case batch, chOpen := <-log.flushChannel:
				if !chOpen {
					log.logger.Info().Msg("Stop flush task when flush-channel closed")
					return
				}

				startTime := time.Now()
				var err error
				for i := 0; i < maxRetries; i++ {
					if err = log.flushBuffer(batch); err == nil {
						break
					}
					if i == maxRetries-1 {
						// Last attempt: do not sleep, so the waiting requests
						// learn about the failure without an extra delay.
						log.logger.Err(err).Msg("Flushing buffer failed. No retries left")
						break
					}
					log.logger.Err(err).Msg("Flushing buffer failed. Retrying...")
					time.Sleep(time.Second)
				}
				// Only report a successful flush when one actually happened.
				if err == nil && log.logger.Debug().Enabled() {
					log.logger.Debug().Msg("Flushed buffer to WAL file. elements: " +
						strconv.Itoa(batch.count) + ", cost: " + time.Since(startTime).String())
				}

				batch.notifyRequests(err)
			case <-log.flushCloser.CloseNotify():
				log.logger.Info().Msg("Stop flush task when close notify")
				return
			}
		}
	}()

	// Block until both goroutines have registered with their closers.
	initialTasks.Wait()
	log.logger.Info().Msg("Started WAL")
}