in qpid/peer.py [0:0]
def invoke_method(self, frame, content = None):
if frame.method.result:
cmd_id = self.completion.command_id
future = Future()
self.futures[cmd_id] = future
if frame.method.klass.name == "basic" and frame.method.name == "publish":
self._flow_control_wait_condition.acquire()
try:
self.check_flow_control()
self.write(frame, content)
finally:
self._flow_control_wait_condition.release()
else:
self.write(frame, content)
try:
# here we depend on all nowait fields being named nowait
f = frame.method.fields.byname["nowait"]
nowait = frame.args[frame.method.fields.index(f)]
except KeyError:
nowait = False
try:
if not nowait and frame.method.responses:
resp = self.responses.get()
if resp.method.content:
content = read_content(self.responses)
else:
content = None
if resp.method in frame.method.responses:
return Message(self, resp, content)
else:
raise ValueError(resp)
elif frame.method.result:
if self.synchronous:
fr = future.get_response(timeout=10)
if self._closed:
raise Closed(self.reason)
return fr
else:
return future
elif self.synchronous and not frame.method.response \
and self.use_execution_layer and frame.method.is_l4_command():
self.execution_sync()
completed = self.completion.wait(timeout=10)
if self._closed:
raise Closed(self.reason)
if not completed:
self.closed("Timed-out waiting for completion of %s" % frame)
except QueueClosed as e:
if self._closed:
raise Closed(self.reason)
else:
raise e