pplx::task<void> connect_impl()

in Release/src/websockets/client/ws_client_wspp.cpp [277:451]


    // Prepares the websocketpp endpoint, registers the open/fail/message/ping/pong/close callbacks,
    // builds the outgoing connection from m_uri and m_config (user agent, request headers,
    // subprotocols, proxy), then starts the connect and launches m_thread to run the endpoint.
    //
    // Returns a task created from m_connect_tce on the normal path. If get_connection,
    // add_subprotocol, set_proxy or set_proxy_basic_auth reports an error, a task holding a
    // websocket_exception is returned instead; the function then exits before m_state is set to
    // CONNECTING and before m_thread is started.
    pplx::task<void> connect_impl()
    {
        auto& endpoint = m_client->client<WebsocketConfigType>();

        // Clear all access and error log channels, then bring up the asio transport.
        endpoint.clear_access_channels(websocketpp::log::alevel::all);
        endpoint.clear_error_channels(websocketpp::log::alevel::all);
        endpoint.init_asio();
        endpoint.start_perpetual();

        _ASSERTE(m_state == CREATED);

        // Open handler: moves the state to CONNECTED and signals m_connect_tce.
        endpoint.set_open_handler([this](websocketpp::connection_hdl) {
            _ASSERTE(m_state == CONNECTING);
            m_state = CONNECTED;
            m_connect_tce.set();
        });

        // Fail handler: hands the connection to shutdown_wspp_impl with its flag set to true.
        endpoint.set_fail_handler([this](websocketpp::connection_hdl failed_hdl) {
            _ASSERTE(m_state == CONNECTING);
            this->shutdown_wspp_impl<WebsocketConfigType>(failed_hdl, true);
        });

        // Message handler: when an external handler is installed, wraps the received frame in a
        // websocket_incoming_message and forwards it; otherwise the frame is ignored.
        endpoint.set_message_handler(
            [this](websocketpp::connection_hdl, const websocketpp::config::asio_client::message_type::ptr& msg) {
                if (!m_external_message_handler)
                {
                    return;
                }

                _ASSERTE(m_state >= CONNECTED && m_state < CLOSED);
                websocket_incoming_message delivered;

                const auto frame_kind = msg->get_opcode();
                if (frame_kind == websocketpp::frame::opcode::binary)
                {
                    delivered.m_msg_type = websocket_message_type::binary_message;
                }
                else if (frame_kind == websocketpp::frame::opcode::text)
                {
                    delivered.m_msg_type = websocket_message_type::text_message;
                }
                else
                {
                    // Not a text or binary frame. websocketpp and this code both use the RFC
                    // opcode values, so the raw opcode is handed to the user unchanged.
                    delivered.m_msg_type = static_cast<websocket_message_type>(frame_kind);
                }

                // The payload string is moved (not copied) into the container buffer.
                delivered.m_body =
                    concurrency::streams::container_buffer<std::string>(std::move(msg->get_raw_payload()));

                m_external_message_handler(delivered);
            });

        // Ping handler: forwards the ping payload to the external handler when one is installed and
        // returns true unconditionally.
        // NOTE(review): websocketpp presumably uses the return value to decide whether a pong reply
        // is sent - confirm against the websocketpp documentation.
        endpoint.set_ping_handler([this](websocketpp::connection_hdl, const std::string& ping_payload) {
            if (m_external_message_handler)
            {
                _ASSERTE(m_state >= CONNECTED && m_state < CLOSED);
                websocket_incoming_message delivered;

                delivered.m_msg_type = websocket_message_type::ping;
                delivered.m_body = concurrency::streams::container_buffer<std::string>(ping_payload);

                m_external_message_handler(delivered);
            }
            return true;
        });

        // Pong handler: forwards the pong payload to the external handler when one is installed.
        endpoint.set_pong_handler([this](websocketpp::connection_hdl, const std::string& pong_payload) {
            if (!m_external_message_handler)
            {
                return;
            }

            _ASSERTE(m_state >= CONNECTED && m_state < CLOSED);
            websocket_incoming_message delivered;

            delivered.m_msg_type = websocket_message_type::pong;
            delivered.m_body = concurrency::streams::container_buffer<std::string>(pong_payload);

            m_external_message_handler(delivered);
        });

        // Close handler: hands the connection to shutdown_wspp_impl with its flag set to false.
        endpoint.set_close_handler([this](websocketpp::connection_hdl closed_hdl) {
            _ASSERTE(m_state != CLOSED);
            this->shutdown_wspp_impl<WebsocketConfigType>(closed_hdl, false);
        });

        // A user-supplied User-Agent has to be applied to the endpoint before any connection is
        // created from it.
        const auto& request_headers = m_config.headers();

        const auto agent_entry = request_headers.find(web::http::header_names::user_agent);
        if (agent_entry != request_headers.end())
        {
            endpoint.set_user_agent(utility::conversions::to_utf8string(agent_entry->second));
        }

        // Create the connection. A typed local is kept because storing it into m_con (a
        // connection_hdl) erases the concrete connection type needed by the calls below.
        websocketpp::lib::error_code error;
        auto connection = endpoint.get_connection(utility::conversions::to_utf8string(m_uri.to_string()), error);
        m_con = connection;
        if (error.value() != 0)
        {
            return pplx::task_from_exception<void>(
                websocket_exception(error, build_error_msg(error, "get_connection")));
        }

        // Copy every user-specified request header onto the connection, except the subprotocol
        // header, which is handled separately below.
        for (const auto& entry : request_headers)
        {
            if (utility::details::str_iequal(entry.first, g_subProtocolHeader))
            {
                continue;
            }

            connection->append_header(utility::conversions::to_utf8string(entry.first),
                                      utility::conversions::to_utf8string(entry.second));
        }

        // Register the requested subprotocols, if the subprotocol header is present.
        if (request_headers.has(g_subProtocolHeader))
        {
            const std::vector<utility::string_t> requested_protocols = m_config.subprotocols();
            for (const auto& protocol : requested_protocols)
            {
                connection->add_subprotocol(utility::conversions::to_utf8string(protocol), error);
                if (error.value() != 0)
                {
                    return pplx::task_from_exception<void>(
                        websocket_exception(error, build_error_msg(error, "add_subprotocol")));
                }
            }
        }

        // Apply the proxy address and, when credentials are set, basic authentication.
        const auto& proxy_settings = m_config.proxy();
        if (proxy_settings.is_specified())
        {
            connection->set_proxy(utility::conversions::to_utf8string(proxy_settings.address().to_string()), error);
            if (error)
            {
                return pplx::task_from_exception<void>(
                    websocket_exception(error, build_error_msg(error, "set_proxy")));
            }

            const auto& proxy_credentials = proxy_settings.credentials();
            if (proxy_credentials.is_set())
            {
                connection->set_proxy_basic_auth(
                    utility::conversions::to_utf8string(proxy_credentials.username()),
                    utility::conversions::to_utf8string(*proxy_credentials._internal_decrypt()),
                    error);
                if (error)
                {
                    return pplx::task_from_exception<void>(
                        websocket_exception(error, build_error_msg(error, "set_proxy_basic_auth")));
                }
            }
        }

        // The state is switched to CONNECTING before the connect is issued; the open and fail
        // handlers assert on that state.
        m_state = CONNECTING;
        endpoint.connect(connection);
        {
            // m_thread is assigned while m_wspp_client_lock is held.
            std::lock_guard<std::mutex> guard(m_wspp_client_lock);
            m_thread = std::thread([&endpoint]() {
#if defined(__ANDROID__)
                crossplat::get_jvm_env();
#endif
                endpoint.run();
#if defined(__ANDROID__)
                crossplat::JVM.load()->DetachCurrentThread();
#endif

#if OPENSSL_VERSION_NUMBER < 0x10100000L || defined(LIBRESSL_VERSION_NUMBER)
                // OpenSSL keeps some per-thread state that is not cleaned up until the dll is
                // unloaded. With static linking, as done here, it is never cleaned up and gets
                // reported as a leak, so it is released explicitly.
                // See http://www.openssl.org/support/faq.html#PROG13
                ERR_remove_thread_state(nullptr);
#endif
            });
        } // lock released here
        return pplx::create_task(m_connect_tce);
    }