in inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/tubemq_tdmsg.cc [335:507]
// Parses a "mixed attribute" (v3) message body and stores every contained
// item in this object's data map via addDataItem2Map().
//
// Layout consumed by this function, repeated up to attr_count_ times:
//   [attr length][attr bytes][body length][compress flag][payload]
// where the payload is (body length - 1) bytes, Snappy-compressed when the
// compress flag is non-zero. The (uncompressed) payload is read as:
//   [total item length] { [msg length][msg bytes] [attr length][attr bytes] }*
// The trailing per-item attr is optional only for the last item (when no
// bytes remain after the message bytes).
//
// NOTE(review): the getDatantohsInt / getDatantohlInt / getDataChar helpers
// are defined outside this block; this code assumes they read a 2-byte,
// 4-byte and 1-byte field respectively and advance pos/remain — confirm.
//
// @param data         buffer holding the encoded message
// @param data_length  used as the initial count of remaining bytes
// @param start_pos    offset in data at which parsing starts
// @param err_info     receives a description of the failure on error
// @return true when parsing finished (is_parsed_ is then set to true);
//         false on the first malformed field or allocation failure. Items
//         added to the map before a failure are not rolled back here.
bool TubeMQTDMsg::parseMixAttrMsg(const char* data, uint32_t data_length, int32_t start_pos,
                                  string& err_info) {
  // #lizard forgives

  // Owns a malloc()'d buffer and free()s it when the scope is left, so every
  // early return (and any exception thrown by string or map operations)
  // releases the memory without hand-written cleanup.
  struct ScopedBuffer {
    char* ptr;

    ScopedBuffer() : ptr(NULL) {}
    ~ScopedBuffer() { free(ptr); }

    // Allocates size bytes; returns false when malloc() yields NULL.
    // Intended to be called at most once per guard.
    bool alloc(size_t size) {
      ptr = static_cast<char*>(malloc(size));
      return ptr != NULL;
    }

   private:
    // Non-copyable: a copy would free the same buffer twice.
    ScopedBuffer(const ScopedBuffer&);
    ScopedBuffer& operator=(const ScopedBuffer&);
  };

  int32_t pos1 = start_pos;
  uint32_t remain = data_length;
  for (uint32_t i = 0; i < attr_count_; i++) {
    uint32_t origAttrLen = 0;
    string commAttr;
    uint32_t bodyDataLen = 0;
    char compress = 0;

    // Not enough bytes left for another attr-length field: an error for the
    // first group, a normal end of input for any later one.
    if (remain <= 2) {
      if (i == 0) {
        err_info = "Parse message error: invalid databody length length";
        return false;
      }
      break;
    }

    // ---- common attribute shared by every item of this group ----
    if (!getDatantohsInt(data, pos1, remain, origAttrLen, err_info)) {
      err_info += " for attr length parameter";
      return false;
    }
    if ((origAttrLen == 0) || (origAttrLen > remain)) {
      err_info = "Parse message error: invalid attr length";
      return false;
    }
    {
      ScopedBuffer attrBuf;
      if (!attrBuf.alloc(origAttrLen + 1)) {
        err_info = "Parse message error: malloc buffer for v3 attr value failure!";
        return false;
      }
      // Zero-filled and one byte larger than the attr so it is always
      // NUL-terminated; the assignment below therefore stops at the first
      // NUL byte, exactly like a C string.
      memset(attrBuf.ptr, 0, origAttrLen + 1);
      memcpy(attrBuf.ptr, data + pos1, origAttrLen);
      commAttr = attrBuf.ptr;
    }
    pos1 += origAttrLen;
    remain -= origAttrLen;

    // ---- body header: length and compress flag ----
    if (!getDatantohlInt(data, pos1, remain, bodyDataLen, err_info)) {
      err_info += " for body data len parameter";
      return false;
    }
    if ((bodyDataLen == 0) || (bodyDataLen > remain)) {
      err_info = "Parse message error: invalid data length";
      return false;
    }
    if (!getDataChar(data, pos1, remain, compress, err_info)) {
      err_info += " for attr compress parameter";
      return false;
    }

    // bodyDataLen counts the compress flag byte as well, so the payload that
    // follows the flag is one byte shorter.
    const uint32_t payloadLen = bodyDataLen - 1;

    // ---- obtain the uncompressed payload in a private buffer ----
    size_t uncompressDataLen = 0;
    ScopedBuffer body;
    if (compress != 0) {
      if (snappy_uncompressed_length(data + pos1, payloadLen, &uncompressDataLen) !=
          SNAPPY_OK) {
        err_info = "Parse message error: snappy uncompressed v3 compress's length failure!";
        return false;
      }
      if (!body.alloc(uncompressDataLen)) {
        err_info = "Parse message error: malloc buffer for v3 compress's data failure!";
        return false;
      }
      if (snappy_uncompress(data + pos1, payloadLen, body.ptr, &uncompressDataLen) !=
          SNAPPY_OK) {
        err_info = "Parse message error: snappy uncompressed v3 compress's data failure!";
        return false;
      }
    } else {
      uncompressDataLen = payloadLen;
      if (!body.alloc(uncompressDataLen)) {
        err_info = "Parse message error: malloc buffer for v3 compress's data failure!";
        return false;
      }
      memcpy(body.ptr, data + pos1, payloadLen);
    }
    pos1 += payloadLen;
    remain -= payloadLen;

    // ---- walk the items inside the payload ----
    int32_t itemPos = 0;
    uint32_t totalItemDataLen = 0;
    uint32_t itemRemain = uncompressDataLen;
    if (!getDatantohlInt(body.ptr, itemPos, itemRemain, totalItemDataLen, err_info)) {
      err_info += " for v3 item's msgLength parameter";
      return false;
    }
    // The total length is only validated against the remaining bytes; the
    // loop below is bounded by itemRemain, not by this value.
    if ((totalItemDataLen == 0) || (totalItemDataLen > itemRemain)) {
      err_info = "Parse message error: invalid v3 attr's msg Length";
      return false;
    }

    while (itemRemain > 0) {
      uint32_t singleMsgLen = 0;
      uint32_t singleAttrLen = 0;
      string finalAttr;

      if (!getDatantohlInt(body.ptr, itemPos, itemRemain, singleMsgLen, err_info)) {
        err_info += " for v3 item's msgLength parameter";
        return false;
      }
      if ((singleMsgLen == 0) || (singleMsgLen > itemRemain)) {
        err_info = "Parse message error: invalid v3 attr's msg Length";
        return false;
      }

      ScopedBuffer singleData;
      if (!singleData.alloc(singleMsgLen)) {
        err_info = "Parse message error: malloc buffer for v3 single data failure!";
        return false;
      }
      memcpy(singleData.ptr, body.ptr + itemPos, singleMsgLen);
      itemPos += singleMsgLen;
      itemRemain -= singleMsgLen;

      if (itemRemain > 0) {
        // A per-item attribute follows; the item's final attribute string is
        // "<common attr>&<item attr>".
        if (!getDatantohlInt(body.ptr, itemPos, itemRemain, singleAttrLen, err_info)) {
          err_info += " for v3 attr's single length parameter";
          return false;
        }
        if ((singleAttrLen == 0) || (singleAttrLen > itemRemain)) {
          err_info = "Parse message error: invalid v3 attr's attr Length";
          return false;
        }
        ScopedBuffer singleAttr;
        if (!singleAttr.alloc(singleAttrLen + 1)) {
          err_info = "Parse message error: malloc buffer for v3 single attr failure!";
          return false;
        }
        // Same NUL-terminated copy as for the common attribute above.
        memset(singleAttr.ptr, 0, singleAttrLen + 1);
        memcpy(singleAttr.ptr, body.ptr + itemPos, singleAttrLen);
        itemPos += singleAttrLen;
        itemRemain -= singleAttrLen;
        string strSingleAttr = singleAttr.ptr;
        finalAttr = commAttr + "&" + strSingleAttr;
      } else {
        // Payload ended right after the message bytes: only the common
        // attribute applies to this item.
        finalAttr = commAttr;
      }

      DataItem tmpDataItem(singleMsgLen, singleData.ptr);
      addDataItem2Map(finalAttr, tmpDataItem);
    }
  }
  is_parsed_ = true;
  return true;
}