in source/websocket.c [569:771]
/**
 * Encode pending outgoing frames into a single aws_io_message and send it
 * in the write direction of the channel.
 *
 * Must be called from the channel's thread (asserted below).
 *
 * Returns without doing anything when:
 * - there is no frame in progress and the outgoing frame list is empty,
 * - a previously sent aws_io_message has not completed yet, or
 * - writing has been stopped.
 *
 * Otherwise frames are encoded one after another (resuming
 * thread_data.current_outgoing_frame if set, else popping the next frame from
 * thread_data.outgoing_frame_list) until the list is empty, the encoder reports
 * that the current frame is still in progress, or a CLOSE frame has been
 * completely encoded.
 *
 * If nothing ended up in the message, the message is released and
 * waiting_on_payload_stream_task is scheduled (unless it is already pending)
 * so the write is retried later.
 *
 * On any failure the message (if one was acquired) is released and
 * s_shutdown_due_to_write_err() is invoked with aws_last_error().
 *
 * @param websocket Websocket whose outgoing frames should be written.
 */
static void s_try_write_outgoing_frames(struct aws_websocket *websocket) {
AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
int err;
/* Check whether we should be writing data */
if (!websocket->thread_data.current_outgoing_frame &&
aws_linked_list_empty(&websocket->thread_data.outgoing_frame_list)) {
AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: No data to write at this time.", (void *)websocket);
return;
}
/* Only one aws_io_message is outstanding at a time: this flag is set right before a message is sent (see below).
 * NOTE(review): presumably cleared by s_io_message_write_completed, which is not visible here. */
if (websocket->thread_data.is_waiting_for_write_completion) {
AWS_LOGF_TRACE(
AWS_LS_HTTP_WEBSOCKET,
"id=%p: Waiting until outstanding aws_io_message is written to socket before sending more data.",
(void *)websocket);
return;
}
if (websocket->thread_data.is_writing_stopped) {
AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: Websocket is no longer sending data.", (void *)websocket);
return;
}
/* Acquire aws_io_message */
struct aws_io_message *io_msg = aws_channel_slot_acquire_max_message_for_write(websocket->channel_slot);
if (!io_msg) {
AWS_LOGF_ERROR(
AWS_LS_HTTP_WEBSOCKET,
"id=%p: Failed acquire message from pool, error %d (%s).",
(void *)websocket,
aws_last_error(),
aws_error_name(aws_last_error()));
/* io_msg is NULL here; the error path checks for that before releasing. */
goto error;
}
/* Register the completion callback, with the websocket as its user_data. */
io_msg->user_data = websocket;
io_msg->on_completion = s_io_message_write_completed;
/* Loop through frames, writing their data into the io_msg */
/* wrote_close_frame becomes true once a CLOSE frame has been completely encoded into io_msg. */
bool wrote_close_frame = false;
/* is_writing_stopped is re-checked on every iteration, not just once on entry. */
while (!websocket->thread_data.is_writing_stopped) {
if (websocket->thread_data.current_outgoing_frame) {
/* A frame was left partially encoded by a previous pass; continue with it. */
AWS_LOGF_TRACE(
AWS_LS_HTTP_WEBSOCKET,
"id=%p: Resuming write of frame=%p opcode=%" PRIu8 "(%s) payload-length=%" PRIu64 ".",
(void *)websocket,
(void *)websocket->thread_data.current_outgoing_frame,
websocket->thread_data.current_outgoing_frame->def.opcode,
aws_websocket_opcode_str(websocket->thread_data.current_outgoing_frame->def.opcode),
websocket->thread_data.current_outgoing_frame->def.payload_length);
} else {
/* We're not in the middle of encoding a frame, so pop off the next one to encode. */
if (aws_linked_list_empty(&websocket->thread_data.outgoing_frame_list)) {
AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: No more frames to write.", (void *)websocket);
break;
}
struct aws_linked_list_node *node = aws_linked_list_pop_front(&websocket->thread_data.outgoing_frame_list);
websocket->thread_data.current_outgoing_frame = AWS_CONTAINER_OF(node, struct outgoing_frame, node);
/* Frame description handed to the encoder; fin, opcode and payload_length are copied from the frame's def. */
struct aws_websocket_frame frame = {
.fin = websocket->thread_data.current_outgoing_frame->def.fin,
.opcode = websocket->thread_data.current_outgoing_frame->def.opcode,
.payload_length = websocket->thread_data.current_outgoing_frame->def.payload_length,
};
/* RFC-6455 Section 5.3 Client-to-Server Masking
 * Clients must mask payload with key derived from an unpredictable source of entropy. */
if (!websocket->is_server) {
frame.masked = true;
/* TODO: faster source of random (but still seeded by device_random) */
/* Wrap the frame's 4-byte masking key storage so aws_device_random_buffer() can fill it. */
struct aws_byte_buf masking_key_buf = aws_byte_buf_from_empty_array(frame.masking_key, 4);
err = aws_device_random_buffer(&masking_key_buf);
if (err) {
AWS_LOGF_ERROR(
AWS_LS_HTTP_WEBSOCKET,
"id=%p: Failed to derive masking key, error %d (%s).",
(void *)websocket,
aws_last_error(),
aws_error_name(aws_last_error()));
goto error;
}
}
err = aws_websocket_encoder_start_frame(&websocket->thread_data.encoder, &frame);
if (err) {
AWS_LOGF_ERROR(
AWS_LS_HTTP_WEBSOCKET,
"id=%p: Failed to start frame encoding, error %d (%s).",
(void *)websocket,
aws_last_error(),
aws_error_name(aws_last_error()));
goto error;
}
AWS_LOGF_TRACE(
AWS_LS_HTTP_WEBSOCKET,
"id=%p: Start writing frame=%p opcode=%" PRIu8 "(%s) payload-length=%" PRIu64 ".",
(void *)websocket,
(void *)websocket->thread_data.current_outgoing_frame,
websocket->thread_data.current_outgoing_frame->def.opcode,
aws_websocket_opcode_str(websocket->thread_data.current_outgoing_frame->def.opcode),
websocket->thread_data.current_outgoing_frame->def.payload_length);
}
/* Let the encoder write data for the current frame into the message buffer. */
err = aws_websocket_encoder_process(&websocket->thread_data.encoder, &io_msg->message_data);
if (err) {
AWS_LOGF_ERROR(
AWS_LS_HTTP_WEBSOCKET,
"id=%p: Frame encoding failed with error %d (%s).",
(void *)websocket,
aws_last_error(),
aws_error_name(aws_last_error()));
goto error;
}
/* Frame not finished: leave current_outgoing_frame set so a later call takes the "Resuming" branch above. */
if (aws_websocket_encoder_is_frame_in_progress(&websocket->thread_data.encoder)) {
AWS_LOGF_TRACE(
AWS_LS_HTTP_WEBSOCKET,
"id=%p: Outgoing frame still in progress, but no more data can be written at this time.",
(void *)websocket);
break;
}
/* The opcode is inspected before the frame is handed to s_destroy_outgoing_frame();
 * the frame is not accessed again after that call. */
if (websocket->thread_data.current_outgoing_frame->def.opcode == AWS_WEBSOCKET_OPCODE_CLOSE) {
wrote_close_frame = true;
}
/* Frame has been completely encoded: destroy it, reporting AWS_ERROR_SUCCESS. */
s_destroy_outgoing_frame(websocket, websocket->thread_data.current_outgoing_frame, AWS_ERROR_SUCCESS);
websocket->thread_data.current_outgoing_frame = NULL;
/* Nothing is encoded after a CLOSE frame, even if more frames are queued. */
if (wrote_close_frame) {
break;
}
}
/* If payload stream didn't have any bytes available to read right now, then the aws_io_message might be empty.
 * If this is the case schedule a task to try again in the future. */
if (io_msg->message_data.len == 0) {
AWS_LOGF_TRACE(
AWS_LS_HTTP_WEBSOCKET,
"id=%p: Reading from payload stream would block, will try again later.",
(void *)websocket);
/* The flag guards against scheduling the retry task while it is already pending. */
if (!websocket->thread_data.is_waiting_on_payload_stream_task) {
websocket->thread_data.is_waiting_on_payload_stream_task = true;
/* Future Optimization Idea: Minimize work while we wait. Use some kind of backoff for the retry timing,
 * or have some way for stream to notify when more data is available. */
aws_channel_schedule_task_now(websocket->channel_slot->channel, &websocket->waiting_on_payload_stream_task);
}
/* Nothing to send, so the unused message is released here. */
aws_mem_release(io_msg->allocator, io_msg);
return;
}
/* Prepare to send aws_io_message up the channel.
 * Note that the write-completion callback may fire before send_message() returns */
/* If CLOSE frame was written, that's the last data we'll write */
if (wrote_close_frame) {
s_stop_writing(websocket, AWS_ERROR_HTTP_WEBSOCKET_CLOSE_FRAME_SENT);
}
AWS_LOGF_TRACE(
AWS_LS_HTTP_WEBSOCKET,
"id=%p: Sending aws_io_message of size %zu in write direction.",
(void *)websocket,
io_msg->message_data.len);
/* The flag is set before sending because, as noted above, the completion callback may run
 * before aws_channel_slot_send_message() returns. */
websocket->thread_data.is_waiting_for_write_completion = true;
err = aws_channel_slot_send_message(websocket->channel_slot, io_msg, AWS_CHANNEL_DIR_WRITE);
if (err) {
/* Send failed: undo the flag. The error path below releases io_msg. */
websocket->thread_data.is_waiting_for_write_completion = false;
AWS_LOGF_ERROR(
AWS_LS_HTTP_WEBSOCKET,
"id=%p: Failed to send message in write direction, error %d (%s).",
(void *)websocket,
aws_last_error(),
aws_error_name(aws_last_error()));
goto error;
}
/* Finish shutdown if we were waiting for the CLOSE frame to be written */
if (wrote_close_frame && websocket->thread_data.is_shutting_down_and_waiting_for_close_frame_to_be_written) {
AWS_LOGF_TRACE(
AWS_LS_HTTP_WEBSOCKET, "id=%p: CLOSE frame sent, finishing handler shutdown sequence.", (void *)websocket);
s_finish_shutdown(websocket);
}
return;
/* Common failure path: release the message if one was acquired (io_msg is NULL when acquisition itself failed),
 * then shut down, passing along aws_last_error(). */
error:
if (io_msg) {
aws_mem_release(io_msg->allocator, io_msg);
}
s_shutdown_due_to_write_err(websocket, aws_last_error());
}