in inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/tubemq_tdmsg.cc [214:333]
bool TubeMQTDMsg::parseDefaultMsg(const char* data, uint32_t data_length, int32_t start_pos,
string& err_info) {
// #lizard forgives
int32_t pos1 = start_pos;
uint32_t remain = data_length;
for (uint32_t i = 0; i < attr_count_; i++) {
uint32_t origAttrLen = 0;
char* origAttrData = NULL;
string commAttr;
uint32_t dataCnt = 1;
uint32_t dataLen = 0;
char compress = 0;
if (remain <= 2) {
if (i == 0) {
err_info = "Parse message error: invalid databody length length";
return false;
} else {
break;
}
}
if (!getDatantohsInt(data, pos1, remain, origAttrLen, err_info)) {
err_info += " for attr length parameter";
return false;
}
if ((origAttrLen <= 0) || (origAttrLen > remain)) {
err_info = "Parse message error: invalid attr length";
return false;
}
origAttrData = static_cast<char*>(malloc(origAttrLen + 1));
if (origAttrData == NULL) {
err_info = "Parse message error: malloc buffer for default attr value failure!";
return false;
}
memset(origAttrData, 0, origAttrLen + 1);
memcpy(origAttrData, data + pos1, origAttrLen);
pos1 += origAttrLen;
remain -= origAttrLen;
commAttr = origAttrData;
free(origAttrData);
origAttrData = NULL;
if (version_ == 2) {
if (!getDatantohlInt(data, pos1, remain, dataCnt, err_info)) {
err_info += " for data count parameter";
return false;
}
}
if (!getDatantohlInt(data, pos1, remain, dataLen, err_info)) {
err_info += " for data len parameter";
return false;
}
if ((dataLen <= 0) || (dataLen > remain)) {
err_info = "Parse message error: invalid data length";
return false;
}
if (!getDataChar(data, pos1, remain, compress, err_info)) {
return false;
}
size_t uncompressDataLen = 0;
char* uncompressData = NULL;
if (compress != 0) {
if (snappy_uncompressed_length(data + pos1, dataLen - 1, &uncompressDataLen) != SNAPPY_OK) {
err_info = "Parse message error: snappy uncompressed default compress's length failure!";
return false;
}
uncompressData = static_cast<char*>(malloc(uncompressDataLen));
if (uncompressData == NULL) {
err_info = "Parse message error: malloc buffer for default compress's data failure!";
return false;
}
if (snappy_uncompress(data + pos1, dataLen - 1, uncompressData, &uncompressDataLen) !=
SNAPPY_OK) {
free(uncompressData);
uncompressData = NULL;
err_info = "Parse message error: snappy uncompressed default compress's data failure!";
return false;
}
} else {
uncompressDataLen = dataLen - 1;
uncompressData = static_cast<char*>(malloc(uncompressDataLen));
if (uncompressData == NULL) {
err_info = "Parse message error: malloc buffer for default's data failure!";
return false;
}
memcpy(uncompressData, data + pos1, dataLen - 1);
}
pos1 += dataLen - 1;
remain -= dataLen - 1;
int32_t itemPos = 0;
// unsigned int totalItemDataLen = 0;
uint32_t itemRemain = uncompressDataLen;
while (itemRemain > 0) {
uint32_t singleMsgLen = 0;
// unsigned int dataMsgLen = 0;
// char *singleData = NULL;
if (!getDatantohlInt(uncompressData, itemPos, itemRemain, singleMsgLen, err_info)) {
free(uncompressData);
uncompressData = NULL;
err_info += " for default item's msgLength parameter";
return false;
}
if (singleMsgLen <= 0) {
continue;
}
if (singleMsgLen > itemRemain) {
free(uncompressData);
uncompressData = NULL;
err_info = "Parse message error: invalid default attr's msg Length";
return false;
}
DataItem tmpDataItem(singleMsgLen, uncompressData + itemPos);
addDataItem2Map(commAttr, tmpDataItem);
itemPos += singleMsgLen;
itemRemain -= singleMsgLen;
}
free(uncompressData);
uncompressData = NULL;
}
is_parsed_ = true;
return true;
}