bool TubeMQTDMsg::parseMixAttrMsg()

in inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/tubemq_tdmsg.cc [335:507]


/**
 * Parses the attribute/body groups of a mixed-attribute message (called "v3"
 * in the error strings below) and stores every decoded item through
 * addDataItem2Map().
 *
 * Layout as consumed by this function, for at most attr_count_ groups:
 *   - attribute length, read with getDatantohsInt()
 *   - that many bytes of common attribute text
 *   - body length, read with getDatantohlInt()
 *   - one compress flag byte, read with getDataChar()
 *   - (body length - 1) bytes of payload, snappy-compressed when the flag is
 *     non-zero
 * The decoded payload starts with a total-length field (validated only), then
 * repeats: item length, item bytes and, while bytes are left, an item
 * attribute length followed by the item attribute text.
 *
 * Items are keyed by the common attribute; when an item carries its own
 * attribute the key is "<common>&<item attribute>". Attribute text is treated
 * as a C string, i.e. it ends at the first NUL byte.
 *
 * @param data        buffer holding the encoded message
 * @param data_length number of bytes treated as still available, counted from
 *                    start_pos (NOTE(review): confirm callers pass the
 *                    remaining length rather than the total buffer length)
 * @param start_pos   offset into data at which parsing begins
 * @param err_info    receives a description of the failure when false is
 *                    returned
 * @return true when every group was parsed (is_parsed_ is then set to true);
 *         false on malformed input or allocation failure. Items stored before
 *         a failure are not removed by this function.
 */
