packetbeat/protos/mysql/mysql.go (995 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package mysql
import (
"encoding/binary"
"errors"
"fmt"
"strconv"
"strings"
"time"
"github.com/elastic/beats/v7/libbeat/common"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/beats/v7/packetbeat/pb"
"github.com/elastic/beats/v7/packetbeat/procs"
"github.com/elastic/beats/v7/packetbeat/protos"
"github.com/elastic/beats/v7/packetbeat/protos/tcp"
)
// Packet types
const (
mysqlCmdQuery = 3
mysqlCmdStmtPrepare = 22
mysqlCmdStmtExecute = 23
mysqlCmdStmtClose = 25
)
const maxPayloadSize = 100 * 1024
var (
unmatchedRequests = monitoring.NewInt(nil, "mysql.unmatched_requests")
unmatchedResponses = monitoring.NewInt(nil, "mysql.unmatched_responses")
)
type mysqlMessage struct {
start int
end int
ts time.Time
isRequest bool
packetLength uint32
seq uint8
typ uint8
numberOfRows int
numberOfFields int
size uint64
tables string
isOK bool
affectedRows uint64
insertID uint64
isError bool
errorCode uint16
errorInfo string
query string
ignoreMessage bool
direction uint8
isTruncated bool
tcpTuple common.TCPTuple
cmdlineTuple *common.ProcessTuple
raw []byte
notes []string
statementID int
numberOfParams int
}
type mysqlTransaction struct {
tuple common.TCPTuple
src common.Endpoint
dst common.Endpoint
ts time.Time
endTime time.Time
query string
method string
path string // for mysql, Path refers to the mysql table queried
bytesOut uint64
bytesIn uint64
notes []string
isError bool
mysql mapstr.M
requestRaw string
responseRaw string
statementID int // for prepare statement
params []string // for execute statement param
}
type mysqlStream struct {
data []byte
parseOffset int
parseState parseState
isClient bool
message *mysqlMessage
}
type parseState int
const (
mysqlStateStart parseState = iota
mysqlStateEatMessage
mysqlStateEatFields
mysqlStateEatRows
mysqlStateMax
)
var stateStrings = []string{
"Start",
"EatMessage",
"EatFields",
"EatRows",
}
func (state parseState) String() string {
return stateStrings[state]
}
type mysqlPlugin struct {
// config
ports []int
maxStoreRows int
maxRowLength int
sendRequest bool
sendResponse bool
transactions *common.Cache
transactionTimeout time.Duration
// prepare statements cache
prepareStatements *common.Cache
prepareStatementTimeout time.Duration
results protos.Reporter
watcher *procs.ProcessesWatcher
// function pointer for mocking
handleMysql func(mysql *mysqlPlugin, m *mysqlMessage, tcp *common.TCPTuple,
dir uint8, raw_msg []byte)
}
func init() {
protos.Register("mysql", New)
}
func New(
testMode bool,
results protos.Reporter,
watcher *procs.ProcessesWatcher,
cfg *conf.C,
) (protos.Plugin, error) {
p := &mysqlPlugin{}
config := defaultConfig
if !testMode {
if err := cfg.Unpack(&config); err != nil {
return nil, err
}
}
if err := p.init(results, watcher, &config); err != nil {
return nil, err
}
return p, nil
}
func (mysql *mysqlPlugin) init(results protos.Reporter, watcher *procs.ProcessesWatcher, config *mysqlConfig) error {
mysql.setFromConfig(config)
mysql.transactions = common.NewCache(
mysql.transactionTimeout,
protos.DefaultTransactionHashSize)
mysql.transactions.StartJanitor(mysql.transactionTimeout)
// prepare statements cache
mysql.prepareStatements = common.NewCache(
mysql.prepareStatementTimeout,
protos.DefaultTransactionHashSize)
mysql.prepareStatements.StartJanitor(mysql.prepareStatementTimeout)
mysql.handleMysql = handleMysql
mysql.results = results
mysql.watcher = watcher
return nil
}
func (mysql *mysqlPlugin) setFromConfig(config *mysqlConfig) {
mysql.ports = config.Ports
mysql.maxRowLength = config.MaxRowLength
mysql.maxStoreRows = config.MaxRows
mysql.sendRequest = config.SendRequest
mysql.sendResponse = config.SendResponse
mysql.transactionTimeout = config.TransactionTimeout
mysql.prepareStatementTimeout = config.StatementTimeout
}
func (mysql *mysqlPlugin) getTransaction(k common.HashableTCPTuple) *mysqlTransaction {
v := mysql.transactions.Get(k)
if v != nil {
return v.(*mysqlTransaction)
}
return nil
}
// cache the prepare statement info
type mysqlStmtData struct {
query string
numOfParameters int
nparamType []uint8
}
type mysqlStmtMap map[int]*mysqlStmtData
func (mysql *mysqlPlugin) getStmtsMap(k common.HashableTCPTuple) mysqlStmtMap {
v := mysql.prepareStatements.Get(k)
if v != nil {
return v.(mysqlStmtMap)
}
return nil
}
func (mysql *mysqlPlugin) GetPorts() []int {
return mysql.ports
}
func (stream *mysqlStream) prepareForNewMessage() {
stream.data = stream.data[stream.parseOffset:]
stream.parseState = mysqlStateStart
stream.parseOffset = 0
stream.message = nil
}
func (mysql *mysqlPlugin) isServerPort(port uint16) bool {
for _, sPort := range mysql.ports {
if uint16(sPort) == port {
return true
}
}
return false
}
func isRequest(typ uint8) bool {
if typ == mysqlCmdQuery || typ == mysqlCmdStmtPrepare ||
typ == mysqlCmdStmtExecute || typ == mysqlCmdStmtClose {
return true
}
return false
}
func mysqlMessageParser(s *mysqlStream) (bool, bool) {
logp.Debug("mysqldetailed", "MySQL parser called. parseState = %s", s.parseState)
m := s.message
for s.parseOffset < len(s.data) {
switch s.parseState {
case mysqlStateStart:
m.start = s.parseOffset
if len(s.data[s.parseOffset:]) < 5 {
logp.Warn("MySQL Message too short. Ignore it.")
return false, false
}
hdr := s.data[s.parseOffset : s.parseOffset+5]
m.packetLength = leUint24(hdr[0:3])
m.seq = hdr[3]
m.typ = hdr[4]
logp.Debug("mysqldetailed", "MySQL Header: Packet length %d, Seq %d, Type=%d isClient=%v", m.packetLength, m.seq, m.typ, s.isClient)
if s.isClient {
// starts Command Phase
if m.seq == 0 && isRequest(m.typ) {
// parse request
m.isRequest = true
m.start = s.parseOffset
s.parseState = mysqlStateEatMessage
} else {
// ignore command
m.ignoreMessage = true
s.parseState = mysqlStateEatMessage
}
} else if !s.isClient {
// parse response
m.isRequest = false
if hdr[4] == 0x00 || hdr[4] == 0xfe {
logp.Debug("mysqldetailed", "Received OK response")
m.start = s.parseOffset
s.parseState = mysqlStateEatMessage
m.isOK = true
} else if hdr[4] == 0xff {
logp.Debug("mysqldetailed", "Received ERR response")
m.start = s.parseOffset
s.parseState = mysqlStateEatMessage
m.isError = true
} else if m.packetLength == 1 {
logp.Debug("mysqldetailed", "Query response. Number of fields %d", hdr[4])
m.numberOfFields = int(hdr[4])
m.start = s.parseOffset
s.parseOffset += 5
s.parseState = mysqlStateEatFields
} else {
// something else. ignore
m.ignoreMessage = true
s.parseState = mysqlStateEatMessage
}
}
case mysqlStateEatMessage:
if len(s.data[s.parseOffset:]) < int(m.packetLength)+4 {
// wait for more data
return true, false
}
s.parseOffset += 4 // header
s.parseOffset += int(m.packetLength)
m.end = s.parseOffset
if m.isRequest {
// get the statement id
if m.typ == mysqlCmdStmtExecute || m.typ == mysqlCmdStmtClose {
m.statementID = int(binary.LittleEndian.Uint32(s.data[m.start+5:]))
} else {
m.query = string(s.data[m.start+5 : m.end])
}
} else if m.isOK {
// affected rows
affectedRows, off, complete, err := readLinteger(s.data, m.start+5)
if !complete {
return true, false
}
if err != nil {
logp.Debug("mysql", "Error on read_linteger: %s", err)
return false, false
}
m.affectedRows = affectedRows
// last insert id
insertID, _, complete, err := readLinteger(s.data, off)
if !complete {
return true, false
}
if err != nil {
logp.Debug("mysql", "Error on read_linteger: %s", err)
return false, false
}
m.insertID = insertID
} else if m.isError {
// int<1>header (0xff)
// int<2>error code
// string[1] sql state marker
// string[5] sql state
// string<EOF> error message
m.errorCode = binary.LittleEndian.Uint16(s.data[m.start+5 : m.start+7])
m.errorInfo = string(s.data[m.start+8:m.start+13]) + ": " + string(s.data[m.start+13:])
}
m.size = uint64(m.end - m.start)
logp.Debug("mysqldetailed", "Message complete. remaining=%d",
len(s.data[s.parseOffset:]))
// PREPARE_OK packet for Prepared Statement
// a trick for classify special OK packet
if m.isOK && m.packetLength == 12 {
m.statementID = int(binary.LittleEndian.Uint32(s.data[m.start+5:]))
m.numberOfFields = int(binary.LittleEndian.Uint16(s.data[m.start+9:]))
m.numberOfParams = int(binary.LittleEndian.Uint16(s.data[m.start+11:]))
if m.numberOfFields > 0 {
s.parseState = mysqlStateEatFields
} else if m.numberOfParams > 0 {
s.parseState = mysqlStateEatRows
}
} else {
return true, true
}
case mysqlStateEatFields:
if len(s.data[s.parseOffset:]) < 4 {
// wait for more
return true, false
}
hdr := s.data[s.parseOffset : s.parseOffset+4]
m.packetLength = leUint24(hdr[:3])
m.seq = hdr[3]
if len(s.data[s.parseOffset:]) >= int(m.packetLength)+4 {
s.parseOffset += 4 // header
if s.data[s.parseOffset] == 0xfe {
logp.Debug("mysqldetailed", "Received EOF packet")
// EOF marker
s.parseOffset += int(m.packetLength)
s.parseState = mysqlStateEatRows
} else {
_ /* catalog */, off, complete, err := readLstring(s.data, s.parseOffset)
if !complete {
return true, false
}
if err != nil {
logp.Debug("mysql", "Error on read_lstring: %s", err)
return false, false
}
db /*schema */, off, complete, err := readLstring(s.data, off)
if !complete {
return true, false
}
if err != nil {
logp.Debug("mysql", "Error on read_lstring: %s", err)
return false, false
}
table /* table */, _ /*off*/, complete, err := readLstring(s.data, off)
if !complete {
return true, false
}
if err != nil {
logp.Debug("mysql", "Error on read_lstring: %s", err)
return false, false
}
dbTable := string(db) + "." + string(table)
if len(m.tables) == 0 {
m.tables = dbTable
} else if !strings.Contains(m.tables, dbTable) {
m.tables = m.tables + ", " + dbTable
}
logp.Debug("mysqldetailed", "db=%s, table=%s", db, table)
s.parseOffset += int(m.packetLength)
// go to next field
}
} else {
// wait for more
return true, false
}
case mysqlStateEatRows:
if len(s.data[s.parseOffset:]) < 4 {
// wait for more
return true, false
}
hdr := s.data[s.parseOffset : s.parseOffset+4]
m.packetLength = leUint24(hdr[:3])
m.seq = hdr[3]
logp.Debug("mysqldetailed", "Rows: packet length %d, packet number %d", m.packetLength, m.seq)
if len(s.data[s.parseOffset:]) < int(m.packetLength)+4 {
// wait for more
return true, false
}
s.parseOffset += 4 // header
if s.data[s.parseOffset] == 0xfe {
logp.Debug("mysqldetailed", "Received EOF packet")
// EOF marker
s.parseOffset += int(m.packetLength)
if m.end == 0 {
m.end = s.parseOffset
} else {
m.isTruncated = true
}
if !m.isError {
// in case the response was sent successfully
m.isOK = true
}
m.size = uint64(m.end - m.start)
return true, true
}
s.parseOffset += int(m.packetLength)
if m.end == 0 && s.parseOffset > maxPayloadSize {
// only send up to here, but read until the end
m.end = s.parseOffset
}
m.numberOfRows++
// go to next row
}
}
return true, false
}
// messageGap is called when a gap of size `nbytes` is found in the
// tcp stream. Returns true if there is already enough data in the message
// read so far that we can use it further in the stack.
func (mysql *mysqlPlugin) messageGap(s *mysqlStream, nbytes int) (complete bool) {
m := s.message
switch s.parseState {
case mysqlStateStart, mysqlStateEatMessage:
// not enough data yet to be useful
return false
case mysqlStateEatFields, mysqlStateEatRows:
// enough data here
m.end = s.parseOffset
if m.isRequest {
m.notes = append(m.notes, "Packet loss while capturing the request")
} else {
m.notes = append(m.notes, "Packet loss while capturing the response")
}
return true
}
return true
}
type mysqlPrivateData struct {
data [2]*mysqlStream
}
// Called when the parser has identified a full message.
func (mysql *mysqlPlugin) messageComplete(tcptuple *common.TCPTuple, dir uint8, stream *mysqlStream) {
// all ok, ship it
msg := stream.data[stream.message.start:stream.message.end]
if !stream.message.ignoreMessage {
mysql.handleMysql(mysql, stream.message, tcptuple, dir, msg)
}
// and reset message
stream.prepareForNewMessage()
}
func (mysql *mysqlPlugin) ConnectionTimeout() time.Duration {
return mysql.transactionTimeout
}
func (mysql *mysqlPlugin) Parse(pkt *protos.Packet, tcptuple *common.TCPTuple,
dir uint8, private protos.ProtocolData,
) protos.ProtocolData {
priv := mysqlPrivateData{}
if private != nil {
var ok bool
priv, ok = private.(mysqlPrivateData)
if !ok {
priv = mysqlPrivateData{}
}
}
if priv.data[dir] == nil {
dstPort := tcptuple.DstPort
if dir == tcp.TCPDirectionReverse {
dstPort = tcptuple.SrcPort
}
priv.data[dir] = &mysqlStream{
data: pkt.Payload,
message: &mysqlMessage{ts: pkt.Ts},
isClient: mysql.isServerPort(dstPort),
}
} else {
// concatenate bytes
priv.data[dir].data = append(priv.data[dir].data, pkt.Payload...)
if len(priv.data[dir].data) > tcp.TCPMaxDataInStream {
logp.Debug("mysql", "Stream data too large, dropping TCP stream")
priv.data[dir] = nil
return priv
}
}
stream := priv.data[dir]
for len(stream.data) > 0 {
if stream.message == nil {
stream.message = &mysqlMessage{ts: pkt.Ts}
}
ok, complete := mysqlMessageParser(priv.data[dir])
logp.Debug("mysqldetailed", "mysqlMessageParser returned ok=%v complete=%v", ok, complete)
if !ok {
// drop this tcp stream. Will retry parsing with the next
// segment in it
priv.data[dir] = nil
logp.Debug("mysql", "Ignore MySQL message. Drop tcp stream. Try parsing with the next segment")
return priv
}
if complete {
mysql.messageComplete(tcptuple, dir, stream)
} else {
// wait for more data
break
}
}
return priv
}
func (mysql *mysqlPlugin) GapInStream(tcptuple *common.TCPTuple, dir uint8,
nbytes int, private protos.ProtocolData) (priv protos.ProtocolData, drop bool,
) {
if private == nil {
return private, false
}
mysqlData, ok := private.(mysqlPrivateData)
if !ok {
return private, false
}
stream := mysqlData.data[dir]
if stream == nil || stream.message == nil {
// nothing to do
return private, false
}
if mysql.messageGap(stream, nbytes) {
// we need to publish from here
mysql.messageComplete(tcptuple, dir, stream)
}
// we always drop the TCP stream. Because it's binary and len based,
// there are too few cases in which we could recover the stream (maybe
// for very large blobs, leaving that as TODO)
return private, true
}
func (mysql *mysqlPlugin) ReceivedFin(tcptuple *common.TCPTuple, dir uint8,
private protos.ProtocolData,
) protos.ProtocolData {
// TODO: check if we have data pending and either drop it to free
// memory or send it up the stack.
return private
}
func handleMysql(mysql *mysqlPlugin, m *mysqlMessage, tcptuple *common.TCPTuple,
dir uint8, rawMsg []byte,
) {
m.tcpTuple = *tcptuple
m.direction = dir
m.cmdlineTuple = mysql.watcher.FindProcessesTupleTCP(tcptuple.IPPort())
m.raw = rawMsg
if m.isRequest {
mysql.receivedMysqlRequest(m)
} else {
mysql.receivedMysqlResponse(m)
}
}
func (mysql *mysqlPlugin) receivedMysqlRequest(msg *mysqlMessage) {
tuple := msg.tcpTuple
trans := mysql.getTransaction(tuple.Hashable())
if trans != nil {
if trans.mysql != nil {
logp.Debug("mysql", "Two requests without a Response. Dropping old request: %s", trans.mysql)
unmatchedRequests.Add(1)
}
} else {
trans = &mysqlTransaction{tuple: tuple}
mysql.transactions.Put(tuple.Hashable(), trans)
}
trans.ts = msg.ts
trans.src, trans.dst = common.MakeEndpointPair(msg.tcpTuple.BaseTuple, msg.cmdlineTuple)
if msg.direction == tcp.TCPDirectionReverse {
trans.src, trans.dst = trans.dst, trans.src
}
// try to get query string for Execute statement from cache
// and delete statement id for Close statement from cache
if msg.statementID != 0 {
trans.statementID = msg.statementID
stmts := mysql.getStmtsMap(msg.tcpTuple.Hashable())
if stmts == nil {
switch msg.typ {
case mysqlCmdStmtExecute:
trans.query = "Request Execute Statement"
case mysqlCmdStmtClose:
trans.query = "Request Close Statement"
}
trans.notes = append(trans.notes, "The actual query being used is unknown")
trans.requestRaw = msg.query
trans.bytesIn = msg.size
return
}
switch msg.typ {
case mysqlCmdStmtExecute:
if value, ok := stmts[trans.statementID]; ok {
trans.query = value.query
// parse parameters
trans.params = mysql.parseMysqlExecuteStatement(msg.raw, value)
} else {
trans.query = "Request Execute Statement"
trans.notes = append(trans.notes, "The actual query being used is unknown")
trans.requestRaw = msg.query
trans.bytesIn = msg.size
return
}
case mysqlCmdStmtClose:
delete(stmts, trans.statementID)
trans.query = "CmdStmtClose"
mysql.transactions.Delete(tuple.Hashable())
}
} else {
trans.query = msg.query
}
// Extract the method, by simply taking the first word and
// making it upper case.
query := strings.Trim(trans.query, " \r\n\t")
index := strings.IndexAny(query, " \r\n\t")
var method string
if index > 0 {
method = strings.ToUpper(query[:index])
} else {
method = strings.ToUpper(query)
}
trans.query = query
trans.method = method
trans.mysql = mapstr.M{}
trans.notes = msg.notes
// save Raw message
trans.requestRaw = msg.query
trans.bytesIn = msg.size
}
func (mysql *mysqlPlugin) receivedMysqlResponse(msg *mysqlMessage) {
trans := mysql.getTransaction(msg.tcpTuple.Hashable())
if trans == nil {
logp.Debug("mysql", "Response from unknown transaction. Ignoring.")
unmatchedResponses.Add(1)
return
}
// check if the request was received
if trans.mysql == nil {
logp.Debug("mysql", "Response from unknown transaction. Ignoring.")
unmatchedResponses.Add(1)
return
}
// save json details
trans.mysql.Update(mapstr.M{
"affected_rows": msg.affectedRows,
"insert_id": msg.insertID,
"num_rows": msg.numberOfRows,
"num_fields": msg.numberOfFields,
})
trans.isError = msg.isError
if trans.isError {
trans.mysql["error_code"] = msg.errorCode
trans.mysql["error_message"] = msg.errorInfo
}
if msg.statementID != 0 {
// cache prepare statement response info
stmts := mysql.getStmtsMap(msg.tcpTuple.Hashable())
if stmts == nil {
stmts = mysqlStmtMap{}
}
if stmts[msg.statementID] == nil {
stmtData := &mysqlStmtData{
query: trans.query,
numOfParameters: msg.numberOfParams,
}
stmts[msg.statementID] = stmtData
}
mysql.prepareStatements.Put(msg.tcpTuple.Hashable(), stmts)
trans.notes = append(trans.notes, trans.query)
trans.query = "Request Prepare Statement"
}
trans.bytesOut = msg.size
trans.path = msg.tables
trans.endTime = msg.ts
// save Raw message
if len(msg.raw) > 0 {
fields, rows := mysql.parseMysqlResponse(msg.raw)
trans.responseRaw = common.DumpInCSVFormat(fields, rows)
}
trans.notes = append(trans.notes, msg.notes...)
mysql.publishTransaction(trans)
mysql.transactions.Delete(trans.tuple.Hashable())
logp.Debug("mysql", "Mysql transaction completed: %s %s %s", trans.query, trans.params, trans.mysql)
}
func (mysql *mysqlPlugin) parseMysqlExecuteStatement(data []byte, stmtdata *mysqlStmtData) []string {
dataLen := len(data)
if dataLen < 14 {
logp.Debug("mysql", "Data too small")
return nil
}
var paramType, paramUnsigned uint8
nparamType := []uint8{}
paramString := []string{}
nparam := stmtdata.numOfParameters
offset := 0
// mysql hdr
offset += 4
// cmd type
offset++
// stmt id
offset += 4
// flags
offset++
// iterations
offset += 4
// null-bitmap
if nparam > 0 {
offset += (nparam + 7) / 8
} else {
return nil
}
// stmt bound
if dataLen <= offset {
logp.Debug("mysql", "Data too small")
return nil
}
stmtBound := data[offset]
offset++
paramOffset := offset
if stmtBound == 1 {
paramOffset += nparam * 2
if dataLen <= paramOffset {
logp.Debug("mysql", "Data too small to contain parameters")
return nil
}
// First call or rebound (1)
for stmtPos := 0; stmtPos < nparam; stmtPos++ {
paramType = uint8(data[offset])
offset++
nparamType = append(nparamType, paramType)
logp.Debug("mysqldetailed", "type = %d", paramType)
paramUnsigned = uint8(data[offset])
offset++
if paramUnsigned != 0 {
logp.Debug("mysql", "Illegal param unsigned")
return nil
}
}
// Save param type info
stmtdata.nparamType = nparamType
} else {
// Subsequent call (0)
if len(stmtdata.nparamType) > 0 {
// get saved param type info
nparamType = stmtdata.nparamType
} else {
return nil
}
}
for stmtPos := 0; stmtPos < nparam; stmtPos++ {
paramType = nparamType[stmtPos]
// dissect parameter on paramType
switch paramType {
// FIELD_TYPE_TINY
case 0x01:
valueString := strconv.Itoa(int(data[paramOffset]))
paramString = append(paramString, valueString)
paramOffset++
// FIELD_TYPE_SHORT
case 0x02:
if dataLen < paramOffset+2 {
logp.Debug("mysql", "Data too small")
return nil
}
valueString := strconv.Itoa(int(binary.LittleEndian.Uint16(data[paramOffset:])))
paramString = append(paramString, valueString)
paramOffset += 2
// FIELD_TYPE_LONG
case 0x03:
if dataLen < paramOffset+4 {
logp.Debug("mysql", "Data too small")
return nil
}
valueString := strconv.Itoa(int(binary.LittleEndian.Uint32(data[paramOffset:])))
paramString = append(paramString, valueString)
paramOffset += 4
// FIELD_TYPE_FLOAT
case 0x04:
paramString = append(paramString, "TYPE_FLOAT")
paramOffset += 4
// FIELD_TYPE_DOUBLE
case 0x05:
paramString = append(paramString, "TYPE_DOUBLE")
paramOffset += 4
// FIELD_TYPE_NULL
case 0x06:
paramString = append(paramString, "TYPE_NULL")
// FIELD_TYPE_LONGLONG
case 0x08:
if dataLen < paramOffset+8 {
logp.Debug("mysql", "Data too small")
return nil
}
valueString := strconv.FormatInt(int64(binary.LittleEndian.Uint64(data[paramOffset:paramOffset+8])), 10)
paramString = append(paramString, valueString)
paramOffset += 8
// FIELD_TYPE_TIMESTAMP
// FIELD_TYPE_DATETIME
// FIELD_TYPE_DATE
case 0x07, 0x0c, 0x0a:
var year, month, day, hour, minute, second string
paramLen := int(data[paramOffset])
if dataLen < paramOffset+paramLen+1 {
logp.Debug("mysql", "Data too small")
return nil
}
paramOffset++
if paramLen >= 2 {
year = strconv.Itoa(int(binary.LittleEndian.Uint16(data[paramOffset:])))
}
if paramLen >= 4 {
month = strconv.Itoa(int(data[paramOffset+2]))
day = strconv.Itoa(int(data[paramOffset+3]))
}
if paramLen >= 7 {
hour = strconv.Itoa(int(data[paramOffset+4]))
minute = strconv.Itoa(int(data[paramOffset+5]))
second = strconv.Itoa(int(data[paramOffset+6]))
}
// If paramLen is greater or equal to 11
// then nanoseconds are also available.
// We do not handle them.
datetime := year + "/" + month + "/" + day + " " + hour + ":" + minute + ":" + second
paramString = append(paramString, datetime)
paramOffset += paramLen
// FIELD_TYPE_TIME
case 0x0b:
paramLen := int(data[paramOffset])
if dataLen < paramOffset+paramLen+1 {
logp.Debug("mysql", "Data too small")
return nil
}
paramOffset++
paramString = append(paramString, "TYPE_TIME")
paramOffset += paramLen
// FIELD_TYPE_VAR_STRING
// FIELD_TYPE_BLOB
// FIELD_TYPE_STRING
case 0xf6, 0xfc, 0xfd, 0xfe:
paramLen := int(data[paramOffset])
paramOffset++
switch paramLen {
case 0xfc: /* 252 - 64k chars */
paramLen16 := int(binary.LittleEndian.Uint16(data[paramOffset : paramOffset+2]))
if dataLen < paramOffset+paramLen16+2 {
logp.Debug("mysql", "Data too small")
return nil
}
paramOffset += 2
paramString = append(paramString, string(data[paramOffset:paramOffset+paramLen16]))
paramOffset += paramLen16
case 0xfd: /* 64k - 16M chars */
paramLen24 := int(leUint24(data[paramOffset : paramOffset+3]))
if dataLen < paramOffset+paramLen24+3 {
logp.Debug("mysql", "Data too small")
return nil
}
paramOffset += 3
paramString = append(paramString, string(data[paramOffset:paramOffset+paramLen24]))
paramOffset += paramLen24
default: /* < 252 chars */
if dataLen < paramOffset+paramLen {
logp.Debug("mysql", "Data too small")
return nil
}
paramString = append(paramString, string(data[paramOffset:paramOffset+paramLen]))
paramOffset += paramLen
}
default:
logp.Debug("mysql", "Unknown param type")
return nil
}
}
return paramString
}
func (mysql *mysqlPlugin) parseMysqlResponse(data []byte) ([]string, [][]string) {
length, err := readLength(data, 0)
if err != nil {
logp.Warn("Invalid response: %v", err)
return []string{}, [][]string{}
}
if length < 1 {
logp.Warn("Warning: Skipping empty Response")
return []string{}, [][]string{}
}
if len(data) < 5 {
logp.Warn("Invalid response: data less than 5 bytes")
return []string{}, [][]string{}
}
fields := []string{}
rows := [][]string{}
switch data[4] {
case 0x00:
// OK response
case 0xff:
// Error response
default:
offset := 5
logp.Debug("mysql", "Data len: %d", len(data))
// Read fields
for {
length, err = readLength(data, offset)
if err != nil {
logp.Warn("Invalid response: %v", err)
return []string{}, [][]string{}
}
if len(data[offset:]) < 5 {
logp.Warn("Invalid response.")
return []string{}, [][]string{}
}
if data[offset+4] == 0xfe {
// EOF
offset += length + 4
break
}
_ /* catalog */, off, complete, err := readLstring(data, offset+4)
if err != nil || !complete {
logp.Debug("mysql", "Reading field: %v %v", err, complete)
return fields, rows
}
_ /*database*/, off, complete, err = readLstring(data, off)
if err != nil || !complete {
logp.Debug("mysql", "Reading field: %v %v", err, complete)
return fields, rows
}
_ /*table*/, off, complete, err = readLstring(data, off)
if err != nil || !complete {
logp.Debug("mysql", "Reading field: %v %v", err, complete)
return fields, rows
}
_ /*org table*/, off, complete, err = readLstring(data, off)
if err != nil || !complete {
logp.Debug("mysql", "Reading field: %v %v", err, complete)
return fields, rows
}
name, off, complete, err := readLstring(data, off)
if err != nil || !complete {
logp.Debug("mysql", "Reading field: %v %v", err, complete)
return fields, rows
}
_ /* org name */, _ /*off*/, complete, err = readLstring(data, off)
if err != nil || !complete {
logp.Debug("mysql", "Reading field: %v %v", err, complete)
return fields, rows
}
fields = append(fields, string(name))
offset += length + 4
if len(data) < offset {
logp.Warn("Invalid response.")
return []string{}, [][]string{}
}
}
// Read rows
for offset < len(data) {
var row []string
var rowLen int
if len(data[offset:]) < 5 {
logp.Warn("Invalid response.")
break
}
if data[offset+4] == 0xfe {
// EOF
offset += length + 4 // ineffassign
break
}
length, err = readLength(data, offset)
if err != nil {
logp.Warn("Invalid response: %v", err)
break
}
off := offset + 4 // skip length + packet number
start := off
for off < start+length {
var text []byte
if data[off] == 0xfb {
text = []byte("NULL")
off++
} else {
var err error
var complete bool
text, off, complete, err = readLstring(data, off)
if err != nil || !complete {
logp.Debug("mysql", "Error parsing rows: %+v %t", err, complete)
// nevertheless, return what we have so far
return fields, rows
}
}
if rowLen < mysql.maxRowLength {
if rowLen+len(text) > mysql.maxRowLength {
text = text[:mysql.maxRowLength-rowLen]
}
row = append(row, string(text))
rowLen += len(text)
}
}
logp.Debug("mysqldetailed", "Append row: %v", row)
rows = append(rows, row)
if len(rows) >= mysql.maxStoreRows {
break
}
offset += length + 4
}
}
return fields, rows
}
func (mysql *mysqlPlugin) publishTransaction(t *mysqlTransaction) {
if mysql.results == nil {
return
}
logp.Debug("mysql", "mysql.results exists")
evt, pbf := pb.NewBeatEvent(t.ts)
pbf.SetSource(&t.src)
pbf.AddIP(t.src.IP)
pbf.SetDestination(&t.dst)
pbf.AddIP(t.dst.IP)
pbf.Source.Bytes = int64(t.bytesIn)
pbf.Destination.Bytes = int64(t.bytesOut)
pbf.Event.Dataset = "mysql"
pbf.Event.Start = t.ts
pbf.Event.End = t.endTime
pbf.Network.Transport = "tcp"
pbf.Network.Protocol = "mysql"
pbf.Error.Message = t.notes
fields := evt.Fields
fields["type"] = pbf.Event.Dataset
fields["method"] = t.method
fields["query"] = t.query
fields["mysql"] = t.mysql
if len(t.path) > 0 {
fields["path"] = t.path
}
if len(t.params) > 0 {
fields["params"] = t.params
}
if t.isError {
fields["status"] = common.ERROR_STATUS
} else {
fields["status"] = common.OK_STATUS
}
if mysql.sendRequest {
fields["request"] = t.requestRaw
}
if mysql.sendResponse {
fields["response"] = t.responseRaw
}
mysql.results(evt)
}
func readLstring(data []byte, offset int) ([]byte, int, bool, error) {
length, off, complete, err := readLinteger(data, offset)
if err != nil {
return nil, 0, false, err
}
if !complete || len(data[off:]) < int(length) {
return nil, 0, false, nil
}
return data[off : off+int(length)], off + int(length), true, nil
}
func leUint24(data []byte) uint32 {
return uint32(data[0]) | uint32(data[1])<<8 | uint32(data[2])<<16
}
func readLinteger(data []byte, offset int) (uint64, int, bool, error) {
if len(data) < offset+1 {
return 0, 0, false, nil
}
switch data[offset] {
case 0xfe:
if len(data[offset+1:]) < 8 {
return 0, 0, false, nil
}
return binary.LittleEndian.Uint64(data[offset+1:]),
offset + 9, true, nil
case 0xfd:
if len(data[offset+1:]) < 3 {
return 0, 0, false, nil
}
return uint64(leUint24(data[offset+1 : offset+4])), offset + 4, true, nil
case 0xfc:
if len(data[offset+1:]) < 2 {
return 0, 0, false, nil
}
return uint64(binary.LittleEndian.Uint16(data[offset+1:])), offset + 3, true, nil
}
if uint64(data[offset]) >= 0xfb {
return 0, 0, false, fmt.Errorf("unexpected value in read_linteger")
}
return uint64(data[offset]), offset + 1, true, nil
}
// Read a mysql length field (3 bytes LE)
func readLength(data []byte, offset int) (int, error) {
if len(data[offset:]) < 3 {
return 0, errors.New("data too small to contain a valid length")
}
return int(leUint24(data[offset : offset+3])), nil
}