in uamqp/mgmt_operation.py [0:0]
def execute(self, operation, op_type, message, timeout=0):
    """Execute a management request and block until a response arrives.

    A unique operation id is registered in ``self._responses`` for the
    duration of the call. The slot is always released before returning or
    raising, so timed-out or failed requests do not accumulate entries.

    :param operation: The type of operation to be performed. This value will
     be service-specific, but common values include READ, CREATE and UPDATE.
     This value will be added as an application property on the message.
    :type operation: bytes or str
    :param op_type: The type on which to carry out the operation. This will
     be specific to the entities of the service. This value will be added as
     an application property on the message.
    :type op_type: bytes or str
    :param message: The message to send in the management request.
    :type message: ~uamqp.message.Message
    :param timeout: Provide an optional timeout in milliseconds within which a response
     to the management request must be received. A value of 0 (the default)
     or less disables the timeout check.
    :type timeout: float
    :returns: A ``(status_code, message, description)`` tuple, where ``message``
     is the wrapped response message, or None if the response carried none.
    :rtype: tuple
    :raises: compat.TimeoutException if no response is received within ``timeout``.
    :raises: whatever exception is stored in ``self.mgmt_error``, if one is set.
    """
    start_time = self._counter.get_current_ms()
    operation_id = str(uuid.uuid4())
    operation = self._encode(operation)
    op_type = self._encode(op_type)

    def on_complete(operation_result, status_code, description, wrapped_message):
        result = constants.MgmtExecuteResult(operation_result)
        if result != constants.MgmtExecuteResult.Ok:
            _logger.error(
                "Failed to complete mgmt operation.\nStatus code: %r\nMessage: %r",
                status_code, description)
        response_message = Message(message=wrapped_message) if wrapped_message else None
        # Only record the result while the caller is still waiting. A completion
        # arriving after the slot was released (e.g. following a timeout) would
        # otherwise re-create an entry that nothing ever removes.
        if operation_id in self._responses:
            self._responses[operation_id] = (status_code, response_message, description)

    # Register the pending slot only after the arguments were encoded, so a
    # failure in _encode cannot leave an orphaned entry behind.
    self._responses[operation_id] = None
    try:
        self._mgmt_op.execute(operation, op_type, None, message.get_message(), on_complete)
        while not self._responses[operation_id] and not self.mgmt_error:
            if timeout > 0:
                now = self._counter.get_current_ms()
                if (now - start_time) >= timeout:
                    raise compat.TimeoutException("Failed to receive mgmt response in {}ms".format(timeout))
            # NOTE(review): work() is defined elsewhere; presumably it services the
            # connection so that on_complete can be invoked - confirm.
            self.connection.work()
        if self.mgmt_error:
            raise self.mgmt_error
        return self._responses[operation_id]
    finally:
        # Release the slot on every exit path (success, timeout, mgmt error or
        # an exception from the underlying operation/connection).
        self._responses.pop(operation_id, None)