static void on_amqp_frame_received()

in src/connection.c [814:1108]


static void on_amqp_frame_received(void* context, uint16_t channel, AMQP_VALUE performative, const unsigned char* payload_bytes, uint32_t payload_size)
{
    CONNECTION_HANDLE connection = (CONNECTION_HANDLE)context;

    (void)channel;

    if (tickcounter_get_current_ms(connection->tick_counter, &connection->last_frame_received_time) != 0)
    {
        LogError("Cannot get tickcounter value");
        close_connection_with_error(connection, "amqp:internal-error", "cannot get current tick count", NULL);
    }
    else
    {
        if (connection->is_underlying_io_open)
        {
            switch (connection->connection_state)
            {
            default:
                if (performative == NULL)
                {
                    /* Codes_S_R_S_CONNECTION_01_223: [If the on_endpoint_frame_received is called with a NULL performative then the connection shall be closed with the error condition amqp:internal-error and an implementation defined error description.] */
                    close_connection_with_error(connection, "amqp:internal-error", "connection_endpoint_frame_received::NULL performative", NULL);
                    LogError("connection_endpoint_frame_received::NULL performative");
                }
                else
                {
                    AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(performative);

                    if (connection->is_trace_on == 1)
                    {
                        log_incoming_frame(performative);
                    }

                    if (is_open_type_by_descriptor(descriptor))
                    {
                        if (channel != 0)
                        {
                            /* Codes_S_R_S_CONNECTION_01_006: [The open frame can only be sent on channel 0.] */
                            /* Codes_S_R_S_CONNECTION_01_222: [If an Open frame is received in a manner violating the ISO specification, the connection shall be closed with condition amqp:not-allowed and description being an implementation defined string.] */
                            close_connection_with_error(connection, "amqp:not-allowed", "OPEN frame received on a channel that is not 0", NULL);
                            LogError("OPEN frame received on a channel that is not 0");
                        }

                        if (connection->connection_state == CONNECTION_STATE_OPENED)
                        {
                            /* Codes_S_R_S_CONNECTION_01_239: [If an Open frame is received in the Opened state the connection shall be closed with condition amqp:illegal-state and description being an implementation defined string.] */
                            close_connection_with_error(connection, "amqp:illegal-state", "OPEN frame received in the OPENED state", NULL);
                            LogError("OPEN frame received in the OPENED state");
                        }
                        else if ((connection->connection_state == CONNECTION_STATE_OPEN_SENT) ||
                            (connection->connection_state == CONNECTION_STATE_HDR_EXCH))
                        {
                            OPEN_HANDLE open_handle;
                            if (amqpvalue_get_open(performative, &open_handle) != 0)
                            {
                                /* Codes_S_R_S_CONNECTION_01_143: [If any of the values in the received open frame are invalid then the connection shall be closed.] */
                                /* Codes_S_R_S_CONNECTION_01_220: [The error amqp:invalid-field shall be set in the error.condition field of the CLOSE frame.] */
                                close_connection_with_error(connection, "amqp:invalid-field", "connection_endpoint_frame_received::failed parsing OPEN frame", NULL);
                                LogError("connection_endpoint_frame_received::failed parsing OPEN frame");
                            }
                            else
                            {
                                if (open_get_idle_time_out(open_handle, &connection->remote_idle_timeout) == 0)
                                {
                                    /* since we obtained the remote_idle_timeout, compute at what millisecond we should send the empty frame */
                                    connection->remote_idle_timeout_send_frame_millisecond = (milliseconds)(connection->idle_timeout_empty_frame_send_ratio * connection->remote_idle_timeout);
                                }

                                if ((open_get_max_frame_size(open_handle, &connection->remote_max_frame_size) != 0) ||
                                    /* Codes_S_R_S_CONNECTION_01_167: [Both peers MUST accept frames of up to 512 (MIN-MAX-FRAME-SIZE) octets.] */
                                    (connection->remote_max_frame_size < 512))
                                {
                                    /* Codes_S_R_S_CONNECTION_01_143: [If any of the values in the received open frame are invalid then the connection shall be closed.] */
                                    /* Codes_S_R_S_CONNECTION_01_220: [The error amqp:invalid-field shall be set in the error.condition field of the CLOSE frame.] */
                                    close_connection_with_error(connection, "amqp:invalid-field", "connection_endpoint_frame_received::failed parsing OPEN frame", NULL);
                                    LogError("connection_endpoint_frame_received::failed parsing OPEN frame");
                                }
                                else
                                {
                                    if (connection->connection_state == CONNECTION_STATE_OPEN_SENT)
                                    {
                                        connection_set_state(connection, CONNECTION_STATE_OPENED);
                                    }
                                    else
                                    {
                                        if (send_open_frame(connection) != 0)
                                        {
                                            connection_set_state(connection, CONNECTION_STATE_END);
                                        }
                                        else
                                        {
                                            connection_set_state(connection, CONNECTION_STATE_OPENED);
                                        }
                                    }
                                }

                                open_destroy(open_handle);
                            }
                        }
                        else
                        {
                            /* do nothing for now ... */
                        }
                    }
                    else if (is_close_type_by_descriptor(descriptor))
                    {
                        /* Codes_S_R_S_CONNECTION_01_242: [The connection module shall accept CLOSE frames even if they have extra payload bytes besides the Close performative.] */

                        /* Codes_S_R_S_CONNECTION_01_225: [HDR_RCVD HDR OPEN] */
                        if ((connection->connection_state == CONNECTION_STATE_HDR_RCVD) ||
                            /* Codes_S_R_S_CONNECTION_01_227: [HDR_EXCH OPEN OPEN] */
                            (connection->connection_state == CONNECTION_STATE_HDR_EXCH) ||
                            /* Codes_S_R_S_CONNECTION_01_228: [OPEN_RCVD OPEN *] */
                            (connection->connection_state == CONNECTION_STATE_OPEN_RCVD) ||
                            /* Codes_S_R_S_CONNECTION_01_235: [CLOSE_SENT - * TCP Close for Write] */
                            (connection->connection_state == CONNECTION_STATE_CLOSE_SENT) ||
                            /* Codes_S_R_S_CONNECTION_01_236: [DISCARDING - * TCP Close for Write] */
                            (connection->connection_state == CONNECTION_STATE_DISCARDING))
                        {
                            if (xio_close(connection->io, NULL, NULL) != 0)
                            {
                                LogError("xio_close failed");
                            }
                        }
                        else
                        {
                            CLOSE_HANDLE close_handle;

                            /* Codes_S_R_S_CONNECTION_01_012: [A close frame MAY be received on any channel up to the maximum channel number negotiated in open.] */
                            if (channel > connection->channel_max)
                            {
                                close_connection_with_error(connection, "amqp:invalid-field", "connection_endpoint_frame_received::failed parsing CLOSE frame", NULL);
                                LogError("connection_endpoint_frame_received::failed parsing CLOSE frame");
                            }
                            else
                            {
                                if (amqpvalue_get_close(performative, &close_handle) != 0)
                                {
                                    close_connection_with_error(connection, "amqp:invalid-field", "connection_endpoint_frame_received::failed parsing CLOSE frame", NULL);
                                    LogError("connection_endpoint_frame_received::failed parsing CLOSE frame");
                                }
                                else
                                {
                                    ERROR_HANDLE error;

                                    if (close_get_error(close_handle, &error) != 0)
                                    {
                                        error = NULL;
                                    }

                                    close_destroy(close_handle);

                                    connection_set_state(connection, CONNECTION_STATE_CLOSE_RCVD);

                                    if (send_close_frame(connection, NULL) != 0)
                                    {
                                        LogError("Cannot send CLOSE frame");
                                    }

                                    /* Codes_S_R_S_CONNECTION_01_214: [If the close frame cannot be constructed or sent, the connection shall be closed and set to the END state.] */
                                    if (xio_close(connection->io, NULL, NULL) != 0)
                                    {
                                        LogError("xio_close failed");
                                    }

                                    connection_set_state(connection, CONNECTION_STATE_END);

                                    if (connection->on_connection_close_received_event_subscription.on_connection_close_received != NULL)
                                    {
                                        connection->on_connection_close_received_event_subscription.on_connection_close_received(connection->on_connection_close_received_event_subscription.context, error);
                                    }

                                    error_destroy(error);
                                }
                            }
                        }
                    }
                    else
                    {
                        uint64_t performative_ulong;

                        if (amqpvalue_get_ulong(descriptor, &performative_ulong) != 0)
                        {
                            LogError("Failed getting ulong amqp performative");
                        }
                        else
                        {
                            switch (performative_ulong)
                            {
                            default:
                                LogError("Bad performative: %02x", (unsigned int)performative_ulong);
                                break;

                            case AMQP_BEGIN:
                            {
                                BEGIN_HANDLE begin;

                                if (amqpvalue_get_begin(performative, &begin) != 0)
                                {
                                    LogError("Cannot get begin performative");
                                }
                                else
                                {
                                    uint16_t remote_channel;
                                    ENDPOINT_HANDLE new_endpoint = NULL;
                                    bool remote_begin = false;

                                    if (begin_get_remote_channel(begin, &remote_channel) != 0)
                                    {
                                        remote_begin = true;
                                        if (connection->on_new_endpoint != NULL)
                                        {
                                            new_endpoint = connection_create_endpoint(connection);
                                            if (!connection->on_new_endpoint(connection->on_new_endpoint_callback_context, new_endpoint))
                                            {
                                                connection_destroy_endpoint(new_endpoint);
                                                new_endpoint = NULL;
                                            }
                                        }
                                    }

                                    if (!remote_begin)
                                    {
                                        ENDPOINT_INSTANCE* session_endpoint = find_session_endpoint_by_outgoing_channel(connection, remote_channel);
                                        if (session_endpoint == NULL)
                                        {
                                            LogError("Cannot create session endpoint");
                                        }
                                        else
                                        {
                                            session_endpoint->incoming_channel = channel;
                                            session_endpoint->on_endpoint_frame_received(session_endpoint->callback_context, performative, payload_size, payload_bytes);
                                        }
                                    }
                                    else
                                    {
                                        if (new_endpoint != NULL)
                                        {
                                            new_endpoint->incoming_channel = channel;
                                            new_endpoint->on_endpoint_frame_received(new_endpoint->callback_context, performative, payload_size, payload_bytes);
                                        }
                                    }

                                    begin_destroy(begin);
                                }

                                break;
                            }

                            case AMQP_FLOW:
                            case AMQP_TRANSFER:
                            case AMQP_DISPOSITION:
                            case AMQP_END:
                            case AMQP_ATTACH:
                            case AMQP_DETACH:
                            {
                                ENDPOINT_INSTANCE* session_endpoint = find_session_endpoint_by_incoming_channel(connection, channel);
                                if (session_endpoint == NULL)
                                {
                                    LogError("Cannot find session endpoint for channel %u", (unsigned int)channel);
                                }
                                else
                                {
                                    session_endpoint->on_endpoint_frame_received(session_endpoint->callback_context, performative, payload_size, payload_bytes);
                                }

                                break;
                            }
                            }
                        }
                    }
                }
                break;

            case CONNECTION_STATE_START:
                /* Codes_S_R_S_CONNECTION_01_224: [START HDR HDR] */
            case CONNECTION_STATE_HDR_SENT:
                /* Codes_S_R_S_CONNECTION_01_226: [HDR_SENT OPEN HDR] */
            case CONNECTION_STATE_OPEN_PIPE:
                /* Codes_S_R_S_CONNECTION_01_230: [OPEN_PIPE ** HDR] */
            case CONNECTION_STATE_OC_PIPE:
                /* Codes_S_R_S_CONNECTION_01_232: [OC_PIPE - HDR TCP Close for Write] */
            case CONNECTION_STATE_CLOSE_RCVD:
                /* Codes_S_R_S_CONNECTION_01_234: [CLOSE_RCVD * - TCP Close for Read] */
            case CONNECTION_STATE_END:
                /* Codes_S_R_S_CONNECTION_01_237: [END - - TCP Close] */
                if (xio_close(connection->io, NULL, NULL) != 0)
                {
                    LogError("xio_close failed");
                }
                break;
            }
        }
    }
}