// cpp/velox/shuffle/VeloxShuffleReader.cc (508 lines of code)

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "VeloxShuffleReader.h"

#include <arrow/array/array_binary.h>
#include <arrow/io/buffered.h>

#include "memory/VeloxColumnarBatch.h"
#include "shuffle/Payload.h"
#include "shuffle/Utils.h"
#include "utils/Common.h"
#include "utils/Compression.h"
#include "utils/Timer.h"
#include "utils/VeloxArrowUtils.h"
#include "utils/macros.h"
#include "velox/serializers/PrestoSerializer.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/FlatVector.h"
#include "velox/vector/arrow/Bridge.h"

#include <iostream>

// using namespace facebook;
using namespace facebook::velox;

namespace gluten {

namespace {

// Keeps the backing arrow::Buffer alive for as long as a Velox BufferView
// references its memory. addRef/release are intentionally no-ops: the
// lifetime is carried entirely by the shared_ptr copy stored here, which is
// released when the BufferView itself is destroyed.
struct BufferViewReleaser {
  BufferViewReleaser() : BufferViewReleaser(nullptr) {}
  BufferViewReleaser(std::shared_ptr<arrow::Buffer> arrowBuffer) : bufferReleaser_(std::move(arrowBuffer)) {}

  void addRef() const {}
  void release() const {}

