in inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/tubemq_tdmsg.cc [509:786]
bool TubeMQTDMsg::parseBinMsg(const char* data, uint32_t data_length, int32_t start_pos,
string& err_info) {
// #lizard forgives
uint32_t totalLen = 0;
char msgType = 0;
uint32_t bidNum = 0;
uint32_t tidNum = 0;
uint32_t extField = 0;
uint32_t dataTime = 0;
uint32_t msgCnt = 0;
uint32_t uniqueId = 0;
uint32_t bodyLen = 0;
uint32_t attrLen = 0;
uint32_t msgMagic = 0;
size_t realBodyLen = 0;
char* bodyData = NULL;
int32_t pos1 = start_pos;
uint32_t remain = data_length;
if (!getDatantohlInt(data, pos1, remain, totalLen, err_info)) {
err_info += " for data v4 totalLen parameter";
return false;
}
if (!getDataChar(data, pos1, remain, msgType, err_info)) {
err_info += " for data v4 msgType parameter";
return false;
}
if (!getDatantohsInt(data, pos1, remain, bidNum, err_info)) {
err_info += " for v4 bidNum parameter";
return false;
}
if (!getDatantohsInt(data, pos1, remain, tidNum, err_info)) {
err_info += " for v4 tidNum parameter";
return false;
}
if (!getDatantohsInt(data, pos1, remain, extField, err_info)) {
err_info += " for v4 extField parameter";
return false;
}
if (!getDatantohlInt(data, pos1, remain, dataTime, err_info)) {
err_info += " for data v4 dataTime parameter";
return false;
}
create_time_ = dataTime;
create_time_ *= 1000;
if (!getDatantohsInt(data, pos1, remain, msgCnt, err_info)) {
err_info += " for v4 cnt parameter";
return false;
}
if (!getDatantohlInt(data, pos1, remain, uniqueId, err_info)) {
err_info += " for data v4 uniq parameter";
return false;
}
if (!getDatantohlInt(data, pos1, remain, bodyLen, err_info)) {
err_info += " for data v4 bodyLen parameter";
return false;
}
if (remain < bodyLen + 2) {
err_info += "Parse message error: no enough data length for v4 attr_len data";
return false;
}
int32_t attrLenPos = pos1 + bodyLen;
uint32_t attrLenRemain = remain - bodyLen;
if (!getDatantohsInt(data, attrLenPos, attrLenRemain, attrLen, err_info)) {
err_info += " for data v4 attrLen parameter";
return false;
}
if (remain < attrLen + 2) {
err_info += "Parse message error: no enough data length for v4 msgMagic data";
return false;
}
int32_t msgMagicPos = attrLenPos + attrLen;
uint32_t msgMagicRemain = remain - attrLen;
if (!getDatantohsInt(data, msgMagicPos, msgMagicRemain, msgMagic, err_info)) {
err_info += " for v4 msgMagic parameter";
return false;
}
msgMagic &= 0xFFFF;
// get attr data
bool result = false;
map<string, string> commonAttrMap;
if (attrLen != 0) {
char* commonAttr = static_cast<char*>(malloc(attrLen + 1));
if (commonAttr == NULL) {
err_info = "Parse message error: malloc buffer for v3 common attr failure!";
return false;
}
memset(commonAttr, 0, attrLen + 1);
memcpy(commonAttr, data + attrLenPos, attrLen);
string strAttr = commonAttr;
Utils::Split(strAttr, commonAttrMap, delimiter::kDelimiterAnd, delimiter::kDelimiterEqual);
if (commonAttrMap.empty()) {
free(commonAttr);
commonAttr = NULL;
err_info += " for v4 common attribute parameter";
return result;
}
free(commonAttr);
commonAttr = NULL;
}
// get body data
switch ((msgType & 0xE0) >> 5) {
case 1: {
if (snappy_uncompressed_length(data + pos1, bodyLen, &realBodyLen) != SNAPPY_OK) {
err_info = "Parse message error: snappy uncompressed v4 body's length failure!";
return false;
}
bodyData = static_cast<char*>(malloc(realBodyLen));
if (bodyData == NULL) {
err_info = "Parse message error: malloc buffer for v4 body's data failure!";
return false;
}
if (snappy_uncompress(data + pos1, bodyLen, bodyData, &realBodyLen) != SNAPPY_OK) {
free(bodyData);
bodyData = NULL;
err_info = "Parse message error: snappy uncompressed v4 body's data failure!";
return false;
}
break;
}
case 0:
default: {
realBodyLen = bodyLen;
bodyData = static_cast<char*>(malloc(realBodyLen));
if (bodyData == NULL) {
err_info = "Parse message error: malloc buffer for v4 body's data failure!";
return false;
}
memcpy(bodyData, data + pos1, realBodyLen);
break;
}
}
// build attr
commonAttrMap["dt"] = Utils::Long2str(create_time_);
if ((extField & 0x4) == 0x0) {
commonAttrMap["bid"] = Utils::Int2str(bidNum);
commonAttrMap["tid"] = Utils::Int2str(tidNum);
}
commonAttrMap["cnt"] = "1";
int msgCount = msgCnt;
// build data
if ((extField & 0x1) == 0x0) {
int32_t bodyPos = 0;
uint32_t bodyRemain = realBodyLen;
string outKeyValStr;
Utils::Join(commonAttrMap, outKeyValStr, delimiter::kDelimiterAnd, delimiter::kDelimiterEqual);
while ((bodyRemain > 0) && (msgCount-- > 0)) {
uint32_t singleMsgLen = 0;
if (!getDatantohlInt(bodyData, bodyPos, bodyRemain, singleMsgLen, err_info)) {
free(bodyData);
bodyData = NULL;
err_info += " for v4 attr's msgLength parameter";
return false;
}
if (singleMsgLen <= 0) {
continue;
}
if (singleMsgLen > bodyRemain) {
free(bodyData);
bodyData = NULL;
err_info = "Parse message error: invalid v4 attr's msg Length 1";
return false;
}
char* singleData = static_cast<char*>(malloc(singleMsgLen));
if (singleData == NULL) {
free(bodyData);
bodyData = NULL;
err_info = "Parse message error: malloc buffer for v4 single data failure!";
return false;
}
memcpy(singleData, bodyData + bodyPos, singleMsgLen);
bodyPos += singleMsgLen;
bodyRemain -= singleMsgLen;
DataItem tmpDataItem(singleMsgLen, singleData);
addDataItem2Map(outKeyValStr, tmpDataItem);
free(singleData);
singleData = NULL;
}
free(bodyData);
bodyData = NULL;
} else {
int32_t bodyPos = 0;
uint32_t bodyRemain = realBodyLen;
while ((bodyRemain > 0) && (msgCount-- > 0)) {
uint32_t singleMsgLen = 0;
if (!getDatantohlInt(bodyData, bodyPos, bodyRemain, singleMsgLen, err_info)) {
free(bodyData);
bodyData = NULL;
err_info += " for v4 attr's msgLength parameter";
return false;
}
if (singleMsgLen <= 0) {
continue;
}
if (singleMsgLen > bodyRemain) {
free(bodyData);
bodyData = NULL;
err_info = "Parse message error: invalid v4 attr's msg Length 2";
return false;
}
char* singleData = static_cast<char*>(malloc(singleMsgLen));
if (singleData == NULL) {
free(bodyData);
bodyData = NULL;
err_info = "Parse message error: malloc buffer for v4 single data failure!";
return false;
}
memcpy(singleData, bodyData + bodyPos, singleMsgLen);
bodyPos += singleMsgLen;
bodyRemain -= singleMsgLen;
uint32_t singleAttrLen = 0;
if (!getDatantohlInt(bodyData, bodyPos, bodyRemain, singleAttrLen, err_info)) {
free(bodyData);
free(singleData);
bodyData = NULL;
singleData = NULL;
err_info += " for v4 attr's single length parameter";
return false;
}
if ((singleAttrLen <= 0) || (singleAttrLen > bodyRemain)) {
free(bodyData);
free(singleData);
bodyData = NULL;
singleData = NULL;
err_info = "Parse message error: invalid v4 attr's attr Length";
return false;
}
map<string, string> privAttrMap;
map<string, string>::iterator tempIt;
for (tempIt = commonAttrMap.begin(); tempIt != commonAttrMap.end(); ++tempIt) {
privAttrMap[tempIt->first] = tempIt->second;
}
string strSingleAttr;
if (singleAttrLen > 0) {
char* singleAttr = static_cast<char*>(malloc(singleAttrLen + 1));
if (singleAttr == NULL) {
free(bodyData);
free(singleData);
bodyData = NULL;
singleData = NULL;
err_info = "Parse message error: malloc buffer for v4 single attr failure!";
return false;
}
memset(singleAttr, 0, singleAttrLen + 1);
memcpy(singleAttr, bodyData + bodyPos, singleAttrLen);
bodyPos += singleAttrLen;
attrLenRemain -= singleAttrLen;
bodyRemain -= singleAttrLen;
strSingleAttr = singleAttr;
Utils::Split(strSingleAttr, privAttrMap, delimiter::kDelimiterAnd,
delimiter::kDelimiterEqual);
if (privAttrMap.empty()) {
free(bodyData);
free(singleAttr);
free(singleData);
bodyData = NULL;
singleData = NULL;
singleAttr = NULL;
err_info += " for v4 private attribute parameter";
return result;
}
free(singleAttr);
singleAttr = NULL;
}
string outKeyValStr;
Utils::Join(privAttrMap, outKeyValStr, delimiter::kDelimiterAnd, delimiter::kDelimiterEqual);
DataItem tmpDataItem(singleMsgLen, singleData);
addDataItem2Map(outKeyValStr, tmpDataItem);
free(singleData);
singleData = NULL;
}
free(bodyData);
bodyData = NULL;
}
is_parsed_ = true;
return true;
}