in apps/mysql.go [317:568]
// Components builds the fluent-bit components for MySQL/MariaDB slow query logs.
//
// It assembles two regex parsers (one for the MySQL 8.0.14+ format, one for
// old MySQL and all MariaDB), wraps them in a multiline regex processor that
// groups the lines of each log entry, and appends a modify-fields processor
// that sets the instrumentation source label and tags every Yes/No field
// with the YesNoBoolean type.
func (p LoggingProcessorMysqlSlow) Components(ctx context.Context, tag string, uid string) []fluentbit.Component {
	// Field modifications applied after parsing. The loop over oldFields below
	// adds one entry per captured Yes/No field.
	modifyFields := map[string]*confgenerator.ModifyField{
		InstrumentationSourceLabel: instrumentationSourceValue(p.Type()),
	}

	// This format is for MySQL 8.0.14+
	// Fields are split into this array to improve readability of the regex
	mySQLFields := strings.Join([]string{
		// Always present slow query log fields
		`\s+Query_time:\s+(?<queryTime>[\d\.]+)`,
		`\s+Lock_time:\s+(?<lockTime>[\d\.]+)`,
		`\s+Rows_sent:\s+(?<rowsSent>\d+)`,
		`\s+Rows_examined:\s(?<rowsExamined>\d+)`,
		// Extra fields present if log_slow_extra == ON
		`(?:\s+Thread_id:\s+\d+)?`, // Field also present in the 2nd line of the multiline log
		`(?:\s+Errno:\s(?<errorNumber>\d+))?`,
		`(?:\s+Killed:\s(?<killed>\d+))?`,
		`(?:\s+Bytes_received:\s(?<bytesReceived>\d+))?`,
		`(?:\s+Bytes_sent:\s(?<bytesSent>\d+))?`,
		`(?:\s+Read_first:\s(?<readFirst>\d+))?`,
		`(?:\s+Read_last:\s(?<readLast>\d+))?`,
		`(?:\s+Read_key:\s(?<readKey>\d+))?`,
		`(?:\s+Read_next:\s(?<readNext>\d+))?`,
		`(?:\s+Read_prev:\s(?<readPrev>\d+))?`,
		`(?:\s+Read_rnd:\s(?<readRnd>\d+))?`,
		`(?:\s+Read_rnd_next:\s(?<readRndNext>\d+))?`,
		`(?:\s+Sort_merge_passes:\s(?<sortMergePasses>\d+))?`,
		`(?:\s+Sort_range_count:\s(?<sortRangeCount>\d+))?`,
		`(?:\s+Sort_rows:\s(?<sortRows>\d+))?`,
		`(?:\s+Sort_scan_count:\s(?<sortScanCount>\d+))?`,
		`(?:\s+Created_tmp_disk_tables:\s(?<createdTmpDiskTables>\d+))?`,
		`(?:\s+Created_tmp_tables:\s(?<createdTmpTables>\d+))?`,
		// The fractional-seconds separator is escaped so that it only matches a
		// literal dot rather than any character.
		`(?:\s+Start:\s(?<startTime>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z))?`,
		`(?:\s+End:\s(?<endTime>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z))?`,
	}, "")

	parsers := []confgenerator.RegexParser{{
		// Fields documented: https://dev.mysql.com/doc/refman/8.0/en/slow-query-log.html
		// Sample line: # Time: 2021-10-12T01:13:38.132884Z
		//              # User@Host: root[root] @ localhost []  Id:    15
		//              # Query_time: 0.001855  Lock_time: 0.000000 Rows_sent: 0  Rows_examined: 0
		//              SET timestamp=1634001218;
		//              SET GLOBAL slow_query_log = 1;
		// Extra fields w/ log_slow_extra = 'ON'
		// Sample line: # Time: 2021-10-12T01:34:15.231930Z
		//              # User@Host: root[root] @ localhost []  Id:    21
		//              # Query_time: 0.012740 Lock_time: 0.000810 Rows_sent: 327 Rows_examined: 586 Thread_id: 21 Errno: 0 Killed: 0 Bytes_received: 0 Bytes_sent: 41603 Read_first: 2 Read_last: 0 Read_key: 361 Read_next: 361 Read_prev: 0 Read_rnd: 0 Read_rnd_next: 5 Sort_merge_passes: 0 Sort_range_count: 0 Sort_rows: 0 Sort_scan_count: 0 Created_tmp_disk_tables: 0 Created_tmp_tables: 0 Start: 2021-10-12T01:34:15.219190Z End: 2021-10-12T01:34:15.231930Z
		//              SET timestamp=1634002455;
		//              select * from information_schema.tables;
		Regex: fmt.Sprintf(
			`^(?:# Time: (?<time>%s)\s)?# User@Host:\s+(?<user>[^\[]*)\[(?<database>[^\]]*)\]\s+@\s+((?<host>[^\s]+)\s)?\[(?:(?<ipAddress>[\w\d\.:]+)?)\]\s+Id:\s+(?<tid>\d+)\s+#%s\s+(?<message>[\s\S]+)`,
			timeRegexMySQLNew,
			mySQLFields,
		),
		Parser: confgenerator.ParserShared{
			TimeKey:    "time",
			TimeFormat: timeFormatMySQLNew,
			Types: map[string]string{
				"tid":                  "integer",
				"queryTime":            "float",
				"lockTime":             "float",
				"rowsSent":             "integer",
				"rowsExamined":         "integer",
				"errorNumber":          "integer",
				"killed":               "integer",
				"bytesReceived":        "integer",
				"bytesSent":            "integer",
				"readFirst":            "integer",
				"readLast":             "integer",
				"readKey":              "integer",
				"readNext":             "integer",
				"readPrev":             "integer",
				"readRnd":              "integer",
				"readRndNext":          "integer",
				"sortMergePasses":      "integer",
				"sortRangeCount":       "integer",
				"sortRows":             "integer",
				"sortScanCount":        "integer",
				"createdTmpDiskTables": "integer",
				"createdTmpTables":     "integer",
			},
		},
	}}

	// This format is for old MySQL and all MariaDB.
	// Docs:
	//   https://mariadb.com/kb/en/slow-query-log-extended-statistics/
	//   https://mariadb.com/kb/en/explain-in-the-slow-query-log/
	// Sample MariaDB line:
	// # User@Host: root[root] @ localhost []
	// # Thread_id: 32  Schema: dbt3sf1  QC_hit: No
	// # Query_time: 0.000130  Lock_time: 0.000068  Rows_sent: 0  Rows_examined: 0
	// # Rows_affected: 0  Bytes_sent: 1351
	// SET timestamp=1689286831;
	// SELECT OBJECT_SCHEMA, OBJECT_NAME, COUNT_DELETE, COUNT_FETCH, COUNT_INSERT, COUNT_UPDATE,SUM_TIMER_DELETE, SUM_TIMER_FETCH, SUM_TIMER_INSERT, SUM_TIMER_UPDATE FROM performance_schema.table_io_waits_summary_by_table WHERE OBJECT_SCHEMA NOT IN ('mysql', 'performance_schema');

	// Value patterns for the old-format fields. They are also compared against
	// field.regex in the loop below to pick each captured field's type.
	const (
		float   = `[\d\.]+`
		integer = `\d+`
		boolean = `Yes|No`
	)
	// One inner slice per "# ..." comment line of the log entry, in the order
	// the lines are expected to appear. An empty jsonField means the field is
	// matched but not captured.
	oldFields := [][]struct {
		identifier, jsonField, regex string
	}{
		{
			// "# Thread_id: %lu  Schema: %s  QC_hit: %s\n"
			{"Thread_id", "tid", integer},
			{"Schema", "database", `\S*`}, // N.B. MariaDB will still show the field with an empty string if the connection doesn't have an active database.
			{"QC_hit", "queryCacheHit", boolean},
		},
		{
			// "# Query_time: %s  Lock_time: %s  Rows_sent: %lu  Rows_examined: %lu\n"
			{"Query_time", "queryTime", float},
			{"Lock_time", "lockTime", float},
			{"Rows_sent", "rowsSent", integer},
			{"Rows_examined", "rowsExamined", integer},
		},
		{
			// MariaDB 10.3.1+
			// "# Rows_affected: %lu  Bytes_sent: %lu\n",
			{"Rows_affected", "rowsAffected", integer},
			{"Bytes_sent", "bytesSent", integer},
		},
		{
			// MariaDB 5.5.37+ if thd->tmp_tables_used with LOG_SLOW_VERBOSITY_QUERY_PLAN
			// "# Tmp_tables: %lu  Tmp_disk_tables: %lu  Tmp_table_sizes: %s\n"
			{"Tmp_tables", "createdTmpTables", integer},
			{"Tmp_disk_tables", "createdTmpDiskTables", integer},
			{"Tmp_table_sizes", "createdTmpTableSizes", integer},
		},
		{
			// MariaDB 10.3.4+ if thd->spcont != NULL
			// "# Stored_routine: %s\n"
			{"Stored_routine", "storedRoutine", `\S+`},
		},
		{
			// MariaDB 5.5.37+ with LOG_SLOW_VERBOSITY_QUERY_PLAN
			// "# Full_scan: %s  Full_join: %s  Tmp_table: %s  Tmp_table_on_disk: %s\n"
			{"Full_scan", "fullScan", boolean},
			{"Full_join", "fullJoin", boolean},
			{"Tmp_table", "", boolean},
			{"Tmp_table_on_disk", "", boolean},
		},
		{
			// MariaDB 5.5.37+ with LOG_SLOW_VERBOSITY_QUERY_PLAN
			// "# Filesort: %s  Filesort_on_disk: %s  Merge_passes: %lu  Priority_queue: %s\n",
			{"Filesort", "filesort", boolean},
			{"Filesort_on_disk", "filesortOnDisk", boolean},
			{"Merge_passes", "sortMergePasses", integer},
			{"Priority_queue", "priorityQueue", boolean},
		},
	}
	// LOG_SLOW_VERBOSITY_EXPLAIN causes additional comment lines
	// to be added containing the output of EXPLAIN; it's probably
	// not worth parsing them since they're somewhat freeform.

	// oldLines accumulates the pieces of the old-format regex; they are joined
	// without a separator once every line group has been appended.
	oldLines := []string{
		fmt.Sprintf(`^(?:# Time: (?<time>%s)\s)?`, timeRegexOld),
		// N.B. MySQL logs two usernames (i.e. "root[root]"). The first username is the "priv_user", i.e. the username used for privilege checking.
		// The second username is the "user", which is the string the user provided when connecting.
		// We only report the priv_user here.
		// See https://dev.mysql.com/doc/refman/8.0/en/audit-log-file-formats.html
		`# User@Host:\s+(?<user>[^\[]*)\[[^\]]*\]\s+@\s+((?<host>[^\s]+)\s)?\[(?:(?<ipAddress>[\w\d\.:]+)?)\]`,
	}
	oldTypes := make(map[string]string)
	for _, lineFields := range oldFields {
		var out []string
		for _, field := range lineFields {
			// Fields without a JSON name are matched with a non-capturing group.
			valueRegex := fmt.Sprintf(`(?:%s)`, field.regex)
			if field.jsonField != "" {
				valueRegex = fmt.Sprintf(`(?<%s>%s)`, field.jsonField, field.regex)
				// The value pattern doubles as the type tag. Patterns that are
				// none of the three constants get no type entry.
				switch field.regex {
				case float:
					oldTypes[field.jsonField] = "float"
				case integer:
					oldTypes[field.jsonField] = "integer"
				case boolean:
					modifyFields[fmt.Sprintf(`jsonPayload.%s`, field.jsonField)] = &confgenerator.ModifyField{
						Type: "YesNoBoolean",
					}
				}
			}
			optional := "?"
			if len(out) == 0 {
				// First field on each line is not optional.
				// Otherwise we'll consume the "# " of the following line and prevent it from matching the next line's regex.
				optional = ""
			}
			out = append(out, fmt.Sprintf(
				`(?:\s+%s:\s%s)%s`,
				field.identifier,
				valueRegex,
				optional,
			))
		}
		// Each comment line as a whole is optional.
		oldLines = append(oldLines, fmt.Sprintf(
			`(?:\s+#%s)?`,
			strings.Join(out, ""),
		))
	}
	// Everything after the recognized comment lines is captured as the message.
	oldLines = append(oldLines, `\s+(?<message>[\s\S]+)`)
	parsers = append(parsers, confgenerator.RegexParser{
		Regex: strings.Join(oldLines, ""),
		Parser: confgenerator.ParserShared{
			TimeKey:    "time",
			TimeFormat: timeFormatOld,
			Types:      oldTypes,
		},
	})

	c := confgenerator.LoggingProcessorParseMultilineRegex{
		LoggingProcessorParseRegexComplex: confgenerator.LoggingProcessorParseRegexComplex{
			Parsers: parsers,
		},
		Rules: []confgenerator.MultilineRule{
			// Logs start with Time: or User@Host: (omitting time if it's the same as the previous entry).
			{
				StateName: "start_state",
				NextState: "comment",
				Regex: fmt.Sprintf(
					`^# (User@Host: |Time: (%s|%s))`,
					timeRegexMySQLNew,
					timeRegexOld,
				),
			},
			// Explicitly consume the next line, which might be User@Host.
			{
				StateName: "comment",
				NextState: "cont",
				Regex:     `^# `,
			},
			// Then consume everything until the next Time or User@Host.
			{
				StateName: "cont",
				NextState: "cont",
				Regex: fmt.Sprintf(
					`^(?!# (User@Host: |Time: (%s|%s)))`,
					timeRegexMySQLNew,
					timeRegexOld,
				),
			},
		},
	}.Components(ctx, tag, uid)

	// Apply the field modifications after the parser components.
	c = append(c,
		confgenerator.LoggingProcessorModifyFields{
			Fields: modifyFields,
		}.Components(ctx, tag, uid)...,
	)
	return c
}