ResponseCode KeepaliveActionRunner::PerformAction()

in src/mqtt/Connect.cpp [410:598]


        /**
         * @brief Keep-alive worker loop: sends periodic ping requests and drives automatic reconnect.
         *
         * The function first blocks (polling every DEFAULT_CORE_THREAD_SLEEP_DURATION_MS) until the client
         * reports a connection or the thread-continue flag is cleared. It then loops until the flag is
         * cleared, and on every iteration:
         *  - if auto-reconnect is enabled and required: invokes the disconnect handler once (while the
         *    "disconnect callback pending" flag is set), attempts a CONNECT with the stored auto-reconnect
         *    data, invokes the reconnect handler, and on an accepted connection re-sends all stored
         *    subscriptions in batches of at most MAX_TOPICS_IN_ONE_SUBSCRIBE_PACKET topics. A failed attempt
         *    sleeps for an exponentially growing backoff that is capped at the configured maximum;
         *  - if reconnect is required but auto-reconnect is disabled: only invokes the disconnect handler once;
         *  - when the ping deadline (half of the keep-alive timeout) has passed: disconnects and flags a
         *    reconnect if the previous ping request is still pending, otherwise writes a new ping request.
         *
         * NOTE: All callbacks used by the keepalive should be non-blocking.
         *
         * @param p_network_connection Network connection that ping and subscribe packets are written to
         * @param p_action_data Not used by this action
         *
         * @return NULL_VALUE_ERROR if the ping request packet could not be created, otherwise the response
         *         code of the last operation performed before the thread-continue flag was cleared
         */
        ResponseCode KeepaliveActionRunner::PerformAction(std::shared_ptr<NetworkConnection> p_network_connection,
                                                          std::shared_ptr<ActionData> p_action_data) {
            // TODO : This action needs cleanup in the future
            std::atomic_bool &_p_thread_continue_ = *p_thread_continue_;
            std::chrono::milliseconds thread_sleep_duration(DEFAULT_CORE_THREAD_SLEEP_DURATION_MS);

            // Wait for first connect, keep alive data will not be available until then
            while (_p_thread_continue_ && !p_client_state_->IsConnected()) {
                std::this_thread::sleep_for(thread_sleep_duration);
            }

            // The ping request packet is created once and re-sent on every keep-alive tick
            std::shared_ptr<PingreqPacket> p_pingreq_packet = PingreqPacket::Create();
            if (nullptr == p_pingreq_packet) {
                return ResponseCode::NULL_VALUE_ERROR;
            }

            ResponseCode rc = ResponseCode::SUCCESS;
            p_client_state_->setDisconnectCallbackPending(true);

            std::chrono::seconds reconnect_backoff_timer = p_client_state_->GetMinReconnectBackoffTimeout();
            std::chrono::seconds max_backoff_value = p_client_state_->GetMaxReconnectBackoffTimeout();
            const std::chrono::seconds keep_alive_interval = p_client_state_->GetKeepAliveTimeout() / 2;

            // A monotonic clock is used for the ping deadline so that wall-clock adjustments can neither
            // postpone nor prematurely trigger a ping.
            auto next_ping_time = std::chrono::steady_clock::now() + keep_alive_interval;

            // Invokes the application's disconnect handler, if one is registered and the stored
            // auto-reconnect data is a connect packet (needed to supply the client ID).
            const auto notify_disconnect = [&]() {
                std::shared_ptr<ConnectPacket> p_stored_connect_packet =
                    std::dynamic_pointer_cast<ConnectPacket>(p_client_state_->GetAutoReconnectData());

                if (nullptr != p_client_state_->disconnect_handler_ptr_ && nullptr != p_stored_connect_packet) {
                    p_client_state_->disconnect_handler_ptr_(p_stored_connect_packet->GetClientID(),
                                                           p_client_state_->p_disconnect_app_handler_data_);
                }
            };

            do {
                if (p_client_state_->IsAutoReconnectEnabled() && p_client_state_->IsAutoReconnectRequired()) {
                    p_client_state_->SetPingreqPending(false);
                    if (p_client_state_->isDisconnectCallbackPending()) {
                        // First pass after a disconnect: notify the application and restart the backoff
                        notify_disconnect();

                        reconnect_backoff_timer = p_client_state_->GetMinReconnectBackoffTimeout();
                        max_backoff_value = p_client_state_->GetMaxReconnectBackoffTimeout();
                        // count() is cast because its type is not guaranteed to be long, which %ld requires
                        AWS_LOG_INFO(KEEPALIVE_LOG_TAG,
                                     "Initial value of reconnect timer : %ld!!",
                                     static_cast<long>(reconnect_backoff_timer.count()));
                        AWS_LOG_INFO(KEEPALIVE_LOG_TAG,
                                     "Max backoff value : %ld!!",
                                     static_cast<long>(max_backoff_value.count()));
                    }
                    AWS_LOG_INFO(KEEPALIVE_LOG_TAG, "Attempting Reconnect");

                    std::shared_ptr<ConnectPacket> p_connect_packet =
                        std::dynamic_pointer_cast<ConnectPacket>(p_client_state_->GetAutoReconnectData());

                    rc = p_client_state_->PerformAction(ActionType::CONNECT,
                                                        p_connect_packet,
                                                        p_client_state_->GetMqttCommandTimeout());

                    // The cast above may yield nullptr; guard the dereference like the disconnect handler does
                    if (nullptr != p_client_state_->reconnect_handler_ptr_ && nullptr != p_connect_packet) {
                        p_client_state_->reconnect_handler_ptr_(p_connect_packet->GetClientID(),
                                                              p_client_state_->p_reconnect_app_handler_data_,
                                                              rc);
                    }
                    if (ResponseCode::MQTT_CONNACK_CONNECTION_ACCEPTED == rc) {
                        p_client_state_->SetAutoReconnectRequired(false);
                        // if no subscriptions, skip resubscribe
                        if (!p_client_state_->subscription_map_.empty()) {

                            util::Vector<std::shared_ptr<mqtt::Subscription>> topic_vector;

                            // Builds one subscribe packet from the topics collected so far and writes it out
                            const auto send_subscribe_batch = [&]() -> ResponseCode {
                                std::shared_ptr<mqtt::SubscribePacket>
                                    p_subscribe_packet = mqtt::SubscribePacket::Create(topic_vector);
                                if (nullptr == p_subscribe_packet) {
                                    return ResponseCode::NULL_VALUE_ERROR;
                                }
                                p_subscribe_packet->SetPacketId(p_client_state_->GetNextPacketId());
                                return WriteToNetworkBuffer(p_network_connection, p_subscribe_packet->ToString());
                            };

                            for (const auto &subscription_entry : p_client_state_->subscription_map_) {
                                topic_vector.push_back(subscription_entry.second);
                                if (topic_vector.size() == MAX_TOPICS_IN_ONE_SUBSCRIBE_PACKET) {
                                    rc = send_subscribe_batch();
                                    if (ResponseCode::SUCCESS != rc) {
                                        AWS_LOG_ERROR(KEEPALIVE_LOG_TAG,
                                                      "Resubscribe attempt returned unhandled error. \n%s",
                                                      ResponseHelper::ToString(rc).c_str());
                                        break;
                                    }
                                    topic_vector.clear();
                                }
                            }

                            // Send the last, partially filled batch unless an earlier batch failed. rc is still
                            // MQTT_CONNACK_CONNECTION_ACCEPTED here when no full batch was sent in the loop.
                            const bool no_batch_failed = ResponseCode::SUCCESS == rc
                                || ResponseCode::MQTT_CONNACK_CONNECTION_ACCEPTED == rc;
                            if (no_batch_failed && !topic_vector.empty()) {
                                rc = send_subscribe_batch();
                            }

                            if (nullptr != p_client_state_->resubscribe_handler_ptr_ && nullptr != p_connect_packet) {
                                p_client_state_->resubscribe_handler_ptr_(p_connect_packet->GetClientID(),
                                                                        p_client_state_->p_resubscribe_app_handler_data_,
                                                                        rc);
                            }
                        }
                        /**
                         * NOTE :The resubscribe response can be NETWORK_DISCONNECTED_ERROR as the network might have
                         * disconnected again after the reconnect was successful.
                         */
                        if (ResponseCode::NETWORK_DISCONNECTED_ERROR != rc) {
                            p_client_state_->SetAutoReconnectRequired(false);
                        } else {
                            p_client_state_->PerformAction(ActionType::DISCONNECT,
                                                           DisconnectPacket::Create(),
                                                           p_client_state_->GetMqttCommandTimeout());
                            p_client_state_->SetAutoReconnectRequired(true);
                        }
                        continue;
                    }

                    // Reconnect attempt failed: the disconnect handler has been served for this outage
                    p_client_state_->setDisconnectCallbackPending(false);
                    AWS_LOG_ERROR(KEEPALIVE_LOG_TAG, "Reconnect failed. %s", ResponseHelper::ToString(rc).c_str());

                    AWS_LOG_INFO(KEEPALIVE_LOG_TAG,
                                 "Current value of reconnect timer : %ld!!",
                                 static_cast<long>(reconnect_backoff_timer.count()));
                    if (max_backoff_value > reconnect_backoff_timer) {
                        // Exponential backoff, never exceeding the configured maximum
                        reconnect_backoff_timer += reconnect_backoff_timer;
                        if (reconnect_backoff_timer > max_backoff_value) {
                            reconnect_backoff_timer = max_backoff_value;
                        }
                    }

                    AWS_LOG_INFO(KEEPALIVE_LOG_TAG,
                                 "Updated value of reconnect timer : %ld!!",
                                 static_cast<long>(reconnect_backoff_timer.count()));
                    // NOTE(review): this sleep does not observe the thread-continue flag, so a stop request
                    // is only honoured after the current backoff period has elapsed.
                    std::this_thread::sleep_for(reconnect_backoff_timer);
                    continue;
                } else if (p_client_state_->IsAutoReconnectRequired()) {
                    // Auto-reconnect is disabled: only report the disconnect to the application, once
                    if (p_client_state_->isDisconnectCallbackPending()) {
                        notify_disconnect();
                        p_client_state_->setDisconnectCallbackPending(false);
                    }
                }

                if (std::chrono::steady_clock::now() > next_ping_time) {
                    if (p_client_state_->IsPingreqPending()) {
                        // The previous ping request is still flagged as pending at the next deadline:
                        // drop the connection (if any) and request a reconnect.
                        if (p_client_state_->IsConnected()) {
                            rc = p_client_state_->PerformAction(ActionType::DISCONNECT,
                                                                DisconnectPacket::Create(),
                                                                p_client_state_->GetMqttCommandTimeout());
                            if (ResponseCode::SUCCESS != rc && ResponseCode::NETWORK_DISCONNECTED_ERROR != rc) {
                                AWS_LOG_ERROR(KEEPALIVE_LOG_TAG,
                                              "Network Disconnect attempt returned unhandled error. \n%s",
                                              ResponseHelper::ToString(rc).c_str());
                            }
                        }
                        p_client_state_->SetAutoReconnectRequired(true);
                        continue;
                    } else if (p_client_state_->IsConnected()) {
                        rc = WriteToNetworkBuffer(p_network_connection, p_pingreq_packet->ToString());

                        if (ResponseCode::SUCCESS != rc) {
                            AWS_LOG_ERROR(KEEPALIVE_LOG_TAG,
                                          "Writing PingReq to Network Failed. \n%s. \nDisconnecting!",
                                          ResponseHelper::ToString(rc).c_str());
                            rc = p_client_state_->PerformAction(ActionType::DISCONNECT,
                                                                DisconnectPacket::Create(),
                                                                p_client_state_->GetMqttCommandTimeout());
                            // NETWORK_DISCONNECTED_ERROR is tolerated here as in the pending-ping path above
                            if (ResponseCode::SUCCESS != rc && ResponseCode::NETWORK_DISCONNECTED_ERROR != rc) {
                                AWS_LOG_ERROR(KEEPALIVE_LOG_TAG,
                                              "Network Disconnect attempt returned unhandled error. \n%s",
                                              ResponseHelper::ToString(rc).c_str());
                            }
                            p_client_state_->SetAutoReconnectRequired(true);
                            continue;
                        }

                        // Ping written: mark it outstanding and schedule the next deadline.
                        // NOTE(review): the pending flag is presumably cleared elsewhere when the ping
                        // response is read; this function only clears it when a reconnect starts.
                        p_client_state_->SetPingreqPending(true);
                        next_ping_time = std::chrono::steady_clock::now() + keep_alive_interval;
                    }
                }
                std::this_thread::sleep_for(thread_sleep_duration);
            } while (_p_thread_continue_);

            return rc;
        }