common/protobuf/kudu/util/pb_util.cc

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
//
// Some portions copyright (C) 2008, Google, inc.
//
// Utilities for working with protobufs.
// Some of this code is cribbed from the protobuf source,
// but modified to work with kudu's 'faststring' instead of STL strings.

#include "kudu/util/pb_util.h"

#include <algorithm>
#include <cstddef>
#include <deque>
#include <initializer_list>
#include <limits>
#include <memory>
#include <mutex>
#include <ostream>
#include <sstream>
#include <string>
#include <unordered_set>
#include <vector>

#include <boost/optional/optional.hpp>
#include <glog/logging.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/descriptor.pb.h>
#include <google/protobuf/descriptor_database.h>
#include <google/protobuf/dynamic_message.h>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
#include <google/protobuf/message.h>
#include <google/protobuf/message_lite.h>
#include <google/protobuf/stubs/status.h>
#include <google/protobuf/text_format.h>
#include <google/protobuf/util/json_util.h>

#include "kudu/gutil/integral_types.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/escaping.h"
#include "kudu/gutil/strings/fastmem.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/coding.h"
#include "kudu/util/coding-inl.h"
#include "kudu/util/crc.h"
#include "kudu/util/debug/sanitizer_scopes.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/env.h"
#include "kudu/util/faststring.h"
#include "kudu/util/jsonwriter.h"
#include "kudu/util/logging.h"
#include "kudu/util/path_util.h"
#include "kudu/util/pb_util-internal.h"
#include "kudu/util/pb_util.pb.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"

using google::protobuf::Descriptor;
using google::protobuf::DescriptorPool;
using google::protobuf::DynamicMessageFactory;
using google::protobuf::FieldDescriptor;
using google::protobuf::FileDescriptor;
using google::protobuf::FileDescriptorProto;
using google::protobuf::FileDescriptorSet;
using google::protobuf::io::ArrayInputStream;
using google::protobuf::io::CodedInputStream;
using google::protobuf::Message;
using google::protobuf::MessageLite;
using google::protobuf::Reflection;
using google::protobuf::SimpleDescriptorDatabase;
using google::protobuf::TextFormat;
using kudu::crc::Crc;
using kudu::pb_util::internal::SequentialFileFileInputStream;
using kudu::pb_util::internal::WritableFileOutputStream;
using std::deque;
using std::endl;
using std::initializer_list;
using std::ostream;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::unordered_set;
using std::vector;
using strings::Substitute;
using strings::Utf8SafeCEscape;
namespace std {

// Allow the use of FileState with DCHECK_EQ.
std::ostream& operator<< (std::ostream& os, const kudu::pb_util::FileState& state) {
  os << static_cast<int>(state);
  return os;
}

} // namespace std

namespace kudu {
namespace pb_util {

static const char* const kTmpTemplateSuffix = ".XXXXXX";

// Protobuf container constants.
static const uint32_t kPBContainerInvalidVersion = 0;
static const uint32_t kPBContainerDefaultVersion = 2;
static const int kPBContainerChecksumLen = sizeof(uint32_t);
static const char kPBContainerMagic[] = "kuducntr";
static const int kPBContainerMagicLen = 8;
static const int kPBContainerV1HeaderLen =
    kPBContainerMagicLen + sizeof(uint32_t); // Magic number + version.
static const int kPBContainerV2HeaderLen =
    kPBContainerV1HeaderLen + kPBContainerChecksumLen; // Same as V1 plus a checksum.

const int kPBContainerMinimumValidLength = kPBContainerV1HeaderLen;

static_assert(arraysize(kPBContainerMagic) - 1 == kPBContainerMagicLen,
              "kPBContainerMagic does not match expected length");

namespace {

// When serializing, we first compute the byte size, then serialize the message.
// If serialization produces a different number of bytes than expected, we
// call this function, which crashes. The problem could be due to a bug in the
// protobuf implementation but is more likely caused by concurrent modification
// of the message. This function attempts to distinguish between the two and
// provide a useful error message.
void ByteSizeConsistencyError(size_t byte_size_before_serialization,
                              size_t byte_size_after_serialization,
                              size_t bytes_produced_by_serialization) {
  CHECK_EQ(byte_size_before_serialization, byte_size_after_serialization)
      << "Protocol message was modified concurrently during serialization.";
  CHECK_EQ(bytes_produced_by_serialization, byte_size_before_serialization)
      << "Byte size calculation and serialization were inconsistent. This "
         "may indicate a bug in protocol buffers or it may be caused by "
         "concurrent modification of the message.";
  LOG(FATAL) << "This shouldn't be called if all the sizes are equal.";
}

string InitializationErrorMessage(const char* action,
                                  const MessageLite& message) {
  // Note: We want to avoid depending on strutil in the lite library, otherwise
  // we'd use:
  //
  // return strings::Substitute(
  //     "Can't $0 message of type \"$1\" because it is missing required "
  //     "fields: $2",
  //     action, message.GetTypeName(),
  //     message.InitializationErrorString());
  string result;
  result += "Can't ";
  result += action;
  result += " message of type \"";
  result += message.GetTypeName();
  result += "\" because it is missing required fields: ";
  result += message.InitializationErrorString();
  return result;
}

// Returns true iff the specified protobuf container file version is supported
// by this implementation.
bool IsSupportedContainerVersion(uint32_t version) {
  return version == 1 || version == 2;
}
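// For reference while reading the parsing code below, the file-level header
// layout implied by the constants above (and documented in pb_util.h) is:
//
//   magic number: "kuducntr"                                  8 bytes
//   version:      little-endian uint32                        4 bytes
//   checksum:     CRC32C of magic + version (versions >= 2)   4 bytes
//
// Zero or more length-prefixed records follow the header.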
// Reads exactly 'length' bytes from the container file into 'result',
// validating that there is sufficient data in the file to read this length
// before attempting to do so, and validating that it has read that length
// after performing the read.
//
// If the file size is less than the requested size of the read, returns
// Status::Incomplete.
// If there is an unexpected short read, returns Status::Corruption.
//
// NOTE: the data in 'result' may be modified even in the case of a failed read.
template<typename ReadableFileType>
Status ValidateAndReadData(ReadableFileType* reader, uint64_t file_size,
                           uint64_t* offset, uint64_t length,
                           faststring* result) {
  // Validate the read length using the file size.
  if (*offset + length > file_size) {
    return Status::Incomplete("File size not large enough to be valid",
                              Substitute("Proto container file $0: "
                                         "Tried to read $1 bytes at offset "
                                         "$2 but file size is only $3 bytes",
                                         reader->filename(), length,
                                         *offset, file_size));
  }

  // Perform the read.
  result->resize(length);
  RETURN_NOT_OK(reader->Read(*offset, Slice(*result)));
  *offset += length;
  return Status::OK();
}

// Helper macro for use with ParseAndCompareChecksum(). Example usage:
// RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(checksum.data(), { data }),
//     CHECKSUM_ERR_MSG("Data checksum does not match", filename, offset));
#define CHECKSUM_ERR_MSG(prefix, filename, cksum_offset) \
  Substitute("$0: Incorrect checksum in file $1 at offset $2", prefix, filename, cksum_offset)

// Parses a checksum from the specified buffer and compares it to the bytes
// given in 'slices' by calculating a rolling CRC32 checksum of the bytes in
// the 'slices'.
//
// If they match, returns OK. Otherwise, returns Status::Corruption.
Status ParseAndCompareChecksum(const uint8_t* checksum_buf,
                               const initializer_list<Slice>& slices) {
  uint32_t written_checksum = DecodeFixed32(checksum_buf);
  uint64_t actual_checksum = 0;
  Crc* crc32c = crc::GetCrc32cInstance();
  for (Slice s : slices) {
    crc32c->Compute(s.data(), s.size(), &actual_checksum);
  }
  if (PREDICT_FALSE(actual_checksum != written_checksum)) {
    return Status::Corruption(Substitute("Checksum does not match. Expected: $0. Actual: $1",
                                         written_checksum, actual_checksum));
  }
  return Status::OK();
}

// If necessary, get the size of the file opened by 'reader' in 'cached_file_size'.
// If 'cached_file_size' already has a value, this is a no-op.
template<typename ReadableFileType>
Status CacheFileSize(ReadableFileType* reader,
                     boost::optional<uint64_t>* cached_file_size) {
  if (*cached_file_size) {
    return Status::OK();
  }

  uint64_t file_size;
  RETURN_NOT_OK(reader->Size(&file_size));
  *cached_file_size = file_size;
  return Status::OK();
}

// Reads the rest of the file starting at 'offset' and sets 'all_zeros' to
// true iff every remaining byte is zero, reading in chunks of at most 4 MiB.
template<typename ReadableFileType>
Status RestOfFileIsAllZeros(ReadableFileType* reader,
                            uint64_t filesize,
                            uint64_t offset,
                            bool* all_zeros) {
  DCHECK(reader);
  DCHECK_GE(filesize, offset);
  DCHECK(all_zeros);
  constexpr uint64_t max_to_read = 4 * 1024 * 1024; // 4 MiB.
  faststring buf;
  while (true) {
    uint64_t to_read = std::min(max_to_read, filesize - offset);
    if (to_read == 0) {
      break;
    }
    buf.resize(to_read);
    RETURN_NOT_OK(reader->Read(offset, Slice(buf)));
    offset += to_read;
    if (!IsAllZeros(buf)) {
      *all_zeros = false;
      return Status::OK();
    }
  }
  *all_zeros = true;
  return Status::OK();
}
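// The record layout parsed by ReadPBStartingAt() below (and produced by
// WritablePBContainerFile::AppendMsgToBuffer() later in this file) is:
//
//   length:       little-endian uint32                            4 bytes
//   length CRC32C (versions >= 2 only)                            4 bytes
//   body:         the serialized protobuf message          'length' bytes
//   body CRC32C:  v1 checksums length + body; v2+ body only       4 bytes
//
// For example, a version 2 record whose serialized message occupies 10 bytes
// takes 4 + 4 + 10 + 4 = 22 bytes in the file.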
// Read and parse a message of the specified format at the given offset in the
// format documented in pb_util.h. 'offset' is an in-out parameter and will be
// updated with the new offset on success. On failure, 'offset' is not modified.
template<typename ReadableFileType>
Status ReadPBStartingAt(ReadableFileType* reader, int version,
                        boost::optional<uint64_t>* cached_file_size,
                        uint64_t* offset, Message* msg) {
  uint64_t tmp_offset = *offset;
  VLOG(1) << "Reading PB with version " << version << " starting at offset " << *offset;

  RETURN_NOT_OK(CacheFileSize(reader, cached_file_size));
  uint64_t file_size = cached_file_size->get();

  if (tmp_offset == *cached_file_size) {
    return Status::EndOfFile("Reached end of file");
  }

  // Read the data length from the file.
  // Version 2+ includes a checksum for the length field.
  uint64_t length_buflen = (version == 1) ? sizeof(uint32_t)
                                          : sizeof(uint32_t) + kPBContainerChecksumLen;
  faststring length_and_cksum_buf;
  RETURN_NOT_OK_PREPEND(ValidateAndReadData(reader, file_size, &tmp_offset, length_buflen,
                                            &length_and_cksum_buf),
                        Substitute("Could not read data length from proto container file $0 "
                                   "at offset $1", reader->filename(), *offset));
  Slice length(length_and_cksum_buf.data(), sizeof(uint32_t));

  // Versions >= 2 have an individual checksum for the data length.
  if (version >= 2) {
    // KUDU-2260: If the length and checksum data are all 0's, and the rest of
    // the file is all 0's, then it's an incomplete record, not corruption.
    // This can happen e.g. on ext4 in the default data=ordered mode, when the
    // filesize metadata is updated but the new data is not persisted.
    // See https://news.ycombinator.com/item?id=11512006
    if (IsAllZeros(length_and_cksum_buf)) {
      bool all_zeros = false;
      RETURN_NOT_OK(RestOfFileIsAllZeros(reader, file_size, tmp_offset, &all_zeros));
      if (all_zeros) {
        return Status::Incomplete("incomplete write of PB: rest of file is NULL bytes");
      }
    }
    Slice length_checksum(length_and_cksum_buf.data() + sizeof(uint32_t),
                          kPBContainerChecksumLen);
    RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(length_checksum.data(), { length }),
                          CHECKSUM_ERR_MSG("Data length checksum does not match",
                                           reader->filename(),
                                           tmp_offset - kPBContainerChecksumLen));
  }
  uint32_t data_length = DecodeFixed32(length.data());

  // Read body and checksum into buffer for checksum & parsing.
  uint64_t data_and_cksum_buflen = data_length + kPBContainerChecksumLen;
  faststring body_and_cksum_buf;
  RETURN_NOT_OK_PREPEND(ValidateAndReadData(reader, file_size, &tmp_offset, data_and_cksum_buflen,
                                            &body_and_cksum_buf),
                        Substitute("Could not read PB message data from proto container file $0 "
                                   "at offset $1", reader->filename(), tmp_offset));
  Slice body(body_and_cksum_buf.data(), data_length);
  Slice record_checksum(body_and_cksum_buf.data() + data_length, kPBContainerChecksumLen);

  // Version 1 has a single checksum for length, body.
  // Version 2+ has individual checksums for length and body, respectively.
  if (version == 1) {
    RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(record_checksum.data(), { length, body }),
                          CHECKSUM_ERR_MSG("Length and data checksum does not match",
                                           reader->filename(),
                                           tmp_offset - kPBContainerChecksumLen));
  } else {
    RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(record_checksum.data(), { body }),
                          CHECKSUM_ERR_MSG("Data checksum does not match",
                                           reader->filename(),
                                           tmp_offset - kPBContainerChecksumLen));
  }

  // The checksum is correct. Time to decode the body.
  //
  // We could compare pb_type_ against msg.GetTypeName(), but:
  // 1. pb_type_ is not available when reading the supplemental header,
  // 2. ParseFromArray() should fail if the data cannot be parsed into the
  //    provided message type.

  // To permit parsing of very large PB messages, we must parse through a
  // CodedInputStream and bump the byte limit. The SetTotalBytesLimit() docs
  // say that 512MB is the shortest theoretical message length that may produce
  // integer overflow warnings, so that's what we'll use.
  ArrayInputStream ais(body.data(), body.size());
  CodedInputStream cis(&ais);
  cis.SetTotalBytesLimit(512 * 1024 * 1024);
  if (PREDICT_FALSE(!msg->ParseFromCodedStream(&cis))) {
    return Status::IOError("Unable to parse PB from path", reader->filename());
  }

  *offset = tmp_offset;
  return Status::OK();
}

// Wrapper around ReadPBStartingAt() to enforce that we don't return
// Status::Incomplete() for V1 format files.
template<typename ReadableFileType>
Status ReadFullPB(ReadableFileType* reader, int version,
                  boost::optional<uint64_t>* cached_file_size,
                  uint64_t* offset, Message* msg) {
  bool had_cached_size = *cached_file_size != boost::none;
  Status s = ReadPBStartingAt(reader, version, cached_file_size, offset, msg);
  if (PREDICT_FALSE(s.IsIncomplete() && version == 1)) {
    return Status::Corruption("Unrecoverable incomplete record", s.ToString());
  }
  // If we hit EOF, but we were using a cached view of the file size, then it might be
  // that the file has been extended. Invalidate the cache and try again.
  if (had_cached_size && (s.IsIncomplete() || s.IsEndOfFile())) {
    *cached_file_size = boost::none;
    return ReadFullPB(reader, version, cached_file_size, offset, msg);
  }
  return s;
}

// Read and parse the protobuf container file-level header documented in pb_util.h.
template<typename ReadableFileType>
Status ParsePBFileHeader(ReadableFileType* reader,
                         boost::optional<uint64_t>* cached_file_size,
                         uint64_t* offset, int* version) {
  RETURN_NOT_OK(CacheFileSize(reader, cached_file_size));
  uint64_t file_size = cached_file_size->get();

  // We initially read enough data for a V2+ file header. This optimizes for
  // V2+ and is valid on a V1 file because we don't consider these files valid
  // unless they contain a record in addition to the file header. The
  // additional 4 bytes required by a V2+ header (vs V1) is still less than the
  // minimum number of bytes required for a V1 format data record.
  uint64_t tmp_offset = *offset;
  faststring header;
  RETURN_NOT_OK_PREPEND(ValidateAndReadData(reader, file_size, &tmp_offset,
                                            kPBContainerV2HeaderLen, &header),
                        Substitute("Could not read header for proto container file $0",
                                   reader->filename()));
  Slice magic_and_version(header.data(), kPBContainerMagicLen + sizeof(uint32_t));
  Slice checksum(header.data() + kPBContainerMagicLen + sizeof(uint32_t),
                 kPBContainerChecksumLen);

  // Validate magic number.
  if (PREDICT_FALSE(!strings::memeq(kPBContainerMagic, header.data(), kPBContainerMagicLen))) {
    string file_magic(reinterpret_cast<const char*>(header.data()), kPBContainerMagicLen);
    return Status::Corruption("Invalid magic number",
                              Substitute("Expected: $0, found: $1",
                                         Utf8SafeCEscape(kPBContainerMagic),
                                         Utf8SafeCEscape(file_magic)));
  }

  // Validate container file version.
  uint32_t tmp_version = DecodeFixed32(header.data() + kPBContainerMagicLen);
  if (PREDICT_FALSE(!IsSupportedContainerVersion(tmp_version))) {
    return Status::NotSupported(
        Substitute("Protobuf container has unsupported version: $0. Default version: $1",
                   tmp_version, kPBContainerDefaultVersion));
  }

  // Versions >= 2 have a checksum after the magic number and encoded version
  // to ensure the integrity of these fields.
  if (tmp_version >= 2) {
    RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(checksum.data(), { magic_and_version }),
                          CHECKSUM_ERR_MSG("File header checksum does not match",
                                           reader->filename(),
                                           tmp_offset - kPBContainerChecksumLen));
  } else {
    // Version 1 doesn't have a header checksum. Rewind our read offset so this
    // data will be read again when we next attempt to read a data record.
    tmp_offset -= kPBContainerChecksumLen;
  }

  *offset = tmp_offset;
  *version = tmp_version;
  return Status::OK();
}

// Read and parse the supplemental header from the container file.
template<typename ReadableFileType>
Status ReadSupplementalHeader(ReadableFileType* reader, int version,
                              boost::optional<uint64_t>* cached_file_size,
                              uint64_t* offset,
                              ContainerSupHeaderPB* sup_header) {
  RETURN_NOT_OK_PREPEND(ReadFullPB(reader, version, cached_file_size, offset, sup_header),
      Substitute("Could not read supplemental header from proto container file $0 "
                 "with version $1 at offset $2",
                 reader->filename(), version, *offset));
  return Status::OK();
}

} // anonymous namespace

void AppendToString(const MessageLite &msg, faststring *output) {
  DCHECK(msg.IsInitialized()) << InitializationErrorMessage("serialize", msg);
  AppendPartialToString(msg, output);
}

void AppendPartialToString(const MessageLite &msg, faststring* output) {
  size_t old_size = output->size();

  size_t byte_size = msg.ByteSizeLong();
  // Messages >2G cannot be serialized due to overflow computing ByteSize.
  DCHECK_GE(byte_size, 0) << "Error computing ByteSize";

  output->resize(old_size + byte_size);

  uint8* start = &((*output)[old_size]);
  uint8* end = msg.SerializeWithCachedSizesToArray(start);
  if (end - start != byte_size) {
    ByteSizeConsistencyError(byte_size, msg.ByteSizeLong(), end - start);
  }
}

void SerializeToString(const MessageLite &msg, faststring *output) {
  output->clear();
  AppendToString(msg, output);
}
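// Illustrative use of the serialization helpers above (a sketch; 'MyPB' is a
// hypothetical message type):
//
//   MyPB pb;
//   faststring buf;
//   SerializeToString(pb, &buf);  // Clears 'buf', then appends the message.
//   AppendToString(pb, &buf);     // Appends a second copy after the first.
//
// Both DCHECK that all required fields are set; AppendPartialToString() is
// the variant that accepts a partially-initialized message.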
return Status::Corruption("Error parsing msg", InitializationErrorMessage("parse", *msg)); } return Status::OK(); } Status ParseFromArray(MessageLite* msg, const uint8_t* data, uint32_t length) { if (!msg->ParseFromArray(data, length)) { return Status::Corruption("Error parsing msg", InitializationErrorMessage("parse", *msg)); } return Status::OK(); } Status WritePBToPath(Env* env, const std::string& path, const MessageLite& msg, SyncMode sync) { const string tmp_template = path + kTmpInfix + kTmpTemplateSuffix; string tmp_path; unique_ptr<WritableFile> file; WritableFileOptions opts; opts.is_sensitive = false; RETURN_NOT_OK(env->NewTempWritableFile(opts, tmp_template, &tmp_path, &file)); auto tmp_deleter = MakeScopedCleanup([&]() { WARN_NOT_OK(env->DeleteFile(tmp_path), "Could not delete file " + tmp_path); }); WritableFileOutputStream output(file.get()); bool res = msg.SerializeToZeroCopyStream(&output); if (!res || !output.Flush()) { return Status::IOError("Unable to serialize PB to file"); } if (sync == pb_util::SYNC) { RETURN_NOT_OK_PREPEND(file->Sync(), "Failed to Sync() " + tmp_path); } RETURN_NOT_OK_PREPEND(file->Close(), "Failed to Close() " + tmp_path); RETURN_NOT_OK_PREPEND(env->RenameFile(tmp_path, path), "Failed to rename tmp file to " + path); tmp_deleter.cancel(); if (sync == pb_util::SYNC) { RETURN_NOT_OK_PREPEND(env->SyncDir(DirName(path)), "Failed to SyncDir() parent of " + path); } return Status::OK(); } Status ReadPBFromPath(Env* env, const std::string& path, MessageLite* msg) { unique_ptr<SequentialFile> rfile; SequentialFileOptions opts; opts.is_sensitive = false; RETURN_NOT_OK(env->NewSequentialFile(opts, path, &rfile)); RETURN_NOT_OK(ParseFromSequentialFile(msg, rfile.get())); return Status::OK(); } static void TruncateString(string* s, int max_len) { if (s->size() > max_len) { s->resize(max_len); s->append("<truncated>"); } } void TruncateFields(Message* message, int max_len) { const Reflection* reflection = message->GetReflection(); vector<const FieldDescriptor*> fields; reflection->ListFields(*message, &fields); for (const FieldDescriptor* field : fields) { if (field->is_repeated()) { for (int i = 0; i < reflection->FieldSize(*message, field); i++) { switch (field->cpp_type()) { case FieldDescriptor::CPPTYPE_STRING: { const string& s_const = reflection->GetRepeatedStringReference(*message, field, i, nullptr); TruncateString(const_cast<string*>(&s_const), max_len); break; } case FieldDescriptor::CPPTYPE_MESSAGE: { TruncateFields(reflection->MutableRepeatedMessage(message, field, i), max_len); break; } default: break; } } } else { switch (field->cpp_type()) { case FieldDescriptor::CPPTYPE_STRING: { const string& s_const = reflection->GetStringReference(*message, field, nullptr); TruncateString(const_cast<string*>(&s_const), max_len); break; } case FieldDescriptor::CPPTYPE_MESSAGE: { TruncateFields(reflection->MutableMessage(message, field), max_len); break; } default: break; } } } } namespace { class SecureFieldPrinter : public TextFormat::FastFieldValuePrinter { public: using super = TextFormat::FastFieldValuePrinter; using BaseTextGenerator = TextFormat::BaseTextGenerator; void PrintFieldName(const Message& message, const Reflection* reflection, const FieldDescriptor* field, BaseTextGenerator* generator) const override { hide_next_string_ = field->cpp_type() == FieldDescriptor::CPPTYPE_STRING && field->options().GetExtension(REDACT); super::PrintFieldName(message, reflection, field, generator); } void PrintFieldName(const Message& message, int field_index, int 
namespace {

class SecureFieldPrinter : public TextFormat::FastFieldValuePrinter {
 public:
  using super = TextFormat::FastFieldValuePrinter;
  using BaseTextGenerator = TextFormat::BaseTextGenerator;

  void PrintFieldName(const Message& message,
                      const Reflection* reflection,
                      const FieldDescriptor* field,
                      BaseTextGenerator* generator) const override {
    hide_next_string_ = field->cpp_type() == FieldDescriptor::CPPTYPE_STRING &&
        field->options().GetExtension(REDACT);
    super::PrintFieldName(message, reflection, field, generator);
  }

  void PrintFieldName(const Message& message,
                      int field_index,
                      int field_count,
                      const Reflection* reflection,
                      const FieldDescriptor* field,
                      BaseTextGenerator* generator) const override {
    hide_next_string_ = field->cpp_type() == FieldDescriptor::CPPTYPE_STRING &&
        field->options().GetExtension(REDACT);
    super::PrintFieldName(message, field_index, field_count, reflection, field, generator);
  }

  void PrintString(const string& val, BaseTextGenerator* generator) const override {
    if (hide_next_string_) {
      hide_next_string_ = false;
      super::PrintString(KUDU_REDACT(val), generator);
      return;
    }
    super::PrintString(val, generator);
  }

  void PrintBytes(const string& val, BaseTextGenerator* generator) const override {
    if (hide_next_string_) {
      hide_next_string_ = false;
      super::PrintBytes(KUDU_REDACT(val), generator);
      // Return here so the redacted branch doesn't fall through and also
      // print the unredacted value.
      return;
    }
    super::PrintBytes(val, generator);
  }

  mutable bool hide_next_string_ = false;
};

} // anonymous namespace

string SecureDebugString(const Message& msg) {
  string debug_string;
  TextFormat::Printer printer;
  printer.SetDefaultFieldValuePrinter(new SecureFieldPrinter());
  printer.PrintToString(msg, &debug_string);
  return debug_string;
}

string SecureShortDebugString(const Message& msg) {
  string debug_string;

  TextFormat::Printer printer;
  printer.SetSingleLineMode(true);
  printer.SetDefaultFieldValuePrinter(new SecureFieldPrinter());

  printer.PrintToString(msg, &debug_string);
  // Single line mode currently might have an extra space at the end.
  if (!debug_string.empty() && debug_string[debug_string.size() - 1] == ' ') {
    debug_string.resize(debug_string.size() - 1);
  }

  return debug_string;
}
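// Illustrative effect of the redacting printers above, assuming a
// hypothetical message with a field marked with the REDACT option from
// pb_util.proto:
//
//   optional string password = 1 [(kudu.REDACT) = true];
//
// When redaction is enabled, SecureDebugString()/SecureShortDebugString()
// pass the value through KUDU_REDACT(), so the output contains a redaction
// marker in place of the actual value; otherwise the value prints as usual.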
WritablePBContainerFile::WritablePBContainerFile(shared_ptr<RWFile> writer)
    : state_(FileState::NOT_INITIALIZED),
      offset_(writer->GetEncryptionHeaderSize()),
      version_(kPBContainerDefaultVersion),
      writer_(std::move(writer)) {}

WritablePBContainerFile::~WritablePBContainerFile() {
  WARN_NOT_OK(Close(), "Could not Close() when destroying file");
}

Status WritablePBContainerFile::SetVersionForTests(int version) {
  DCHECK_EQ(FileState::NOT_INITIALIZED, state_);
  if (!IsSupportedContainerVersion(version)) {
    return Status::NotSupported(Substitute("Version $0 is not supported", version));
  }
  version_ = version;
  return Status::OK();
}

Status WritablePBContainerFile::CreateNew(const Message& msg) {
  DCHECK_EQ(FileState::NOT_INITIALIZED, state_);

  const uint64_t kHeaderLen = (version_ == 1) ? kPBContainerV1HeaderLen
                                              : kPBContainerV1HeaderLen + kPBContainerChecksumLen;

  faststring buf;
  buf.resize(kHeaderLen);

  // Serialize the magic.
  strings::memcpy_inlined(buf.data(), kPBContainerMagic, kPBContainerMagicLen);
  uint64_t offset = kPBContainerMagicLen;

  // Serialize the version.
  InlineEncodeFixed32(buf.data() + offset, version_);
  offset += sizeof(uint32_t);
  DCHECK_EQ(kPBContainerV1HeaderLen, offset)
      << "Serialized unexpected number of total bytes";

  // Versions >= 2: Checksum the magic and version.
  if (version_ >= 2) {
    uint32_t header_checksum = crc::Crc32c(buf.data(), offset);
    InlineEncodeFixed32(buf.data() + offset, header_checksum);
    offset += sizeof(uint32_t);
  }
  DCHECK_EQ(offset, kHeaderLen);

  // Serialize the supplemental header.
  ContainerSupHeaderPB sup_header;
  PopulateDescriptorSet(msg.GetDescriptor()->file(), sup_header.mutable_protos());
  sup_header.set_pb_type(msg.GetTypeName());
  RETURN_NOT_OK_PREPEND(AppendMsgToBuffer(sup_header, &buf),
                        "Failed to prepare supplemental header for writing");

  // Write the serialized buffer to the file.
  RETURN_NOT_OK_PREPEND(AppendBytes(buf), "Failed to append header to file");
  state_ = FileState::OPEN;
  return Status::OK();
}

Status WritablePBContainerFile::OpenExisting() {
  DCHECK_EQ(FileState::NOT_INITIALIZED, state_);
  boost::optional<uint64_t> size;
  RETURN_NOT_OK(ParsePBFileHeader(writer_.get(), &size, &offset_, &version_));
  ContainerSupHeaderPB sup_header;
  RETURN_NOT_OK(ReadSupplementalHeader(writer_.get(), version_, &size,
                                       &offset_, &sup_header));
  offset_ = size.get(); // Reset the write offset to the end of the file.
  state_ = FileState::OPEN;
  return Status::OK();
}

Status WritablePBContainerFile::AppendBytes(const Slice& data) {
  std::lock_guard<Mutex> l(offset_lock_);
  RETURN_NOT_OK(writer_->Write(offset_, data));
  offset_ += data.size();
  return Status::OK();
}

Status WritablePBContainerFile::Append(const Message& msg) {
  DCHECK_EQ(FileState::OPEN, state_);

  faststring buf;
  RETURN_NOT_OK_PREPEND(AppendMsgToBuffer(msg, &buf),
                        "Failed to prepare buffer for writing");
  RETURN_NOT_OK_PREPEND(AppendBytes(buf), "Failed to append data to file");

  return Status::OK();
}

Status WritablePBContainerFile::Flush() {
  DCHECK_EQ(FileState::OPEN, state_);

  // TODO: Flush just the dirty bytes.
  RETURN_NOT_OK_PREPEND(writer_->Flush(RWFile::FLUSH_ASYNC, 0, 0), "Failed to Flush() file");

  return Status::OK();
}

Status WritablePBContainerFile::Sync() {
  DCHECK_EQ(FileState::OPEN, state_);

  RETURN_NOT_OK_PREPEND(writer_->Sync(), "Failed to Sync() file");

  return Status::OK();
}

uint64_t WritablePBContainerFile::Offset() const {
  std::lock_guard<Mutex> l(offset_lock_);
  return offset_;
}

Status WritablePBContainerFile::Close() {
  if (state_ != FileState::CLOSED) {
    state_ = FileState::CLOSED;
    Status s = writer_->Close();
    writer_.reset();
    RETURN_NOT_OK_PREPEND(s, "Failed to Close() file");
  }
  return Status::OK();
}

const string& WritablePBContainerFile::filename() const {
  return writer_->filename();
}
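// Illustrative write-path usage of WritablePBContainerFile (a sketch; 'env',
// 'path', and 'msg' are assumed to exist, and error handling is collapsed
// into RETURN_NOT_OK):
//
//   unique_ptr<RWFile> rwfile;
//   RETURN_NOT_OK(env->NewRWFile(path, &rwfile));
//   WritablePBContainerFile pb_file(std::move(rwfile));
//   RETURN_NOT_OK(pb_file.CreateNew(msg));  // File + supplemental headers.
//   RETURN_NOT_OK(pb_file.Append(msg));     // One length-prefixed record.
//   RETURN_NOT_OK(pb_file.Sync());
//   RETURN_NOT_OK(pb_file.Close());
//
// WritePBContainerToPath() later in this file wraps this pattern, adding a
// temp-file-plus-rename step for atomicity.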
Status WritablePBContainerFile::AppendMsgToBuffer(const Message& msg, faststring* buf) {
  DCHECK(msg.IsInitialized()) << InitializationErrorMessage("serialize", msg);
  size_t data_len_long = msg.ByteSizeLong();
  // Messages >2G cannot be serialized due to format restrictions.
  CHECK_LT(data_len_long, std::numeric_limits<int32_t>::max());
  uint32_t data_len = static_cast<uint32_t>(data_len_long);
  uint64_t record_buflen = sizeof(uint32_t) + data_len + sizeof(uint32_t);
  if (version_ >= 2) {
    record_buflen += sizeof(uint32_t); // Additional checksum just for the length.
  }

  // Grow the buffer to hold the new data.
  uint64_t record_offset = buf->size();
  buf->resize(record_offset + record_buflen);
  uint8_t* dst = buf->data() + record_offset;

  // Serialize the data length.
  size_t cur_offset = 0;
  InlineEncodeFixed32(dst + cur_offset, data_len);
  cur_offset += sizeof(uint32_t);

  // For version >= 2: Serialize the checksum of the data length.
  if (version_ >= 2) {
    uint32_t length_checksum = crc::Crc32c(&data_len, sizeof(data_len));
    InlineEncodeFixed32(dst + cur_offset, length_checksum);
    cur_offset += sizeof(uint32_t);
  }

  // Serialize the data.
  uint64_t data_offset = cur_offset;
  if (PREDICT_FALSE(!msg.SerializeWithCachedSizesToArray(dst + cur_offset))) {
    return Status::IOError("Failed to serialize PB to array");
  }
  cur_offset += data_len;

  // Calculate and serialize the data checksum.
  // For version 1, this is the checksum of the len + data.
  // For version >= 2, this is only the checksum of the data.
  uint32_t data_checksum;
  if (version_ == 1) {
    data_checksum = crc::Crc32c(dst, cur_offset);
  } else {
    data_checksum = crc::Crc32c(dst + data_offset, data_len);
  }
  InlineEncodeFixed32(dst + cur_offset, data_checksum);
  cur_offset += sizeof(uint32_t);

  DCHECK_EQ(record_buflen, cur_offset) << "Serialized unexpected number of total bytes";
  return Status::OK();
}

void WritablePBContainerFile::PopulateDescriptorSet(
    const FileDescriptor* desc, FileDescriptorSet* output) {
  // Because we don't compile protobuf with TSAN enabled, copying the
  // static PB descriptors in this function ends up triggering a lot of
  // race reports. We suppress the reports, but TSAN still has to walk
  // the stack, etc, and this function becomes very slow. So, we ignore
  // TSAN here.
  debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;

  FileDescriptorSet all_descs;

  // Tracks all schemas that have been added to 'unemitted' at one point
  // or another. Is a superset of 'unemitted' and only ever grows.
  unordered_set<const FileDescriptor*> processed;

  // Tracks all remaining unemitted schemas.
  deque<const FileDescriptor*> unemitted;

  InsertOrDie(&processed, desc);
  unemitted.push_front(desc);
  while (!unemitted.empty()) {
    const FileDescriptor* proto = unemitted.front();

    // The current schema is emitted iff we've processed (i.e. emitted) all
    // of its dependencies.
    bool emit = true;
    for (int i = 0; i < proto->dependency_count(); i++) {
      const FileDescriptor* dep = proto->dependency(i);
      if (InsertIfNotPresent(&processed, dep)) {
        unemitted.push_front(dep);
        emit = false;
      }
    }
    if (emit) {
      unemitted.pop_front();
      proto->CopyTo(all_descs.mutable_file()->Add());
    }
  }
  all_descs.Swap(output);
}

ReadablePBContainerFile::ReadablePBContainerFile(shared_ptr<RandomAccessFile> reader)
    : state_(FileState::NOT_INITIALIZED),
      version_(kPBContainerInvalidVersion),
      offset_(reader->GetEncryptionHeaderSize()),
      reader_(std::move(reader)) {
}

ReadablePBContainerFile::~ReadablePBContainerFile() {
  Close();
}

Status ReadablePBContainerFile::Open() {
  DCHECK_EQ(FileState::NOT_INITIALIZED, state_);
  RETURN_NOT_OK(ParsePBFileHeader(reader_.get(), &cached_file_size_, &offset_, &version_));
  ContainerSupHeaderPB sup_header;
  RETURN_NOT_OK(ReadSupplementalHeader(reader_.get(), version_, &cached_file_size_,
                                       &offset_, &sup_header));
  protos_.reset(sup_header.release_protos());
  pb_type_ = sup_header.pb_type();
  state_ = FileState::OPEN;
  return Status::OK();
}

Status ReadablePBContainerFile::ReadNextPB(Message* msg) {
  DCHECK_EQ(FileState::OPEN, state_);
  return ReadFullPB(reader_.get(), version_, &cached_file_size_, &offset_, msg);
}
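// Illustrative read-path usage (a sketch; 'env' and 'path' are assumed to
// exist, and 'MyPB' is a hypothetical message type matching the container's
// recorded pb_type):
//
//   unique_ptr<RandomAccessFile> rfile;
//   RETURN_NOT_OK(env->NewRandomAccessFile(path, &rfile));
//   ReadablePBContainerFile pb_file(std::move(rfile));
//   RETURN_NOT_OK(pb_file.Open());
//   MyPB msg;
//   Status s = pb_file.ReadNextPB(&msg);
//   while (s.ok()) {
//     // ... use 'msg' ...
//     s = pb_file.ReadNextPB(&msg);
//   }
//   if (!s.IsEndOfFile()) return s;  // EndOfFile marks a clean stop.
//
// ReadPBContainerFromPath() later in this file follows this pattern to read
// a single message.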
Status ReadablePBContainerFile::GetPrototype(const Message** prototype) {
  if (!prototype_) {
    // Loading the schemas into a DescriptorDatabase (and not directly into
    // a DescriptorPool) defers resolution until FindMessageTypeByName()
    // below, allowing for schemas to be loaded in any order.
    unique_ptr<SimpleDescriptorDatabase> db(new SimpleDescriptorDatabase());
    for (int i = 0; i < protos()->file_size(); i++) {
      if (!db->Add(protos()->file(i))) {
        return Status::Corruption("Descriptor not loaded", Substitute(
            "Could not load descriptor for PB type $0 referenced in container file",
            pb_type()));
      }
    }
    unique_ptr<DescriptorPool> pool(new DescriptorPool(db.get()));
    const Descriptor* desc = pool->FindMessageTypeByName(pb_type());
    if (!desc) {
      return Status::NotFound("Descriptor not found", Substitute(
          "Could not find descriptor for PB type $0 referenced in container file",
          pb_type()));
    }
    unique_ptr<DynamicMessageFactory> factory(new DynamicMessageFactory());
    const Message* p = factory->GetPrototype(desc);
    if (!p) {
      return Status::NotSupported("Descriptor not supported", Substitute(
          "Descriptor $0 referenced in container file not supported",
          pb_type()));
    }
    db_ = std::move(db);
    descriptor_pool_ = std::move(pool);
    message_factory_ = std::move(factory);
    prototype_ = p;
  }
  *prototype = prototype_;
  return Status::OK();
}

Status ReadablePBContainerFile::Dump(ostream* os, ReadablePBContainerFile::Format format) {
  DCHECK_EQ(FileState::OPEN, state_);

  // Since we use the protobuf library support for dumping JSON, there isn't any easy
  // way to hook in our redaction support. Since this is only used by CLI tools,
  // just refuse to dump JSON if redaction is enabled.
  if (format == Format::JSON && KUDU_SHOULD_REDACT()) {
    return Status::NotSupported("cannot dump PBC file in JSON format if redaction is enabled");
  }

  const char* const kDashes = "-------";

  if (format == Format::DEBUG) {
    *os << "File header" << endl;
    *os << kDashes << endl;
    *os << "Protobuf container version: " << version_ << endl;
    *os << "Total container file size: " << *cached_file_size_ << endl;
    *os << "Entry PB type: " << pb_type_ << endl;
    *os << endl;
  }

  // Use the embedded protobuf information from the container file to
  // create the appropriate kind of protobuf Message.
  const Message* prototype;
  RETURN_NOT_OK(GetPrototype(&prototype));
  unique_ptr<Message> msg(prototype_->New());

  // Dump each message in the container file.
  int count = 0;
  uint64_t prev_offset = offset_;
  Status s;
  string buf;
  for (s = ReadNextPB(msg.get()); s.ok(); s = ReadNextPB(msg.get())) {
    switch (format) {
      case Format::ONELINE:
        *os << count << "\t" << SecureShortDebugString(*msg) << endl;
        break;
      case Format::DEFAULT:
      case Format::DEBUG:
        *os << "Message " << count << endl;
        if (format == Format::DEBUG) {
          *os << "offset: " << prev_offset << endl;
          *os << "length: " << (offset_ - prev_offset) << endl;
        }
        *os << kDashes << endl;
        *os << SecureDebugString(*msg) << endl;
        break;
      case Format::JSON:
      case Format::JSON_PRETTY: {
        buf.clear();
        auto opt = google::protobuf::util::JsonPrintOptions();
        if (format == Format::JSON_PRETTY) {
          opt.add_whitespace = true;
        }
        const auto& google_status =
            google::protobuf::util::MessageToJsonString(*msg, &buf, opt);
        if (!google_status.ok()) {
          return Status::RuntimeError("could not convert PB to JSON", google_status.ToString());
        }
        *os << buf << endl;
        break;
      }
    }
    prev_offset = offset_;
    count++;
  }
  if (format == Format::DEBUG && !s.IsEndOfFile()) {
    *os << "Message " << count << endl;
    *os << "error: failed to parse protobuf message" << endl;
    *os << "offset: " << prev_offset << endl;
    *os << "remaining file length: " << (*cached_file_size_ - prev_offset) << endl;
    *os << kDashes << endl;
  }
  return s.IsEndOfFile() ? Status::OK() : s;
}
Status ReadablePBContainerFile::Close() {
  state_ = FileState::CLOSED;
  reader_.reset();
  return Status::OK();
}

int ReadablePBContainerFile::version() const {
  DCHECK_EQ(FileState::OPEN, state_);
  return version_;
}

uint64_t ReadablePBContainerFile::offset() const {
  DCHECK_EQ(FileState::OPEN, state_);
  return offset_;
}

Status ReadPBContainerFromPath(Env* env, const std::string& path, Message* msg,
                               SensitivityMode sensitivity_mode) {
  unique_ptr<RandomAccessFile> file;
  RandomAccessFileOptions opts;
  opts.is_sensitive = sensitivity_mode == SENSITIVE;
  RETURN_NOT_OK(env->NewRandomAccessFile(opts, path, &file));

  ReadablePBContainerFile pb_file(std::move(file));
  RETURN_NOT_OK(pb_file.Open());
  RETURN_NOT_OK(pb_file.ReadNextPB(msg));
  return pb_file.Close();
}

Status WritePBContainerToPath(Env* env, const std::string& path,
                              const Message& msg,
                              CreateMode create,
                              SyncMode sync,
                              SensitivityMode sensitivity_mode) {
  TRACE_EVENT2("io", "WritePBContainerToPath",
               "path", path,
               "msg_type", msg.GetTypeName());

  if (create == NO_OVERWRITE && env->FileExists(path)) {
    return Status::AlreadyPresent(Substitute("File $0 already exists", path));
  }

  const string tmp_template = path + kTmpInfix + kTmpTemplateSuffix;
  string tmp_path;

  unique_ptr<RWFile> file;
  RWFileOptions opts;
  opts.is_sensitive = sensitivity_mode == SENSITIVE;
  RETURN_NOT_OK(env->NewTempRWFile(opts, tmp_template, &tmp_path, &file));
  auto tmp_deleter = MakeScopedCleanup([&]() {
    WARN_NOT_OK(env->DeleteFile(tmp_path), "Could not delete file " + tmp_path);
  });

  WritablePBContainerFile pb_file(std::move(file));
  RETURN_NOT_OK(pb_file.CreateNew(msg));
  RETURN_NOT_OK(pb_file.Append(msg));
  if (sync == pb_util::SYNC) {
    RETURN_NOT_OK(pb_file.Sync());
  }
  RETURN_NOT_OK(pb_file.Close());
  RETURN_NOT_OK_PREPEND(env->RenameFile(tmp_path, path),
                        "Failed to rename tmp file to " + path);
  tmp_deleter.cancel();
  if (sync == pb_util::SYNC) {
    RETURN_NOT_OK_PREPEND(env->SyncDir(DirName(path)),
                          "Failed to SyncDir() parent of " + path);
  }
  return Status::OK();
}

scoped_refptr<debug::ConvertableToTraceFormat> PbTracer::TracePb(const Message& msg) {
  return make_scoped_refptr(new PbTracer(msg));
}

PbTracer::PbTracer(const Message& msg) : msg_(msg.New()) {
  msg_->CopyFrom(msg);
}

void PbTracer::AppendAsTraceFormat(std::string* out) const {
  pb_util::TruncateFields(msg_.get(), kMaxFieldLengthToTrace);
  std::ostringstream ss;
  JsonWriter jw(&ss, JsonWriter::COMPACT);
  jw.Protobuf(*msg_);
  out->append(ss.str());
}

} // namespace pb_util
} // namespace kudu