pb.cpp (336 lines of code) (raw):

#include "pb.h"

#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <utility>

#include "coding.h"

using namespace aliyun_log_sdk_v6;
using namespace aliyun_log_sdk_v6::pb;
using KvPair = std::pair<std::string, std::string>;

// Marker for output parameters; expands to nothing.
#define SLS_OUT

// Returns false from the enclosing function when `condition` holds.
// Wrapped in do/while(0) so the macro behaves as a single statement
// (safe inside un-braced if/else).
#define IF_CONFITION_RETURN_FALSE(condition) \
    do                                       \
    {                                        \
        if (condition)                       \
        {                                    \
            return false;                    \
        }                                    \
    } while (0)

// Returns false from the enclosing function when the wire type read from the
// stream (`actual`) differs from the one the field requires (`expected`).
// `index` is accepted for call-site readability only; it is not used.
#define CHECK_FIELD_TYPE(index, actual, expected) \
    do                                            \
    {                                             \
        if ((expected) != (actual))               \
        {                                         \
            return false;                         \
        }                                         \
    } while (0)

// Protobuf wire types used by this file (low three bits of a field tag).
enum : uint32_t
{
    kPbWireVarint = 0,
    kPbWireLengthDelimited = 2,
    kPbWireFixed32 = 5
};

// Builds the one-byte tag for a field: (field number << 3) | wire type.
// Only valid for field numbers small enough to fit in a single byte, which
// holds for every field written below (1, 2, 3, 4, 6).
static constexpr char PbFieldTag(uint32_t index, uint32_t wireType)
{
    return static_cast<char>((index << 3) | wireType);
}

// True when at least `len` bytes are available in [pos, end).
// Compares against the remaining byte count instead of computing `pos + len`,
// which would form an out-of-range pointer (undefined behaviour, and able to
// wrap around on 32-bit targets) when `len` comes from untrusted input.
static inline bool PbHasRemaining(const char* pos, const char* end, uint32_t len)
{
    return pos <= end && len <= static_cast<size_t>(end - pos);
}

/// for deserialize

// Parses a key/value message: field 1 = key, field 2 = value, both
// length-delimited. Other field numbers are skipped via SkipProtobufField,
// the same way the LogGroup parser handles them.
// Returns true only if both key and value were seen and the input was
// consumed exactly.
static bool ParseFromStr(const SlsStringPiece& str, SLS_OUT KvPair& result)
{
    const char* pos = str.mPtr;
    const char* end = str.mPtr + str.mLen;
    bool hasKey = false;
    bool hasValue = false;
    while (pos < end)
    {
        uint32_t head = 0;
        pos = GetVarint32Ptr(pos, end, &head);
        IF_CONFITION_RETURN_FALSE(pos == nullptr);

        const uint32_t mode = head & 0x7;
        const uint32_t index = head >> 3;

        // Unknown fields must be skipped *before* their length is consumed:
        // SkipProtobufField is called right after the tag everywhere else in
        // this file.
        if (index != 1 && index != 2)
        {
            pos = SkipProtobufField(pos, end, mode);
            IF_CONFITION_RETURN_FALSE(pos == nullptr);
            continue;
        }

        CHECK_FIELD_TYPE(index, mode, kPbWireLengthDelimited);
        uint32_t len = 0;
        pos = GetVarint32Ptr(pos, end, &len);
        IF_CONFITION_RETURN_FALSE(pos == nullptr || !PbHasRemaining(pos, end, len));

        if (index == 1)
        {
            // key = 1
            result.first.assign(pos, len);
            hasKey = true;
        }
        else
        {
            // value = 2
            result.second.assign(pos, len);
            hasValue = true;
        }
        pos += len;
    }
    return hasKey && hasValue && (pos == end);
}

