func New()

in pkg/wal/wal.go [172:232]


func New(path string, options *Options) (WAL, error) {
	//  Check configuration options.
	walOptions := DefaultOptions
	if options != nil {
		fileSize := options.FileSize
		if fileSize <= 0 {
			fileSize = DefaultOptions.FileSize
		}
		bufferSize := options.BufferSize
		if bufferSize <= 0 {
			bufferSize = DefaultOptions.BufferSize
		}
		bufferBatchInterval := options.BufferBatchInterval
		if bufferBatchInterval <= 0 {
			bufferBatchInterval = DefaultOptions.BufferBatchInterval
		}
		walOptions = &Options{
			FileSize:            fileSize,
			BufferSize:          bufferSize,
			BufferBatchInterval: bufferBatchInterval,
			NoSync:              options.NoSync,
		}
	}

	// Initial WAL path.
	path, err := filepath.Abs(path)
	if err != nil {
		return nil, errors.Wrap(err, "Can not get absolute path: "+path)
	}
	if err := os.MkdirAll(path, os.ModePerm); err != nil {
		return nil, err
	}

	writeCloser := run.NewChannelCloser()
	flushCloser := run.NewChannelCloser()
	chanGroupCloser := run.NewChannelGroupCloser(writeCloser, flushCloser)
	log := &log{
		path:            path,
		options:         *walOptions,
		logger:          logger.GetLogger(moduleName),
		writeChannel:    make(chan logRequest),
		flushChannel:    make(chan buffer, walOptions.FlushQueueSize),
		bufferWriter:    newBufferWriter(),
		writeCloser:     writeCloser,
		flushCloser:     flushCloser,
		chanGroupCloser: chanGroupCloser,
		buffer: buffer{
			timestampMap: make(map[common.GlobalSeriesID][]time.Time),
			valueMap:     make(map[common.GlobalSeriesID][]byte),
			callbackMap:  make(map[common.GlobalSeriesID][]func(common.GlobalSeriesID, time.Time, []byte, error)),
			count:        0,
		},
	}
	if err := log.load(); err != nil {
		return nil, err
	}
	log.start()

	log.logger.Info().Str("path", path).Msg("WAL has be initialized")
	return log, nil
}