skywalking/plugins/sw_pymongo.py (97 lines of code) (raw):

# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # from skywalking import Layer, Component, config from skywalking.trace.context import get_context from skywalking.trace.tags import TagDbType, TagDbInstance, TagDbStatement link_vector = ['https://pymongo.readthedocs.io'] support_matrix = { 'pymongo': { '>=3.7': ['3.11.*'] # TODO: "3.12" incompatible with all python versions, need investigation } } note = """""" def install(): from pymongo.bulk import _Bulk from pymongo.cursor import Cursor from pymongo.pool import SocketInfo bulk_op_map = { 0: 'insert', 1: 'update', 2: 'delete' } # handle insert_many and bulk write inject_bulk_write(_Bulk, bulk_op_map) # handle find() & find_one() inject_cursor(Cursor) # handle other commands inject_socket_info(SocketInfo) def inject_socket_info(SocketInfo): # noqa _command = SocketInfo.command def _sw_command(this: SocketInfo, dbname, spec, *args, **kwargs): # pymongo sends `ismaster` command continuously. ignore it. if spec.get('ismaster') is None: address = this.sock.getpeername() peer = f'{address[0]}:{address[1]}' context = get_context() operation = list(spec.keys())[0] sw_op = f'{operation.capitalize()}Operation' with context.new_exit_span(op=f'MongoDB/{sw_op}', peer=peer, component=Component.MongoDB) as span: result = _command(this, dbname, spec, *args, **kwargs) span.layer = Layer.Database span.tag(TagDbType('MongoDB')) span.tag(TagDbInstance(dbname)) if config.plugin_pymongo_trace_parameters: # get filters filters = _get_filter(operation, spec) max_len = config.plugin_pymongo_parameters_max_length filters = f'{filters[0:max_len]}...' if len(filters) > max_len else filters span.tag(TagDbStatement(filters)) else: result = _command(this, dbname, spec, *args, **kwargs) return result SocketInfo.command = _sw_command def _get_filter(request_type, spec): """ :param request_type: the request param send to MongoDB :param spec: maybe a bson.SON class or a dict :return: filter string """ from bson import SON if isinstance(spec, SON): spec = spec.to_dict() spec.pop(request_type) elif isinstance(spec, dict): spec = dict(spec) spec.pop(request_type) return f'{request_type} {str(spec)}' def inject_bulk_write(_Bulk, bulk_op_map): # noqa _execute = _Bulk.execute def _sw_execute(this: _Bulk, *args, **kwargs): nodes = this.collection.database.client.nodes peer = ','.join([f'{address[0]}:{address[1]}' for address in nodes]) context = get_context() sw_op = 'MixedBulkWriteOperation' with context.new_exit_span(op=f'MongoDB/{sw_op}', peer=peer, component=Component.MongoDB) as span: span.layer = Layer.Database bulk_result = _execute(this, *args, **kwargs) span.tag(TagDbType('MongoDB')) span.tag(TagDbInstance(this.collection.database.name)) if config.plugin_pymongo_trace_parameters: filters = '' bulk_ops = this.ops for bulk_op in bulk_ops: opname = bulk_op_map.get(bulk_op[0]) _filter = f'{opname} {str(bulk_op[1])}' filters = f'{filters + _filter} ' max_len = config.plugin_pymongo_parameters_max_length filters = f'{filters[0:max_len]}...' if len(filters) > max_len else filters span.tag(TagDbStatement(filters)) return bulk_result _Bulk.execute = _sw_execute def inject_cursor(Cursor): # noqa __send_message = Cursor._Cursor__send_message def _sw_send_message(this: Cursor, operation): nodes = this.collection.database.client.nodes peer = ','.join([f'{address[0]}:{address[1]}' for address in nodes]) context = get_context() op = 'FindOperation' with context.new_exit_span(op=f'MongoDB/{op}', peer=peer, component=Component.MongoDB) as span: span.layer = Layer.Database # __send_message return nothing __send_message(this, operation) span.tag(TagDbType('MongoDB')) span.tag(TagDbInstance(this.collection.database.name)) if config.plugin_pymongo_trace_parameters: filters = f'find {str(operation.spec)}' max_len = config.plugin_pymongo_parameters_max_length filters = f'{filters[0:max_len]}...' if len(filters) > max_len else filters span.tag(TagDbStatement(filters)) return Cursor._Cursor__send_message = _sw_send_message