in qpid/messaging/driver.py [0:0]
def send(self, snd, msg):
    """Encode ``msg`` and write it to the sender's session as a MessageTransfer.

    The per-session and per-sender driver state are looked up in
    ``self._attachments``.  Delivery and message properties are built from
    the message, the content is encoded with the codec selected by
    ``msg.content_type``, and the resulting transfer is handed to the
    session state's ``write_cmd``.

    Routing key: the sender's ``_routing_key`` is used when the message has
    no subject or the sender's exchange is the empty string; otherwise the
    message subject is used.

    Acknowledgement: when the sender state has ``pre_ack`` set, the transfer
    is written without a completion callback and the ack bookkeeping runs
    immediately after the write; otherwise the bookkeeping callback is
    passed to ``write_cmd`` together with ``sync=msg._sync``.

    Raises:
        EncodeError: if the encoder raises AttributeError for ``msg.content``.
    """
    session_state = self._attachments[snd.session]
    sender_state = self._attachments[snd]

    # Routing key: message subject wins only when it is set AND the sender
    # targets a non-empty exchange name.
    if msg.subject is not None and sender_state._exchange != "":
        routing_key = msg.subject
    else:
        routing_key = sender_state._routing_key

    # Subject header: fall back to the sender's subject when the message
    # carries none.
    subject = sender_state.subject if msg.subject is None else msg.subject

    # XXX: do we need to query to figure out how to create the reply-to interoperably?
    reply_to = addr2reply_to(msg.reply_to) if msg.reply_to else None

    content_encoding = msg.properties.get("x-amqp-0-10.content-encoding")
    delivery_props = DeliveryProperties(routing_key=routing_key)
    message_props = MessageProperties(
        message_id=msg.id,
        user_id=msg.user_id,
        reply_to=reply_to,
        correlation_id=msg.correlation_id,
        app_id=msg.properties.get("x-amqp-0-10.app-id"),
        content_type=msg.content_type,
        content_encoding=content_encoding,
        application_headers=msg.properties,
    )

    if subject is not None:
        if message_props.application_headers is None:
            message_props.application_headers = {}
        # NOTE(review): application_headers was given msg.properties above;
        # unless MessageProperties copies it, this write is also visible on
        # the caller's message -- confirm whether that is intended.
        message_props.application_headers[SUBJECT] = subject

    # Optional delivery settings are only set when the message specifies them.
    if msg.durable is not None:
        delivery_props.delivery_mode = (
            delivery_mode.persistent if msg.durable
            else delivery_mode.non_persistent
        )
    if msg.priority is not None:
        delivery_props.priority = msg.priority
    if msg.ttl is not None:
        # presumably seconds -> milliseconds; verify against the Message API
        delivery_props.ttl = long(msg.ttl*1000)

    # get_codec returns an (encoder, decoder) pair; only the encoder is used.
    encode, _decode = get_codec(msg.content_type)
    try:
        payload = encode(msg.content)
    except AttributeError as e:
        # convert to non-blocking EncodeError
        raise EncodeError(e)

    # XXX: this is not safe for out of order, can this be triggered by pre_ack?
    def msg_acked():
        # XXX: should we log the ack somehow too?
        # Bookkeeping on ack: bump the sender's counter, drop the head of
        # the session's outgoing list and move the session index back by one.
        snd.acked += 1
        head = snd.session.outgoing.pop(0)
        session_state.outgoing_idx -= 1
        log.debug("RACK[%s]: %s", session_state.session.log_id, msg)
        # The popped head is expected to be the message this closure is for.
        assert msg == head

    transfer = MessageTransfer(
        destination=sender_state._exchange,
        headers=(delivery_props, message_props),
        payload=payload,
    )

    if sender_state.pre_ack:
        # No completion callback: the ack bookkeeping is run by hand below.
        session_state.write_cmd(transfer)
    else:
        session_state.write_cmd(transfer, msg_acked, sync=msg._sync)
    log.debug("SENT[%s]: %s", session_state.session.log_id, msg)

    if sender_state.pre_ack:
        msg_acked()