in v1storage/persistence.go [143:357]
func newPersistence(
basePath string,
dirty, pedanticChecks bool,
shouldSync syncStrategy,
minShrinkRatio float64,
) (*persistence, error) {
dirtyPath := filepath.Join(basePath, dirtyFileName)
versionPath := filepath.Join(basePath, versionFileName)
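	// If a version file already exists, it must contain exactly the storage
	// version this binary implements; otherwise we refuse to start rather than
	// touch data in an incompatible layout.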
if versionData, err := ioutil.ReadFile(versionPath); err == nil {
if persistedVersion, err := strconv.Atoi(strings.TrimSpace(string(versionData))); err != nil {
return nil, fmt.Errorf("cannot parse content of %s: %s", versionPath, versionData)
} else if persistedVersion != Version {
return nil, fmt.Errorf("found storage version %d on disk, need version %d - please wipe storage or run a version of Prometheus compatible with storage version %d", persistedVersion, Version, persistedVersion)
}
} else if os.IsNotExist(err) {
		// No version file found. Create the directory (in case it's not there
		// yet) and then check whether it is actually empty. If it is not, we
		// have found an old storage directory without a version file, so we
		// have to bail out.
if err := os.MkdirAll(basePath, 0700); err != nil {
if abspath, e := filepath.Abs(basePath); e == nil {
return nil, fmt.Errorf("cannot create persistent directory %s: %s", abspath, err)
}
return nil, fmt.Errorf("cannot create persistent directory %s: %s", basePath, err)
}
fis, err := ioutil.ReadDir(basePath)
if err != nil {
return nil, err
}
filesPresent := len(fis)
for i := range fis {
switch {
case fis[i].Name() == "lost+found" && fis[i].IsDir():
filesPresent--
case strings.HasPrefix(fis[i].Name(), "."):
filesPresent--
}
}
if filesPresent > 0 {
return nil, fmt.Errorf("found existing files in storage path that do not look like storage files compatible with this version of Prometheus; please delete the files in the storage path or choose a different storage path")
}
// Finally we can write our own version into a new version file.
file, err := os.Create(versionPath)
if err != nil {
return nil, err
}
defer file.Close()
if _, err := fmt.Fprintf(file, "%d\n", Version); err != nil {
return nil, err
}
} else {
return nil, err
}
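	// Lock the dirty file. If it already existed, the previous run did not
	// shut down cleanly, so we have to consider the storage dirty.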
fLock, dirtyfileExisted, err := flock.New(dirtyPath)
if err != nil {
log.Errorf("Could not lock %s, Prometheus already running?", dirtyPath)
return nil, err
}
if dirtyfileExisted {
dirty = true
}
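	// Open the on-disk indexes for archived series: one mapping fingerprints
	// to metrics, one mapping fingerprints to their time ranges.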
archivedFingerprintToMetrics, err := index.NewFingerprintMetricIndex(basePath)
if err != nil {
// At this point, we could simply blow away the archived
// fingerprint-to-metric index. However, then we would lose
// _all_ archived metrics. So better give the user an
// opportunity to repair the LevelDB with a 3rd party tool.
log.Errorf("Could not open the fingerprint-to-metric index for archived series. Please try a 3rd party tool to repair LevelDB in directory %q. If unsuccessful or undesired, delete the whole directory and restart Prometheus for crash recovery. You will lose all archived time series.", filepath.Join(basePath, index.FingerprintToMetricDir))
return nil, err
}
archivedFingerprintToTimeRange, err := index.NewFingerprintTimeRangeIndex(basePath)
if err != nil {
		// Unlike the index above, the archived fingerprint-to-timerange index
		// can be rebuilt during crash recovery. So blow it away, mark ourselves
		// dirty, and then re-open the now empty index.
if err := index.DeleteFingerprintTimeRangeIndex(basePath); err != nil {
return nil, err
}
dirty = true
if archivedFingerprintToTimeRange, err = index.NewFingerprintTimeRangeIndex(basePath); err != nil {
return nil, err
}
}
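	// Assemble the persistence object together with its instrumentation
	// metrics.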
p := &persistence{
basePath: basePath,
archivedFingerprintToMetrics: archivedFingerprintToMetrics,
archivedFingerprintToTimeRange: archivedFingerprintToTimeRange,
indexingQueue: make(chan indexingOp, indexingQueueCapacity),
indexingStopped: make(chan struct{}),
indexingFlush: make(chan chan int),
indexingQueueLength: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "indexing_queue_length",
Help: "The number of metrics waiting to be indexed.",
}),
indexingQueueCapacity: prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "indexing_queue_capacity"),
"The capacity of the indexing queue.",
nil, nil,
),
prometheus.GaugeValue,
float64(indexingQueueCapacity),
),
indexingBatchSizes: prometheus.NewSummary(
prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "indexing_batch_sizes",
Help: "Quantiles for indexing batch sizes (number of metrics per batch).",
},
),
indexingBatchDuration: prometheus.NewSummary(
prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "indexing_batch_duration_seconds",
Help: "Quantiles for batch indexing duration in seconds.",
},
),
checkpointLastDuration: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "checkpoint_last_duration_seconds",
Help: "The duration in seconds it took to last checkpoint open chunks and chunks yet to be persisted.",
}),
checkpointDuration: prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
Objectives: map[float64]float64{},
Name: "checkpoint_duration_seconds",
Help: "The duration in seconds taken for checkpointing open chunks and chunks yet to be persisted",
}),
checkpointLastSize: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "checkpoint_last_size_bytes",
Help: "The size of the last checkpoint of open chunks and chunks yet to be persisted",
}),
checkpointChunksWritten: prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
Objectives: map[float64]float64{},
Name: "checkpoint_series_chunks_written",
Help: "The number of chunk written per series while checkpointing open chunks and chunks yet to be persisted.",
}),
dirtyCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "inconsistencies_total",
Help: "A counter incremented each time an inconsistency in the local storage is detected. If this is greater zero, restart the server as soon as possible.",
}),
startedDirty: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "started_dirty",
Help: "Whether the local storage was found to be dirty (and crash recovery occurred) during Prometheus startup.",
}),
checkpointing: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "checkpointing",
Help: "1 if the storage is checkpointing, 0 otherwise.",
}),
seriesChunksPersisted: prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "series_chunks_persisted",
Help: "The number of chunks persisted per series.",
// Even with 4 bytes per sample, you're not going to get more than 85
// chunks in 6 hours for a time series with 1s resolution.
Buckets: []float64{1, 2, 4, 8, 16, 32, 64, 128},
}),
dirty: dirty,
pedanticChecks: pedanticChecks,
dirtyFileName: dirtyPath,
fLock: fLock,
shouldSync: shouldSync,
minShrinkRatio: minShrinkRatio,
		// Create buffers with a capacity of 3*chunkLenWithHeader by default because
		// that is still reasonably small and at the same time enough for many uses.
		// The contract is to never return a buffer with a smaller capacity to the
		// pool, so that callers can rely on a minimum buffer size.
bufPool: sync.Pool{New: func() interface{} { return make([]byte, 0, 3*chunkLenWithHeader) }},
}
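	// p.dirty is set if dirtiness was requested by the caller, if the dirty
	// file already existed, or if the fingerprint-to-timerange index had to be
	// recreated above.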
if p.dirty {
// Blow away the label indexes. We'll rebuild them later.
if err := index.DeleteLabelPairFingerprintIndex(basePath); err != nil {
return nil, err
}
if err := index.DeleteLabelNameLabelValuesIndex(basePath); err != nil {
return nil, err
}
}
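	// Open (or create, if they were just deleted) the label indexes and attach
	// them to the persistence object.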
labelPairToFingerprints, err := index.NewLabelPairFingerprintIndex(basePath)
if err != nil {
return nil, err
}
labelNameToLabelValues, err := index.NewLabelNameLabelValuesIndex(basePath)
if err != nil {
return nil, err
}
p.labelPairToFingerprints = labelPairToFingerprints
p.labelNameToLabelValues = labelNameToLabelValues
return p, nil
}
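
// A minimal usage sketch (not part of the original file), assuming a caller
// such as the local storage layer that already holds a syncStrategy value and
// its configuration; the path and numbers below are placeholders only:
//
//	p, err := newPersistence(
//		"/prometheus/data", // basePath
//		false,              // dirty: no dirtiness known to the caller yet
//		false,              // pedanticChecks
//		shouldSync,         // a syncStrategy supplied by the caller
//		0.1,                // minShrinkRatio
//	)
//	if err != nil {
//		log.Errorf("Could not initialize persistence: %v", err)
//	}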