packetbeat/protos/mongodb/mongodb_parser.go (442 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package mongodb import ( "encoding/json" "errors" "strings" "sync" "go.mongodb.org/mongo-driver/bson" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" ) var ( unknownOpcodes = map[opCode]struct{}{} mutex sync.Mutex ) func mongodbMessageParser(s *stream) (bool, bool) { d := newDecoder(s.data) length, err := d.readInt32() if err != nil { // Not even enough data to parse length of message return true, false } if int(length) > len(s.data) { // Not yet reached the end of message return true, false } // Tell decoder to only consider current message d.truncate(int(length)) // fill up the header common to all messages // see http://docs.mongodb.org/meta-driver/latest/legacy/mongodb-wire-protocol/#standard-message-header s.message.messageLength = length s.message.requestID, _ = d.readInt32() s.message.responseTo, _ = d.readInt32() code, _ := d.readInt32() opCode := opCode(code) if !validOpcode(opCode) { mutex.Lock() defer mutex.Unlock() if _, reported := unknownOpcodes[opCode]; !reported { logp.Err("Unknown operation code: %d (%v)", opCode, opCode) unknownOpcodes[opCode] = struct{}{} } return false, false } s.message.opCode = opCode s.message.isResponse = false // default is that the message is a request. If not opReplyParse will set this to true debugf("opCode = %d (%v)", s.message.opCode, s.message.opCode) // then split depending on operation type s.message.event = mapstr.M{} switch s.message.opCode { case opReply: s.message.isResponse = true return opReplyParse(d, s.message) case opMsgLegacy: s.message.method = "msg" return opMsgLegacyParse(d, s.message) case opUpdate: s.message.method = "update" return opUpdateParse(d, s.message) case opInsert: s.message.method = "insert" return opInsertParse(d, s.message) case opQuery: return opQueryParse(d, s.message) case opGetMore: s.message.method = "getMore" return opGetMoreParse(d, s.message) case opDelete: s.message.method = "delete" return opDeleteParse(d, s.message) case opKillCursor: s.message.method = "killCursors" return opKillCursorsParse(d, s.message) case opMsg: s.message.method = "msg" // The assumption is that the message with responseTo == 0 is the request // TODO: handle the cases where moreToCome flag is set (multiple responses chained by responseTo) if s.message.responseTo > 0 { s.message.isResponse = true } return opMsgParse(d, s.message) } return false, false } // see http://docs.mongodb.org/meta-driver/latest/legacy/mongodb-wire-protocol/#op-reply func opReplyParse(d *decoder, m *mongodbMessage) (bool, bool) { _, err := d.readInt32() // ignore flags for now if err != nil { logp.Err("An error occurred while parsing OP_REPLY message: %s", err) return false, false } m.event["cursorId"], err = d.readInt64() if err != nil { logp.Err("An error occurred while parsing OP_REPLY message: %s", err) return false, false } m.event["startingFrom"], err = d.readInt32() if err != nil { logp.Err("An error occurred while parsing OP_REPLY message: %s", err) return false, false } numberReturned, err := d.readInt32() if err != nil { logp.Err("An error occurred while parsing OP_REPLY message: %s", err) return false, false } m.event["numberReturned"] = numberReturned debugf("Prepare to read %d document from reply", m.event["numberReturned"]) documents := make([]interface{}, numberReturned) for i := int32(0); i < numberReturned; i++ { var document bson.M document, err = d.readDocument() if err != nil { logp.Err("An error occurred while parsing OP_REPLY message: %s", err) return false, false } // Check if the result is actually an error if i == 0 { if mongoError, present := document["$err"]; present { m.error, err = doc2str(mongoError) if err != nil { logp.Err("An error occurred while parsing OP_REPLY message: %s", err) return false, false } } if writeErrors, present := document["writeErrors"]; present { m.error, err = doc2str(writeErrors) if err != nil { logp.Err("An error occurred while parsing OP_REPLY message: %s", err) return false, false } } } documents[i] = document } m.documents = documents return true, true } func opMsgLegacyParse(d *decoder, m *mongodbMessage) (bool, bool) { var err error m.event["message"], err = d.readCStr() if err != nil { logp.Err("An error occurred while parsing OP_MSG message: %s", err) return false, false } return true, true } func opUpdateParse(d *decoder, m *mongodbMessage) (bool, bool) { _, err := d.readInt32() // always ZERO, a slot reserved in the protocol for future use if err != nil { logp.Err("An error occurred while parsing OP_UPDATE message: %s", err) return false, false } m.event["fullCollectionName"], err = d.readCStr() if err != nil { logp.Err("An error occurred while parsing OP_UPDATE message: %s", err) return false, false } _, err = d.readInt32() // ignore flags for now if err != nil { logp.Err("An error occurred while parsing OP_UPDATE message: %s", err) return false, false } m.event["selector"], err = d.readDocumentStr() if err != nil { logp.Err("An error occurred while parsing OP_UPDATE message: %s", err) return false, false } m.event["update"], err = d.readDocumentStr() if err != nil { logp.Err("An error occurred while parsing OP_UPDATE message: %s", err) return false, false } return true, true } func opInsertParse(d *decoder, m *mongodbMessage) (bool, bool) { _, err := d.readInt32() // ignore flags for now if err != nil { logp.Err("An error occurred while parsing OP_INSERT message: %s", err) return false, false } m.event["fullCollectionName"], err = d.readCStr() // TODO parse bson documents // Not too bad if it is not done, as all recent mongodb clients send insert as a command over a query instead of this // Find an old client to generate a pcap with legacy protocol ? if err != nil { logp.Err("An error occurred while parsing OP_INSERT message: %s", err) return false, false } return true, true } // Try to guess whether this key:value pair found in // the query represents a command. func isDatabaseCommand(key string, val interface{}) bool { nameExists := false for _, cmd := range databaseCommands { if strings.EqualFold(cmd, key) { nameExists = true break } } if !nameExists { return false } // value should be either a string or the value 1 _, ok := val.(string) num, _ := val.(float64) if ok || num == 1 { return true } return false } func opQueryParse(d *decoder, m *mongodbMessage) (bool, bool) { _, err := d.readInt32() // ignore flags for now if err != nil { logp.Err("An error occurred while parsing OP_QUERY message: %s", err) return false, false } fullCollectionName, err := d.readCStr() if err != nil { logp.Err("An error occurred while parsing OP_QUERY message: %s", err) return false, false } m.event["fullCollectionName"] = fullCollectionName m.event["numberToSkip"], err = d.readInt32() if err != nil { logp.Err("An error occurred while parsing OP_QUERY message: %s", err) return false, false } m.event["numberToReturn"], err = d.readInt32() if err != nil { logp.Err("An error occurred while parsing OP_QUERY message: %s", err) return false, false } query, err := d.readDocument() if d.i < len(d.in) { m.event["returnFieldsSelector"], err = d.readDocumentStr() } // Actual method is either a 'find' or a command passing through a query if strings.HasSuffix(fullCollectionName, ".$cmd") { m.method = "otherCommand" m.resource = fullCollectionName for key, val := range query { debugf("key=%v val=%s", key, val) if isDatabaseCommand(key, val) { debugf("is db command") col, ok := val.(string) if ok { // replace $cmd with the actual collection name m.resource = fullCollectionName[:len(fullCollectionName)-4] + col } delete(query, key) m.method = key } } } else { m.method = "find" m.resource = fullCollectionName } m.params = query if err != nil { logp.Err("An error occurred while parsing OP_QUERY message: %s", err) return false, false } return true, true } func opGetMoreParse(d *decoder, m *mongodbMessage) (bool, bool) { _, err := d.readInt32() // always ZERO, a slot reserved in the protocol for future use if err != nil { logp.Err("An error occurred while parsing OP_GET_MORE message: %s", err) return false, false } m.event["fullCollectionName"], err = d.readCStr() if err != nil { logp.Err("An error occurred while parsing OP_GET_MORE message: %s", err) return false, false } m.event["numberToReturn"], err = d.readInt32() if err != nil { logp.Err("An error occurred while parsing OP_GET_MORE message: %s", err) return false, false } m.event["cursorId"], err = d.readInt64() if err != nil { logp.Err("An error occurred while parsing OP_GET_MORE message: %s", err) return false, false } return true, true } func opDeleteParse(d *decoder, m *mongodbMessage) (bool, bool) { _, err := d.readInt32() // always ZERO, a slot reserved in the protocol for future use if err != nil { logp.Err("An error occurred while parsing OP_DELETE message: %s", err) return false, false } m.event["fullCollectionName"], err = d.readCStr() if err != nil { logp.Err("An error occurred while parsing OP_DELETE message: %s", err) return false, false } _, err = d.readInt32() // ignore flags for now if err != nil { logp.Err("An error occurred while parsing OP_DELETE message: %s", err) return false, false } m.event["selector"], err = d.readDocumentStr() if err != nil { logp.Err("An error occurred while parsing OP_DELETE message: %s", err) return false, false } return true, true } func opKillCursorsParse(d *decoder, m *mongodbMessage) (bool, bool) { // TODO ? Or not, content is not very interesting. return true, true } func opMsgParse(d *decoder, m *mongodbMessage) (bool, bool) { // ignore flagbits flagBits, err := d.readInt32() if err != nil { logp.Err("An error occurred while parsing OP_MSG message: %s", err) return false, false } m.SetFlagBits(flagBits) // read sections kind, err := d.readByte() if err != nil { logp.Err("An error occurred while parsing OP_MSG message: %s", err) return false, false } switch msgKind(kind) { case msgKindBody: document, err := d.readDocument() if err != nil { logp.Err("An error occurred while parsing OP_MSG message: %s", err) return false, false } m.documents = []interface{}{document} case msgKindDocumentSequence: start := d.i size, err := d.readInt32() if err != nil { logp.Err("An error occurred while parsing OP_MSG message: %s", err) return false, false } cstring, err := d.readCStr() if err != nil { logp.Err("An error occurred while parsing OP_MSG message: %s", err) return false, false } m.event["message"] = cstring var documents []interface{} for d.i < start+int(size) { document, err := d.readDocument() if err != nil { logp.Err("An error occurred while parsing OP_MSG message: %s", err) return false, false } documents = append(documents, document) } m.documents = documents case msgKindInternal: // Ignore the internal purposes section default: logp.Err("Unknown message kind: %v", kind) return false, false } return true, true } // NOTE: The following functions are inspired by the source of the go-mgo/mgo project // https://github.com/go-mgo/mgo/blob/v2/bson/decode.go type decoder struct { in []byte i int } func newDecoder(in []byte) *decoder { return &decoder{in, 0} } func (d *decoder) truncate(length int) { d.in = d.in[:length] } func (d *decoder) readCStr() (string, error) { start := d.i end := start l := len(d.in) for ; end != l; end++ { if d.in[end] == '\x00' { break } } d.i = end + 1 if d.i > l { return "", errors.New("cstring not finished") } return string(d.in[start:end]), nil } func (d *decoder) readByte() (byte, error) { i := d.i d.i++ if d.i > len(d.in) { return 0, errors.New("read byte failed") } return d.in[i], nil } func (d *decoder) readInt32() (int32, error) { b, err := d.readBytes(4) if err != nil { return 0, err } return int32((uint32(b[0]) << 0) | (uint32(b[1]) << 8) | (uint32(b[2]) << 16) | (uint32(b[3]) << 24)), nil } func (d *decoder) readInt64() (int64, error) { b, err := d.readBytes(8) if err != nil { return 0, err } return int64((uint64(b[0]) << 0) | (uint64(b[1]) << 8) | (uint64(b[2]) << 16) | (uint64(b[3]) << 24) | (uint64(b[4]) << 32) | (uint64(b[5]) << 40) | (uint64(b[6]) << 48) | (uint64(b[7]) << 56)), nil } func (d *decoder) readDocument() (bson.M, error) { start := d.i documentLength, err := d.readInt32() if err != nil { return nil, err } d.i = start + int(documentLength) if len(d.in) < d.i { return nil, errors.New("document out of bounds") } documentMap := bson.M{} debugf("Parse %d bytes document from remaining %d bytes", documentLength, len(d.in)-start) err = bson.Unmarshal(d.in[start:d.i], documentMap) if err != nil { debugf("Unmarshall error %v", err) return nil, err } return documentMap, err } func doc2str(documentMap interface{}) (string, error) { document, err := json.Marshal(documentMap) return string(document), err } func (d *decoder) readDocumentStr() (string, error) { documentMap, err := d.readDocument() if err != nil { return "", err } document, err := doc2str(documentMap) return document, err } func (d *decoder) readBytes(length int32) ([]byte, error) { start := d.i d.i += int(length) if d.i > len(d.in) { return *new([]byte), errors.New("no byte to read") } return d.in[start : start+int(length)], nil }