func (h *Head) Init(minValidTime int64) error

in tsdb/head.go [590:810]


func (h *Head) Init(minValidTime int64) error {
	h.minValidTime.Store(minValidTime)
	defer func() {
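		// Postings are appended out of order during replay for speed; sort all
		// postings lists once replay has finished.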
		h.postings.EnsureOrder(h.opts.WALReplayConcurrency)
	}()
	defer h.gc() // After loading the WAL, remove the obsolete data from the head.
	defer func() {
		// Loading of m-mapped chunks and the snapshot can cause the mint of the Head
		// to go below minValidTime.
		if h.MinTime() < h.minValidTime.Load() {
			h.minTime.Store(h.minValidTime.Load())
		}
	}()
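	// Note: deferred calls run in LIFO order, so on return the mint is clamped
	// first, then gc() removes data outside the valid time range, and finally
	// the postings are sorted.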

	level.Info(h.logger).Log("msg", "Replaying on-disk memory mappable chunks if any")
	start := time.Now()

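	// snapIdx/snapOffset locate where the snapshot was taken in the WAL (segment
	// index and byte offset), so WAL replay can resume from that point. refSeries
	// holds the in-memory series restored from the snapshot, keyed by series ref.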
	snapIdx, snapOffset := -1, 0
	refSeries := make(map[chunks.HeadSeriesRef]*memSeries)

	snapshotLoaded := false
	if h.opts.EnableMemorySnapshotOnShutdown {
		level.Info(h.logger).Log("msg", "Chunk snapshot is enabled, replaying from the snapshot")
		// If there are any WAL files, there should be at least one WAL file with an index that is current or newer
		// than the snapshot index. If the WAL index is behind the snapshot index somehow, the snapshot is assumed
		// to be outdated.
		loadSnapshot := true
		if h.wal != nil {
			_, endAt, err := wlog.Segments(h.wal.Dir())
			if err != nil {
				return errors.Wrap(err, "finding WAL segments")
			}

			_, idx, _, err := LastChunkSnapshot(h.opts.ChunkDirRoot)
			if err != nil && err != record.ErrNotFound {
				level.Error(h.logger).Log("msg", "Could not find last snapshot", "err", err)
			}

			if err == nil && endAt < idx {
				loadSnapshot = false
				level.Warn(h.logger).Log("msg", "Last WAL file is behind snapshot, removing snapshots")
				if err := DeleteChunkSnapshots(h.opts.ChunkDirRoot, math.MaxInt, math.MaxInt); err != nil {
					level.Error(h.logger).Log("msg", "Error while deleting snapshot directories", "err", err)
				}
			}
		}
		if loadSnapshot {
			var err error
			snapIdx, snapOffset, refSeries, err = h.loadChunkSnapshot()
			if err == nil {
				snapshotLoaded = true
				level.Info(h.logger).Log("msg", "Chunk snapshot loading time", "duration", time.Since(start).String())
			} else {
				snapIdx, snapOffset = -1, 0
				refSeries = make(map[chunks.HeadSeriesRef]*memSeries)

				h.metrics.snapshotReplayErrorTotal.Inc()
				level.Error(h.logger).Log("msg", "Failed to load chunk snapshot", "err", err)
				// We clear the partially loaded data to replay fresh from the WAL.
				if err := h.resetInMemoryState(); err != nil {
					return err
				}
			}
		}
	}

	mmapChunkReplayStart := time.Now()
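	// mmappedChunks and oooMmappedChunks map each series ref to its chunks already
	// persisted on disk, so WAL replay can attach them instead of rebuilding them
	// from samples. lastMmapRef is the highest chunk ref seen and is passed to the
	// WBL replay below.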
	var (
		mmappedChunks    map[chunks.HeadSeriesRef][]*mmappedChunk
		oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk
		lastMmapRef      chunks.ChunkDiskMapperRef
		err              error
	)
	if snapshotLoaded || h.wal != nil {
		// If the snapshot was not loaded and there is no WAL, the m-map chunks would be
		// discarded anyway, so we only load them when they won't be discarded.
		mmappedChunks, oooMmappedChunks, lastMmapRef, err = h.loadMmappedChunks(refSeries)
		if err != nil {
			// TODO(codesome): clear out all m-map chunks here for refSeries.
			level.Error(h.logger).Log("msg", "Loading on-disk chunks failed", "err", err)
			if _, ok := errors.Cause(err).(*chunks.CorruptionErr); ok {
				h.metrics.mmapChunkCorruptionTotal.Inc()
			}

			// Discard snapshot data since we need to replay the WAL for the missed m-map chunks data.
			snapIdx, snapOffset = -1, 0

			// If this fails, data will be recovered from the WAL.
			// Hence we won't lose any data (given the WAL is not corrupt).
			mmappedChunks, oooMmappedChunks, lastMmapRef, err = h.removeCorruptedMmappedChunks(err)
			if err != nil {
				return err
			}
		}
		level.Info(h.logger).Log("msg", "On-disk memory mappable chunks replay completed", "duration", time.Since(mmapChunkReplayStart).String())
	}

	if h.wal == nil {
		level.Info(h.logger).Log("msg", "WAL not found")
		return nil
	}

	level.Info(h.logger).Log("msg", "Replaying WAL, this may take a while")

	checkpointReplayStart := time.Now()
	// Backfill the checkpoint first if it exists.
	dir, startFrom, err := wlog.LastCheckpoint(h.wal.Dir())
	if err != nil && err != record.ErrNotFound {
		return errors.Wrap(err, "find last checkpoint")
	}

	// Find the last segment.
	_, endAt, e := wlog.Segments(h.wal.Dir())
	if e != nil {
		return errors.Wrap(e, "finding WAL segments")
	}

	h.startWALReplayStatus(startFrom, endAt)

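	// multiRef maps the refs of duplicate series records found in the WAL to the
	// ref of the series that is kept, so subsequent records are applied to the
	// surviving series.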
	multiRef := map[chunks.HeadSeriesRef]chunks.HeadSeriesRef{}
	if err == nil && startFrom >= snapIdx {
		sr, err := wlog.NewSegmentsReader(dir)
		if err != nil {
			return errors.Wrap(err, "open checkpoint")
		}
		defer func() {
			if err := sr.Close(); err != nil {
				level.Warn(h.logger).Log("msg", "Error while closing the wal segments reader", "err", err)
			}
		}()

		// A corrupted checkpoint is a hard error for now and requires user
		// intervention. There's likely little data that can be recovered anyway.
		if err := h.loadWAL(wlog.NewReader(sr), multiRef, mmappedChunks, oooMmappedChunks); err != nil {
			return errors.Wrap(err, "backfill checkpoint")
		}
		h.updateWALReplayStatusRead(startFrom)
		startFrom++
		level.Info(h.logger).Log("msg", "WAL checkpoint loaded")
	}
	checkpointReplayDuration := time.Since(checkpointReplayStart)

	walReplayStart := time.Now()

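	// Segments up to snapIdx are already covered by the snapshot, so skip ahead
	// to it if it is newer than the last checkpoint.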
	if snapIdx > startFrom {
		startFrom = snapIdx
	}
	// Backfill segments from the most recent checkpoint onwards.
	for i := startFrom; i <= endAt; i++ {
		s, err := wlog.OpenReadSegment(wlog.SegmentName(h.wal.Dir(), i))
		if err != nil {
			return errors.Wrapf(err, "open WAL segment: %d", i)
		}

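		// If this is the segment the snapshot was taken in, resume mid-segment at
		// the snapshot's offset.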
		offset := 0
		if i == snapIdx {
			offset = snapOffset
		}
		sr, err := wlog.NewSegmentBufReaderWithOffset(offset, s)
		if errors.Is(err, io.EOF) {
			// The segment is shorter than the snapshot offset; nothing left to replay in it.
			continue
		}
		if err != nil {
			return errors.Wrapf(err, "segment reader (offset=%d)", offset)
		}
		err = h.loadWAL(wlog.NewReader(sr), multiRef, mmappedChunks, oooMmappedChunks)
		if err := sr.Close(); err != nil {
			level.Warn(h.logger).Log("msg", "Error while closing the wal segments reader", "err", err)
		}
		if err != nil {
			return err
		}
		level.Info(h.logger).Log("msg", "WAL segment loaded", "segment", i, "maxSegment", endAt)
		h.updateWALReplayStatusRead(i)
	}
	walReplayDuration := time.Since(walReplayStart)

	wblReplayStart := time.Now()
	if h.wbl != nil {
		// Replay the OOO WAL (the WBL, write-behind log): it holds out-of-order
		// samples and is replayed after the in-order WAL so that the series it
		// references already exist.
		startFrom, endAt, e = wlog.Segments(h.wbl.Dir())
		if e != nil {
			return errors.Wrap(e, "finding OOO WAL segments")
		}
		h.startWALReplayStatus(startFrom, endAt)

		for i := startFrom; i <= endAt; i++ {
			s, err := wlog.OpenReadSegment(wlog.SegmentName(h.wbl.Dir(), i))
			if err != nil {
				return errors.Wrapf(err, "open WBL segment: %d", i)
			}

			sr := wlog.NewSegmentBufReader(s)
			err = h.loadWBL(wlog.NewReader(sr), multiRef, lastMmapRef)
			if err := sr.Close(); err != nil {
				level.Warn(h.logger).Log("msg", "Error while closing the wbl segments reader", "err", err)
			}
			if err != nil {
				return err
			}
			level.Info(h.logger).Log("msg", "WBL segment loaded", "segment", i, "maxSegment", endAt)
			h.updateWALReplayStatusRead(i)
		}
	}

	wblReplayDuration := time.Since(wblReplayStart)

	totalReplayDuration := time.Since(start)
	h.metrics.dataTotalReplayDuration.Set(totalReplayDuration.Seconds())
	level.Info(h.logger).Log(
		"msg", "WAL replay completed",
		"checkpoint_replay_duration", checkpointReplayDuration.String(),
		"wal_replay_duration", walReplayDuration.String(),
		"wbl_replay_duration", wblReplayDuration.String(),
		"total_replay_duration", totalReplayDuration.String(),
	)

	return nil
}
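
For context, a minimal caller sketch (hypothetical: the NewHead arguments and the
minValidTime value below are assumptions, not taken from this file). Init is called
once after the Head is constructed and before it serves appends or queries:

// Hypothetical caller sketch; the exact NewHead signature varies across versions.
h, err := NewHead(nil, logger, w, wbl, opts, NewHeadStats())
if err != nil {
	return err
}
// Init replays m-mapped chunks, the chunk snapshot, the WAL and the WBL; after
// replay, samples older than minValidTime are rejected.
if err := h.Init(math.MinInt64); err != nil {
	return err
}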