func()

in v1storage/persistence.go [630:830]


// checkpointSeriesMapAndHeads writes a checkpoint of the given series map to
// disk. The data is first written to the file named by p.headsTempFileName();
// only if everything (including sync and close) succeeded, that file is
// renamed to p.headsFileName(). On any error, the temporary file is removed
// and the error is returned.
//
// Layout of the file as written by this method:
//
//   - headsMagicString
//   - headsFormatVersion as varint
//   - number of series as uint64 (fixed width so it can be overwritten later)
//   - for each series that still has chunk descs:
//   - one flag byte (always 0)
//   - fingerprint as uint64
//   - the metric in its codable.Metric binary encoding
//   - persist watermark as varint
//   - modification time in Unix nanoseconds as varint (-1 if zero)
//   - chunk descs offset as varint
//   - saved first time as varint
//   - number of chunk descs as varint
//   - either a single placeholder (first time and last time as varints)
//     for a fully persisted series, or, for each non-persisted chunk, its
//     encoding byte followed by the marshalled chunk
//
// Each series is processed while holding its fingerprint lock in fpLocker.
// The context is checked before each series; if it is done, ctx.Err() is
// returned and the checkpoint is abandoned.
func (p *persistence) checkpointSeriesMapAndHeads(
	ctx context.Context, fingerprintToSeries *seriesMap, fpLocker *fingerprintLocker,
) (err error) {
	log.Info("Checkpointing in-memory metrics and chunks...")
	p.checkpointing.Set(1)
	defer p.checkpointing.Set(0)
	begin := time.Now()
	f, err := os.OpenFile(p.headsTempFileName(), os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
	if err != nil {
		return err
	}

	// The deferred function finalizes the checkpoint. It inspects and
	// overwrites the named result err, so every failure path below must
	// leave its error in that variable (and not in a shadowing one).
	defer func() {
		defer os.Remove(p.headsTempFileName()) // Just in case it was left behind.

		if err != nil {
			// If we already had an error, do not bother to sync,
			// just close, ignoring any further error.
			f.Close()
			return
		}
		// Always close, even if the sync failed, but report the sync
		// error with priority.
		syncErr := f.Sync()
		closeErr := f.Close()
		err = syncErr
		if err != nil {
			return
		}
		err = closeErr
		if err != nil {
			return
		}
		err = os.Rename(p.headsTempFileName(), p.headsFileName())
		duration := time.Since(begin)
		p.checkpointDuration.Observe(duration.Seconds())
		p.checkpointLastDuration.Set(duration.Seconds())
		log.Infof("Done checkpointing in-memory metrics and chunks in %v.", duration)
	}()

	w := bufio.NewWriterSize(f, fileBufSize)

	if _, err = w.WriteString(headsMagicString); err != nil {
		return err
	}
	// numberOfSeriesOffset ends up as the byte offset of the series count
	// in the file: length of the magic string plus length of the version
	// varint.
	var numberOfSeriesOffset int
	if numberOfSeriesOffset, err = codable.EncodeVarint(w, headsFormatVersion); err != nil {
		return err
	}
	numberOfSeriesOffset += len(headsMagicString)
	numberOfSeriesInHeader := uint64(fingerprintToSeries.length())
	// We have to write the number of series as uint64 because we might need
	// to overwrite it later, and a varint might change byte width then.
	if err = codable.EncodeUint64(w, numberOfSeriesInHeader); err != nil {
		return err
	}

	iter := fingerprintToSeries.iter()
	defer func() {
		// Consume the iterator in any case to not leak goroutines.
		for range iter {
		}
	}()

	var realNumberOfSeries uint64
	for m := range iter {
		// Non-blocking check for cancellation before touching the next series.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		func() { // Wrapped in function to use defer for unlocking the fp.
			fpLocker.Lock(m.fp)
			defer fpLocker.Unlock(m.fp)

			chunksToPersist := len(m.series.chunkDescs) - m.series.persistWatermark
			if len(m.series.chunkDescs) == 0 {
				// This series was completely purged or archived
				// in the meantime. Ignore.
				return
			}
			realNumberOfSeries++

			// Sanity checks.
			if m.series.chunkDescsOffset < 0 && m.series.persistWatermark > 0 {
				panic("encountered unknown chunk desc offset in combination with positive persist watermark")
			}

			// These are the values to save in the normal case.
			var (
				// persistWatermark is zero as we only checkpoint non-persisted chunks.
				persistWatermark int64
				// chunkDescsOffset is shifted by the original persistWatermark for the same reason.
				chunkDescsOffset = int64(m.series.chunkDescsOffset + m.series.persistWatermark)
				numChunkDescs    = int64(chunksToPersist)
			)
			// However, in the special case of a series being fully
			// persisted but still in memory (i.e. not archived), we
			// need to save a "placeholder", for which we use just
			// the chunk desc of the last chunk. Values have to be
			// adjusted accordingly. (The reason for doing it in
			// this weird way is to keep the checkpoint format
			// compatible with older versions.)
			if chunksToPersist == 0 {
				persistWatermark = 1
				chunkDescsOffset-- // Save one chunk desc after all.
				numChunkDescs = 1
			}

			// seriesFlags left empty in v2.
			if err = w.WriteByte(0); err != nil {
				return
			}
			if err = codable.EncodeUint64(w, uint64(m.fp)); err != nil {
				return
			}
			var buf []byte
			buf, err = codable.Metric(m.series.metric).MarshalBinary()
			if err != nil {
				return
			}
			if _, err = w.Write(buf); err != nil {
				return
			}
			if _, err = codable.EncodeVarint(w, persistWatermark); err != nil {
				return
			}
			if m.series.modTime.IsZero() {
				// A zero modTime is encoded as -1.
				if _, err = codable.EncodeVarint(w, -1); err != nil {
					return
				}
			} else {
				if _, err = codable.EncodeVarint(w, m.series.modTime.UnixNano()); err != nil {
					return
				}
			}
			if _, err = codable.EncodeVarint(w, chunkDescsOffset); err != nil {
				return
			}
			if _, err = codable.EncodeVarint(w, int64(m.series.savedFirstTime)); err != nil {
				return
			}
			if _, err = codable.EncodeVarint(w, numChunkDescs); err != nil {
				return
			}
			if chunksToPersist == 0 {
				// Save the one placeholder chunk desc for a fully persisted series.
				chunkDesc := m.series.chunkDescs[len(m.series.chunkDescs)-1]
				if _, err = codable.EncodeVarint(w, int64(chunkDesc.FirstTime())); err != nil {
					return
				}
				// Use a separately named error here: declaring
				// "lt, err :=" in this block would shadow the
				// function's named result, and a failure would
				// go unnoticed by the caller of this closure,
				// leaving a truncated record in the checkpoint.
				lt, ltErr := chunkDesc.LastTime()
				if ltErr != nil {
					err = ltErr
					return
				}
				if _, err = codable.EncodeVarint(w, int64(lt)); err != nil {
					return
				}
			} else {
				// Save (only) the non-persisted chunks.
				for _, chunkDesc := range m.series.chunkDescs[m.series.persistWatermark:] {
					if err = w.WriteByte(byte(chunkDesc.C.Encoding())); err != nil {
						return
					}
					if err = chunkDesc.C.Marshal(w); err != nil {
						return
					}
					// NOTE(review): this records the per-series chunk
					// count once per chunk written, not once per
					// series. Left as is; confirm whether that is the
					// intended meaning of the metric.
					p.checkpointChunksWritten.Observe(float64(chunksToPersist))
				}
			}
			// Series is checkpointed now, so declare it clean. In case the entire
			// checkpoint fails later on, this is fine, as the storage's series
			// maintenance will mark these series newly dirty again, continuously
			// increasing the total number of dirty series as seen by the storage.
			// This has the effect of triggering a new checkpoint attempt even
			// earlier than if we hadn't incorrectly set "dirty" to "false" here
			// already.
			m.series.dirty = false
		}()
		if err != nil {
			return err
		}
	}
	// Flush the buffered writer before possibly writing to f directly below.
	if err = w.Flush(); err != nil {
		return err
	}
	if realNumberOfSeries != numberOfSeriesInHeader {
		// The number of series has changed in the meantime.
		// Rewrite it in the header.
		if _, err = f.Seek(int64(numberOfSeriesOffset), io.SeekStart); err != nil {
			return err
		}
		if err = codable.EncodeUint64(f, realNumberOfSeries); err != nil {
			return err
		}
	}
	info, err := f.Stat()
	if err != nil {
		return err
	}
	p.checkpointLastSize.Set(float64(info.Size()))
	return err
}