 private:
  const std::shared_ptr<arrow::Buffer> bufferReleaser_;
};

// Zero-copy wrap of raw memory into a Velox buffer. `bufferReleaser` pins the
// underlying arrow::Buffer so the view never dangles.
BufferPtr wrapInBufferViewAsOwner(const void* buffer, size_t length, std::shared_ptr<arrow::Buffer> bufferReleaser) {
  return BufferView<BufferViewReleaser>::create(
      static_cast<const uint8_t*>(buffer), length, {std::move(bufferReleaser)});
}

// Zero-copy conversion of an arrow::Buffer into a Velox BufferPtr; nullptr in,
// nullptr out.
BufferPtr convertToVeloxBuffer(std::shared_ptr<arrow::Buffer> buffer) {
  if (buffer == nullptr) {
    return nullptr;
  }
  return wrapInBufferViewAsOwner(buffer->data(), buffer->size(), buffer);
}

// Builds a FlatVector<T> for a fixed-width scalar type from the next two
// buffers (nulls, values); advances `bufferIdx` past the consumed buffers.
// An absent/empty nulls buffer means "no nulls".
template <TypeKind kind>
VectorPtr readFlatVector(
    std::vector<BufferPtr>& buffers,
    int32_t& bufferIdx,
    uint32_t length,
    std::shared_ptr<const Type> type,
    memory::MemoryPool* pool) {
  auto nulls = buffers[bufferIdx++];
  auto values = buffers[bufferIdx++];
  std::vector<BufferPtr> stringBuffers;
  using T = typename TypeTraits<kind>::NativeType;
  if (nulls == nullptr || nulls->size() == 0) {
    return std::make_shared<FlatVector<T>>(
        pool, type, BufferPtr(nullptr), length, std::move(values), std::move(stringBuffers));
  }
  return std::make_shared<FlatVector<T>>(
      pool, type, std::move(nulls), length, std::move(values), std::move(stringBuffers));
}

template <>
VectorPtr readFlatVector<TypeKind::HUGEINT>(
    std::vector<BufferPtr>& buffers,
    int32_t& bufferIdx,
    uint32_t length,
    std::shared_ptr<const Type> type,
    memory::MemoryPool* pool) {
  auto nulls = buffers[bufferIdx++];
  auto valueBuffer = buffers[bufferIdx++];
  // Because if buffer does not compress, it will get from netty, the address may
  // not be 16B-aligned, which would make `int128_t = xxx` coredump via the
  // movdqa instruction. Copy into an aligned buffer when needed.
  auto data = valueBuffer->as<int128_t>();
  BufferPtr values;
  if ((reinterpret_cast<uintptr_t>(data) & 0xf) == 0) {
    values = valueBuffer;
  } else {
    values = AlignedBuffer::allocate<char>(valueBuffer->size(), pool);
    gluten::fastCopy(values->asMutable<char>(), valueBuffer->as<char>(), valueBuffer->size());
  }
  std::vector<BufferPtr> stringBuffers;
  if (nulls == nullptr || nulls->size() == 0) {
    auto vp = std::make_shared<FlatVector<int128_t>>(
        pool, type, BufferPtr(nullptr), length, std::move(values), std::move(stringBuffers));
    return vp;
  }
  return std::make_shared<FlatVector<int128_t>>(
      pool, type, std::move(nulls), length, std::move(values), std::move(stringBuffers));
}

// Builds a FlatVector<StringView> from the next three buffers
// (nulls, per-row lengths, concatenated characters). The character buffer is
// retained in `stringBuffers` so the StringViews stay valid.
VectorPtr readFlatVectorStringView(
    std::vector<BufferPtr>& buffers,
    int32_t& bufferIdx,
    uint32_t length,
    std::shared_ptr<const Type> type,
    memory::MemoryPool* pool) {
  auto nulls = buffers[bufferIdx++];
  auto lengthBuffer = buffers[bufferIdx++];
  auto valueBuffer = buffers[bufferIdx++];
  const auto* rawLength = lengthBuffer->as<BinaryArrayLengthBufferType>();
  std::vector<BufferPtr> stringBuffers;
  auto values = AlignedBuffer::allocate<char>(sizeof(StringView) * length, pool);
  auto rawValues = values->asMutable<StringView>();
  auto rawChars = valueBuffer->as<char>();
  uint64_t offset = 0;
  // Use an unsigned counter: `length` is uint32_t, avoid signed/unsigned mix.
  for (uint32_t i = 0; i < length; ++i) {
    rawValues[i] = StringView(rawChars + offset, rawLength[i]);
    offset += rawLength[i];
  }
  stringBuffers.emplace_back(valueBuffer);
  if (nulls == nullptr || nulls->size() == 0) {
    return std::make_shared<FlatVector<StringView>>(
        pool, type, BufferPtr(nullptr), length, std::move(values), std::move(stringBuffers));
  }
  return std::make_shared<FlatVector<StringView>>(
      pool, type, std::move(nulls), length, std::move(values), std::move(stringBuffers));
}

template <>
VectorPtr readFlatVector<TypeKind::VARCHAR>(
    std::vector<BufferPtr>& buffers,
    int32_t& bufferIdx,
    uint32_t length,
    std::shared_ptr<const Type> type,
    memory::MemoryPool* pool) {
  return readFlatVectorStringView(buffers, bufferIdx, length, type, pool);
}

template <>
VectorPtr readFlatVector<TypeKind::VARBINARY>(
    std::vector<BufferPtr>& buffers,
    int32_t& bufferIdx,
    uint32_t length,
    std::shared_ptr<const Type> type,
    memory::MemoryPool* pool) {
  return readFlatVectorStringView(buffers, bufferIdx, length, type, pool);
}

// Wraps a raw byte span into a single-range ByteInputStream for deserialization.
std::unique_ptr<ByteInputStream> toByteStream(uint8_t* data, int32_t size) {
  std::vector<ByteRange> byteRanges;
  byteRanges.push_back(ByteRange{data, size, 0});
  auto byteStream = std::make_unique<ByteInputStream>(byteRanges);
  return byteStream;
}

// Deserializes Presto-serialized complex-type columns (ROW/MAP/ARRAY) from a
// single buffer into a RowVector matching `rowType`.
RowVectorPtr readComplexType(BufferPtr buffer, RowTypePtr& rowType, memory::MemoryPool* pool) {
  RowVectorPtr result;
  auto byteStream = toByteStream(const_cast<uint8_t*>(buffer->as<uint8_t>()), buffer->size());
  auto serde = std::make_unique<serializer::presto::PrestoVectorSerde>();
  serde->deserialize(byteStream.get(), pool, rowType, &result, /* serdeOptions */ nullptr);
  return result;
}

// Collects the complex-typed children (ROW/MAP/ARRAY) of `types` into a
// RowType describing what the Presto serde wrote; empty if none.
RowTypePtr getComplexWriteType(const std::vector<TypePtr>& types) {
  std::vector<std::string> complexTypeColNames;
  std::vector<TypePtr> complexTypeChildrens;
  for (size_t i = 0; i < types.size(); ++i) {
    auto kind = types[i]->kind();
    switch (kind) {
      case TypeKind::ROW:
      case TypeKind::MAP:
      case TypeKind::ARRAY: {
        complexTypeColNames.emplace_back(types[i]->name());
        complexTypeChildrens.emplace_back(types[i]);
      } break;
      default:
        break;
    }
  }
  return std::make_shared<const RowType>(std::move(complexTypeColNames), std::move(complexTypeChildrens));
}

// Materializes one vector per column type. Scalar columns consume flat buffers
// in order; all complex columns come pre-serialized in the LAST buffer and are
// decoded once via the Presto serde, then distributed in column order.
void readColumns(
    std::vector<BufferPtr>& buffers,
    memory::MemoryPool* pool,
    uint32_t numRows,
    const std::vector<TypePtr>& types,
    std::vector<VectorPtr>& result) {
  int32_t bufferIdx = 0;
  std::vector<VectorPtr> complexChildren;
  auto complexRowType = getComplexWriteType(types);
  if (complexRowType->children().size() > 0) {
    complexChildren = readComplexType(buffers[buffers.size() - 1], complexRowType, pool)->children();
  }
  int32_t complexIdx = 0;
  for (size_t i = 0; i < types.size(); ++i) {
    auto kind = types[i]->kind();
    switch (kind) {
      case TypeKind::ROW:
      case TypeKind::MAP:
      case TypeKind::ARRAY: {
        result.emplace_back(std::move(complexChildren[complexIdx]));
        complexIdx++;
      } break;
      default: {
        auto res = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
            readFlatVector, types[i]->kind(), buffers, bufferIdx, numRows, types[i], pool);
        result.emplace_back(std::move(res));
      } break;
    }
  }
}

// Assembles a RowVector of `numRows` rows from the flat shuffle buffers.
RowVectorPtr deserialize(RowTypePtr type, uint32_t numRows, std::vector<BufferPtr>& buffers, memory::MemoryPool* pool) {
  std::vector<VectorPtr> children;
  auto childTypes = type->as<TypeKind::ROW>().children();
  readColumns(buffers, pool, numRows, childTypes, children);
  return std::make_shared<RowVector>(pool, type, BufferPtr(nullptr), numRows, children);
}

// Builds a VeloxColumnarBatch from already-extracted arrow buffers,
// accumulating wall time into `deserializeTime`.
std::shared_ptr<VeloxColumnarBatch> makeColumnarBatch(
    RowTypePtr type,
    uint32_t numRows,
    std::vector<std::shared_ptr<arrow::Buffer>> arrowBuffers,
    memory::MemoryPool* pool,
    int64_t& deserializeTime) {
  ScopedTimer timer(&deserializeTime);
  std::vector<BufferPtr> veloxBuffers;
  veloxBuffers.reserve(arrowBuffers.size());
  for (auto& buffer : arrowBuffers) {
    veloxBuffers.push_back(convertToVeloxBuffer(std::move(buffer)));
  }
  auto rowVector = deserialize(type, numRows, veloxBuffers, pool);
  return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
}

// Overload: builds a VeloxColumnarBatch by draining the buffers of an
// InMemoryPayload (used for the merged small-batch path).
std::shared_ptr<VeloxColumnarBatch> makeColumnarBatch(
    RowTypePtr type,
    std::unique_ptr<InMemoryPayload> payload,
    memory::MemoryPool* pool,
    int64_t& deserializeTime) {
  ScopedTimer timer(&deserializeTime);
  std::vector<BufferPtr> veloxBuffers;
  auto numBuffers = payload->numBuffers();
  veloxBuffers.reserve(numBuffers);
  for (size_t i = 0; i < numBuffers; ++i) {
    GLUTEN_ASSIGN_OR_THROW(auto buffer, payload->readBufferAt(i));
    veloxBuffers.push_back(convertToVeloxBuffer(std::move(buffer)));
  }
  auto rowVector = deserialize(type, payload->numRows(), veloxBuffers, pool);
  return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
}

// The shuffle frame is an arrow RecordBatch whose columns are LargeString
// arrays; this returns the raw value bytes of column `fieldIdx`.
std::shared_ptr<arrow::Buffer> readColumnBuffer(const arrow::RecordBatch& batch, int32_t fieldIdx) {
  return std::dynamic_pointer_cast<arrow::LargeStringArray>(batch.column(fieldIdx))->value_data();
}

// CompressionMode::BUFFER layout: lengthPtr[0] = buffer count, then per buffer
// a (uncompressedLength, compressedLength) pair. uncompressedLength == -1
// marks a small buffer stored uncompressed. Each buffer is decompressed
// individually out of `valueBuffer`.
void getUncompressedBuffersOneByOne(
    arrow::MemoryPool* arrowPool,
    arrow::util::Codec* codec,
    const int64_t* lengthPtr,
    std::shared_ptr<arrow::Buffer> valueBuffer,
    std::vector<BufferPtr>& buffers) {
  int64_t valueOffset = 0;
  auto valueBufferLength = lengthPtr[0];
  for (int64_t i = 0, j = 1; i < valueBufferLength; i++, j = j + 2) {
    int64_t uncompressLength = lengthPtr[j];
    int64_t compressLength = lengthPtr[j + 1];
    auto compressBuffer = arrow::SliceBuffer(valueBuffer, valueOffset, compressLength);
    valueOffset += compressLength;
    // Small buffer, not compressed
    if (uncompressLength == -1) {
      buffers.emplace_back(convertToVeloxBuffer(compressBuffer));
    } else {
      std::shared_ptr<arrow::Buffer> uncompressBuffer = std::make_shared<arrow::Buffer>(nullptr, 0);
      if (uncompressLength != 0) {
        GLUTEN_ASSIGN_OR_THROW(uncompressBuffer, arrow::AllocateBuffer(uncompressLength, arrowPool));
        GLUTEN_ASSIGN_OR_THROW(
            auto actualDecompressLength,
            codec->Decompress(
                compressLength, compressBuffer->data(), uncompressLength, uncompressBuffer->mutable_data()));
        VELOX_DCHECK_EQ(actualDecompressLength, uncompressLength);
        // Prevent unused variable warning in optimized build.
        ((void)actualDecompressLength);
      }
      buffers.emplace_back(convertToVeloxBuffer(uncompressBuffer));
    }
  }
}

// Stream layout: lengthPtr = {uncompressedLength, compressedLength,
// bufferCount, size_0, size_1, ...}. All buffers were concatenated and
// compressed as one blob; decompress once (unless uncompressedLength == -1,
// meaning the blob is stored uncompressed) and slice out each buffer.
void getUncompressedBuffersStream(
    arrow::MemoryPool* arrowPool,
    arrow::util::Codec* codec,
    const int64_t* lengthPtr,
    std::shared_ptr<arrow::Buffer> compressBuffer,
    std::vector<BufferPtr>& buffers) {
  int64_t uncompressLength = lengthPtr[0];
  int64_t compressLength = lengthPtr[1];
  auto valueBufferLength = lengthPtr[2];
  // Shared empty placeholder for zero-length buffers.
  const std::shared_ptr<arrow::Buffer> kNullBuffer = std::make_shared<arrow::Buffer>(nullptr, 0);
  if (uncompressLength != -1) {
    std::shared_ptr<arrow::Buffer> uncompressBuffer;
    GLUTEN_ASSIGN_OR_THROW(uncompressBuffer, arrow::AllocateBuffer(uncompressLength, arrowPool));
    GLUTEN_ASSIGN_OR_THROW(
        auto actualDecompressLength,
        codec->Decompress(compressLength, compressBuffer->data(), uncompressLength, uncompressBuffer->mutable_data()));
    VELOX_DCHECK_EQ(actualDecompressLength, uncompressLength);
    int64_t bufferOffset = 0;
    for (int64_t i = 3; i < valueBufferLength + 3; i++) {
      if (lengthPtr[i] == 0) {
        buffers.emplace_back(convertToVeloxBuffer(kNullBuffer));
      } else {
        auto uncompressBufferSlice = arrow::SliceBuffer(uncompressBuffer, bufferOffset, lengthPtr[i]);
        buffers.emplace_back(convertToVeloxBuffer(uncompressBufferSlice));
        bufferOffset += lengthPtr[i];
      }
    }
  } else {
    // Data was stored uncompressed: slice directly from the input buffer.
    int64_t bufferOffset = 0;
    for (int64_t i = 3; i < valueBufferLength + 3; i++) {
      if (lengthPtr[i] == 0) {
        buffers.emplace_back(convertToVeloxBuffer(kNullBuffer));
      } else {
        auto uncompressBufferSlice = arrow::SliceBuffer(compressBuffer, bufferOffset, lengthPtr[i]);
        buffers.emplace_back(convertToVeloxBuffer(uncompressBufferSlice));
        bufferOffset += lengthPtr[i];
      }
    }
  }
}

// Dispatches to the per-buffer or stream decompression path. Column 1 holds
// the length metadata (first int64 = CompressionMode), column 2 the payload.
void getUncompressedBuffers(
    const arrow::RecordBatch& batch,
    arrow::MemoryPool* arrowPool,
    arrow::util::Codec* codec,
    std::vector<BufferPtr>& buffers) {
  // Get compression mode from first byte.
  auto lengthBuffer = readColumnBuffer(batch, 1);
  auto lengthBufferPtr = reinterpret_cast<const int64_t*>(lengthBuffer->data());
  auto compressionMode = (CompressionMode)(*lengthBufferPtr++);
  auto valueBuffer = readColumnBuffer(batch, 2);
  if (compressionMode == CompressionMode::BUFFER) {
    getUncompressedBuffersOneByOne(arrowPool, codec, lengthBufferPtr, valueBuffer, buffers);
  } else {
    getUncompressedBuffersStream(arrowPool, codec, lengthBufferPtr, valueBuffer, buffers);
  }
}

// Decodes one shuffle RecordBatch into a RowVector. Column 0 is a header of
// {uint32 rowCount, int32 arrow::Compression::type}; remaining columns hold
// the (possibly compressed) buffer data.
RowVectorPtr readRowVector(
    const arrow::RecordBatch& batch,
    RowTypePtr rowType,
    CodecBackend codecBackend,
    int64_t& decompressTime,
    int64_t& deserializeTime,
    arrow::MemoryPool* arrowPool,
    memory::MemoryPool* pool) {
  auto header = readColumnBuffer(batch, 0);
  uint32_t length;
  memcpy(&length, header->data(), sizeof(uint32_t));
  int32_t compressTypeValue;
  memcpy(&compressTypeValue, header->data() + sizeof(uint32_t), sizeof(int32_t));
  arrow::Compression::type compressType = static_cast<arrow::Compression::type>(compressTypeValue);

  std::vector<BufferPtr> buffers;
  buffers.reserve(batch.num_columns() * 2);
  if (compressType == arrow::Compression::type::UNCOMPRESSED) {
    for (int32_t i = 0; i < batch.num_columns() - 1; i++) {
      auto buffer = readColumnBuffer(batch, i + 1);
      buffers.emplace_back(convertToVeloxBuffer(buffer));
    }
  } else {
    TIME_NANO_START(decompressTime);
    auto codec = createArrowIpcCodec(compressType, codecBackend);
    getUncompressedBuffers(batch, arrowPool, codec.get(), buffers);
    TIME_NANO_END(decompressTime);
  }

  TIME_NANO_START(deserializeTime);
  auto rv = deserialize(rowType, length, buffers, pool);
  TIME_NANO_END(deserializeTime);

  return rv;
}

// Human-readable name for a codec backend (used for diagnostics).
std::string getCodecBackend(CodecBackend type) {
  if (type == CodecBackend::QAT) {
    return "QAT";
  } else if (type == CodecBackend::IAA) {
    return "IAA";
  } else {
    return "NONE";
  }
}

// Human-readable name for an arrow compression type (used for diagnostics).
std::string getCompressionType(arrow::Compression::type type) {
  if (type == arrow::Compression::UNCOMPRESSED) {
    return "UNCOMPRESSED";
  } else if (type == arrow::Compression::LZ4_FRAME) {
    return "LZ4_FRAME";
  } else if (type == arrow::Compression::ZSTD) {
    return "ZSTD";
  } else if (type == arrow::Compression::GZIP) {
    return "GZIP";
  } else {
    return "UNKNOWN";
  }
}

} // namespace

