func()

in v1storage/persistence.go [890:1067]


// dropAndPersistChunks removes chunks whose last sample time is before
// beforeTime from the series file of fp and then persists the provided chunks,
// leaving out those provided chunks that are themselves too old.
//
// Return values:
//   - firstTimeNotDropped: the first time of the oldest chunk that was kept
//     (left at its zero value if nothing was kept).
//   - offset: the chunk index at which the provided chunks start in the
//     resulting series file, or, if no chunks were provided and nothing was
//     dropped, the number of chunks in the file. It is left at 0 when the
//     series file is replaced by one containing only provided chunks because
//     some of the provided chunks were too old.
//   - numDropped: the number of chunks dropped, counting both chunks in the
//     series file and provided chunks that were too old.
//   - allDropped: true if no chunk of the series remains and none was persisted.
//   - err: any error encountered while reading or rewriting the series file.
func (p *persistence) dropAndPersistChunks(
	fp model.Fingerprint, beforeTime model.Time, chunks []chunk.Chunk,
) (
	firstTimeNotDropped model.Time,
	offset int,
	numDropped int,
	allDropped bool,
	err error,
) {
	// Style note: With the many return values, it was decided to use naked
	// returns in this method. They make the method more readable, but
	// please handle with care!
	if len(chunks) > 0 {
		// We have chunks to persist. First check if those are already
		// too old. If that's the case, the chunks in the series file
		// are all too old, too.
		i := 0
		for ; i < len(chunks); i++ {
			var lt model.Time
			lt, err = chunks[i].NewIterator().LastTimestamp()
			if err != nil {
				return
			}
			if !lt.Before(beforeTime) {
				break
			}
		}
		if i < len(chunks) {
			firstTimeNotDropped = chunks[i].FirstTime()
		}
		if i > 0 || firstTimeNotDropped.Before(beforeTime) {
			// Series file has to go.
			if numDropped, err = p.deleteSeriesFile(fp); err != nil {
				return
			}
			// Account for the provided chunks that were too old as well.
			numDropped += i
			if i == len(chunks) {
				allDropped = true
				return
			}
			// Now simply persist what has to be persisted to a new file.
			// The offset reported by persistChunks is discarded, so the
			// returned offset stays 0 on this path.
			_, err = p.persistChunks(fp, chunks[i:])
			return
		}
	}

	// If we are here, we have to check the series file itself.
	f, err := p.openChunkFileForReading(fp)
	if os.IsNotExist(err) {
		// No series file. Only need to create new file with chunks to
		// persist, if there are any.
		if len(chunks) == 0 {
			allDropped = true
			err = nil // Do not report not-exist err.
			return
		}
		offset, err = p.persistChunks(fp, chunks)
		return
	}
	if err != nil {
		return
	}
	defer f.Close()

	fi, err := f.Stat()
	if err != nil {
		return
	}
	chunksInFile := int(fi.Size()) / chunkLenWithHeader
	totalChunks := chunksInFile + len(chunks)

	// Calculate chunk index from minShrinkRatio, to skip unnecessary chunk header reading.
	chunkIndexToStartSeek := 0
	if p.minShrinkRatio < 1 {
		chunkIndexToStartSeek = int(math.Floor(float64(totalChunks) * p.minShrinkRatio))
	}
	if chunkIndexToStartSeek >= chunksInFile {
		chunkIndexToStartSeek = chunksInFile - 1
	}
	// A file holding no complete chunk (chunksInFile == 0) would otherwise
	// yield a start index of -1 and thus a seek to a negative chunk index
	// below. Starting at 0 lets an empty file run into the io.EOF branch,
	// which removes the file and persists the provided chunks.
	if chunkIndexToStartSeek < 0 {
		chunkIndexToStartSeek = 0
	}
	numDropped = chunkIndexToStartSeek

	headerBuf := make([]byte, chunkHeaderLen)
	// Find the first chunk in the file that should be kept.
	for ; ; numDropped++ {
		_, err = f.Seek(offsetForChunkIndex(numDropped), io.SeekStart)
		if err != nil {
			return
		}
		_, err = io.ReadFull(f, headerBuf)
		if err == io.EOF {
			// Close the file before trying to delete it. This is necessary on Windows
			// (this will cause the defer f.Close to fail, but the error is silently ignored)
			f.Close()
			// We ran into the end of the file without finding any chunks that should
			// be kept. Remove the whole file.
			if numDropped, err = p.deleteSeriesFile(fp); err != nil {
				return
			}
			if len(chunks) == 0 {
				allDropped = true
				return
			}
			offset, err = p.persistChunks(fp, chunks)
			return
		}
		if err != nil {
			// Includes io.ErrUnexpectedEOF, i.e. a partially readable header.
			return
		}
		lastTime := model.Time(
			binary.LittleEndian.Uint64(headerBuf[chunkHeaderLastTimeOffset:]),
		)
		if !lastTime.Before(beforeTime) {
			break
		}
	}

	// If numDropped isn't incremented, the minShrinkRatio condition isn't satisfied.
	if numDropped == chunkIndexToStartSeek {
		// Nothing to drop. Just adjust the return values and append the chunks (if any).
		numDropped = 0
		// The header read last belongs to the chunk at chunkIndexToStartSeek,
		// which is not necessarily the first chunk in the file, so re-read
		// the header of chunk 0 to report the first time actually kept.
		_, err = f.Seek(offsetForChunkIndex(0), io.SeekStart)
		if err != nil {
			return
		}
		_, err = io.ReadFull(f, headerBuf)
		if err != nil {
			return
		}
		firstTimeNotDropped = model.Time(
			binary.LittleEndian.Uint64(headerBuf[chunkHeaderFirstTimeOffset:]),
		)
		if len(chunks) > 0 {
			offset, err = p.persistChunks(fp, chunks)
		} else {
			offset = chunksInFile
		}
		return
	}
	// If we are here, we have to drop some chunks for real. So we need to
	// record firstTimeNotDropped from the last read header, seek backwards
	// to the beginning of its header, and start copying everything from
	// there into a new file. Then append the chunks to the new file.
	firstTimeNotDropped = model.Time(
		binary.LittleEndian.Uint64(headerBuf[chunkHeaderFirstTimeOffset:]),
	)
	chunk.Ops.WithLabelValues(chunk.Drop).Add(float64(numDropped))
	_, err = f.Seek(-chunkHeaderLen, io.SeekCurrent)
	if err != nil {
		return
	}

	// Truncate on open: the temp file is only renamed away on success (see
	// the deferred function below), so a failed earlier attempt may have left
	// one behind. Without O_TRUNC, stale bytes beyond the newly written
	// content would survive and end up in the series file after the rename.
	temp, err := os.OpenFile(p.tempFileNameForFingerprint(fp), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0640)
	if err != nil {
		return
	}
	defer func() {
		// Close the file before trying to rename to it. This is necessary on Windows
		// (this will cause the defer f.Close to fail, but the error is silently ignored)
		f.Close()
		p.closeChunkFile(temp)
		// Only replace the series file if everything above succeeded. The
		// rename error, if any, becomes the returned err.
		if err == nil {
			err = os.Rename(p.tempFileNameForFingerprint(fp), p.fileNameForFingerprint(fp))
		}
	}()

	// Copy all kept chunks, from the current read position to the end of the
	// old file, into the temp file.
	written, err := io.Copy(temp, f)
	if err != nil {
		return
	}
	// The provided chunks start right after the chunks copied over.
	offset = int(written / chunkLenWithHeader)

	if len(chunks) > 0 {
		if err = p.writeChunks(temp, chunks); err != nil {
			return
		}
	}
	return
}