// Parses a Log message: field 1 = time (varint, required, at most once),
// field 2 = content key/value pair (repeated), field 4 = nanosecond part
// (fixed32, optional). Other fields are skipped.
// Returns true only if a time was seen and the input was consumed exactly.
static bool ParseFromStr(const SlsStringPiece& str, SLS_OUT Log& result)
{
    const char* pos = str.mPtr;
    const char* end = str.mPtr + str.mLen;
    bool hasTime = false;
    while (pos < end)
    {
        uint32_t head = 0;
        pos = GetVarint32Ptr(pos, end, &head);
        IF_CONFITION_RETURN_FALSE(pos == nullptr);

        const uint32_t mode = head & 0x7;
        const uint32_t index = head >> 3;
        switch (index)
        {
            case 1: // log time
            {
                CHECK_FIELD_TYPE(index, mode, kPbWireVarint);
                IF_CONFITION_RETURN_FALSE(hasTime); // a second time field is rejected
                uint32_t dataTime = 0;
                pos = GetVarint32Ptr(pos, end, &dataTime);
                IF_CONFITION_RETURN_FALSE(pos == nullptr);
                result.time = dataTime;
                hasTime = true;
                break;
            }
            case 2: // log content
            {
                CHECK_FIELD_TYPE(index, mode, kPbWireLengthDelimited);
                uint32_t len = 0;
                pos = GetVarint32Ptr(pos, end, &len);
                IF_CONFITION_RETURN_FALSE(pos == nullptr || !PbHasRemaining(pos, end, len));
                KvPair pair;
                if (!ParseFromStr(SlsStringPiece(pos, len), pair))
                {
                    return false;
                }
                // `pair` is not used again, so hand its strings over.
                result.contents.emplace_back(std::move(pair.first), std::move(pair.second));
                pos += len;
                break;
            }
            case 4: // nano time
            {
                CHECK_FIELD_TYPE(index, mode, kPbWireFixed32);
                IF_CONFITION_RETURN_FALSE(!PbHasRemaining(pos, end, 4));
                // Read through a 4-byte temporary so exactly four bytes are
                // interpreted regardless of how wide result.timeNs is.
                // NOTE(review): bytes are taken in host order; confirm this
                // matches PutFixedUint32 on big-endian targets.
                uint32_t nanos = 0;
                std::memcpy(&nanos, pos, sizeof(nanos));
                result.timeNs = nanos;
                result.hasTimeNs = true;
                pos += sizeof(nanos);
                break;
            }
            default:
            {
                pos = SkipProtobufField(pos, end, mode);
                IF_CONFITION_RETURN_FALSE(pos == nullptr);
                break;
            }
        }
    }
    return hasTime && (pos == end);
}

// Parses a LogGroup message: field 1 = log (repeated), field 3 = topic,
// field 4 = source, field 6 = tag key/value pair (repeated); all
// length-delimited. Other fields are skipped.
// Returns true only if the input was consumed exactly.
static bool ParseFromStr(const SlsStringPiece& str, SLS_OUT LogGroup& result)
{
    const char* pos = str.mPtr;
    const char* end = str.mPtr + str.mLen;
    while (pos < end)
    {
        uint32_t head = 0;
        pos = GetVarint32Ptr(pos, end, &head);
        IF_CONFITION_RETURN_FALSE(pos == nullptr);

        const uint32_t mode = head & 0x7;
        const uint32_t index = head >> 3;
        if (index != 1 && index != 3 && index != 4 && index != 6)
        {
            pos = SkipProtobufField(pos, end, mode);
            IF_CONFITION_RETURN_FALSE(pos == nullptr);
            continue;
        }

        CHECK_FIELD_TYPE(index, mode, kPbWireLengthDelimited);
        uint32_t len = 0;
        pos = GetVarint32Ptr(pos, end, &len);
        IF_CONFITION_RETURN_FALSE(pos == nullptr || !PbHasRemaining(pos, end, len));

        switch (index)
        {
            case 1: // logs
            {
                Log log;
                if (!ParseFromStr(SlsStringPiece(pos, len), log))
                {
                    return false;
                }
                result.logs.push_back(std::move(log));
                break;
            }
            case 3: // topic
            {
                result.topic.assign(pos, len);
                break;
            }
            case 4: // source
            {
                result.source.assign(pos, len);
                break;
            }
            case 6: // tags
            {
                KvPair pair;
                if (!ParseFromStr(SlsStringPiece(pos, len), pair))
                {
                    return false;
                }
                result.logTags.emplace_back(std::move(pair.first), std::move(pair.second));
                break;
            }
            default:
            {
                // unreachable: other indexes were skipped above
                break;
            }
        }
        pos += len;
    }
    return pos == end;
}