VeloxShuffleReader::VeloxShuffleReader(std::unique_ptr<DeserializerFactory> factory)
    : ShuffleReader(std::move(factory)) {}

VeloxColumnarBatchDeserializer::VeloxColumnarBatchDeserializer(
    std::shared_ptr<arrow::io::InputStream> in,
    const std::shared_ptr<arrow::Schema>& schema,
    const std::shared_ptr<arrow::util::Codec>& codec,
    const facebook::velox::RowTypePtr& rowType,
    int32_t batchSize,
    arrow::MemoryPool* memoryPool,
    facebook::velox::memory::MemoryPool* veloxPool,
    std::vector<bool>* isValidityBuffer,
    bool hasComplexType,
    int64_t& deserializeTime,
    int64_t& decompressTime)
    : in_(std::move(in)),
      schema_(schema),
      codec_(codec),
      rowType_(rowType),
      batchSize_(batchSize),
      memoryPool_(memoryPool),
      veloxPool_(veloxPool),
      isValidityBuffer_(isValidityBuffer),
      hasComplexType_(hasComplexType),
      deserializeTime_(deserializeTime),
      decompressTime_(decompressTime) {}

// Produces the next ColumnarBatch, or nullptr at end-of-stream. Batches with
// complex types are emitted one payload at a time; flat batches are merged
// until they reach `batchSize_` rows, carrying any overflow payload over to
// the next call via `merged_`.
std::shared_ptr<ColumnarBatch> VeloxColumnarBatchDeserializer::next() {
  if (hasComplexType_) {
    uint32_t numRows;
    GLUTEN_ASSIGN_OR_THROW(
        auto arrowBuffers,
        BlockPayload::deserialize(in_.get(), schema_, codec_, memoryPool_, numRows, decompressTime_));
    if (numRows == 0) {
      // Reach EOS.
      return nullptr;
    }
    return makeColumnarBatch(rowType_, numRows, std::move(arrowBuffers), veloxPool_, deserializeTime_);
  }

  if (reachEos_) {
    // Flush the batch carried over from the previous call, if any.
    if (merged_) {
      return makeColumnarBatch(rowType_, std::move(merged_), veloxPool_, deserializeTime_);
    }
    return nullptr;
  }

  std::vector<std::shared_ptr<arrow::Buffer>> arrowBuffers{};
  uint32_t numRows = 0;
  while (!merged_ || merged_->numRows() < batchSize_) {
    GLUTEN_ASSIGN_OR_THROW(
        arrowBuffers, BlockPayload::deserialize(in_.get(), schema_, codec_, memoryPool_, numRows, decompressTime_));
    if (numRows == 0) {
      reachEos_ = true;
      break;
    }
    if (!merged_) {
      merged_ = std::make_unique<InMemoryPayload>(numRows, isValidityBuffer_, std::move(arrowBuffers));
      arrowBuffers.clear();
      continue;
    }
    auto mergedRows = merged_->numRows() + numRows;
    if (mergedRows > batchSize_) {
      // Would overflow the target batch size: emit `merged_` now and keep the
      // just-read payload (still in `arrowBuffers`) for the next call.
      break;
    }
    auto append = std::make_unique<InMemoryPayload>(numRows, isValidityBuffer_, std::move(arrowBuffers));
    GLUTEN_ASSIGN_OR_THROW(merged_, InMemoryPayload::merge(std::move(merged_), std::move(append), memoryPool_));
    arrowBuffers.clear();
  }

  // Reach EOS.
  if (reachEos_ && !merged_) {
    return nullptr;
  }

  auto columnarBatch = makeColumnarBatch(rowType_, std::move(merged_), veloxPool_, deserializeTime_);

  // Save remaining rows.
  if (!arrowBuffers.empty()) {
    merged_ = std::make_unique<InMemoryPayload>(numRows, isValidityBuffer_, std::move(arrowBuffers));
  }
  return columnarBatch;
}

