in inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc [166:321]
bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data,uint32_t &out_len) {
if (pack_data == nullptr) {
LOG_ERROR("nullptr, failed to allocate memory for buf");
return false;
}
uint32_t idx = 0;
for (auto &it : msgs) {
if (msg_type_ >= 5) {
*(uint32_t *)(&pack_buf_[idx]) = htonl(it->msg_.size());
idx += sizeof(uint32_t);
}
memcpy(&pack_buf_[idx], it->msg_.data(), it->msg_.size());
idx += static_cast<uint32_t>(it->msg_.size());
if (SdkConfig::getInstance()->isAttrDataPackFormat()) {
*(uint32_t *)(&pack_buf_[idx]) = htonl(it->data_pack_format_attr_.size());
idx += sizeof(uint32_t);
memcpy(&pack_buf_[idx], it->data_pack_format_attr_.data(),
it->data_pack_format_attr_.size());
idx += static_cast<uint32_t>(it->data_pack_format_attr_.size());
}
if (msg_type_ == 2 || msg_type_ == 3) {
pack_buf_[idx] = '\n';
++idx;
}
}
uint32_t cnt = 1;
if (msgs.size()) {
cnt = msgs.size();
}
if (msg_type_ >= constants::kBinPackMethod) {
char *bodyBegin = pack_data + sizeof(BinaryMsgHead) + sizeof(uint32_t);
uint32_t body_len = 0;
std::string snappy_res;
bool isSnappy = IsZipAndOperate(snappy_res, idx);
char real_msg_type;
if (isSnappy) {
body_len = static_cast<uint32_t>(snappy_res.size());
memcpy(bodyBegin, snappy_res.data(), body_len);
// msg_type
real_msg_type = (msg_type_ | constants::kBinSnappyFlag);
} else {
body_len = idx;
memcpy(bodyBegin, pack_buf_, body_len);
real_msg_type = msg_type_;
}
*(uint32_t *)(&(pack_data[sizeof(BinaryMsgHead)])) = htonl(body_len);
bodyBegin += body_len;
uint32_t char_groupId_flag = 0;
std::string groupId_streamId_char;
uint16_t groupId_num = 0, streamId_num = 0;
if (SdkConfig::getInstance()->enableChar() || groupId_num_ == 0 ||
streamId_num_ == 0) {
groupId_num = 0;
streamId_num = 0;
groupId_streamId_char = group_id_key_ + msgs[0]->inlong_group_id_ + stream_id_key_ + msgs[0]->inlong_stream_id_;
char_groupId_flag = 0x4;
} else {
groupId_num = groupId_num_;
streamId_num = streamId_num_;
}
uint16_t ext_field =
(SdkConfig::getInstance()->extend_field_ | char_groupId_flag);
uint32_t data_time = data_time_ / 1000;
std::string attr;
if (SdkConfig::getInstance()->enableTraceIP()) {
if (groupId_streamId_char.empty())
attr = "node1ip=" + SdkConfig::getInstance()->local_ip_ +
"&rtime1=" + std::to_string(Utils::getCurrentMsTime());
else
attr = groupId_streamId_char +
"&node1ip=" + SdkConfig::getInstance()->local_ip_ +
"&rtime1=" + std::to_string(Utils::getCurrentMsTime());
} else {
attr = group_id_key_ + msgs[0]->inlong_group_id_ +
"&streamId=" + msgs[0]->inlong_stream_id_;
}
*(uint16_t *)bodyBegin = htons(attr.size());
bodyBegin += sizeof(uint16_t);
memcpy(bodyBegin, attr.data(), attr.size());
bodyBegin += attr.size();
*(uint16_t *)bodyBegin = htons(constants::kBinaryMagic);
uint32_t total_len = 25 + body_len + attr.size();
char *p = pack_data;
*(uint32_t *)p = htonl(total_len);
p += 4;
*p = real_msg_type;
++p;
*(uint16_t *)p = htons(groupId_num);
p += 2;
*(uint16_t *)p = htons(streamId_num);
p += 2;
*(uint16_t *)p = htons(ext_field);
p += 2;
*(uint32_t *)p = htonl(data_time);
p += 4;
*(uint16_t *)p = htons(cnt);
p += 2;
*(uint32_t *)p = htonl(uniq_id_);
out_len = total_len + 4;
} else {
if (msg_type_ == 3 || msg_type_ == 2) {
--idx;
}
char *bodyBegin = pack_data + sizeof(ProtocolMsgHead) + sizeof(uint32_t);
uint32_t body_len = 0;
std::string snappy_res;
bool isSnappy = IsZipAndOperate(snappy_res, idx);
if (isSnappy) {
body_len = static_cast<uint32_t>(snappy_res.size());
memcpy(bodyBegin, snappy_res.data(), body_len);
} else {
body_len = idx;
memcpy(bodyBegin, pack_buf_, body_len);
}
*(uint32_t *)(&(pack_data[sizeof(ProtocolMsgHead)])) = htonl(body_len);
bodyBegin += body_len;
// attr
std::string attr;
attr = group_id_key_ + msgs[0]->inlong_group_id_ + stream_id_key_ + msgs[0]->inlong_stream_id_;
attr += "&dt=" + std::to_string(data_time_);
attr += "&mid=" + std::to_string(uniq_id_);
if (isSnappy)
attr += "&cp=snappy";
attr += "&cnt=" + std::to_string(cnt);
attr += "&sid=" + std::string(Utils::getSnowflakeId());
*(uint32_t *)bodyBegin = htonl(attr.size());
bodyBegin += sizeof(uint32_t);
memcpy(bodyBegin, attr.data(), attr.size());
// total_len
uint32_t total_len = 1 + 4 + body_len + 4 + attr.size();
*(uint32_t *)pack_data = htonl(total_len);
// msg_type
*(&pack_data[4]) = msg_type_;
out_len = total_len + 4;
}
return true;
}