// Parses a LogGroupList message: field 1 = LogGroup (repeated,
// length-delimited). Other fields are skipped.
// Returns true only if the input was consumed exactly.
static bool ParseFromStr(const SlsStringPiece& str, SLS_OUT LogGroupList& result)
{
    const char* pos = str.mPtr;
    const char* end = str.mPtr + str.mLen;
    while (pos < end)
    {
        uint32_t head = 0;
        pos = GetVarint32Ptr(pos, end, &head);
        IF_CONFITION_RETURN_FALSE(pos == nullptr);

        const uint32_t mode = head & 0x7;
        const uint32_t index = head >> 3;
        if (index != 1)
        {
            pos = SkipProtobufField(pos, end, mode);
            IF_CONFITION_RETURN_FALSE(pos == nullptr);
            continue;
        }

        CHECK_FIELD_TYPE(index, mode, kPbWireLengthDelimited);
        uint32_t len = 0;
        pos = GetVarint32Ptr(pos, end, &len);
        IF_CONFITION_RETURN_FALSE(pos == nullptr || !PbHasRemaining(pos, end, len));

        LogGroup logGroup;
        if (!ParseFromStr(SlsStringPiece(pos, len), logGroup))
        {
            return false;
        }
        result.logGroupList.push_back(std::move(logGroup));
        pos += len;
    }
    return pos == end;
}

// Decodes `str` into `out`. `out` is cleared first, and cleared again on
// failure so that no partially parsed data is left behind.
// Returns true on success, false if the input is malformed.
bool PbEncoder::ParseFromString(const std::string& str, SLS_OUT LogGroupList& out)
{
    out.Clear();
    if (!ParseFromStr(SlsStringPiece(str.c_str(), str.size()), out))
    {
        out.Clear();
        return false;
    }
    return true;
}

/// for serialize

// Encoded size of a length-delimited field whose payload is `size` bytes:
// one tag byte + the varint length prefix + the payload.
inline static size_t WithFieldHeader(size_t size)
{
    return 1 + size + GetEncodeVarint32Len(size);
}

// Encoded payload size of a key/value message (without its own field header).
inline static size_t PbLength(const std::string& key, const std::string& value)
{
    return WithFieldHeader(key.size()) + WithFieldHeader(value.size());
}

// Encoded payload size of a Log message (without its own field header).
inline static size_t PbLength(const Log& log)
{
    // time: one tag byte + varint
    size_t size = 1 + GetEncodeVarint32Len(log.time);
    for (const auto& content : log.contents)
    {
        size += WithFieldHeader(PbLength(content.key, content.value));
    }
    if (log.hasTimeNs)
    {
        // nano time: one tag byte + 4 fixed bytes
        size += 1 + 4;
    }
    return size;
}

// Encoded size of a whole LogGroup message. Empty topic/source are omitted,
// matching what SerializeToString writes.
inline static size_t PbLength(const LogGroup& logGroup)
{
    size_t size = 0;
    // topic
    if (!logGroup.topic.empty())
    {
        size += WithFieldHeader(logGroup.topic.size());
    }
    // source
    if (!logGroup.source.empty())
    {
        size += WithFieldHeader(logGroup.source.size());
    }
    // logs
    for (const auto& log : logGroup.logs)
    {
        size += WithFieldHeader(PbLength(log));
    }
    // tags
    for (const auto& logTag : logGroup.logTags)
    {
        size += WithFieldHeader(PbLength(logTag.key, logTag.value));
    }
    return size;
}

// Writes `head | varint(str.size()) | str bytes` at `dst` and returns the
// position just past the written data. The caller must have reserved
// WithFieldHeader(str.size()) bytes.
inline static char* WriteStrWithHead(const std::string& str, char head, SLS_OUT char* dst)
{
    *dst++ = head;
    dst = EncodeVarint32(dst, str.size());
    std::memcpy(dst, str.data(), str.size());
    return dst + str.size();
}

// Writes a key/value message as a length-delimited field:
// head | varint(msgSize) | key (field 1) | value (field 2).
// `msgSize` must equal PbLength(key, value). Returns the position just past
// the written data.
static char* WriteStrPairWithHead(char head,
                                  size_t msgSize,
                                  const std::string& key,
                                  const std::string& value,
                                  SLS_OUT char* dst)
{
    *dst++ = head;
    dst = EncodeVarint32(dst, msgSize);
    dst = WriteStrWithHead(key, PbFieldTag(1, kPbWireLengthDelimited), dst);
    return WriteStrWithHead(value, PbFieldTag(2, kPbWireLengthDelimited), dst);
}

