in pkg/wal/wal.go [172:232]
func New(path string, options *Options) (WAL, error) {
// Check configuration options.
walOptions := DefaultOptions
if options != nil {
fileSize := options.FileSize
if fileSize <= 0 {
fileSize = DefaultOptions.FileSize
}
bufferSize := options.BufferSize
if bufferSize <= 0 {
bufferSize = DefaultOptions.BufferSize
}
bufferBatchInterval := options.BufferBatchInterval
if bufferBatchInterval <= 0 {
bufferBatchInterval = DefaultOptions.BufferBatchInterval
}
walOptions = &Options{
FileSize: fileSize,
BufferSize: bufferSize,
BufferBatchInterval: bufferBatchInterval,
NoSync: options.NoSync,
}
}
// Initial WAL path.
path, err := filepath.Abs(path)
if err != nil {
return nil, errors.Wrap(err, "Can not get absolute path: "+path)
}
if err := os.MkdirAll(path, os.ModePerm); err != nil {
return nil, err
}
writeCloser := run.NewChannelCloser()
flushCloser := run.NewChannelCloser()
chanGroupCloser := run.NewChannelGroupCloser(writeCloser, flushCloser)
log := &log{
path: path,
options: *walOptions,
logger: logger.GetLogger(moduleName),
writeChannel: make(chan logRequest),
flushChannel: make(chan buffer, walOptions.FlushQueueSize),
bufferWriter: newBufferWriter(),
writeCloser: writeCloser,
flushCloser: flushCloser,
chanGroupCloser: chanGroupCloser,
buffer: buffer{
timestampMap: make(map[common.GlobalSeriesID][]time.Time),
valueMap: make(map[common.GlobalSeriesID][]byte),
callbackMap: make(map[common.GlobalSeriesID][]func(common.GlobalSeriesID, time.Time, []byte, error)),
count: 0,
},
}
if err := log.load(); err != nil {
return nil, err
}
log.start()
log.logger.Info().Str("path", path).Msg("WAL has be initialized")
return log, nil
}