metrics/service.go (162 lines of code) (raw):
package metrics
import (
"context"
"strings"
"time"
"github.com/Azure/adx-mon/pkg/logger"
"github.com/Azure/adx-mon/pkg/prompb"
srv "github.com/Azure/adx-mon/pkg/service"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/unsafe"
"github.com/prometheus/client_golang/prometheus"
io_prometheus_client "github.com/prometheus/client_model/go"
)
type HealthReporter interface {
IsHealthy() bool
TransferQueueSize() int
UploadQueueSize() int
SegmentsTotal() int64
SegmentsSize() int64
UnhealthyReason() string
MaxSegmentAge() time.Duration
}
type TimeSeriesWriter interface {
Write(ctx context.Context, wr prompb.WriteRequest) error
}
type StatementExecutor interface {
Database() string
Endpoint() string
Mgmt(ctx context.Context, query kusto.Statement, options ...kusto.MgmtOption) (*kusto.RowIterator, error)
}
type Elector interface {
IsLeader() bool
}
type Service interface {
srv.Component
}
type ServiceOpts struct {
Hostname string
Elector Elector
// MetricsKustoCli is the Kusto clients for metrics clusters.
MetricsKustoCli []StatementExecutor
// KustoCli is the Kusto clients for all clusters.
KustoCli []StatementExecutor
PeerHealthReport HealthReporter
}
// Service manages the collection of metrics for ingestors.
type service struct {
closeFn context.CancelFunc
hostname string
elector Elector
metricsKustoCli []StatementExecutor
allKustoClis []StatementExecutor
health HealthReporter
}
func NewService(opts ServiceOpts) Service {
return &service{
elector: opts.Elector,
metricsKustoCli: opts.MetricsKustoCli,
allKustoClis: opts.KustoCli,
hostname: opts.Hostname,
health: opts.PeerHealthReport,
}
}
func (s *service) Open(ctx context.Context) error {
ctx, s.closeFn = context.WithCancel(ctx)
go s.collect(ctx)
return nil
}
func (s *service) Close() error {
s.closeFn()
return nil
}
func (s *service) collect(ctx context.Context) {
t := time.NewTicker(time.Minute)
defer t.Stop()
var lastCount float64
for {
select {
case <-ctx.Done():
return
case <-t.C:
// This is only set when running on ingestor currently.
if s.elector != nil {
// Only one node should execute the cardinality counts so see if we are the owner.
if s.elector.IsLeader() {
if len(s.metricsKustoCli) > 0 {
stmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(
".set-or-append async AdxmonIngestorTableCardinalityCount <| CountCardinality",
)
for _, cli := range s.metricsKustoCli {
iter, err := cli.Mgmt(ctx, stmt)
if err != nil {
logger.Errorf("Failed to execute cardinality counts: %s", err)
} else {
iter.Stop()
}
}
}
if len(s.allKustoClis) > 0 {
stmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(
".set-or-append async AdxmonIngestorTableDetails <| .show tables details" +
"| extend PreciseTimeStamp = now() " +
"| project PreciseTimeStamp, DatabaseName, TableName, TotalExtents, TotalExtentSize, TotalOriginalSize, TotalRowCount, HotExtents, HotExtentSize, HotOriginalSize," +
"HotRowCount, RetentionPolicy, CachingPolicy, ShardingPolicy, MergePolicy, IngestionBatchingPolicy, MinExtentsCreationTime, MaxExtentsCreationTime, TableId",
)
for _, cli := range s.allKustoClis {
iter, err := cli.Mgmt(ctx, stmt)
if err != nil {
logger.Errorf("Failed to execute table sizes: %s", err)
} else {
iter.Stop()
}
}
}
}
}
mets, err := prometheus.DefaultGatherer.Gather()
if err != nil {
logger.Errorf("Failed to gather metrics: %s", err)
continue
}
var currentTotal, activeConns, droppedConns float64
for _, v := range mets {
switch *v.Type {
case io_prometheus_client.MetricType_GAUGE:
for _, vv := range v.Metric {
if !strings.HasPrefix(v.GetName(), Namespace) {
continue
}
if strings.Contains(v.GetName(), "ingestor_active_connections") {
activeConns += vv.Gauge.GetValue()
}
}
case io_prometheus_client.MetricType_COUNTER:
for _, vv := range v.Metric {
if !strings.HasPrefix(v.GetName(), Namespace) {
continue
}
if strings.Contains(v.GetName(), "samples_stored_total") {
currentTotal += vv.Counter.GetValue()
} else if strings.Contains(v.GetName(), "ingestor_dropped_connections_total") {
droppedConns += vv.Counter.GetValue()
}
}
}
}
logger.Infof("Status IngestionSamplesPerSecond=%0.2f SamplesIngested=%d IsHealthy=%v "+
"UploadQueueSize=%d TransferQueueSize=%d SegmentsTotal=%d SegmentsSize=%d UnhealthyReason=%s "+
"ActiveConnections=%d DroppedConnections=%d MaxSegmentAgeSeconds=%0.2f",
(currentTotal-lastCount)/60, uint64(currentTotal),
s.health.IsHealthy(),
s.health.UploadQueueSize(),
s.health.TransferQueueSize(),
s.health.SegmentsTotal(),
s.health.SegmentsSize(),
s.health.UnhealthyReason(),
int(activeConns),
int(droppedConns),
s.health.MaxSegmentAge().Seconds())
lastCount = currentTotal
// Clear the gauges to prune old metrics that may not be collected anymore.
IngestorSegmentsMaxAge.Reset()
IngestorSegmentsSizeBytes.Reset()
IngestorSegmentsTotal.Reset()
LogsProxyUploaded.Reset()
}
}
}