in packetbeat/protos/mysql/mysql.go [274:507]
// mysqlMessageParser runs the MySQL parsing state machine over the bytes
// buffered in s.data, starting at s.parseOffset, and records what it learns in
// s.message.
//
// The two results are:
//   - (true, true):   a message was parsed to its end; s.parseOffset points just
//     past the last consumed packet.
//   - (true, false):  the buffered bytes were consumed as far as possible and
//     more data is needed to finish the current message.
//   - (false, false): the buffered bytes could not be parsed (too short for a
//     header, a malformed length-encoded value, or a packet too small for the
//     fields its type requires). Presumably the caller discards the stream in
//     this case - confirm against the call site.
//
// Every packet starts with a 4-byte header: a 3-byte little-endian payload
// length followed by a 1-byte sequence number.
func mysqlMessageParser(s *mysqlStream) (bool, bool) {
	logp.Debug("mysqldetailed", "MySQL parser called. parseState = %s", s.parseState)

	m := s.message
	for s.parseOffset < len(s.data) {
		switch s.parseState {
		case mysqlStateStart:
			m.start = s.parseOffset
			// The 4-byte header plus the first payload byte are needed to
			// classify the packet.
			if len(s.data[s.parseOffset:]) < 5 {
				logp.Warn("MySQL Message too short. Ignore it.")
				return false, false
			}
			hdr := s.data[s.parseOffset : s.parseOffset+5]
			m.packetLength = leUint24(hdr[0:3])
			m.seq = hdr[3]
			m.typ = hdr[4]

			logp.Debug("mysqldetailed", "MySQL Header: Packet length %d, Seq %d, Type=%d isClient=%v", m.packetLength, m.seq, m.typ, s.isClient)

			if s.isClient {
				// A command starts the Command Phase with sequence number 0;
				// every other client packet is consumed but ignored.
				if m.seq == 0 && isRequest(m.typ) {
					m.isRequest = true
				} else {
					m.ignoreMessage = true
				}
				s.parseState = mysqlStateEatMessage
			} else {
				// Server side: classify the response by its first payload byte.
				m.isRequest = false
				switch {
				case hdr[4] == 0x00 || hdr[4] == 0xfe:
					logp.Debug("mysqldetailed", "Received OK response")
					s.parseState = mysqlStateEatMessage
					m.isOK = true
				case hdr[4] == 0xff:
					logp.Debug("mysqldetailed", "Received ERR response")
					s.parseState = mysqlStateEatMessage
					m.isError = true
				case m.packetLength == 1:
					// A one-byte packet opens a result set; the byte is the
					// number of field definitions that follow.
					logp.Debug("mysqldetailed", "Query response. Number of fields %d", hdr[4])
					m.numberOfFields = int(hdr[4])
					s.parseOffset += 5 // header + the one payload byte
					s.parseState = mysqlStateEatFields
				default:
					// something else. ignore
					m.ignoreMessage = true
					s.parseState = mysqlStateEatMessage
				}
			}

		case mysqlStateEatMessage:
			if len(s.data[s.parseOffset:]) < int(m.packetLength)+4 {
				// wait for more data
				return true, false
			}

			s.parseOffset += 4 // header
			s.parseOffset += int(m.packetLength)
			m.end = s.parseOffset

			// From here on the whole packet s.data[m.start:m.end] is buffered;
			// the payload starts at m.start+4 and its first byte (command or
			// response marker) sits at m.start+4, so fields begin at m.start+5.
			switch {
			case m.isRequest:
				if m.typ == mysqlCmdStmtExecute || m.typ == mysqlCmdStmtClose {
					// command byte + 4-byte statement id
					if m.packetLength < 5 {
						logp.Debug("mysql", "Statement command too short for a statement id: payload of %d bytes", m.packetLength)
						return false, false
					}
					m.statementID = int(binary.LittleEndian.Uint32(s.data[m.start+5 : m.start+9]))
				} else {
					// An empty payload has no command byte, so m.start+5 would
					// lie beyond m.end and the slice below would panic.
					if m.packetLength < 1 {
						logp.Debug("mysql", "Request packet with empty payload")
						return false, false
					}
					m.query = string(s.data[m.start+5 : m.end])
				}
			case m.isOK:
				// NOTE(review): parseOffset and m.end were already advanced
				// above, so the "wait for more data" returns below leave the
				// stream positioned after this packet.
				// affected rows
				affectedRows, off, complete, err := readLinteger(s.data, m.start+5)
				if !complete {
					return true, false
				}
				if err != nil {
					logp.Debug("mysql", "Error on read_linteger: %s", err)
					return false, false
				}
				m.affectedRows = affectedRows

				// last insert id
				insertID, _, complete, err := readLinteger(s.data, off)
				if !complete {
					return true, false
				}
				if err != nil {
					logp.Debug("mysql", "Error on read_linteger: %s", err)
					return false, false
				}
				m.insertID = insertID
			case m.isError:
				// int<1>      header (0xff)
				// int<2>      error code
				// string[1]   sql state marker
				// string[5]   sql state
				// string<EOF> error message
				//
				// The fixed part is 9 bytes; anything shorter cannot be sliced
				// at the offsets below.
				if m.packetLength < 9 {
					logp.Debug("mysql", "ERR packet too short: payload of %d bytes", m.packetLength)
					return false, false
				}
				m.errorCode = binary.LittleEndian.Uint16(s.data[m.start+5 : m.start+7])
				// The message runs to the end of this packet (m.end), not to
				// the end of the buffer, which may already hold later packets.
				m.errorInfo = string(s.data[m.start+8:m.start+13]) + ": " + string(s.data[m.start+13:m.end])
			}

			m.size = uint64(m.end - m.start)
			logp.Debug("mysqldetailed", "Message complete. remaining=%d",
				len(s.data[s.parseOffset:]))

			// PREPARE_OK packet for Prepared Statement
			// a trick for classify special OK packet
			if m.isOK && m.packetLength == 12 {
				// A 12-byte payload guarantees the 4+2+2 bytes read here.
				m.statementID = int(binary.LittleEndian.Uint32(s.data[m.start+5:]))
				m.numberOfFields = int(binary.LittleEndian.Uint16(s.data[m.start+9:]))
				m.numberOfParams = int(binary.LittleEndian.Uint16(s.data[m.start+11:]))
				switch {
				case m.numberOfFields > 0:
					s.parseState = mysqlStateEatFields
				case m.numberOfParams > 0:
					s.parseState = mysqlStateEatRows
				default:
					// Neither columns nor parameters: no definition packets
					// follow, so the message ends here. Without this return
					// the parser would stay in mysqlStateEatMessage and never
					// report the message as complete.
					return true, true
				}
			} else {
				return true, true
			}

		case mysqlStateEatFields:
			if len(s.data[s.parseOffset:]) < 4 {
				// wait for more
				return true, false
			}
			hdr := s.data[s.parseOffset : s.parseOffset+4]
			m.packetLength = leUint24(hdr[:3])
			m.seq = hdr[3]

			if len(s.data[s.parseOffset:]) < int(m.packetLength)+4 {
				// wait for more
				return true, false
			}
			// A field definition or EOF marker needs at least one payload
			// byte; peeking at s.data[s.parseOffset] below would otherwise
			// read past the packet (and past the buffer at its very end).
			if m.packetLength == 0 {
				logp.Debug("mysql", "Empty packet while reading field definitions")
				return false, false
			}

			s.parseOffset += 4 // header

			if s.data[s.parseOffset] == 0xfe {
				logp.Debug("mysqldetailed", "Received EOF packet")
				// EOF marker: field definitions are done, rows follow.
				s.parseOffset += int(m.packetLength)
				s.parseState = mysqlStateEatRows
			} else {
				// NOTE(review): parseOffset already points past the header
				// when the "wait for more data" returns below are taken.
				_ /* catalog */, off, complete, err := readLstring(s.data, s.parseOffset)
				if !complete {
					return true, false
				}
				if err != nil {
					logp.Debug("mysql", "Error on read_lstring: %s", err)
					return false, false
				}
				db /*schema */, off, complete, err := readLstring(s.data, off)
				if !complete {
					return true, false
				}
				if err != nil {
					logp.Debug("mysql", "Error on read_lstring: %s", err)
					return false, false
				}
				table /* table */, _ /*off*/, complete, err := readLstring(s.data, off)
				if !complete {
					return true, false
				}
				if err != nil {
					logp.Debug("mysql", "Error on read_lstring: %s", err)
					return false, false
				}

				// Collect each distinct "db.table" once, comma separated.
				dbTable := string(db) + "." + string(table)
				if len(m.tables) == 0 {
					m.tables = dbTable
				} else if !strings.Contains(m.tables, dbTable) {
					m.tables = m.tables + ", " + dbTable
				}
				logp.Debug("mysqldetailed", "db=%s, table=%s", db, table)
				s.parseOffset += int(m.packetLength)
				// go to next field
			}

		case mysqlStateEatRows:
			if len(s.data[s.parseOffset:]) < 4 {
				// wait for more
				return true, false
			}
			hdr := s.data[s.parseOffset : s.parseOffset+4]
			m.packetLength = leUint24(hdr[:3])
			m.seq = hdr[3]

			logp.Debug("mysqldetailed", "Rows: packet length %d, packet number %d", m.packetLength, m.seq)

			if len(s.data[s.parseOffset:]) < int(m.packetLength)+4 {
				// wait for more
				return true, false
			}

			s.parseOffset += 4 // header

			// Only look at the first payload byte when there is one; an empty
			// packet is skipped like any other row packet instead of reading
			// the byte that follows it.
			if m.packetLength > 0 && s.data[s.parseOffset] == 0xfe {
				logp.Debug("mysqldetailed", "Received EOF packet")
				// EOF marker
				s.parseOffset += int(m.packetLength)

				if m.end == 0 {
					m.end = s.parseOffset
				} else {
					// m.end was fixed earlier, so the recorded message stops
					// short of the data actually read.
					m.isTruncated = true
				}
				if !m.isError {
					// in case the response was sent successfully
					m.isOK = true
				}
				m.size = uint64(m.end - m.start)
				return true, true
			}

			s.parseOffset += int(m.packetLength)
			if m.end == 0 && s.parseOffset > maxPayloadSize {
				// only send up to here, but read until the end
				m.end = s.parseOffset
			}
			m.numberOfRows++
			// go to next row
		}
	}

	return true, false
}