// Writes one Log as field 1 of its parent: tag | varint(msgSize) | body.
// Body order: time (1), optional nano time (4), then contents (2).
// `msgSize` must equal PbLength(log). Returns the position just past the
// written data.
static char* WriteLogWithHead(size_t msgSize, const Log& log, SLS_OUT char* dst)
{
    *dst++ = PbFieldTag(1, kPbWireLengthDelimited);
    dst = EncodeVarint32(dst, msgSize);

    // time = 1
    *dst++ = PbFieldTag(1, kPbWireVarint);
    dst = EncodeVarint32(dst, log.time);

    // timeNs = 4
    if (log.hasTimeNs)
    {
        *dst++ = PbFieldTag(4, kPbWireFixed32);
        dst = PutFixedUint32(dst, log.timeNs);
    }

    // contents = 2
    for (const auto& content : log.contents)
    {
        const size_t contentMsgSize = PbLength(content.key, content.value);
        dst = WriteStrPairWithHead(PbFieldTag(2, kPbWireLengthDelimited),
                                   contentMsgSize,
                                   content.key,
                                   content.value,
                                   dst);
    }
    return dst;
}

// todo: cache PbLength
//
// Serializes `logGroup` into `out` (any previous content is replaced).
// The buffer is sized up front and then filled from the END towards the
// front, so the resulting byte order is: logs, tags, source, topic.
// Returns false (with `out` cleared) if the bytes written do not add up to
// the precomputed size.
bool PbEncoder::SerializeToString(const LogGroup& logGroup, SLS_OUT std::string& out)
{
    const size_t totalMsgSize = PbLength(logGroup);
    out.resize(totalMsgSize);

    char* begin = &out[0];
    char* pos = begin + totalMsgSize; // write cursor, moves backwards

    // topic = 3
    if (!logGroup.topic.empty())
    {
        char* writeBeginPos = pos - WithFieldHeader(logGroup.topic.size());
        WriteStrWithHead(logGroup.topic, PbFieldTag(3, kPbWireLengthDelimited), writeBeginPos);
        pos = writeBeginPos;
    }

    // source = 4
    if (!logGroup.source.empty())
    {
        char* writeBeginPos = pos - WithFieldHeader(logGroup.source.size());
        WriteStrWithHead(logGroup.source, PbFieldTag(4, kPbWireLengthDelimited), writeBeginPos);
        pos = writeBeginPos;
    }

    // tags = 6 (reverse iteration keeps the original order in the output)
    for (auto it = logGroup.logTags.rbegin(); it != logGroup.logTags.rend(); ++it)
    {
        const auto& logTag = *it;
        const size_t msgSize = PbLength(logTag.key, logTag.value);
        char* writeBeginPos = pos - WithFieldHeader(msgSize);
        WriteStrPairWithHead(PbFieldTag(6, kPbWireLengthDelimited),
                             msgSize,
                             logTag.key,
                             logTag.value,
                             writeBeginPos);
        pos = writeBeginPos;
    }

    // logs = 1 (reverse iteration keeps the original order in the output)
    for (auto it = logGroup.logs.rbegin(); it != logGroup.logs.rend(); ++it)
    {
        const auto& log = *it;
        const size_t msgSize = PbLength(log);
        char* writeBeginPos = pos - WithFieldHeader(msgSize);
        WriteLogWithHead(msgSize, log, writeBeginPos);
        pos = writeBeginPos;
    }

    // The cursor must land exactly on the start of the buffer.
    if (pos != begin)
    {
        out.clear();
        return false;
    }
    return true;
}

// Convenience wrapper: decodes `str` into this object via PbEncoder.
bool LogGroupList::ParseFromString(const std::string& str)
{
    return PbEncoder::ParseFromString(str, *this);
}

// Convenience wrapper: encodes this object into `out` via PbEncoder.
bool LogGroup::SerializeToString(std::string& out) const
{
    return PbEncoder::SerializeToString(*this, out);
}