collector/service.go (397 lines of code) (raw):
package collector
import (
"context"
"crypto/tls"
"fmt"
"net"
"net/http/pprof"
_ "net/http/pprof"
"regexp"
"time"
"buf.build/gen/go/opentelemetry/opentelemetry/bufbuild/connect-go/opentelemetry/proto/collector/metrics/v1/metricsv1connect"
metricsHandler "github.com/Azure/adx-mon/collector/metrics"
"github.com/Azure/adx-mon/collector/otlp"
"github.com/Azure/adx-mon/ingestor/cluster"
"github.com/Azure/adx-mon/metrics"
"github.com/Azure/adx-mon/pkg/http"
"github.com/Azure/adx-mon/pkg/logger"
"github.com/Azure/adx-mon/pkg/prompb"
"github.com/Azure/adx-mon/pkg/remote"
"github.com/Azure/adx-mon/pkg/service"
"github.com/Azure/adx-mon/storage"
"github.com/Azure/adx-mon/transform"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// Service is the top-level collector component. It wires together local WAL
// storage, the metric/log receive endpoints, an optional node scraper, and
// the batching/replication pipeline that ships segments to the ingestor.
type Service struct {
	// opts holds the configuration this service was constructed with.
	opts *ServiceOpts
	// cancel stops the background context created in Open; called by Close.
	cancel context.CancelFunc
	// metricsSvc is the internal metrics component for collector specific metrics.
	metricsSvc metrics.Service
	// workerSvcs are the collection services that can be opened and closed.
	workerSvcs []service.Component
	// httpServers is the shared HTTP servers for the collector. The logs and metrics services are registered with these servers.
	httpServers []*http.HttpServer
	// store is the local WAL store.
	store storage.Store
	// scraper is the metrics scraper that scrapes metrics from the local node.
	scraper *Scraper
	// httpHandlers are the write endpoints that receive from Prometheus and Otel clients over HTTP.
	httpHandlers []*http.HttpHandler
	// grpcHandlers are the write endpoints that receive calls over GRPC
	grpcHandlers []*http.GRPCHandler
	// batcher is the component that batches metrics and logs for transferring to ingestor.
	batcher cluster.Batcher
	// replicator is the component that replicates metrics and logs to the ingestor.
	replicator service.Component
}
// ServiceOpts configures a collector Service. Zero values for the Max*
// limits are replaced with defaults in NewService (see that function for the
// specific defaults).
type ServiceOpts struct {
	// ListenAddr is the address the primary HTTP server listens on.
	ListenAddr string
	// NodeName identifies this collector instance; used as the replicator hostname.
	NodeName string
	// Endpoint is the remote ingestor endpoint. When empty, a fake replicator
	// is used and segments are not transferred off the node.
	Endpoint string
	// LogCollectionHandlers is the list of log collection handlers
	LogCollectionHandlers []LogCollectorOpts
	// HttpLogCollectionHandlers is the list of log collection handlers with HTTP endpoints
	HttpLogCollectorOpts []HttpLogCollectorOpts
	// PromMetricsHandlers is the list of prom-remote handlers
	PromMetricsHandlers []PrometheusRemoteWriteHandlerOpts
	// OtlpMetricsHandlers is the list of oltp metrics handlers
	OtlpMetricsHandlers []OtlpMetricsHandlerOpts
	// Scraper is the options for the prom scraper
	Scraper *ScraperOpts
	// Labels to lift to columns
	LiftLabels []string
	// AddAttributes are attributes to add to collected data — not referenced in
	// this file; presumably consumed by the log/OTLP handlers (verify at call sites).
	AddAttributes map[string]string
	// LiftAttributes are OTLP attributes lifted to columns in the local store.
	LiftAttributes []string
	// LiftResources are OTLP resources lifted to columns in the local store.
	LiftResources []string
	// InsecureSkipVerify skips the verification of the remote write endpoint certificate chain and host name.
	InsecureSkipVerify bool
	// TLSCertFile and TLSKeyFile are paths to the server certificate and key.
	// When both are set, all listeners serve TLS.
	TLSCertFile string
	TLSKeyFile  string
	// MaxBatchSize is the maximum number of samples to send in a single batch.
	MaxBatchSize int
	// MaxSegmentAge is the maximum time allowed before a segment is rolled over.
	MaxSegmentAge time.Duration
	// MaxSegmentSize is the maximum size allowed for a segment before it is rolled over.
	MaxSegmentSize int64
	// MaxDiskUsage is the max size in bytes to use for segment store. If this value is exceeded, writes
	// will be rejected until space is freed. A value of 0 means no max usage.
	MaxDiskUsage int64
	// MaxSegmentCount is the maximum number of segments files allowed on disk before signaling back-pressure.
	MaxSegmentCount int64
	// StorageDir is the directory where the WAL will be stored
	StorageDir string
	// EnablePprof enables pprof endpoints.
	EnablePprof bool
	// Region is a location identifier
	Region string
	// MaxConnections caps concurrent connections on each HTTP server.
	MaxConnections int
	// WALFlushInterval is the interval at which the WAL segment is flushed to disk. A higher value results in
	// better disk IO and less CPU usage but has a greater risk of data loss in the event of a crash. A lower
	// value results in more frequent disk IO and higher CPU usage but less data loss in the event of a crash.
	// The default is 100ms and the value must be greater than 0.
	WALFlushInterval time.Duration
	// MaxTransferConcurrency is the maximum number of concurrent transfers to the ingestor.
	MaxTransferConcurrency int
	// MaxBatchSegments is the maximum number of segments to include when transferring segments in a batch. The segments
	// are merged into a new segment. A higher number takes longer to combine on the sending size and increases the
	// size of segments on the receiving size. A lower number creates more batches and high remote transfer calls. If
	// not specified, the default is 25.
	MaxBatchSegments int
	// DisableGzip disables gzip compression for the transfer endpoint.
	DisableGzip bool
}
// OtlpMetricsHandlerOpts configures a single OTLP metrics receive endpoint.
// At least one of Path or GrpcPort should be set; each enables its transport.
type OtlpMetricsHandlerOpts struct {
	// Optional. Path is the path where the OTLP/HTTP handler will be registered.
	Path string
	// Optional. GrpcPort is the port where the metrics OTLP/GRPC handler will listen.
	GrpcPort int
	// MetricOpts holds the shared add/drop/keep rules and remote write clients.
	MetricOpts MetricsHandlerOpts
}
// PrometheusRemoteWriteHandlerOpts configures a Prometheus remote-write
// receive endpoint registered on the primary HTTP server.
type PrometheusRemoteWriteHandlerOpts struct {
	// Path is the path where the handler will be registered.
	Path string
	// MetricOpts holds the shared add/drop/keep rules and remote write clients.
	MetricOpts MetricsHandlerOpts
}
// MetricsHandlerOpts holds the transformation rules and destinations shared by
// the Prometheus and OTLP metrics handlers. See RequestTransformer for how the
// rules are applied.
type MetricsHandlerOpts struct {
	// AddLabels is a map of label name to label value added to every metric.
	AddLabels map[string]string
	// DropLabels is a map of metric names regexes to label name regexes. When both match, the label will be dropped.
	DropLabels map[*regexp.Regexp]*regexp.Regexp
	// DropMetrics is a slice of regexes that drops metrics when the metric name matches. The metric name format
	// should match the Prometheus naming style before the metric is translated to a Kusto table name.
	DropMetrics []*regexp.Regexp
	// KeepMetrics is a slice of regexes for metric names to retain; interacts
	// with DefaultDropMetrics (exact precedence lives in transform.RequestTransformer).
	KeepMetrics []*regexp.Regexp
	// KeepMetricsLabelValues maps label-name regexes to label-value regexes used
	// to retain matching series.
	KeepMetricsLabelValues map[*regexp.Regexp]*regexp.Regexp
	// DisableMetricsForwarding disables the forwarding of metrics to the remote write endpoint.
	DisableMetricsForwarding bool
	// DefaultDropMetrics, when true, drops all metrics unless kept by a Keep* rule.
	DefaultDropMetrics bool
	// RemoteWriteClients are additional destinations written to alongside the local store.
	RemoteWriteClients []remote.RemoteWriteClient
}
// RequestTransformer builds a transform.RequestTransformer from the add/drop/
// keep rules configured on these options. The regex maps and slices are shared
// with the receiver, not copied.
func (o MetricsHandlerOpts) RequestTransformer() *transform.RequestTransformer {
	t := transform.RequestTransformer{
		AddLabels:                 o.AddLabels,
		DropLabels:                o.DropLabels,
		DropMetrics:               o.DropMetrics,
		KeepMetrics:               o.KeepMetrics,
		KeepMetricsWithLabelValue: o.KeepMetricsLabelValues,
		DefaultDropMetrics:        o.DefaultDropMetrics,
	}
	return &t
}
// NewService builds a collector Service from opts: it creates the local WAL
// store, registers prom remote-write and OTLP receive handlers, constructs the
// log collection services, and wires the batching/replication pipeline.
// Nothing is started here; call Open to start the components.
func NewService(opts *ServiceOpts) (*Service, error) {
	// Apply defaults for unset (zero) limits.
	maxSegmentAge := 30 * time.Second
	if opts.MaxSegmentAge.Seconds() > 0 {
		maxSegmentAge = opts.MaxSegmentAge
	}
	maxSegmentSize := int64(1024 * 1024)
	if opts.MaxSegmentSize > 0 {
		maxSegmentSize = opts.MaxSegmentSize
	}
	maxSegmentCount := int64(10000)
	if opts.MaxSegmentCount > 0 {
		maxSegmentCount = opts.MaxSegmentCount
	}
	maxDiskUsage := int64(10 * 1024 * 1024 * 1024) // 10 GB
	if opts.MaxDiskUsage > 0 {
		maxDiskUsage = opts.MaxDiskUsage
	}
	// health signals back-pressure once segment count or disk usage exceed limits.
	health := cluster.NewHealth(cluster.HealthOpts{
		UnhealthyTimeout: time.Minute,
		MaxSegmentCount:  maxSegmentCount,
		MaxDiskUsage:     maxDiskUsage,
	})
	// NOTE(review): the store receives the raw opts.MaxDiskUsage (where 0 means
	// unlimited) while health uses the defaulted maxDiskUsage (10 GB) computed
	// above — confirm this asymmetry is intentional.
	store := storage.NewLocalStore(storage.StoreOpts{
		StorageDir:       opts.StorageDir,
		SegmentMaxAge:    maxSegmentAge,
		SegmentMaxSize:   maxSegmentSize,
		MaxDiskUsage:     opts.MaxDiskUsage,
		LiftedLabels:     opts.LiftLabels,
		LiftedAttributes: opts.LiftAttributes,
		LiftedResources:  opts.LiftResources,
		WALFlushInterval: opts.WALFlushInterval,
	})
	var httpHandlers []*http.HttpHandler
	var grpcHandlers []*http.GRPCHandler
	workerSvcs := []service.Component{}
	// One handler per configured Prometheus remote-write endpoint. Each writes
	// to the local store in addition to any configured remote write clients.
	for _, handlerOpts := range opts.PromMetricsHandlers {
		metricsProxySvc := metricsHandler.NewHandler(metricsHandler.HandlerOpts{
			Path:               handlerOpts.Path,
			RequestTransformer: handlerOpts.MetricOpts.RequestTransformer(),
			RequestWriters:     append(handlerOpts.MetricOpts.RemoteWriteClients, &StoreRequestWriter{store}),
			HealthChecker:      health,
		})
		httpHandlers = append(httpHandlers, &http.HttpHandler{
			Path:    handlerOpts.Path,
			Handler: metricsProxySvc.HandleReceive,
		})
	}
	// OTLP metrics handlers: HTTP when Path is set, GRPC when GrpcPort is set.
	for _, handlerOpts := range opts.OtlpMetricsHandlers {
		writer := otlp.NewOltpMetricWriter(otlp.OltpMetricWriterOpts{
			RequestTransformer:       handlerOpts.MetricOpts.RequestTransformer(),
			Clients:                  append(handlerOpts.MetricOpts.RemoteWriteClients, &StoreRemoteClient{store}),
			MaxBatchSize:             opts.MaxBatchSize,
			DisableMetricsForwarding: handlerOpts.MetricOpts.DisableMetricsForwarding,
			HealthChecker:            health,
		})
		oltpMetricsService := otlp.NewMetricsService(writer, handlerOpts.Path, handlerOpts.GrpcPort)
		if handlerOpts.Path != "" {
			httpHandlers = append(httpHandlers, &http.HttpHandler{
				Path:    handlerOpts.Path,
				Handler: oltpMetricsService.Handler,
			})
		}
		if handlerOpts.GrpcPort > 0 {
			path, handler := metricsv1connect.NewMetricsServiceHandler(oltpMetricsService)
			grpcHandlers = append(grpcHandlers, &http.GRPCHandler{
				Port:    handlerOpts.GrpcPort,
				Path:    path,
				Handler: handler,
			})
		}
	}
	var (
		replicator    service.Component
		transferQueue chan *cluster.Batch
		partitioner   cluster.MetricPartitioner
	)
	if opts.Endpoint != "" {
		// This is a static partitioner that forces all entries to be assigned to the remote endpoint.
		partitioner = remotePartitioner{
			host: "remote",
			addr: opts.Endpoint,
		}
		r, err := cluster.NewReplicator(cluster.ReplicatorOpts{
			Hostname:               opts.NodeName,
			Partitioner:            partitioner,
			Health:                 health,
			SegmentRemover:         store,
			InsecureSkipVerify:     opts.InsecureSkipVerify,
			MaxTransferConcurrency: opts.MaxTransferConcurrency,
			DisableGzip:            opts.DisableGzip,
		})
		if err != nil {
			return nil, fmt.Errorf("failed to create replicator: %w", err)
		}
		transferQueue = r.TransferQueue()
		replicator = r
	} else {
		// No ingestor endpoint configured: use a fake replicator so the rest of
		// the pipeline still runs; this placeholder address is never dialed.
		partitioner = remotePartitioner{
			host: "remote",
			addr: "http://remotehost:1234",
		}
		r := cluster.NewFakeReplicator()
		transferQueue = r.TransferQueue()
		replicator = r
	}
	// batcher groups WAL segments into batches for transfer to the ingestor.
	// NOTE(review): UploadQueue and TransferQueue are the same channel here —
	// confirm that is intended in collector mode.
	batcher := cluster.NewBatcher(cluster.BatcherOpts{
		StorageDir:         opts.StorageDir,
		MaxSegmentAge:      time.Minute,
		Partitioner:        partitioner,
		Segmenter:          store.Index(),
		MinUploadSize:      4 * 1024 * 1024,
		MaxBatchSegments:   opts.MaxBatchSegments,
		UploadQueue:        transferQueue,
		TransferQueue:      transferQueue,
		PeerHealthReporter: health,
	})
	health.QueueSizer = batcher
	var scraper *Scraper
	if opts.Scraper != nil {
		// NOTE(review): this mutates the caller-owned ScraperOpts (appends a
		// client); calling NewService twice with the same opts appends twice.
		scraperOpts := opts.Scraper
		scraperOpts.RemoteClients = append(scraperOpts.RemoteClients, &StoreRemoteClient{store})
		scraperOpts.HealthChecker = health
		scraper = NewScraper(opts.Scraper)
	}
	// Construct the log collection services and any HTTP endpoints they expose.
	for _, handlerOpts := range opts.LogCollectionHandlers {
		svc, err := handlerOpts.Create(store)
		if err != nil {
			return nil, fmt.Errorf("failed to create log collection service: %w", err)
		}
		workerSvcs = append(workerSvcs, svc)
	}
	for _, handlerOpts := range opts.HttpLogCollectorOpts {
		svc, httpHandler, err := handlerOpts.CreateHTTPSvc(store, health)
		if err != nil {
			return nil, fmt.Errorf("failed to create log collection service: %w", err)
		}
		workerSvcs = append(workerSvcs, svc)
		httpHandlers = append(httpHandlers, httpHandler)
	}
	svc := &Service{
		opts: opts,
		metricsSvc: metrics.NewService(metrics.ServiceOpts{
			PeerHealthReport: health,
		}),
		store:        store,
		scraper:      scraper,
		workerSvcs:   workerSvcs,
		httpHandlers: httpHandlers,
		grpcHandlers: grpcHandlers,
		batcher:      batcher,
		replicator:   replicator,
	}
	return svc, nil
}
// Open starts every collector component in dependency order: the WAL store
// first, then the services that write to it (metrics, workers, replicator,
// batcher, scraper), then the HTTP/GRPC listeners. The supplied context is
// wrapped with a cancel func (stored on the Service) so Close can stop
// background work. Returns the first error encountered; on error, components
// already opened are left for Close to shut down.
func (s *Service) Open(ctx context.Context) error {
	ctx, s.cancel = context.WithCancel(ctx)
	if err := s.store.Open(ctx); err != nil {
		return fmt.Errorf("failed to open wal store: %w", err)
	}
	if err := s.metricsSvc.Open(ctx); err != nil {
		return fmt.Errorf("failed to open metrics service: %w", err)
	}
	for _, workerSvc := range s.workerSvcs {
		if err := workerSvc.Open(ctx); err != nil {
			return fmt.Errorf("failed to open worker service: %w", err)
		}
	}
	if err := s.replicator.Open(ctx); err != nil {
		return err
	}
	if err := s.batcher.Open(ctx); err != nil {
		return err
	}
	if s.scraper != nil {
		if err := s.scraper.Open(ctx); err != nil {
			return err
		}
	}
	// All listeners (primary and per-port GRPC) use TLS when a cert/key pair
	// is configured, plaintext otherwise.
	listenerFunc := plaintextListenerFunc()
	if s.opts.TLSCertFile != "" && s.opts.TLSKeyFile != "" {
		logger.Infof("TLS enabled for listeners")
		cert, err := tls.LoadX509KeyPair(s.opts.TLSCertFile, s.opts.TLSKeyFile)
		if err != nil {
			return fmt.Errorf("failed to load cert and key: %w", err)
		}
		listenerFunc = tlsListenerFunc(cert)
	}
	s.httpServers = []*http.HttpServer{}
	listener, err := listenerFunc(s.opts.ListenAddr)
	if err != nil {
		return err
	}
	opts := &http.ServerOpts{
		MaxConns:     s.opts.MaxConnections,
		WriteTimeout: 30 * time.Second,
		Listener:     listener,
	}
	if s.opts.EnablePprof {
		// Profiles (e.g. 30s CPU profiles) can take longer than the default
		// write timeout to stream. Set this BEFORE constructing the server:
		// previously the timeout was raised after http.NewServer(opts), which
		// is too late if the server copies its options at construction time.
		opts.WriteTimeout = 60 * time.Second
	}
	primaryHttp := http.NewServer(opts)
	primaryHttp.RegisterHandler("/metrics", promhttp.Handler())
	if s.opts.EnablePprof {
		primaryHttp.RegisterHandlerFunc("/debug/pprof/", pprof.Index)
		primaryHttp.RegisterHandlerFunc("/debug/pprof/cmdline", pprof.Cmdline)
		primaryHttp.RegisterHandlerFunc("/debug/pprof/profile", pprof.Profile)
		primaryHttp.RegisterHandlerFunc("/debug/pprof/symbol", pprof.Symbol)
		primaryHttp.RegisterHandlerFunc("/debug/pprof/trace", pprof.Trace)
	}
	for _, handler := range s.httpHandlers {
		primaryHttp.RegisterHandlerFunc(handler.Path, handler.Handler)
	}
	s.httpServers = append(s.httpServers, primaryHttp)
	// Each GRPC handler gets a dedicated server on its configured port.
	for _, handler := range s.grpcHandlers {
		listener, err := listenerFunc(fmt.Sprintf(":%d", handler.Port))
		if err != nil {
			return err
		}
		server := http.NewServer(&http.ServerOpts{
			MaxConns: s.opts.MaxConnections,
			Listener: listener,
		})
		server.RegisterHandler(handler.Path, handler.Handler)
		s.httpServers = append(s.httpServers, server)
	}
	for _, httpServer := range s.httpServers {
		if err := httpServer.Open(ctx); err != nil {
			return err
		}
		logger.Infof("Started %s", httpServer)
	}
	// Report collector liveness once a minute until the context is cancelled
	// (Close cancels it via s.cancel).
	go func() {
		ticker := time.NewTicker(time.Minute)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				metrics.CollectorHealthCheck.WithLabelValues(s.opts.Region).Set(1)
			}
		}
	}()
	return nil
}
// Close shuts the collector down. Data producers (scraper, metrics service,
// workers) are closed first so no new writes arrive, then the Open context is
// cancelled and the servers and transfer pipeline are closed; the store is
// closed last so in-flight data can still land in the WAL. Errors from
// individual components are ignored; Close always returns nil.
func (s *Service) Close() error {
	if s.scraper != nil {
		s.scraper.Close()
	}
	s.metricsSvc.Close()
	for _, workerSvc := range s.workerSvcs {
		workerSvc.Close()
	}
	// Stop background goroutines started in Open (e.g. the liveness ticker).
	s.cancel()
	for _, httpServer := range s.httpServers {
		httpServer.Close()
	}
	s.batcher.Close()
	s.replicator.Close()
	s.store.Close()
	return nil
}
func tlsListenerFunc(cert tls.Certificate) func(addr string) (net.Listener, error) {
return func(addr string) (net.Listener, error) {
listener, err := tls.Listen("tcp", addr, &tls.Config{
Certificates: []tls.Certificate{cert},
})
if err != nil {
return listener, fmt.Errorf("failed to create listener: %w", err)
}
return listener, nil
}
}
// plaintextListenerFunc returns a listener factory whose listeners accept
// plain (non-TLS) TCP connections.
func plaintextListenerFunc() func(addr string) (net.Listener, error) {
	return func(addr string) (net.Listener, error) {
		ln, err := net.Listen("tcp", addr)
		if err != nil {
			return ln, fmt.Errorf("failed to create listener: %w", err)
		}
		return ln, nil
	}
}
// remotePartitioner is a Partitioner that ignores its input and always assigns
// ownership to one fixed peer, forcing a remote transfer.
type remotePartitioner struct {
	host, addr string
}

