func open()

in tsdb/db.go [727:914]


func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs []int64, stats *DBStats) (_ *DB, returnedErr error) {
	if err := os.MkdirAll(dir, 0o777); err != nil {
		return nil, err
	}
	if l == nil {
		l = log.NewNopLogger()
	}
	if stats == nil {
		stats = NewDBStats()
	}

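	// Truncate the block ranges at the first range that exceeds the configured MaxBlockDuration.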
	for i, v := range rngs {
		if v > opts.MaxBlockDuration {
			rngs = rngs[:i]
			break
		}
	}

	// Fix up the bad index format written by Prometheus 2.1.
	if err := repairBadIndexVersion(l, dir); err != nil {
		return nil, errors.Wrap(err, "repair bad index version")
	}

	walDir := filepath.Join(dir, "wal")
	wblDir := filepath.Join(dir, wlog.WblDirName)

	// Migrate old WAL if one exists.
	if err := MigrateWAL(l, walDir); err != nil {
		return nil, errors.Wrap(err, "migrate WAL")
	}
	for _, tmpDir := range []string{walDir, dir} {
		// Remove tmp dirs.
		if err := removeBestEffortTmpDirs(l, tmpDir); err != nil {
			return nil, errors.Wrap(err, "remove tmp dirs")
		}
	}

	db := &DB{
		dir:            dir,
		logger:         l,
		opts:           opts,
		compactc:       make(chan struct{}, 1),
		donec:          make(chan struct{}),
		stopc:          make(chan struct{}),
		autoCompact:    true,
		chunkPool:      chunkenc.NewPool(),
		blocksToDelete: opts.BlocksToDelete,
		registerer:     r,
	}
	defer func() {
		// Close files if startup fails somewhere.
		if returnedErr == nil {
			return
		}

		close(db.donec) // The DB is never run if startup returned an error, so close this channel here.

		returnedErr = tsdb_errors.NewMulti(
			returnedErr,
			errors.Wrap(db.Close(), "close DB after failed startup"),
		).Err()
	}()

	if db.blocksToDelete == nil {
		db.blocksToDelete = DefaultBlocksToDelete(db)
	}

	var err error
	db.locker, err = tsdbutil.NewDirLocker(dir, "tsdb", db.logger, r)
	if err != nil {
		return nil, err
	}
	if !opts.NoLockfile {
		if err := db.locker.Lock(); err != nil {
			return nil, err
		}
	}

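	// Create the leveled compactor; its context is cancelled via compactCancel when the DB shuts down.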
	ctx, cancel := context.WithCancel(context.Background())
	db.compactor, err = NewLeveledCompactorWithChunkSize(ctx, r, l, rngs, db.chunkPool, opts.MaxBlockChunkSegmentSize, nil)
	if err != nil {
		cancel()
		return nil, errors.Wrap(err, "create leveled compactor")
	}
	db.compactCancel = cancel

	var wal, wbl *wlog.WL
	segmentSize := wlog.DefaultSegmentSize
	// A non-negative segment size means the WAL is enabled.
	if opts.WALSegmentSize >= 0 {
		// A positive segment size overrides the default.
		if opts.WALSegmentSize > 0 {
			segmentSize = opts.WALSegmentSize
		}
		wal, err = wlog.NewSize(l, r, walDir, segmentSize, opts.WALCompression)
		if err != nil {
			return nil, err
		}
		// Check if there is a WBL on disk, in which case we should replay that data.
		wblSize, err := fileutil.DirSize(wblDir)
		if err != nil && !os.IsNotExist(err) {
			return nil, err
		}
		if opts.OutOfOrderTimeWindow > 0 || wblSize > 0 {
			wbl, err = wlog.NewSize(l, r, wblDir, segmentSize, opts.WALCompression)
			if err != nil {
				return nil, err
			}
		}
	}
	db.oooWasEnabled.Store(opts.OutOfOrderTimeWindow > 0)
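	// Build the Head (in-memory block) options from the DB options.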
	headOpts := DefaultHeadOptions()
	headOpts.ChunkRange = rngs[0]
	headOpts.ChunkDirRoot = dir
	headOpts.ChunkPool = db.chunkPool
	headOpts.ChunkWriteBufferSize = opts.HeadChunksWriteBufferSize
	headOpts.ChunkWriteQueueSize = opts.HeadChunksWriteQueueSize
	headOpts.SamplesPerChunk = opts.SamplesPerChunk
	headOpts.StripeSize = opts.StripeSize
	headOpts.SeriesCallback = opts.SeriesLifecycleCallback
	headOpts.EnableExemplarStorage = opts.EnableExemplarStorage
	headOpts.MaxExemplars.Store(opts.MaxExemplars)
	headOpts.EnableMemorySnapshotOnShutdown = opts.EnableMemorySnapshotOnShutdown
	headOpts.EnableNativeHistograms.Store(opts.EnableNativeHistograms)
	headOpts.OutOfOrderTimeWindow.Store(opts.OutOfOrderTimeWindow)
	headOpts.OutOfOrderCapMax.Store(opts.OutOfOrderCapMax)
	if opts.WALReplayConcurrency > 0 {
		headOpts.WALReplayConcurrency = opts.WALReplayConcurrency
	}
	if opts.IsolationDisabled {
		// We only override this flag if isolation is disabled at DB level. We use the default otherwise.
		headOpts.IsolationDisabled = opts.IsolationDisabled
	}
	db.head, err = NewHead(r, l, wal, wbl, headOpts, stats.Head)
	if err != nil {
		return nil, err
	}
	db.head.writeNotified = db.writeNotified

	// Register metrics after assigning the head block.
	db.metrics = newDBMetrics(db, r)
	maxBytes := opts.MaxBytes
	if maxBytes < 0 {
		maxBytes = 0
	}
	db.metrics.maxBytes.Set(float64(maxBytes))

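	// Load the blocks currently present on disk.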
	if err := db.reload(); err != nil {
		return nil, err
	}
	// Set the min valid time for the ingested samples
	// to be no lower than the maxt of the last block.
	minValidTime := int64(math.MinInt64)
	// Blocks created from out-of-order samples are not considered for the Head's minValidTime,
	// since minValidTime applies only to in-order data and we do not want to needlessly
	// discard samples from the Head.
	inOrderMaxTime, ok := db.inOrderBlocksMaxTime()
	if ok {
		minValidTime = inOrderMaxTime
	}

	if initErr := db.head.Init(minValidTime); initErr != nil {
		db.head.metrics.walCorruptionsTotal.Inc()
		isOOOErr := isErrLoadOOOWal(initErr)
		if isOOOErr {
			level.Warn(db.logger).Log("msg", "Encountered OOO WAL read error, attempting repair", "err", initErr)
			if err := wbl.Repair(initErr); err != nil {
				return nil, errors.Wrap(err, "repair corrupted OOO WAL")
			}
			level.Info(db.logger).Log("msg", "Successfully repaired OOO WAL")
		} else {
			level.Warn(db.logger).Log("msg", "Encountered WAL read error, attempting repair", "err", initErr)
			if err := wal.Repair(initErr); err != nil {
				return nil, errors.Wrap(err, "repair corrupted WAL")
			}
			level.Info(db.logger).Log("msg", "Successfully repaired WAL")
		}
	}

	if db.head.MinOOOTime() != int64(math.MaxInt64) {
		// Some OOO data was replayed from the disk that needs compaction and cleanup.
		db.oooWasEnabled.Store(true)
	}

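	// Start the background goroutine that drives periodic compaction.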
	go db.run()

	return db, nil
}
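
open is the unexported constructor; callers normally go through the exported Open wrapper, which validates the options and derives the block ranges before delegating here. A minimal usage sketch, assuming the exported tsdb.Open, tsdb.DefaultOptions, and tsdb.NewDBStats of this Prometheus version (later releases changed the logger type, so signatures may differ):

package main

import (
	"log"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/tsdb"
)

func main() {
	// A nil logger is fine here: open() substitutes a nop logger (see the top of the function).
	db, err := tsdb.Open("data", nil, prometheus.DefaultRegisterer, tsdb.DefaultOptions(), tsdb.NewDBStats())
	if err != nil {
		log.Fatalf("open tsdb: %v", err)
	}
	defer db.Close()
}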