// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package confgenerator represents the Ops Agent configuration and provides
// functions to generate the subagent configurations from the unified agent
// configuration.
package confgenerator
import (
"context"
"crypto/md5"
"encoding/hex"
"fmt"
"log"
"maps"
"path"
"regexp"
"sort"
"strconv"
"strings"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/fluentbit"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/otel"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/resourcedetector"
"github.com/GoogleCloudPlatform/ops-agent/internal/platform"
)
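// googleCloudExporter returns the googlecloud exporter component used for
// metrics; instrumentationLabels controls whether instrumentation library
// labels are included on exported metrics.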
func googleCloudExporter(userAgent string, instrumentationLabels bool) otel.Component {
return otel.Component{
Type: "googlecloud",
Config: map[string]interface{}{
"user_agent": userAgent,
"metric": map[string]interface{}{
// Receivers are responsible for sending fully-qualified metric names.
// NB: If a receiver sends a name that is not fully qualified, OT will prefix it, producing `workload.googleapis.com/{metric_name}`.
// TODO(b/197129428): Write a test to make sure this doesn't happen.
"prefix": "",
// OT calls CreateMetricDescriptor by default. Skip because we want
// descriptors to be created implicitly with new time series.
"skip_create_descriptor": true,
// Instrumentation library labels break agent metrics, so they are only enabled when instrumentationLabels is true.
"instrumentation_library_labels": instrumentationLabels,
// Omit service labels, which break agent metrics.
// TODO: Enable with instrumentationLabels when values are sane.
"service_resource_labels": false,
"resource_filters": []map[string]interface{}{},
},
},
}
}
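// googleManagedPrometheusExporter returns the googlemanagedprometheus
// exporter component used for Prometheus (GMP) metrics.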
func googleManagedPrometheusExporter(userAgent string) otel.Component {
return otel.Component{
Type: "googlemanagedprometheus",
Config: map[string]interface{}{
"user_agent": userAgent,
// The exporter's addMetricSuffixes option defaults to true, which appends Prometheus-style
// suffixes to metric names (e.g. `_total` for a counter); set it to false to collect metric names as-is.
"metric": map[string]interface{}{
"add_metric_suffixes": false,
},
},
}
}
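// getOTelLogLevel returns the log level configured for the metrics service,
// defaulting to "info" when unset.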
func (uc *UnifiedConfig) getOTelLogLevel() string {
logLevel := "info"
if uc.Metrics != nil && uc.Metrics.Service != nil && uc.Metrics.Service.LogLevel != "" {
logLevel = uc.Metrics.Service.LogLevel
}
return logLevel
}
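// GenerateOtelConfig generates the OpenTelemetry collector configuration for
// the unified config's OTel-backed pipelines plus the agent self-metrics
// pipelines, and returns it as a string.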
func (uc *UnifiedConfig) GenerateOtelConfig(ctx context.Context, outDir string) (string, error) {
p := platform.FromContext(ctx)
userAgent, _ := p.UserAgent("Google-Cloud-Ops-Agent-Metrics")
metricVersionLabel, _ := p.VersionLabel("google-cloud-ops-agent-metrics")
loggingVersionLabel, _ := p.VersionLabel("google-cloud-ops-agent-logging")
receiverPipelines, pipelines, err := uc.generateOtelPipelines(ctx)
if err != nil {
return "", err
}
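// In addition to the user-defined pipelines, add pipelines that scrape the
// collector's, the Ops Agent's, and Fluent Bit's own self metrics.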
receiverPipelines["otel"] = AgentSelfMetrics{
Version: metricVersionLabel,
Port: otel.MetricsPort,
}.MetricsSubmodulePipeline()
pipelines["otel"] = otel.Pipeline{
Type: "metrics",
ReceiverPipelineName: "otel",
}
receiverPipelines["ops_agent"] = OpsAgentSelfMetricsPipeline(ctx, outDir)
pipelines["ops_agent"] = otel.Pipeline{
Type: "metrics",
ReceiverPipelineName: "ops_agent",
}
receiverPipelines["fluentbit"] = AgentSelfMetrics{
Version: loggingVersionLabel,
Port: fluentbit.MetricsPort,
}.LoggingSubmodulePipeline()
pipelines["fluentbit"] = otel.Pipeline{
Type: "metrics",
ReceiverPipelineName: "fluentbit",
}
otelConfig, err := otel.ModularConfig{
LogLevel: uc.getOTelLogLevel(),
ReceiverPipelines: receiverPipelines,
Pipelines: pipelines,
Exporters: map[otel.ExporterType]otel.Component{
otel.System: googleCloudExporter(userAgent, false),
otel.OTel: googleCloudExporter(userAgent, true),
otel.GMP: googleManagedPrometheusExporter(userAgent),
},
}.Generate(ctx)
if err != nil {
return "", err
}
return otelConfig, nil
}
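// fluentBitComponents converts one logging pipeline instance into the Fluent
// Bit components for its receiver and processors, along with the tag regex
// that matches its records.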
func (p pipelineInstance) fluentBitComponents(ctx context.Context) (fbSource, error) {
receiver, ok := p.receiver.(LoggingReceiver)
if !ok {
return fbSource{}, fmt.Errorf("%q is not a logging receiver", p.rID)
}
tag := fmt.Sprintf("%s.%s", p.pID, p.rID)
// For fluent_forward we create the tag in the following format:
// <hash_string>.<pipeline_id>.<receiver_id>.<existing_tag>
//
// hash_string: Deterministic unique identifier for the pipeline_id + receiver_id.
// This is needed to prevent collisions between receivers in the same
// pipeline when using the glob syntax for matching (using wildcards).
// pipeline_id: User defined pipeline_id but with the "." replaced with "_"
// since the "." character is reserved to be used as a delimiter in the
// Lua script.
// receiver_id: User defined receiver_id but with the "." replaced with "_"
// since the "." character is reserved to be used as a delimiter in the
// Lua script.
// existing_tag: Tag associated with the record prior to ingesting.
//
// For an example testing collisions in receiver_ids, see:
//
// testdata/valid/linux/logging-receiver_forward_multiple_receivers_conflicting_id
if receiver.Type() == "fluent_forward" {
hashString := getMD5Hash(tag)
// Note that this replacement applies only to the tag; the LogName will still
// use the user-defined receiver_id without this replacement.
pipelineIdCleaned := strings.ReplaceAll(p.pID, ".", "_")
receiverIdCleaned := strings.ReplaceAll(p.rID, ".", "_")
tag = fmt.Sprintf("%s.%s.%s", hashString, pipelineIdCleaned, receiverIdCleaned)
}
var components []fluentbit.Component
receiverComponents := receiver.Components(ctx, tag)
components = append(components, receiverComponents...)
// To match on fluent_forward records, we need to account for the addition
// of the existing tag (unknown during config generation) as the suffix
// of the tag.
globSuffix := ""
regexSuffix := ""
if receiver.Type() == "fluent_forward" {
regexSuffix = `\..*`
globSuffix = `.*`
}
tagRegex := regexp.QuoteMeta(tag) + regexSuffix
tag = tag + globSuffix
for i, processorItem := range p.processors {
processor, ok := processorItem.Component.(LoggingProcessor)
if !ok {
return fbSource{}, fmt.Errorf("logging processor %q is incompatible with a receiver of type %q", processorItem.id, receiver.Type())
}
processorComponents := processor.Components(ctx, tag, strconv.Itoa(i))
if err := processUserDefinedMultilineParser(i, processorItem.id, receiver, processor, receiverComponents, processorComponents); err != nil {
return fbSource{}, err
}
components = append(components, processorComponents...)
}
components = append(components, setLogNameComponents(ctx, tag, p.rID, receiver.Type())...)
// Logs ingested using the fluent_forward receiver must add the existing_tag
// on the record to the LogName. This is done with a Lua filter.
if receiver.Type() == "fluent_forward" {
components = append(components, fluentbit.LuaFilterComponents(tag, addLogNameLuaFunction, addLogNameLuaScriptContents)...)
}
return fbSource{
tagRegex: tagRegex,
components: components,
}, nil
}
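// otelComponents converts one pipeline instance into OTel receiver pipelines
// and pipelines, keyed by names derived from the pipeline and receiver IDs.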
func (p pipelineInstance) otelComponents(ctx context.Context) (map[string]otel.ReceiverPipeline, map[string]otel.Pipeline, error) {
outR := make(map[string]otel.ReceiverPipeline)
outP := make(map[string]otel.Pipeline)
receiver, ok := p.receiver.(OTelReceiver)
if !ok {
return nil, nil, fmt.Errorf("%q is not an otel receiver", p.rID)
}
// TODO: Add a way for receivers or processors to decide whether they're compatible with a particular config.
receiverPipelines, err := receiver.Pipelines(ctx)
if err != nil {
return nil, nil, fmt.Errorf("receiver %q has invalid configuration: %w", p.rID, err)
}
for i, receiverPipeline := range receiverPipelines {
receiverPipelineName := strings.ReplaceAll(p.rID, "_", "__")
if i > 0 {
receiverPipelineName = fmt.Sprintf("%s_%d", receiverPipelineName, i)
}
prefix := fmt.Sprintf("%s_%s", strings.ReplaceAll(p.pID, "_", "__"), receiverPipelineName)
if p.pipelineType != "metrics" {
// Don't prepend for metrics pipelines to preserve old golden configs.
prefix = fmt.Sprintf("%s_%s", p.pipelineType, prefix)
}
if processors, ok := receiverPipeline.Processors["logs"]; ok {
receiverPipeline.Processors["logs"] = append(
processors,
otelSetLogNameComponents(ctx, p.rID)...,
)
}
outR[receiverPipelineName] = receiverPipeline
pipeline := otel.Pipeline{
Type: p.pipelineType,
ReceiverPipelineName: receiverPipelineName,
}
// Receivers that export through the GMP (Managed Service for Prometheus)
// exporter are incompatible with Ops Agent processors, so assert that no
// processors are configured.
if receiverPipeline.ExporterTypes[p.pipelineType] == otel.GMP {
if len(p.processors) > 0 {
return nil, nil, fmt.Errorf("prometheus receivers are incompatible with Ops Agent processors")
}
}
for _, processorItem := range p.processors {
processor, ok := processorItem.Component.(OTelProcessor)
if !ok {
return nil, nil, fmt.Errorf("processor %q not supported in pipeline %q", processorItem.id, p.pID)
}
if processors, err := processor.Processors(ctx); err != nil {
return nil, nil, fmt.Errorf("processor %q has invalid configuration: %w", processorItem.id, err)
} else {
pipeline.Processors = append(pipeline.Processors, processors...)
}
}
outP[prefix] = pipeline
}
return outR, outP, nil
}
// generateOtelPipelines generates maps of named OTel receiver pipelines and
// pipelines for the unified config's OTel-backed pipelines.
func (uc *UnifiedConfig) generateOtelPipelines(ctx context.Context) (map[string]otel.ReceiverPipeline, map[string]otel.Pipeline, error) {
outR := make(map[string]otel.ReceiverPipeline)
outP := make(map[string]otel.Pipeline)
pipelines, err := uc.Pipelines(ctx)
if err != nil {
return nil, nil, err
}
for _, pipeline := range pipelines {
if pipeline.backend != backendOTel {
continue
}
pipeR, pipeP, err := pipeline.otelComponents(ctx)
if err != nil {
return nil, nil, err
}
maps.Copy(outR, pipeR)
maps.Copy(outP, pipeP)
}
return outR, outP, nil
}
// GenerateFluentBitConfigs generates configuration file(s) for Fluent Bit.
// It returns a map of filenames to file contents.
func (uc *UnifiedConfig) GenerateFluentBitConfigs(ctx context.Context, logsDir string, stateDir string) (map[string]string, error) {
userAgent, _ := platform.FromContext(ctx).UserAgent("Google-Cloud-Ops-Agent-Logging")
components, err := uc.generateFluentbitComponents(ctx, userAgent)
if err != nil {
return nil, err
}
c := fluentbit.ModularConfig{
Variables: map[string]string{
"buffers_dir": path.Join(stateDir, "buffers"),
"logs_dir": logsDir,
},
Components: components,
}
return c.Generate()
}
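// contains reports whether str is an element of s.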
func contains(s []string, str string) bool {
for _, v := range s {
if v == str {
return true
}
}
return false
}
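// processUserDefinedMultilineParser attaches any MULTILINE_PARSER components
// produced by a parse_multiline processor to the receiver's components, and
// enforces that such a processor is used only as the first processor of a
// pipeline whose receiver has type "files".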
func processUserDefinedMultilineParser(i int, pID string, receiver LoggingReceiver, processor LoggingProcessor, receiverComponents []fluentbit.Component, processorComponents []fluentbit.Component) error {
var multilineParserNames []string
if processor.Type() != "parse_multiline" {
return nil
}
for _, p := range processorComponents {
if p.Kind == "MULTILINE_PARSER" {
multilineParserNames = append(multilineParserNames, p.Config["name"])
}
}
allowedMultilineReceiverTypes := []string{"files"}
for _, r := range receiverComponents {
if len(multilineParserNames) != 0 &&
!contains(allowedMultilineReceiverTypes, receiver.Type()) {
return fmt.Errorf(`processor %q with type "parse_multiline" can only be applied on receivers with type "files"`, pID)
}
if len(multilineParserNames) != 0 {
r.Config["multiline.parser"] = strings.Join(multilineParserNames, ",")
}
}
if i != 0 {
return fmt.Errorf(`at most one logging processor with type "parse_multiline" is allowed in the pipeline. A logging processor with type "parse_multiline" must be right after a logging receiver with type "files"`)
}
return nil
}
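// sliceContains reports whether v is an element of s.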
func sliceContains(s []string, v string) bool {
for _, e := range s {
if e == v {
return true
}
}
return false
}
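// attributeLabelPrefix is the label key prefix used when GCE instance
// metadata attributes are added as log labels.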
const (
attributeLabelPrefix string = "compute.googleapis.com/attributes/"
)
// addGceMetadataAttributesComponents annotates logs with labels corresponding
// to instance attributes from the GCE metadata server.
func addGceMetadataAttributesComponents(ctx context.Context, attributes []string, tag, uid string) []fluentbit.Component {
processorName := fmt.Sprintf("%s.%s.gce_metadata", tag, uid)
resource, err := platform.FromContext(ctx).GetResource()
if err != nil {
log.Printf("can't get resource metadata: %v", err)
return nil
}
gceMetadata, ok := resource.(resourcedetector.GCEResource)
if !ok {
// Not on GCE; no attributes to detect.
log.Printf("ignoring the gce_metadata_attributes processor outside of GCE: %T", resource)
return nil
}
modifications := map[string]*ModifyField{}
var attributeKeys []string
for k := range gceMetadata.Metadata {
attributeKeys = append(attributeKeys, k)
}
sort.Strings(attributeKeys)
for _, k := range attributeKeys {
if !sliceContains(attributes, k) {
continue
}
v := gceMetadata.Metadata[k]
modifications[fmt.Sprintf(`labels."%s%s"`, attributeLabelPrefix, k)] = &ModifyField{
StaticValue: &v,
}
}
if len(modifications) == 0 {
return nil
}
return LoggingProcessorModifyFields{
Fields: modifications,
}.Components(ctx, tag, processorName)
}
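// fbSource pairs the Fluent Bit components generated for one pipeline
// instance with the tag regex that matches its records.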
type fbSource struct {
tagRegex string
components []fluentbit.Component
}
// generateFluentbitComponents generates the slice of Fluent Bit config sections representing uc.Logging.
func (uc *UnifiedConfig) generateFluentbitComponents(ctx context.Context, userAgent string) ([]fluentbit.Component, error) {
l := uc.Logging
var out []fluentbit.Component
if l.Service.LogLevel == "" {
l.Service.LogLevel = "info"
}
service := fluentbit.Service{LogLevel: l.Service.LogLevel}
out = append(out, service.Component())
out = append(out, fluentbit.MetricsInputComponent())
if l != nil && l.Service != nil && !l.Service.OTelLogging {
// Collect each pipeline's components together with its tag regex so they can be emitted in a deterministic order.
var sources []fbSource
var tags []string
pipelines, err := uc.Pipelines(ctx)
if err != nil {
return nil, err
}
for _, pipeline := range pipelines {
if pipeline.backend != backendFluentBit {
continue
}
source, err := pipeline.fluentBitComponents(ctx)
if err != nil {
return nil, err
}
sources = append(sources, source)
tags = append(tags, source.tagRegex)
}
sort.Slice(sources, func(i, j int) bool { return sources[i].tagRegex < sources[j].tagRegex })
sort.Strings(tags)
for _, s := range sources {
out = append(out, s.components...)
}
if len(tags) > 0 {
out = append(out, stackdriverOutputComponent(ctx, strings.Join(tags, "|"), userAgent, "2G", l.Service.Compress))
}
out = append(out, uc.generateSelfLogsComponents(ctx, userAgent)...)
out = append(out, addGceMetadataAttributesComponents(ctx, []string{
"dataproc-cluster-name",
"dataproc-cluster-uuid",
"dataproc-region",
}, "*", "default-dataproc")...)
}
out = append(out, fluentbit.MetricsOutputComponent())
return out, nil
}
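// getMD5Hash returns the hex-encoded MD5 digest of text. Here it is used to
// derive a deterministic tag prefix for fluent_forward receivers, not for any
// security purpose.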
func getMD5Hash(text string) string {
hasher := md5.New()
hasher.Write([]byte(text))
return hex.EncodeToString(hasher.Sum(nil))
}