in metrics/service.go [89:196]
// collect is the service's periodic housekeeping loop. Once per interval it:
//
//  1. runs the leader-only Kusto management commands (see runLeaderKustoJobs),
//  2. gathers the default Prometheus registry and logs a one-line status
//     summary built from those metrics and from s.health,
//  3. resets a set of gauge vectors so series that are no longer being
//     reported are pruned.
//
// It blocks until ctx is cancelled. If gathering metrics fails, the error is
// logged and the remainder of that tick (status line and gauge resets) is
// skipped.
func (s *service) collect(ctx context.Context) {
	// interval drives both the ticker and the per-second rate in the status
	// line, so the two cannot drift apart if the period is ever changed.
	const interval = time.Minute

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	// lastCount is the stored-samples total observed on the previous
	// successful tick; the difference to the current total yields the rate.
	var lastCount float64
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}

		s.runLeaderKustoJobs(ctx)

		families, err := prometheus.DefaultGatherer.Gather()
		if err != nil {
			logger.Errorf("Failed to gather metrics: %s", err)
			continue
		}

		currentTotal, activeConns, droppedConns := sumIngestorMetrics(families)

		logger.Infof("Status IngestionSamplesPerSecond=%0.2f SamplesIngested=%d IsHealthy=%v "+
			"UploadQueueSize=%d TransferQueueSize=%d SegmentsTotal=%d SegmentsSize=%d UnhealthyReason=%s "+
			"ActiveConnections=%d DroppedConnections=%d MaxSegmentAgeSeconds=%0.2f",
			(currentTotal-lastCount)/interval.Seconds(), uint64(currentTotal),
			s.health.IsHealthy(),
			s.health.UploadQueueSize(),
			s.health.TransferQueueSize(),
			s.health.SegmentsTotal(),
			s.health.SegmentsSize(),
			s.health.UnhealthyReason(),
			int(activeConns),
			int(droppedConns),
			s.health.MaxSegmentAge().Seconds())
		lastCount = currentTotal

		// Clear the gauges to prune old metrics that may not be collected anymore.
		IngestorSegmentsMaxAge.Reset()
		IngestorSegmentsSizeBytes.Reset()
		IngestorSegmentsTotal.Reset()
		LogsProxyUploaded.Reset()
	}
}

// runLeaderKustoJobs issues the periodic Kusto management commands, but only
// when an elector is configured and reports this node as leader, so that a
// single node executes them. The elector is only set when running on ingestor
// currently.
//
// Failures are logged per client and do not stop the remaining clients from
// being tried.
func (s *service) runLeaderKustoJobs(ctx context.Context) {
	if s.elector == nil || !s.elector.IsLeader() {
		return
	}

	if len(s.metricsKustoCli) > 0 {
		// Cardinality counts, run against every metrics client.
		stmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(
			".set-or-append async AdxmonIngestorTableCardinalityCount <| CountCardinality",
		)
		for _, cli := range s.metricsKustoCli {
			iter, err := cli.Mgmt(ctx, stmt)
			if err != nil {
				logger.Errorf("Failed to execute cardinality counts: %s", err)
				continue
			}
			// The command is async and its rows are not consumed here; stop
			// the iterator right away.
			iter.Stop()
		}
	}

	if len(s.allKustoClis) > 0 {
		// Table details snapshot, run against every client.
		stmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(
			".set-or-append async AdxmonIngestorTableDetails <| .show tables details" +
				"| extend PreciseTimeStamp = now() " +
				"| project PreciseTimeStamp, DatabaseName, TableName, TotalExtents, TotalExtentSize, TotalOriginalSize, TotalRowCount, HotExtents, HotExtentSize, HotOriginalSize," +
				"HotRowCount, RetentionPolicy, CachingPolicy, ShardingPolicy, MergePolicy, IngestionBatchingPolicy, MinExtentsCreationTime, MaxExtentsCreationTime, TableId",
		)
		for _, cli := range s.allKustoClis {
			iter, err := cli.Mgmt(ctx, stmt)
			if err != nil {
				logger.Errorf("Failed to execute table sizes: %s", err)
				continue
			}
			iter.Stop()
		}
	}
}

// sumIngestorMetrics walks the gathered metric families and totals the three
// values shown in the status line. Only families whose name starts with
// Namespace are considered:
//
//   - samples: counters whose name contains "samples_stored_total"
//   - activeConns: gauges whose name contains "ingestor_active_connections"
//   - droppedConns: counters whose name contains
//     "ingestor_dropped_connections_total"
//
// Values are summed across every series of a matching family. The generated
// getters are used throughout so an unset field contributes zero rather than
// causing a nil dereference.
func sumIngestorMetrics(families []*io_prometheus_client.MetricFamily) (samples, activeConns, droppedConns float64) {
	for _, fam := range families {
		// The name is a property of the family, so match it once here rather
		// than once per sample.
		name := fam.GetName()
		if !strings.HasPrefix(name, Namespace) {
			continue
		}

		switch fam.GetType() {
		case io_prometheus_client.MetricType_GAUGE:
			if !strings.Contains(name, "ingestor_active_connections") {
				continue
			}
			for _, m := range fam.GetMetric() {
				activeConns += m.GetGauge().GetValue()
			}
		case io_prometheus_client.MetricType_COUNTER:
			switch {
			case strings.Contains(name, "samples_stored_total"):
				for _, m := range fam.GetMetric() {
					samples += m.GetCounter().GetValue()
				}
			case strings.Contains(name, "ingestor_dropped_connections_total"):
				for _, m := range fam.GetMetric() {
					droppedConns += m.GetCounter().GetValue()
				}
			}
		}
	}
	return samples, activeConns, droppedConns
}