in uamqp/mgmt_operation.py [0:0]
def __init__(
        self,
        session,
        target=None,
        debug=False,
        status_code_field=b'statusCode',
        description_fields=b'statusDescription',
        encoding='UTF-8'):
    """Create a management operation on ``session`` and try to open it.

    A failure to open (signalled by ``ValueError``) is not raised from the
    constructor; it is recorded on ``self.mgmt_error`` instead, which is
    ``None`` when the open call returned normally.

    :param session: Object exposing ``_connection`` and ``_session``;
     the latter is handed to ``c_uamqp.create_management_operation``.
    :param target: Management node address. Any falsy value selects
     ``constants.MGMT_TARGET``.
    :param debug: Forwarded to the operation's ``set_trace``.
    :param status_code_field: Name of the response field carrying the
     status code; passed through ``self._encode`` first.
    :param description_fields: Name of the response field carrying the
     status description; passed through ``self._encode`` first.
    :param encoding: Stored as ``self._encoding`` (presumably what
     ``self._encode`` uses -- confirm against that helper).
    """
    self._encoding = encoding
    self.connection = session._connection  # pylint: disable=protected-access

    # Normalise the target and both response field names via _encode
    # before any native object is created.
    self.target = self._encode(target or constants.MGMT_TARGET)
    status_key = self._encode(status_code_field)
    description_key = self._encode(description_fields)

    self._responses = {}
    self._counter = c_uamqp.TickCounter()

    # Build and configure the underlying management operation.
    self._mgmt_op = c_uamqp.create_management_operation(
        session._session,  # pylint: disable=protected-access
        self.target)
    self._mgmt_op.set_response_field_names(status_key, description_key)
    self._mgmt_op.set_trace(debug)

    # No open status is known yet; NOTE(review): presumably updated later
    # by a callback on this instance -- confirm against the rest of the class.
    self.open = None

    # The instance itself is passed to open(). mgmt_error is assigned only
    # once the attempt has finished, whichever way it went.
    try:
        self._mgmt_op.open(self)
    except ValueError:
        open_failure = errors.AMQPConnectionError(
            "Unable to open management session. "
            "Please confirm URI namespace exists.")
    else:
        open_failure = None
    self.mgmt_error = open_failure