in src/mqtt/Connect.cpp [410:598]
ResponseCode KeepaliveActionRunner::PerformAction(std::shared_ptr<NetworkConnection> p_network_connection,
std::shared_ptr<ActionData> p_action_data) {
// TODO : This action needs cleanup in the future
std::atomic_bool &_p_thread_continue_ = *p_thread_continue_;
std::chrono::milliseconds thread_sleep_duration(DEFAULT_CORE_THREAD_SLEEP_DURATION_MS);
// Wait for first connect, keep alive data will not be available until then
while (_p_thread_continue_ && !p_client_state_->IsConnected()) {
std::this_thread::sleep_for(thread_sleep_duration);
}
std::shared_ptr<PingreqPacket> p_pingreq_packet = PingreqPacket::Create();
if (nullptr == p_pingreq_packet) {
return ResponseCode::NULL_VALUE_ERROR;
}
ResponseCode rc = ResponseCode::SUCCESS;
p_client_state_->setDisconnectCallbackPending(true);
std::chrono::seconds reconnect_backoff_timer = p_client_state_->GetMinReconnectBackoffTimeout();
std::chrono::seconds max_backoff_value = p_client_state_->GetMaxReconnectBackoffTimeout();
std::chrono::seconds keep_alive_interval = p_client_state_->GetKeepAliveTimeout() / 2;
auto next = std::chrono::system_clock::now() + std::chrono::seconds(keep_alive_interval);
do {
if (p_client_state_->IsAutoReconnectEnabled() && p_client_state_->IsAutoReconnectRequired()) {
p_client_state_->SetPingreqPending(false);
if (p_client_state_->isDisconnectCallbackPending()) {
std::shared_ptr<ConnectPacket> p_connect_packet =
std::dynamic_pointer_cast<ConnectPacket>(p_client_state_->GetAutoReconnectData());
/**
* NOTE: All callbacks used by the keepalive should be non-blocking
*/
if (nullptr != p_client_state_->disconnect_handler_ptr_ && nullptr != p_connect_packet) {
p_client_state_->disconnect_handler_ptr_(p_connect_packet->GetClientID(),
p_client_state_->p_disconnect_app_handler_data_);
}
reconnect_backoff_timer = p_client_state_->GetMinReconnectBackoffTimeout();
max_backoff_value = p_client_state_->GetMaxReconnectBackoffTimeout();
AWS_LOG_INFO(KEEPALIVE_LOG_TAG,
"Initial value of reconnect timer : %ld!!",
reconnect_backoff_timer.count());
AWS_LOG_INFO(KEEPALIVE_LOG_TAG, "Max backoff value : %ld!!", max_backoff_value.count());
}
AWS_LOG_INFO(KEEPALIVE_LOG_TAG, "Attempting Reconnect");
std::shared_ptr<ConnectPacket> p_connect_packet =
std::dynamic_pointer_cast<ConnectPacket>(p_client_state_->GetAutoReconnectData());
rc = p_client_state_->PerformAction(ActionType::CONNECT,
p_connect_packet,
p_client_state_->GetMqttCommandTimeout());
if (nullptr != p_client_state_->reconnect_handler_ptr_) {
p_client_state_->reconnect_handler_ptr_(p_connect_packet->GetClientID(),
p_client_state_->p_reconnect_app_handler_data_,
rc);
}
if (ResponseCode::MQTT_CONNACK_CONNECTION_ACCEPTED == rc) {
p_client_state_->SetAutoReconnectRequired(false);
// if no subscriptions, skip resubscribe
if (!p_client_state_->subscription_map_.empty()) {
util::Vector<std::shared_ptr<mqtt::Subscription>> topic_vector;
util::Map<util::String, std::shared_ptr<Subscription>>::const_iterator
itr = p_client_state_->subscription_map_.begin();
while (itr != p_client_state_->subscription_map_.end()) {
topic_vector.push_back(itr->second);
itr++;
if (topic_vector.size() == MAX_TOPICS_IN_ONE_SUBSCRIBE_PACKET) {
std::shared_ptr<mqtt::SubscribePacket>
p_subscribe_packet = mqtt::SubscribePacket::Create(topic_vector);
p_subscribe_packet->SetPacketId(p_client_state_->GetNextPacketId());
rc = WriteToNetworkBuffer(p_network_connection, p_subscribe_packet->ToString());
if (ResponseCode::SUCCESS != rc) {
AWS_LOG_ERROR(KEEPALIVE_LOG_TAG,
"Resubscribe attempt returned unhandled error. \n%s",
ResponseHelper::ToString(rc).c_str());
break;
}
topic_vector.clear();
}
}
if (ResponseCode::SUCCESS == rc || ResponseCode::MQTT_CONNACK_CONNECTION_ACCEPTED == rc) {
if (!topic_vector.empty()) {
std::shared_ptr<mqtt::SubscribePacket>
p_subscribe_packet = mqtt::SubscribePacket::Create(topic_vector);
p_subscribe_packet->SetPacketId(p_client_state_->GetNextPacketId());
rc = WriteToNetworkBuffer(p_network_connection, p_subscribe_packet->ToString());
}
}
if (nullptr != p_client_state_->resubscribe_handler_ptr_) {
p_client_state_->resubscribe_handler_ptr_(p_connect_packet->GetClientID(),
p_client_state_->p_resubscribe_app_handler_data_,
rc);
}
}
/**
* NOTE :The resubscribe response can be NETWORK_DISCONNECTED_ERROR as the network might have
* disconnected again after the reconnect was successful.
*/
if (ResponseCode::NETWORK_DISCONNECTED_ERROR != rc) {
p_client_state_->SetAutoReconnectRequired(false);
} else {
p_client_state_->PerformAction(ActionType::DISCONNECT,
DisconnectPacket::Create(),
p_client_state_->GetMqttCommandTimeout());
p_client_state_->SetAutoReconnectRequired(true);
}
continue;
}
p_client_state_->setDisconnectCallbackPending(false);
AWS_LOG_ERROR(KEEPALIVE_LOG_TAG, "Reconnect failed. %s", ResponseHelper::ToString(rc).c_str());
AWS_LOG_INFO(KEEPALIVE_LOG_TAG,
"Current value of reconnect timer : %ld!!",
reconnect_backoff_timer.count());
if (max_backoff_value > reconnect_backoff_timer) {
reconnect_backoff_timer += reconnect_backoff_timer;
}
AWS_LOG_INFO(KEEPALIVE_LOG_TAG,
"Updated value of reconnect timer : %ld!!",
reconnect_backoff_timer.count());
std::this_thread::sleep_for(reconnect_backoff_timer);
continue;
} else if (p_client_state_->IsAutoReconnectRequired()) {
if (p_client_state_->isDisconnectCallbackPending()) {
std::shared_ptr<ConnectPacket> p_connect_packet =
std::dynamic_pointer_cast<ConnectPacket>(p_client_state_->GetAutoReconnectData());
if (nullptr != p_client_state_->disconnect_handler_ptr_ && nullptr != p_connect_packet) {
p_client_state_->disconnect_handler_ptr_(p_connect_packet->GetClientID(),
p_client_state_->p_disconnect_app_handler_data_);
}
p_client_state_->setDisconnectCallbackPending(false);
}
}
if (std::chrono::system_clock::now() > next) {
if (p_client_state_->IsPingreqPending()) {
if (p_client_state_->IsConnected()) {
rc = p_client_state_->PerformAction(ActionType::DISCONNECT,
DisconnectPacket::Create(),
p_client_state_->GetMqttCommandTimeout());
if (ResponseCode::SUCCESS != rc && ResponseCode::NETWORK_DISCONNECTED_ERROR != rc) {
AWS_LOG_ERROR(KEEPALIVE_LOG_TAG,
"Network Disconnect attempt returned unhandled error. \n%s",
ResponseHelper::ToString(rc).c_str());
}
}
p_client_state_->SetAutoReconnectRequired(true);
continue;
} else if (p_client_state_->IsConnected()) {
rc = WriteToNetworkBuffer(p_network_connection, p_pingreq_packet->ToString());
if (ResponseCode::SUCCESS != rc) {
AWS_LOG_ERROR(KEEPALIVE_LOG_TAG,
"Writing PingReq to Network Failed. \n%s. \nDisconnecting!",
ResponseHelper::ToString(rc).c_str());
rc = p_client_state_->PerformAction(ActionType::DISCONNECT,
DisconnectPacket::Create(),
p_client_state_->GetMqttCommandTimeout());
if (ResponseCode::SUCCESS != rc) {
AWS_LOG_ERROR(KEEPALIVE_LOG_TAG,
"Network Disconnect attempt returned unhandled error. \n%s",
ResponseHelper::ToString(rc).c_str());
}
p_client_state_->SetAutoReconnectRequired(true);
continue;
}
p_client_state_->SetPingreqPending(true);
next = std::chrono::system_clock::now() + std::chrono::seconds(keep_alive_interval);
}
}
std::this_thread::sleep_for(thread_sleep_duration);
} while (_p_thread_continue_);
return rc;
}