in tsdb/db.go [727:914]
func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs []int64, stats *DBStats) (_ *DB, returnedErr error) {
	if err := os.MkdirAll(dir, 0o777); err != nil {
		return nil, err
	}
	if l == nil {
		l = log.NewNopLogger()
	}
	if stats == nil {
		stats = NewDBStats()
	}
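
	// Truncate the compaction ranges at the first entry above the configured
	// maximum block duration, so compaction never creates larger blocks.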
	for i, v := range rngs {
		if v > opts.MaxBlockDuration {
			rngs = rngs[:i]
			break
		}
	}

	// Fixup bad format written by Prometheus 2.1.
	if err := repairBadIndexVersion(l, dir); err != nil {
		return nil, errors.Wrap(err, "repair bad index version")
	}

	walDir := filepath.Join(dir, "wal")
	wblDir := filepath.Join(dir, wlog.WblDirName)

	// Migrate old WAL if one exists.
	if err := MigrateWAL(l, walDir); err != nil {
		return nil, errors.Wrap(err, "migrate WAL")
	}

	// Remove tmp dirs.
	for _, tmpDir := range []string{walDir, dir} {
		if err := removeBestEffortTmpDirs(l, tmpDir); err != nil {
			return nil, errors.Wrap(err, "remove tmp dirs")
		}
	}

	db := &DB{
		dir:            dir,
		logger:         l,
		opts:           opts,
		compactc:       make(chan struct{}, 1),
		donec:          make(chan struct{}),
		stopc:          make(chan struct{}),
		autoCompact:    true,
		chunkPool:      chunkenc.NewPool(),
		blocksToDelete: opts.BlocksToDelete,
		registerer:     r,
	}
	defer func() {
		// Close files if startup fails somewhere.
		if returnedErr == nil {
			return
		}

		close(db.donec) // DB is never run if it was an error, so close this channel here.

		returnedErr = tsdb_errors.NewMulti(
			returnedErr,
			errors.Wrap(db.Close(), "close DB after failed startup"),
		).Err()
	}()
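
	// Fall back to the default policy for choosing deletable blocks if the
	// caller did not supply one.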
	if db.blocksToDelete == nil {
		db.blocksToDelete = DefaultBlocksToDelete(db)
	}

	var err error
	db.locker, err = tsdbutil.NewDirLocker(dir, "tsdb", db.logger, r)
	if err != nil {
		return nil, err
	}
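	// Take the directory lock unless lockfiles are disabled; this guards
	// against two processes opening the same TSDB directory for writing.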
	if !opts.NoLockfile {
		if err := db.locker.Lock(); err != nil {
			return nil, err
		}
	}
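
	// The compactor gets its own cancellable context so in-flight
	// compactions can be interrupted when the DB is closed.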
	ctx, cancel := context.WithCancel(context.Background())
	db.compactor, err = NewLeveledCompactorWithChunkSize(ctx, r, l, rngs, db.chunkPool, opts.MaxBlockChunkSegmentSize, nil)
	if err != nil {
		cancel()
		return nil, errors.Wrap(err, "create leveled compactor")
	}
	db.compactCancel = cancel
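
	// Set up the write-ahead log and, if needed, the out-of-order log (WBL).
	// A negative WALSegmentSize disables the WAL entirely.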
	var wal, wbl *wlog.WL
	segmentSize := wlog.DefaultSegmentSize
	// WAL is enabled.
	if opts.WALSegmentSize >= 0 {
		// WAL is set to a custom segment size.
		if opts.WALSegmentSize > 0 {
			segmentSize = opts.WALSegmentSize
		}
		wal, err = wlog.NewSize(l, r, walDir, segmentSize, opts.WALCompression)
		if err != nil {
			return nil, err
		}

		// Check if there is a WBL on disk, in which case we should replay that data.
		wblSize, err := fileutil.DirSize(wblDir)
		if err != nil && !os.IsNotExist(err) {
			return nil, err
		}
		if opts.OutOfOrderTimeWindow > 0 || wblSize > 0 {
			wbl, err = wlog.NewSize(l, r, wblDir, segmentSize, opts.WALCompression)
			if err != nil {
				return nil, err
			}
		}
	}
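
	// Record whether out-of-order ingestion was enabled at startup; WAL/WBL
	// replay below may still turn this on if OOO data is found on disk.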
	db.oooWasEnabled.Store(opts.OutOfOrderTimeWindow > 0)
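
	// Translate the DB-level options into options for the in-memory Head block.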
	headOpts := DefaultHeadOptions()
	headOpts.ChunkRange = rngs[0]
	headOpts.ChunkDirRoot = dir
	headOpts.ChunkPool = db.chunkPool
	headOpts.ChunkWriteBufferSize = opts.HeadChunksWriteBufferSize
	headOpts.ChunkWriteQueueSize = opts.HeadChunksWriteQueueSize
	headOpts.SamplesPerChunk = opts.SamplesPerChunk
	headOpts.StripeSize = opts.StripeSize
	headOpts.SeriesCallback = opts.SeriesLifecycleCallback
	headOpts.EnableExemplarStorage = opts.EnableExemplarStorage
	headOpts.MaxExemplars.Store(opts.MaxExemplars)
	headOpts.EnableMemorySnapshotOnShutdown = opts.EnableMemorySnapshotOnShutdown
	headOpts.EnableNativeHistograms.Store(opts.EnableNativeHistograms)
	headOpts.OutOfOrderTimeWindow.Store(opts.OutOfOrderTimeWindow)
	headOpts.OutOfOrderCapMax.Store(opts.OutOfOrderCapMax)
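	// Only override the WAL replay concurrency when it is explicitly set;
	// zero keeps the Head's default.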
	if opts.WALReplayConcurrency > 0 {
		headOpts.WALReplayConcurrency = opts.WALReplayConcurrency
	}
	if opts.IsolationDisabled {
		// We only override this flag if isolation is disabled at DB level. We use the default otherwise.
		headOpts.IsolationDisabled = opts.IsolationDisabled
	}
	db.head, err = NewHead(r, l, wal, wbl, headOpts, stats.Head)
	if err != nil {
		return nil, err
	}
	db.head.writeNotified = db.writeNotified

	// Register metrics after assigning the head block.
	db.metrics = newDBMetrics(db, r)
	maxBytes := opts.MaxBytes
	if maxBytes < 0 {
		maxBytes = 0
	}
	db.metrics.maxBytes.Set(float64(maxBytes))
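
	// Load the persisted blocks from disk and apply the deletion policy;
	// startup fails if the block layout cannot be read.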
	if err := db.reload(); err != nil {
		return nil, err
	}

	// Set the min valid time for the ingested samples
	// to be no lower than the maxt of the last block.
	minValidTime := int64(math.MinInt64)
	// We do not consider blocks created from out-of-order samples for Head's
	// minValidTime, since minValidTime applies only to in-order data and we
	// do not want to unnecessarily discard samples from the Head.
	inOrderMaxTime, ok := db.inOrderBlocksMaxTime()
	if ok {
		minValidTime = inOrderMaxTime
	}
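
	// Replay the WAL (and WBL) into the Head. On a corruption error, attempt
	// an in-place repair of whichever log failed and abort startup only if
	// the repair itself fails.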
	if initErr := db.head.Init(minValidTime); initErr != nil {
		db.head.metrics.walCorruptionsTotal.Inc()
		isOOOErr := isErrLoadOOOWal(initErr)
		if isOOOErr {
			level.Warn(db.logger).Log("msg", "Encountered OOO WAL read error, attempting repair", "err", initErr)
			if err := wbl.Repair(initErr); err != nil {
				return nil, errors.Wrap(err, "repair corrupted OOO WAL")
			}
			level.Info(db.logger).Log("msg", "Successfully repaired OOO WAL")
		} else {
			level.Warn(db.logger).Log("msg", "Encountered WAL read error, attempting repair", "err", initErr)
			if err := wal.Repair(initErr); err != nil {
				return nil, errors.Wrap(err, "repair corrupted WAL")
			}
			level.Info(db.logger).Log("msg", "Successfully repaired WAL")
		}
	}

	if db.head.MinOOOTime() != int64(math.MaxInt64) {
		// Some OOO data was replayed from the disk that needs compaction and cleanup.
		db.oooWasEnabled.Store(true)
	}
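
	// Start the background goroutine that triggers periodic compaction and
	// retention enforcement.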
	go db.run()

	return db, nil
}
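
// Usage sketch (not part of the excerpt above): open is unexported and is
// normally reached through the exported Open, which validates the options and
// derives the block ranges before delegating here. A minimal sketch, assuming
// the package is imported as github.com/prometheus/prometheus/tsdb and a data
// directory of "data/" (both illustrative choices):
//
//	db, err := tsdb.Open("data/", log.NewNopLogger(), prometheus.NewRegistry(), tsdb.DefaultOptions(), tsdb.NewDBStats())
//	if err != nil {
//		// Startup failed; the deferred cleanup in open has already closed
//		// any partially initialized resources.
//	}
//	defer db.Close()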