in src/message.c [1843:2065]
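// Send as much of this message as possible over the outgoing link.  On the
// first call for a delivery the header, delivery-annotations, and (re-composed
// or restored) message-annotations sections are sent; the content buffers are
// then streamed until the message is aborted, the received data is exhausted,
// or the session's outgoing bytes reach the Q3 limit, in which case
// *q3_stalled is set to true.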
void qd_message_send(qd_message_t *in_msg,
                     qd_link_t    *link,
                     bool          strip_annotations,
                     bool         *q3_stalled)
{
    qd_message_pvt_t     *msg     = (qd_message_pvt_t*) in_msg;
    qd_message_content_t *content = msg->content;
    pn_link_t            *pnl     = qd_link_pn(link);

    *q3_stalled = false;
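
    //
    // If the sections that precede the body (header, delivery annotations,
    // message annotations) have not been sent yet, send them first.
    //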
    if (msg->sent_depth < QD_DEPTH_MESSAGE_ANNOTATIONS) {

        if (IS_ATOMIC_FLAG_SET(&content->aborted)) {
            // Message is aborted before any part of it has been sent.
            // Declare the message to be sent.
            SET_ATOMIC_FLAG(&msg->send_complete);
            // If the outgoing delivery is not already aborted then abort it.
            if (!pn_delivery_aborted(pn_link_current(pnl))) {
                pn_delivery_abort(pn_link_current(pnl));
            }
            return;
        }

        msg->cursor.buffer = DEQ_HEAD(content->buffers);
        msg->cursor.cursor = qd_buffer_base(msg->cursor.buffer);

        // Since link-routed messages do not set router annotations, they will
        // skip the following (content->ma_disabled will be true) and
        // unconditionally start sending from the first octet of the content.
        if (!content->ma_disabled) {
            //
            // Send header if present
            //
            const int header_consume = content->section_message_header.length + content->section_message_header.hdr_length;
            if (header_consume > 0) {
                assert(msg->cursor.cursor == content->section_message_header.offset + qd_buffer_base(msg->cursor.buffer));
                advance_guarded(&msg->cursor.cursor, &msg->cursor.buffer, header_consume, send_handler, (void*) pnl);
            }

            //
            // Send delivery annotation if present
            //
            const int da_consume = content->section_delivery_annotation.length + content->section_delivery_annotation.hdr_length;
            if (da_consume > 0) {
                assert(msg->cursor.cursor == content->section_delivery_annotation.offset + qd_buffer_base(msg->cursor.buffer));
                advance_guarded(&msg->cursor.cursor, &msg->cursor.buffer, da_consume, send_handler, (void*) pnl);
            }

            //
            // Send the message annotations section
            //
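            // The incoming message-annotations section is never forwarded
            // as-is: it is either re-composed with this router's annotations
            // or, when strip_annotations is set, restored to the sender's
            // original annotations.  The original section is skipped below.
            //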
            uint8_t ma_header[12];     // max length for MA section and map header
            int     ma_header_len;     // size of ma_header content
            qd_buffer_list_t ma_trailer = DEQ_EMPTY;

            if (strip_annotations) {
                // send the original user message annotations only (if present)
                ma_header_len = restore_user_message_annotations(msg, ma_header);
            } else {
                ma_header_len = compose_router_message_annotations(msg, ma_header, &ma_trailer);
            }

            if (ma_header_len) {
                //
                // send annotation section and map header
                //
                pn_link_send(pnl, (char*) ma_header, ma_header_len);

                //
                // Now send any annotation set by the original endpoint
                //
                if (content->ma_user_annotations.remaining) {
                    qd_buffer_t   *buf2    = content->ma_user_annotations.buffer;
                    const uint8_t *cursor2 = content->ma_user_annotations.cursor;
                    advance_guarded(&cursor2, &buf2,
                                    content->ma_user_annotations.remaining,
                                    send_handler, (void*) pnl);
                }

                //
                // Next send router annotations
                //
                qd_buffer_t *ta_buf = DEQ_HEAD(ma_trailer);
                while (ta_buf) {
                    char *to_send = (char*) qd_buffer_base(ta_buf);
                    pn_link_send(pnl, to_send, qd_buffer_size(ta_buf));
                    ta_buf = DEQ_NEXT(ta_buf);
                }
                qd_buffer_list_free_buffers(&ma_trailer);
            }

            //
            // Skip over replaced message annotations
            //
            const int ma_consume = content->section_message_annotation.hdr_length + content->section_message_annotation.length;
            if (ma_consume > 0) {
                assert(msg->cursor.cursor == content->section_message_annotation.offset + qd_buffer_base(msg->cursor.buffer));
                advance_guarded(&msg->cursor.cursor, &msg->cursor.buffer, ma_consume, 0, 0);
            }
        }

        msg->sent_depth = QD_DEPTH_MESSAGE_ANNOTATIONS;
    }
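
    //
    // Stream the content buffers.  Stop when the message is aborted, when we
    // run out of received data, or when the session's outgoing byte count
    // reaches the Q3 limit.
    //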
    qd_buffer_t *buf = msg->cursor.buffer;

    qd_message_q2_unblocker_t q2_unblock = {0};
    pn_session_t *pns      = pn_link_session(pnl);
    const size_t  q3_upper = BUFFER_SIZE * QD_QLIMIT_Q3_UPPER;

    while (!IS_ATOMIC_FLAG_SET(&content->aborted)
           && buf
           && pn_session_outgoing_bytes(pns) < q3_upper) {
        // This will send the remaining data in the buffer if any.  There may
        // be zero bytes left to send if we stopped here last time and there
        // was no next buf
        //
        size_t  buf_size          = qd_buffer_size(buf);
        int     num_bytes_to_send = buf_size - (msg->cursor.cursor - qd_buffer_base(buf));
        ssize_t bytes_sent        = 0;
        if (num_bytes_to_send > 0) {
            bytes_sent = pn_link_send(pnl, (const char*) msg->cursor.cursor, num_bytes_to_send);
        }
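
        // content->lock guards the shared buffer list and the Q2 hold-off
        // state, both of which the receive side updates concurrently.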
        LOCK(content->lock);

        if (bytes_sent < 0) {
            //
            // send error - likely the link has failed and we will eventually
            // get a link detach event for this link
            //
            SET_ATOMIC_FLAG(&content->aborted);
            SET_ATOMIC_FLAG(&msg->send_complete);
            if (!pn_delivery_aborted(pn_link_current(pnl))) {
                pn_delivery_abort(pn_link_current(pnl));
            }

            qd_log(qd_message_log_source(), QD_LOG_WARNING,
                   "Sending data on link %s has failed (code=%zi)",
                   pn_link_name(pnl), bytes_sent);

        } else {
            msg->cursor.cursor += bytes_sent;

            if (bytes_sent == num_bytes_to_send) {
                //
                // sent the whole buffer.
                // Can we move to the next buffer?  Only if there is a next buffer
                // or we are at the end and done sending this message
                //
                qd_buffer_t *next_buf = DEQ_NEXT(buf);
                bool         complete = qd_message_receive_complete(in_msg);

                if (next_buf || complete) {
                    //
                    // this buffer may be freed if there are no more references to it
                    //
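                    // A fanout message shares its content buffers across all of
                    // its outgoing deliveries: free the buffer only when this
                    // delivery releases the last reference.
                    //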
                    uint32_t ref_count = (msg->is_fanout) ? qd_buffer_dec_fanout(buf) : 1;
                    if (ref_count == 1) {
                        DEQ_REMOVE(content->buffers, buf);
                        qd_buffer_free(buf);
                        ++content->buffers_freed;

                        // by freeing a buffer there may now be room to restart
                        // a stalled message receiver
                        if (content->q2_input_holdoff) {
                            if (_Q2_holdoff_should_unblock_LH(content)) {
                                // wake up receive side
                                // Note: clearing holdoff here is easy compared to
                                // clearing it in the deferred callback.  Tracing
                                // shows that rx_handler may run and subsequently
                                // set input holdoff before the deferred handler
                                // runs.
                                content->q2_input_holdoff = false;
                                q2_unblock = content->q2_unblocker;
                            }
                        }
                    }  // end free buffer

                    msg->cursor.buffer = next_buf;
                    msg->cursor.cursor = (next_buf) ? qd_buffer_base(next_buf) : 0;

                    SET_ATOMIC_BOOL(&msg->send_complete, (complete && !next_buf));
                }

                buf = next_buf;
            } else if (num_bytes_to_send && bytes_sent == 0) {
                //
                // the proton link cannot take any more data,
                // retry later...
                //
                buf = 0;
                qd_log(qd_message_log_source(), QD_LOG_DEBUG,
                       "Link %s output limit reached", pn_link_name(pnl));
            }
        }

        UNLOCK(content->lock);
    }

    // the Q2 handler must be invoked outside the lock
    if (q2_unblock.handler)
        q2_unblock.handler(q2_unblock.context);
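
    // If the message has been aborted, either upstream or by a failed send
    // above, abort the outgoing delivery as well (if one is still present).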
    if (IS_ATOMIC_FLAG_SET(&content->aborted)) {
        if (pn_link_current(pnl)) {
            SET_ATOMIC_FLAG(&msg->send_complete);
            if (!pn_delivery_aborted(pn_link_current(pnl))) {
                pn_delivery_abort(pn_link_current(pnl));
            }
        }
    }
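
    // Let the caller know if this send stopped because the session has hit
    // the Q3 outgoing-byte limit.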
    *q3_stalled = (pn_session_outgoing_bytes(pns) >= q3_upper);
}