skywalking/plugins/sw_neo4j.py (65 lines of code) (raw):
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
from skywalking import Layer, Component, config
from skywalking.trace.context import get_context
from skywalking.trace.tags import TagDbType, TagDbInstance, TagDbStatement, TagDbSqlParameters
link_vector = ['https://neo4j.com/docs/python-manual/5/']
support_matrix = {
'neo4j': {
'>=3.7': ['5.*'],
}
}
note = """The Neo4j plugin integrates neo4j python driver 5.x.x versions which
support both Neo4j 5 and 4.4 DBMS."""
def install():
from neo4j import AsyncSession, Session
from neo4j._sync.work.transaction import TransactionBase
from neo4j._async.work.transaction import AsyncTransactionBase
_session_run = Session.run
_async_session_run = AsyncSession.run
_transaction_run = TransactionBase.run
_async_transaction_run = AsyncTransactionBase.run
def _archive_span(span, database, query, parameters, **kwargs):
span.layer = Layer.Database
span.tag(TagDbType('Neo4j'))
span.tag(TagDbInstance(database or ''))
span.tag(TagDbStatement(query))
parameters = dict(parameters or {}, **kwargs)
if config.plugin_sql_parameters_max_length and parameters:
parameter = json.dumps(parameters, ensure_ascii=False)
max_len = config.plugin_sql_parameters_max_length
parameter = f'{parameter[0:max_len]}...' if len(
parameter) > max_len else parameter
span.tag(TagDbSqlParameters(f'[{parameter}]'))
def get_peer(address):
return f'{address.host}:{address.port}'
def _sw_session_run(self, query, parameters, **kwargs):
with get_context().new_exit_span(
op='Neo4j/Session/run',
peer=get_peer(self._pool.address),
component=Component.Neo4j) as span:
_archive_span(span, self._config.database, query, parameters, **kwargs)
return _session_run(self, query, parameters, **kwargs)
def _sw_transaction_run(self, query, parameters=None, **kwargs):
with get_context().new_exit_span(
op='Neo4j/Transaction/run',
peer=get_peer(self._connection.unresolved_address),
component=Component.Neo4j) as span:
_archive_span(span, self._database, query, parameters, **kwargs)
return _transaction_run(self, query, parameters, **kwargs)
async def _sw_async_session_run(self, query, parameters, **kwargs):
with get_context().new_exit_span(
op='Neo4j/AsyncSession/run',
peer=get_peer(self._pool.address),
component=Component.Neo4j) as span:
_archive_span(span, self._config.database, query, parameters, **kwargs)
return await _async_session_run(self, query, parameters, **kwargs)
async def _sw_async_transaction_run(self, query, parameters, **kwargs):
with get_context().new_exit_span(
op='Neo4j/AsyncTransaction/run',
peer=get_peer(self._connection.unresolved_address),
component=Component.Neo4j) as span:
_archive_span(span, self._database, query, parameters, **kwargs)
return await _async_transaction_run(self, query, parameters, **kwargs)
Session.run = _sw_session_run
AsyncSession.run = _sw_async_session_run
TransactionBase.run = _sw_transaction_run
AsyncTransactionBase.run = _sw_async_transaction_run