in v1storage/persistence.go [890:1067]
func (p *persistence) dropAndPersistChunks(
fp model.Fingerprint, beforeTime model.Time, chunks []chunk.Chunk,
) (
firstTimeNotDropped model.Time,
offset int,
numDropped int,
allDropped bool,
err error,
) {
// Style note: With the many return values, it was decided to use naked
// returns in this method. They make the method more readable, but
// please handle with care!
if len(chunks) > 0 {
// We have chunks to persist. First check if those are already
// too old. If that's the case, the chunks in the series file
// are all too old, too.
i := 0
for ; i < len(chunks); i++ {
var lt model.Time
lt, err = chunks[i].NewIterator().LastTimestamp()
if err != nil {
return
}
if !lt.Before(beforeTime) {
break
}
}
if i < len(chunks) {
firstTimeNotDropped = chunks[i].FirstTime()
}
if i > 0 || firstTimeNotDropped.Before(beforeTime) {
// Series file has to go.
if numDropped, err = p.deleteSeriesFile(fp); err != nil {
return
}
numDropped += i
if i == len(chunks) {
allDropped = true
return
}
// Now simply persist what has to be persisted to a new file.
_, err = p.persistChunks(fp, chunks[i:])
return
}
}
// If we are here, we have to check the series file itself.
f, err := p.openChunkFileForReading(fp)
if os.IsNotExist(err) {
// No series file. Only need to create new file with chunks to
// persist, if there are any.
if len(chunks) == 0 {
allDropped = true
err = nil // Do not report not-exist err.
return
}
offset, err = p.persistChunks(fp, chunks)
return
}
if err != nil {
return
}
defer f.Close()
fi, err := f.Stat()
if err != nil {
return
}
chunksInFile := int(fi.Size()) / chunkLenWithHeader
totalChunks := chunksInFile + len(chunks)
// Calculate chunk index from minShrinkRatio, to skip unnecessary chunk header reading.
chunkIndexToStartSeek := 0
if p.minShrinkRatio < 1 {
chunkIndexToStartSeek = int(math.Floor(float64(totalChunks) * p.minShrinkRatio))
}
if chunkIndexToStartSeek >= chunksInFile {
chunkIndexToStartSeek = chunksInFile - 1
}
numDropped = chunkIndexToStartSeek
headerBuf := make([]byte, chunkHeaderLen)
// Find the first chunk in the file that should be kept.
for ; ; numDropped++ {
_, err = f.Seek(offsetForChunkIndex(numDropped), io.SeekStart)
if err != nil {
return
}
_, err = io.ReadFull(f, headerBuf)
if err == io.EOF {
// Close the file before trying to delete it. This is necessary on Windows
// (this will cause the defer f.Close to fail, but the error is silently ignored)
f.Close()
// We ran into the end of the file without finding any chunks that should
// be kept. Remove the whole file.
if numDropped, err = p.deleteSeriesFile(fp); err != nil {
return
}
if len(chunks) == 0 {
allDropped = true
return
}
offset, err = p.persistChunks(fp, chunks)
return
}
if err != nil {
return
}
lastTime := model.Time(
binary.LittleEndian.Uint64(headerBuf[chunkHeaderLastTimeOffset:]),
)
if !lastTime.Before(beforeTime) {
break
}
}
// If numDropped isn't incremented, the minShrinkRatio condition isn't satisfied.
if numDropped == chunkIndexToStartSeek {
// Nothing to drop. Just adjust the return values and append the chunks (if any).
numDropped = 0
_, err = f.Seek(offsetForChunkIndex(0), io.SeekStart)
if err != nil {
return
}
_, err = io.ReadFull(f, headerBuf)
if err != nil {
return
}
firstTimeNotDropped = model.Time(
binary.LittleEndian.Uint64(headerBuf[chunkHeaderFirstTimeOffset:]),
)
if len(chunks) > 0 {
offset, err = p.persistChunks(fp, chunks)
} else {
offset = chunksInFile
}
return
}
// If we are here, we have to drop some chunks for real. So we need to
// record firstTimeNotDropped from the last read header, seek backwards
// to the beginning of its header, and start copying everything from
// there into a new file. Then append the chunks to the new file.
firstTimeNotDropped = model.Time(
binary.LittleEndian.Uint64(headerBuf[chunkHeaderFirstTimeOffset:]),
)
chunk.Ops.WithLabelValues(chunk.Drop).Add(float64(numDropped))
_, err = f.Seek(-chunkHeaderLen, io.SeekCurrent)
if err != nil {
return
}
temp, err := os.OpenFile(p.tempFileNameForFingerprint(fp), os.O_WRONLY|os.O_CREATE, 0640)
if err != nil {
return
}
defer func() {
// Close the file before trying to rename to it. This is necessary on Windows
// (this will cause the defer f.Close to fail, but the error is silently ignored)
f.Close()
p.closeChunkFile(temp)
if err == nil {
err = os.Rename(p.tempFileNameForFingerprint(fp), p.fileNameForFingerprint(fp))
}
}()
written, err := io.Copy(temp, f)
if err != nil {
return
}
offset = int(written / chunkLenWithHeader)
if len(chunks) > 0 {
if err = p.writeChunks(temp, chunks); err != nil {
return
}
}
return
}