confgenerator/logging_receivers.go (562 lines of code) (raw):
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package confgenerator
import (
"context"
"fmt"
"path"
"strconv"
"strings"
"time"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/fluentbit"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/fluentbit/modify"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/otel"
"github.com/GoogleCloudPlatform/ops-agent/confgenerator/otel/ottl"
"github.com/GoogleCloudPlatform/ops-agent/internal/platform"
)
// DBPath returns the database path for the given log tag
func DBPath(tag string) string {
// TODO: More sanitization?
dir := strings.ReplaceAll(strings.ReplaceAll(tag, ".", "_"), "/", "_")
return path.Join("${buffers_dir}", dir)
}
// A LoggingReceiverFiles represents the user configuration for a file receiver (fluentbit's tail plugin).
type LoggingReceiverFiles struct {
ConfigComponent `yaml:",inline"`
// TODO: Use LoggingReceiverFilesMixin after figuring out the validation story.
IncludePaths []string `yaml:"include_paths" validate:"required,min=1"`
ExcludePaths []string `yaml:"exclude_paths,omitempty"`
WildcardRefreshInterval *time.Duration `yaml:"wildcard_refresh_interval,omitempty" validate:"omitempty,min=1s,multipleof_time=1s"`
RecordLogFilePath *bool `yaml:"record_log_file_path,omitempty"`
}
func (r LoggingReceiverFiles) Type() string {
return "files"
}
func (r LoggingReceiverFiles) mixin() LoggingReceiverFilesMixin {
return LoggingReceiverFilesMixin{
IncludePaths: r.IncludePaths,
ExcludePaths: r.ExcludePaths,
WildcardRefreshInterval: r.WildcardRefreshInterval,
RecordLogFilePath: r.RecordLogFilePath,
}
}
func (r LoggingReceiverFiles) Components(ctx context.Context, tag string) []fluentbit.Component {
return r.mixin().Components(ctx, tag)
}
func (r LoggingReceiverFiles) Pipelines(ctx context.Context) ([]otel.ReceiverPipeline, error) {
return r.mixin().Pipelines(ctx)
}
type LoggingReceiverFilesMixin struct {
IncludePaths []string `yaml:"include_paths,omitempty"`
ExcludePaths []string `yaml:"exclude_paths,omitempty"`
WildcardRefreshInterval *time.Duration `yaml:"wildcard_refresh_interval,omitempty" validate:"omitempty,min=1s,multipleof_time=1s"`
MultilineRules []MultilineRule `yaml:"-"`
BufferInMemory bool `yaml:"-"`
RecordLogFilePath *bool `yaml:"record_log_file_path,omitempty"`
}
func (r LoggingReceiverFilesMixin) Components(ctx context.Context, tag string) []fluentbit.Component {
if len(r.IncludePaths) == 0 {
// No files -> no input.
return nil
}
config := map[string]string{
// https://docs.fluentbit.io/manual/pipeline/inputs/tail#config
"Name": "tail",
"Tag": tag,
// TODO: Escaping?
"Path": strings.Join(r.IncludePaths, ","),
"DB": DBPath(tag),
// DB.locking specifies that the database will be accessed only by Fluent Bit.
// Enabling this feature helps to increase performance when accessing the database
// but it restrict any external tool to query the content.
"DB.locking": "true",
"Read_from_Head": "True",
// Set the chunk limit conservatively to avoid exceeding the recommended chunk size of 5MB per write request.
"Buffer_Chunk_Size": "512k",
// Set the max size a bit larger to accommodate for long log lines.
"Buffer_Max_Size": "2M",
// When a message is unstructured (no parser applied), append it under a key named "message".
"Key": "message",
// Increase this to 30 seconds so log rotations are handled more gracefully.
"Rotate_Wait": "30",
// Skip long lines instead of skipping the entire file when a long line exceeds buffer size.
"Skip_Long_Lines": "On",
// https://docs.fluentbit.io/manual/administration/buffering-and-storage#input-section-configuration
// Buffer in disk to improve reliability.
"storage.type": "filesystem",
// https://docs.fluentbit.io/manual/administration/backpressure#mem_buf_limit
// This controls how much data the input plugin can hold in memory once the data is ingested into the core.
// This is used to deal with backpressure scenarios (e.g: cannot flush data for some reason).
// When the input plugin hits "mem_buf_limit", because we have enabled filesystem storage type, mem_buf_limit acts
// as a hint to set "how much data can be up in memory", once the limit is reached it continues writing to disk.
"Mem_Buf_Limit": "10M",
}
if len(r.ExcludePaths) > 0 {
// TODO: Escaping?
config["Exclude_Path"] = strings.Join(r.ExcludePaths, ",")
}
if r.WildcardRefreshInterval != nil {
refreshIntervalSeconds := int(r.WildcardRefreshInterval.Seconds())
config["Refresh_Interval"] = strconv.Itoa(refreshIntervalSeconds)
}
if r.RecordLogFilePath != nil && *r.RecordLogFilePath == true {
config["Path_Key"] = "agent.googleapis.com/log_file_path"
}
if r.BufferInMemory {
config["storage.type"] = "memory"
}
c := []fluentbit.Component{}
if len(r.MultilineRules) > 0 {
// Configure multiline in the input component;
// This is necessary, since using the multiline filter will not work
// if a multiline message spans between two chunks.
rules := [][2]string{}
for _, rule := range r.MultilineRules {
rules = append(rules, [2]string{"rule", rule.AsString()})
}
parserName := fmt.Sprintf("multiline.%s", tag)
c = append(c, fluentbit.Component{
Kind: "MULTILINE_PARSER",
Config: map[string]string{
"name": parserName,
"type": "regex",
"flush_timeout": "5000",
},
OrderedConfig: rules,
})
// See https://docs.fluentbit.io/manual/pipeline/inputs/tail#multiline-core-v1.8
config["multiline.parser"] = parserName
// multiline parser outputs to a "log" key, but we expect "message" as the output of this pipeline
c = append(c, modify.NewRenameOptions("log", "message").Component(tag))
}
c = append(c, fluentbit.Component{
Kind: "INPUT",
Config: config,
})
return c
}
func (r LoggingReceiverFilesMixin) Pipelines(ctx context.Context) ([]otel.ReceiverPipeline, error) {
operators := []map[string]any{}
receiver_config := map[string]any{
"include": r.IncludePaths,
"exclude": r.ExcludePaths,
"start_at": "beginning",
"include_file_name": false,
}
if i := r.WildcardRefreshInterval; i != nil {
receiver_config["poll_interval"] = i.String()
}
// TODO: Configure `storage` to store file checkpoints
// TODO: Configure multiline rules
// TODO: Support BufferInMemory
// OTel parses the log to `body` by default; put it in a `message` field to match fluent-bit's behavior.
operators = append(operators, map[string]any{
"id": "body",
"type": "move",
"from": "body",
"to": "body.message",
})
if r.RecordLogFilePath != nil && *r.RecordLogFilePath {
receiver_config["include_file_path"] = true
operators = append(operators, map[string]any{
"id": "record_log_file_path",
"type": "move",
"from": `attributes["log.file.path"]`,
"to": `attributes["agent.googleapis.com/log_file_path"]`,
})
}
receiver_config["operators"] = operators
return []otel.ReceiverPipeline{{
Receiver: otel.Component{
Type: "filelog",
Config: receiver_config,
},
Processors: map[string][]otel.Component{
"logs": nil,
},
ExporterTypes: map[string]otel.ExporterType{
"logs": otel.OTel,
},
}}, nil
}
func init() {
LoggingReceiverTypes.RegisterType(func() LoggingReceiver { return &LoggingReceiverFiles{} })
}
// A LoggingReceiverSyslog represents the configuration for a syslog protocol receiver.
type LoggingReceiverSyslog struct {
ConfigComponent `yaml:",inline"`
TransportProtocol string `yaml:"transport_protocol,omitempty" validate:"oneof=tcp udp"`
ListenHost string `yaml:"listen_host,omitempty" validate:"required,ip"`
ListenPort uint16 `yaml:"listen_port,omitempty" validate:"required"`
}
func (r LoggingReceiverSyslog) Type() string {
return "syslog"
}
func (r LoggingReceiverSyslog) GetListenPort() uint16 {
return r.ListenPort
}
func (r LoggingReceiverSyslog) Components(ctx context.Context, tag string) []fluentbit.Component {
return []fluentbit.Component{{
Kind: "INPUT",
Config: map[string]string{
// https://docs.fluentbit.io/manual/pipeline/inputs/syslog
"Name": "syslog",
"Tag": tag,
"Mode": r.TransportProtocol,
"Listen": r.ListenHost,
"Port": fmt.Sprintf("%d", r.GetListenPort()),
"Parser": tag,
// https://docs.fluentbit.io/manual/administration/buffering-and-storage#input-section-configuration
// Buffer in disk to improve reliability.
"storage.type": "filesystem",
// https://docs.fluentbit.io/manual/administration/backpressure#mem_buf_limit
// This controls how much data the input plugin can hold in memory once the data is ingested into the core.
// This is used to deal with backpressure scenarios (e.g: cannot flush data for some reason).
// When the input plugin hits "mem_buf_limit", because we have enabled filesystem storage type, mem_buf_limit acts
// as a hint to set "how much data can be up in memory", once the limit is reached it continues writing to disk.
"Mem_Buf_Limit": "10M",
},
}, {
// FIXME: This is not new, but we shouldn't be disabling syslog protocol parsing by passing a custom Parser - Fluentbit includes builtin syslog protocol support, and we should enable/expose that.
Kind: "PARSER",
Config: map[string]string{
"Name": tag,
"Format": "regex",
"Regex": `^(?<message>.*)$`,
},
}}
}
func init() {
LoggingReceiverTypes.RegisterType(func() LoggingReceiver { return &LoggingReceiverSyslog{} })
}
// A LoggingReceiverTCP represents the configuration for a TCP receiver.
type LoggingReceiverTCP struct {
ConfigComponent `yaml:",inline"`
Format string `yaml:"format,omitempty" validate:"required,oneof=json"`
ListenHost string `yaml:"listen_host,omitempty" validate:"omitempty,ip"`
ListenPort uint16 `yaml:"listen_port,omitempty"`
}
func (r LoggingReceiverTCP) Type() string {
return "tcp"
}
func (r LoggingReceiverTCP) GetListenPort() uint16 {
if r.ListenPort == 0 {
r.ListenPort = 5170
}
return r.ListenPort
}
func (r LoggingReceiverTCP) Components(ctx context.Context, tag string) []fluentbit.Component {
if r.ListenHost == "" {
r.ListenHost = "127.0.0.1"
}
return []fluentbit.Component{{
Kind: "INPUT",
Config: map[string]string{
// https://docs.fluentbit.io/manual/pipeline/inputs/tcp
"Name": "tcp",
"Tag": tag,
"Listen": r.ListenHost,
"Port": fmt.Sprintf("%d", r.GetListenPort()),
"Format": r.Format,
// https://docs.fluentbit.io/manual/administration/buffering-and-storage#input-section-configuration
// Buffer in disk to improve reliability.
"storage.type": "filesystem",
// https://docs.fluentbit.io/manual/administration/backpressure#mem_buf_limit
// This controls how much data the input plugin can hold in memory once the data is ingested into the core.
// This is used to deal with backpressure scenarios (e.g: cannot flush data for some reason).
// When the input plugin hits "mem_buf_limit", because we have enabled filesystem storage type, mem_buf_limit acts
// as a hint to set "how much data can be up in memory", once the limit is reached it continues writing to disk.
"Mem_Buf_Limit": "10M",
// Allow incoming logs to occupy the maximum possible size per the Logging API (256k).
// Use a safety factor of 2 to account for things like encoding overhead.
"Chunk_Size": "512k",
},
}}
}
func init() {
LoggingReceiverTypes.RegisterType(func() LoggingReceiver { return &LoggingReceiverTCP{} })
}
// A LoggingReceiverFluentForward represents the configuration for a Forward Protocol receiver.
type LoggingReceiverFluentForward struct {
ConfigComponent `yaml:",inline"`
ListenHost string `yaml:"listen_host,omitempty" validate:"omitempty,ip"`
ListenPort uint16 `yaml:"listen_port,omitempty"`
}
func (r LoggingReceiverFluentForward) Type() string {
return "fluent_forward"
}
func (r LoggingReceiverFluentForward) GetListenPort() uint16 {
if r.ListenPort == 0 {
r.ListenPort = 24224
}
return r.ListenPort
}
func (r LoggingReceiverFluentForward) Components(ctx context.Context, tag string) []fluentbit.Component {
if r.ListenHost == "" {
r.ListenHost = "127.0.0.1"
}
return []fluentbit.Component{{
Kind: "INPUT",
Config: map[string]string{
// https://docs.fluentbit.io/manual/pipeline/inputs/forward
"Name": "forward",
"Tag_Prefix": tag + ".",
"Listen": r.ListenHost,
"Port": fmt.Sprintf("%d", r.GetListenPort()),
// https://docs.fluentbit.io/manual/administration/buffering-and-storage#input-section-configuration
// Buffer in disk to improve reliability.
"storage.type": "filesystem",
// https://docs.fluentbit.io/manual/administration/backpressure#mem_buf_limit
// This controls how much data the input plugin can hold in memory once the data is ingested into the core.
// This is used to deal with backpressure scenarios (e.g: cannot flush data for some reason).
// When the input plugin hits "mem_buf_limit", because we have enabled filesystem storage type, mem_buf_limit acts
// as a hint to set "how much data can be up in memory", once the limit is reached it continues writing to disk.
"Mem_Buf_Limit": "10M",
},
}}
}
func init() {
LoggingReceiverTypes.RegisterType(func() LoggingReceiver { return &LoggingReceiverFluentForward{} })
}
// A LoggingReceiverWindowsEventLog represents the user configuration for a Windows event log receiver.
type LoggingReceiverWindowsEventLog struct {
ConfigComponent `yaml:",inline"`
Channels []string `yaml:"channels,omitempty,flow" validate:"required,winlogchannels"`
ReceiverVersion string `yaml:"receiver_version,omitempty" validate:"omitempty,oneof=1 2" tracking:""`
RenderAsXML bool `yaml:"render_as_xml,omitempty" tracking:""`
}
const eventLogV2SeverityParserLua = `
function process(tag, timestamp, record)
severityKey = 'logging.googleapis.com/severity'
if record['Level'] == 1 then
record[severityKey] = 'CRITICAL'
elseif record['Level'] == 2 then
record[severityKey] = 'ERROR'
elseif record['Level'] == 3 then
record[severityKey] = 'WARNING'
elseif record['Level'] == 4 then
record[severityKey] = 'INFO'
elseif record['Level'] == 5 then
record[severityKey] = 'NOTICE'
end
return 2, timestamp, record
end
`
func (r LoggingReceiverWindowsEventLog) Type() string {
return "windows_event_log"
}
func (r LoggingReceiverWindowsEventLog) IsDefaultVersion() bool {
return r.ReceiverVersion == "" || r.ReceiverVersion == "1"
}
func (r LoggingReceiverWindowsEventLog) Components(ctx context.Context, tag string) []fluentbit.Component {
if len(r.ReceiverVersion) == 0 {
r.ReceiverVersion = "1"
}
inputName := "winlog"
timeKey := "TimeGenerated"
if !r.IsDefaultVersion() {
inputName = "winevtlog"
timeKey = "TimeCreated"
}
// https://docs.fluentbit.io/manual/pipeline/inputs/windows-event-log
input := []fluentbit.Component{{
Kind: "INPUT",
Config: map[string]string{
"Name": inputName,
"Tag": tag,
// TODO(@braydonk): Remove this upon the next Fluent Bit update. See https://github.com/fluent/fluent-bit/issues/8854
"String_Inserts": "true",
"Channels": strings.Join(r.Channels, ","),
"Interval_Sec": "1",
"DB": DBPath(tag),
},
}}
// On Windows Server 2012/2016, there is a known problem where most log fields end
// up blank. The Use_ANSI configuration is provided to work around this; however,
// this also strips Unicode characters away, so we only use it on affected
// platforms. This only affects the newer API.
p := platform.FromContext(ctx)
if !r.IsDefaultVersion() && (p.Is2012() || p.Is2016()) {
input[0].Config["Use_ANSI"] = "True"
}
if r.RenderAsXML {
input[0].Config["Render_Event_As_XML"] = "True"
// By default, fluent-bit puts the rendered XML into a field named "System"
// (this is a constant field name and has no relation to the "System" channel).
// Rename it to "raw_xml" because it's a more descriptive name than "System".
input = append(input, modify.NewRenameOptions("System", "raw_xml").Component(tag))
}
// Parser for parsing TimeCreated/TimeGenerated field as log record timestamp.
timestampParserName := fmt.Sprintf("%s.timestamp_parser", tag)
timestampParser := fluentbit.Component{
Kind: "PARSER",
Config: map[string]string{
"Name": timestampParserName,
"Format": "regex",
"Time_Format": "%Y-%m-%d %H:%M:%S %z",
"Time_Key": "timestamp",
"Regex": `(?<timestamp>\d+-\d+-\d+ \d+:\d+:\d+ [+-]\d{4})`,
},
}
timestampParserFilters := fluentbit.ParserFilterComponents(tag, timeKey, []string{timestampParserName}, true)
input = append(input, timestampParser)
input = append(input, timestampParserFilters...)
var filters []fluentbit.Component
if r.IsDefaultVersion() {
filters = fluentbit.TranslationComponents(tag, "EventType", "logging.googleapis.com/severity", false,
[]struct{ SrcVal, DestVal string }{
{"Error", "ERROR"},
{"Information", "INFO"},
{"Warning", "WARNING"},
{"SuccessAudit", "NOTICE"},
{"FailureAudit", "NOTICE"},
})
} else {
// Ordinarily we use fluentbit.TranslationComponents to populate severity,
// which uses 'modify' filters, except 'modify' filters only work on string
// values and Level is an int. So we need to use Lua.
filters = fluentbit.LuaFilterComponents(tag, "process", eventLogV2SeverityParserLua)
}
return append(input, filters...)
}
func (r LoggingReceiverWindowsEventLog) Pipelines(ctx context.Context) ([]otel.ReceiverPipeline, error) {
// TODO: r.IsDefaultVersion() should use the old windows event log API, but the Collector doesn't have a receiver for that.
var out []otel.ReceiverPipeline
for _, c := range r.Channels {
receiver_config := map[string]any{
"channel": c,
"start_at": "beginning",
"poll_interval": "1s",
// TODO: Configure storage
}
if r.RenderAsXML {
receiver_config["raw"] = true
// TODO: Rename to `jsonPayload.raw_xml`
}
var p []otel.Component
if r.IsDefaultVersion() {
var err error
p, err = windowsEventLogV1Processors(ctx)
if err != nil {
return nil, err
}
}
// TODO: Add processors for fluent-bit's V2 format.
out = append(out, otel.ReceiverPipeline{
Receiver: otel.Component{
Type: "windowseventlog",
Config: receiver_config,
},
Processors: map[string][]otel.Component{
"logs": p,
},
ExporterTypes: map[string]otel.ExporterType{
"logs": otel.OTel,
},
})
}
return out, nil
}
func windowsEventLogV1Processors(ctx context.Context) ([]otel.Component, error) {
// The winlog input in fluent-bit has a completely different structure, so we need to convert the OTel format into the fluent-bit format.
var empty string
p := &LoggingProcessorModifyFields{
EmptyBody: true,
Fields: map[string]*ModifyField{
"jsonPayload.Channel": {CopyFrom: "jsonPayload.channel"},
"jsonPayload.ComputerName": {CopyFrom: "jsonPayload.computer"},
"jsonPayload.Data": {
CopyFrom: "jsonPayload.event_data.binary",
DefaultValue: &empty,
CustomConvertFunc: func(v ottl.LValue) ottl.Statements {
return v.Set(ottl.ConvertCase(v, "lower"))
},
},
// TODO: OTel puts the human-readable category at jsonPayload.task, but we need them to add the integer version.
//"jsonPayload.EventCategory": {StaticValue: "0", Type: "integer"},
"jsonPayload.EventID": {CopyFrom: "jsonPayload.event_id.id"},
"jsonPayload.EventType": {
CopyFrom: "jsonPayload.level",
CustomConvertFunc: func(v ottl.LValue) ottl.Statements {
// TODO: What if there are multiple keywords?
keywords := ottl.LValue{"cache", "body", "keywords"}
keyword0 := ottl.RValue(`cache["body"]["keywords"][0]`)
return ottl.NewStatements(
v.SetIf(ottl.StringLiteral("SuccessAudit"), ottl.And(
keywords.IsPresent(),
ottl.IsNotNil(keyword0),
ottl.Equals(keyword0, ottl.StringLiteral("Audit Success")),
)),
v.SetIf(ottl.StringLiteral("FailureAudit"), ottl.And(
keywords.IsPresent(),
ottl.IsNotNil(keyword0),
ottl.Equals(keyword0, ottl.StringLiteral("Audit Failure")),
)),
)
},
},
// TODO: Fix OTel receiver to provide raw non-parsed messages.
"jsonPayload.Message": {CopyFrom: "jsonPayload.message"},
"jsonPayload.Qualifiers": {CopyFrom: "jsonPayload.event_id.qualifiers"},
"jsonPayload.RecordNumber": {CopyFrom: "jsonPayload.record_id"},
"jsonPayload.Sid": {
CopyFrom: "jsonPayload.security.user_id",
DefaultValue: &empty,
},
"jsonPayload.SourceName": {
CopyFrom: "jsonPayload.provider.name",
CustomConvertFunc: func(v ottl.LValue) ottl.Statements {
// Prefer jsonPayload.provider.event_source if present and non-empty
eventSource := ottl.LValue{"cache", "body", "provider", "event_source"}
return v.SetIf(
eventSource,
ottl.And(
eventSource.IsPresent(),
ottl.Not(ottl.Equals(
eventSource,
ottl.StringLiteral(""),
)),
),
)
},
},
// TODO: Convert from array of maps to array of strings
"jsonPayload.StringInserts": {CopyFrom: "jsonPayload.event_data.data"},
// TODO: Reformat? (v1 was "YYYY-MM-DD hh:mm:ss +0000", OTel is "YYYY-MM-DDThh:mm:ssZ")
"jsonPayload.TimeGenerated": {CopyFrom: "jsonPayload.system_time"},
// TODO: Reformat?
"jsonPayload.TimeWritten": {CopyFrom: "jsonPayload.system_time"},
}}
return p.Processors(ctx)
}
func init() {
LoggingReceiverTypes.RegisterType(func() LoggingReceiver { return &LoggingReceiverWindowsEventLog{} }, platform.Windows)
}
// A LoggingReceiverSystemd represents the user configuration for a Systemd/journald receiver.
type LoggingReceiverSystemd struct {
ConfigComponent `yaml:",inline"`
}
func (r LoggingReceiverSystemd) Type() string {
return "systemd_journald"
}
func (r LoggingReceiverSystemd) Components(ctx context.Context, tag string) []fluentbit.Component {
input := []fluentbit.Component{{
Kind: "INPUT",
Config: map[string]string{
// https://docs.fluentbit.io/manual/pipeline/inputs/systemd
"Name": "systemd",
"Tag": tag,
"DB": DBPath(tag),
},
}}
filters := fluentbit.TranslationComponents(tag, "PRIORITY", "logging.googleapis.com/severity", false,
[]struct{ SrcVal, DestVal string }{
{"7", "DEBUG"},
{"6", "INFO"},
{"5", "NOTICE"},
{"4", "WARNING"},
{"3", "ERROR"},
{"2", "CRITICAL"},
{"1", "ALERT"},
{"0", "EMERGENCY"},
})
input = append(input, filters...)
input = append(input, fluentbit.Component{
Kind: "FILTER",
Config: map[string]string{
"Name": "modify",
"Match": tag,
"Condition": fmt.Sprintf("Key_exists %s", "CODE_FILE"),
"Copy": fmt.Sprintf("CODE_FILE %s", "logging.googleapis.com/sourceLocation/file"),
},
})
input = append(input, fluentbit.Component{
Kind: "FILTER",
Config: map[string]string{
"Name": "modify",
"Match": tag,
"Condition": fmt.Sprintf("Key_exists %s", "CODE_FUNC"),
"Copy": fmt.Sprintf("CODE_FUNC %s", "logging.googleapis.com/sourceLocation/function"),
},
})
input = append(input, fluentbit.Component{
Kind: "FILTER",
Config: map[string]string{
"Name": "modify",
"Match": tag,
"Condition": fmt.Sprintf("Key_exists %s", "CODE_LINE"),
"Copy": fmt.Sprintf("CODE_LINE %s", "logging.googleapis.com/sourceLocation/line"),
},
})
input = append(input, fluentbit.Component{
Kind: "FILTER",
Config: map[string]string{
"Name": "nest",
"Match": tag,
"Operation": "nest",
"Wildcard": "logging.googleapis.com/sourceLocation/*",
"Nest_under": "logging.googleapis.com/sourceLocation",
"Remove_prefix": "logging.googleapis.com/sourceLocation/",
},
})
return input
}
func (r LoggingReceiverSystemd) Pipelines(ctx context.Context) ([]otel.ReceiverPipeline, error) {
receiver_config := map[string]any{
"start_at": "beginning",
"priority": "debug",
}
modify_fields_processors, err := LoggingProcessorModifyFields{
Fields: map[string]*ModifyField{
`severity`: {
CopyFrom: "jsonPayload.PRIORITY",
MapValues: map[string]string{
"7": "DEBUG",
"6": "INFO",
"5": "NOTICE",
"4": "WARNING",
"3": "ERROR",
"2": "CRITICAL",
"1": "ALERT",
"0": "EMERGENCY",
},
MapValuesExclusive: true,
},
`sourceLocation.file`: {
CopyFrom: "jsonPayload.CODE_FILE",
},
`sourceLocation.func`: {
CopyFrom: "jsonPayload.CODE_FUNC",
},
`sourceLocation.line`: {
CopyFrom: "jsonPayload.CODE_LINE",
Type: "integer",
},
},
}.Processors(ctx)
if err != nil {
return nil, err
}
return []otel.ReceiverPipeline{{
Receiver: otel.Component{
Type: "journald",
Config: receiver_config,
},
Processors: map[string][]otel.Component{
"logs": modify_fields_processors,
},
ExporterTypes: map[string]otel.ExporterType{
"logs": otel.OTel,
},
}}, nil
}
func init() {
LoggingReceiverTypes.RegisterType(func() LoggingReceiver { return &LoggingReceiverSystemd{} }, platform.Linux)
}