in src/TcpAdapterProxy.cpp [598:770]
void tcp_adapter_proxy::async_setup_web_socket(tcp_adapter_context &tac)
{
std::shared_ptr<basic_retry_config> retry_config =
std::make_shared<basic_retry_config>(tac.io_ctx,
GET_SETTING(settings, WEB_SOCKET_CONNECT_RETRY_COUNT),
GET_SETTING(settings, WEB_SOCKET_CONNECT_RETRY_DELAY_MS),
std::bind(&tcp_adapter_proxy::async_setup_web_socket, this, std::ref(tac)));
if (tac.wss && tac.wss->is_open())
{
BOOST_LOG_SEV(log, info) << "Web socket stream already open. Continuing to use existing connection";
if (after_setup_web_socket)
{
after_setup_web_socket();
}
return;
}
if (tac.wss && tac.wss->lowest_layer().is_open())
{
tac.wss->lowest_layer().close();
}
tac.wss = std::make_shared<WebSocketStream>(tac.adapter_config, &log, tac.io_ctx);
tac.wss->control_callback(std::bind(&tcp_adapter_proxy::handle_web_socket_control_message, this, std::ref(tac), std::placeholders::_1, std::placeholders::_2));
static std::string user_agent_string = (boost::format("localproxy %1% %2%-bit/boost-%3%.%4%.%5%/openssl-%6%.%7%.%8%/protobuf-%9%")
% BOOST_PLATFORM % (sizeof(void*)*8)
% (BOOST_VERSION / 100000) % ((BOOST_VERSION / 100) % 1000) % (BOOST_VERSION % 100)
% (OPENSSL_VERSION_NUMBER >> 28) % ((OPENSSL_VERSION_NUMBER >> 20) & 0xF) % ((OPENSSL_VERSION_NUMBER >> 12) & 0xF)
% google::protobuf::internal::VersionString(GOOGLE_PROTOBUF_VERSION) ).str();
//the actual work of this function starts here
BOOST_LOG_SEV(log, info) << "Attempting to establish web socket connection with endpoint wss://" << tac.adapter_config.proxy_host << ":" << tac.adapter_config.proxy_port;
auto on_websocket_handshake = [=, &tac](boost::system::error_code const &ec)
{
BOOST_LOG_SEV(log, trace) << "Web socket upgrade response:\n" << tac.wss_response;
if (ec)
{
BOOST_LOG_SEV(log, error) << (boost::format("Proxy server rejected web socket upgrade request: (HTTP/%4%.%5% %1% %2%) \"%3%\"")
% tac.wss_response.result_int() % tac.wss_response.reason() % boost::trim_copy(tac.wss_response.body())
% (tac.wss_response.version() / 10) % (tac.wss_response.version() % 10)).str(); //form HTTP version
auto is_server_error = [](const int http_response_code) { return http_response_code >= 500 && http_response_code < 600;};
if (is_server_error(tac.wss_response.result_int()))
{ //retry these, otherwise fail and close
basic_retry_execute(log, retry_config, [&]() { std::bind(&tcp_adapter_proxy::web_socket_close_and_stop, this, std::ref(tac)); });
}
else
{
web_socket_close_and_stop(tac);
}
}
else
{ //put web socket in binary mode
tac.wss->binary(true);
tac.wss->auto_fragment(true);
//output this first because it'll be necessary to have this if any further errors need support/debugging
BOOST_LOG_SEV(log, info) << "Web socket session ID: " << tac.wss_response["channel-id"].to_string();
if (!tac.wss_response.count(boost::beast::http::field::sec_websocket_protocol))
{
throw proxy_exception("No websocket subprotocol returned from proxy server!");
}
BOOST_LOG_SEV(log, debug) << "Web socket subprotocol selected: " << tac.wss_response[boost::beast::http::field::sec_websocket_protocol].to_string();
BOOST_LOG_SEV(log, info) << "Successfully established websocket connection with proxy server: wss://" << tac.adapter_config.proxy_host << ":" << tac.adapter_config.proxy_port;
std::shared_ptr<boost::beast::websocket::ping_data> ping_data = std::make_shared<boost::beast::websocket::ping_data>();
do_ping_data(tac, *ping_data);
std::shared_ptr<std::chrono::milliseconds> ping_period =
std::make_shared<std::chrono::milliseconds>(GET_SETTING(settings, WEB_SOCKET_PING_PERIOD_MS));
std::shared_ptr<boost::asio::steady_timer> ping_timer = std::make_shared<boost::asio::steady_timer>(tac.io_ctx);
BOOST_LOG_SEV(log, debug) << "Seting up web socket pings for every " << ping_period->count() << " milliseconds";
tac.wss->async_ping(*ping_data, std::bind(&tcp_adapter_proxy::async_ping_handler_loop, this, std::ref(tac), ping_data, ping_period, ping_timer, std::placeholders::_1));
if (after_setup_web_socket)
{
after_setup_web_socket();
}
}
};
auto on_tcp_connect = [=, &tac](boost::system::error_code const &ec)
{
if (ec)
{
BOOST_LOG_SEV(log, error) << (boost::format("Could not connect to proxy server: %1%") % ec.message()).str();
basic_retry_execute(log, retry_config, [&]() { std::bind(&tcp_adapter_proxy::web_socket_close_and_stop, this, std::ref(tac)); });
}
else
{
BOOST_LOG_SEV(log, debug) << "Connected successfully with proxy server";
boost::asio::socket_base::receive_buffer_size const recv_buffer_size(static_cast<int>(GET_SETTING(settings, WEB_SOCKET_MAX_FRAME_SIZE)));
boost::asio::socket_base::send_buffer_size const send_buffer_size_option(static_cast<int>(GET_SETTING(settings, WEB_SOCKET_MAX_FRAME_SIZE)));
tac.wss->lowest_layer().set_option(recv_buffer_size);
tac.wss->lowest_layer().set_option(send_buffer_size_option);
#ifndef _AWSIOT_TUNNELING_NO_SSL
BOOST_LOG_SEV(log, trace) << "Performing SSL handshake with proxy server";
if (!localproxy_config.no_ssl_host_verify)
{
tac.wss->set_ssl_verify_mode(boost::asio::ssl::verify_peer | boost::asio::ssl::verify_fail_if_no_peer_cert);
tac.wss->set_verify_callback(boost::asio::ssl::rfc2818_verification(tac.adapter_config.proxy_host));
}
else
{
BOOST_LOG_SEV(log, debug) << "SSL host verification is off";
}
//next ssl handshake
tac.wss->async_ssl_handshake(boost::asio::ssl::stream_base::client, [=, &tac](boost::system::error_code const &ec)
{
if (ec)
{
BOOST_LOG_SEV(log, error) << (boost::format("Could not perform SSL handshake with proxy server: %1%") % ec.message()).str();
basic_retry_execute(log, retry_config, [&]() { std::bind(&tcp_adapter_proxy::web_socket_close_and_stop, this, std::ref(tac)); });
}
else
{
BOOST_LOG_SEV(log, debug) << "Successfully completed SSL handshake with proxy server";
#endif
BOOST_LOG_SEV(log, trace) << "Performing websocket handshake with proxy server";
//next do web socket upgrade - add two custom headers
tac.wss->async_handshake_ex(tac.wss_response, tac.adapter_config.proxy_host.c_str(),
(boost::format("/tunnel?%1%=%2%")%PROXY_MODE_QUERY_PARAM % get_proxy_mode_string(tac.adapter_config.mode)).str(),
[&](boost::beast::websocket::request_type &request)
{
request.set(boost::beast::http::field::sec_websocket_protocol, GET_SETTING(settings, WEB_SOCKET_SUBPROTOCOL));
request.set(ACCESS_TOKEN_HEADER, tac.adapter_config.access_token.c_str());
request.set(boost::beast::http::field::user_agent, user_agent_string);
BOOST_LOG_SEV(log, trace) << "Web socket ugprade request(*not entirely final):\n" << get_token_filtered_request(request);
},
on_websocket_handshake
);
#ifndef _AWSIOT_TUNNELING_NO_SSL
}
});
#endif
}
};
auto on_proxy_server_dns_resolve = [=, &tac](boost::system::error_code const &ec, tcp::resolver::results_type results)
{
if (ec)
{
BOOST_LOG_SEV(log, error) << (boost::format("Could not resolve DNS hostname of proxy host: %1% - %2%") % tac.adapter_config.proxy_host % ec.message()).str();
basic_retry_execute(log, retry_config, [&]() { std::bind(&tcp_adapter_proxy::web_socket_close_and_stop, this, std::ref(tac)); });
}
else
{
BOOST_LOG_SEV(log, debug) << "Resolved proxy server IP: " << results->endpoint().address();
//next connect tcp
tac.wss->lowest_layer().async_connect(*results.begin(), on_tcp_connect);
}
};
auto on_web_proxy_dns_resolve = [=, &tac](boost::system::error_code const &ec, tcp::resolver::results_type results)
{
if (ec)
{
BOOST_LOG_SEV(log, error) << (boost::format("Could not resolve DNS hostname of Web proxy: %1% - %2%") % tac.adapter_config.web_proxy_host % ec.message()).str();
basic_retry_execute(log, retry_config, [&]() { std::bind(&tcp_adapter_proxy::web_socket_close_and_stop, this, std::ref(tac)); });
} else {
BOOST_LOG_SEV(log, debug) << "Resolved Web proxy IP: " << results->endpoint().address();
web_proxy_adapter.async_connect(on_tcp_connect, tac.wss, results->endpoint());
}
};
//start first async handler which chains into adding the rest
if (tac.adapter_config.web_proxy_host.empty()) {
BOOST_LOG_SEV(log, trace) << "Resolving proxy server host: " << tac.adapter_config.proxy_host;
tac.wss_resolver.async_resolve(tac.adapter_config.proxy_host, boost::lexical_cast<std::string>(tac.adapter_config.proxy_port), on_proxy_server_dns_resolve);
} else {
BOOST_LOG_SEV(log, trace) << "Resolving Web proxy host: " << tac.adapter_config.web_proxy_host;
tac.wss_resolver.async_resolve(tac.adapter_config.web_proxy_host, boost::lexical_cast<std::string>(tac.adapter_config.web_proxy_port), on_web_proxy_dns_resolve);
}
}