func newPersistence()

in v1storage/persistence.go [143:357]


func newPersistence(
	basePath string,
	dirty, pedanticChecks bool,
	shouldSync syncStrategy,
	minShrinkRatio float64,
) (*persistence, error) {
	dirtyPath := filepath.Join(basePath, dirtyFileName)
	versionPath := filepath.Join(basePath, versionFileName)

	if versionData, err := ioutil.ReadFile(versionPath); err == nil {
		if persistedVersion, err := strconv.Atoi(strings.TrimSpace(string(versionData))); err != nil {
			return nil, fmt.Errorf("cannot parse content of %s: %s", versionPath, versionData)
		} else if persistedVersion != Version {
			return nil, fmt.Errorf("found storage version %d on disk, need version %d - please wipe storage or run a version of Prometheus compatible with storage version %d", persistedVersion, Version, persistedVersion)
		}
	} else if os.IsNotExist(err) {
		// No version file found. Let's create the directory (in case
		// it's not there yet) and then check if it is actually
		// empty. If not, we have found an old storage directory without
		// version file, so we have to bail out.
		if err := os.MkdirAll(basePath, 0700); err != nil {
			if abspath, e := filepath.Abs(basePath); e == nil {
				return nil, fmt.Errorf("cannot create persistent directory %s: %s", abspath, err)
			}
			return nil, fmt.Errorf("cannot create persistent directory %s: %s", basePath, err)
		}
		fis, err := ioutil.ReadDir(basePath)
		if err != nil {
			return nil, err
		}
		filesPresent := len(fis)
		for i := range fis {
			switch {
			case fis[i].Name() == "lost+found" && fis[i].IsDir():
				filesPresent--
			case strings.HasPrefix(fis[i].Name(), "."):
				filesPresent--
			}
		}
		if filesPresent > 0 {
			return nil, fmt.Errorf("found existing files in storage path that do not look like storage files compatible with this version of Prometheus; please delete the files in the storage path or choose a different storage path")
		}
		// Finally we can write our own version into a new version file.
		file, err := os.Create(versionPath)
		if err != nil {
			return nil, err
		}
		defer file.Close()
		if _, err := fmt.Fprintf(file, "%d\n", Version); err != nil {
			return nil, err
		}
	} else {
		return nil, err
	}

	fLock, dirtyfileExisted, err := flock.New(dirtyPath)
	if err != nil {
		log.Errorf("Could not lock %s, Prometheus already running?", dirtyPath)
		return nil, err
	}
	if dirtyfileExisted {
		dirty = true
	}

	archivedFingerprintToMetrics, err := index.NewFingerprintMetricIndex(basePath)
	if err != nil {
		// At this point, we could simply blow away the archived
		// fingerprint-to-metric index. However, then we would lose
		// _all_ archived metrics. So better give the user an
		// opportunity to repair the LevelDB with a 3rd party tool.
		log.Errorf("Could not open the fingerprint-to-metric index for archived series. Please try a 3rd party tool to repair LevelDB in directory %q. If unsuccessful or undesired, delete the whole directory and restart Prometheus for crash recovery. You will lose all archived time series.", filepath.Join(basePath, index.FingerprintToMetricDir))
		return nil, err
	}
	archivedFingerprintToTimeRange, err := index.NewFingerprintTimeRangeIndex(basePath)
	if err != nil {
		// We can recover the archived fingerprint-to-timerange index,
		// so blow it away and set ourselves dirty. Then re-open the now
		// empty index.
		if err := index.DeleteFingerprintTimeRangeIndex(basePath); err != nil {
			return nil, err
		}
		dirty = true
		if archivedFingerprintToTimeRange, err = index.NewFingerprintTimeRangeIndex(basePath); err != nil {
			return nil, err
		}
	}

	p := &persistence{
		basePath: basePath,

		archivedFingerprintToMetrics:   archivedFingerprintToMetrics,
		archivedFingerprintToTimeRange: archivedFingerprintToTimeRange,

		indexingQueue:   make(chan indexingOp, indexingQueueCapacity),
		indexingStopped: make(chan struct{}),
		indexingFlush:   make(chan chan int),

		indexingQueueLength: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "indexing_queue_length",
			Help:      "The number of metrics waiting to be indexed.",
		}),
		indexingQueueCapacity: prometheus.MustNewConstMetric(
			prometheus.NewDesc(
				prometheus.BuildFQName(namespace, subsystem, "indexing_queue_capacity"),
				"The capacity of the indexing queue.",
				nil, nil,
			),
			prometheus.GaugeValue,
			float64(indexingQueueCapacity),
		),
		indexingBatchSizes: prometheus.NewSummary(
			prometheus.SummaryOpts{
				Namespace: namespace,
				Subsystem: subsystem,
				Name:      "indexing_batch_sizes",
				Help:      "Quantiles for indexing batch sizes (number of metrics per batch).",
			},
		),
		indexingBatchDuration: prometheus.NewSummary(
			prometheus.SummaryOpts{
				Namespace: namespace,
				Subsystem: subsystem,
				Name:      "indexing_batch_duration_seconds",
				Help:      "Quantiles for batch indexing duration in seconds.",
			},
		),
		checkpointLastDuration: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "checkpoint_last_duration_seconds",
			Help:      "The duration in seconds it took to last checkpoint open chunks and chunks yet to be persisted.",
		}),
		checkpointDuration: prometheus.NewSummary(prometheus.SummaryOpts{
			Namespace:  namespace,
			Subsystem:  subsystem,
			Objectives: map[float64]float64{},
			Name:       "checkpoint_duration_seconds",
			Help:       "The duration in seconds taken for checkpointing open chunks and chunks yet to be persisted",
		}),
		checkpointLastSize: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "checkpoint_last_size_bytes",
			Help:      "The size of the last checkpoint of open chunks and chunks yet to be persisted",
		}),
		checkpointChunksWritten: prometheus.NewSummary(prometheus.SummaryOpts{
			Namespace:  namespace,
			Subsystem:  subsystem,
			Objectives: map[float64]float64{},
			Name:       "checkpoint_series_chunks_written",
			Help:       "The number of chunk written per series while checkpointing open chunks and chunks yet to be persisted.",
		}),
		dirtyCounter: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "inconsistencies_total",
			Help:      "A counter incremented each time an inconsistency in the local storage is detected. If this is greater zero, restart the server as soon as possible.",
		}),
		startedDirty: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "started_dirty",
			Help:      "Whether the local storage was found to be dirty (and crash recovery occurred) during Prometheus startup.",
		}),
		checkpointing: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "checkpointing",
			Help:      "1 if the storage is checkpointing, 0 otherwise.",
		}),
		seriesChunksPersisted: prometheus.NewHistogram(prometheus.HistogramOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "series_chunks_persisted",
			Help:      "The number of chunks persisted per series.",
			// Even with 4 bytes per sample, you're not going to get more than 85
			// chunks in 6 hours for a time series with 1s resolution.
			Buckets: []float64{1, 2, 4, 8, 16, 32, 64, 128},
		}),
		dirty:          dirty,
		pedanticChecks: pedanticChecks,
		dirtyFileName:  dirtyPath,
		fLock:          fLock,
		shouldSync:     shouldSync,
		minShrinkRatio: minShrinkRatio,
		// Create buffers of length 3*chunkLenWithHeader by default because that is still reasonably small
		// and at the same time enough for many uses. The contract is to never return buffer smaller than
		// that to the pool so that callers can rely on a minimum buffer size.
		bufPool: sync.Pool{New: func() interface{} { return make([]byte, 0, 3*chunkLenWithHeader) }},
	}

	if p.dirty {
		// Blow away the label indexes. We'll rebuild them later.
		if err := index.DeleteLabelPairFingerprintIndex(basePath); err != nil {
			return nil, err
		}
		if err := index.DeleteLabelNameLabelValuesIndex(basePath); err != nil {
			return nil, err
		}
	}
	labelPairToFingerprints, err := index.NewLabelPairFingerprintIndex(basePath)
	if err != nil {
		return nil, err
	}
	labelNameToLabelValues, err := index.NewLabelNameLabelValuesIndex(basePath)
	if err != nil {
		return nil, err
	}
	p.labelPairToFingerprints = labelPairToFingerprints
	p.labelNameToLabelValues = labelNameToLabelValues

	return p, nil
}