func()

in pkg/wal/wal.go [339:430]


// start launches the two background goroutines that drive the WAL and blocks
// until both have registered with their closer and signaled readiness.
//
// The batch task receives write requests from log.writeChannel, writes them
// into log.buffer and calls log.triggerFlushing once the accumulated volume
// exceeds log.options.BufferSize, or when log.options.BufferBatchInterval
// elapses while the accumulated volume is non-zero.
//
// The flush task receives batches from log.flushChannel, persists each one
// with log.flushBuffer (at most maxRetries attempts, one second apart) and
// then reports the final outcome through batch.notifyRequests.
//
// Either goroutine panics if its closer refuses to register a new receiver.
func (log *log) start() {
	var initialTasks sync.WaitGroup
	initialTasks.Add(2)

	// Batch task: accumulates incoming requests and decides when to flush.
	go func() {
		if !log.writeCloser.AddReceiver() {
			panic("writeCloser already closed")
		}
		defer log.writeCloser.ReceiverDone()

		log.logger.Info().Msg("Start batch task...")
		initialTasks.Done()

		// bufferVolume tracks the volume accumulated since the last flush trigger.
		bufferVolume := 0
		for {
			// NOTE(review): the timer is recreated on every iteration, so each
			// received request restarts the interval; the timer branch therefore
			// only fires after a full interval without requests — confirm this
			// is the intended batching semantics.
			timer := time.NewTimer(log.options.BufferBatchInterval)
			select {
			case request, chOpen := <-log.writeChannel:
				if !chOpen {
					timer.Stop()
					log.logger.Info().Msg("Stop batch task when write-channel closed")
					return
				}

				log.buffer.write(request)
				if log.logger.Debug().Enabled() {
					log.logger.Debug().Msg("Write request to buffer. elements: " + strconv.Itoa(log.buffer.count))
				}

				bufferVolume += request.seriesID.Volume() + timestampVolumeLength + len(request.data)
				if bufferVolume > log.options.BufferSize {
					log.triggerFlushing()
					bufferVolume = 0
				}
			case <-timer.C:
				// Nothing accumulated since the last trigger: skip the flush.
				if bufferVolume == 0 {
					continue
				}
				log.triggerFlushing()
				bufferVolume = 0
			case <-log.writeCloser.CloseNotify():
				timer.Stop()
				log.logger.Info().Msg("Stop batch task when close notify")
				return
			}
			timer.Stop()
		}
	}()

	// Flush task: persists batches handed over through log.flushChannel.
	go func() {
		if !log.flushCloser.AddReceiver() {
			panic("flushCloser already closed")
		}
		defer log.flushCloser.ReceiverDone()

		log.logger.Info().Msg("Start flush task...")
		initialTasks.Done()

		for {
			select {
			case batch, chOpen := <-log.flushChannel:
				if !chOpen {
					log.logger.Info().Msg("Stop flush task when flush-channel closed")
					return
				}

				startTime := time.Now()
				var err error
				for attempt := 1; attempt <= maxRetries; attempt++ {
					if err = log.flushBuffer(batch); err == nil {
						break
					}
					if attempt == maxRetries {
						// No further attempt follows, so do not sleep: waiting here
						// would only delay delivering the error to the requests.
						log.logger.Err(err).Msg("Flushing buffer failed. No retries left")
						break
					}
					log.logger.Err(err).Msg("Flushing buffer failed. Retrying...")
					time.Sleep(time.Second)
				}
				// Report success only when the batch really reached the WAL file;
				// failures have already been logged above.
				if err == nil && log.logger.Debug().Enabled() {
					log.logger.Debug().Msg("Flushed buffer to WAL file. elements: " +
						strconv.Itoa(batch.count) + ", cost: " + time.Since(startTime).String())
				}

				// err is nil on success, otherwise the error of the last attempt.
				batch.notifyRequests(err)
			case <-log.flushCloser.CloseNotify():
				log.logger.Info().Msg("Stop flush task when close notify")
				return
			}
		}
	}()

	// Block until both goroutines have registered and announced themselves.
	initialTasks.Wait()
	log.logger.Info().Msg("Started WAL")
}