static void on_underlying_io_bytes_received()

in src/header_detect_io.c [147:355]


/* Bytes-received callback for the header detect IO.
 *
 * It is also the callback handed to xio_open for every detected IO created below,
 * so the handling depends entirely on the current io_state of the instance:
 *  - IO_STATE_OPENING_UNDERLYING_IO: bytes are unexpected, the open is failed with IO_OPEN_ERROR.
 *  - IO_STATE_OPENING_DETECTED_IO:   bytes are forwarded to the last created chained IO.
 *  - IO_STATE_WAIT_FOR_HEADER:       bytes are matched one at a time against the configured headers.
 *  - IO_STATE_OPEN:                  bytes are handed to the user's on_bytes_received callback.
 *  - any other state:                bytes are discarded.
 *
 * context - the HEADER_DETECT_IO_INSTANCE the callback was registered with; NULL is only logged.
 * buffer  - received bytes; NULL is treated as an error while the IO is OPEN and ignored otherwise.
 * size    - number of bytes in buffer; 0 is treated the same way as a NULL buffer.
 */
static void on_underlying_io_bytes_received(void* context, const unsigned char* buffer, size_t size)
{
    if (context == NULL)
    {
        /* Codes_SRS_HEADER_DETECT_IO_01_050: [ If `context` is NULL, `on_underlying_io_bytes_received` shall do nothing. ]*/
        LogError("NULL context");
    }
    else
    {
        HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)context;

        if ((buffer == NULL) ||
            (size == 0))
        {
            /* an empty notification is only reported as an error once the IO is fully open */
            switch (header_detect_io_instance->io_state)
            {
            default:
                break;

            case IO_STATE_OPEN:
                /* Codes_SRS_HEADER_DETECT_IO_01_051: [ If `buffer` is NULL or `size` is 0 while the IO is OPEN an error shall be indicated by calling `on_io_error`. ]*/
                indicate_error(header_detect_io_instance);
                break;
            }
        }
        else
        {
            /* The state is re-evaluated on every iteration because consuming a byte can change it */
            /* (for example the last header byte moves the IO to OPEN or to OPENING_DETECTED_IO). */
            while (size > 0)
            {
                switch (header_detect_io_instance->io_state)
                {
                default:
                    /* No other state can consume bytes. Drop whatever is left, otherwise the enclosing */
                    /* loop would never terminate (this is reached for instance when a failure below */
                    /* moved the IO to IO_STATE_NOT_OPEN while there were still unprocessed bytes). */
                    size = 0;
                    break;

                case IO_STATE_OPENING_UNDERLYING_IO:
                    /* Codes_SRS_HEADER_DETECT_IO_01_049: [ When `on_underlying_io_bytes_received` is called while opening the underlying IO (before the underlying open complete is received), an error shall be indicated by calling `on_io_open_complete` with `IO_OPEN_ERROR`. ]*/
                    indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR);
                    size = 0;
                    break;

                case IO_STATE_OPENING_DETECTED_IO:
                {
                    /* Codes_SRS_HEADER_DETECT_IO_01_087: [ If `on_underlying_io_bytes_received` is called while waiting for the detected IO to complete its open, the bytes shall be given to the last created IO by calling its `on_bytes_received` callback that was filled into the `on_bytes_received` member of `SERVER_PROTOCOL_IO_CONFIG`. ]*/
                    /* last_io points at the detected_io member of a CHAINED_IO in this state, so the */
                    /* containing CHAINED_IO is recovered by subtracting the member offset. */
                    CHAINED_IO* chained_io = (CHAINED_IO*)(((unsigned char*)header_detect_io_instance->last_io) - offsetof(CHAINED_IO, detected_io));
                    (chained_io->on_bytes_received)(chained_io->on_bytes_received_context, buffer, size);
                    size = 0;
                    break;
                }

                case IO_STATE_WAIT_FOR_HEADER:
                {
                    size_t i;
                    bool has_one_match = false;

                    /* check if any of the headers matches (only buffer[0] is examined per loop iteration) */
                    for (i = 0; i < header_detect_io_instance->header_detect_entry_count; i++)
                    {
                        /* Codes_SRS_HEADER_DETECT_IO_01_067: [ When `on_underlying_io_bytes_received` is called while waiting for header bytes (after the underlying IO was open), the bytes shall be matched against the entries provided in the configuration passed to `header_detect_io_create`. ]*/
                        /* Codes_SRS_HEADER_DETECT_IO_01_068: [ Header bytes shall be accepted in multiple `on_underlying_io_bytes_received` calls. ]*/
                        if ((header_detect_io_instance->header_pos < header_detect_io_instance->header_detect_entries[i].header_size) &&
                            (header_detect_io_instance->header_detect_entries[i].header_bytes[header_detect_io_instance->header_pos] == buffer[0]))
                        {
                            has_one_match = true;

                            if (header_detect_io_instance->header_pos + 1 == header_detect_io_instance->header_detect_entries[i].header_size)
                            {
                                /* recognized one header, echo it back through the current last IO in the chain */
                                if (xio_send(*header_detect_io_instance->last_io, header_detect_io_instance->header_detect_entries[i].header_bytes, header_detect_io_instance->header_detect_entries[i].header_size, on_send_complete, header_detect_io_instance) != 0)
                                {
                                    LogError("Failed sending header");
                                    header_detect_io_instance->io_state = IO_STATE_NOT_OPEN;
                                    indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR);
                                }
                                else
                                {
                                    /* the send was accepted; the next step is taken right away, */
                                    /* without waiting for on_send_complete to be called */
                                    if (header_detect_io_instance->header_detect_entries[i].io_interface_description == NULL)
                                    {
                                        /* no IO is associated with this header: detection is complete */
                                        header_detect_io_instance->io_state = IO_STATE_OPEN;
                                        indicate_open_complete(header_detect_io_instance, IO_OPEN_OK);
                                    }
                                    else
                                    {
                                        SERVER_PROTOCOL_IO_CONFIG server_protocol_io_config;
                                        CHAINED_IO* chained_io = (CHAINED_IO*)malloc(sizeof(CHAINED_IO));
                                        if (chained_io == NULL)
                                        {
                                            LogError("Cannot allocate memory for chained IO");
                                            internal_close(header_detect_io_instance);
                                            indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR);
                                        }
                                        else
                                        {
                                            /* Codes_SRS_HEADER_DETECT_IO_01_076: [ If no detected IO was created then the underlying IO in the `SERVER_PROTOCOL_IO_CONFIG` structure shall be set to the `underlying_io` passed in the create arguments. ]*/
                                            /* Codes_SRS_HEADER_DETECT_IO_01_075: [ The underlying IO in the `SERVER_PROTOCOL_IO_CONFIG` structure shall be set to the last detected IO that was created if any. ]*/
                                            server_protocol_io_config.underlying_io = *header_detect_io_instance->last_io;
                                            /* the created IO gets the addresses of the chained_io callback slots, which are */
                                            /* the ones invoked in the IO_STATE_OPENING_DETECTED_IO case above */
                                            server_protocol_io_config.on_bytes_received = &chained_io->on_bytes_received;
                                            server_protocol_io_config.on_bytes_received_context = &chained_io->on_bytes_received_context;

                                            /* Codes_SRS_HEADER_DETECT_IO_01_069: [ If a header match was detected on an entry with a non-NULL io handle, a new IO associated shall be created by calling `xio_create`. ]*/
                                            /* Codes_SRS_HEADER_DETECT_IO_01_073: [ The interface description passed to `xio_create` shall be the interface description associated with the detected header. ]*/
                                            /* Codes_SRS_HEADER_DETECT_IO_01_074: [ The IO create parameters shall be a `SERVER_PROTOCOL_IO_CONFIG` structure. ]*/
                                            chained_io->detected_io = xio_create(header_detect_io_instance->header_detect_entries[i].io_interface_description, &server_protocol_io_config);
                                            if (chained_io->detected_io == NULL)
                                            {
                                                /* Codes_SRS_HEADER_DETECT_IO_01_077: [ If `xio_create` fails the header detect IO shall be closed and an error shall be indicated by calling `on_io_open_complete` with `IO_OPEN_ERROR`. ]*/
                                                LogError("Creating detected IO failed");
                                                free(chained_io);
                                                internal_close(header_detect_io_instance);
                                                indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR);
                                            }
                                            else
                                            {
                                                /* Codes_SRS_HEADER_DETECT_IO_01_086: [ The newly created IO shall be added to the chain of IOs by calling `singlylinkedlist_add`. ]*/
                                                LIST_ITEM_HANDLE new_list_item = singlylinkedlist_add(header_detect_io_instance->chained_io_list, chained_io);
                                                if (new_list_item == NULL)
                                                {
                                                    /* Codes_SRS_HEADER_DETECT_IO_01_084: [ If `singlylinkedlist_add` fails the newly created IO shall be destroyed and an error shall be indicated by calling `on_io_open_complete` with `IO_OPEN_ERROR`. ]*/
                                                    LogError("Cannot add detected IO to list");
                                                    xio_destroy(chained_io->detected_io);
                                                    free(chained_io);
                                                    internal_close(header_detect_io_instance);
                                                    indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR);
                                                }
                                                else
                                                {
                                                    /* Codes_SRS_HEADER_DETECT_IO_01_063: [ `header_detect_io_close_async` shall close the last detected IO that was created as a result of matching a header. ]*/
                                                    /* remember the previous last IO so it can be restored if the open below fails */
                                                    XIO_HANDLE* previous_last_io = header_detect_io_instance->last_io;
                                                    header_detect_io_instance->last_io = &chained_io->detected_io;

                                                    /* Codes_SRS_HEADER_DETECT_IO_01_083: [ The header detect IO shall wait for opening of the detected IO (signaled by the `on_underlying_io_open_complete`). ]*/
                                                    /* the state is switched before xio_open is called, so callbacks fired from */
                                                    /* within xio_open already observe IO_STATE_OPENING_DETECTED_IO */
                                                    header_detect_io_instance->io_state = IO_STATE_OPENING_DETECTED_IO;

                                                    /* Codes_SRS_HEADER_DETECT_IO_01_078: [ The newly create IO shall be open by calling `xio_open`. ]*/
                                                    /* Codes_SRS_HEADER_DETECT_IO_01_079: [ The `on_io_open_complete` callback passed to `xio_open` shall be `on_underlying_io_open_complete`. ]*/
                                                    /* Codes_SRS_HEADER_DETECT_IO_01_080: [ The `on_bytes_received` callback passed to `xio_open` shall be `on_underlying_io_bytes_received`. ]*/
                                                    /* Codes_SRS_HEADER_DETECT_IO_01_081: [ The `on_io_error` callback passed to `xio_open` shall be `on_underlying_io_error`. ]*/
                                                    if (xio_open(chained_io->detected_io, on_underlying_io_open_complete, header_detect_io_instance, on_underlying_io_bytes_received, header_detect_io_instance, on_underlying_io_error, header_detect_io_instance) != 0)
                                                    {
                                                        /* Codes_SRS_HEADER_DETECT_IO_01_082: [ If `xio_open` fails the header detect IO shall be closed and an error shall be indicated by calling `on_io_open_complete` with `IO_OPEN_ERROR`. ]*/
                                                        LogError("Opening detected IO failed");
                                                        /* undo the list insertion; a removal failure is only logged */
                                                        if (singlylinkedlist_remove(header_detect_io_instance->chained_io_list, new_list_item) != 0)
                                                        {
                                                            LogError("Cannot remove chained IO from list");
                                                        }

                                                        xio_destroy(chained_io->detected_io);
                                                        free(chained_io);
                                                        /* last_io must stop pointing into the freed chained_io before closing */
                                                        header_detect_io_instance->last_io = previous_last_io;
                                                        internal_close(header_detect_io_instance);
                                                        indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR);
                                                    }
                                                    else
                                                    {
                                                        // all OK
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }

                                /* a complete header was handled (successfully or not), stop scanning the entries */
                                break;
                            }
                        }
                    }

                    if (has_one_match)
                    {
                        if (header_detect_io_instance->io_state == IO_STATE_OPENING_DETECTED_IO)
                        {
                            /* a detected IO is being opened: start matching from the first header byte again */
                            header_detect_io_instance->header_pos = 0;
                        }
                        else
                        {
                            header_detect_io_instance->header_pos++;
                        }

                        /* exactly one byte was consumed by the matching above */
                        size--;
                        buffer++;
                    }
                    else
                    {
                        /* all header matches failed, we can't proceed, send back to the peer the first header we know of, */
                        /* then close as per spec.  We do not care if we fail sending */
                        if (xio_send(header_detect_io_instance->underlying_io, header_detect_io_instance->header_detect_entries[0].header_bytes, header_detect_io_instance->header_detect_entries[0].header_size, unchecked_on_send_complete, NULL) != 0)
                        {
                            LogError("Failed sending header");
                        }

                        internal_close(header_detect_io_instance);
                        indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR);
                        size = 0;
                    }

                    break;
                }

                case IO_STATE_OPEN:
                    /* Codes_SRS_HEADER_DETECT_IO_01_089: [ If `on_underlying_io_bytes_received` is called while header detect IO is OPEN the bytes shall be given to the user via the `on_bytes_received` callback that was the `on_bytes_received` callback passed to `header_detect_io_open_async`. ]*/
                    /* Codes_SRS_HEADER_DETECT_IO_01_090: [ If no detected IOs were created and `on_underlying_io_bytes_received` is called while header detect IO is OPEN, the `on_bytes_received` callback passed to `header_detect_io_open_async` shall be called to indicate the bytes as received. ]*/
                    header_detect_io_instance->on_bytes_received(header_detect_io_instance->on_bytes_received_context, buffer, size);
                    size = 0;
                    break;
                }
            }
        }
    }
}