HRESULT connect_impl()

in Source/WebSocket/Websocketpp/websocketpp_websocket.cpp [291:548]


    HRESULT connect_impl(XAsyncBlock* async)
    {
        if (async->queue)
        {
            XTaskQueuePortHandle worker{ nullptr };
            RETURN_IF_FAILED(XTaskQueueGetPort(async->queue, XTaskQueuePort::Work, &worker));
            RETURN_IF_FAILED(XTaskQueueCreateComposite(worker, worker, &m_backgroundQueue));
        }

        auto &client = m_client->client<WebsocketConfigType>();

        client.clear_access_channels(websocketpp::log::alevel::all);
        client.clear_error_channels(websocketpp::log::alevel::all);
        client.init_asio();
        client.start_perpetual();

        auto sharedThis { shared_from_this() };

        ASSERT(m_state == DISCONNECTED);
        client.set_open_handler([sharedThis, async](websocketpp::connection_hdl)
        {
            ASSERT(sharedThis->m_state == CONNECTING);
            sharedThis->m_state = CONNECTED;
            sharedThis->set_connection_error<WebsocketConfigType>();
            sharedThis->send_ping();
            XAsyncComplete(async, S_OK, sizeof(WebSocketCompletionResult));
        });

        client.set_fail_handler([sharedThis, async](websocketpp::connection_hdl)
        {
            ASSERT(sharedThis->m_state == CONNECTING);
            sharedThis->set_connection_error<WebsocketConfigType>();
            sharedThis->shutdown_wspp_impl<WebsocketConfigType>(
                [
                    sharedThis,
                    async
                ]
            {
                XAsyncComplete(async, S_OK, sizeof(WebSocketCompletionResult));
            });
        });

        client.set_message_handler([sharedThis](websocketpp::connection_hdl, const websocketpp::config::asio_client::message_type::ptr &msg)
        {
            HCWebSocketMessageFunction messageFunc{ nullptr };
            HCWebSocketBinaryMessageFunction binaryMessageFunc{ nullptr };
            void* context{ nullptr };
            auto hr = HCWebSocketGetEventFunctions(sharedThis->m_hcWebsocketHandle, &messageFunc, &binaryMessageFunc, nullptr, &context);

            if (SUCCEEDED(hr))
            {
                ASSERT(messageFunc && binaryMessageFunc);

                // TODO: hook up HCWebSocketCloseEventFunction handler upon unexpected disconnect 
                // TODO: verify auto disconnect when closing client's websocket handle

                if (msg->get_opcode() == websocketpp::frame::opcode::text)
                {
                    auto& payload = msg->get_raw_payload();
                    messageFunc(sharedThis->m_hcWebsocketHandle, payload.data(), context);
                }
                else if (msg->get_opcode() == websocketpp::frame::opcode::binary)
                {
                    auto& payload = msg->get_raw_payload();
                    binaryMessageFunc(sharedThis->m_hcWebsocketHandle, (uint8_t*)payload.data(), (uint32_t)payload.size(), context);
                }
            }
        });

        client.set_close_handler([sharedThis](websocketpp::connection_hdl)
        {
            ASSERT(sharedThis->m_state == CONNECTED || sharedThis->m_state == DISCONNECTING);
            sharedThis->shutdown_wspp_impl<WebsocketConfigType>([sharedThis]()
                {
                    HCWebSocketCloseEventFunction closeFunc{ nullptr };
                    void* context{ nullptr };

                    HCWebSocketGetEventFunctions(sharedThis->m_hcWebsocketHandle, nullptr, nullptr, &closeFunc, &context);
                    if (closeFunc)
                    {
                        closeFunc(sharedThis->m_hcWebsocketHandle, static_cast<HCWebSocketCloseStatus>(sharedThis->m_closeCode), context);
                    }
                });
        });

        // Set User Agent specified by the user. This needs to happen before any connection is created
        const auto& headers = m_hcWebsocketHandle->Headers();

        auto user_agent_it = headers.find(websocketpp::user_agent);
        if (user_agent_it != headers.end())
        {
            client.set_user_agent(user_agent_it->second.data());
        }

        // Get the connection handle to save for later, have to create temporary
        // because type erasure occurs with connection_hdl.
        websocketpp::lib::error_code ec;
        auto con = client.get_connection(m_uri.FullPath().data(), ec);
        m_con = con;
        if (ec.value() != 0)
        {
            HC_TRACE_ERROR(WEBSOCKET, "Websocket [ID %llu]: wspp get_connection failed", TO_ULL(m_hcWebsocketHandle->id));
            return E_FAIL;
        }

        // Add any request headers specified by the user.
        for (const auto & header : headers)
        {
            // Subprotocols are handled separately below
            if (str_icmp(header.first, SUB_PROTOCOL_HEADER) != 0)
            {
                con->append_header(header.first.data(), header.second.data());
            }
        }

        // Add any specified subprotocols.
        if (!m_subprotocol.empty())
        {
            con->add_subprotocol(m_subprotocol.data(), ec);
            if (ec.value())
            {
                HC_TRACE_ERROR(WEBSOCKET, "Websocket [ID %llu]: add_subprotocol failed", TO_ULL(m_hcWebsocketHandle->id));
                return E_FAIL;
            }
        }

        // Setup proxy options.
        if (!m_hcWebsocketHandle->ProxyUri().empty())
        {
            con->set_proxy(m_hcWebsocketHandle->ProxyUri().data(), ec);
            if (ec)
            {
                HC_TRACE_ERROR(WEBSOCKET, "Websocket [ID %llu]: wspp set_proxy failed", TO_ULL(m_hcWebsocketHandle->id));
                return E_FAIL;
            }
        }
#if HC_PLATFORM_IS_MICROSOFT
        else
        {
            // On windows platforms use the IE proxy if the user didn't specify one
            Uri proxyUri;
            auto proxyType = get_ie_proxy_info(proxy_protocol::websocket, proxyUri);

            if (proxyType == proxy_type::named_proxy)
            {
                con->set_proxy(proxyUri.FullPath().data(), ec);
                if (ec)
                {
                    HC_TRACE_ERROR(WEBSOCKET, "Websocket [ID %llu]: wspp set_proxy failed", TO_ULL(m_hcWebsocketHandle->id));
                    return E_FAIL;
                }
            }
        }
#endif
        // Initialize the 'connect' XAsyncBlock here, but the actually work will happen on the ASIO background thread below
        auto hr = XAsyncBegin(async, shared_ptr_cache::store(shared_from_this()), (void*)HCWebSocketConnectAsync, __FUNCTION__,
            [](XAsyncOp op, const XAsyncProviderData* data)
        {
            if (op == XAsyncOp::GetResult)
            {
                auto context = shared_ptr_cache::fetch<wspp_websocket_impl>(data->context);
                if (context == nullptr)
                {
                    return E_HC_NOT_INITIALISED;
                }

                auto result = reinterpret_cast<WebSocketCompletionResult*>(data->buffer);
                result->websocket = context->m_hcWebsocketHandle;
                result->platformErrorCode = context->m_connectError.value();
                result->errorCode = context->m_connectError ? E_FAIL : S_OK;
            }
            else if (op == XAsyncOp::Cleanup)
            {
                shared_ptr_cache::remove(data->context);
            }
            return S_OK;
        });

        if (SUCCEEDED(hr))
        {
            m_state = CONNECTING;
            client.connect(con);

            try
            {
                struct client_context
                {
                    client_context(websocketpp::client<WebsocketConfigType>& _client) : client(_client) {}
                    websocketpp::client<WebsocketConfigType>& client;
                };
                auto context = http_allocate_shared<client_context>(client);

                m_websocketThread = std::thread([context, id{ m_hcWebsocketHandle->id }]()
                {
                    HC_TRACE_INFORMATION(WEBSOCKET, "id=%u Wspp client work thread starting", id);

#if HC_PLATFORM == HC_PLATFORM_ANDROID
                    JavaVM* javaVm = nullptr;
                    {   
                        // Allow our singleton to go out of scope quickly once we're done with it
                        auto httpSingleton = xbox::httpclient::get_http_singleton();
                        if (httpSingleton)
                        {
                            auto platformContext = httpSingleton->m_performEnv->androidPlatformContext;
                            javaVm = platformContext->GetJavaVm();
                        }
                    }

                    if (javaVm == nullptr)
                    {
                        HC_TRACE_ERROR(HTTPCLIENT, "javaVm is null");
                        throw std::runtime_error("JavaVm is null");
                    }

                    JNIEnv* jniEnv = nullptr;
                    if (javaVm->AttachCurrentThread(&jniEnv, nullptr) != 0)
                    {
                        assert(false);
                    }
#endif
                    try
                    {
                        context->client.run();
                    }
                    catch (...)
                    {
                        HC_TRACE_ERROR(WEBSOCKET, "Caught exception in wspp client::run!");
                    }

                    // OpenSSL stores some per thread state that never will be cleaned up until
                    // the dll is unloaded. If static linking, like we do, the state isn't cleaned up
                    // at all and will be reported as leaks.
                    // See http://www.openssl.org/support/faq.html#PROG13
#if HC_PLATFORM == HC_PLATFORM_ANDROID || HC_PLATFORM_IS_APPLE
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
                    ERR_remove_thread_state(nullptr);
#pragma clang diagnostic pop
#if HC_PLATFORM == HC_PLATFORM_ANDROID
                    javaVm->DetachCurrentThread();
#endif // HC_PLATFORM_ANDROID
#else
                    ERR_remove_thread_state(nullptr);
#endif // HC_PLATFORM_ANDROID || HC_PLATFORM_IS_APPLE

                    HC_TRACE_INFORMATION(WEBSOCKET, "id=%u Wspp client work thread end", id);
                });
                hr = S_OK;
            }
            catch (std::system_error err)
            {
                HC_TRACE_ERROR(WEBSOCKET, "Websocket: couldn't create background websocket thread (%d)", err.code().value());
                hr = E_FAIL;
            }
        }

        return hr;
    }