in source/ota_mqtt.c [1074:1174]
OtaErr_t requestFileBlock_Mqtt( OtaAgentContext_t * pAgentCtx )
{
OtaErr_t result = OtaErrRequestFileBlockFailed;
OtaMqttStatus_t mqttStatus = OtaMqttSuccess;
size_t msgSizeFromStream = 0;
uint32_t blockSize = OTA_FILE_BLOCK_SIZE;
uint32_t numBlocks = 0;
uint32_t bitmapLen = 0;
uint32_t msgSizeToPublish = 0;
uint32_t topicLen = 0;
bool cborEncodeRet = false;
char pMsg[ OTA_REQUEST_MSG_MAX_SIZE ];
/* This buffer is used to store the generated MQTT topic. The static size
* is calculated from the template and the corresponding parameters. */
char pTopicBuffer[ TOPIC_GET_STREAM_BUFFER_SIZE ];
const OtaFileContext_t * pFileContext = NULL;
/* NULL-terminated list of topic string parts. */
const char * pTopicParts[] =
{
MQTT_API_THINGS,
NULL, /* Thing Name not available at compile time, initialized below. */
MQTT_API_STREAMS,
NULL, /* Stream Name not available at compile time, initialized below. */
MQTT_API_GET_CBOR,
NULL
};
assert( pAgentCtx != NULL );
/* Suppress warnings about unused variables. */
( void ) pOtaGetStreamTopicTemplate;
/* Get the current file context. */
pFileContext = &( pAgentCtx->fileContext );
pTopicParts[ 1 ] = ( const char * ) pAgentCtx->pThingName;
pTopicParts[ 3 ] = ( const char * ) pFileContext->pStreamName;
/* Reset number of blocks requested. */
pAgentCtx->numOfBlocksToReceive = otaconfigMAX_NUM_BLOCKS_REQUEST;
numBlocks = ( pFileContext->fileSize + ( OTA_FILE_BLOCK_SIZE - 1U ) ) >> otaconfigLOG2_FILE_BLOCK_SIZE;
bitmapLen = ( numBlocks + ( BITS_PER_BYTE - 1U ) ) >> LOG2_BITS_PER_BYTE;
cborEncodeRet = OTA_CBOR_Encode_GetStreamRequestMessage( ( uint8_t * ) pMsg,
sizeof( pMsg ),
&msgSizeFromStream,
OTA_CLIENT_TOKEN,
( int32_t ) pFileContext->serverFileID,
( int32_t ) blockSize,
0,
pFileContext->pRxBlockBitmap,
bitmapLen,
( int32_t ) otaconfigMAX_NUM_BLOCKS_REQUEST );
if( cborEncodeRet == true )
{
msgSizeToPublish = ( uint32_t ) msgSizeFromStream;
/* Try to build the dynamic data REQUEST topic to publish to. */
topicLen = ( uint32_t ) stringBuilder(
pTopicBuffer,
sizeof( pTopicBuffer ),
pTopicParts );
/* The buffer is static and the size is calculated to fit. */
assert( ( topicLen > 0U ) && ( topicLen < sizeof( pTopicBuffer ) ) );
mqttStatus = pAgentCtx->pOtaInterface->mqtt.publish( pTopicBuffer,
( uint16_t ) topicLen,
&pMsg[ 0 ],
msgSizeToPublish,
0 );
if( mqttStatus == OtaMqttSuccess )
{
LogInfo( ( "Published to MQTT topic to request the next block: "
"topic=%s",
pTopicBuffer ) );
result = OtaErrNone;
}
else
{
LogError( ( "Failed to publish MQTT message: "
"publish returned error: "
"OtaMqttStatus_t=%s",
OTA_MQTT_strerror( mqttStatus ) ) );
}
}
else
{
result = OtaErrFailedToEncodeCbor;
LogError( ( "Failed to CBOR encode stream request message: "
"OTA_CBOR_Encode_GetStreamRequestMessage returned error." ) );
}
return result;
}