in source/mqtt/MqttClient.cpp [468:519]
/**
 * Subscribes this connection to a single topic filter.
 *
 * Two heap records are allocated from the owning client's allocator and handed
 * to aws_mqtt_client_connection_subscribe() as user data: one carrying the
 * per-message handler and one carrying the SUBACK handler.
 *
 * @param topicFilter  NUL-terminated topic filter to subscribe to.
 * @param qos          Quality of service requested for the subscription.
 * @param onMessage    Handler stored in the publish callback record; moved from.
 * @param onSubAck     Handler stored in the SUBACK callback record; moved from.
 *
 * @return The packet id returned by aws_mqtt_client_connection_subscribe(), or
 *         0 if either callback record could not be allocated or the underlying
 *         call itself returned 0. When 0 is returned, every record allocated
 *         here has already been released. Note that `onMessage` (and, if the
 *         second allocation succeeded, `onSubAck`) has been moved from by then.
 */
uint16_t MqttConnection::Subscribe(
    const char *topicFilter,
    QOS qos,
    OnMessageReceivedHandler &&onMessage,
    OnSubAckHandler &&onSubAck) noexcept
{
    // Read the allocator once so every allocation/release below provably uses the same one.
    auto *allocator = m_owningClient->allocator;

    auto pubCallbackData = Crt::New<PubCallbackData>(allocator);
    if (!pubCallbackData)
    {
        return 0;
    }
    pubCallbackData->connection = this;
    pubCallbackData->onMessageReceived = std::move(onMessage);
    pubCallbackData->allocator = allocator;

    auto subAckCallbackData = Crt::New<SubAckCallbackData>(allocator);
    if (!subAckCallbackData)
    {
        // Nothing has been handed to the C layer yet, so the first record is still ours to free.
        Crt::Delete(pubCallbackData, allocator);
        return 0;
    }
    subAckCallbackData->connection = this;
    subAckCallbackData->allocator = allocator;
    subAckCallbackData->onSubAck = std::move(onSubAck);
    // Single-topic subscribe: no topic string is stored on the SUBACK record.
    subAckCallbackData->topic = nullptr;

    // The cursor only views the caller's string; nothing is copied at this level.
    ByteBuf topicFilterBuf = aws_byte_buf_from_c_str(topicFilter);
    ByteCursor topicFilterCur = aws_byte_cursor_from_buf(&topicFilterBuf);

    // NOTE(review): presumably s_cleanUpOnPublishData releases pubCallbackData and
    // s_onSubAck releases subAckCallbackData once the C layer is done with them —
    // confirm against those callbacks.
    uint16_t packetId = aws_mqtt_client_connection_subscribe(
        m_underlyingConnection,
        &topicFilterCur,
        qos,
        s_onPublish,
        pubCallbackData,
        s_cleanUpOnPublishData,
        s_onSubAck,
        subAckCallbackData);

    if (!packetId)
    {
        // A zero packet id is treated as "request not accepted": release both records here.
        // NOTE(review): assumes the C layer does not invoke the cleanup callback on this path — confirm.
        Crt::Delete(pubCallbackData, allocator);
        Crt::Delete(subAckCallbackData, allocator);
    }

    return packetId;
}