in skywalking/plugins/sw_asyncpg.py [0:0]
def install():
    """Patch asyncpg so that statements sent through its protocol are traced.

    ``Connection.__init__`` is wrapped to copy the peer address and database
    name onto the connection's protocol object.  ``Protocol.bind``,
    ``Protocol.bind_execute``, ``Protocol.bind_execute_many`` and
    ``Protocol.query`` are wrapped so each call runs inside a
    ``PostgreSQL/AsyncPG/bind`` exit span tagged with the statement text and,
    when ``config.plugin_sql_parameters_max_length`` is non-zero, a truncated
    rendering of the bound parameters.
    """
    from asyncpg import Connection
    from asyncpg.protocol import Protocol

    def _sw_init(self, *args, **kwargs):
        """Run the original initializer, then stash peer info on the protocol."""
        _init(self, *args, **kwargs)

        addr = self._addr
        if isinstance(addr, (tuple, list)):
            peer = f'{addr[0]}:{addr[1]}'
        else:
            # Not a (host, port) pair -- presumably a Unix-domain socket path.
            # Indexing a string would yield its first two characters, so use
            # the value as a whole instead.
            peer = str(addr)

        self._protocol._addr = peer
        self._protocol._database = self._params.database

    def _render_one(params, max_len):
        """Render a single parameter set as ``[a,b,c]``, truncated to max_len."""
        text = ','.join(str(v) for v in params)
        if len(text) > max_len:
            text = f'{text[:max_len]}...'
        return f'[{text}]'

    def _render_many(param_sets, max_len):
        """Render several parameter sets as ``[[a,b],[c,d]]`` within max_len."""
        total_len = 0
        rendered = []
        for param_set in param_sets:
            text = f"[{','.join(str(v) for v in param_set)}]"
            total_len += len(text)
            if total_len > max_len:
                # max_len - total_len is negative here, so the slice drops
                # exactly the overflowing tail of this entry.
                rendered.append(f'{text[:max_len - total_len]}...')
                break
            rendered.append(text)
        return f"[{','.join(rendered)}]"

    async def __bind(proto, query, params, future, is_many=False):
        """Await ``future`` inside an exit span describing the statement.

        :param proto: protocol instance the call was made on
        :param query: SQL text recorded as the statement tag
        :param params: bound parameters (one set, or many when ``is_many``)
        :param future: awaitable produced by the original protocol method
        :param is_many: True when ``params`` is a collection of parameter sets
        :return: whatever ``future`` resolves to
        """
        peer = getattr(proto, '_addr', '<unavailable>')  # just in case

        with get_context().new_exit_span(op='PostgreSQL/AsyncPG/bind', peer=peer,
                                         component=Component.AsyncPG) as span:
            span.layer = Layer.Database

            span.tag(TagDbType('PostgreSQL'))
            span.tag(TagDbInstance(getattr(proto, '_database', '<unavailable>')))
            span.tag(TagDbStatement(query))

            max_len = config.plugin_sql_parameters_max_length

            # An iterator returns itself from iter().  Reading one here would
            # drain it before the wrapped call gets to consume it, so the
            # parameter tag is skipped rather than altering the query.
            if max_len and params is not None and iter(params) is not params:
                render = _render_many if is_many else _render_one
                span.tag(TagDbSqlParameters(render(params, max_len)))

            return await future

    async def _sw_bind(proto, stmt, params, *args, **kwargs):
        return await __bind(proto, stmt.query, params, _bind(proto, stmt, params, *args, **kwargs))

    async def _sw_bind_execute(proto, stmt, params, *args, **kwargs):
        return await __bind(proto, stmt.query, params, _bind_execute(proto, stmt, params, *args, **kwargs))

    async def _sw_bind_execute_many(proto, stmt, params, *args, **kwargs):
        return await __bind(proto, stmt.query, params, _bind_execute_many(proto, stmt, params, *args, **kwargs), True)

    async def _sw_query(proto, query, *args, **kwargs):
        return await __bind(proto, query, (), _query(proto, query, *args, **kwargs))

    # async def _sw_execute(proto, stmt, *args, **kwargs):  # these may be useful in the future, left here for documentation purposes
    # async def _sw_prepare(*args, **kwargs):

    # Keep references to the originals; the wrappers above call them.
    _init = Connection.__init__
    _bind = Protocol.bind
    _bind_execute = Protocol.bind_execute
    _bind_execute_many = Protocol.bind_execute_many
    _query = Protocol.query
    # _execute = Protocol.execute
    # _prepare = Protocol.prepare

    Connection.__init__ = _sw_init
    Protocol.bind = _sw_bind
    Protocol.bind_execute = _sw_bind_execute
    Protocol.bind_execute_many = _sw_bind_execute_many
    Protocol.query = _sw_query