func (p LoggingProcessorMysqlSlow) Components()

in apps/mysql.go [317:568]


func (p LoggingProcessorMysqlSlow) Components(ctx context.Context, tag string, uid string) []fluentbit.Component {
	modifyFields := map[string]*confgenerator.ModifyField{
		InstrumentationSourceLabel: instrumentationSourceValue(p.Type()),
	}
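	// modifyFields starts with just the instrumentation-source label; Yes/No
	// boolean conversions for the legacy/MariaDB format are added to it below.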

	// This format is for MySQL 8.0.14+
	// Fields are split into this array to improve readability of the regex
	mySQLFields := strings.Join([]string{
		// Always present slow query log fields
		`\s+Query_time:\s+(?<queryTime>[\d\.]+)`,
		`\s+Lock_time:\s+(?<lockTime>[\d\.]+)`,
		`\s+Rows_sent:\s+(?<rowsSent>\d+)`,
		`\s+Rows_examined:\s(?<rowsExamined>\d+)`,

		// Extra fields present if log_slow_extra == ON
		`(?:\s+Thread_id:\s+\d+)?`, // Field also present in the 2nd line of the multiline log
		`(?:\s+Errno:\s(?<errorNumber>\d+))?`,
		`(?:\s+Killed:\s(?<killed>\d+))?`,
		`(?:\s+Bytes_received:\s(?<bytesReceived>\d+))?`,
		`(?:\s+Bytes_sent:\s(?<bytesSent>\d+))?`,
		`(?:\s+Read_first:\s(?<readFirst>\d+))?`,
		`(?:\s+Read_last:\s(?<readLast>\d+))?`,
		`(?:\s+Read_key:\s(?<readKey>\d+))?`,
		`(?:\s+Read_next:\s(?<readNext>\d+))?`,
		`(?:\s+Read_prev:\s(?<readPrev>\d+))?`,
		`(?:\s+Read_rnd:\s(?<readRnd>\d+))?`,
		`(?:\s+Read_rnd_next:\s(?<readRndNext>\d+))?`,
		`(?:\s+Sort_merge_passes:\s(?<sortMergePasses>\d+))?`,
		`(?:\s+Sort_range_count:\s(?<sortRangeCount>\d+))?`,
		`(?:\s+Sort_rows:\s(?<sortRows>\d+))?`,
		`(?:\s+Sort_scan_count:\s(?<sortScanCount>\d+))?`,
		`(?:\s+Created_tmp_disk_tables:\s(?<createdTmpDiskTables>\d+))?`,
		`(?:\s+Created_tmp_tables:\s(?<createdTmpTables>\d+))?`,
		`(?:\s+Start:\s(?<startTime>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d+Z))?`,
		`(?:\s+End:\s(?<endTime>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d+Z))?`,
	}, "")
	parsers := []confgenerator.RegexParser{{
		// Fields documented: https://dev.mysql.com/doc/refman/8.0/en/slow-query-log.html
		// Sample line: # Time: 2021-10-12T01:13:38.132884Z
		//              # User@Host: root[root] @ localhost []  Id:    15
		//              # Query_time: 0.001855  Lock_time: 0.000000 Rows_sent: 0  Rows_examined: 0
		//              SET timestamp=1634001218;
		//              SET GLOBAL slow_query_log = 1;
		// Extra fields with log_slow_extra = 'ON'
		// Sample line: # Time: 2021-10-12T01:34:15.231930Z
		//              # User@Host: root[root] @ localhost []  Id:    21
		//              # Query_time: 0.012740  Lock_time: 0.000810 Rows_sent: 327  Rows_examined: 586 Thread_id: 21 Errno: 0 Killed: 0 Bytes_received: 0 Bytes_sent: 41603 Read_first: 2 Read_last: 0 Read_key: 361 Read_next: 361 Read_prev: 0 Read_rnd: 0 Read_rnd_next: 5 Sort_merge_passes: 0 Sort_range_count: 0 Sort_rows: 0 Sort_scan_count: 0 Created_tmp_disk_tables: 0 Created_tmp_tables: 0 Start: 2021-10-12T01:34:15.219190Z End: 2021-10-12T01:34:15.231930Z
		//              SET timestamp=1634002455;
		//              select * from information_schema.tables;
		Regex: fmt.Sprintf(
			`^(?:# Time: (?<time>%s)\s)?# User@Host:\s+(?<user>[^\[]*)\[(?<database>[^\]]*)\]\s+@\s+((?<host>[^\s]+)\s)?\[(?:(?<ipAddress>[\w\d\.:]+)?)\]\s+Id:\s+(?<tid>\d+)\s+#%s\s+(?<message>[\s\S]+)`,
			timeRegexMySQLNew,
			mySQLFields,
		),
		Parser: confgenerator.ParserShared{
			TimeKey:    "time",
			TimeFormat: timeFormatMySQLNew,
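			// Captures listed below are cast by the parser; anything not listed
			// (user, database, host, ipAddress, message, startTime, endTime)
			// stays a string.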
			Types: map[string]string{
				"tid":                  "integer",
				"queryTime":            "float",
				"lockTime":             "float",
				"rowsSent":             "integer",
				"rowsExamined":         "integer",
				"errorNumber":          "integer",
				"killed":               "integer",
				"bytesReceived":        "integer",
				"bytesSent":            "integer",
				"readFirst":            "integer",
				"readLast":             "integer",
				"readKey":              "integer",
				"readNext":             "integer",
				"readPrev":             "integer",
				"readRnd":              "integer",
				"readRndNext":          "integer",
				"sortMergePasses":      "integer",
				"sortRangeCount":       "integer",
				"sortRows":             "integer",
				"sortScanCount":        "integer",
				"createdTmpDiskTables": "integer",
				"createdTmpTables":     "integer",
			},
		},
	}}

	// This format is for old MySQL and all MariaDB.
	// Docs:
	//   https://mariadb.com/kb/en/slow-query-log-extended-statistics/
	//   https://mariadb.com/kb/en/explain-in-the-slow-query-log/
	// Sample MariaDB line:
	// # User@Host: root[root] @ localhost []
	// # Thread_id: 32  Schema: dbt3sf1  QC_hit: No
	// # Query_time: 0.000130  Lock_time: 0.000068  Rows_sent: 0  Rows_examined: 0
	// # Rows_affected: 0  Bytes_sent: 1351
	// SET timestamp=1689286831;
	// SELECT OBJECT_SCHEMA, OBJECT_NAME, COUNT_DELETE, COUNT_FETCH, COUNT_INSERT, COUNT_UPDATE,SUM_TIMER_DELETE, SUM_TIMER_FETCH, SUM_TIMER_INSERT, SUM_TIMER_UPDATE FROM performance_schema.table_io_waits_summary_by_table WHERE OBJECT_SCHEMA NOT IN ('mysql', 'performance_schema');

	const (
		float   = `[\d\.]+`
		integer = `\d+`
		boolean = `Yes|No`
	)
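	// These value patterns double as type markers: the loop below compares each
	// field's regex against them to decide whether the capture is typed as a
	// float or an integer, or registered in modifyFields for Yes/No-to-boolean
	// conversion.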

	oldFields := [][]struct {
		identifier, jsonField, regex string
	}{
		{
			// "# Thread_id: %lu  Schema: %s  QC_hit: %s\n"
			{"Thread_id", "tid", integer},
			{"Schema", "database", `\S*`}, // N.B. MariaDB will still show the field with an empty string if the connection doesn't have an active database.
			{"QC_hit", "queryCacheHit", boolean},
		},
		{
			// "# Query_time: %s  Lock_time: %s  Rows_sent: %lu  Rows_examined: %lu\n"
			{"Query_time", "queryTime", float},
			{"Lock_time", "lockTime", float},
			{"Rows_sent", "rowsSent", integer},
			{"Rows_examined", "rowsExamined", integer},
		},
		{
			// MariaDB 10.3.1+
			// "# Rows_affected: %lu  Bytes_sent: %lu\n",
			{"Rows_affected", "rowsAffected", integer},
			{"Bytes_sent", "bytesSent", integer},
		},
		{
			// MariaDB 5.5.37+ if thd->tmp_tables_used with LOG_SLOW_VERBOSITY_QUERY_PLAN
			// "# Tmp_tables: %lu  Tmp_disk_tables: %lu  Tmp_table_sizes: %s\n"
			{"Tmp_tables", "createdTmpTables", integer},
			{"Tmp_disk_tables", "createdTmpDiskTables", integer},
			{"Tmp_table_sizes", "createdTmpTableSizes", integer},
		},
		{
			// MariaDB 10.3.4+ if thd->spcont != NULL
			// "# Stored_routine: %s\n"
			{"Stored_routine", "storedRoutine", `\S+`},
		},
		{
			// MariaDB 5.5.37+ with LOG_SLOW_VERBOSITY_QUERY_PLAN
			// "# Full_scan: %s  Full_join: %s  Tmp_table: %s  Tmp_table_on_disk: %s\n"
			{"Full_scan", "fullScan", boolean},
			{"Full_join", "fullJoin", boolean},
			{"Tmp_table", "", boolean},
			{"Tmp_table_on_disk", "", boolean},
		},
		{
			// MariaDB 5.5.37+ with LOG_SLOW_VERBOSITY_QUERY_PLAN
			// "# Filesort: %s  Filesort_on_disk: %s  Merge_passes: %lu  Priority_queue: %s\n",
			{"Filesort", "filesort", boolean},
			{"Filesort_on_disk", "filesortOnDisk", boolean},
			{"Merge_passes", "sortMergePasses", integer},
			{"Priority_queue", "priorityQueue", boolean},
		},
	}
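	// Entries with an empty jsonField (Tmp_table, Tmp_table_on_disk) are still
	// matched so the rest of the line parses, but their values are not captured.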
	// LOG_SLOW_VERBOSITY_EXPLAIN causes additional comment lines
	// to be added containing the output of EXPLAIN; it's probably
	// not worth parsing them since they're somewhat freeform.
	oldLines := []string{
		fmt.Sprintf(`^(?:# Time: (?<time>%s)\s)?`, timeRegexOld),
		// N.B. MySQL logs two usernames (i.e. "root[root]"). The first username is the "priv_user", i.e. the username used for privilege checking.
		// The second username is the "user", which is the string the user provided when connecting.
		// We only report the priv_user here.
		// See https://dev.mysql.com/doc/refman/8.0/en/audit-log-file-formats.html
		`# User@Host:\s+(?<user>[^\[]*)\[[^\]]*\]\s+@\s+((?<host>[^\s]+)\s)?\[(?:(?<ipAddress>[\w\d\.:]+)?)\]`,
	}
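	// Build one optional regex group per "# "-prefixed header line described in
	// oldFields, collecting value types and Yes/No boolean conversions as we go.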
	oldTypes := make(map[string]string)
	for _, lineFields := range oldFields {
		var out []string
		for _, field := range lineFields {
			valueRegex := fmt.Sprintf(`(?:%s)`, field.regex)
			if field.jsonField != "" {
				valueRegex = fmt.Sprintf(`(?<%s>%s)`, field.jsonField, field.regex)
				switch field.regex {
				case float:
					oldTypes[field.jsonField] = "float"
				case integer:
					oldTypes[field.jsonField] = "integer"
				case boolean:
					modifyFields[fmt.Sprintf(`jsonPayload.%s`, field.jsonField)] = &confgenerator.ModifyField{
						Type: "YesNoBoolean",
					}
				}
			}
			optional := "?"
			if len(out) == 0 {
				// First field on each line is not optional.
				// Otherwise we'll consume the "# " of the following line and prevent it from matching the next line's regex.
				optional = ""
			}
			out = append(out, fmt.Sprintf(
				`(?:\s+%s:\s%s)%s`,
				field.identifier,
				valueRegex,
				optional,
			))
		}
		oldLines = append(oldLines, fmt.Sprintf(
			`(?:\s+#%s)?`,
			strings.Join(out, ""),
		))
	}
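	// For example, the Rows_affected/Bytes_sent group above expands to:
	//   (?:\s+#(?:\s+Rows_affected:\s(?<rowsAffected>\d+))(?:\s+Bytes_sent:\s(?<bytesSent>\d+))?)?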
	oldLines = append(oldLines, `\s+(?<message>[\s\S]+)`)

	parsers = append(parsers, confgenerator.RegexParser{
		Regex: strings.Join(oldLines, ""),
		Parser: confgenerator.ParserShared{
			TimeKey:    "time",
			TimeFormat: timeFormatOld,
			Types:      oldTypes,
		},
	})
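	// parsers now holds both formats: the MySQL 8.0.14+ regex first, then the
	// legacy MySQL/MariaDB regex; both are handed to the multiline processor below.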

	c := confgenerator.LoggingProcessorParseMultilineRegex{
		LoggingProcessorParseRegexComplex: confgenerator.LoggingProcessorParseRegexComplex{
			Parsers: parsers,
		},
		Rules: []confgenerator.MultilineRule{
			// Logs start with Time: or User@Host: (omitting time if it's the same as the previous entry).
			{
				StateName: "start_state",
				NextState: "comment",
				Regex: fmt.Sprintf(
					`^# (User@Host: |Time: (%s|%s))`,
					timeRegexMySQLNew,
					timeRegexOld,
				),
			},
			// Explicitly consume the next line, which might be User@Host.
			{
				StateName: "comment",
				NextState: "cont",
				Regex:     `^# `,
			},
			// Then consume everything until the next Time or User@Host.
			{
				StateName: "cont",
				NextState: "cont",
				Regex: fmt.Sprintf(
					`^(?!# (User@Host: |Time: (%s|%s)))`,
					timeRegexMySQLNew,
					timeRegexOld,
				),
			},
		},
	}.Components(ctx, tag, uid)

	c = append(c,
		confgenerator.LoggingProcessorModifyFields{
			Fields: modifyFields,
		}.Components(ctx, tag, uid)...,
	)
	return c
}