func()

in confgenerator/logging_receivers.go [79:170]


// Components builds the Fluent Bit components that tail the files matched by
// r.IncludePaths and tag the resulting records with tag.
//
// It returns nil when no include paths are configured. Otherwise the returned
// slice always ends with a single "INPUT" component using the "tail" plugin.
// When r.MultilineRules is non-empty, that input is preceded by a
// "MULTILINE_PARSER" component named "multiline.<tag>" and by a component that
// renames the "log" key to "message".
//
// ctx is accepted as part of the method signature but is not used here.
func (r LoggingReceiverFilesMixin) Components(ctx context.Context, tag string) []fluentbit.Component {
	if len(r.IncludePaths) == 0 {
		// No files -> no input.
		return nil
	}
	config := map[string]string{
		// https://docs.fluentbit.io/manual/pipeline/inputs/tail#config
		"Name": "tail",
		"Tag":  tag,
		// TODO: Escaping?
		"Path": strings.Join(r.IncludePaths, ","),
		"DB":   DBPath(tag),
		// DB.locking specifies that the database will be accessed only by Fluent Bit.
		// Enabling this feature helps to increase performance when accessing the database,
		// but it restricts any external tool from querying the content.
		"DB.locking":     "true",
		"Read_from_Head": "True",
		// Set the chunk limit conservatively to avoid exceeding the recommended chunk size of 5MB per write request.
		"Buffer_Chunk_Size": "512k",
		// Set the max size a bit larger to accommodate long log lines.
		"Buffer_Max_Size": "2M",
		// When a message is unstructured (no parser applied), append it under a key named "message".
		"Key": "message",
		// Increase this to 30 seconds so log rotations are handled more gracefully.
		"Rotate_Wait": "30",
		// Skip long lines instead of skipping the entire file when a long line exceeds buffer size.
		"Skip_Long_Lines": "On",

		// https://docs.fluentbit.io/manual/administration/buffering-and-storage#input-section-configuration
		// Buffer on disk to improve reliability.
		"storage.type": "filesystem",

		// https://docs.fluentbit.io/manual/administration/backpressure#mem_buf_limit
		// This controls how much data the input plugin can hold in memory once the data is ingested into the core.
		// This is used to deal with backpressure scenarios (e.g. cannot flush data for some reason).
		// When the input plugin hits "mem_buf_limit", because we have enabled filesystem storage type, mem_buf_limit acts
		// as a hint for "how much data can be held in memory"; once the limit is reached it continues writing to disk.
		"Mem_Buf_Limit": "10M",
	}
	if len(r.ExcludePaths) > 0 {
		// TODO: Escaping?
		config["Exclude_Path"] = strings.Join(r.ExcludePaths, ",")
	}
	if r.WildcardRefreshInterval != nil {
		// The interval is emitted in whole seconds; the int conversion
		// truncates any fractional part toward zero.
		refreshIntervalSeconds := int(r.WildcardRefreshInterval.Seconds())
		config["Refresh_Interval"] = strconv.Itoa(refreshIntervalSeconds)
	}

	// Path_Key is only set when the option is explicitly present and true.
	if r.RecordLogFilePath != nil && *r.RecordLogFilePath {
		config["Path_Key"] = "agent.googleapis.com/log_file_path"
	}

	// Overrides the "filesystem" default set in the literal above.
	if r.BufferInMemory {
		config["storage.type"] = "memory"
	}

	// A nil slice is fine here: the INPUT component is always appended below,
	// so the returned slice is never empty.
	var c []fluentbit.Component

	if len(r.MultilineRules) > 0 {
		// Configure multiline in the input component;
		// This is necessary, since using the multiline filter will not work
		// if a multiline message spans between two chunks.
		rules := make([][2]string, 0, len(r.MultilineRules))
		for _, rule := range r.MultilineRules {
			rules = append(rules, [2]string{"rule", rule.AsString()})
		}

		parserName := fmt.Sprintf("multiline.%s", tag)

		c = append(c, fluentbit.Component{
			Kind: "MULTILINE_PARSER",
			Config: map[string]string{
				"name":          parserName,
				"type":          "regex",
				"flush_timeout": "5000",
			},
			OrderedConfig: rules,
		})
		// See https://docs.fluentbit.io/manual/pipeline/inputs/tail#multiline-core-v1.8
		config["multiline.parser"] = parserName

		// multiline parser outputs to a "log" key, but we expect "message" as the output of this pipeline
		c = append(c, modify.NewRenameOptions("log", "message").Component(tag))
	}

	// config is a map, so the multiline.parser entry added above is visible here.
	c = append(c, fluentbit.Component{
		Kind:   "INPUT",
		Config: config,
	})

	return c
}