in source/mqtt/MqttClient.cpp [541:617]
/**
 * Subscribes to several topic filters with a single call to
 * aws_mqtt_client_connection_subscribe_multiple.
 *
 * @param topicFilters pairs of (topic filter C-string, handler invoked for messages on that filter).
 *                     The handler is copied into a per-topic PubCallbackData.
 * @param qos          quality of service applied to every filter in the request.
 * @param onSubAck     handler moved into the MultiSubAckCallbackData passed along with s_onMultiSubAck.
 *                     It is only moved from once every per-topic entry has been prepared.
 *
 * @return the packet id returned by aws_mqtt_client_connection_subscribe_multiple, or 0 if any
 *         allocation, list operation, or the subscribe call itself failed. On a 0 return every
 *         callback-data object allocated here has been released again.
 */
uint16_t MqttConnection::Subscribe(
    const Vector<std::pair<const char *, OnMessageReceivedHandler>> &topicFilters,
    QOS qos,
    OnMultiSubAckHandler &&onSubAck) noexcept
{
    auto allocator = m_owningClient->allocator;

    auto subAckCallbackData = Crt::New<MultiSubAckCallbackData>(allocator);
    if (!subAckCallbackData)
    {
        return 0;
    }

    aws_array_list multiPub;
    AWS_ZERO_STRUCT(multiPub);
    if (aws_array_list_init_dynamic(&multiPub, allocator, topicFilters.size(), sizeof(aws_mqtt_topic_subscription)))
    {
        Crt::Delete(subAckCallbackData, allocator);
        return 0;
    }

    uint16_t packetId = 0;
    bool subscriptionsReady = true;

    for (const auto &topicFilter : topicFilters)
    {
        auto pubCallbackData = Crt::New<PubCallbackData>(allocator);
        if (!pubCallbackData)
        {
            subscriptionsReady = false;
            break;
        }

        pubCallbackData->connection = this;
        pubCallbackData->onMessageReceived = topicFilter.second;
        pubCallbackData->allocator = allocator;

        ByteBuf topicFilterBuf = aws_byte_buf_from_c_str(topicFilter.first);
        ByteCursor topicFilterCur = aws_byte_cursor_from_buf(&topicFilterBuf);

        aws_mqtt_topic_subscription subscription;
        AWS_ZERO_STRUCT(subscription);
        subscription.on_cleanup = s_cleanUpOnPublishData;
        subscription.on_publish = s_onPublish;
        subscription.on_publish_ud = pubCallbackData;
        subscription.qos = qos;
        subscription.topic = topicFilterCur;

        if (aws_array_list_push_back(&multiPub, &subscription))
        {
            // The entry never made it into the list, so the cleanup loop below cannot
            // find it; release it here to avoid leaking it.
            Crt::Delete(pubCallbackData, allocator);
            subscriptionsReady = false;
            break;
        }
    }

    if (subscriptionsReady)
    {
        subAckCallbackData->connection = this;
        subAckCallbackData->allocator = allocator;
        subAckCallbackData->onSubAck = std::move(onSubAck);
        subAckCallbackData->topic = nullptr;

        packetId = aws_mqtt_client_connection_subscribe_multiple(
            m_underlyingConnection, &multiPub, s_onMultiSubAck, subAckCallbackData);
    }

    if (!packetId)
    {
        // Nothing was handed over successfully: release every per-topic callback object
        // recorded in the list, then the sub-ack callback object.
        const size_t length = aws_array_list_length(&multiPub);
        for (size_t i = 0; i < length; ++i)
        {
            aws_mqtt_topic_subscription *subscription = nullptr;
            if (aws_array_list_get_at_ptr(&multiPub, reinterpret_cast<void **>(&subscription), i) || !subscription)
            {
                continue;
            }
            Crt::Delete(static_cast<PubCallbackData *>(subscription->on_publish_ud), allocator);
        }
        Crt::Delete(subAckCallbackData, allocator);
    }

    // The list only holds the subscription descriptors; it is released on both the
    // success and the failure path.
    aws_array_list_clean_up(&multiPub);
    return packetId;
}