func mysqlMessageParser()

in packetbeat/protos/mysql/mysql.go [274:507]


// mysqlMessageParser runs the per-stream parsing state machine over the bytes
// buffered in s.data, starting at s.parseOffset, and records what it finds in
// s.message.
//
// The two results are (ok, complete):
//   - (true, true): a whole message has been parsed; m.start, m.end and
//     m.size describe it.
//   - (true, false): the buffered data ends before the message does; the
//     function can be called again once more data has been appended.
//   - (false, false): the data could not be parsed.
//
// Every packet starts with a 4-byte header: a 3-byte little-endian payload
// length followed by a 1-byte sequence number. Payload reads are always
// clipped to the packet they belong to, so a short or malformed packet can
// neither cause an out-of-range panic nor pull in bytes of the next packet.
func mysqlMessageParser(s *mysqlStream) (bool, bool) {
	logp.Debug("mysqldetailed", "MySQL parser called. parseState = %s", s.parseState)

	m := s.message
	for s.parseOffset < len(s.data) {
		switch s.parseState {
		case mysqlStateStart:
			m.start = s.parseOffset
			// Header (4 bytes) plus the first payload byte, which carries
			// the command or response type.
			if len(s.data[s.parseOffset:]) < 5 {
				logp.Warn("MySQL Message too short. Ignore it.")
				return false, false
			}
			hdr := s.data[s.parseOffset : s.parseOffset+5]
			m.packetLength = leUint24(hdr[0:3])
			m.seq = hdr[3]
			m.typ = hdr[4]

			logp.Debug("mysqldetailed", "MySQL Header: Packet length %d, Seq %d, Type=%d isClient=%v", m.packetLength, m.seq, m.typ, s.isClient)

			if s.isClient {
				// Command phase: only a packet with sequence number 0 and a
				// known command type is parsed as a request; anything else
				// is consumed and ignored.
				if m.seq == 0 && isRequest(m.typ) {
					m.isRequest = true
				} else {
					m.ignoreMessage = true
				}
				s.parseState = mysqlStateEatMessage
			} else {
				// Server side: classify the response by its first payload byte.
				m.isRequest = false

				switch {
				case hdr[4] == 0x00 || hdr[4] == 0xfe:
					logp.Debug("mysqldetailed", "Received OK response")
					s.parseState = mysqlStateEatMessage
					m.isOK = true
				case hdr[4] == 0xff:
					logp.Debug("mysqldetailed", "Received ERR response")
					s.parseState = mysqlStateEatMessage
					m.isError = true
				case m.packetLength == 1:
					// A one-byte packet is taken as the column count that
					// opens a result set; field packets follow.
					logp.Debug("mysqldetailed", "Query response. Number of fields %d", hdr[4])
					m.numberOfFields = int(hdr[4])
					s.parseOffset += 5
					s.parseState = mysqlStateEatFields
				default:
					// something else. ignore
					m.ignoreMessage = true
					s.parseState = mysqlStateEatMessage
				}
			}

		case mysqlStateEatMessage:
			if len(s.data[s.parseOffset:]) < int(m.packetLength)+4 {
				// wait for more data
				return true, false
			}

			s.parseOffset += 4 + int(m.packetLength) // header + payload
			m.end = s.parseOffset

			// body is the payload after the type byte, clipped to this
			// packet. It stays nil for packets that carry nothing but the
			// type byte (or nothing at all).
			var body []byte
			if m.end > m.start+5 {
				body = s.data[m.start+5 : m.end]
			}

			switch {
			case m.isRequest:
				if m.typ == mysqlCmdStmtExecute || m.typ == mysqlCmdStmtClose {
					// The statement id is the first 4 bytes after the command byte.
					if len(body) >= 4 {
						m.statementID = int(binary.LittleEndian.Uint32(body))
					} else {
						logp.Debug("mysql", "Statement packet too short to carry a statement id")
					}
				} else {
					m.query = string(body)
				}

			case m.isOK:
				// The whole packet is buffered at this point, so a value
				// that does not fit is absent from the packet, not pending.
				// err is checked before complete because the other results
				// are meaningless once err is set.
				affectedRows, off, complete, err := readLinteger(body, 0)
				if err != nil {
					logp.Debug("mysql", "Error on read_linteger: %s", err)
					return false, false
				}
				if complete {
					m.affectedRows = affectedRows

					// last insert id
					insertID, _, hasInsertID, idErr := readLinteger(body, off)
					if idErr != nil {
						logp.Debug("mysql", "Error on read_linteger: %s", idErr)
						return false, false
					}
					if hasInsertID {
						m.insertID = insertID
					}
				}

			case m.isError:
				// Layout after the 0xff header byte:
				//   int<2>      error code
				//   string[1]   sql state marker
				//   string[5]   sql state
				//   string<EOF> error message (runs to the end of the packet)
				if len(body) >= 2 {
					m.errorCode = binary.LittleEndian.Uint16(body[:2])
				}
				if len(body) >= 8 {
					m.errorInfo = string(body[3:8]) + ": " + string(body[8:])
				} else {
					logp.Debug("mysql", "ERR packet too short to carry an SQL state")
				}
			}

			m.size = uint64(m.end - m.start)
			logp.Debug("mysqldetailed", "Message complete. remaining=%d",
				len(s.data[s.parseOffset:]))

			if !(m.isOK && m.packetLength == 12) {
				return true, true
			}

			// PREPARE_OK packet for Prepared Statement: an OK packet whose
			// payload is exactly 12 bytes is classified as the reply to a
			// prepare command (a heuristic). body holds the 11 bytes after
			// the status byte.
			m.statementID = int(binary.LittleEndian.Uint32(body))
			m.numberOfFields = int(binary.LittleEndian.Uint16(body[4:]))
			m.numberOfParams = int(binary.LittleEndian.Uint16(body[6:]))
			switch {
			case m.numberOfFields > 0:
				s.parseState = mysqlStateEatFields
			case m.numberOfParams > 0:
				s.parseState = mysqlStateEatRows
			default:
				// No column or parameter counts were announced, so no
				// further state is entered and the message is complete.
				return true, true
			}

		case mysqlStateEatFields:
			if len(s.data[s.parseOffset:]) < 4 {
				// wait for more
				return true, false
			}

			hdr := s.data[s.parseOffset : s.parseOffset+4]
			m.packetLength = leUint24(hdr[:3])
			m.seq = hdr[3]

			if len(s.data[s.parseOffset:]) < int(m.packetLength)+4 {
				// wait for more
				return true, false
			}

			// s.parseOffset is moved only once the packet has been dealt
			// with, so it always points at a packet header.
			payloadStart := s.parseOffset + 4
			packetEnd := payloadStart + int(m.packetLength)

			switch {
			case m.packetLength == 0:
				// Empty packet: nothing to inspect, just step over the header.
				s.parseOffset = packetEnd

			case s.data[payloadStart] == 0xfe:
				logp.Debug("mysqldetailed", "Received EOF packet")
				// EOF marker: the field list is over, rows come next.
				s.parseOffset = packetEnd
				s.parseState = mysqlStateEatRows

			default:
				db, table, complete, err := readFieldSchemaTable(s.data[:packetEnd], payloadStart)
				if err != nil {
					logp.Debug("mysql", "Error on read_lstring: %s", err)
					return false, false
				}
				if complete {
					dbTable := db + "." + table

					if len(m.tables) == 0 {
						m.tables = dbTable
					} else if !strings.Contains(m.tables, dbTable) {
						m.tables = m.tables + ", " + dbTable
					}
					logp.Debug("mysqldetailed", "db=%s, table=%s", db, table)
				} else {
					// The packet is fully buffered, so more data would not
					// help; skip it and keep the packet framing intact.
					logp.Debug("mysql", "Field packet too short for catalog/schema/table. Skipping it.")
				}
				s.parseOffset = packetEnd
				// go to next field
			}

		case mysqlStateEatRows:
			if len(s.data[s.parseOffset:]) < 4 {
				// wait for more
				return true, false
			}
			hdr := s.data[s.parseOffset : s.parseOffset+4]
			m.packetLength = leUint24(hdr[:3])
			m.seq = hdr[3]

			logp.Debug("mysqldetailed", "Rows: packet length %d, packet number %d", m.packetLength, m.seq)

			if len(s.data[s.parseOffset:]) < int(m.packetLength)+4 {
				// wait for more
				return true, false
			}

			payloadStart := s.parseOffset + 4
			s.parseOffset = payloadStart + int(m.packetLength)

			// An empty packet has no first byte to inspect, so it can never
			// be the EOF marker.
			if m.packetLength > 0 && s.data[payloadStart] == 0xfe {
				logp.Debug("mysqldetailed", "Received EOF packet")
				// EOF marker

				// NOTE(review): when this state is entered after a
				// PREPARE_OK, m.end was already set by the message state,
				// so the response is flagged as truncated — confirm that
				// this is intended.
				if m.end == 0 {
					m.end = s.parseOffset
				} else {
					m.isTruncated = true
				}
				if !m.isError {
					// in case the response was sent successfully
					m.isOK = true
				}
				m.size = uint64(m.end - m.start)
				return true, true
			}

			if m.end == 0 && s.parseOffset > maxPayloadSize {
				// only send up to here, but read until the end
				m.end = s.parseOffset
			}
			m.numberOfRows++
			// go to next row
		}
	}

	return true, false
}

// readFieldSchemaTable reads the three leading length-encoded strings
// (catalog, schema, table) of a field packet whose payload starts at offset
// in pkt, and returns the schema and table names. complete is false when the
// strings do not fit inside pkt. A non-nil error is returned as soon as one
// of the reads fails.
func readFieldSchemaTable(pkt []byte, offset int) (string, string, bool, error) {
	_ /* catalog */, off, complete, err := readLstring(pkt, offset)
	if err != nil || !complete {
		return "", "", false, err
	}
	db /* schema */, off, complete, err := readLstring(pkt, off)
	if err != nil || !complete {
		return "", "", false, err
	}
	table, _ /* off */, complete, err := readLstring(pkt, off)
	if err != nil || !complete {
		return "", "", false, err
	}
	return string(db), string(table), true, nil
}