def install()

in skywalking/plugins/sw_neo4j.py [0:0]


def install():
    from neo4j import AsyncSession, Session
    from neo4j._sync.work.transaction import TransactionBase
    from neo4j._async.work.transaction import AsyncTransactionBase

    _session_run = Session.run
    _async_session_run = AsyncSession.run
    _transaction_run = TransactionBase.run
    _async_transaction_run = AsyncTransactionBase.run

    def _archive_span(span, database, query, parameters, **kwargs):
        span.layer = Layer.Database
        span.tag(TagDbType('Neo4j'))
        span.tag(TagDbInstance(database or ''))
        span.tag(TagDbStatement(query))

        parameters = dict(parameters or {}, **kwargs)
        if config.plugin_sql_parameters_max_length and parameters:
            parameter = json.dumps(parameters, ensure_ascii=False)
            max_len = config.plugin_sql_parameters_max_length
            parameter = f'{parameter[0:max_len]}...' if len(
                parameter) > max_len else parameter
            span.tag(TagDbSqlParameters(f'[{parameter}]'))

    def get_peer(address):
        return f'{address.host}:{address.port}'

    def _sw_session_run(self, query, parameters, **kwargs):
        with get_context().new_exit_span(
                op='Neo4j/Session/run',
                peer=get_peer(self._pool.address),
                component=Component.Neo4j) as span:
            _archive_span(span, self._config.database, query, parameters, **kwargs)
            return _session_run(self, query, parameters, **kwargs)

    def _sw_transaction_run(self, query, parameters=None, **kwargs):
        with get_context().new_exit_span(
                op='Neo4j/Transaction/run',
                peer=get_peer(self._connection.unresolved_address),
                component=Component.Neo4j) as span:
            _archive_span(span, self._database, query, parameters, **kwargs)
            return _transaction_run(self, query, parameters, **kwargs)

    async def _sw_async_session_run(self, query, parameters, **kwargs):
        with get_context().new_exit_span(
                op='Neo4j/AsyncSession/run',
                peer=get_peer(self._pool.address),
                component=Component.Neo4j) as span:
            _archive_span(span, self._config.database, query, parameters, **kwargs)
            return await _async_session_run(self, query, parameters, **kwargs)

    async def _sw_async_transaction_run(self, query, parameters, **kwargs):
        with get_context().new_exit_span(
                op='Neo4j/AsyncTransaction/run',
                peer=get_peer(self._connection.unresolved_address),
                component=Component.Neo4j) as span:
            _archive_span(span, self._database, query, parameters, **kwargs)
            return await _async_transaction_run(self, query, parameters, **kwargs)

    Session.run = _sw_session_run
    AsyncSession.run = _sw_async_session_run
    TransactionBase.run = _sw_transaction_run
    AsyncTransactionBase.run = _sw_async_transaction_run