cpp/core/shuffle/Payload.cc (458 lines of code) (raw):

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "shuffle/Payload.h"

#include <arrow/buffer.h>
#include <arrow/io/memory.h>
#include <arrow/util/bitmap.h>

#include <cstring>
#include <iostream>
#include <memory>
#include <numeric>
#include <string>
#include <utility>
#include <vector>

#include "shuffle/Options.h"
#include "shuffle/Utils.h"
#include "utils/Timer.h"
#include "utils/exception.h"

namespace gluten {
namespace {

static const Payload::Type kCompressedType = gluten::BlockPayload::kCompressed;
static const Payload::Type kUncompressedType = gluten::BlockPayload::kUncompressed;

// Sentinel values stored in the per-buffer length field of the serialized format.
static constexpr int64_t kZeroLengthBuffer = 0;
static constexpr int64_t kNullBuffer = -1;
static constexpr int64_t kUncompressedBuffer = -2;

/// Copies `data` to `*dst` and advances `*dst` past the written bytes.
template <typename T>
void write(uint8_t** dst, T data) {
  memcpy(*dst, &data, sizeof(T));
  *dst += sizeof(T);
}

/// Returns `*dst` viewed as `T*` and advances `*dst` by `sizeof(T)`, reserving a slot to be filled in later.
template <typename T>
T* advance(uint8_t** dst) {
  auto ptr = reinterpret_cast<T*>(*dst);
  *dst += sizeof(T);
  return ptr;
}

/// Reads exactly `nbytes` from `inputStream` into `out`.
///
/// Returns IOError when the stream delivers fewer bytes than requested, so callers never go on to use a
/// partially filled (or untouched) destination.
arrow::Status readExactBytes(arrow::io::InputStream* inputStream, int64_t nbytes, void* out) {
  ARROW_ASSIGN_OR_RAISE(auto bytesRead, inputStream->Read(nbytes, out));
  ARROW_RETURN_IF(
      bytesRead != nbytes,
      arrow::Status::IOError(
          "Unexpected end of stream: expected " + std::to_string(nbytes) + " bytes but read " +
          std::to_string(bytesRead) + "."));
  return arrow::Status::OK();
}

/// Reads the payload header (type followed by row count) from `inputStream`.
///
/// Returns {0, 0} when the stream is already at its end, i.e. no byte could be read for the type.
arrow::Result<std::pair<uint8_t, uint32_t>> readTypeAndRows(arrow::io::InputStream* inputStream) {
  // NOTE(review): the type is read with sizeof(Payload::Type) into a uint8_t, so Payload::Type is presumably one
  // byte wide - confirm against Payload.h.
  uint8_t type = 0;
  uint32_t numRows = 0;
  ARROW_ASSIGN_OR_RAISE(auto bytes, inputStream->Read(sizeof(Payload::Type), &type));
  if (bytes == 0) {
    // Reach EOS.
    return std::make_pair(0, 0);
  }
  // A header cut off after the type byte is an error rather than an uninitialized row count.
  RETURN_NOT_OK(readExactBytes(inputStream, sizeof(uint32_t), &numRows));
  return std::make_pair(type, numRows);
}

/// Writes one buffer in the compressed layout to `output` and returns the number of bytes written.
///
/// Layout:
///  - null buffer:        | kNullBuffer |
///  - zero-length buffer: | kZeroLengthBuffer |
///  - otherwise:          | compressedLength or kUncompressedBuffer | uncompressedLength | data |
///
/// When compression does not make the data smaller, the raw bytes are stored and the first field is set to
/// kUncompressedBuffer.
///
/// \param outputLength capacity in bytes available for the data part, i.e. after the two int64_t header fields.
arrow::Result<int64_t> compressBuffer(
    const std::shared_ptr<arrow::Buffer>& buffer,
    uint8_t* output,
    int64_t outputLength,
    arrow::util::Codec* codec) {
  auto outputPtr = &output;
  if (!buffer) {
    write<int64_t>(outputPtr, kNullBuffer);
    return sizeof(int64_t);
  }
  if (buffer->size() == 0) {
    write<int64_t>(outputPtr, kZeroLengthBuffer);
    return sizeof(int64_t);
  }
  static const int64_t kCompressedBufferHeaderLength = 2 * sizeof(int64_t);
  // The compressed length is only known after compression; reserve its slot and fill it in below.
  auto* compressedLengthPtr = advance<int64_t>(outputPtr);
  write(outputPtr, static_cast<int64_t>(buffer->size()));
  ARROW_ASSIGN_OR_RAISE(
      auto compressedLength, codec->Compress(buffer->size(), buffer->data(), outputLength, *outputPtr));
  if (compressedLength >= buffer->size()) {
    // Write uncompressed buffer. The codec just produced compressedLength (>= size) bytes at this position, so the
    // raw data fits as long as outputLength is the true capacity.
    memcpy(*outputPtr, buffer->data(), buffer->size());
    *compressedLengthPtr = kUncompressedBuffer;
    return kCompressedBufferHeaderLength + buffer->size();
  }
  *compressedLengthPtr = static_cast<int64_t>(compressedLength);
  return kCompressedBufferHeaderLength + compressedLength;
}

/// Compresses a single buffer into a temporary allocation and writes it to `outputStream`.
///
/// Null and zero-length buffers are written as their sentinel only. Time spent compressing is added to
/// `compressTime`, time spent writing to `writeTime`.
arrow::Status compressAndFlush(
    const std::shared_ptr<arrow::Buffer>& buffer,
    arrow::io::OutputStream* outputStream,
    arrow::util::Codec* codec,
    arrow::MemoryPool* pool,
    int64_t& compressTime,
    int64_t& writeTime) {
  if (!buffer) {
    ScopedTimer timer(&writeTime);
    RETURN_NOT_OK(outputStream->Write(&kNullBuffer, sizeof(int64_t)));
    return arrow::Status::OK();
  }
  if (buffer->size() == 0) {
    ScopedTimer timer(&writeTime);
    RETURN_NOT_OK(outputStream->Write(&kZeroLengthBuffer, sizeof(int64_t)));
    return arrow::Status::OK();
  }
  ScopedTimer timer(&compressTime);
  auto maxCompressedLength = codec->MaxCompressedLen(buffer->size(), buffer->data());
  // Room for the two int64_t header fields plus the worst-case compressed size.
  ARROW_ASSIGN_OR_RAISE(
      auto compressed, arrow::AllocateResizableBuffer(sizeof(int64_t) * 2 + maxCompressedLength, pool));
  auto output = compressed->mutable_data();
  ARROW_ASSIGN_OR_RAISE(auto compressedSize, compressBuffer(buffer, output, maxCompressedLength, codec));

  timer.switchTo(&writeTime);
  RETURN_NOT_OK(outputStream->Write(compressed->data(), compressedSize));
  return arrow::Status::OK();
}

/// Reads one buffer written in the uncompressed layout: | length | data |.
/// Returns nullptr when the length field is kNullBuffer.
arrow::Result<std::shared_ptr<arrow::Buffer>> readUncompressedBuffer(arrow::io::InputStream* inputStream) {
  int64_t bufferLength = 0;
  RETURN_NOT_OK(readExactBytes(inputStream, sizeof(int64_t), &bufferLength));
  if (bufferLength == kNullBuffer) {
    return nullptr;
  }
  ARROW_ASSIGN_OR_RAISE(auto buffer, inputStream->Read(bufferLength));
  return buffer;
}

/// Reads one buffer written by compressBuffer() and returns its uncompressed content.
///
/// Returns nullptr for kNullBuffer and zeroLengthNullBuffer() for kZeroLengthBuffer. Time spent in the codec is
/// added to `decompressTime`.
arrow::Result<std::shared_ptr<arrow::Buffer>> readCompressedBuffer(
    arrow::io::InputStream* inputStream,
    const std::shared_ptr<arrow::util::Codec>& codec,
    arrow::MemoryPool* pool,
    int64_t& decompressTime) {
  int64_t compressedLength = 0;
  RETURN_NOT_OK(readExactBytes(inputStream, sizeof(int64_t), &compressedLength));
  if (compressedLength == kNullBuffer) {
    return nullptr;
  }
  if (compressedLength == kZeroLengthBuffer) {
    return zeroLengthNullBuffer();
  }

  int64_t uncompressedLength = 0;
  RETURN_NOT_OK(readExactBytes(inputStream, sizeof(int64_t), &uncompressedLength));
  if (compressedLength == kUncompressedBuffer) {
    // The writer stored the raw bytes because compression did not shrink them.
    ARROW_ASSIGN_OR_RAISE(auto uncompressed, arrow::AllocateResizableBuffer(uncompressedLength, pool));
    RETURN_NOT_OK(readExactBytes(inputStream, uncompressedLength, uncompressed->mutable_data()));
    return uncompressed;
  }
  ARROW_ASSIGN_OR_RAISE(auto compressed, arrow::AllocateBuffer(compressedLength, pool));
  RETURN_NOT_OK(readExactBytes(inputStream, compressedLength, compressed->mutable_data()));

  ScopedTimer timer(&decompressTime);
  ARROW_ASSIGN_OR_RAISE(auto output, arrow::AllocateResizableBuffer(uncompressedLength, pool));
  RETURN_NOT_OK(
      codec->Decompress(compressedLength, compressed->data(), uncompressedLength, output->mutable_data()));
  return output;
}

} // namespace

Payload::Payload(Payload::Type type, uint32_t numRows, const std::vector<bool>* isValidityBuffer)
    : type_(type), numRows_(numRows), isValidityBuffer_(isValidityBuffer) {}

/// Returns a human-readable name for the payload type. Any type other than kUncompressed and kCompressed is
/// reported as "Payload::kToBeCompressed".
std::string Payload::toString() const {
  static const std::string kUncompressedString = "Payload::kUncompressed";
  static const std::string kCompressedString = "Payload::kCompressed";
  static const std::string kToBeCompressedString = "Payload::kToBeCompressed";

  if (type_ == kUncompressed) {
    return kUncompressedString;
  }
  if (type_ == kCompressed) {
    return kCompressedString;
  }
  return kToBeCompressedString;
}

/// Builds a BlockPayload from `buffers`.
///
/// For Payload::Type::kCompressed all buffers are compressed eagerly into one contiguous buffer and the elapsed
/// time is recorded as the payload's compression time. For every other type the buffers are stored as they are.
arrow::Result<std::unique_ptr<BlockPayload>> BlockPayload::fromBuffers(
    Payload::Type payloadType,
    uint32_t numRows,
    std::vector<std::shared_ptr<arrow::Buffer>> buffers,
    const std::vector<bool>* isValidityBuffer,
    arrow::MemoryPool* pool,
    arrow::util::Codec* codec) {
  if (payloadType == Payload::Type::kCompressed) {
    Timer compressionTime;
    compressionTime.start();
    // Compress.
    // Compressed buffer layout: | buffer1 compressedLength | buffer1 uncompressedLength | buffer1 | ...
    constexpr int64_t kBufferHeaderLength = 2 * sizeof(int64_t);
    const int64_t metadataLength = kBufferHeaderLength * static_cast<int64_t>(buffers.size());
    const int64_t totalCompressedLength = std::accumulate(
        buffers.begin(), buffers.end(), int64_t{0}, [&](int64_t sum, const auto& buffer) -> int64_t {
          if (!buffer) {
            return sum;
          }
          return sum + codec->MaxCompressedLen(buffer->size(), buffer->data());
        });
    const int64_t maxCompressedLength = metadataLength + totalCompressedLength;
    ARROW_ASSIGN_OR_RAISE(
        std::shared_ptr<arrow::ResizableBuffer> compressed,
        arrow::AllocateResizableBuffer(maxCompressedLength, pool));

    auto output = compressed->mutable_data();
    int64_t actualLength = 0;
    // Compress buffers one by one.
    for (auto& buffer : buffers) {
      // compressBuffer() hands this value to the codec as the capacity at the position AFTER the header, so the
      // header bytes must not be counted. A full header is reserved for every buffer in metadataLength, which
      // keeps this value non-negative.
      const int64_t availableLength = maxCompressedLength - actualLength - kBufferHeaderLength;
      ARROW_ASSIGN_OR_RAISE(auto compressedSize, compressBuffer(buffer, output, availableLength, codec));
      // Release buffer after compression. `buffers` is a by-value parameter, so this only drops our reference.
      buffer.reset();
      output += compressedSize;
      actualLength += compressedSize;
    }

    ARROW_RETURN_IF(
        actualLength < 0 || actualLength > maxCompressedLength,
        arrow::Status::Invalid("Writing compressed buffer out of bound."));
    RETURN_NOT_OK(compressed->Resize(actualLength));
    compressionTime.stop();
    auto payload = std::unique_ptr<BlockPayload>(new BlockPayload(
        Type::kCompressed,
        numRows,
        std::vector<std::shared_ptr<arrow::Buffer>>{compressed},
        isValidityBuffer,
        pool,
        codec));
    payload->setCompressionTime(compressionTime.realTimeUsed());
    return payload;
  }
  return std::unique_ptr<BlockPayload>(
      new BlockPayload(payloadType, numRows, std::move(buffers), isValidityBuffer, pool, codec));
}

/// Writes the payload header and all buffers to `outputStream`, then drops the buffers.
///
/// kToBeCompressed payloads are compressed buffer by buffer while writing and are tagged as kCompressed in the
/// output. kCompressed payloads write their single pre-compressed buffer.
arrow::Status BlockPayload::serialize(arrow::io::OutputStream* outputStream) {
  switch (type_) {
    case Type::kUncompressed: {
      ScopedTimer timer(&writeTime_);
      RETURN_NOT_OK(outputStream->Write(&kUncompressedType, sizeof(Type)));
      RETURN_NOT_OK(outputStream->Write(&numRows_, sizeof(uint32_t)));
      for (auto& buffer : buffers_) {
        if (!buffer) {
          RETURN_NOT_OK(outputStream->Write(&kNullBuffer, sizeof(int64_t)));
          continue;
        }
        int64_t bufferSize = buffer->size();
        RETURN_NOT_OK(outputStream->Write(&bufferSize, sizeof(int64_t)));
        if (bufferSize > 0) {
          RETURN_NOT_OK(outputStream->Write(std::move(buffer)));
        }
      }
    } break;
    case Type::kToBeCompressed: {
      {
        ScopedTimer timer(&writeTime_);
        RETURN_NOT_OK(outputStream->Write(&kCompressedType, sizeof(Type)));
        RETURN_NOT_OK(outputStream->Write(&numRows_, sizeof(uint32_t)));
      }
      for (auto& buffer : buffers_) {
        RETURN_NOT_OK(compressAndFlush(std::move(buffer), outputStream, codec_, pool_, compressTime_, writeTime_));
      }
    } break;
    case Type::kCompressed: {
      // The buffers are cleared at the end of this function, so a repeated call has nothing left to write.
      ARROW_RETURN_IF(
          buffers_.empty(), arrow::Status::Invalid("Compressed BlockPayload has no buffer to serialize."));
      ScopedTimer timer(&writeTime_);
      RETURN_NOT_OK(outputStream->Write(&kCompressedType, sizeof(Type)));
      RETURN_NOT_OK(outputStream->Write(&numRows_, sizeof(uint32_t)));
      RETURN_NOT_OK(outputStream->Write(std::move(buffers_[0])));
    } break;
  }
  buffers_.clear();
  return arrow::Status::OK();
}

/// Moves the buffer at `pos` out of the payload.
/// Returns Invalid for compressed payloads and for positions past the end of the buffer list.
arrow::Result<std::shared_ptr<arrow::Buffer>> BlockPayload::readBufferAt(uint32_t pos) {
  if (type_ == Type::kCompressed) {
    return arrow::Status::Invalid("Cannot read buffer from compressed BlockPayload.");
  }
  ARROW_RETURN_IF(
      pos >= buffers_.size(),
      arrow::Status::Invalid(
          "Buffer index " + std::to_string(pos) + " is out of range, number of buffers: " +
          std::to_string(buffers_.size()) + "."));
  return std::move(buffers_[pos]);
}

/// Reads one serialized block from `inputStream` and returns its buffers in schema order.
///
/// Binary and string fields contribute three buffers, struct/map/list fields contribute none individually, and
/// every other field contributes two. If any struct/map/list field exists, one extra buffer is read at the end.
/// `numRows` receives the block's row count; at end of stream it is set to 0 and an empty vector is returned.
arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>> BlockPayload::deserialize(
    arrow::io::InputStream* inputStream,
    const std::shared_ptr<arrow::Schema>& schema,
    const std::shared_ptr<arrow::util::Codec>& codec,
    arrow::MemoryPool* pool,
    uint32_t& numRows,
    int64_t& decompressTime) {
  static const std::vector<std::shared_ptr<arrow::Buffer>> kEmptyBuffers{};
  ARROW_ASSIGN_OR_RAISE(auto typeAndRows, readTypeAndRows(inputStream));
  if (typeAndRows.first == 0) {
    numRows = 0;
    return kEmptyBuffers;
  }
  numRows = typeAndRows.second;
  const auto& fields = schema->fields();

  auto isCompressionEnabled = typeAndRows.first == Type::kCompressed;
  auto readBuffer = [&]() {
    if (isCompressionEnabled) {
      return readCompressedBuffer(inputStream, codec, pool, decompressTime);
    } else {
      return readUncompressedBuffer(inputStream);
    }
  };

  std::vector<std::shared_ptr<arrow::Buffer>> buffers;
  // Reads `count` consecutive buffers from the stream and appends them to `buffers`.
  auto appendBuffers = [&](int count) -> arrow::Status {
    for (int i = 0; i < count; ++i) {
      ARROW_ASSIGN_OR_RAISE(auto buffer, readBuffer());
      buffers.push_back(std::move(buffer));
    }
    return arrow::Status::OK();
  };

  bool hasComplexDataType = false;
  for (const auto& field : fields) {
    auto fieldType = field->type()->id();
    switch (fieldType) {
      case arrow::BinaryType::type_id:
      case arrow::StringType::type_id: {
        RETURN_NOT_OK(appendBuffers(3));
        break;
      }
      case arrow::StructType::type_id:
      case arrow::MapType::type_id:
      case arrow::ListType::type_id: {
        hasComplexDataType = true;
      } break;
      default: {
        RETURN_NOT_OK(appendBuffers(2));
        break;
      }
    }
  }
  if (hasComplexDataType) {
    RETURN_NOT_OK(appendBuffers(1));
  }
  return buffers;
}

void BlockPayload::setCompressionTime(int64_t compressionTime) {
  compressTime_ = compressionTime;
}

/// Concatenates `append` after `source`, buffer by buffer, and returns the merged payload.
///
/// Validity buffers are merged as bitmaps, with a null validity buffer treated as all bits set. All other
/// buffers are concatenated byte-wise. A resizable source buffer is grown and reused in place; otherwise a new
/// buffer is allocated from `pool`. Returns Invalid when the two payloads differ in their number of buffers or
/// when a non-validity buffer is null.
arrow::Result<std::unique_ptr<InMemoryPayload>> InMemoryPayload::merge(
    std::unique_ptr<InMemoryPayload> source,
    std::unique_ptr<InMemoryPayload> append,
    arrow::MemoryPool* pool) {
  auto mergedRows = source->numRows() + append->numRows();
  auto isValidityBuffer = source->isValidityBuffer();

  auto numBuffers = append->numBuffers();
  ARROW_RETURN_IF(
      numBuffers != source->numBuffers(), arrow::Status::Invalid("Number of merging buffers doesn't match."));
  std::vector<std::shared_ptr<arrow::Buffer>> merged;
  merged.resize(numBuffers);
  for (size_t i = 0; i < numBuffers; ++i) {
    ARROW_ASSIGN_OR_RAISE(auto sourceBuffer, source->readBufferAt(i));
    ARROW_ASSIGN_OR_RAISE(auto appendBuffer, append->readBufferAt(i));
    if (isValidityBuffer->at(i)) {
      if (!sourceBuffer) {
        if (!appendBuffer) {
          // Both sides are null, so the merged validity stays null.
          merged[i] = nullptr;
        } else {
          ARROW_ASSIGN_OR_RAISE(
              auto buffer, arrow::AllocateResizableBuffer(arrow::bit_util::BytesForBits(mergedRows), pool));
          // Source is null, fill all true.
          arrow::bit_util::SetBitsTo(buffer->mutable_data(), 0, source->numRows(), true);
          // Write append bits.
          arrow::internal::CopyBitmap(
              appendBuffer->data(), 0, append->numRows(), buffer->mutable_data(), source->numRows());
          merged[i] = std::move(buffer);
        }
      } else {
        // Because sourceBuffer can be resized, need to save buffer size in advance.
        auto sourceBufferSize = sourceBuffer->size();
        auto resizable = std::dynamic_pointer_cast<arrow::ResizableBuffer>(sourceBuffer);
        auto mergedBytes = arrow::bit_util::BytesForBits(mergedRows);
        if (resizable) {
          // If source is resizable, resize and reuse source.
          RETURN_NOT_OK(resizable->Resize(mergedBytes));
        } else {
          // Otherwise copy source.
          ARROW_ASSIGN_OR_RAISE(resizable, arrow::AllocateResizableBuffer(mergedBytes, pool));
          memcpy(resizable->mutable_data(), sourceBuffer->data(), sourceBufferSize);
        }
        if (!appendBuffer) {
          // Append is null, fill its range with true.
          arrow::bit_util::SetBitsTo(resizable->mutable_data(), source->numRows(), append->numRows(), true);
        } else {
          arrow::internal::CopyBitmap(
              appendBuffer->data(), 0, append->numRows(), resizable->mutable_data(), source->numRows());
        }
        merged[i] = std::move(resizable);
      }
    } else {
      // Both buffers are dereferenced below; report a missing one instead of crashing.
      ARROW_RETURN_IF(
          !sourceBuffer || !appendBuffer,
          arrow::Status::Invalid("Cannot merge null data buffer at index " + std::to_string(i) + "."));
      if (appendBuffer->size() == 0) {
        merged[i] = std::move(sourceBuffer);
      } else {
        // Because sourceBuffer can be resized, need to save buffer size in advance.
        auto sourceBufferSize = sourceBuffer->size();
        auto mergedSize = sourceBufferSize + appendBuffer->size();
        auto resizable = std::dynamic_pointer_cast<arrow::ResizableBuffer>(sourceBuffer);
        if (resizable) {
          // If source is resizable, resize and reuse source.
          RETURN_NOT_OK(resizable->Resize(mergedSize));
        } else {
          // Otherwise copy source.
          ARROW_ASSIGN_OR_RAISE(resizable, arrow::AllocateResizableBuffer(mergedSize, pool));
          memcpy(resizable->mutable_data(), sourceBuffer->data(), sourceBufferSize);
        }
        // Copy append.
        memcpy(resizable->mutable_data() + sourceBufferSize, appendBuffer->data(), appendBuffer->size());
        merged[i] = std::move(resizable);
      }
    }
  }
  return std::make_unique<InMemoryPayload>(mergedRows, isValidityBuffer, std::move(merged));
}

/// Converts this payload into a BlockPayload of `payloadType`, handing over all buffers.
arrow::Result<std::unique_ptr<BlockPayload>>
InMemoryPayload::toBlockPayload(Payload::Type payloadType, arrow::MemoryPool* pool, arrow::util::Codec* codec) {
  return BlockPayload::fromBuffers(payloadType, numRows_, std::move(buffers_), isValidityBuffer_, pool, codec);
}

/// Not supported: always returns Invalid.
arrow::Status InMemoryPayload::serialize(arrow::io::OutputStream* outputStream) {
  return arrow::Status::Invalid("Cannot serialize InMemoryPayload.");
}

/// Moves the buffer at `index` out of the payload.
/// Returns Invalid for an index past the end of the buffer list.
arrow::Result<std::shared_ptr<arrow::Buffer>> InMemoryPayload::readBufferAt(uint32_t index) {
  ARROW_RETURN_IF(
      index >= buffers_.size(),
      arrow::Status::Invalid(
          "Buffer index " + std::to_string(index) + " is out of range, number of buffers: " +
          std::to_string(buffers_.size()) + "."));
  return std::move(buffers_[index]);
}

int64_t InMemoryPayload::getBufferSize() const {
  return gluten::getBufferSize(buffers_);
}

/// Replaces every non-null buffer with a copy allocated from `pool`.
/// Zero-length buffers are replaced by zeroLengthNullBuffer(); null buffers are left as they are.
arrow::Status InMemoryPayload::copyBuffers(arrow::MemoryPool* pool) {
  for (auto& buffer : buffers_) {
    if (!buffer) {
      continue;
    }
    if (buffer->size() == 0) {
      buffer = zeroLengthNullBuffer();
      continue;
    }
    ARROW_ASSIGN_OR_RAISE(auto copy, arrow::AllocateResizableBuffer(buffer->size(), pool));
    memcpy(copy->mutable_data(), buffer->data(), buffer->size());
    buffer = std::move(copy);
  }
  return arrow::Status::OK();
}

UncompressedDiskBlockPayload::UncompressedDiskBlockPayload(
    Type type,
    uint32_t numRows,
    const std::vector<bool>* isValidityBuffer,
    arrow::io::InputStream*& inputStream,
    uint64_t rawSize,
    arrow::MemoryPool* pool,
    arrow::util::Codec* codec)
    : Payload(type, numRows, isValidityBuffer),
      inputStream_(inputStream),
      rawSize_(rawSize),
      pool_(pool),
      codec_(codec) {}

/// Not supported: always returns Invalid.
arrow::Result<std::shared_ptr<arrow::Buffer>> UncompressedDiskBlockPayload::readBufferAt(uint32_t index) {
  return arrow::Status::Invalid("Cannot read buffer from UncompressedDiskBlockPayload.");
}

/// Copies this block from the input stream to `outputStream`.
///
/// Without a codec, or for kUncompressed payloads, the `rawSize_` bytes are forwarded unchanged. For
/// kToBeCompressed payloads the stored header is skipped, a kCompressed header is written instead, and every
/// buffer is compressed while being written. Any other type returns Invalid.
arrow::Status UncompressedDiskBlockPayload::serialize(arrow::io::OutputStream* outputStream) {
  if (codec_ == nullptr || type_ == Payload::kUncompressed) {
    ARROW_ASSIGN_OR_RAISE(auto block, inputStream_->Read(rawSize_));
    RETURN_NOT_OK(outputStream->Write(block));
    return arrow::Status::OK();
  }

  ARROW_RETURN_IF(
      type_ != Payload::kToBeCompressed,
      arrow::Status::Invalid(
          "Invalid payload type: " + std::to_string(type_) +
          ", should be either Payload::kUncompressed or Payload::kToBeCompressed"));

  ARROW_ASSIGN_OR_RAISE(auto startPos, inputStream_->Tell());
  auto typeAndRows = readTypeAndRows(inputStream_);
  // Discard type and rows.
  RETURN_NOT_OK(typeAndRows.status());
  RETURN_NOT_OK(outputStream->Write(&kCompressedType, sizeof(kCompressedType)));
  RETURN_NOT_OK(outputStream->Write(&numRows_, sizeof(uint32_t)));

  // Consume buffers until the whole raw block has been read from the input stream.
  auto readPos = startPos + sizeof(kUncompressedType) + sizeof(uint32_t);
  while (readPos - startPos < rawSize_) {
    ARROW_ASSIGN_OR_RAISE(auto uncompressed, readUncompressedBuffer());
    ARROW_ASSIGN_OR_RAISE(readPos, inputStream_->Tell());
    RETURN_NOT_OK(compressAndFlush(std::move(uncompressed), outputStream, codec_, pool_, compressTime_, writeTime_));
  }
  return arrow::Status::OK();
}

/// Reads the next buffer of the block from the input stream.
/// Returns nullptr for kNullBuffer and zeroLengthNullBuffer() for a zero length.
arrow::Result<std::shared_ptr<arrow::Buffer>> UncompressedDiskBlockPayload::readUncompressedBuffer() {
  readPos_++;
  int64_t bufferLength = 0;
  // Fail on a truncated length field instead of acting on an unset value.
  RETURN_NOT_OK(readExactBytes(inputStream_, sizeof(int64_t), &bufferLength));
  if (bufferLength == kNullBuffer) {
    return nullptr;
  }
  if (bufferLength == 0) {
    return zeroLengthNullBuffer();
  }
  ARROW_ASSIGN_OR_RAISE(auto buffer, inputStream_->Read(bufferLength));
  return buffer;
}

CompressedDiskBlockPayload::CompressedDiskBlockPayload(
    uint32_t numRows,
    const std::vector<bool>* isValidityBuffer,
    arrow::io::InputStream*& inputStream,
    uint64_t rawSize,
    arrow::MemoryPool* pool)
    : Payload(Type::kCompressed, numRows, isValidityBuffer),
      inputStream_(inputStream),
      rawSize_(rawSize),
      pool_(pool) {}

/// Forwards the `rawSize_` bytes of this block from the input stream to `outputStream` unchanged.
arrow::Status CompressedDiskBlockPayload::serialize(arrow::io::OutputStream* outputStream) {
  ARROW_ASSIGN_OR_RAISE(auto block, inputStream_->Read(rawSize_));
  RETURN_NOT_OK(outputStream->Write(block));
  return arrow::Status::OK();
}

/// Not supported: always returns Invalid.
arrow::Result<std::shared_ptr<arrow::Buffer>> CompressedDiskBlockPayload::readBufferAt(uint32_t index) {
  return arrow::Status::Invalid("Cannot read buffer from CompressedDiskBlockPayload.");
}

} // namespace gluten