func NewMemorySeriesStorage()

in v1storage/storage.go [211:387]


func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *MemorySeriesStorage {
	s := &MemorySeriesStorage{
		fpLocker: newFingerprintLocker(o.NumMutexes),

		options: o,

		loopStopping:               make(chan struct{}),
		loopStopped:                make(chan struct{}),
		logThrottlingStopped:       make(chan struct{}),
		throttled:                  make(chan struct{}, 1),
		targetHeapSize:             o.TargetHeapSize,
		dropAfter:                  o.PersistenceRetentionPeriod,
		headChunkTimeout:           o.HeadChunkTimeout,
		checkpointInterval:         o.CheckpointInterval,
		checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit,
		archiveHighWatermark:       model.Now().Add(-o.HeadChunkTimeout),

		evictList:     list.New(),
		evictRequests: make(chan chunk.EvictRequest, evictRequestsCap),
		evictStopping: make(chan struct{}),
		evictStopped:  make(chan struct{}),

		quarantineRequests: make(chan quarantineRequest, quarantineRequestsCap),
		quarantineStopping: make(chan struct{}),
		quarantineStopped:  make(chan struct{}),

		persistErrors: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "persist_errors_total",
			Help:      "The total number of errors while writing to the persistence layer.",
		}),
		queuedChunksToPersist: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "queued_chunks_to_persist_total",
			Help:      "The total number of chunks queued for persistence.",
		}),
		memorySeries: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "memory_series",
			Help:      "The current number of series in memory.",
		}),
		headChunks: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "open_head_chunks",
			Help:      "The current number of open head chunks.",
		}),
		dirtySeries: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "memory_dirty_series",
			Help:      "The current number of series that would require a disk seek during crash recovery.",
		}),
		seriesOps: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Namespace: namespace,
				Subsystem: subsystem,
				Name:      "series_ops_total",
				Help:      "The total number of series operations by their type.",
			},
			[]string{opTypeLabel},
		),
		ingestedSamples: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "ingested_samples_total",
			Help:      "The total number of samples ingested.",
		}),
		discardedSamples: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Namespace: namespace,
				Subsystem: subsystem,
				Name:      "out_of_order_samples_total",
				Help:      "The total number of samples that were discarded because their timestamps were at or before the last received sample for a series.",
			},
			[]string{discardReasonLabel},
		),
		nonExistentSeriesMatches: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "non_existent_series_matches_total",
			Help:      "How often a non-existent series was referred to during label matching or chunk preloading. This is an indication of outdated label indexes.",
		}),
		memChunks: prometheus.NewGaugeFunc(
			prometheus.GaugeOpts{
				Namespace: namespace,
				Subsystem: subsystem,
				Name:      "memory_chunks",
				Help:      "The current number of chunks in memory. The number does not include cloned chunks (i.e. chunks without a descriptor).",
			},
			func() float64 { return float64(atomic.LoadInt64(&chunk.NumMemChunks)) },
		),
		maintainSeriesDuration: prometheus.NewSummaryVec(
			prometheus.SummaryOpts{
				Namespace: namespace,
				Subsystem: subsystem,
				Name:      "maintain_series_duration_seconds",
				Help:      "The duration in seconds it took to perform maintenance on a series.",
			},
			[]string{seriesLocationLabel},
		),
	}

	s.chunksToPersist = prometheus.NewGaugeFunc(
		prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "chunks_to_persist",
			Help:      "The current number of chunks waiting for persistence.",
		},
		func() float64 {
			return float64(s.getNumChunksToPersist())
		},
	)
	s.rushedMode = prometheus.NewGaugeFunc(
		prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "rushed_mode",
			Help:      "1 if the storage is in rushed mode, 0 otherwise.",
		},
		func() float64 {
			s.rushedMtx.Lock()
			defer s.rushedMtx.Unlock()
			if s.rushed {
				return 1
			}
			return 0
		},
	)
	s.persistenceUrgencyScore = prometheus.NewGaugeFunc(
		prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "persistence_urgency_score",
			Help:      "A score of urgency to persist chunks, 0 is least urgent, 1 most.",
		},
		func() float64 {
			score, _ := s.getPersistenceUrgencyScore()
			return score
		},
	)
	s.targetHeapSizeBytes = prometheus.NewGaugeFunc(
		prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "target_heap_size_bytes",
			Help:      "The configured target heap size in bytes.",
		},
		func() float64 {
			return float64(s.targetHeapSize)
		},
	)

	// Initialize metric vectors.
	// TODO(beorn7): Rework once we have a utility function for it in client_golang.
	s.discardedSamples.WithLabelValues(outOfOrderTimestamp)
	s.discardedSamples.WithLabelValues(duplicateSample)
	s.maintainSeriesDuration.WithLabelValues(maintainInMemory)
	s.maintainSeriesDuration.WithLabelValues(maintainArchived)
	s.seriesOps.WithLabelValues(create)
	s.seriesOps.WithLabelValues(archive)
	s.seriesOps.WithLabelValues(unarchive)
	s.seriesOps.WithLabelValues(memoryPurge)
	s.seriesOps.WithLabelValues(archivePurge)
	s.seriesOps.WithLabelValues(requestedPurge)
	s.seriesOps.WithLabelValues(memoryMaintenance)
	s.seriesOps.WithLabelValues(archiveMaintenance)
	s.seriesOps.WithLabelValues(completedQurantine)
	s.seriesOps.WithLabelValues(droppedQuarantine)
	s.seriesOps.WithLabelValues(failedQuarantine)

	return s
}