bool TubeMQTDMsg::parseBinMsg()

in inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/tubemq_tdmsg.cc [509:786]


bool TubeMQTDMsg::parseBinMsg(const char* data, uint32_t data_length, int32_t start_pos,
                              string& err_info) {
  // #lizard forgives
  uint32_t totalLen = 0;
  char msgType = 0;
  uint32_t bidNum = 0;
  uint32_t tidNum = 0;
  uint32_t extField = 0;
  uint32_t dataTime = 0;
  uint32_t msgCnt = 0;
  uint32_t uniqueId = 0;
  uint32_t bodyLen = 0;
  uint32_t attrLen = 0;
  uint32_t msgMagic = 0;
  size_t realBodyLen = 0;
  char* bodyData = NULL;

  int32_t pos1 = start_pos;
  uint32_t remain = data_length;
  if (!getDatantohlInt(data, pos1, remain, totalLen, err_info)) {
    err_info += " for data v4 totalLen parameter";
    return false;
  }
  if (!getDataChar(data, pos1, remain, msgType, err_info)) {
    err_info += " for data v4 msgType parameter";
    return false;
  }
  if (!getDatantohsInt(data, pos1, remain, bidNum, err_info)) {
    err_info += " for v4 bidNum parameter";
    return false;
  }
  if (!getDatantohsInt(data, pos1, remain, tidNum, err_info)) {
    err_info += " for v4 tidNum parameter";
    return false;
  }
  if (!getDatantohsInt(data, pos1, remain, extField, err_info)) {
    err_info += " for v4 extField parameter";
    return false;
  }
  if (!getDatantohlInt(data, pos1, remain, dataTime, err_info)) {
    err_info += " for data v4 dataTime parameter";
    return false;
  }
  create_time_ = dataTime;
  create_time_ *= 1000;
  if (!getDatantohsInt(data, pos1, remain, msgCnt, err_info)) {
    err_info += " for v4 cnt parameter";
    return false;
  }
  if (!getDatantohlInt(data, pos1, remain, uniqueId, err_info)) {
    err_info += " for data v4 uniq parameter";
    return false;
  }
  if (!getDatantohlInt(data, pos1, remain, bodyLen, err_info)) {
    err_info += " for data v4 bodyLen parameter";
    return false;
  }
  if (remain < bodyLen + 2) {
    err_info += "Parse message error: no enough data length for v4 attr_len data";
    return false;
  }
  int32_t attrLenPos = pos1 + bodyLen;
  uint32_t attrLenRemain = remain - bodyLen;
  if (!getDatantohsInt(data, attrLenPos, attrLenRemain, attrLen, err_info)) {
    err_info += " for data v4 attrLen parameter";
    return false;
  }
  if (remain < attrLen + 2) {
    err_info += "Parse message error: no enough data length for v4 msgMagic data";
    return false;
  }
  int32_t msgMagicPos = attrLenPos + attrLen;
  uint32_t msgMagicRemain = remain - attrLen;
  if (!getDatantohsInt(data, msgMagicPos, msgMagicRemain, msgMagic, err_info)) {
    err_info += " for v4 msgMagic parameter";
    return false;
  }
  msgMagic &= 0xFFFF;
  // get attr data
  bool result = false;
  map<string, string> commonAttrMap;
  if (attrLen != 0) {
    char* commonAttr = static_cast<char*>(malloc(attrLen + 1));
    if (commonAttr == NULL) {
      err_info = "Parse message error: malloc buffer for v3 common attr failure!";
      return false;
    }
    memset(commonAttr, 0, attrLen + 1);
    memcpy(commonAttr, data + attrLenPos, attrLen);
    string strAttr = commonAttr;
    Utils::Split(strAttr, commonAttrMap, delimiter::kDelimiterAnd, delimiter::kDelimiterEqual);
    if (commonAttrMap.empty()) {
      free(commonAttr);
      commonAttr = NULL;
      err_info += " for v4 common attribute parameter";
      return result;
    }
    free(commonAttr);
    commonAttr = NULL;
  }
  // get body data
  switch ((msgType & 0xE0) >> 5) {
    case 1: {
      if (snappy_uncompressed_length(data + pos1, bodyLen, &realBodyLen) != SNAPPY_OK) {
        err_info = "Parse message error:  snappy uncompressed v4 body's length failure!";
        return false;
      }
      bodyData = static_cast<char*>(malloc(realBodyLen));
      if (bodyData == NULL) {
        err_info = "Parse message error: malloc buffer for v4 body's data failure!";
        return false;
      }
      if (snappy_uncompress(data + pos1, bodyLen, bodyData, &realBodyLen) != SNAPPY_OK) {
        free(bodyData);
        bodyData = NULL;
        err_info = "Parse message error:  snappy uncompressed v4 body's data failure!";
        return false;
      }
      break;
    }

    case 0:
    default: {
      realBodyLen = bodyLen;
      bodyData = static_cast<char*>(malloc(realBodyLen));
      if (bodyData == NULL) {
        err_info = "Parse message error: malloc buffer for v4 body's data failure!";
        return false;
      }
      memcpy(bodyData, data + pos1, realBodyLen);
      break;
    }
  }
  //  build attr
  commonAttrMap["dt"] = Utils::Long2str(create_time_);
  if ((extField & 0x4) == 0x0) {
    commonAttrMap["bid"] = Utils::Int2str(bidNum);
    commonAttrMap["tid"] = Utils::Int2str(tidNum);
  }
  commonAttrMap["cnt"] = "1";
  int msgCount = msgCnt;
  //  build data
  if ((extField & 0x1) == 0x0) {
    int32_t bodyPos = 0;
    uint32_t bodyRemain = realBodyLen;
    string outKeyValStr;
    Utils::Join(commonAttrMap, outKeyValStr, delimiter::kDelimiterAnd, delimiter::kDelimiterEqual);
    while ((bodyRemain > 0) && (msgCount-- > 0)) {
      uint32_t singleMsgLen = 0;
      if (!getDatantohlInt(bodyData, bodyPos, bodyRemain, singleMsgLen, err_info)) {
        free(bodyData);
        bodyData = NULL;
        err_info += " for v4 attr's msgLength parameter";
        return false;
      }
      if (singleMsgLen <= 0) {
        continue;
      }
      if (singleMsgLen > bodyRemain) {
        free(bodyData);
        bodyData = NULL;
        err_info = "Parse message error: invalid v4 attr's msg Length 1";
        return false;
      }
      char* singleData = static_cast<char*>(malloc(singleMsgLen));
      if (singleData == NULL) {
        free(bodyData);
        bodyData = NULL;
        err_info = "Parse message error: malloc buffer for v4 single data failure!";
        return false;
      }
      memcpy(singleData, bodyData + bodyPos, singleMsgLen);
      bodyPos += singleMsgLen;
      bodyRemain -= singleMsgLen;
      DataItem tmpDataItem(singleMsgLen, singleData);
      addDataItem2Map(outKeyValStr, tmpDataItem);
      free(singleData);
      singleData = NULL;
    }
    free(bodyData);
    bodyData = NULL;
  } else {
    int32_t bodyPos = 0;
    uint32_t bodyRemain = realBodyLen;
    while ((bodyRemain > 0) && (msgCount-- > 0)) {
      uint32_t singleMsgLen = 0;
      if (!getDatantohlInt(bodyData, bodyPos, bodyRemain, singleMsgLen, err_info)) {
        free(bodyData);
        bodyData = NULL;
        err_info += " for v4 attr's msgLength parameter";
        return false;
      }
      if (singleMsgLen <= 0) {
        continue;
      }
      if (singleMsgLen > bodyRemain) {
        free(bodyData);
        bodyData = NULL;
        err_info = "Parse message error: invalid v4 attr's msg Length 2";
        return false;
      }
      char* singleData = static_cast<char*>(malloc(singleMsgLen));
      if (singleData == NULL) {
        free(bodyData);
        bodyData = NULL;
        err_info = "Parse message error: malloc buffer for v4 single data failure!";
        return false;
      }
      memcpy(singleData, bodyData + bodyPos, singleMsgLen);
      bodyPos += singleMsgLen;
      bodyRemain -= singleMsgLen;
      uint32_t singleAttrLen = 0;
      if (!getDatantohlInt(bodyData, bodyPos, bodyRemain, singleAttrLen, err_info)) {
        free(bodyData);
        free(singleData);
        bodyData = NULL;
        singleData = NULL;
        err_info += " for v4 attr's single length parameter";
        return false;
      }
      if ((singleAttrLen <= 0) || (singleAttrLen > bodyRemain)) {
        free(bodyData);
        free(singleData);
        bodyData = NULL;
        singleData = NULL;
        err_info = "Parse message error: invalid v4 attr's attr Length";
        return false;
      }
      map<string, string> privAttrMap;
      map<string, string>::iterator tempIt;
      for (tempIt = commonAttrMap.begin(); tempIt != commonAttrMap.end(); ++tempIt) {
        privAttrMap[tempIt->first] = tempIt->second;
      }
      string strSingleAttr;
      if (singleAttrLen > 0) {
        char* singleAttr = static_cast<char*>(malloc(singleAttrLen + 1));
        if (singleAttr == NULL) {
          free(bodyData);
          free(singleData);
          bodyData = NULL;
          singleData = NULL;
          err_info = "Parse message error: malloc buffer for v4 single attr failure!";
          return false;
        }
        memset(singleAttr, 0, singleAttrLen + 1);
        memcpy(singleAttr, bodyData + bodyPos, singleAttrLen);
        bodyPos += singleAttrLen;
        attrLenRemain -= singleAttrLen;
        bodyRemain -= singleAttrLen;
        strSingleAttr = singleAttr;
        Utils::Split(strSingleAttr, privAttrMap, delimiter::kDelimiterAnd,
                     delimiter::kDelimiterEqual);
        if (privAttrMap.empty()) {
          free(bodyData);
          free(singleAttr);
          free(singleData);
          bodyData = NULL;
          singleData = NULL;
          singleAttr = NULL;
          err_info += " for v4 private attribute parameter";
          return result;
        }
        free(singleAttr);
        singleAttr = NULL;
      }
      string outKeyValStr;
      Utils::Join(privAttrMap, outKeyValStr, delimiter::kDelimiterAnd, delimiter::kDelimiterEqual);
      DataItem tmpDataItem(singleMsgLen, singleData);
      addDataItem2Map(outKeyValStr, tmpDataItem);
      free(singleData);
      singleData = NULL;
    }
    free(bodyData);
    bodyData = NULL;
  }
  is_parsed_ = true;
  return true;
}