func()

in v1storage/storage.go [1035:1078]


// getOrCreateSeries returns the memorySeries registered under fp in
// s.fpToSeries. If none is registered yet, one is built from m, stored in
// s.fpToSeries, and returned.
//
// Before building the series, the method asks the persistence layer to
// unarchive fp:
//
//   - If unarchiving fails, the error is logged and returned.
//   - If fp was unarchived, its chunk.Descs are loaded right away. A load
//     error, or an empty result, is handed to s.quarantineSeries and
//     returned.
//   - Otherwise the series is treated as new and m is indexed.
//
// An error from newMemorySeries is likewise passed to s.quarantineSeries and
// returned. On success the memorySeries gauge is incremented, and the
// headChunks gauge as well if the head chunk of the new series is not closed.
func (s *MemorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Metric) (*memorySeries, error) {
	// Fast path: the series is already held in memory.
	if existing, found := s.fpToSeries.get(fp); found {
		return existing, nil
	}

	wasArchived, err := s.persistence.unarchiveMetric(fp)
	if err != nil {
		log.Errorf("Error unarchiving fingerprint %v (metric %v): %v", fp, m, err)
		return nil, err
	}

	var (
		descs       []*chunk.Desc
		fileModTime time.Time
	)
	if !wasArchived {
		// Nothing was archived under this fingerprint, so this is a
		// genuinely new series and its metric has to be indexed.
		s.persistence.indexMetric(fp, m)
		s.seriesOps.WithLabelValues(create).Inc()
	} else {
		s.seriesOps.WithLabelValues(unarchive).Inc()
		// The chunk.Descs are needed for anything we do with the series
		// anyway. Loading them eagerly avoids a window in which the
		// series has no chunk.Descs at all, which would make it look
		// archived or purged.
		if descs, err = s.loadChunkDescs(fp, 0); err == nil && len(descs) == 0 {
			err = fmt.Errorf("unarchived fingerprint %v (metric %v) has no chunks on disk", fp, m)
		}
		if err != nil {
			s.quarantineSeries(fp, m, err)
			return nil, err
		}
		fileModTime = s.persistence.seriesFileModTime(fp)
	}

	created, err := newMemorySeries(m, descs, fileModTime)
	if err != nil {
		s.quarantineSeries(fp, m, err)
		return nil, err
	}

	// Register the series and update the gauges.
	s.fpToSeries.put(fp, created)
	s.memorySeries.Inc()
	if !created.headChunkClosed {
		s.headChunks.Inc()
	}
	return created, nil
}