in skywalking/plugins/sw_neo4j.py [0:0]
def install():
    """Patch the neo4j driver so that every ``run`` call is wrapped in an exit span.

    The ``run`` methods of ``Session``, ``AsyncSession``, ``TransactionBase``
    and ``AsyncTransactionBase`` are replaced with wrappers that open an exit
    span, tag it with the statement details, and then delegate to the
    original method.
    """
    from neo4j import AsyncSession, Session
    from neo4j._sync.work.transaction import TransactionBase
    from neo4j._async.work.transaction import AsyncTransactionBase

    # Keep references to the unpatched methods; the wrappers delegate to them.
    _session_run = Session.run
    _async_session_run = AsyncSession.run
    _transaction_run = TransactionBase.run
    _async_transaction_run = AsyncTransactionBase.run

    def _archive_span(span, database, query, parameters, **kwargs):
        """Tag ``span`` with the database type, instance, statement and parameters.

        ``parameters`` (a mapping or ``None``) and ``kwargs`` are merged into a
        single dict before being serialized for the parameters tag.
        """
        span.layer = Layer.Database
        span.tag(TagDbType('Neo4j'))
        span.tag(TagDbInstance(database or ''))
        span.tag(TagDbStatement(query))

        parameters = dict(parameters or {}, **kwargs)
        max_len = config.plugin_sql_parameters_max_length
        # Parameters are only recorded when a maximum length is configured
        # and there is something to record.
        if max_len and parameters:
            # default=str: values json cannot encode natively (e.g. datetime,
            # bytes) are stringified instead of raising TypeError, so that
            # building a diagnostic tag never aborts the caller's query.
            parameter = json.dumps(parameters, ensure_ascii=False, default=str)
            if len(parameter) > max_len:
                parameter = f'{parameter[0:max_len]}...'
            span.tag(TagDbSqlParameters(f'[{parameter}]'))

    def get_peer(address):
        """Format ``address`` as ``host:port`` for use as the span peer."""
        return f'{address.host}:{address.port}'

    # NOTE: ``parameters`` defaults to None in every wrapper so that calls
    # which omit it (``run(query)``) keep working once the methods are patched.

    def _sw_session_run(self, query, parameters=None, **kwargs):
        """Traced replacement for ``Session.run``."""
        with get_context().new_exit_span(
                op='Neo4j/Session/run',
                peer=get_peer(self._pool.address),
                component=Component.Neo4j) as span:
            _archive_span(span, self._config.database, query, parameters, **kwargs)
            return _session_run(self, query, parameters, **kwargs)

    def _sw_transaction_run(self, query, parameters=None, **kwargs):
        """Traced replacement for ``TransactionBase.run``."""
        with get_context().new_exit_span(
                op='Neo4j/Transaction/run',
                peer=get_peer(self._connection.unresolved_address),
                component=Component.Neo4j) as span:
            _archive_span(span, self._database, query, parameters, **kwargs)
            return _transaction_run(self, query, parameters, **kwargs)

    async def _sw_async_session_run(self, query, parameters=None, **kwargs):
        """Traced replacement for ``AsyncSession.run``."""
        with get_context().new_exit_span(
                op='Neo4j/AsyncSession/run',
                peer=get_peer(self._pool.address),
                component=Component.Neo4j) as span:
            _archive_span(span, self._config.database, query, parameters, **kwargs)
            # Await inside the ``with`` so the span stays open for the call.
            return await _async_session_run(self, query, parameters, **kwargs)

    async def _sw_async_transaction_run(self, query, parameters=None, **kwargs):
        """Traced replacement for ``AsyncTransactionBase.run``."""
        with get_context().new_exit_span(
                op='Neo4j/AsyncTransaction/run',
                peer=get_peer(self._connection.unresolved_address),
                component=Component.Neo4j) as span:
            _archive_span(span, self._database, query, parameters, **kwargs)
            # Await inside the ``with`` so the span stays open for the call.
            return await _async_transaction_run(self, query, parameters, **kwargs)

    Session.run = _sw_session_run
    AsyncSession.run = _sw_async_session_run
    TransactionBase.run = _sw_transaction_run
    AsyncTransactionBase.run = _sw_async_transaction_run