func()

in metrics/service.go [89:196]


func (s *service) collect(ctx context.Context) {
	t := time.NewTicker(time.Minute)
	defer t.Stop()

	var lastCount float64
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			// This is only set when running on ingestor currently.
			if s.elector != nil {
				// Only one node should execute the cardinality counts so see if we are the owner.
				if s.elector.IsLeader() {
					if len(s.metricsKustoCli) > 0 {
						stmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(
							".set-or-append async AdxmonIngestorTableCardinalityCount <| CountCardinality",
						)

						for _, cli := range s.metricsKustoCli {
							iter, err := cli.Mgmt(ctx, stmt)
							if err != nil {
								logger.Errorf("Failed to execute cardinality counts: %s", err)
							} else {
								iter.Stop()
							}
						}
					}

					if len(s.allKustoClis) > 0 {
						stmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(
							".set-or-append async AdxmonIngestorTableDetails <| .show tables details" +
								"| extend PreciseTimeStamp = now() " +
								"| project PreciseTimeStamp, DatabaseName, TableName, TotalExtents, TotalExtentSize, TotalOriginalSize, TotalRowCount, HotExtents, HotExtentSize, HotOriginalSize," +
								"HotRowCount, RetentionPolicy, CachingPolicy, ShardingPolicy, MergePolicy, IngestionBatchingPolicy, MinExtentsCreationTime, MaxExtentsCreationTime, TableId",
						)

						for _, cli := range s.allKustoClis {
							iter, err := cli.Mgmt(ctx, stmt)
							if err != nil {
								logger.Errorf("Failed to execute table sizes: %s", err)
							} else {
								iter.Stop()
							}
						}
					}

				}
			}

			mets, err := prometheus.DefaultGatherer.Gather()
			if err != nil {
				logger.Errorf("Failed to gather metrics: %s", err)
				continue
			}

			var currentTotal, activeConns, droppedConns float64
			for _, v := range mets {
				switch *v.Type {
				case io_prometheus_client.MetricType_GAUGE:
					for _, vv := range v.Metric {
						if !strings.HasPrefix(v.GetName(), Namespace) {
							continue
						}

						if strings.Contains(v.GetName(), "ingestor_active_connections") {
							activeConns += vv.Gauge.GetValue()
						}
					}
				case io_prometheus_client.MetricType_COUNTER:
					for _, vv := range v.Metric {
						if !strings.HasPrefix(v.GetName(), Namespace) {
							continue
						}

						if strings.Contains(v.GetName(), "samples_stored_total") {
							currentTotal += vv.Counter.GetValue()
						} else if strings.Contains(v.GetName(), "ingestor_dropped_connections_total") {
							droppedConns += vv.Counter.GetValue()
						}
					}
				}
			}

			logger.Infof("Status IngestionSamplesPerSecond=%0.2f SamplesIngested=%d IsHealthy=%v "+
				"UploadQueueSize=%d TransferQueueSize=%d SegmentsTotal=%d SegmentsSize=%d UnhealthyReason=%s "+
				"ActiveConnections=%d DroppedConnections=%d MaxSegmentAgeSeconds=%0.2f",
				(currentTotal-lastCount)/60, uint64(currentTotal),
				s.health.IsHealthy(),
				s.health.UploadQueueSize(),
				s.health.TransferQueueSize(),
				s.health.SegmentsTotal(),
				s.health.SegmentsSize(),
				s.health.UnhealthyReason(),
				int(activeConns),
				int(droppedConns),
				s.health.MaxSegmentAge().Seconds())

			lastCount = currentTotal

			// Clear the gauges to prune old metrics that may not be collected anymore.
			IngestorSegmentsMaxAge.Reset()
			IngestorSegmentsSizeBytes.Reset()
			IngestorSegmentsTotal.Reset()
			LogsProxyUploaded.Reset()
		}
	}
}