in Source/WebSocket/Websocketpp/websocketpp_websocket.cpp [291:548]
// Begins an asynchronous websocket connect through the websocketpp (wspp) client.
//
// Work performed, in order:
//   1. If async->queue is set, creates m_backgroundQueue as a composite queue built
//      from that queue's Work port (used for both ports of the composite).
//   2. Clears all wspp log channels, initializes ASIO, puts the client in perpetual
//      mode and installs the open / fail / message / close handlers. Every handler
//      captures a shared_ptr to this object.
//   3. Creates the wspp connection for m_uri and applies the user's headers,
//      subprotocol and proxy settings.
//   4. Calls XAsyncBegin with a provider that fills a WebSocketCompletionResult on
//      GetResult and drops the shared_ptr_cache entry on Cleanup.
//   5. Sets m_state to CONNECTING, queues the connect and starts m_websocketThread,
//      which runs client.run().
//
// Parameters:
//   async - caller's XAsyncBlock; completed from the open or fail handler.
//
// Returns:
//   S_OK once the connect has been queued and the worker thread started;
//   E_FAIL if wspp rejects the connection, subprotocol or proxy, or if the worker
//   thread cannot be created; otherwise the failing HRESULT from the XTaskQueue
//   calls or XAsyncBegin.
HRESULT connect_impl(XAsyncBlock* async)
{
    if (async->queue)
    {
        XTaskQueuePortHandle worker{ nullptr };
        RETURN_IF_FAILED(XTaskQueueGetPort(async->queue, XTaskQueuePort::Work, &worker));
        RETURN_IF_FAILED(XTaskQueueCreateComposite(worker, worker, &m_backgroundQueue));
    }

    auto &client = m_client->client<WebsocketConfigType>();
    client.clear_access_channels(websocketpp::log::alevel::all);
    client.clear_error_channels(websocketpp::log::alevel::all);
    client.init_asio();
    client.start_perpetual();

    auto sharedThis { shared_from_this() };

    ASSERT(m_state == DISCONNECTED);

    // Connection established: record state, start pinging and complete the connect operation.
    client.set_open_handler([sharedThis, async](websocketpp::connection_hdl)
    {
        ASSERT(sharedThis->m_state == CONNECTING);
        sharedThis->m_state = CONNECTED;
        sharedThis->set_connection_error<WebsocketConfigType>();
        sharedThis->send_ping();
        XAsyncComplete(async, S_OK, sizeof(WebSocketCompletionResult));
    });

    // Connection attempt failed: shut wspp down first, then complete the operation.
    // The operation itself completes with S_OK; the GetResult provider below reports
    // the outcome through WebSocketCompletionResult using m_connectError.
    client.set_fail_handler([sharedThis, async](websocketpp::connection_hdl)
    {
        ASSERT(sharedThis->m_state == CONNECTING);
        sharedThis->set_connection_error<WebsocketConfigType>();
        sharedThis->shutdown_wspp_impl<WebsocketConfigType>(
            [
                sharedThis,
                async
            ]
        {
            XAsyncComplete(async, S_OK, sizeof(WebSocketCompletionResult));
        });
    });

    // Incoming frame: forward text and binary payloads to the client's registered callbacks.
    client.set_message_handler([sharedThis](websocketpp::connection_hdl, const websocketpp::config::asio_client::message_type::ptr &msg)
    {
        HCWebSocketMessageFunction messageFunc{ nullptr };
        HCWebSocketBinaryMessageFunction binaryMessageFunc{ nullptr };
        void* context{ nullptr };
        auto hr = HCWebSocketGetEventFunctions(sharedThis->m_hcWebsocketHandle, &messageFunc, &binaryMessageFunc, nullptr, &context);

        if (SUCCEEDED(hr))
        {
            ASSERT(messageFunc && binaryMessageFunc);

            // TODO: hook up HCWebSocketCloseEventFunction handler upon unexpected disconnect
            // TODO: verify auto disconnect when closing client's websocket handle
            if (msg->get_opcode() == websocketpp::frame::opcode::text)
            {
                // ASSERT compiles out in release builds, so guard against an unset callback here.
                if (messageFunc)
                {
                    auto& payload = msg->get_raw_payload();
                    messageFunc(sharedThis->m_hcWebsocketHandle, payload.data(), context);
                }
            }
            else if (msg->get_opcode() == websocketpp::frame::opcode::binary)
            {
                if (binaryMessageFunc)
                {
                    auto& payload = msg->get_raw_payload();
                    binaryMessageFunc(sharedThis->m_hcWebsocketHandle, (uint8_t*)payload.data(), (uint32_t)payload.size(), context);
                }
            }
        }
    });

    // Connection closed: shut wspp down, then notify the client's close callback (if any).
    client.set_close_handler([sharedThis](websocketpp::connection_hdl)
    {
        ASSERT(sharedThis->m_state == CONNECTED || sharedThis->m_state == DISCONNECTING);
        sharedThis->shutdown_wspp_impl<WebsocketConfigType>([sharedThis]()
        {
            HCWebSocketCloseEventFunction closeFunc{ nullptr };
            void* context{ nullptr };

            HCWebSocketGetEventFunctions(sharedThis->m_hcWebsocketHandle, nullptr, nullptr, &closeFunc, &context);
            if (closeFunc)
            {
                closeFunc(sharedThis->m_hcWebsocketHandle, static_cast<HCWebSocketCloseStatus>(sharedThis->m_closeCode), context);
            }
        });
    });

    // Set User Agent specified by the user. This needs to happen before any connection is created
    const auto& headers = m_hcWebsocketHandle->Headers();

    auto user_agent_it = headers.find(websocketpp::user_agent);
    if (user_agent_it != headers.end())
    {
        client.set_user_agent(user_agent_it->second.data());
    }

    // Get the connection handle to save for later, have to create temporary
    // because type erasure occurs with connection_hdl.
    websocketpp::lib::error_code ec;
    auto con = client.get_connection(m_uri.FullPath().data(), ec);
    m_con = con;
    if (ec.value() != 0)
    {
        HC_TRACE_ERROR(WEBSOCKET, "Websocket [ID %llu]: wspp get_connection failed", TO_ULL(m_hcWebsocketHandle->id));
        return E_FAIL;
    }

    // Add any request headers specified by the user.
    for (const auto & header : headers)
    {
        // Subprotocols are handled separately below
        if (str_icmp(header.first, SUB_PROTOCOL_HEADER) != 0)
        {
            con->append_header(header.first.data(), header.second.data());
        }
    }

    // Add any specified subprotocols.
    if (!m_subprotocol.empty())
    {
        con->add_subprotocol(m_subprotocol.data(), ec);
        if (ec.value())
        {
            HC_TRACE_ERROR(WEBSOCKET, "Websocket [ID %llu]: add_subprotocol failed", TO_ULL(m_hcWebsocketHandle->id));
            return E_FAIL;
        }
    }

    // Setup proxy options.
    if (!m_hcWebsocketHandle->ProxyUri().empty())
    {
        con->set_proxy(m_hcWebsocketHandle->ProxyUri().data(), ec);
        if (ec)
        {
            HC_TRACE_ERROR(WEBSOCKET, "Websocket [ID %llu]: wspp set_proxy failed", TO_ULL(m_hcWebsocketHandle->id));
            return E_FAIL;
        }
    }
#if HC_PLATFORM_IS_MICROSOFT
    else
    {
        // On windows platforms use the IE proxy if the user didn't specify one
        Uri proxyUri;
        auto proxyType = get_ie_proxy_info(proxy_protocol::websocket, proxyUri);

        if (proxyType == proxy_type::named_proxy)
        {
            con->set_proxy(proxyUri.FullPath().data(), ec);
            if (ec)
            {
                HC_TRACE_ERROR(WEBSOCKET, "Websocket [ID %llu]: wspp set_proxy failed", TO_ULL(m_hcWebsocketHandle->id));
                return E_FAIL;
            }
        }
    }
#endif

    // Initialize the 'connect' XAsyncBlock here, but the actual work will happen on the ASIO background thread below.
    // The cached shared_ptr keeps this object reachable from the provider callback until Cleanup removes it.
    auto asyncContext = shared_ptr_cache::store(shared_from_this());
    auto hr = XAsyncBegin(async, asyncContext, (void*)HCWebSocketConnectAsync, __FUNCTION__,
        [](XAsyncOp op, const XAsyncProviderData* data)
    {
        if (op == XAsyncOp::GetResult)
        {
            auto context = shared_ptr_cache::fetch<wspp_websocket_impl>(data->context);
            if (context == nullptr)
            {
                return E_HC_NOT_INITIALISED;
            }

            auto result = reinterpret_cast<WebSocketCompletionResult*>(data->buffer);
            result->websocket = context->m_hcWebsocketHandle;
            result->platformErrorCode = context->m_connectError.value();
            result->errorCode = context->m_connectError ? E_FAIL : S_OK;
        }
        else if (op == XAsyncOp::Cleanup)
        {
            shared_ptr_cache::remove(data->context);
        }

        return S_OK;
    });

    if (!SUCCEEDED(hr))
    {
        // The provider's Cleanup op only runs for operations that were successfully begun,
        // so release the cached reference here to avoid leaking it.
        shared_ptr_cache::remove(asyncContext);
        return hr;
    }

    m_state = CONNECTING;
    client.connect(con);

    // NOTE(review): if thread creation fails below, the async operation has already begun
    // and m_state stays CONNECTING; nothing in this function completes the operation on
    // that path. Confirm the caller handles this.
    try
    {
        // Wrapper that lets the thread lambda hold a reference to the client via a shared_ptr capture.
        struct client_context
        {
            client_context(websocketpp::client<WebsocketConfigType>& _client) : client(_client) {}
            websocketpp::client<WebsocketConfigType>& client;
        };
        auto context = http_allocate_shared<client_context>(client);

        m_websocketThread = std::thread([context, id{ m_hcWebsocketHandle->id }]()
        {
            HC_TRACE_INFORMATION(WEBSOCKET, "id=%llu Wspp client work thread starting", TO_ULL(id));

#if HC_PLATFORM == HC_PLATFORM_ANDROID
            JavaVM* javaVm = nullptr;
            {
                // Allow our singleton to go out of scope quickly once we're done with it
                auto httpSingleton = xbox::httpclient::get_http_singleton();
                if (httpSingleton)
                {
                    auto platformContext = httpSingleton->m_performEnv->androidPlatformContext;
                    javaVm = platformContext->GetJavaVm();
                }
            }

            if (javaVm == nullptr)
            {
                HC_TRACE_ERROR(HTTPCLIENT, "javaVm is null");
                // NOTE(review): this throw is outside the try block below, so it escapes the
                // thread entry point and would terminate the process - confirm this is intended.
                throw std::runtime_error("JavaVm is null");
            }

            JNIEnv* jniEnv = nullptr;
            if (javaVm->AttachCurrentThread(&jniEnv, nullptr) != 0)
            {
                assert(false);
            }
#endif

            try
            {
                context->client.run();
            }
            catch (...)
            {
                HC_TRACE_ERROR(WEBSOCKET, "Caught exception in wspp client::run!");
            }

            // OpenSSL stores some per thread state that never will be cleaned up until
            // the dll is unloaded. If static linking, like we do, the state isn't cleaned up
            // at all and will be reported as leaks.
            // See http://www.openssl.org/support/faq.html#PROG13
#if HC_PLATFORM == HC_PLATFORM_ANDROID || HC_PLATFORM_IS_APPLE
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
            ERR_remove_thread_state(nullptr);
#pragma clang diagnostic pop
#if HC_PLATFORM == HC_PLATFORM_ANDROID
            javaVm->DetachCurrentThread();
#endif // HC_PLATFORM_ANDROID
#else
            ERR_remove_thread_state(nullptr);
#endif // HC_PLATFORM_ANDROID || HC_PLATFORM_IS_APPLE

            HC_TRACE_INFORMATION(WEBSOCKET, "id=%llu Wspp client work thread end", TO_ULL(id));
        });
        hr = S_OK;
    }
    catch (const std::system_error& err)
    {
        // std::thread's constructor throws std::system_error when the thread cannot be started.
        HC_TRACE_ERROR(WEBSOCKET, "Websocket: couldn't create background websocket thread (%d)", err.code().value());
        hr = E_FAIL;
    }

    return hr;
}