packetbeat/protos/mongodb/mongodb_structs.go (252 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package mongodb // Represent a mongodb message being parsed import ( "fmt" "time" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/elastic-agent-libs/mapstr" ) type mongodbMessage struct { ts time.Time tcpTuple common.TCPTuple cmdlineTuple *common.ProcessTuple direction uint8 isResponse bool // Standard message header fields from mongodb wire protocol // see http://docs.mongodb.org/meta-driver/latest/legacy/mongodb-wire-protocol/#standard-message-header messageLength int32 requestID int32 responseTo int32 opCode opCode // decoded flagBits checkSumPresent bool moreToCome bool exhaustAllowed bool // deduced from content. Either an operation from the original wire protocol or the name of a command (passed through a query) // List of commands: http://docs.mongodb.org/manual/reference/command/ // List of original protocol operations: http://docs.mongodb.org/meta-driver/latest/legacy/mongodb-wire-protocol/#request-opcodes method string error string resource string documents []interface{} params map[string]interface{} // Other fields vary very much depending on operation type // lets just put them in a map event mapstr.M } func (m *mongodbMessage) SetFlagBits(flagBits int32) { m.checkSumPresent = flagBits&0x1 != 0 // 0 bit m.moreToCome = flagBits&0x2 != 0 // 1 bit m.exhaustAllowed = flagBits&0x10000 != 0 // 16 bit } // Represent a stream being parsed that contains a mongodb message type stream struct { tcptuple *common.TCPTuple data []byte message *mongodbMessage } // Parser moves to next message in stream func (st *stream) PrepareForNewMessage() { st.data = st.data[st.message.messageLength:] st.message = nil } // The private data of a parser instance // is composed of 2 potentially active streams: incoming, outgoing type mongodbConnectionData struct { streams [2]*stream } // Represent a full mongodb transaction (request/reply) // These transactions are the end product of this parser type transaction struct { cmdline *common.ProcessTuple src common.Endpoint dst common.Endpoint ts time.Time endTime time.Time bytesOut int bytesIn int mongodb mapstr.M event mapstr.M method string resource string error string params map[string]interface{} requestDocuments []interface{} documents []interface{} } type msgKind byte const ( msgKindBody msgKind = 0 msgKindDocumentSequence msgKind = 1 msgKindInternal msgKind = 2 ) type opCode int32 const ( opReply opCode = 1 opMsgLegacy opCode = 1000 opUpdate opCode = 2001 opInsert opCode = 2002 opReserved opCode = 2003 opQuery opCode = 2004 opGetMore opCode = 2005 opDelete opCode = 2006 opKillCursor opCode = 2007 opMsg opCode = 2013 ) // List of valid mongodb wire protocol operation codes // see http://docs.mongodb.org/meta-driver/latest/legacy/mongodb-wire-protocol/#request-opcodes var opCodeNames = map[opCode]string{ 1: "OP_REPLY", 1000: "OP_MSG", 2001: "OP_UPDATE", 2002: "OP_INSERT", 2003: "RESERVED", 2004: "OP_QUERY", 2005: "OP_GET_MORE", 2006: "OP_DELETE", 2007: "OP_KILL_CURSORS", 2013: "OP_MSG", } func validOpcode(o opCode) bool { _, found := opCodeNames[o] return found } func (o opCode) String() string { if name, found := opCodeNames[o]; found { return name } return fmt.Sprintf("(value=%d)", int32(o)) } func awaitsReply(msg *mongodbMessage) bool { opCode := msg.opCode // The request of opMsg type doesn't get response if moreToCome is set // From documentation: https://www.mongodb.com/docs/manual/reference/mongodb-wire-protocol // "Requests with the moreToCome bit set will not receive a reply" if !msg.isResponse && opCode == opMsg && !msg.moreToCome { return true } return opCode == opQuery || opCode == opGetMore } // List of mongodb user commands (send through a query of the legacy protocol) // see http://docs.mongodb.org/manual/reference/command/ // // This list was obtained by calling db.listCommands() and some grepping. // They are compared cased insensitive var databaseCommands = []string{ "getLastError", "connPoolSync", "top", "dropIndexes", "explain", "grantRolesToRole", "dropRole", "dropAllRolesFromDatabase", "listCommands", "replSetReconfig", "replSetFresh", "writebacklisten", "setParameter", "update", "replSetGetStatus", "find", "resync", "appendOplogNote", "revokeRolesFromRole", "compact", "createUser", "replSetElect", "getPrevError", "serverStatus", "getShardVersion", "updateRole", "replSetFreeze", "getCmdLineOpts", "applyOps", "count", "aggregate", "copydbsaslstart", "distinct", "repairDatabase", "profile", "replSetStepDown", "findAndModify", "_transferMods", "filemd5", "forceerror", "getnonce", "saslContinue", "clone", "saslStart", "_getUserCacheGeneration", "_recvChunkCommit", "whatsmyuri", "repairCursor", "validate", "dbHash", "planCacheListFilters", "touch", "mergeChunks", "cursorInfo", "_recvChunkStart", "unsetSharding", "revokePrivilegesFromRole", "logout", "group", "shardConnPoolStats", "listDatabases", "buildInfo", "availableQueryOptions", "_isSelf", "splitVector", "geoSearch", "dbStats", "connectionStatus", "currentOpCtx", "copydb", "insert", "reIndex", "moveChunk", "cleanupOrphaned", "driverOIDTest", "isMaster", "getParameter", "replSetHeartbeat", "ping", "listIndexes", "dropUser", "dropDatabase", "dataSize", "convertToCapped", "planCacheSetFilter", "usersInfo", "grantPrivilegesToRole", "handshake", "_mergeAuthzCollections", "mapreduce.shardedfinish", "_recvChunkAbort", "authSchemaUpgrade", "replSetGetConfig", "replSetSyncFrom", "collStats", "replSetMaintenance", "createRole", "copydbgetnonce", "cloneCollectionAsCapped", "_migrateClone", "parallelCollectionScan", "connPoolStats", "revokeRolesFromUser", "authenticate", "create", "shutdown", "invalidateUserCache", "shardingState", "renameCollection", "replSetGetRBID", "splitChunk", "createIndexes", "updateUser", "cloneCollection", "logRotate", "planCacheListPlans", "medianKey", "hostInfo", "geoNear", "fsync", "checkShardingIndex", "getShardMap", "planCacheClear", "listCollections", "collMod", "_recvChunkStatus", "planCacheListQueryShapes", "delete", "planCacheClearFilters", "mapReduce", "rolesInfo", "eval", "drop", "grantRolesToUser", "resetError", "getLog", "dropAllUsersFromDatabase", "diagLogging", "replSetUpdatePosition", "setShardVersion", "replSetInitiate", }