static void link_frame_received()

in src/link.c [317:655]


/*
 * Handles one performative received for a link and dispatches on its
 * descriptor: attach, flow, transfer, disposition or detach. A performative
 * with any other descriptor is silently ignored.
 *
 * context       - the LINK_INSTANCE the frame is for (passed as void*).
 * performative  - the received performative value.
 * payload_size  - number of bytes at payload_bytes (only used for transfers).
 * payload_bytes - frame payload (only used for transfers).
 *
 * Errors are logged; nothing is returned to the caller.
 */
static void link_frame_received(void* context, AMQP_VALUE performative, uint32_t payload_size, const unsigned char* payload_bytes)
{
    LINK_INSTANCE* link_instance = (LINK_INSTANCE*)context;
    AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(performative);

    if (is_attach_type_by_descriptor(descriptor))
    {
        ATTACH_HANDLE attach_handle;

        if (amqpvalue_get_attach(performative, &attach_handle) != 0)
        {
            LogError("Cannot get attach performative");
        }
        else
        {
            /* A receiver takes its delivery count from the peer's attach; without it the link is detached */
            if ((link_instance->role == role_receiver) &&
                (attach_get_initial_delivery_count(attach_handle, &link_instance->delivery_count) != 0))
            {
                LogError("Cannot get initial delivery count");
                remove_all_pending_deliveries(link_instance, true);
                set_link_state(link_instance, LINK_STATE_DETACHED);
            }
            else
            {
                /* Failing to read the peer max message size is logged but does not stop the attach */
                if (attach_get_max_message_size(attach_handle, &link_instance->peer_max_message_size) != 0)
                {
                    LogError("Could not retrieve peer_max_message_size from attach frame");
                }

                if ((link_instance->link_state == LINK_STATE_DETACHED) ||
                    (link_instance->link_state == LINK_STATE_HALF_ATTACHED_ATTACH_SENT))
                {
                    if (link_instance->role == role_receiver)
                    {
                        /* As receiver, grant the full credit to the peer right away */
                        link_instance->current_link_credit = link_instance->max_link_credit;
                        send_flow(link_instance);
                    }
                    else
                    {
                        /* As sender, no credit until a flow is received */
                        link_instance->current_link_credit = 0;
                    }

                    if (link_instance->link_state == LINK_STATE_DETACHED)
                    {
                        set_link_state(link_instance, LINK_STATE_HALF_ATTACHED_ATTACH_RECEIVED);
                    }
                    else
                    {
                        set_link_state(link_instance, LINK_STATE_ATTACHED);
                    }
                }
            }

            attach_destroy(attach_handle);
        }
    }
    else if (is_flow_type_by_descriptor(descriptor))
    {
        FLOW_HANDLE flow_handle;
        if (amqpvalue_get_flow(performative, &flow_handle) != 0)
        {
            LogError("Cannot get flow performative");
        }
        else
        {
            /* Only the sender role acts on the content of a flow */
            if (link_instance->role == role_sender)
            {
                delivery_number rcv_delivery_count;
                uint32_t rcv_link_credit;

                if (flow_get_link_credit(flow_handle, &rcv_link_credit) != 0)
                {
                    LogError("Cannot get link credit");
                    remove_all_pending_deliveries(link_instance, true);
                    set_link_state(link_instance, LINK_STATE_DETACHED);
                }
                else if (flow_get_delivery_count(flow_handle, &rcv_delivery_count) != 0)
                {
                    LogError("Cannot get delivery count");
                    remove_all_pending_deliveries(link_instance, true);
                    set_link_state(link_instance, LINK_STATE_DETACHED);
                }
                else
                {
                    /* Credit left = peer delivery count + peer credit - our own delivery count */
                    link_instance->current_link_credit = rcv_delivery_count + rcv_link_credit - link_instance->delivery_count;
                    if (link_instance->current_link_credit > 0)
                    {
                        link_instance->on_link_flow_on(link_instance->callback_context);
                    }
                }
            }

            flow_destroy(flow_handle);
        }
    }
    else if (is_transfer_type_by_descriptor(descriptor))
    {
        /* Transfers are ignored entirely when nobody is listening for them */
        if (link_instance->on_transfer_received != NULL)
        {
            TRANSFER_HANDLE transfer_handle;
            if (amqpvalue_get_transfer(performative, &transfer_handle) != 0)
            {
                LogError("Cannot get transfer performative");
            }
            else
            {
                AMQP_VALUE delivery_state;
                bool more;
                bool is_error;

                /* Replenish the credit before it runs out */
                if (link_instance->current_link_credit <= RECEIVER_MIN_LINK_CREDIT)
                {
                    link_instance->current_link_credit = link_instance->max_link_credit;
                    send_flow(link_instance);
                }

                more = false;
                /* Attempt to get more flag, default to false */
                (void)transfer_get_more(transfer_handle, &more);
                is_error = false;

                if (transfer_get_delivery_id(transfer_handle, &link_instance->received_delivery_id) != 0)
                {
                    /* is this not a continuation transfer? */
                    if (link_instance->received_payload_size == 0)
                    {
                        LogError("Could not get the delivery Id from the transfer performative");
                        is_error = true;
                    }
                }

                if (!is_error)
                {
                    /* If this is a continuation transfer or if this is the first chunk of a multi frame transfer */
                    if ((link_instance->received_payload_size > 0) || more)
                    {
                        unsigned char* new_received_payload;
                        size_t realloc_size = safe_add_size_t((size_t)link_instance->received_payload_size, payload_size);

                        if (realloc_size == 0)
                        {
                            /* Empty first chunk: there is nothing to buffer. The zero-size realloc is
                               skipped on purpose, its result is implementation-defined. */
                        }
                        else if (realloc_size == SIZE_MAX ||
                            (new_received_payload = (unsigned char*)realloc(link_instance->received_payload, realloc_size)) == NULL)
                        {
                            LogError("Could not allocate memory for the received payload, size:%zu", realloc_size);

                            /* The delivery can no longer be reassembled. Discard what was buffered so that
                               neither a truncated payload is indicated nor a later delivery gets appended
                               to the stale bytes. */
                            free(link_instance->received_payload);
                            link_instance->received_payload = NULL;
                            link_instance->received_payload_size = 0;
                            is_error = true;
                        }
                        else
                        {
                            link_instance->received_payload = new_received_payload;
                            if (payload_size > 0)
                            {
                                /* guarded: payload_bytes may not be a valid pointer for an empty payload */
                                (void)memcpy(link_instance->received_payload + link_instance->received_payload_size, payload_bytes, payload_size);
                            }
                            link_instance->received_payload_size += payload_size;
                        }
                    }

                    /* Last (or only) frame of the delivery: indicate it, unless buffering just failed */
                    if (!more && !is_error)
                    {
                        const unsigned char* indicate_payload_bytes;
                        uint32_t indicate_payload_size;

                        link_instance->current_link_credit--;
                        link_instance->delivery_count++;
                        /* if no previously stored chunks then simply report the current payload */
                        if (link_instance->received_payload_size > 0)
                        {
                            indicate_payload_size = link_instance->received_payload_size;
                            indicate_payload_bytes = link_instance->received_payload;
                        }
                        else
                        {
                            indicate_payload_size = payload_size;
                            indicate_payload_bytes = payload_bytes;
                        }

                        delivery_state = link_instance->on_transfer_received(link_instance->callback_context, transfer_handle, indicate_payload_size, indicate_payload_bytes);

                        /* The reassembly buffer is only needed for the duration of the callback */
                        if (link_instance->received_payload_size > 0)
                        {
                            free(link_instance->received_payload);
                            link_instance->received_payload = NULL;
                            link_instance->received_payload_size = 0;
                        }

                        /* A non-NULL state returned by the callback is sent back as a disposition */
                        if (delivery_state != NULL)
                        {
                            if (send_disposition(link_instance, link_instance->received_delivery_id, delivery_state) != 0)
                            {
                                LogError("Cannot send disposition frame");
                            }

                            amqpvalue_destroy(delivery_state);
                        }
                    }
                }

                transfer_destroy(transfer_handle);
            }
        }
    }
    else if (is_disposition_type_by_descriptor(descriptor))
    {
        DISPOSITION_HANDLE disposition;
        if (amqpvalue_get_disposition(performative, &disposition) != 0)
        {
            LogError("Cannot get disposition performative");
        }
        else
        {
            delivery_number first;
            delivery_number last;

            if (disposition_get_first(disposition, &first) != 0)
            {
                LogError("Cannot get first field");
            }
            else
            {
                bool settled;

                /* Without a last field the disposition covers a single delivery */
                if (disposition_get_last(disposition, &last) != 0)
                {
                    last = first;
                }

                if (disposition_get_settled(disposition, &settled) != 0)
                {
                    settled = false;
                }

                /* Only settled dispositions complete pending deliveries */
                if (settled &&
                    link_instance->pending_deliveries != NULL)
                {
                    LIST_ITEM_HANDLE pending_delivery = singlylinkedlist_get_head_item(link_instance->pending_deliveries);
                    while (pending_delivery != NULL)
                    {
                        /* Fetch the successor first, the current item may be removed below */
                        LIST_ITEM_HANDLE next_pending_delivery = singlylinkedlist_get_next_item(pending_delivery);
                        ASYNC_OPERATION_HANDLE pending_delivery_operation = (ASYNC_OPERATION_HANDLE)singlylinkedlist_item_get_value(pending_delivery);
                        if (pending_delivery_operation == NULL)
                        {
                            LogError("Cannot obtain pending delivery");
                            break;
                        }
                        else
                        {
                            DELIVERY_INSTANCE* delivery_instance = (DELIVERY_INSTANCE*)GET_ASYNC_OPERATION_CONTEXT(DELIVERY_INSTANCE, pending_delivery_operation);

                            if ((delivery_instance->delivery_id >= first) && (delivery_instance->delivery_id <= last))
                            {
                                AMQP_VALUE delivery_state;
                                if (disposition_get_state(disposition, &delivery_state) != 0)
                                {
                                    /* The peer settled the delivery but no state could be obtained.
                                       The delivery is still completed, with a NULL state. */
                                    LogError("Failed getting the disposition state");
                                    delivery_state = NULL;
                                }

                                /* The item is unlinked below in every case, so the operation must always be
                                   completed and destroyed here; otherwise it would leak and its owner would
                                   never be notified. */
                                delivery_instance->on_delivery_settled(delivery_instance->callback_context, delivery_instance->delivery_id, LINK_DELIVERY_SETTLE_REASON_DISPOSITION_RECEIVED, delivery_state);
                                async_operation_destroy(pending_delivery_operation);

                                if (singlylinkedlist_remove(link_instance->pending_deliveries, pending_delivery) != 0)
                                {
                                    LogError("Cannot remove pending delivery");
                                    break;
                                }
                            }

                            pending_delivery = next_pending_delivery;
                        }
                    }
                }
            }

            disposition_destroy(disposition);
        }
    }
    else if (is_detach_type_by_descriptor(descriptor))
    {
        DETACH_HANDLE detach;

        /* Set link state appropriately based on whether we received detach condition */
        if (amqpvalue_get_detach(performative, &detach) != 0)
        {
            LogError("Cannot get detach performative");
        }
        else
        {
            bool closed = false;
            ERROR_HANDLE error;

            /* closed stays false when the field cannot be read */
            (void)detach_get_closed(detach, &closed);

            /* Received a detach while attached */
            if (link_instance->link_state == LINK_STATE_ATTACHED)
            {
                /* Respond with ack */
                if (send_detach(link_instance, closed, NULL) != 0)
                {
                    LogError("Failed sending detach frame");
                }
            }
            /* Received a closing detach after we sent a non-closing detach. */
            else if (closed &&
                ((link_instance->link_state == LINK_STATE_HALF_ATTACHED_ATTACH_SENT) || (link_instance->link_state == LINK_STATE_HALF_ATTACHED_ATTACH_RECEIVED)) &&
                !link_instance->is_closed)
            {

                /* In this case, we MUST signal that we closed by reattaching and then sending a closing detach.*/
                if (send_attach(link_instance, link_instance->name, 0, link_instance->role) != 0)
                {
                    LogError("Failed sending attach frame");
                }

                if (send_detach(link_instance, true, NULL) != 0)
                {
                    LogError("Failed sending detach frame");
                }
            }

            /* A detach without a retrievable error is treated as a clean detach */
            if (detach_get_error(detach, &error) != 0)
            {
                error = NULL;
            }
            remove_all_pending_deliveries(link_instance, true);
            // signal link detach received in order to handle cases like redirect
            if (link_instance->on_link_detach_received_event_subscription.on_link_detach_received != NULL)
            {
                link_instance->on_link_detach_received_event_subscription.on_link_detach_received(link_instance->on_link_detach_received_event_subscription.context, error);
            }

            if (error != NULL)
            {
                set_link_state(link_instance, LINK_STATE_ERROR);
                error_destroy(error);
            }
            else
            {
                set_link_state(link_instance, LINK_STATE_DETACHED);
            }

            detach_destroy(detach);
        }
    }
}