bool TubeMQTDMsg::parseDefaultMsg()

in inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/tubemq_tdmsg.cc [214:333]


bool TubeMQTDMsg::parseDefaultMsg(const char* data, uint32_t data_length, int32_t start_pos,
                                  string& err_info) {
  // #lizard forgives
  int32_t pos1 = start_pos;
  uint32_t remain = data_length;
  for (uint32_t i = 0; i < attr_count_; i++) {
    uint32_t origAttrLen = 0;
    char* origAttrData = NULL;
    string commAttr;
    uint32_t dataCnt = 1;
    uint32_t dataLen = 0;
    char compress = 0;
    if (remain <= 2) {
      if (i == 0) {
        err_info = "Parse message error: invalid databody length length";
        return false;
      } else {
        break;
      }
    }
    if (!getDatantohsInt(data, pos1, remain, origAttrLen, err_info)) {
      err_info += " for attr length parameter";
      return false;
    }
    if ((origAttrLen <= 0) || (origAttrLen > remain)) {
      err_info = "Parse message error: invalid attr length";
      return false;
    }
    origAttrData = static_cast<char*>(malloc(origAttrLen + 1));
    if (origAttrData == NULL) {
      err_info = "Parse message error: malloc buffer for default attr value failure!";
      return false;
    }
    memset(origAttrData, 0, origAttrLen + 1);
    memcpy(origAttrData, data + pos1, origAttrLen);
    pos1 += origAttrLen;
    remain -= origAttrLen;
    commAttr = origAttrData;
    free(origAttrData);
    origAttrData = NULL;
    if (version_ == 2) {
      if (!getDatantohlInt(data, pos1, remain, dataCnt, err_info)) {
        err_info += " for data count parameter";
        return false;
      }
    }
    if (!getDatantohlInt(data, pos1, remain, dataLen, err_info)) {
      err_info += " for data len parameter";
      return false;
    }
    if ((dataLen <= 0) || (dataLen > remain)) {
      err_info = "Parse message error: invalid data length";
      return false;
    }
    if (!getDataChar(data, pos1, remain, compress, err_info)) {
      return false;
    }
    size_t uncompressDataLen = 0;
    char* uncompressData = NULL;
    if (compress != 0) {
      if (snappy_uncompressed_length(data + pos1, dataLen - 1, &uncompressDataLen) != SNAPPY_OK) {
        err_info = "Parse message error:  snappy uncompressed default compress's length failure!";
        return false;
      }
      uncompressData = static_cast<char*>(malloc(uncompressDataLen));
      if (uncompressData == NULL) {
        err_info = "Parse message error: malloc buffer for default compress's data failure!";
        return false;
      }
      if (snappy_uncompress(data + pos1, dataLen - 1, uncompressData, &uncompressDataLen) !=
          SNAPPY_OK) {
        free(uncompressData);
        uncompressData = NULL;
        err_info = "Parse message error:  snappy uncompressed default compress's data failure!";
        return false;
      }
    } else {
      uncompressDataLen = dataLen - 1;
      uncompressData = static_cast<char*>(malloc(uncompressDataLen));
      if (uncompressData == NULL) {
        err_info = "Parse message error: malloc buffer for default's data failure!";
        return false;
      }
      memcpy(uncompressData, data + pos1, dataLen - 1);
    }
    pos1 += dataLen - 1;
    remain -= dataLen - 1;
    int32_t itemPos = 0;
    // unsigned int totalItemDataLen = 0;
    uint32_t itemRemain = uncompressDataLen;
    while (itemRemain > 0) {
      uint32_t singleMsgLen = 0;
      // unsigned int dataMsgLen = 0;
      // char *singleData = NULL;
      if (!getDatantohlInt(uncompressData, itemPos, itemRemain, singleMsgLen, err_info)) {
        free(uncompressData);
        uncompressData = NULL;
        err_info += " for default item's msgLength parameter";
        return false;
      }
      if (singleMsgLen <= 0) {
        continue;
      }
      if (singleMsgLen > itemRemain) {
        free(uncompressData);
        uncompressData = NULL;
        err_info = "Parse message error: invalid default attr's msg Length";
        return false;
      }
      DataItem tmpDataItem(singleMsgLen, uncompressData + itemPos);
      addDataItem2Map(commAttr, tmpDataItem);
      itemPos += singleMsgLen;
      itemRemain -= singleMsgLen;
    }
    free(uncompressData);
    uncompressData = NULL;
  }
  is_parsed_ = true;
  return true;
}