void qd_message_send()

in src/message.c [1843:2065]


void qd_message_send(qd_message_t *in_msg,
                     qd_link_t    *link,
                     bool          strip_annotations,
                     bool         *q3_stalled)
{
    qd_message_pvt_t     *msg     = (qd_message_pvt_t*) in_msg;
    qd_message_content_t *content = msg->content;
    pn_link_t            *pnl     = qd_link_pn(link);

    *q3_stalled                   = false;

    if (msg->sent_depth < QD_DEPTH_MESSAGE_ANNOTATIONS) {

        if (IS_ATOMIC_FLAG_SET(&content->aborted)) {
            // Message is aborted before any part of it has been sent.
            // Declare the message to be sent,
            SET_ATOMIC_FLAG(&msg->send_complete);
            // If the outgoing delivery is not already aborted then abort it.
            if (!pn_delivery_aborted(pn_link_current(pnl))) {
                pn_delivery_abort(pn_link_current(pnl));
            }
            return;
        }

        msg->cursor.buffer = DEQ_HEAD(content->buffers);
        msg->cursor.cursor = qd_buffer_base(msg->cursor.buffer);

        // Since link-routed messages do not set router annotations they will
        // skip the following (content->ma_disabled will be true) and unconditionally
        // start sending from the first octet of the content.

        if (!content->ma_disabled) {
            //
            // Send header if present
            //
            const int header_consume = content->section_message_header.length + content->section_message_header.hdr_length;
            if (header_consume > 0) {
                assert(msg->cursor.cursor == content->section_message_header.offset + qd_buffer_base(msg->cursor.buffer));
                advance_guarded(&msg->cursor.cursor, &msg->cursor.buffer, header_consume, send_handler, (void*) pnl);
            }

            //
            // Send delivery annotation if present
            //
            const int da_consume = content->section_delivery_annotation.length + content->section_delivery_annotation.hdr_length;
            if (da_consume > 0) {
                assert(msg->cursor.cursor == content->section_delivery_annotation.offset + qd_buffer_base(msg->cursor.buffer));
                advance_guarded(&msg->cursor.cursor, &msg->cursor.buffer, da_consume, send_handler, (void*) pnl);
            }

            //
            // Send the message annotations section
            //

            uint8_t ma_header[12];  // max length for MA section and map header
            int ma_header_len;      // size of ma_header content
            qd_buffer_list_t ma_trailer = DEQ_EMPTY;

            if (strip_annotations) {
                // send the original user message annotations only (if present)
                ma_header_len = restore_user_message_annotations(msg, ma_header);
            } else {
                ma_header_len = compose_router_message_annotations(msg, ma_header, &ma_trailer);
            }

            if (ma_header_len) {
                //
                // send annotation section and map header
                //
                pn_link_send(pnl, (char*) ma_header, ma_header_len);

                //
                // Now send any annotation set by the original endpoint
                //
                if (content->ma_user_annotations.remaining) {
                    qd_buffer_t *buf2      = content->ma_user_annotations.buffer;
                    const uint8_t *cursor2 = content->ma_user_annotations.cursor;
                    advance_guarded(&cursor2, &buf2,
                                    content->ma_user_annotations.remaining,
                                    send_handler, (void*) pnl);
                }

                //
                // Next send router annotations
                //
                qd_buffer_t *ta_buf = DEQ_HEAD(ma_trailer);
                while (ta_buf) {
                    char *to_send = (char*) qd_buffer_base(ta_buf);
                    pn_link_send(pnl, to_send, qd_buffer_size(ta_buf));
                    ta_buf = DEQ_NEXT(ta_buf);
                }
                qd_buffer_list_free_buffers(&ma_trailer);
            }

            //
            // Skip over replaced message annotations
            //
            const int ma_consume = content->section_message_annotation.hdr_length + content->section_message_annotation.length;
            if (ma_consume > 0) {
                assert(msg->cursor.cursor == content->section_message_annotation.offset + qd_buffer_base(msg->cursor.buffer));
                advance_guarded(&msg->cursor.cursor, &msg->cursor.buffer, ma_consume, 0, 0);
            }
        }

        msg->sent_depth = QD_DEPTH_MESSAGE_ANNOTATIONS;
    }

    qd_buffer_t *buf = msg->cursor.buffer;

    qd_message_q2_unblocker_t  q2_unblock = {0};
    pn_session_t              *pns        = pn_link_session(pnl);
    const size_t               q3_upper   = BUFFER_SIZE * QD_QLIMIT_Q3_UPPER;

    while (!IS_ATOMIC_FLAG_SET(&content->aborted)
           && buf
           && pn_session_outgoing_bytes(pns) < q3_upper) {

        // This will send the remaining data in the buffer if any. There may be
        // zero bytes left to send if we stopped here last time and there was
        // no next buf
        //
        size_t buf_size = qd_buffer_size(buf);
        int num_bytes_to_send = buf_size - (msg->cursor.cursor - qd_buffer_base(buf));
        ssize_t bytes_sent = 0;
        if (num_bytes_to_send > 0) {
            bytes_sent = pn_link_send(pnl, (const char*)msg->cursor.cursor, num_bytes_to_send);
        }

        LOCK(content->lock);

        if (bytes_sent < 0) {
            //
            // send error - likely the link has failed and we will eventually
            // get a link detach event for this link
            //
            SET_ATOMIC_FLAG(&content->aborted);
            SET_ATOMIC_FLAG(&msg->send_complete);
            if (!pn_delivery_aborted(pn_link_current(pnl))) {
                pn_delivery_abort(pn_link_current(pnl));
            }

            qd_log(qd_message_log_source(),
                   QD_LOG_WARNING,
                   "Sending data on link %s has failed (code=%zi)",
                   pn_link_name(pnl), bytes_sent);

        } else {

            msg->cursor.cursor += bytes_sent;

            if (bytes_sent == num_bytes_to_send) {
                //
                // sent the whole buffer.
                // Can we move to the next buffer?  Only if there is a next buffer
                // or we are at the end and done sending this message
                //
                qd_buffer_t *next_buf = DEQ_NEXT(buf);
                bool complete = qd_message_receive_complete(in_msg);

                if (next_buf || complete) {
                    //
                    // this buffer may be freed if there are no more references to it
                    //
                    uint32_t ref_count = (msg->is_fanout) ? qd_buffer_dec_fanout(buf) : 1;
                    if (ref_count == 1) {

                        DEQ_REMOVE(content->buffers, buf);
                        qd_buffer_free(buf);
                        ++content->buffers_freed;

                        // by freeing a buffer there now may be room to restart a
                        // stalled message receiver
                        if (content->q2_input_holdoff) {
                            if (_Q2_holdoff_should_unblock_LH(content)) {
                                // wake up receive side
                                // Note: clearing holdoff here is easy compared to
                                // clearing it in the deferred callback. Tracing
                                // shows that rx_handler may run and subsequently
                                // set input holdoff before the deferred handler
                                // runs.
                                content->q2_input_holdoff = false;
                                q2_unblock = content->q2_unblocker;
                            }
                        }
                    }   // end free buffer

                    msg->cursor.buffer = next_buf;
                    msg->cursor.cursor = (next_buf) ? qd_buffer_base(next_buf) : 0;

                    SET_ATOMIC_BOOL(&msg->send_complete, (complete && !next_buf));
                }

                buf = next_buf;

            } else if (num_bytes_to_send && bytes_sent == 0) {
                //
                // the proton link cannot take anymore data,
                // retry later...
                //
                buf = 0;
                qd_log(qd_message_log_source(), QD_LOG_DEBUG,
                       "Link %s output limit reached", pn_link_name(pnl));
            }
        }

        UNLOCK(content->lock);
    }

    // the Q2 handler must be invoked outside the lock
    if (q2_unblock.handler)
        q2_unblock.handler(q2_unblock.context);

    if (IS_ATOMIC_FLAG_SET(&content->aborted)) {
        if (pn_link_current(pnl)) {
            SET_ATOMIC_FLAG(&msg->send_complete);
            if (!pn_delivery_aborted(pn_link_current(pnl))) {
                pn_delivery_abort(pn_link_current(pnl));
            }
        }
    }

    *q3_stalled = (pn_session_outgoing_bytes(pns) >= q3_upper);
}