bool RecvGroup::PackMsg()

in inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc [166:321]


bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data,uint32_t &out_len) {
  if (pack_data == nullptr) {
    LOG_ERROR("nullptr, failed to allocate memory for buf");
    return false;
  }
  uint32_t idx = 0;
  for (auto &it : msgs) {
    if (msg_type_ >= 5) {
      *(uint32_t *)(&pack_buf_[idx]) = htonl(it->msg_.size());
      idx += sizeof(uint32_t);
    }
    memcpy(&pack_buf_[idx], it->msg_.data(), it->msg_.size());
    idx += static_cast<uint32_t>(it->msg_.size());

    if (SdkConfig::getInstance()->isAttrDataPackFormat()) {
      *(uint32_t *)(&pack_buf_[idx]) = htonl(it->data_pack_format_attr_.size());
      idx += sizeof(uint32_t);

      memcpy(&pack_buf_[idx], it->data_pack_format_attr_.data(),
             it->data_pack_format_attr_.size());
      idx += static_cast<uint32_t>(it->data_pack_format_attr_.size());
    }

    if (msg_type_ == 2 || msg_type_ == 3) {
      pack_buf_[idx] = '\n';
      ++idx;
    }
  }

  uint32_t cnt = 1;
  if (msgs.size()) {
    cnt = msgs.size();
  }

  if (msg_type_ >= constants::kBinPackMethod) {
    char *bodyBegin = pack_data + sizeof(BinaryMsgHead) + sizeof(uint32_t);
    uint32_t body_len = 0;

    std::string snappy_res;
    bool isSnappy = IsZipAndOperate(snappy_res, idx);
    char real_msg_type;

    if (isSnappy) {
      body_len = static_cast<uint32_t>(snappy_res.size());
      memcpy(bodyBegin, snappy_res.data(), body_len);
      // msg_type
      real_msg_type = (msg_type_ | constants::kBinSnappyFlag);
    } else {
      body_len = idx;
      memcpy(bodyBegin, pack_buf_, body_len);
      real_msg_type = msg_type_;
    }
    *(uint32_t *)(&(pack_data[sizeof(BinaryMsgHead)])) = htonl(body_len);

    bodyBegin += body_len;

    uint32_t char_groupId_flag = 0;
    std::string groupId_streamId_char;
    uint16_t groupId_num = 0, streamId_num = 0;
    if (SdkConfig::getInstance()->enableChar() || groupId_num_ == 0 ||
        streamId_num_ == 0) {
      groupId_num = 0;
      streamId_num = 0;
      groupId_streamId_char = group_id_key_ + msgs[0]->inlong_group_id_ + stream_id_key_ + msgs[0]->inlong_stream_id_;
      char_groupId_flag = 0x4;
    } else {
      groupId_num = groupId_num_;
      streamId_num = streamId_num_;
    }
    uint16_t ext_field =
        (SdkConfig::getInstance()->extend_field_ | char_groupId_flag);
    uint32_t data_time = data_time_ / 1000;

    std::string attr;
    if (SdkConfig::getInstance()->enableTraceIP()) {
      if (groupId_streamId_char.empty())
        attr = "node1ip=" + SdkConfig::getInstance()->local_ip_ +
            "&rtime1=" + std::to_string(Utils::getCurrentMsTime());
      else
        attr = groupId_streamId_char +
            "&node1ip=" + SdkConfig::getInstance()->local_ip_ +
            "&rtime1=" + std::to_string(Utils::getCurrentMsTime());
    } else {
      attr = group_id_key_ + msgs[0]->inlong_group_id_ +
          "&streamId=" + msgs[0]->inlong_stream_id_;
    }
    *(uint16_t *)bodyBegin = htons(attr.size());
    bodyBegin += sizeof(uint16_t);
    memcpy(bodyBegin, attr.data(), attr.size());
    bodyBegin += attr.size();

    *(uint16_t *)bodyBegin = htons(constants::kBinaryMagic);

    uint32_t total_len = 25 + body_len + attr.size();

    char *p = pack_data;
    *(uint32_t *)p = htonl(total_len);
    p += 4;
    *p = real_msg_type;
    ++p;
    *(uint16_t *)p = htons(groupId_num);
    p += 2;
    *(uint16_t *)p = htons(streamId_num);
    p += 2;
    *(uint16_t *)p = htons(ext_field);
    p += 2;
    *(uint32_t *)p = htonl(data_time);
    p += 4;
    *(uint16_t *)p = htons(cnt);
    p += 2;
    *(uint32_t *)p = htonl(uniq_id_);

    out_len = total_len + 4;
  } else {
    if (msg_type_ == 3 || msg_type_ == 2) {
      --idx;
    }

    char *bodyBegin = pack_data + sizeof(ProtocolMsgHead) + sizeof(uint32_t);
    uint32_t body_len = 0;
    std::string snappy_res;
    bool isSnappy = IsZipAndOperate(snappy_res, idx);
    if (isSnappy) {
      body_len = static_cast<uint32_t>(snappy_res.size());
      memcpy(bodyBegin, snappy_res.data(), body_len);
    } else {
      body_len = idx;
      memcpy(bodyBegin, pack_buf_, body_len);
    }
    *(uint32_t *)(&(pack_data[sizeof(ProtocolMsgHead)])) = htonl(body_len);
    bodyBegin += body_len;

    // attr
    std::string attr;
    attr = group_id_key_ + msgs[0]->inlong_group_id_ + stream_id_key_ + msgs[0]->inlong_stream_id_;

    attr += "&dt=" + std::to_string(data_time_);
    attr += "&mid=" + std::to_string(uniq_id_);
    if (isSnappy)
      attr += "&cp=snappy";
    attr += "&cnt=" + std::to_string(cnt);
    attr += "&sid=" + std::string(Utils::getSnowflakeId());

    *(uint32_t *)bodyBegin = htonl(attr.size());
    bodyBegin += sizeof(uint32_t);
    memcpy(bodyBegin, attr.data(), attr.size());

    // total_len
    uint32_t total_len = 1 + 4 + body_len + 4 + attr.size();
    *(uint32_t *)pack_data = htonl(total_len);
    // msg_type
    *(&pack_data[4]) = msg_type_;
    out_len = total_len + 4;
  }
  return true;
}