// Owner returns the fixed host and address for any key.
func (f remotePartitioner) Owner(b []byte) (string, string) {
	return f.host, f.addr
}
// StoreRequestWriter adapts a storage.Store to the request-writer interface
// used by the Prometheus remote-write handler, persisting received time
// series to the local WAL.
type StoreRequestWriter struct {
	store storage.Store
}

// Write persists the time series in req to the underlying store.
func (s *StoreRequestWriter) Write(ctx context.Context, req *prompb.WriteRequest) error {
	return s.store.WriteTimeSeries(ctx, req.Timeseries)
}

// CloseIdleConnections is a no-op; it exists to satisfy the writer interface.
func (s *StoreRequestWriter) CloseIdleConnections() {
}
// StoreRemoteClient adapts a storage.Store to the remote-write client
// interface used by the OTLP writer and scraper, persisting time series to
// the local WAL instead of a network endpoint.
type StoreRemoteClient struct {
	store storage.Store
}

// Write persists the time series in wr to the underlying store.
func (s *StoreRemoteClient) Write(ctx context.Context, wr *prompb.WriteRequest) error {
	return s.store.WriteTimeSeries(ctx, wr.Timeseries)
}

// CloseIdleConnections is a no-op; it exists to satisfy the client interface.
func (s *StoreRemoteClient) CloseIdleConnections() {
}