func()

in apps/mongodb.go [126:199]


// jsonParserWithTimeKey returns the fluent-bit components that parse a JSON
// record and then re-parse it once the "$date" value nested under "t" has been
// promoted to a top-level "time" key, so the parser can use it as the time key.
//
// The returned slice contains, in order: the components generated by
// LoggingProcessorParseJson, a nest filter lifting the children of "t", a hard
// rename of the lifted "$date" key to "time", a second pass of the same parser
// over "message" (wrapped in the nest/merge Lua filters), and finally a modify
// filter that removes the "time" key.
func (p *LoggingProcessorMongodb) jsonParserWithTimeKey(ctx context.Context, tag, uid string) []fluentbit.Component {
	const (
		tsKey        = "time"
		liftedPrefix = "temp_ts_"
	)

	// Step 1: the regular JSON parse, configured to read its timestamp from tsKey.
	parser := &confgenerator.LoggingProcessorParseJson{
		ParserShared: confgenerator.ParserShared{
			TimeKey:    tsKey,
			TimeFormat: "%Y-%m-%dT%H:%M:%S.%L%z",
			Types: map[string]string{
				"id":      "integer",
				"message": "string",
			},
		},
	}
	initialParse := parser.Components(ctx, tag, uid)

	// Every parser filter is preceded by the 2 filter components of the nest lua
	// script (see confgenerator/fluentbit/parse_deduplication.go), which makes the
	// parser filter itself the third generated component. We keep a handle on it
	// because the re-parse further down must reference the very same parser.
	firstParserFilter := initialParse[2]

	pipeline := append([]fluentbit.Component{}, initialParse...)

	// Step 2: $date has to live at the top level before it can be parsed as the
	// time key (see https://github.com/fluent/fluent-bit/issues/1013), so lift the
	// children of "t" under a temporary prefix and rename the lifted $date.
	liftNested := fluentbit.Component{
		Kind: "FILTER",
		Config: map[string]string{
			"Name":         "nest",
			"Match":        tag,
			"Operation":    "lift",
			"Nested_under": "t",
			"Add_prefix":   liftedPrefix,
		},
	}
	renameOpts := modify.NewHardRenameOptions(liftedPrefix+"$date", tsKey)
	pipeline = append(pipeline, liftNested, renameOpts.Component(tag))

	// Step 3 (IMPORTANT): with the timestamp now at the top level, run the parser
	// a second time so that the time is properly set at the parser level.
	nestLua := fluentbit.LuaFilterComponents(tag, fluentbit.ParserNestLuaFunction, fmt.Sprintf(fluentbit.ParserNestLuaScriptContents, "message"))
	reparse := fluentbit.Component{
		Kind: "FILTER",
		Config: map[string]string{
			"Name":         "parser",
			"Match":        tag,
			"Key_Name":     "message",
			"Reserve_Data": "True",
			"Parser":       firstParserFilter.OrderedConfig[0][1],
		},
	}
	mergeLua := fluentbit.LuaFilterComponents(tag, fluentbit.ParserMergeLuaFunction, fluentbit.ParserMergeLuaScriptContents)

	pipeline = append(pipeline, nestLua...)
	pipeline = append(pipeline, reparse)
	pipeline = append(pipeline, mergeLua...)

	// Step 4: drop the promoted timestamp key from the record.
	dropTs := fluentbit.Component{
		Kind: "FILTER",
		Config: map[string]string{
			"Name":   "modify",
			"Match":  tag,
			"Remove": tsKey,
		},
	}

	return append(pipeline, dropTs)
}