VeloxColumnarBatchDeserializerFactory::VeloxColumnarBatchDeserializerFactory(
    const std::shared_ptr<arrow::Schema>& schema,
    const std::shared_ptr<arrow::util::Codec>& codec,
    const RowTypePtr& rowType,
    int32_t batchSize,
    arrow::MemoryPool* memoryPool,
    std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool)
    : schema_(schema),
      codec_(codec),
      rowType_(rowType),
      batchSize_(batchSize),
      memoryPool_(memoryPool),
      veloxPool_(veloxPool) {
  initFromSchema();
}

std::unique_ptr<ColumnarBatchIterator> VeloxColumnarBatchDeserializerFactory::createDeserializer(
    std::shared_ptr<arrow::io::InputStream> in) {
  return std::make_unique<VeloxColumnarBatchDeserializer>(
      std::move(in),
      schema_,
      codec_,
      rowType_,
      batchSize_,
      memoryPool_,
      veloxPool_.get(),
      &isValidityBuffer_,
      hasComplexType_,
      deserializeTime_,
      decompressTime_);
}

arrow::MemoryPool* VeloxColumnarBatchDeserializerFactory::getPool() {
  return memoryPool_;
}

int64_t VeloxColumnarBatchDeserializerFactory::getDecompressTime() {
  return decompressTime_;
}

