in qpid/messaging/endpoints.py [0:0]
def send(self, object, sync=True, timeout=None):
"""
Send a message. If the object passed in is of type L{unicode},
L{str}, L{list}, or L{dict}, it will automatically be wrapped in a
L{Message} and sent. If it is of type L{Message}, it will be sent
directly. If the sender capacity is not L{UNLIMITED} then send
will block until there is available capacity to send the message.
If the timeout parameter is specified, then send will throw an
L{InsufficientCapacity} exception if capacity does not become
available within the specified time.
@type object: unicode, str, list, dict, Message
@param object: the message or content to send
@type sync: boolean
@param sync: if true then block until the message is sent
@type timeout: float
@param timeout: the time to wait for available capacity
"""
if not self.session.connection._connected or self.session.closing:
raise Detached()
self._ecwait(lambda: self.linked, timeout=timeout)
if isinstance(object, Message):
message = object
else:
message = Message(object)
if message.durable is None:
message.durable = self.durable
if self.capacity is not UNLIMITED:
if self.capacity <= 0:
raise InsufficientCapacity("capacity = %s" % self.capacity)
if not self._ecwait(self.available, timeout=timeout):
raise InsufficientCapacity("capacity = %s" % self.capacity)
# XXX: what if we send the same message to multiple senders?
message._sender = self
if self.capacity is not UNLIMITED:
message._sync = sync or self.available() <= int(ceil(self.threshold*self.capacity))
else:
message._sync = sync
self.session.outgoing.append(message)
self.queued += 1
if sync:
self.sync(timeout=timeout)
assert message not in self.session.outgoing
else:
self._wakeup()