void tcp_adapter_proxy::async_setup_web_socket()

in src/TcpAdapterProxy.cpp [598:770]


    /**
     * Starts (or reuses) the web socket connection to the proxy server.
     *
     * If tac.wss already exists and is open, the existing connection is kept,
     * after_setup_web_socket (when set) is invoked, and the function returns.
     * Otherwise a fresh WebSocketStream is created and an asynchronous chain
     * is started:
     *
     *   DNS resolve -> TCP connect (directly, or through web_proxy_adapter
     *   when a web proxy host is configured) -> SSL handshake (unless built
     *   with _AWSIOT_TUNNELING_NO_SSL) -> web socket upgrade -> first ping
     *   and after_setup_web_socket (when set).
     *
     * Failures in the chain are handed to basic_retry_execute together with a
     * retry configuration whose retry operation re-invokes this function.
     * The failure handler passed alongside closes the web socket and stops.
     * NOTE(review): presumably basic_retry_execute invokes that handler once
     * the retries are exhausted - confirm against its definition.
     *
     * @param tac adapter context holding the io context, resolver, web socket
     *            stream, upgrade response and adapter configuration. It is
     *            captured by reference in the asynchronous handlers, so it
     *            must stay alive while they are pending.
     * @throws proxy_exception from the web socket handshake handler when the
     *         upgrade response carries no Sec-WebSocket-Protocol header.
     */
    void tcp_adapter_proxy::async_setup_web_socket(tcp_adapter_context &tac)
    {
        //retry operation: run this very function again against the same context
        std::shared_ptr<basic_retry_config> retry_config =
            std::make_shared<basic_retry_config>(tac.io_ctx,
                GET_SETTING(settings, WEB_SOCKET_CONNECT_RETRY_COUNT),
                GET_SETTING(settings, WEB_SOCKET_CONNECT_RETRY_DELAY_MS),
                std::bind(&tcp_adapter_proxy::async_setup_web_socket, this, std::ref(tac)));

        if (tac.wss && tac.wss->is_open())
        {
            BOOST_LOG_SEV(log, info) << "Web socket stream already open. Continuing to use existing connection";
            if (after_setup_web_socket)
            {
                after_setup_web_socket();
            }
            return;
        }
        //web socket is not open, but the underlying socket may still be: close it before replacing the stream
        if (tac.wss && tac.wss->lowest_layer().is_open())
        {
            tac.wss->lowest_layer().close();
        }
        tac.wss = std::make_shared<WebSocketStream>(tac.adapter_config, &log, tac.io_ctx);
        tac.wss->control_callback(std::bind(&tcp_adapter_proxy::handle_web_socket_control_message, this, std::ref(tac), std::placeholders::_1, std::placeholders::_2));
        
        //built once and reused for every upgrade request; never modified afterwards
        static const std::string user_agent_string = (boost::format("localproxy %1% %2%-bit/boost-%3%.%4%.%5%/openssl-%6%.%7%.%8%/protobuf-%9%")
            % BOOST_PLATFORM % (sizeof(void*)*8)
            % (BOOST_VERSION / 100000) % ((BOOST_VERSION / 100) % 1000) % (BOOST_VERSION % 100)
            % (OPENSSL_VERSION_NUMBER >> 28) % ((OPENSSL_VERSION_NUMBER >> 20) & 0xF) % ((OPENSSL_VERSION_NUMBER >> 12) & 0xF)
            % google::protobuf::internal::VersionString(GOOGLE_PROTOBUF_VERSION) ).str();
        
        //the actual work of this function starts here
        BOOST_LOG_SEV(log, info) << "Attempting to establish web socket connection with endpoint wss://" << tac.adapter_config.proxy_host << ":" << tac.adapter_config.proxy_port;

        //final step of the chain: result of the web socket upgrade request
        auto on_websocket_handshake = [=, &tac](boost::system::error_code const &ec)
        {
            BOOST_LOG_SEV(log, trace) << "Web socket upgrade response:\n" << tac.wss_response;
            if (ec)
            {
                BOOST_LOG_SEV(log, error) << (boost::format("Proxy server rejected web socket upgrade request: (HTTP/%4%.%5% %1% %2%) \"%3%\"")
                                              % tac.wss_response.result_int() % tac.wss_response.reason() % boost::trim_copy(tac.wss_response.body())
                                              % (tac.wss_response.version() / 10) % (tac.wss_response.version() % 10)).str();    //form HTTP version
                auto is_server_error = [](const int http_response_code) { return http_response_code >= 500 && http_response_code < 600;};
                if (is_server_error(tac.wss_response.result_int()))
                {   //retry these, otherwise fail and close
                    //the handler must actually call web_socket_close_and_stop; building a std::bind object without invoking it does nothing
                    basic_retry_execute(log, retry_config, [this, &tac]() { web_socket_close_and_stop(tac); });
                }
                else
                {
                    web_socket_close_and_stop(tac);
                }
            }
            else
            {   //put web socket in binary mode
                tac.wss->binary(true);
                tac.wss->auto_fragment(true);
                //output this first because it'll be necessary to have this if any further errors need support/debugging
                BOOST_LOG_SEV(log, info) << "Web socket session ID: " << tac.wss_response["channel-id"].to_string();
                if (!tac.wss_response.count(boost::beast::http::field::sec_websocket_protocol))
                {
                    throw proxy_exception("No websocket subprotocol returned from proxy server!");
                }
                BOOST_LOG_SEV(log, debug) << "Web socket subprotocol selected: " << tac.wss_response[boost::beast::http::field::sec_websocket_protocol].to_string();
                BOOST_LOG_SEV(log, info) << "Successfully established websocket connection with proxy server: wss://" << tac.adapter_config.proxy_host << ":" << tac.adapter_config.proxy_port;
                //ping state is held in shared_ptrs so it stays alive inside the bound ping handler
                std::shared_ptr<boost::beast::websocket::ping_data> ping_data = std::make_shared<boost::beast::websocket::ping_data>();
                do_ping_data(tac, *ping_data);
                std::shared_ptr<std::chrono::milliseconds> ping_period =
                        std::make_shared<std::chrono::milliseconds>(GET_SETTING(settings, WEB_SOCKET_PING_PERIOD_MS));
                std::shared_ptr<boost::asio::steady_timer> ping_timer = std::make_shared<boost::asio::steady_timer>(tac.io_ctx);

                BOOST_LOG_SEV(log, debug) << "Seting up web socket pings for every " << ping_period->count() << " milliseconds";

                tac.wss->async_ping(*ping_data, std::bind(&tcp_adapter_proxy::async_ping_handler_loop, this, std::ref(tac), ping_data, ping_period, ping_timer, std::placeholders::_1));

                if (after_setup_web_socket)
                {
                    after_setup_web_socket();
                }
            }
        };
        //TCP connection result; on success configures the socket and continues with SSL and/or the web socket upgrade
        auto on_tcp_connect = [=, &tac](boost::system::error_code const &ec)
        {
            if (ec)
            {
                BOOST_LOG_SEV(log, error) << (boost::format("Could not connect to proxy server: %1%") % ec.message()).str();
                basic_retry_execute(log, retry_config, [this, &tac]() { web_socket_close_and_stop(tac); });
            }
            else
            {
                BOOST_LOG_SEV(log, debug) << "Connected successfully with proxy server";
                //both socket buffer sizes are set from the configured maximum web socket frame size
                boost::asio::socket_base::receive_buffer_size const recv_buffer_size(static_cast<int>(GET_SETTING(settings, WEB_SOCKET_MAX_FRAME_SIZE)));
                boost::asio::socket_base::send_buffer_size const send_buffer_size_option(static_cast<int>(GET_SETTING(settings, WEB_SOCKET_MAX_FRAME_SIZE)));
                tac.wss->lowest_layer().set_option(recv_buffer_size);
                tac.wss->lowest_layer().set_option(send_buffer_size_option);

#ifndef _AWSIOT_TUNNELING_NO_SSL
                BOOST_LOG_SEV(log, trace) << "Performing SSL handshake with proxy server";
                if (!localproxy_config.no_ssl_host_verify)
                {
                    tac.wss->set_ssl_verify_mode(boost::asio::ssl::verify_peer | boost::asio::ssl::verify_fail_if_no_peer_cert);
                    tac.wss->set_verify_callback(boost::asio::ssl::rfc2818_verification(tac.adapter_config.proxy_host));
                }
                else
                {
                    BOOST_LOG_SEV(log, debug) << "SSL host verification is off";
                }
                //next ssl handshake
                tac.wss->async_ssl_handshake(boost::asio::ssl::stream_base::client, [=, &tac](boost::system::error_code const &ec)
                {
                    if (ec)
                    {
                        BOOST_LOG_SEV(log, error) << (boost::format("Could not perform SSL handshake with proxy server: %1%") % ec.message()).str();
                        basic_retry_execute(log, retry_config, [this, &tac]() { web_socket_close_and_stop(tac); });
                    }
                    else
                    {
                        BOOST_LOG_SEV(log, debug) << "Successfully completed SSL handshake with proxy server";
#endif
                        //when SSL is compiled out, the upgrade below runs directly in the TCP connect success branch
                        BOOST_LOG_SEV(log, trace) << "Performing websocket handshake with proxy server";
                        //next do web socket upgrade - add two custom headers

                        tac.wss->async_handshake_ex(tac.wss_response, tac.adapter_config.proxy_host.c_str(),
                                                    (boost::format("/tunnel?%1%=%2%")%PROXY_MODE_QUERY_PARAM % get_proxy_mode_string(tac.adapter_config.mode)).str(),
                                                    [&](boost::beast::websocket::request_type &request)
                                                    {
                                                        request.set(boost::beast::http::field::sec_websocket_protocol, GET_SETTING(settings, WEB_SOCKET_SUBPROTOCOL));
                                                        request.set(ACCESS_TOKEN_HEADER, tac.adapter_config.access_token.c_str());
                                                        request.set(boost::beast::http::field::user_agent, user_agent_string);
                                                        BOOST_LOG_SEV(log, trace) << "Web socket ugprade request(*not entirely final):\n" << get_token_filtered_request(request);
                                                    },
                                                    on_websocket_handshake
                        );
#ifndef _AWSIOT_TUNNELING_NO_SSL
                    }
                });
#endif
            }
        };
        //DNS result for the proxy server itself (no web proxy configured)
        auto on_proxy_server_dns_resolve = [=, &tac](boost::system::error_code const &ec, tcp::resolver::results_type results)
        {
            if (ec)
            {
                BOOST_LOG_SEV(log, error) << (boost::format("Could not resolve DNS hostname of proxy host: %1% - %2%") % tac.adapter_config.proxy_host % ec.message()).str();
                basic_retry_execute(log, retry_config, [this, &tac]() { web_socket_close_and_stop(tac); });
            }
            else
            {
                BOOST_LOG_SEV(log, debug) << "Resolved proxy server IP: " << results->endpoint().address();
                //next connect tcp
                tac.wss->lowest_layer().async_connect(*results.begin(), on_tcp_connect);
            }
        };
        //DNS result for the web proxy; the TCP connection is then made through web_proxy_adapter
        auto on_web_proxy_dns_resolve = [=, &tac](boost::system::error_code const &ec, tcp::resolver::results_type results)
        {
            if (ec)
            {
                BOOST_LOG_SEV(log, error) << (boost::format("Could not resolve DNS hostname of Web proxy: %1% - %2%") % tac.adapter_config.web_proxy_host % ec.message()).str();
                basic_retry_execute(log, retry_config, [this, &tac]() { web_socket_close_and_stop(tac); });
            } else {
                BOOST_LOG_SEV(log, debug) << "Resolved Web proxy IP: " << results->endpoint().address();
                web_proxy_adapter.async_connect(on_tcp_connect, tac.wss, results->endpoint());
            }
        };


        //start first async handler which chains into adding the rest
        if (tac.adapter_config.web_proxy_host.empty()) {
            BOOST_LOG_SEV(log, trace) << "Resolving proxy server host: " << tac.adapter_config.proxy_host;
            tac.wss_resolver.async_resolve(tac.adapter_config.proxy_host, boost::lexical_cast<std::string>(tac.adapter_config.proxy_port), on_proxy_server_dns_resolve);
        } else {
            BOOST_LOG_SEV(log, trace) << "Resolving Web proxy host: " << tac.adapter_config.web_proxy_host;
            tac.wss_resolver.async_resolve(tac.adapter_config.web_proxy_host, boost::lexical_cast<std::string>(tac.adapter_config.web_proxy_port), on_web_proxy_dns_resolve);
        }
    }