in confgenerator/logging_receivers.go [79:170]
func (r LoggingReceiverFilesMixin) Components(ctx context.Context, tag string) []fluentbit.Component {
if len(r.IncludePaths) == 0 {
// No files -> no input.
return nil
}
config := map[string]string{
// https://docs.fluentbit.io/manual/pipeline/inputs/tail#config
"Name": "tail",
"Tag": tag,
// TODO: Escaping?
"Path": strings.Join(r.IncludePaths, ","),
"DB": DBPath(tag),
// DB.locking specifies that the database will be accessed only by Fluent Bit.
// Enabling this feature helps to increase performance when accessing the database
// but it restrict any external tool to query the content.
"DB.locking": "true",
"Read_from_Head": "True",
// Set the chunk limit conservatively to avoid exceeding the recommended chunk size of 5MB per write request.
"Buffer_Chunk_Size": "512k",
// Set the max size a bit larger to accommodate for long log lines.
"Buffer_Max_Size": "2M",
// When a message is unstructured (no parser applied), append it under a key named "message".
"Key": "message",
// Increase this to 30 seconds so log rotations are handled more gracefully.
"Rotate_Wait": "30",
// Skip long lines instead of skipping the entire file when a long line exceeds buffer size.
"Skip_Long_Lines": "On",
// https://docs.fluentbit.io/manual/administration/buffering-and-storage#input-section-configuration
// Buffer in disk to improve reliability.
"storage.type": "filesystem",
// https://docs.fluentbit.io/manual/administration/backpressure#mem_buf_limit
// This controls how much data the input plugin can hold in memory once the data is ingested into the core.
// This is used to deal with backpressure scenarios (e.g: cannot flush data for some reason).
// When the input plugin hits "mem_buf_limit", because we have enabled filesystem storage type, mem_buf_limit acts
// as a hint to set "how much data can be up in memory", once the limit is reached it continues writing to disk.
"Mem_Buf_Limit": "10M",
}
if len(r.ExcludePaths) > 0 {
// TODO: Escaping?
config["Exclude_Path"] = strings.Join(r.ExcludePaths, ",")
}
if r.WildcardRefreshInterval != nil {
refreshIntervalSeconds := int(r.WildcardRefreshInterval.Seconds())
config["Refresh_Interval"] = strconv.Itoa(refreshIntervalSeconds)
}
if r.RecordLogFilePath != nil && *r.RecordLogFilePath == true {
config["Path_Key"] = "agent.googleapis.com/log_file_path"
}
if r.BufferInMemory {
config["storage.type"] = "memory"
}
c := []fluentbit.Component{}
if len(r.MultilineRules) > 0 {
// Configure multiline in the input component;
// This is necessary, since using the multiline filter will not work
// if a multiline message spans between two chunks.
rules := [][2]string{}
for _, rule := range r.MultilineRules {
rules = append(rules, [2]string{"rule", rule.AsString()})
}
parserName := fmt.Sprintf("multiline.%s", tag)
c = append(c, fluentbit.Component{
Kind: "MULTILINE_PARSER",
Config: map[string]string{
"name": parserName,
"type": "regex",
"flush_timeout": "5000",
},
OrderedConfig: rules,
})
// See https://docs.fluentbit.io/manual/pipeline/inputs/tail#multiline-core-v1.8
config["multiline.parser"] = parserName
// multiline parser outputs to a "log" key, but we expect "message" as the output of this pipeline
c = append(c, modify.NewRenameOptions("log", "message").Component(tag))
}
c = append(c, fluentbit.Component{
Kind: "INPUT",
Config: config,
})
return c
}