in v1storage/crashrecovery.go [183:404]
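// sanitizeSeries reconciles one series file (identified by its directory and
// FileInfo) with the in-memory and archived series state recovered from the
// checkpoint. It returns the fingerprint derived from the file name and
// whether the file could be made consistent. Files that cannot be salvaged
// are quarantined or deleted via the purge closure below.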
func (p *persistence) sanitizeSeries(
	dirname string, fi os.FileInfo,
	fingerprintToSeries map[model.Fingerprint]*memorySeries,
	fpm fpMappings,
) (model.Fingerprint, bool) {
	var (
		fp       model.Fingerprint
		err      error
		filename = filepath.Join(dirname, fi.Name())
		s        *memorySeries
	)
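
	// purge is invoked whenever this file cannot be matched up with a sane
	// in-memory or archived series: it tries to quarantine the file for
	// later inspection and falls back to deleting it outright.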
	purge := func() {
		if fp != 0 {
			var metric model.Metric
			if s != nil {
				metric = s.metric
			}
			if err = p.quarantineSeriesFile(
				fp, errors.New("purge during crash recovery"), metric,
			); err == nil {
				return
			}
			log.
				With("file", filename).
				With("error", err).
				Error("Failed to move lost series file to orphaned directory.")
		}
		// If we are here, we are either purging an incorrectly named
		// file, or quarantining has failed. So simply delete the file.
		if err = os.Remove(filename); err != nil {
			log.
				With("file", filename).
				With("error", err).
				Error("Failed to delete lost series file.")
		}
	}

	if len(fi.Name()) != fpLen-seriesDirNameLen+len(seriesFileSuffix) ||
		!strings.HasSuffix(fi.Name(), seriesFileSuffix) {
		log.Warnf("Unexpected series file name %s.", filename)
		purge()
		return fp, false
	}
	if fp, err = model.FingerprintFromString(filepath.Base(dirname) + fi.Name()[:fpLen-seriesDirNameLen]); err != nil {
		log.Warnf("Error parsing file name %s: %s", filename, err)
		purge()
		return fp, false
	}
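
	// A series file consists of fixed-size chunks, so its size must be an
	// exact multiple of chunkLenWithHeader. Any remainder is a torn write
	// from the crash and is trimmed off below.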
	bytesToTrim := fi.Size() % int64(chunkLenWithHeader)
	chunksInFile := int(fi.Size()) / chunkLenWithHeader
	modTime := fi.ModTime()
	if bytesToTrim != 0 {
		log.Warnf(
			"Truncating file %s to exactly %d chunks, trimming %d extraneous bytes.",
			filename, chunksInFile, bytesToTrim,
		)
		f, err := os.OpenFile(filename, os.O_WRONLY, 0640)
		if err != nil {
			log.Errorf("Could not open file %s: %s", filename, err)
			purge()
			return fp, false
		}
		defer f.Close()
		if err := f.Truncate(fi.Size() - bytesToTrim); err != nil {
			log.Errorf("Failed to truncate file %s: %s", filename, err)
			purge()
			return fp, false
		}
	}
	if chunksInFile == 0 {
		log.Warnf("No chunks left in file %s.", filename)
		purge()
		return fp, false
	}

	s, ok := fingerprintToSeries[fp]
	if ok { // This series is supposed to not be archived.
		if s == nil {
			panic("fingerprint mapped to nil pointer")
		}
		maybeAddMapping(fp, s.metric, fpm)
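
		// Fast path: if nothing was trimmed, the chunkDescsOffset is
		// known, the file contains exactly the chunks the checkpoint
		// says are persisted, and the modification time matches, then
		// file and checkpoint agree and no repair is needed.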
		if !p.pedanticChecks &&
			bytesToTrim == 0 &&
			s.chunkDescsOffset != -1 &&
			chunksInFile == s.chunkDescsOffset+s.persistWatermark &&
			modTime.Equal(s.modTime) {
			// Everything is consistent. We are good.
			return fp, true
		}
		// If we are here, we cannot be sure the series file is
		// consistent with the checkpoint, so we have to take a closer
		// look.
		if s.headChunkClosed {
			// This is the easy case as we have all chunks on
			// disk. Treat this series as a freshly unarchived one
			// by loading the chunkDescs and setting all parameters
			// based on the loaded chunkDescs.
			cds, err := p.loadChunkDescs(fp, 0)
			if err != nil {
				log.Errorf(
					"Failed to load chunk descriptors for metric %v, fingerprint %v: %s",
					s.metric, fp, err,
				)
				purge()
				return fp, false
			}
			log.Warnf(
				"Treating recovered metric %v, fingerprint %v, as freshly unarchived, with %d chunks in series file.",
				s.metric, fp, len(cds),
			)
			s.chunkDescs = cds
			s.chunkDescsOffset = 0
			s.savedFirstTime = cds[0].FirstTime()
			s.lastTime, err = cds[len(cds)-1].LastTime()
			if err != nil {
				log.Errorf(
					"Failed to determine time of the last sample for metric %v, fingerprint %v: %s",
					s.metric, fp, err,
				)
				purge()
				return fp, false
			}
			s.persistWatermark = len(cds)
			s.modTime = modTime
			// Finally, evict again all chunk.Descs except the latest one to save memory.
			s.evictChunkDescs(len(cds) - 1)
			return fp, true
		}
		// This is the tricky one: We have chunks from heads.db, but
		// some of those chunks might already be in the series
		// file. Strategy: Take the last time of the most recent chunk
		// in the series file. Then find the oldest chunk among those
		// from heads.db that has a first time later than or equal to
		// the last time from the series file. Throw away the older
		// chunks from heads.db and stitch the parts together.

		// First, throw away the chunkDescs without chunks.
		s.chunkDescs = s.chunkDescs[s.persistWatermark:]
		chunk.NumMemDescs.Sub(float64(s.persistWatermark))
		cds, err := p.loadChunkDescs(fp, 0)
		if err != nil {
			log.Errorf(
				"Failed to load chunk descriptors for metric %v, fingerprint %v: %s",
				s.metric, fp, err,
			)
			purge()
			return fp, false
		}
		s.persistWatermark = len(cds)
		s.chunkDescsOffset = 0
		s.savedFirstTime = cds[0].FirstTime()
		s.modTime = modTime

		lastTime, err := cds[len(cds)-1].LastTime()
		if err != nil {
			log.Errorf(
				"Failed to determine time of the last sample for metric %v, fingerprint %v: %s",
				s.metric, fp, err,
			)
			purge()
			return fp, false
		}
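
		// keepIdx will be the index of the oldest checkpointed chunk
		// that is not yet fully covered by the series file; -1 means
		// the file already covers everything from the checkpoint.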
		keepIdx := -1
		for i, cd := range s.chunkDescs {
			if cd.FirstTime() >= lastTime {
				keepIdx = i
				break
			}
		}
		if keepIdx == -1 {
			log.Warnf(
				"Recovered metric %v, fingerprint %v: all %d chunks recovered from series file.",
				s.metric, fp, chunksInFile,
			)
			chunk.NumMemDescs.Sub(float64(len(s.chunkDescs)))
			atomic.AddInt64(&chunk.NumMemChunks, int64(-len(s.chunkDescs)))
			s.chunkDescs = cds
			s.headChunkClosed = true
			// Finally, evict again all chunk.Descs except the latest one to save memory.
			s.evictChunkDescs(len(cds) - 1)
			return fp, true
		}
		log.Warnf(
			"Recovered metric %v, fingerprint %v: recovered %d chunks from series file, recovered %d chunks from checkpoint.",
			s.metric, fp, chunksInFile, len(s.chunkDescs)-keepIdx,
		)
		chunk.NumMemDescs.Sub(float64(keepIdx))
		atomic.AddInt64(&chunk.NumMemChunks, int64(-keepIdx))
		chunkDescsToEvict := len(cds)
		if keepIdx == len(s.chunkDescs) {
			// No chunks from the checkpoint are left, so the former
			// head chunk is fully persisted and will be evicted;
			// declare it closed.
			s.headChunkClosed = true
			chunkDescsToEvict-- // Keep one chunk.Desc in this case to avoid a series with zero chunk.Descs.
		}
		s.chunkDescs = append(cds, s.chunkDescs[keepIdx:]...)
		// Finally, evict again chunk.Descs without chunk to save memory.
		s.evictChunkDescs(chunkDescsToEvict)
		return fp, true
	}

	// This series is supposed to be archived.
	metric, err := p.archivedMetric(fp)
	if err != nil {
		log.Errorf(
			"Fingerprint %v assumed archived but couldn't be looked up in archived index: %s",
			fp, err,
		)
		purge()
		return fp, false
	}
	if metric == nil {
		log.Warnf(
			"Fingerprint %v assumed archived but couldn't be found in archived index.",
			fp,
		)
		purge()
		return fp, false
	}
	// This series looks like a properly archived one.
	maybeAddMapping(fp, metric, fpm)
	return fp, true
}
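
// The sketch below is illustrative and not part of the original file: it shows
// how a recovery pass might drive sanitizeSeries over one series directory,
// collecting the fingerprints of files that survived sanitation. The helper
// name sanitizeSeriesFiles is hypothetical; it assumes the package types used
// above plus an "io/ioutil" import.
func (p *persistence) sanitizeSeriesFiles(
	dirname string,
	fingerprintToSeries map[model.Fingerprint]*memorySeries,
	fpm fpMappings,
) (map[model.Fingerprint]struct{}, error) {
	fis, err := ioutil.ReadDir(dirname)
	if err != nil {
		return nil, err
	}
	fpsSeen := map[model.Fingerprint]struct{}{}
	for _, fi := range fis {
		// Each file either comes back sanitized (keep its fingerprint)
		// or has already been quarantined/deleted by sanitizeSeries.
		if fp, ok := p.sanitizeSeries(dirname, fi, fingerprintToSeries, fpm); ok {
			fpsSeen[fp] = struct{}{}
		}
	}
	return fpsSeen, nil
}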