confgenerator/self_logs.go (176 lines of code) (raw):

// Copyright 2023 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package confgenerator import ( "context" "fmt" "path" "strings" "github.com/GoogleCloudPlatform/ops-agent/confgenerator/fluentbit" "github.com/GoogleCloudPlatform/ops-agent/internal/healthchecks" "github.com/GoogleCloudPlatform/ops-agent/internal/logs" "github.com/GoogleCloudPlatform/ops-agent/internal/platform" "github.com/GoogleCloudPlatform/ops-agent/internal/version" ) var ( agentKind string = "ops-agent" schemaVersion string = "v1" ) const ( opsAgentLogsMatch string = "ops-agent-*" fluentBitSelfLogsTag string = "ops-agent-fluent-bit" healthLogsTag string = "ops-agent-health" agentVersionKey string = "agent.googleapis.com/health/agentVersion" agentKindKey string = "agent.googleapis.com/health/agentKind" schemaVersionKey string = "agent.googleapis.com/health/schemaVersion" ) func fluentbitSelfLogsPath(p platform.Platform) string { loggingModule := "logging-module.log" if p.Type == platform.Windows { return path.Join("${logs_dir}", loggingModule) } return path.Join("${logs_dir}", "subagents", loggingModule) } func healthChecksLogsPath() string { return path.Join("${logs_dir}", "health-checks.log") } func generateInputHealthLoggingPingComponent(ctx context.Context) []fluentbit.Component { return []fluentbit.Component{ { Kind: "INPUT", Config: map[string]string{ "Name": "dummy", "Tag": healthLogsTag, "Dummy": `{"code": "LogPingOpsAgent", "severity": "DEBUG"}`, "Interval_Sec": "600", "Interval_NSec": "0", }, }, } } // This method creates a file input for the `health-checks.log` file, a json parser for the // structured logs and a grep filter to avoid ingesting previous content of the file. func generateInputHealthChecksLogsComponents(ctx context.Context) []fluentbit.Component { out := make([]fluentbit.Component, 0) out = append(out, LoggingReceiverFilesMixin{ IncludePaths: []string{healthChecksLogsPath()}, BufferInMemory: true, }.Components(ctx, healthLogsTag)...) out = append(out, LoggingProcessorParseJson{ // TODO(b/282754149): Remove TimeKey and TimeFormat when feature gets implemented. ParserShared: ParserShared{ TimeKey: logs.TimeZapKey, TimeFormat: "%Y-%m-%dT%H:%M:%S%z", }, }.Components(ctx, healthLogsTag, "health-checks-json")...) out = append(out, []fluentbit.Component{ // This is used to exclude any previous content of the `health-checks.log` file that does not contain // the `jsonPayload.severity` field. Due to `https://github.com/fluent/fluent-bit/issues/7092` the // filtering can't be done directly to the `logging.googleapis.com/severity` field. // We cannot use `LoggingProcessorExcludeLogs` here since it doesn't exclude when the field is missing. { Kind: "FILTER", Config: map[string]string{ "Name": "grep", "Match": healthLogsTag, "Regex": fmt.Sprintf("%s INFO|ERROR|WARNING|DEBUG|info|error|warning|debug", logs.SeverityZapKey), }, }, }...) return out } // This method creates a file input for the `logging-module.log` file, a regex parser for the // fluent-bit self logs and a translator of severity to the logging api format. func generateInputFluentBitSelfLogsComponents(ctx context.Context, logLevel string) []fluentbit.Component { out := make([]fluentbit.Component, 0) out = append(out, LoggingReceiverFilesMixin{ IncludePaths: []string{fluentbitSelfLogsPath(platform.FromContext(ctx))}, //Following: b/226668416 temporarily set storage.type to "memory" //to prevent chunk corruption errors BufferInMemory: true, }.Components(ctx, fluentBitSelfLogsTag)...) out = append(out, LoggingProcessorParseRegex{ Regex: `(?<message>\[[ ]*(?<time>\d+\/\d+\/\d+ \d+:\d+:\d+)] \[[ ]*(?<severity>[a-z]+)\].*)`, PreserveKey: true, ParserShared: ParserShared{ TimeKey: "time", TimeFormat: "%Y/%m/%d %H:%M:%S", Types: map[string]string{ "severity": "string", }, }, }.Components(ctx, fluentBitSelfLogsTag, "fluent-bit-self-log-regex-parsing")...) // Disables sending fluent-bit debug logs to Cloud Logging due to endless spam. // TODO: Remove when b/272779619 is fixed. if logLevel == "debug" { out = append(out, []fluentbit.Component{ { Kind: "FILTER", Config: map[string]string{ "Name": "grep", "Match": fluentBitSelfLogsTag, "Exclude": "severity debug", }, }, }...) } return out } func generateFilterSelfLogsSamplingComponents(ctx context.Context) []fluentbit.Component { out := make([]fluentbit.Component, 0) for _, m := range healthchecks.FluentBitSelfLogTranslationList { // This filter samples specific fluent-bit logs by matching with regex and re-emits // an `ops-agent-health` log. out = append(out, fluentbit.Component{ Kind: "FILTER", Config: map[string]string{ "Name": "rewrite_tag", "Match": fluentBitSelfLogsTag, "Rule": fmt.Sprintf(`message %s %s true`, m.RegexMatch, healthLogsTag), }, }) // This filter sets the appropiate health code and message to the previously sampled logs. out = append(out, fluentbit.Component{ Kind: "FILTER", OrderedConfig: [][2]string{ {"Name", "modify"}, {"Match", healthLogsTag}, {"Condition", fmt.Sprintf(`Key_value_matches message %s`, m.RegexMatch)}, {"Set", fmt.Sprintf(`code %s`, m.Code)}, {"Set", fmt.Sprintf(`message "%s"`, m.Message)}, }, }) } return out } // This method creates a component that enforces the `Structured Health Logs` format to // all `ops-agent-health` logs. It sets `agentKind`, `agentVersion` and `schemaVersion`. func generateFilterStructuredHealthLogsComponents(ctx context.Context) []fluentbit.Component { return LoggingProcessorModifyFields{ Fields: map[string]*ModifyField{ fmt.Sprintf(`labels."%s"`, agentKindKey): { StaticValue: &agentKind, }, fmt.Sprintf(`labels."%s"`, agentVersionKey): { StaticValue: &version.Version, }, fmt.Sprintf(`labels."%s"`, schemaVersionKey): { StaticValue: &schemaVersion, }, }, }.Components(ctx, healthLogsTag, "set-structured-health-logs") } // This method processes all self logs to set the severity field correctly before reaching the output plugin. func generateFilterMapSeverityFieldComponent(ctx context.Context) []fluentbit.Component { return LoggingProcessorModifyFields{ Fields: map[string]*ModifyField{ "severity": { MoveFrom: "jsonPayload.severity", MapValues: map[string]string{ "error": "ERROR", "warn": "WARNING", "info": "INFO", "debug": "DEBUG", }, MapValuesExclusive: false, }, }, }.Components(ctx, opsAgentLogsMatch, "self-logs-processing") } // This method creates a component that outputs all ops-agent self logs to Cloud Logging. func generateOutputSelfLogsComponent(ctx context.Context, userAgent string, ingestSelfLogs bool) fluentbit.Component { outputLogNames := []string{healthLogsTag} if ingestSelfLogs { // Ingest fluent-bit logs to Cloud Logging if enabled. outputLogNames = append(outputLogNames, fluentBitSelfLogsTag) } return stackdriverOutputComponent(ctx, strings.Join(outputLogNames, "|"), userAgent, "", "") } func (uc *UnifiedConfig) generateSelfLogsComponents(ctx context.Context, userAgent string) []fluentbit.Component { out := make([]fluentbit.Component, 0) out = append(out, generateInputHealthLoggingPingComponent(ctx)...) out = append(out, generateInputFluentBitSelfLogsComponents(ctx, uc.Logging.Service.LogLevel)...) out = append(out, generateInputHealthChecksLogsComponents(ctx)...) out = append(out, generateFilterSelfLogsSamplingComponents(ctx)...) out = append(out, generateFilterStructuredHealthLogsComponents(ctx)...) out = append(out, generateFilterMapSeverityFieldComponent(ctx)...) out = append(out, generateOutputSelfLogsComponent(ctx, userAgent, uc.Global.GetDefaultSelfLogFileCollection())) return out }