in v1storage/storage.go [211:387]
// NewMemorySeriesStorage builds a MemorySeriesStorage from the options in o.
// It allocates the fingerprint locker, the control channels, the eviction
// list and all instrumentation metrics, and pre-creates the children of the
// metric vectors for every known label value. The metrics are constructed
// here but not registered by this function.
//
// o must be non-nil; its fields are read unconditionally.
func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *MemorySeriesStorage {
	// Every metric shares the same namespace and subsystem. These helpers
	// fill them in so each metric only states its own name and help text.
	counterOpts := func(name, help string) prometheus.CounterOpts {
		return prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      name,
			Help:      help,
		}
	}
	gaugeOpts := func(name, help string) prometheus.GaugeOpts {
		return prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      name,
			Help:      help,
		}
	}

	// Non-metric state: configuration copied from the options plus the
	// channels and list used by the storage's internal machinery.
	s := &MemorySeriesStorage{
		fpLocker: newFingerprintLocker(o.NumMutexes),
		options:  o,

		loopStopping:         make(chan struct{}),
		loopStopped:          make(chan struct{}),
		logThrottlingStopped: make(chan struct{}),
		throttled:            make(chan struct{}, 1),

		targetHeapSize:             o.TargetHeapSize,
		dropAfter:                  o.PersistenceRetentionPeriod,
		headChunkTimeout:           o.HeadChunkTimeout,
		checkpointInterval:         o.CheckpointInterval,
		checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit,

		// Starts out one head-chunk timeout before the current time.
		archiveHighWatermark: model.Now().Add(-o.HeadChunkTimeout),

		evictList:     list.New(),
		evictRequests: make(chan chunk.EvictRequest, evictRequestsCap),
		evictStopping: make(chan struct{}),
		evictStopped:  make(chan struct{}),

		quarantineRequests: make(chan quarantineRequest, quarantineRequestsCap),
		quarantineStopping: make(chan struct{}),
		quarantineStopped:  make(chan struct{}),
	}

	// Plain counters and gauges.
	s.persistErrors = prometheus.NewCounter(counterOpts(
		"persist_errors_total",
		"The total number of errors while writing to the persistence layer.",
	))
	s.queuedChunksToPersist = prometheus.NewCounter(counterOpts(
		"queued_chunks_to_persist_total",
		"The total number of chunks queued for persistence.",
	))
	s.memorySeries = prometheus.NewGauge(gaugeOpts(
		"memory_series",
		"The current number of series in memory.",
	))
	s.headChunks = prometheus.NewGauge(gaugeOpts(
		"open_head_chunks",
		"The current number of open head chunks.",
	))
	s.dirtySeries = prometheus.NewGauge(gaugeOpts(
		"memory_dirty_series",
		"The current number of series that would require a disk seek during crash recovery.",
	))
	s.ingestedSamples = prometheus.NewCounter(counterOpts(
		"ingested_samples_total",
		"The total number of samples ingested.",
	))
	s.nonExistentSeriesMatches = prometheus.NewCounter(counterOpts(
		"non_existent_series_matches_total",
		"How often a non-existent series was referred to during label matching or chunk preloading. This is an indication of outdated label indexes.",
	))

	// Labelled metric vectors.
	s.seriesOps = prometheus.NewCounterVec(
		counterOpts(
			"series_ops_total",
			"The total number of series operations by their type.",
		),
		[]string{opTypeLabel},
	)
	s.discardedSamples = prometheus.NewCounterVec(
		counterOpts(
			"out_of_order_samples_total",
			"The total number of samples that were discarded because their timestamps were at or before the last received sample for a series.",
		),
		[]string{discardReasonLabel},
	)
	s.maintainSeriesDuration = prometheus.NewSummaryVec(
		prometheus.SummaryOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "maintain_series_duration_seconds",
			Help:      "The duration in seconds it took to perform maintenance on a series.",
		},
		[]string{seriesLocationLabel},
	)

	// Gauges whose value is computed at collection time.
	s.memChunks = prometheus.NewGaugeFunc(
		gaugeOpts(
			"memory_chunks",
			"The current number of chunks in memory. The number does not include cloned chunks (i.e. chunks without a descriptor).",
		),
		func() float64 {
			return float64(atomic.LoadInt64(&chunk.NumMemChunks))
		},
	)
	s.chunksToPersist = prometheus.NewGaugeFunc(
		gaugeOpts(
			"chunks_to_persist",
			"The current number of chunks waiting for persistence.",
		),
		func() float64 {
			return float64(s.getNumChunksToPersist())
		},
	)
	s.rushedMode = prometheus.NewGaugeFunc(
		gaugeOpts(
			"rushed_mode",
			"1 if the storage is in rushed mode, 0 otherwise.",
		),
		func() float64 {
			// s.rushed is read under rushedMtx.
			s.rushedMtx.Lock()
			defer s.rushedMtx.Unlock()
			if !s.rushed {
				return 0
			}
			return 1
		},
	)
	s.persistenceUrgencyScore = prometheus.NewGaugeFunc(
		gaugeOpts(
			"persistence_urgency_score",
			"A score of urgency to persist chunks, 0 is least urgent, 1 most.",
		),
		func() float64 {
			// Only the score is exported; the second result is discarded.
			urgency, _ := s.getPersistenceUrgencyScore()
			return urgency
		},
	)
	s.targetHeapSizeBytes = prometheus.NewGaugeFunc(
		gaugeOpts(
			"target_heap_size_bytes",
			"The configured target heap size in bytes.",
		),
		func() float64 {
			return float64(s.targetHeapSize)
		},
	)

	// Initialize metric vectors by touching every known label value once, so
	// that a child exists for each of them from the start.
	// TODO(beorn7): Rework once we have a utility function for it in client_golang.
	for _, reason := range []string{outOfOrderTimestamp, duplicateSample} {
		s.discardedSamples.WithLabelValues(reason)
	}
	for _, location := range []string{maintainInMemory, maintainArchived} {
		s.maintainSeriesDuration.WithLabelValues(location)
	}
	for _, op := range []string{
		create,
		archive,
		unarchive,
		memoryPurge,
		archivePurge,
		requestedPurge,
		memoryMaintenance,
		archiveMaintenance,
		completedQurantine,
		droppedQuarantine,
		failedQuarantine,
	} {
		s.seriesOps.WithLabelValues(op)
	}

	return s
}