static void s_try_write_outgoing_frames()

in source/websocket.c [569:771]


/*
 * Encode queued outgoing frames into a single aws_io_message and send it in the write direction.
 *
 * Must be called on the channel's thread (checked with AWS_ASSERT).
 *
 * Returns without doing anything when:
 *  - there is no frame in progress and the outgoing frame list is empty,
 *  - a previously sent aws_io_message has not finished being written yet, or
 *  - writing has been stopped.
 *
 * Otherwise frames are encoded one after another into the message until one of:
 *  - the outgoing frame list runs out,
 *  - the encoder reports the current frame is still in progress (it stays in
 *    thread_data.current_outgoing_frame and is resumed on the next call),
 *  - a CLOSE frame has been completely encoded (nothing is encoded after it), or
 *  - thread_data.is_writing_stopped becomes true.
 *
 * If nothing at all was encoded, the message is released and waiting_on_payload_stream_task is scheduled
 * (unless already scheduled) so that writing is attempted again later.
 *
 * On any failure the message is released (if it was acquired) and
 * s_shutdown_due_to_write_err() is invoked with aws_last_error().
 */
static void s_try_write_outgoing_frames(struct aws_websocket *websocket) {
    AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
    int err;

    /* Check whether we should be writing data */
    if (!websocket->thread_data.current_outgoing_frame &&
        aws_linked_list_empty(&websocket->thread_data.outgoing_frame_list)) {

        AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: No data to write at this time.", (void *)websocket);
        return;
    }

    /* Only one aws_io_message is outstanding at a time; this flag is set further down, right before sending. */
    if (websocket->thread_data.is_waiting_for_write_completion) {
        AWS_LOGF_TRACE(
            AWS_LS_HTTP_WEBSOCKET,
            "id=%p: Waiting until outstanding aws_io_message is written to socket before sending more data.",
            (void *)websocket);
        return;
    }

    if (websocket->thread_data.is_writing_stopped) {
        AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: Websocket is no longer sending data.", (void *)websocket);
        return;
    }

    /* Acquire aws_io_message */
    struct aws_io_message *io_msg = aws_channel_slot_acquire_max_message_for_write(websocket->channel_slot);
    if (!io_msg) {
        AWS_LOGF_ERROR(
            AWS_LS_HTTP_WEBSOCKET,
            "id=%p: Failed acquire message from pool, error %d (%s).",
            (void *)websocket,
            aws_last_error(),
            aws_error_name(aws_last_error()));
        /* This is the only path that reaches the error label with io_msg == NULL. */
        goto error;
    }

    /* Register the write-completion callback, with this websocket as its user data. */
    io_msg->user_data = websocket;
    io_msg->on_completion = s_io_message_write_completed;

    /* Loop through frames, writing their data into the io_msg */
    bool wrote_close_frame = false; /* becomes true once a CLOSE frame has been completely encoded */
    while (!websocket->thread_data.is_writing_stopped) {
        if (websocket->thread_data.current_outgoing_frame) {
            /* A frame was left partially encoded by an earlier call; the encoder was already started for it. */
            AWS_LOGF_TRACE(
                AWS_LS_HTTP_WEBSOCKET,
                "id=%p: Resuming write of frame=%p opcode=%" PRIu8 "(%s) payload-length=%" PRIu64 ".",
                (void *)websocket,
                (void *)websocket->thread_data.current_outgoing_frame,
                websocket->thread_data.current_outgoing_frame->def.opcode,
                aws_websocket_opcode_str(websocket->thread_data.current_outgoing_frame->def.opcode),
                websocket->thread_data.current_outgoing_frame->def.payload_length);

        } else {
            /* We're not in the middle of encoding a frame, so pop off the next one to encode. */
            if (aws_linked_list_empty(&websocket->thread_data.outgoing_frame_list)) {
                AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: No more frames to write.", (void *)websocket);
                break;
            }

            struct aws_linked_list_node *node = aws_linked_list_pop_front(&websocket->thread_data.outgoing_frame_list);
            websocket->thread_data.current_outgoing_frame = AWS_CONTAINER_OF(node, struct outgoing_frame, node);

            /* Describe the frame to the encoder using the fin/opcode/payload_length from the frame's definition.
             * Fields not named here are zero-initialized by the designated initializer. */
            struct aws_websocket_frame frame = {
                .fin = websocket->thread_data.current_outgoing_frame->def.fin,
                .opcode = websocket->thread_data.current_outgoing_frame->def.opcode,
                .payload_length = websocket->thread_data.current_outgoing_frame->def.payload_length,
            };

            /* RFC-6455 Section 5.3 Client-to-Server Masking
             * Clients must mask payload with key derived from an unpredictable source of entropy. */
            if (!websocket->is_server) {
                frame.masked = true;
                /* TODO: faster source of random (but still seeded by device_random) */
                /* masking_key_buf wraps frame.masking_key (4 bytes), so the random bytes are written into the frame. */
                struct aws_byte_buf masking_key_buf = aws_byte_buf_from_empty_array(frame.masking_key, 4);
                err = aws_device_random_buffer(&masking_key_buf);
                if (err) {
                    AWS_LOGF_ERROR(
                        AWS_LS_HTTP_WEBSOCKET,
                        "id=%p: Failed to derive masking key, error %d (%s).",
                        (void *)websocket,
                        aws_last_error(),
                        aws_error_name(aws_last_error()));
                    goto error;
                }
            }

            err = aws_websocket_encoder_start_frame(&websocket->thread_data.encoder, &frame);
            if (err) {
                AWS_LOGF_ERROR(
                    AWS_LS_HTTP_WEBSOCKET,
                    "id=%p: Failed to start frame encoding, error %d (%s).",
                    (void *)websocket,
                    aws_last_error(),
                    aws_error_name(aws_last_error()));
                goto error;
            }

            AWS_LOGF_TRACE(
                AWS_LS_HTTP_WEBSOCKET,
                "id=%p: Start writing frame=%p opcode=%" PRIu8 "(%s) payload-length=%" PRIu64 ".",
                (void *)websocket,
                (void *)websocket->thread_data.current_outgoing_frame,
                websocket->thread_data.current_outgoing_frame->def.opcode,
                aws_websocket_opcode_str(websocket->thread_data.current_outgoing_frame->def.opcode),
                websocket->thread_data.current_outgoing_frame->def.payload_length);
        }

        /* Run the encoder for the current frame, writing its output into io_msg->message_data. */
        err = aws_websocket_encoder_process(&websocket->thread_data.encoder, &io_msg->message_data);
        if (err) {
            AWS_LOGF_ERROR(
                AWS_LS_HTTP_WEBSOCKET,
                "id=%p: Frame encoding failed with error %d (%s).",
                (void *)websocket,
                aws_last_error(),
                aws_error_name(aws_last_error()));
            goto error;
        }

        /* Frame not finished yet: stop here and leave current_outgoing_frame set,
         * so the next call takes the "Resuming write" branch above. */
        if (aws_websocket_encoder_is_frame_in_progress(&websocket->thread_data.encoder)) {
            AWS_LOGF_TRACE(
                AWS_LS_HTTP_WEBSOCKET,
                "id=%p: Outgoing frame still in progress, but no more data can be written at this time.",
                (void *)websocket);
            break;
        }

        /* The frame is completely encoded. Inspect its opcode now, before the frame is handed to
         * s_destroy_outgoing_frame() and the pointer is cleared. */
        if (websocket->thread_data.current_outgoing_frame->def.opcode == AWS_WEBSOCKET_OPCODE_CLOSE) {
            wrote_close_frame = true;
        }

        /* NOTE(review): s_destroy_outgoing_frame() presumably reports AWS_ERROR_SUCCESS to the frame's owner and
         * frees the frame; its definition is not visible here - confirm. */
        s_destroy_outgoing_frame(websocket, websocket->thread_data.current_outgoing_frame, AWS_ERROR_SUCCESS);
        websocket->thread_data.current_outgoing_frame = NULL;

        /* Nothing is encoded after a CLOSE frame. */
        if (wrote_close_frame) {
            break;
        }
    }

    /* If payload stream didn't have any bytes available to read right now, then the aws_io_message might be empty.
     * If this is the case schedule a task to try again in the future. */
    if (io_msg->message_data.len == 0) {
        AWS_LOGF_TRACE(
            AWS_LS_HTTP_WEBSOCKET,
            "id=%p: Reading from payload stream would block, will try again later.",
            (void *)websocket);

        /* The flag prevents scheduling the same retry task again while it is already pending.
         * NOTE(review): the flag is presumably cleared by the task itself - not visible here, confirm. */
        if (!websocket->thread_data.is_waiting_on_payload_stream_task) {
            websocket->thread_data.is_waiting_on_payload_stream_task = true;

            /* Future Optimization Idea: Minimize work while we wait. Use some kind of backoff for the retry timing,
             * or have some way for stream to notify when more data is available. */
            aws_channel_schedule_task_now(websocket->channel_slot->channel, &websocket->waiting_on_payload_stream_task);
        }

        /* Nothing to send: release the unused message. */
        aws_mem_release(io_msg->allocator, io_msg);
        return;
    }

    /* Prepare to send aws_io_message up the channel.
     * Note that the write-completion callback may fire before send_message() returns */

    /* If CLOSE frame was written, that's the last data we'll write */
    if (wrote_close_frame) {
        s_stop_writing(websocket, AWS_ERROR_HTTP_WEBSOCKET_CLOSE_FRAME_SENT);
    }

    AWS_LOGF_TRACE(
        AWS_LS_HTTP_WEBSOCKET,
        "id=%p: Sending aws_io_message of size %zu in write direction.",
        (void *)websocket,
        io_msg->message_data.len);

    /* Set the flag BEFORE sending, because the completion callback may run before send_message() returns
     * (see note above). */
    websocket->thread_data.is_waiting_for_write_completion = true;
    err = aws_channel_slot_send_message(websocket->channel_slot, io_msg, AWS_CHANNEL_DIR_WRITE);
    if (err) {
        /* The message was not sent: undo the flag. The error path below releases io_msg. */
        websocket->thread_data.is_waiting_for_write_completion = false;
        AWS_LOGF_ERROR(
            AWS_LS_HTTP_WEBSOCKET,
            "id=%p: Failed to send message in write direction, error %d (%s).",
            (void *)websocket,
            aws_last_error(),
            aws_error_name(aws_last_error()));
        goto error;
    }

    /* Finish shutdown if we were waiting for the CLOSE frame to be written */
    if (wrote_close_frame && websocket->thread_data.is_shutting_down_and_waiting_for_close_frame_to_be_written) {
        AWS_LOGF_TRACE(
            AWS_LS_HTTP_WEBSOCKET, "id=%p: CLOSE frame sent, finishing handler shutdown sequence.", (void *)websocket);

        s_finish_shutdown(websocket);
    }

    return;

error:
    /* Common failure path. io_msg is NULL only if acquiring it failed; otherwise release it here. */
    if (io_msg) {
        aws_mem_release(io_msg->allocator, io_msg);
    }

    /* The error code passed on is read from aws_last_error() here, after the message has been released. */
    s_shutdown_due_to_write_err(websocket, aws_last_error());
}