static void on_frame_received()

in src/session.c [395:743]


/*
 * Session-level handler for one received AMQP performative.
 *
 * The descriptor of `performative` selects the handling: BEGIN, ATTACH,
 * DETACH, FLOW, TRANSFER, DISPOSITION and END are processed; a performative
 * with any other descriptor is ignored.
 *
 * context        - the SESSION_INSTANCE the frame belongs to (cast from void*).
 * performative   - the performative value of the received frame.
 * payload_size,
 * payload_bytes  - not interpreted here; forwarded unchanged to the
 *                  frame_received_callback of the targeted link endpoint(s).
 *
 * Decode failures on BEGIN close the connection; decode failures on the other
 * performatives end the session through end_session_with_error().
 *
 * A link endpoint's frame_received_callback may be NULL, so every invocation
 * below is guarded.
 */
static void on_frame_received(void* context, AMQP_VALUE performative, uint32_t payload_size, const unsigned char* payload_bytes)
{
    SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)context;
    AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(performative);

    if (is_begin_type_by_descriptor(descriptor))
    {
        BEGIN_HANDLE begin_handle;

        if (amqpvalue_get_begin(performative, &begin_handle) != 0)
        {
            connection_close(session_instance->connection, "amqp:decode-error", "Cannot decode BEGIN frame", NULL);
        }
        else
        {
            /* the peer's incoming-window is our remote_incoming_window and
               the peer's next-outgoing-id is our next_incoming_id */
            if ((begin_get_incoming_window(begin_handle, &session_instance->remote_incoming_window) != 0) ||
                (begin_get_next_outgoing_id(begin_handle, &session_instance->next_incoming_id) != 0))
            {
                /* error */
                begin_destroy(begin_handle);
                session_set_state(session_instance, SESSION_STATE_DISCARDING);
                connection_close(session_instance->connection, "amqp:decode-error", "Cannot get incoming windows and next outgoing id", NULL);
            }
            else
            {
                begin_destroy(begin_handle);

                if (session_instance->session_state == SESSION_STATE_BEGIN_SENT)
                {
                    /* this BEGIN answers the one we already sent */
                    session_set_state(session_instance, SESSION_STATE_MAPPED);
                }
                else if (session_instance->session_state == SESSION_STATE_UNMAPPED)
                {
                    /* the peer initiated the session: answer with our own BEGIN */
                    session_set_state(session_instance, SESSION_STATE_BEGIN_RCVD);
                    if (send_begin(session_instance) != 0)
                    {
                        connection_close(session_instance->connection, "amqp:internal-error", "Failed sending BEGIN frame", NULL);
                        session_set_state(session_instance, SESSION_STATE_DISCARDING);
                    }
                    else
                    {
                        session_set_state(session_instance, SESSION_STATE_MAPPED);
                    }
                }
            }
        }
    }
    else if (is_attach_type_by_descriptor(descriptor))
    {
        const char* name = NULL;
        ATTACH_HANDLE attach_handle;

        if (amqpvalue_get_attach(performative, &attach_handle) != 0)
        {
            end_session_with_error(session_instance, "amqp:decode-error", "Cannot decode ATTACH frame");
        }
        else
        {
            role role;
            AMQP_VALUE source;
            AMQP_VALUE target;
            fields properties;

            if (attach_get_name(attach_handle, &name) != 0)
            {
                end_session_with_error(session_instance, "amqp:decode-error", "Cannot get link name from ATTACH frame");
            }
            else if (attach_get_role(attach_handle, &role) != 0)
            {
                end_session_with_error(session_instance, "amqp:decode-error", "Cannot get link role from ATTACH frame");
            }
            else
            {
                LINK_ENDPOINT_INSTANCE* link_endpoint;

                /* source, target and properties are treated as optional:
                   a failed getter simply yields NULL for the callback */
                if (attach_get_source(attach_handle, &source) != 0)
                {
                    source = NULL;
                }
                if (attach_get_target(attach_handle, &target) != 0)
                {
                    target = NULL;
                }
                if (attach_get_properties(attach_handle, &properties) != 0)
                {
                    properties = NULL;
                }

                link_endpoint = find_link_endpoint_by_name(session_instance, name);
                if (link_endpoint == NULL)
                {
                    /* new link attach; ignored when nobody listens for attaches */
                    if (session_instance->on_link_attached != NULL)
                    {
                        LINK_ENDPOINT_HANDLE new_link_endpoint = session_create_link_endpoint(session_instance, name);
                        if (new_link_endpoint == NULL)
                        {
                            end_session_with_error(session_instance, "amqp:internal-error", "Cannot create link endpoint");
                        }
                        else if (attach_get_handle(attach_handle, &new_link_endpoint->input_handle) != 0)
                        {
                            /* the endpoint was never handed to the application, so nobody
                               else can release it: drop it here, exactly as is done below
                               when on_link_attached rejects the link */
                            remove_link_endpoint(new_link_endpoint);
                            free_link_endpoint(new_link_endpoint);

                            end_session_with_error(session_instance, "amqp:decode-error", "Cannot get input handle from ATTACH frame");
                        }
                        else
                        {
                            new_link_endpoint->link_endpoint_state = LINK_ENDPOINT_STATE_ATTACHED;

                            if (!session_instance->on_link_attached(session_instance->on_link_attached_callback_context, new_link_endpoint, name, role, source, target, properties))
                            {
                                /* the application refused the link */
                                remove_link_endpoint(new_link_endpoint);
                                free_link_endpoint(new_link_endpoint);
                                new_link_endpoint = NULL;
                            }
                            else
                            {
                                if (new_link_endpoint->frame_received_callback != NULL)
                                {
                                    new_link_endpoint->frame_received_callback(new_link_endpoint->callback_context, performative, payload_size, payload_bytes);
                                }
                            }
                        }
                    }
                }
                else if (link_endpoint->link_endpoint_state != LINK_ENDPOINT_STATE_DETACHING)
                {
                    if (attach_get_handle(attach_handle, &link_endpoint->input_handle) != 0)
                    {
                        end_session_with_error(session_instance, "amqp:decode-error", "Cannot get input handle from ATTACH frame");
                    }
                    else
                    {
                        link_endpoint->link_endpoint_state = LINK_ENDPOINT_STATE_ATTACHED;

                        if (session_instance->on_link_attached != NULL)
                        {
                            if (!session_instance->on_link_attached(session_instance->on_link_attached_callback_context, link_endpoint, name, role, source, target, properties))
                            {
                                link_endpoint->link_endpoint_state = LINK_ENDPOINT_STATE_DETACHING;
                            }
                        }

                        /* the ATTACH is forwarded to the link even when the attach
                           callback rejected it (behaviour kept as-is) */
                        if (link_endpoint->frame_received_callback != NULL)
                        {
                            link_endpoint->frame_received_callback(link_endpoint->callback_context, performative, payload_size, payload_bytes);
                        }
                    }
                }
            }

            attach_destroy(attach_handle);
        }
    }
    else if (is_detach_type_by_descriptor(descriptor))
    {
        DETACH_HANDLE detach_handle;

        if (amqpvalue_get_detach(performative, &detach_handle) != 0)
        {
            end_session_with_error(session_instance, "amqp:decode-error", "Cannot decode DETACH frame");
        }
        else
        {
            uint32_t remote_handle;
            if (detach_get_handle(detach_handle, &remote_handle) != 0)
            {
                end_session_with_error(session_instance, "amqp:decode-error", "Cannot get handle from DETACH frame");

                detach_destroy(detach_handle);
            }
            else
            {
                LINK_ENDPOINT_INSTANCE* link_endpoint;
                detach_destroy(detach_handle);

                link_endpoint = find_link_endpoint_by_input_handle(session_instance, remote_handle);
                if (link_endpoint == NULL)
                {
                    end_session_with_error(session_instance, "amqp:session:unattached-handle", "");
                }
                else
                {
                    if (link_endpoint->link_endpoint_state != LINK_ENDPOINT_STATE_DETACHING)
                    {
                        /* first DETACH seen for this endpoint: mark it and let the link know */
                        link_endpoint->link_endpoint_state = LINK_ENDPOINT_STATE_DETACHING;
                        if (link_endpoint->frame_received_callback != NULL)
                        {
                            link_endpoint->frame_received_callback(link_endpoint->callback_context, performative, payload_size, payload_bytes);
                        }
                    }
                    else
                    {
                        /* endpoint was already detaching: remove the link endpoint */
                        remove_link_endpoint(link_endpoint);
                        free_link_endpoint(link_endpoint);
                    }
                }
            }
        }
    }
    else if (is_flow_type_by_descriptor(descriptor))
    {
        FLOW_HANDLE flow_handle;

        if (amqpvalue_get_flow(performative, &flow_handle) != 0)
        {
            end_session_with_error(session_instance, "amqp:decode-error", "Cannot decode FLOW frame");
        }
        else
        {
            uint32_t remote_handle;
            transfer_number flow_next_incoming_id;
            uint32_t flow_incoming_window;

            if (flow_get_next_incoming_id(flow_handle, &flow_next_incoming_id) != 0)
            {
                /*
                If the next-incoming-id field of the flow frame is not set,
                then remote-incoming-window is computed as follows:
                initial-outgoing-id(endpoint) + incoming-window(flow) - next-outgoing-id(endpoint)

                NOTE(review): next_outgoing_id is used below where the formula
                above names initial-outgoing-id - confirm this is intended.
                */
                flow_next_incoming_id = session_instance->next_outgoing_id;
            }

            if ((flow_get_next_outgoing_id(flow_handle, &session_instance->next_incoming_id) != 0) ||
                (flow_get_incoming_window(flow_handle, &flow_incoming_window) != 0))
            {
                flow_destroy(flow_handle);

                end_session_with_error(session_instance, "amqp:decode-error", "Cannot decode FLOW frame");
            }
            else
            {
                LINK_ENDPOINT_INSTANCE* link_endpoint_instance = NULL;
                size_t i;

                session_instance->remote_incoming_window = flow_next_incoming_id + flow_incoming_window - session_instance->next_outgoing_id;

                /* a FLOW without a handle carries session-level state only */
                if (flow_get_handle(flow_handle, &remote_handle) == 0)
                {
                    link_endpoint_instance = find_link_endpoint_by_input_handle(session_instance, remote_handle);
                }

                flow_destroy(flow_handle);

                if (link_endpoint_instance != NULL)
                {
                    if ((link_endpoint_instance->link_endpoint_state != LINK_ENDPOINT_STATE_DETACHING) &&
                        (link_endpoint_instance->frame_received_callback != NULL))
                    {
                        link_endpoint_instance->frame_received_callback(link_endpoint_instance->callback_context, performative, payload_size, payload_bytes);
                    }
                }

                /* the window is re-read on every iteration, so the notifications
                   stop as soon as it is observed to be zero */
                i = 0;
                while ((session_instance->remote_incoming_window > 0) && (i < session_instance->link_endpoint_count))
                {
                    /* notify the caller that it can send here */
                    if (session_instance->link_endpoints[i]->link_endpoint_state != LINK_ENDPOINT_STATE_DETACHING &&
                        session_instance->link_endpoints[i]->on_session_flow_on != NULL)
                    {
                        session_instance->link_endpoints[i]->on_session_flow_on(session_instance->link_endpoints[i]->callback_context);
                    }

                    i++;
                }
            }
        }
    }
    else if (is_transfer_type_by_descriptor(descriptor))
    {
        TRANSFER_HANDLE transfer_handle;

        if (amqpvalue_get_transfer(performative, &transfer_handle) != 0)
        {
            end_session_with_error(session_instance, "amqp:decode-error", "Cannot decode TRANSFER frame");
        }
        else
        {
            uint32_t remote_handle;

            if (transfer_get_handle(transfer_handle, &remote_handle) != 0)
            {
                transfer_destroy(transfer_handle);
                end_session_with_error(session_instance, "amqp:decode-error", "Cannot get handle from TRANSFER frame");
            }
            else
            {
                LINK_ENDPOINT_INSTANCE* link_endpoint;
                transfer_destroy(transfer_handle);

                /* session-level bookkeeping for one received transfer */
                session_instance->next_incoming_id++;
                session_instance->remote_outgoing_window--;
                session_instance->incoming_window--;

                link_endpoint = find_link_endpoint_by_input_handle(session_instance, remote_handle);
                if (link_endpoint == NULL)
                {
                    end_session_with_error(session_instance, "amqp:session:unattached-handle", "");
                }
                else
                {
                    if ((link_endpoint->link_endpoint_state != LINK_ENDPOINT_STATE_DETACHING) &&
                        (link_endpoint->frame_received_callback != NULL))
                    {
                        link_endpoint->frame_received_callback(link_endpoint->callback_context, performative, payload_size, payload_bytes);
                    }
                }

                if (session_instance->incoming_window == 0)
                {
                    /* window exhausted: reopen it and tell the peer */
                    session_instance->incoming_window = session_instance->desired_incoming_window;
                    /* NOTE(review): the result of send_flow is not checked here */
                    send_flow(session_instance);
                }
            }
        }
    }
    else if (is_disposition_type_by_descriptor(descriptor))
    {
        size_t i;

        /* the disposition is not decoded here; every endpoint that is not
           detaching gets the frame */
        for (i = 0; i < session_instance->link_endpoint_count; i++)
        {
            LINK_ENDPOINT_INSTANCE* link_endpoint = session_instance->link_endpoints[i];
            if ((link_endpoint->link_endpoint_state != LINK_ENDPOINT_STATE_DETACHING) &&
                (link_endpoint->frame_received_callback != NULL))
            {
                link_endpoint->frame_received_callback(link_endpoint->callback_context, performative, payload_size, payload_bytes);
            }
        }
    }
    else if (is_end_type_by_descriptor(descriptor))
    {
        END_HANDLE end_handle;

        if (amqpvalue_get_end(performative, &end_handle) != 0)
        {
            end_session_with_error(session_instance, "amqp:decode-error", "Cannot decode END frame");
        }
        else
        {
            end_destroy(end_handle);
            if ((session_instance->session_state != SESSION_STATE_END_RCVD) &&
                (session_instance->session_state != SESSION_STATE_DISCARDING))
            {
                session_set_state(session_instance, SESSION_STATE_END_RCVD);
                if (send_end_frame(session_instance, NULL) != 0)
                {
                    /* fatal error */
                    (void)connection_close(session_instance->connection, "amqp:internal-error", "Cannot send END frame.", NULL);
                }

                /* the session moves to DISCARDING whether or not the END went out */
                session_set_state(session_instance, SESSION_STATE_DISCARDING);
            }
        }
    }
}