uint16_t MqttConnection::Subscribe()

in source/mqtt/MqttClient.cpp [541:617]


            uint16_t MqttConnection::Subscribe(
                const Vector<std::pair<const char *, OnMessageReceivedHandler>> &topicFilters,
                QOS qos,
                OnMultiSubAckHandler &&onSubAck) noexcept
            {
                uint16_t packetId = 0;
                auto subAckCallbackData = Crt::New<MultiSubAckCallbackData>(m_owningClient->allocator);

                if (!subAckCallbackData)
                {
                    return 0;
                }

                aws_array_list multiPub;
                AWS_ZERO_STRUCT(multiPub);

                if (aws_array_list_init_dynamic(
                        &multiPub, m_owningClient->allocator, topicFilters.size(), sizeof(aws_mqtt_topic_subscription)))
                {
                    Crt::Delete(subAckCallbackData, m_owningClient->allocator);
                    return 0;
                }

                for (auto &topicFilter : topicFilters)
                {
                    auto pubCallbackData = Crt::New<PubCallbackData>(m_owningClient->allocator);

                    if (!pubCallbackData)
                    {
                        goto clean_up;
                    }

                    pubCallbackData->connection = this;
                    pubCallbackData->onMessageReceived = topicFilter.second;
                    pubCallbackData->allocator = m_owningClient->allocator;

                    ByteBuf topicFilterBuf = aws_byte_buf_from_c_str(topicFilter.first);
                    ByteCursor topicFilterCur = aws_byte_cursor_from_buf(&topicFilterBuf);

                    aws_mqtt_topic_subscription subscription;
                    subscription.on_cleanup = s_cleanUpOnPublishData;
                    subscription.on_publish = s_onPublish;
                    subscription.on_publish_ud = pubCallbackData;
                    subscription.qos = qos;
                    subscription.topic = topicFilterCur;

                    aws_array_list_push_back(&multiPub, reinterpret_cast<const void *>(&subscription));
                }

                subAckCallbackData->connection = this;
                subAckCallbackData->allocator = m_owningClient->allocator;
                subAckCallbackData->onSubAck = std::move(onSubAck);
                subAckCallbackData->topic = nullptr;
                subAckCallbackData->allocator = m_owningClient->allocator;

                packetId = aws_mqtt_client_connection_subscribe_multiple(
                    m_underlyingConnection, &multiPub, s_onMultiSubAck, subAckCallbackData);

            clean_up:
                if (!packetId)
                {
                    size_t length = aws_array_list_length(&multiPub);
                    for (size_t i = 0; i < length; ++i)
                    {
                        aws_mqtt_topic_subscription *subscription = NULL;
                        aws_array_list_get_at_ptr(&multiPub, reinterpret_cast<void **>(&subscription), i);
                        auto pubCallbackData = reinterpret_cast<PubCallbackData *>(subscription->on_publish_ud);
                        Crt::Delete(pubCallbackData, m_owningClient->allocator);
                    }

                    Crt::Delete(subAckCallbackData, m_owningClient->allocator);
                }

                aws_array_list_clean_up(&multiPub);

                return packetId;
            }