in src/mqtt/Subscribe.cpp [222:292]
// Serializes a SubscribePacket and writes it to the network without waiting for the broker's reply.
//
// Parameters:
//   p_network_connection - connection the serialized packet is written to; must not be null.
//   p_action_data        - must point to a SubscribePacket (verified with dynamic_pointer_cast).
//
// Behavior:
//   * If the packet carries an async ack handler, it is registered under the packet id before anything
//     is sent. A registration failure is logged but does not abort the subscribe.
//   * Every requested topic is recorded in the client state's subscription map before the write.
//     Topics whose existing map entry reports IsActive() are removed from the packet instead;
//     inactive entries are replaced by the new subscription.
//   * If every topic was removed this way, nothing is written and the pending ack (if any) is dropped.
//   * If the write fails, the topics of this packet are removed from the subscription map again and the
//     pending ack (if any) is dropped.
//
// Returns:
//   ResponseCode::NULL_VALUE_ERROR if the connection is null or the action data is not a SubscribePacket;
//   the result of the ack registration (SUCCESS when no handler was supplied) if nothing was left to send;
//   otherwise the result of WriteToNetworkBuffer.
ResponseCode SubscribeActionAsync::PerformAction(std::shared_ptr<NetworkConnection> p_network_connection,
                                                 std::shared_ptr<ActionData> p_action_data) {
    if (nullptr == p_network_connection) {
        return ResponseCode::NULL_VALUE_ERROR;
    }

    std::shared_ptr<SubscribePacket> p_subscribe_packet = std::dynamic_pointer_cast<SubscribePacket>(p_action_data);
    if (nullptr == p_subscribe_packet) {
        return ResponseCode::NULL_VALUE_ERROR;
    }

    ResponseCode rc = ResponseCode::SUCCESS;
    bool is_ack_registered = false;
    const uint16_t packet_id = p_subscribe_packet->GetPacketId();

    if (nullptr != p_subscribe_packet->p_async_ack_handler_) {
        rc = p_client_state_->RegisterPendingAck(packet_id, p_subscribe_packet->p_async_ack_handler_);
        if (ResponseCode::SUCCESS != rc) {
            AWS_LOG_ERROR(SUBSCRIBE_ACTION_LOG_TAG,
                          "Registering Ack Handler for Subscribe Action failed. %s",
                          ResponseHelper::ToString(rc).c_str());
        } else {
            is_ack_registered = true;
        }
    }

    // Read running in separate thread, Insert before sending request to avoid situations where response arrives early
    bool skipped_active_subscription = false;
    auto itr = p_subscribe_packet->subscription_list_.begin();
    while (itr != p_subscribe_packet->subscription_list_.end()) {
        const util::String topic_name = (*itr)->GetTopicName()->ToStdString();
        auto existing_itr = p_client_state_->subscription_map_.find(topic_name);
        if (p_client_state_->subscription_map_.end() != existing_itr) {
            if (existing_itr->second->IsActive()) {
                // Already subscribed: drop the topic from this request. erase() yields the next element,
                // so the iterator must not be advanced again. The pending ack is intentionally kept here,
                // because the remaining topics of this packet will still be acknowledged by the broker.
                itr = p_subscribe_packet->subscription_list_.erase(itr);
                skipped_active_subscription = true;
                continue;
            }
            // Stale (inactive) entry: remove it so the insert below stores the new subscription.
            p_client_state_->subscription_map_.erase(existing_itr);
        }
        p_client_state_->subscription_map_.insert(std::make_pair(topic_name, (*itr)));
        ++itr;
    }

    if (0 == p_subscribe_packet->subscription_list_.size()) {
        // Nothing left to send, so no response for this packet id will arrive. If that is because every
        // topic was already active, the registered ack can never fire and is removed.
        if (is_ack_registered && skipped_active_subscription) {
            p_client_state_->DeletePendingAck(packet_id);
        }
        return rc;
    }

    const util::String packet_data = p_subscribe_packet->ToString();
    rc = WriteToNetworkBuffer(p_network_connection, packet_data);
    if (ResponseCode::SUCCESS == rc) {
        return ResponseCode::SUCCESS;
    }

    AWS_LOG_ERROR(SUBSCRIBE_ACTION_LOG_TAG, "Subscribe Write to Network Failed. %s",
                  ResponseHelper::ToString(rc).c_str());

    // Roll back: the request never reached the broker, so forget the subscriptions recorded above
    // and the ack that was waiting for its response.
    for (const auto &p_subscription : p_subscribe_packet->subscription_list_) {
        const util::String topic_name = p_subscription->GetTopicName()->ToStdString();
        p_client_state_->subscription_map_.erase(topic_name);
    }
    if (is_ack_registered) {
        p_client_state_->DeletePendingAck(packet_id);
    }
    return rc;
}