in Release/src/websockets/client/ws_client_wspp.cpp [277:451]
// Configures the websocketpp client selected by WebsocketConfigType, creates a connection for m_uri,
// applies the user supplied headers, subprotocols and proxy settings, then starts connecting and runs
// the client on m_thread.
//
// Returns a task created from m_connect_tce (which the open handler sets). If creating or configuring
// the connection reports an error code, a task already holding a websocket_exception is returned
// instead and no connect attempt is made.
pplx::task<void> connect_impl()
{
    auto& client = m_client->client<WebsocketConfigType>();

    // Clear every websocketpp log channel, then bring up the asio transport.
    client.clear_access_channels(websocketpp::log::alevel::all);
    client.clear_error_channels(websocketpp::log::alevel::all);
    client.init_asio();
    client.start_perpetual();

    _ASSERTE(m_state == CREATED);

    // Successful handshake: record the new state and signal the connect completion event.
    client.set_open_handler([this](websocketpp::connection_hdl) {
        _ASSERTE(m_state == CONNECTING);
        m_state = CONNECTED;
        m_connect_tce.set();
    });

    // Failed connection attempt: delegate to the shared shutdown routine, passing 'true'.
    client.set_fail_handler([this](websocketpp::connection_hdl failed_hdl) {
        _ASSERTE(m_state == CONNECTING);
        this->shutdown_wspp_impl<WebsocketConfigType>(failed_hdl, true);
    });

    // Data frames: wrap the payload in a websocket_incoming_message and hand it to the user's handler.
    client.set_message_handler(
        [this](websocketpp::connection_hdl, const websocketpp::config::asio_client::message_type::ptr& wspp_msg) {
            if (!m_external_message_handler)
            {
                // Nobody is listening, so the message is dropped.
                return;
            }
            _ASSERTE(m_state >= CONNECTED && m_state < CLOSED);

            websocket_incoming_message incoming;
            const auto opcode = wspp_msg->get_opcode();
            if (opcode == websocketpp::frame::opcode::binary)
            {
                incoming.m_msg_type = websocket_message_type::binary_message;
            }
            else if (opcode == websocketpp::frame::opcode::text)
            {
                incoming.m_msg_type = websocket_message_type::text_message;
            }
            else
            {
                // Unknown message type. Since both websocketpp and our code use the RFC codes, we'll just
                // pass it on to the user.
                incoming.m_msg_type = static_cast<websocket_message_type>(opcode);
            }

            // 'move' the payload into a container buffer to avoid any copies.
            incoming.m_body =
                concurrency::streams::container_buffer<std::string>(std::move(wspp_msg->get_raw_payload()));

            m_external_message_handler(incoming);
        });

    // Ping frames: forwarded to the user's handler when one is set; the handler always returns true.
    client.set_ping_handler([this](websocketpp::connection_hdl, const std::string& ping_payload) {
        if (!m_external_message_handler)
        {
            return true;
        }
        _ASSERTE(m_state >= CONNECTED && m_state < CLOSED);

        websocket_incoming_message incoming;
        incoming.m_msg_type = websocket_message_type::ping;
        incoming.m_body = concurrency::streams::container_buffer<std::string>(ping_payload);
        m_external_message_handler(incoming);
        return true;
    });

    // Pong frames: forwarded to the user's handler when one is set.
    client.set_pong_handler([this](websocketpp::connection_hdl, const std::string& pong_payload) {
        if (!m_external_message_handler)
        {
            return;
        }
        _ASSERTE(m_state >= CONNECTED && m_state < CLOSED);

        websocket_incoming_message incoming;
        incoming.m_msg_type = websocket_message_type::pong;
        incoming.m_body = concurrency::streams::container_buffer<std::string>(pong_payload);
        m_external_message_handler(incoming);
    });

    // Close: delegate to the shared shutdown routine, passing 'false'.
    client.set_close_handler([this](websocketpp::connection_hdl closed_hdl) {
        _ASSERTE(m_state != CLOSED);
        this->shutdown_wspp_impl<WebsocketConfigType>(closed_hdl, false);
    });

    // Set User Agent specified by the user. This needs to happen before any connection is created
    const auto& request_headers = m_config.headers();
    auto ua_entry = request_headers.find(web::http::header_names::user_agent);
    if (ua_entry != request_headers.end())
    {
        client.set_user_agent(utility::conversions::to_utf8string(ua_entry->second));
    }

    websocketpp::lib::error_code ec;

    // Builds the failed task returned when a setup step reports an error through 'ec'.
    const auto setup_failure = [&ec](const char* step) {
        return pplx::task_from_exception<void>(websocket_exception(ec, build_error_msg(ec, step)));
    };

    // Get the connection handle to save for later, have to create temporary
    // because type erasure occurs with connection_hdl.
    auto connection = client.get_connection(utility::conversions::to_utf8string(m_uri.to_string()), ec);
    m_con = connection;
    if (ec.value() != 0)
    {
        return setup_failure("get_connection");
    }

    // Add any request headers specified by the user; the subprotocol header is handled separately below.
    for (const auto& entry : request_headers)
    {
        if (utility::details::str_iequal(entry.first, g_subProtocolHeader))
        {
            continue;
        }
        connection->append_header(utility::conversions::to_utf8string(entry.first),
                                  utility::conversions::to_utf8string(entry.second));
    }

    // Add any specified subprotocols.
    if (request_headers.has(g_subProtocolHeader))
    {
        const std::vector<utility::string_t> requested_protocols = m_config.subprotocols();
        for (const auto& protocol : requested_protocols)
        {
            connection->add_subprotocol(utility::conversions::to_utf8string(protocol), ec);
            if (ec.value() != 0)
            {
                return setup_failure("add_subprotocol");
            }
        }
    }

    // Setup proxy options.
    const auto& proxy_settings = m_config.proxy();
    if (proxy_settings.is_specified())
    {
        connection->set_proxy(utility::conversions::to_utf8string(proxy_settings.address().to_string()), ec);
        if (ec)
        {
            return setup_failure("set_proxy");
        }

        const auto& proxy_cred = proxy_settings.credentials();
        if (proxy_cred.is_set())
        {
            connection->set_proxy_basic_auth(utility::conversions::to_utf8string(proxy_cred.username()),
                                             utility::conversions::to_utf8string(*proxy_cred._internal_decrypt()),
                                             ec);
            if (ec)
            {
                return setup_failure("set_proxy_basic_auth");
            }
        }
    }

    // The state must read CONNECTING before the attempt is queued: the open/fail handlers assert on it.
    m_state = CONNECTING;
    client.connect(connection);

    {
        std::lock_guard<std::mutex> guard(m_wspp_client_lock);
        // Run the client on its own thread.
        m_thread = std::thread([&client]() {
#if defined(__ANDROID__)
            crossplat::get_jvm_env();
#endif
            client.run();
#if defined(__ANDROID__)
            crossplat::JVM.load()->DetachCurrentThread();
#endif
#if OPENSSL_VERSION_NUMBER < 0x10100000L || defined(LIBRESSL_VERSION_NUMBER)
            // OpenSSL stores some per thread state that never will be cleaned up until
            // the dll is unloaded. If static linking, like we do, the state isn't cleaned up
            // at all and will be reported as leaks.
            // See http://www.openssl.org/support/faq.html#PROG13
            ERR_remove_thread_state(nullptr);
#endif
        });
    } // unlock

    return pplx::create_task(m_connect_tce);
}