bool TubeMQTDMsg::parseMixAttrMsg(const char* data, uint32_t data_length, int32_t start_pos,
                                  string& err_info) {
  // #lizard forgives

  // Function-local owner for the malloc'ed scratch buffers used below: the
  // destructor frees the buffer on every exit path, so the early returns do
  // not each have to repeat the cleanup. At least one byte is always requested
  // because malloc(0) is implementation-defined and may return NULL, which
  // would otherwise be misreported as an out-of-memory failure.
  struct ScopedBuffer {
    explicit ScopedBuffer(size_t size) : ptr(static_cast<char*>(malloc(size > 0 ? size : 1))) {}
    ~ScopedBuffer() { free(ptr); }
    char* ptr;

   private:
    // Non-copyable: a copy would free the same buffer twice.
    ScopedBuffer(const ScopedBuffer&);
    ScopedBuffer& operator=(const ScopedBuffer&);
  };

  int32_t pos1 = start_pos;
  uint32_t remain = data_length;
  for (uint32_t i = 0; i < attr_count_; i++) {
    // Too few bytes left for another group: an error for the very first
    // group, otherwise simply the end of the data.
    if (remain <= 2) {
      if (i == 0) {
        err_info = "Parse message error: invalid databody length length";
        return false;
      }
      break;
    }

    // --- common attribute shared by all items of this group ---
    uint32_t origAttrLen = 0;
    if (!getDatantohsInt(data, pos1, remain, origAttrLen, err_info)) {
      err_info += " for attr length parameter";
      return false;
    }
    if ((origAttrLen == 0) || (origAttrLen > remain)) {
      err_info = "Parse message error: invalid attr length";
      return false;
    }
    string commAttr;
    {
      ScopedBuffer attrBuf(static_cast<size_t>(origAttrLen) + 1);
      if (attrBuf.ptr == NULL) {
        err_info = "Parse message error: malloc buffer for v3 attr value failure!";
        return false;
      }
      memcpy(attrBuf.ptr, data + pos1, origAttrLen);
      attrBuf.ptr[origAttrLen] = '\0';
      pos1 += origAttrLen;
      remain -= origAttrLen;
      commAttr = attrBuf.ptr;
    }

    // --- body header: length (which includes the flag byte) and compress flag ---
    uint32_t bodyDataLen = 0;
    if (!getDatantohlInt(data, pos1, remain, bodyDataLen, err_info)) {
      err_info += " for body data len parameter";
      return false;
    }
    if ((bodyDataLen == 0) || (bodyDataLen > remain)) {
      err_info = "Parse message error: invalid data length";
      return false;
    }
    char compress = 0;
    if (!getDataChar(data, pos1, remain, compress, err_info)) {
      err_info += " for attr compress parameter";
      return false;
    }
    const bool isCompressed = (compress != 0);
    const uint32_t payloadLen = bodyDataLen - 1;  // the flag byte is already consumed

    // --- obtain the decoded payload in a buffer of its own ---
    size_t uncompressDataLen = payloadLen;
    if (isCompressed) {
      uncompressDataLen = 0;
      if (snappy_uncompressed_length(data + pos1, payloadLen, &uncompressDataLen) != SNAPPY_OK) {
        err_info = "Parse message error:  snappy uncompressed v3 compress's length failure!";
        return false;
      }
    }
    ScopedBuffer body(uncompressDataLen);
    if (body.ptr == NULL) {
      err_info = "Parse message error: malloc buffer for v3 compress's data failure!";
      return false;
    }
    if (isCompressed) {
      if (snappy_uncompress(data + pos1, payloadLen, body.ptr, &uncompressDataLen) != SNAPPY_OK) {
        err_info = "Parse message error:  snappy uncompressed v3 compress's data failure!";
        return false;
      }
    } else {
      memcpy(body.ptr, data + pos1, payloadLen);
    }
    pos1 += payloadLen;
    remain -= payloadLen;

    // --- walk the items inside the decoded payload ---
    int32_t itemPos = 0;
    // The cursor helpers work on 32-bit counters, so the narrowing is explicit.
    uint32_t itemRemain = static_cast<uint32_t>(uncompressDataLen);
    uint32_t totalItemDataLen = 0;
    if (!getDatantohlInt(body.ptr, itemPos, itemRemain, totalItemDataLen, err_info)) {
      err_info += " for v3 item's msgLength parameter";
      return false;
    }
    // The total length is only sanity-checked; the loop below is driven by the
    // number of bytes actually left in the payload.
    if ((totalItemDataLen == 0) || (totalItemDataLen > itemRemain)) {
      err_info = "Parse message error: invalid v3 attr's msg Length";
      return false;
    }
    while (itemRemain > 0) {
      uint32_t singleMsgLen = 0;
      if (!getDatantohlInt(body.ptr, itemPos, itemRemain, singleMsgLen, err_info)) {
        err_info += " for v3 item's msgLength parameter";
        return false;
      }
      if ((singleMsgLen == 0) || (singleMsgLen > itemRemain)) {
        err_info = "Parse message error: invalid v3 attr's msg Length";
        return false;
      }
      ScopedBuffer single(singleMsgLen);
      if (single.ptr == NULL) {
        err_info = "Parse message error: malloc buffer for v3 single data failure!";
        return false;
      }
      memcpy(single.ptr, body.ptr + itemPos, singleMsgLen);
      itemPos += singleMsgLen;
      itemRemain -= singleMsgLen;

      string finalAttr;
      if (itemRemain > 0) {
        // Bytes left after the item: they begin with the item's own attribute.
        uint32_t singleAttrLen = 0;
        if (!getDatantohlInt(body.ptr, itemPos, itemRemain, singleAttrLen, err_info)) {
          err_info += " for v3 attr's single length parameter";
          return false;
        }
        if ((singleAttrLen == 0) || (singleAttrLen > itemRemain)) {
          err_info = "Parse message error: invalid v3 attr's attr Length";
          return false;
        }
        // Widen before adding the terminator byte so the size cannot wrap.
        ScopedBuffer attr(static_cast<size_t>(singleAttrLen) + 1);
        if (attr.ptr == NULL) {
          err_info = "Parse message error: malloc buffer for v3 single attr failure!";
          return false;
        }
        memcpy(attr.ptr, body.ptr + itemPos, singleAttrLen);
        attr.ptr[singleAttrLen] = '\0';
        itemPos += singleAttrLen;
        itemRemain -= singleAttrLen;
        finalAttr = commAttr + "&" + attr.ptr;
      } else {
        finalAttr = commAttr;
      }

      DataItem tmpDataItem(singleMsgLen, single.ptr);
      addDataItem2Map(finalAttr, tmpDataItem);
    }
  }
  is_parsed_ = true;
  return true;
}