in cmd/collector/main.go [85:843]
func realMain(ctx *cli.Context) error {
logger.Infof("%s version:%s", os.Args[0], version.String())
runtime.MemProfileRate = 4096
runtime.SetBlockProfileRate(int(1 * time.Second))
runtime.SetMutexProfileFraction(1)
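// Load the TOML config file named on the command line; decode errors are
// reported with their exact row and column.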
var cfg = config.DefaultConfig
configFile := ctx.String("config")
if configFile == "" {
logger.Fatalf("config file is required. Run `collector config` to generate a config file")
}
configBytes, err := os.ReadFile(configFile)
if err != nil {
return err
}
var fileConfig config.Config
if err := toml.Unmarshal(configBytes, &fileConfig); err != nil {
var derr *toml.DecodeError
if errors.As(err, &derr) {
fmt.Println(derr.String())
row, col := derr.Position()
fmt.Println("error occurred at row", row, "column", col)
}
return err
}
cfg = fileConfig
if err := cfg.Validate(); err != nil {
return err
}
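// Resolve the node hostname (flag first, then os.Hostname) and substitute it
// wherever the config references $(HOSTNAME).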
hostname := ctx.String("hostname")
if hostname == "" {
var err error
hostname, err = os.Hostname()
if err != nil {
return fmt.Errorf("failed to get hostname: %w", err)
}
}
cfg.ReplaceVariable("$(HOSTNAME)", hostname)
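// The remote write endpoint is optional, but when set it must be a valid http
// or https URL.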
var endpoint string
if cfg.Endpoint != "" {
endpoint = cfg.Endpoint
u, err := url.Parse(cfg.Endpoint)
if err != nil {
return fmt.Errorf("failed to parse endpoint %s: %w", endpoint, err)
}
if u.Scheme != "http" && u.Scheme != "https" {
return fmt.Errorf("endpoint %s must be http or https", endpoint)
}
logger.Infof("Using remote write endpoint %s", endpoint)
}
if cfg.StorageDir == "" {
logger.Fatalf("storage-dir is required")
} else {
logger.Infof("Using storage dir: %s", cfg.StorageDir)
}
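// Sort lifted labels by name for a deterministic column order, then extend the
// metrics schema with one string column per label (ColumnName wins over Name).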
sort.Slice(cfg.LiftLabels, func(i, j int) bool {
return cfg.LiftLabels[i].Name < cfg.LiftLabels[j].Name
})
defaultMapping := schema.DefaultMetricsMapping
var sortedLiftedLabels []string
for _, v := range cfg.LiftLabels {
sortedLiftedLabels = append(sortedLiftedLabels, v.Name)
if v.ColumnName != "" {
defaultMapping = defaultMapping.AddStringMapping(v.ColumnName)
continue
}
defaultMapping = defaultMapping.AddStringMapping(v.Name)
}
// Update the default mapping so pooled CSV encoders can use the lifted columns
schema.DefaultMetricsMapping = defaultMapping
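// Exporter clients are cached so sections that reference the same exporter
// share a single client.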
metricExporterCache := make(map[string]remote.RemoteWriteClient)
var informer *k8s.PodInformer
var scraperOpts *collector.ScraperOpts
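// Prometheus scrape: discover pods on this node via a shared informer, add any
// static targets whose HostRegex matches this host, and compile the metric
// filter rules.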
if cfg.PrometheusScrape != nil {
informer, err = getInformer(cfg.Kubeconfig, hostname, informer)
if err != nil {
return fmt.Errorf("failed to get informer for prometheus scrape: %w", err)
}
addLabels := mergeMaps(cfg.AddLabels, cfg.PrometheusScrape.AddLabels)
addLabels["adxmon_database"] = cfg.PrometheusScrape.Database
var staticTargets []collector.ScrapeTarget
for _, target := range cfg.PrometheusScrape.StaticScrapeTarget {
if match, err := regexp.MatchString(target.HostRegex, hostname); err != nil {
return fmt.Errorf("failed to match hostname %s with regex %s: %w", hostname, target.HostRegex, err)
} else if !match {
continue
}
staticTargets = append(staticTargets, collector.ScrapeTarget{
Static: true,
Addr: target.URL,
Namespace: target.Namespace,
Pod: target.Pod,
Container: target.Container,
})
}
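// Global drop/keep rules are combined with this section's; an invalid pattern
// aborts the process at startup.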
dropLabels := make(map[*regexp.Regexp]*regexp.Regexp)
for k, v := range mergeMaps(cfg.DropLabels, cfg.PrometheusScrape.DropLabels) {
metricRegex, err := regexp.Compile(k)
if err != nil {
logger.Fatalf("invalid metric regex: %s", err)
}
labelRegex, err := regexp.Compile(v)
if err != nil {
logger.Fatalf("invalid label regex: %s", err)
}
dropLabels[metricRegex] = labelRegex
}
dropMetrics := []*regexp.Regexp{}
for _, v := range unionSlice(cfg.DropMetrics, cfg.PrometheusScrape.DropMetrics) {
metricRegex, err := regexp.Compile(v)
if err != nil {
logger.Fatalf("invalid metric regex: %s", err)
}
dropMetrics = append(dropMetrics, metricRegex)
}
keepMetrics := []*regexp.Regexp{}
for _, v := range unionSlice(cfg.KeepMetrics, cfg.PrometheusScrape.KeepMetrics) {
metricRegex, err := regexp.Compile(v)
if err != nil {
logger.Fatalf("invalid metric regex: %s", err)
}
keepMetrics = append(keepMetrics, metricRegex)
}
keepMetricLabelValues := make(map[*regexp.Regexp]*regexp.Regexp)
for _, v := range append(cfg.KeepMetricsWithLabelValue, cfg.PrometheusScrape.KeepMetricsWithLabelValue...) {
labelRe, err := regexp.Compile(v.LabelRegex)
if err != nil {
logger.Fatalf("invalid label regex: %s", err)
}
valueRe, err := regexp.Compile(v.ValueRegex)
if err != nil {
logger.Fatalf("invalid value regex: %s", err)
}
keepMetricLabelValues[labelRe] = valueRe
}
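// The section-level DefaultDropMetrics overrides the global setting when both
// are present.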
var defaultDropMetrics bool
if cfg.DefaultDropMetrics != nil {
defaultDropMetrics = *cfg.DefaultDropMetrics
}
if cfg.PrometheusScrape.DefaultDropMetrics != nil {
defaultDropMetrics = *cfg.PrometheusScrape.DefaultDropMetrics
}
remoteClients, err := getMetricsExporters(cfg.PrometheusScrape.Exporters, cfg.Exporters, metricExporterCache)
if err != nil {
return fmt.Errorf("prometheus scrape: %w", err)
}
scraperOpts = &collector.ScraperOpts{
NodeName: hostname,
PodInformer: informer,
Database: cfg.PrometheusScrape.Database,
AddLabels: addLabels,
DropLabels: dropLabels,
DropMetrics: dropMetrics,
KeepMetrics: keepMetrics,
KeepMetricsWithLabelValue: keepMetricLabelValues,
DefaultDropMetrics: defaultDropMetrics,
DisableMetricsForwarding: cfg.PrometheusScrape.DisableMetricsForwarding,
DisableDiscovery: cfg.PrometheusScrape.DisableDiscovery,
ScrapeInterval: time.Duration(cfg.PrometheusScrape.ScrapeIntervalSeconds) * time.Second,
ScrapeTimeout: time.Duration(cfg.PrometheusScrape.ScrapeTimeout) * time.Second,
Targets: staticTargets,
MaxBatchSize: cfg.MaxBatchSize,
RemoteClients: remoteClients,
}
}
// Start from the globally configured log attributes
addAttributes := cfg.AddAttributes
liftAttributes := cfg.LiftAttributes
sort.Slice(cfg.LiftResources, func(i, j int) bool {
return cfg.LiftResources[i].Name < cfg.LiftResources[j].Name
})
logsMapping := schema.DefaultLogsMapping
var sortedLiftedResources []string
for _, v := range cfg.LiftResources {
sortedLiftedResources = append(sortedLiftedResources, v.Name)
if v.ColumnName != "" {
logsMapping = logsMapping.AddStringMapping(v.ColumnName)
continue
}
logsMapping = logsMapping.AddStringMapping(v.Name)
}
// Update the default mapping so pooled CSV encoders can use the lifted columns
schema.DefaultLogsMapping = logsMapping
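// When an OtelLog section exists, merge its attribute settings into the globals.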
if cfg.OtelLog != nil {
addAttributes = mergeMaps(addAttributes, cfg.OtelLog.AddAttributes)
liftAttributes = unionSlice(liftAttributes, cfg.OtelLog.LiftAttributes)
}
opts := &collector.ServiceOpts{
EnablePprof: cfg.EnablePprof,
Scraper: scraperOpts,
ListenAddr: cfg.ListenAddr,
NodeName: hostname,
Endpoint: endpoint,
DisableGzip: cfg.DisableGzip,
LiftLabels: sortedLiftedLabels,
AddAttributes: addAttributes,
LiftAttributes: liftAttributes,
LiftResources: sortedLiftedResources,
InsecureSkipVerify: cfg.InsecureSkipVerify,
TLSCertFile: cfg.TLSCertFile,
TLSKeyFile: cfg.TLSKeyFile,
MaxConnections: cfg.MaxConnections,
MaxBatchSize: cfg.MaxBatchSize,
MaxSegmentAge: time.Duration(cfg.MaxSegmentAgeSeconds) * time.Second,
MaxSegmentSize: cfg.MaxSegmentSize,
MaxDiskUsage: cfg.MaxDiskUsage,
MaxTransferConcurrency: cfg.MaxTransferConcurrency,
WALFlushInterval: time.Duration(cfg.WALFlushIntervalMilliSeconds) * time.Millisecond,
Region: cfg.Region,
StorageDir: cfg.StorageDir,
}
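// Register one Prometheus remote write handler per configured section, each
// with its own ingest path, database, and filter set.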
for _, v := range cfg.PrometheusRemoteWrite {
// Add this pod's identity and target database to all metrics received
addLabels := mergeMaps(cfg.AddLabels, map[string]string{
"adxmon_namespace": k8s.Instance.Namespace,
"adxmon_pod": k8s.Instance.Pod,
"adxmon_container": k8s.Instance.Container,
"adxmon_database": v.Database,
})
dropLabels := make(map[*regexp.Regexp]*regexp.Regexp)
for k, v := range mergeMaps(cfg.DropLabels, v.DropLabels) {
metricRegex, err := regexp.Compile(k)
if err != nil {
logger.Fatalf("invalid metric regex: %s", err)
}
labelRegex, err := regexp.Compile(v)
if err != nil {
logger.Fatalf("invalid label regex: %s", err)
}
dropLabels[metricRegex] = labelRegex
}
dropMetrics := []*regexp.Regexp{}
for _, v := range unionSlice(cfg.DropMetrics, v.DropMetrics) {
metricRegex, err := regexp.Compile(v)
if err != nil {
logger.Fatalf("invalid metric regex: %s", err)
}
dropMetrics = append(dropMetrics, metricRegex)
}
keepMetrics := []*regexp.Regexp{}
for _, v := range unionSlice(cfg.KeepMetrics, v.KeepMetrics) {
metricRegex, err := regexp.Compile(v)
if err != nil {
logger.Fatalf("invalid metric regex: %s", err)
}
keepMetrics = append(keepMetrics, metricRegex)
}
keepMetricLabelValues := make(map[*regexp.Regexp]*regexp.Regexp)
for _, v := range append(cfg.KeepMetricsWithLabelValue, v.KeepMetricsWithLabelValue...) {
labelRe, err := regexp.Compile(v.LabelRegex)
if err != nil {
logger.Fatalf("invalid metric regex: %s", err)
}
valueRe, err := regexp.Compile(v.ValueRegex)
if err != nil {
logger.Fatalf("invalid label regex: %s", err)
}
keepMetricLabelValues[labelRe] = valueRe
}
var defaultDropMetrics bool
if cfg.DefaultDropMetrics != nil {
defaultDropMetrics = *cfg.DefaultDropMetrics
}
if v.DefaultDropMetrics != nil {
defaultDropMetrics = *v.DefaultDropMetrics
}
var disableMetricsForwarding bool
if v.DisableMetricsForwarding != nil {
disableMetricsForwarding = *v.DisableMetricsForwarding
}
remoteWriteClients, err := getMetricsExporters(v.Exporters, cfg.Exporters, metricExporterCache)
if err != nil {
return fmt.Errorf("prometheus remote write: %w", err)
}
opts.PromMetricsHandlers = append(opts.PromMetricsHandlers, collector.PrometheusRemoteWriteHandlerOpts{
Path: v.Path,
MetricOpts: collector.MetricsHandlerOpts{
DefaultDropMetrics: defaultDropMetrics,
AddLabels: addLabels,
DropMetrics: dropMetrics,
DropLabels: dropLabels,
KeepMetrics: keepMetrics,
KeepMetricsLabelValues: keepMetricLabelValues,
DisableMetricsForwarding: disableMetricsForwarding,
RemoteWriteClients: remoteWriteClients,
},
})
}
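// OTLP metrics handlers mirror the remote write setup, with an optional gRPC
// listener alongside the HTTP path.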
for _, v := range cfg.OtelMetric {
// Add this pod's identity and target database to all metrics received
addLabels := mergeMaps(cfg.AddLabels, v.AddLabels, map[string]string{
"adxmon_namespace": k8s.Instance.Namespace,
"adxmon_pod": k8s.Instance.Pod,
"adxmon_container": k8s.Instance.Container,
"adxmon_database": v.Database,
})
dropLabels := make(map[*regexp.Regexp]*regexp.Regexp)
for k, v := range mergeMaps(cfg.DropLabels, v.DropLabels) {
metricRegex, err := regexp.Compile(k)
if err != nil {
logger.Fatalf("invalid metric regex: %s", err)
}
labelRegex, err := regexp.Compile(v)
if err != nil {
logger.Fatalf("invalid label regex: %s", err)
}
dropLabels[metricRegex] = labelRegex
}
dropMetrics := []*regexp.Regexp{}
for _, v := range unionSlice(cfg.DropMetrics, v.DropMetrics) {
metricRegex, err := regexp.Compile(v)
if err != nil {
logger.Fatalf("invalid metric regex: %s", err)
}
dropMetrics = append(dropMetrics, metricRegex)
}
keepMetrics := []*regexp.Regexp{}
for _, v := range unionSlice(cfg.KeepMetrics, v.KeepMetrics) {
metricRegex, err := regexp.Compile(v)
if err != nil {
logger.Fatalf("invalid metric regex: %s", err)
}
keepMetrics = append(keepMetrics, metricRegex)
}
keepMetricLabelValues := make(map[*regexp.Regexp]*regexp.Regexp)
for _, v := range append(cfg.KeepMetricsWithLabelValue, v.KeepMetricsWithLabelValue...) {
labelRe, err := regexp.Compile(v.LabelRegex)
if err != nil {
logger.Fatalf("invalid metric regex: %s", err)
}
valueRe, err := regexp.Compile(v.ValueRegex)
if err != nil {
logger.Fatalf("invalid label regex: %s", err)
}
keepMetricLabelValues[labelRe] = valueRe
}
var defaultDropMetrics bool
if cfg.DefaultDropMetrics != nil {
defaultDropMetrics = *cfg.DefaultDropMetrics
}
if v.DefaultDropMetrics != nil {
defaultDropMetrics = *v.DefaultDropMetrics
}
var disableMetricsForwarding bool
if v.DisableMetricsForwarding != nil {
disableMetricsForwarding = *v.DisableMetricsForwarding
}
remoteWriteClients, err := getMetricsExporters(v.Exporters, cfg.Exporters, metricExporterCache)
if err != nil {
return fmt.Errorf("otel metric: %w", err)
}
opts.OtlpMetricsHandlers = append(opts.OtlpMetricsHandlers, collector.OtlpMetricsHandlerOpts{
Path: v.Path,
GrpcPort: v.GrpcPort,
MetricOpts: collector.MetricsHandlerOpts{
DefaultDropMetrics: defaultDropMetrics,
AddLabels: addLabels,
DropMetrics: dropMetrics,
DropLabels: dropLabels,
KeepMetrics: keepMetrics,
KeepMetricsLabelValues: keepMetricLabelValues,
DisableMetricsForwarding: disableMetricsForwarding,
RemoteWriteClients: remoteWriteClients,
},
})
}
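// OTLP logs: run configured transforms (plus attribute injection) ahead of a
// store sink and any exporter sinks, served at /v1/logs.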
if cfg.OtelLog != nil {
v := cfg.OtelLog
addAttributes := mergeMaps(cfg.AddAttributes, v.AddAttributes)
createHttpFunc := func(store storage.Store, health *cluster.Health) (*logs.Service, *http.HttpHandler, error) {
transformers := []types.Transformer{}
if v.Transforms != nil {
for _, t := range v.Transforms {
transform, err := transforms.NewTransform(t.Name, t.Config)
if err != nil {
return nil, nil, fmt.Errorf("create transform: %w", err)
}
transformers = append(transformers, transform)
}
}
if len(addAttributes) > 0 {
transformers = append(transformers, addattributes.NewTransform(addattributes.Config{
ResourceValues: addAttributes,
}))
}
sink, err := sinks.NewStoreSink(sinks.StoreSinkConfig{
Store: store,
})
if err != nil {
return nil, nil, fmt.Errorf("failed to create store sink: %w", err)
}
sinks := []types.Sink{sink}
for _, exporterName := range v.Exporters {
sink, err := config.GetLogsExporter(exporterName, cfg.Exporters)
if err != nil {
return nil, nil, fmt.Errorf("failed to get exporter %s: %w", exporterName, err)
}
sinks = append(sinks, sink)
}
logsSvc := otlp.NewLogsService(otlp.LogsServiceOpts{
WorkerCreator: engine.WorkerCreator(transformers, sinks),
HealthChecker: health,
})
workerSvc := &logs.Service{
Source: logsSvc,
Transforms: transformers,
Sinks: sinks,
}
httpHandler := &http.HttpHandler{
Path: "/v1/logs",
Handler: logsSvc.Handler,
}
return workerSvc, httpHandler, nil
}
opts.HttpLogCollectorOpts = append(opts.HttpLogCollectorOpts, collector.HttpLogCollectorOpts{
CreateHTTPSvc: createHttpFunc,
})
}
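// Host logs: tail pod logs discovered through the informer (unless kube
// discovery is disabled) plus static file targets; kernel and journald
// targets get their own pipelines below.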
for _, v := range cfg.HostLog {
tailSourceConfig := tail.TailSourceConfig{}
if !v.DisableKubeDiscovery {
informer, err = getInformer(cfg.Kubeconfig, hostname, informer)
if err != nil {
return fmt.Errorf("failed to get informer for tail: %w", err)
}
var staticPodTargets []*tail.StaticPodTargets
for _, target := range v.StaticPodTargets {
staticPodTargets = append(staticPodTargets, &tail.StaticPodTargets{
Namespace: target.Namespace,
Name: target.Name,
Labels: target.LabelTargets,
Parsers: target.Parsers,
Destination: target.Destination,
})
}
tailSourceConfig.PodDiscoveryOpts = &tail.PodDiscoveryOpts{
NodeName: hostname,
PodInformer: informer,
StaticPodTargets: staticPodTargets,
}
}
createFunc := func(store storage.Store) (*logs.Service, error) {
addAttributes := mergeMaps(cfg.AddLabels, v.AddAttributes, map[string]string{
"adxmon_namespace": k8s.Instance.Namespace,
"adxmon_pod": k8s.Instance.Pod,
"adxmon_container": k8s.Instance.Container,
})
staticTargets := []tail.FileTailTarget{}
for _, target := range v.StaticFileTargets {
staticTargets = append(staticTargets, tail.FileTailTarget{
FilePath: target.FilePath,
LogType: target.LogType,
Database: target.Database,
Table: target.Table,
Parsers: target.Parsers,
})
}
transformers := []types.Transformer{}
for _, t := range v.Transforms {
transform, err := transforms.NewTransform(t.Name, t.Config)
if err != nil {
return nil, fmt.Errorf("create transform: %w", err)
}
transformers = append(transformers, transform)
}
if len(addAttributes) > 0 {
transformers = append(transformers, addattributes.NewTransform(addattributes.Config{
ResourceValues: addAttributes,
}))
}
sink, err := sinks.NewStoreSink(sinks.StoreSinkConfig{
Store: store,
})
if err != nil {
return nil, fmt.Errorf("create sink for tailsource: %w", err)
}
tailSourceConfig.StaticTargets = staticTargets
tailSourceConfig.CursorDirectory = cfg.StorageDir
sinks := []types.Sink{sink}
for _, exporterName := range v.Exporters {
sink, err := config.GetLogsExporter(exporterName, cfg.Exporters)
if err != nil {
return nil, fmt.Errorf("failed to get exporter %s: %w", exporterName, err)
}
sinks = append(sinks, sink)
}
workerCreator := engine.WorkerCreator(
transformers,
sinks,
)
tailSourceConfig.WorkerCreator = workerCreator
source, err := tail.NewTailSource(tailSourceConfig)
if err != nil {
return nil, fmt.Errorf("create tailsource: %w", err)
}
return &logs.Service{
Source: source,
Transforms: transformers,
Sinks: sinks,
}, nil
}
opts.LogCollectionHandlers = append(opts.LogCollectionHandlers, collector.LogCollectorOpts{
Create: createFunc,
})
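// Kernel logs reuse the same transform/sink pipeline, reading from the kernel
// log with a per-target priority filter.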
if len(v.KernelTargets) > 0 {
kernelCreateFunction := func(store storage.Store) (*logs.Service, error) {
addAttributes := mergeMaps(cfg.AddLabels, v.AddAttributes)
sink, err := sinks.NewStoreSink(sinks.StoreSinkConfig{
Store: store,
})
if err != nil {
return nil, fmt.Errorf("create sink for tailsource: %w", err)
}
transformers := []types.Transformer{}
for _, t := range v.Transforms {
transform, err := transforms.NewTransform(t.Name, t.Config)
if err != nil {
return nil, fmt.Errorf("create transform: %w", err)
}
transformers = append(transformers, transform)
}
attributeTransform := addattributes.NewTransform(addattributes.Config{
ResourceValues: addAttributes,
})
transformers = append(transformers, attributeTransform)
var targets []kernel.KernelTargetConfig
for _, target := range v.KernelTargets {
targets = append(targets, kernel.KernelTargetConfig{
Database: target.Database,
Table: target.Table,
PriorityFilter: target.Priority,
})
}
sinks := []types.Sink{sink}
for _, exporterName := range v.Exporters {
sink, err := config.GetLogsExporter(exporterName, cfg.Exporters)
if err != nil {
return nil, fmt.Errorf("failed to get exporter %s: %w", exporterName, err)
}
sinks = append(sinks, sink)
}
kernelSourceConfig := kernel.KernelSourceConfig{
WorkerCreator: engine.WorkerCreator(transformers, sinks),
CursorDirectory: cfg.StorageDir,
Targets: targets,
}
source, err := kernel.NewKernelSource(kernelSourceConfig)
if err != nil {
return nil, fmt.Errorf("create kernel source: %w", err)
}
return &logs.Service{
Source: source,
Transforms: transformers,
Sinks: sinks,
}, nil
}
opts.LogCollectionHandlers = append(opts.LogCollectionHandlers, collector.LogCollectorOpts{
Create: kernelCreateFunction,
})
}
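// Journald logs: one target per set of journal matches, each with its own
// parsers and selected journal fields.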
if len(v.JournalTargets) > 0 {
journalCreateFunction := func(store storage.Store) (*logs.Service, error) {
addAttributes := mergeMaps(cfg.AddLabels, v.AddAttributes)
// TODO - combine these with shared file tail code
transformers := []types.Transformer{}
for _, t := range v.Transforms {
transform, err := transforms.NewTransform(t.Name, t.Config)
if err != nil {
return nil, fmt.Errorf("create transform: %w", err)
}
transformers = append(transformers, transform)
}
if len(addAttributes) > 0 {
transformers = append(transformers, addattributes.NewTransform(addattributes.Config{
ResourceValues: addAttributes,
}))
}
sink, err := sinks.NewStoreSink(sinks.StoreSinkConfig{
Store: store,
})
if err != nil {
return nil, fmt.Errorf("create sink for tailsource: %w", err)
}
targets := make([]journal.JournalTargetConfig, 0, len(v.JournalTargets))
for _, target := range v.JournalTargets {
parsers := parser.NewParsers(target.Parsers, "journal")
targets = append(targets, journal.JournalTargetConfig{
Matches: target.Matches,
Database: target.Database,
Table: target.Table,
LogLineParsers: parsers,
JournalFields: target.JournalFields,
})
}
sinks := []types.Sink{sink}
for _, exporterName := range v.Exporters {
sink, err := config.GetLogsExporter(exporterName, cfg.Exporters)
if err != nil {
return nil, fmt.Errorf("failed to get exporter %s: %w", exporterName, err)
}
sinks = append(sinks, sink)
}
journalConfig := journal.SourceConfig{
Targets: targets,
CursorDirectory: cfg.StorageDir,
WorkerCreator: engine.WorkerCreator(transformers, sinks),
}
source := journal.New(journalConfig)
return &logs.Service{
Source: source,
Transforms: transformers,
Sinks: sinks,
}, nil
}
opts.LogCollectionHandlers = append(opts.LogCollectionHandlers, collector.LogCollectorOpts{
Create: journalCreateFunction,
})
}
}
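// Start the collector and block until the context is cancelled by SIGINT or
// SIGTERM.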
svcCtx, cancel := context.WithCancel(context.Background())
defer cancel()
svc, err := collector.NewService(opts)
if err != nil {
return err
}
if err := svc.Open(svcCtx); err != nil {
return err
}
sc := make(chan os.Signal, 1)
signal.Notify(sc, os.Interrupt, syscall.SIGTERM)
go func() {
sig := <-sc
cancel()
logger.Infof("Received signal %s, exiting...", sig.String())
// The context was cancelled above to unblock the main goroutine; now shut the service down.
if err := svc.Close(); err != nil {
logger.Errorf("failed to close service: %s", err)
}
}()
<-svcCtx.Done()
return nil
}