int64_t VeloxColumnarBatchDeserializerFactory::getDeserializeTime() {
  return deserializeTime_;
}

// Precomputes, per serialized buffer, whether it is a validity (null) bitmap.
// Buffer order per column matches the writer: binary/string columns emit
// {nulls, lengths, values}; booleans emit {nulls, bit-packed values}; other
// scalars emit {nulls, values}. Any complex column switches the reader to the
// one-payload-per-batch path instead.
void VeloxColumnarBatchDeserializerFactory::initFromSchema() {
  GLUTEN_ASSIGN_OR_THROW(auto arrowColumnTypes, toShuffleTypeId(schema_->fields()));
  isValidityBuffer_.reserve(arrowColumnTypes.size());
  for (size_t i = 0; i < arrowColumnTypes.size(); ++i) {
    switch (arrowColumnTypes[i]->id()) {
      case arrow::BinaryType::type_id:
      case arrow::StringType::type_id: {
        isValidityBuffer_.push_back(true);
        isValidityBuffer_.push_back(false);
        isValidityBuffer_.push_back(false);
      } break;
      case arrow::StructType::type_id:
      case arrow::MapType::type_id:
      case arrow::ListType::type_id: {
        hasComplexType_ = true;
      } break;
      case arrow::BooleanType::type_id: {
        isValidityBuffer_.push_back(true);
        isValidityBuffer_.push_back(true);
      } break;
      default: {
        isValidityBuffer_.push_back(true);
        isValidityBuffer_.push_back(false);
      } break;
    }
  }
}

} // namespace gluten