def send()

in qpid/messaging/driver.py [0:0]


  def send(self, snd, msg):
    sst = self._attachments[snd.session]
    _snd = self._attachments[snd]

    if msg.subject is None or _snd._exchange == "":
      rk = _snd._routing_key
    else:
      rk = msg.subject

    if msg.subject is None:
      subject = _snd.subject
    else:
      subject = msg.subject

    # XXX: do we need to query to figure out how to create the reply-to interoperably?
    if msg.reply_to:
      rt = addr2reply_to(msg.reply_to)
    else:
      rt = None
    content_encoding = msg.properties.get("x-amqp-0-10.content-encoding")
    dp = DeliveryProperties(routing_key=rk)
    mp = MessageProperties(message_id=msg.id,
                           user_id=msg.user_id,
                           reply_to=rt,
                           correlation_id=msg.correlation_id,
                           app_id = msg.properties.get("x-amqp-0-10.app-id"),
                           content_type=msg.content_type,
                           content_encoding=content_encoding,
                           application_headers=msg.properties)
    if subject is not None:
      if mp.application_headers is None:
        mp.application_headers = {}
      mp.application_headers[SUBJECT] = subject
    if msg.durable is not None:
      if msg.durable:
        dp.delivery_mode = delivery_mode.persistent
      else:
        dp.delivery_mode = delivery_mode.non_persistent
    if msg.priority is not None:
      dp.priority = msg.priority
    if msg.ttl is not None:
      dp.ttl = long(msg.ttl*1000)
    enc, dec = get_codec(msg.content_type)
    try:
      body = enc(msg.content)
    except AttributeError as e:
      # convert to non-blocking EncodeError
      raise EncodeError(e)

    # XXX: this is not safe for out of order, can this be triggered by pre_ack?
    def msg_acked():
      # XXX: should we log the ack somehow too?
      snd.acked += 1
      m = snd.session.outgoing.pop(0)
      sst.outgoing_idx -= 1
      log.debug("RACK[%s]: %s", sst.session.log_id, msg)
      assert msg == m

    xfr = MessageTransfer(destination=_snd._exchange, headers=(dp, mp),
                          payload=body)

    if _snd.pre_ack:
      sst.write_cmd(xfr)
    else:
      sst.write_cmd(xfr, msg_acked, sync=msg._sync)

    log.debug("SENT[%s]: %s", sst.session.log_id, msg)

    if _snd.pre_ack:
      msg_acked()