ResponseCode SubscribeActionAsync::PerformAction()

in src/mqtt/Subscribe.cpp [222:292]


        /**
         * @brief Record the subscriptions carried by a subscribe packet and write the packet to the network.
         *
         * Steps, in order:
         *  - If the packet carries an async ack handler, it is registered as a pending ack under the
         *    packet id. A registration failure is logged but does not stop the action.
         *  - Every topic that already has an active entry in the client state's subscription map is
         *    removed from the packet. Every other topic is inserted into the map (replacing an inactive
         *    entry if one exists) before the packet is written.
         *  - If at least one subscription remains, the packet is serialized and written. On a write
         *    failure the map entries inserted for this packet and the pending ack are removed again.
         *  - If no subscription remains, nothing is written and the pending ack is dropped.
         *
         * @param p_network_connection Connection the packet is written to; must not be null.
         * @param p_action_data Action data; must point to a SubscribePacket.
         *
         * @return NULL_VALUE_ERROR if the connection is null or the action data is not a SubscribePacket;
         *         the write error if writing the packet fails; the ack registration error if registration
         *         failed and nothing had to be sent; SUCCESS otherwise.
         */
        ResponseCode SubscribeActionAsync::PerformAction(std::shared_ptr<NetworkConnection> p_network_connection,
                                                         std::shared_ptr<ActionData> p_action_data) {
            if (nullptr == p_network_connection) {
                return ResponseCode::NULL_VALUE_ERROR;
            }

            std::shared_ptr<SubscribePacket>
                p_subscribe_packet = std::dynamic_pointer_cast<SubscribePacket>(p_action_data);
            if (nullptr == p_subscribe_packet) {
                return ResponseCode::NULL_VALUE_ERROR;
            }

            ResponseCode rc = ResponseCode::SUCCESS;
            bool is_ack_registered = false;
            uint16_t packet_id = p_subscribe_packet->GetPacketId();
            if (nullptr != p_subscribe_packet->p_async_ack_handler_) {
                rc = p_client_state_->RegisterPendingAck(packet_id, p_subscribe_packet->p_async_ack_handler_);
                if (ResponseCode::SUCCESS != rc) {
                    AWS_LOG_ERROR(SUBSCRIBE_ACTION_LOG_TAG,
                                  "Registering Ack Handler for Subscribe Action failed. %s",
                                  ResponseHelper::ToString(rc).c_str());
                } else {
                    is_ack_registered = true;
                }
            }

            auto &subscription_list = p_subscribe_packet->subscription_list_;
            auto &subscription_map = p_client_state_->subscription_map_;

            // Read running in separate thread, Insert before sending request to avoid situations where response arrives early
            auto itr = subscription_list.begin();
            while (itr != subscription_list.end()) {
                util::String topic_name = (*itr)->GetTopicName()->ToStdString();
                auto existing_itr = subscription_map.find(topic_name);
                if (subscription_map.end() != existing_itr) {
                    if (existing_itr->second->IsActive()) {
                        // Already actively subscribed: drop the topic from this request. The pending
                        // ack is intentionally kept here, because the packet may still be sent for
                        // the remaining topics and its ack then needs the handler.
                        itr = subscription_list.erase(itr);
                        continue;
                    }
                    // Inactive entry: replace it with the subscription from this request.
                    subscription_map.erase(existing_itr);
                }
                subscription_map.insert(std::make_pair(topic_name, (*itr)));
                ++itr;
            }

            if (subscription_list.empty()) {
                // Nothing left to send, so no ack can arrive for this packet id.
                if (is_ack_registered) {
                    p_client_state_->DeletePendingAck(packet_id);
                }
                // SUCCESS unless the ack registration above failed.
                return rc;
            }

            const util::String packet_data = p_subscribe_packet->ToString();
            rc = WriteToNetworkBuffer(p_network_connection, packet_data);
            if (ResponseCode::SUCCESS != rc) {
                AWS_LOG_ERROR(SUBSCRIBE_ACTION_LOG_TAG, "Subscribe Write to Network Failed. %s",
                              ResponseHelper::ToString(rc).c_str());
                // Roll back the subscriptions recorded for this packet
                for (const auto &p_subscription : subscription_list) {
                    util::String topic_name = p_subscription->GetTopicName()->ToStdString();
                    subscription_map.erase(topic_name);
                }
                if (is_ack_registered) {
                    p_client_state_->DeletePendingAck(packet_id);
                }
                return rc;
            }

            return ResponseCode::SUCCESS;
        }