def install()

in skywalking/plugins/sw_asyncpg.py [0:0]


def install():
    """Monkey-patch asyncpg so that statements are reported as exit spans.

    ``Connection.__init__`` is wrapped to copy the peer address and database
    name onto the connection's protocol object. ``Protocol.bind``,
    ``Protocol.bind_execute``, ``Protocol.bind_execute_many`` and
    ``Protocol.query`` are wrapped so the original call is awaited inside a
    ``PostgreSQL/AsyncPG/bind`` exit span tagged with the statement text and,
    when ``config.plugin_sql_parameters_max_length`` is truthy, the
    (truncated) bound parameters.
    """
    from asyncpg import Connection
    from asyncpg.protocol import Protocol

    def _sw_init(self, *args, **kwargs):
        """Run the original initializer, then stash peer info on the protocol."""
        _init(self, *args, **kwargs)
        addr = self._addr
        # asyncpg hands over a (host, port) pair for TCP connections but a plain
        # socket path string for Unix-domain sockets; indexing the string would
        # produce a bogus peer made of its first two characters.
        self._protocol._addr = addr if isinstance(addr, str) else f'{addr[0]}:{addr[1]}'
        self._protocol._database = self._params.database

    async def __bind(proto, query, params, future, is_many=False):
        """Await ``future`` inside an exit span describing ``query``.

        :param proto: protocol object the wrapped method was called on
        :param query: statement text recorded in the span
        :param params: bound arguments; a flat sequence, or an iterable of
            sequences when ``is_many`` is true; ``None`` disables the tag
        :param future: awaitable produced by the original protocol method
        :param is_many: whether ``params`` holds several argument sets
        :return: whatever ``future`` resolves to
        """
        peer = getattr(proto, '_addr', '<unavailable>')  # just in case

        with get_context().new_exit_span(op='PostgreSQL/AsyncPG/bind', peer=peer,
                                         component=Component.AsyncPG) as span:
            span.layer = Layer.Database

            span.tag(TagDbType('PostgreSQL'))
            span.tag(TagDbInstance(getattr(proto, '_database', '<unavailable>')))
            span.tag(TagDbStatement(query))

            max_len = config.plugin_sql_parameters_max_length

            if max_len and params is not None:
                if not is_many:
                    text = ','.join(str(v) for v in params)

                    if len(text) > max_len:
                        text = f'{text[:max_len]}...'

                    span.tag(TagDbSqlParameters(f'[{text}]'))

                else:
                    # NOTE(review): this walks `params` before the wrapped call is
                    # awaited; if a caller passes a one-shot iterator the entries
                    # consumed here would not reach asyncpg - confirm `params` is
                    # always re-iterable.
                    total_len = 0
                    text_list = []

                    for _params in params:
                        text = f"[{','.join(str(v) for v in _params)}]"
                        # Only the per-set text counts against the budget; the
                        # joining commas and outer brackets are not included.
                        total_len += len(text)

                        if total_len > max_len:
                            # max_len - total_len is negative here, so the slice
                            # drops exactly the characters that overflow the budget.
                            text_list.append(f'{text[:max_len - total_len]}...')

                            break

                        text_list.append(text)

                    span.tag(TagDbSqlParameters(f"[{','.join(text_list)}]"))

            # Awaited inside the `with` block, so the span is still open while
            # the original call runs.
            return await future

    async def _sw_bind(proto, stmt, params, *args, **kwargs):
        """Traced replacement for ``Protocol.bind``."""
        return await __bind(proto, stmt.query, params, _bind(proto, stmt, params, *args, **kwargs))

    async def _sw_bind_execute(proto, stmt, params, *args, **kwargs):
        """Traced replacement for ``Protocol.bind_execute``."""
        return await __bind(proto, stmt.query, params, _bind_execute(proto, stmt, params, *args, **kwargs))

    async def _sw_bind_execute_many(proto, stmt, params, *args, **kwargs):
        """Traced replacement for ``Protocol.bind_execute_many``."""
        return await __bind(proto, stmt.query, params, _bind_execute_many(proto, stmt, params, *args, **kwargs), True)

    async def _sw_query(proto, query, *args, **kwargs):
        """Traced replacement for ``Protocol.query``; reports an empty parameter list."""
        return await __bind(proto, query, (), _query(proto, query, *args, **kwargs))

    # async def _sw_execute(proto, stmt, *args, **kwargs):  # these may be useful in the future, left here for documentation purposes
    # async def _sw_prepare(*args, **kwargs):

    # Keep references to the originals; the wrappers above call them through
    # these closure names.
    _init = Connection.__init__
    _bind = Protocol.bind
    _bind_execute = Protocol.bind_execute
    _bind_execute_many = Protocol.bind_execute_many
    _query = Protocol.query
    # _execute = Protocol.execute
    # _prepare = Protocol.prepare

    Connection.__init__ = _sw_init
    Protocol.bind = _sw_bind
    Protocol.bind_execute = _sw_bind_execute
    Protocol.bind_execute_many = _sw_bind_execute_many
    Protocol.query = _sw_query