cpp/velox/shuffle/VeloxHashShuffleWriter.cc (1,190 lines of code)

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "VeloxHashShuffleWriter.h"
#include "memory/ArrowMemory.h"
#include "memory/VeloxColumnarBatch.h"
#include "memory/VeloxMemoryManager.h"
#include "shuffle/ShuffleSchema.h"
#include "shuffle/Utils.h"
#include "utils/Common.h"
#include "utils/Macros.h"
#include "utils/VeloxArrowUtils.h"
#include "velox/buffer/Buffer.h"
#include "velox/common/base/Nulls.h"
#include "velox/type/HugeInt.h"
#include "velox/type/Timestamp.h"
#include "velox/type/Type.h"
#include "velox/vector/BaseVector.h"
#include "velox/vector/ComplexVector.h"

#if defined(__x86_64__)
#include <immintrin.h>
#include <x86intrin.h>
#elif defined(__aarch64__)
#include <arm_neon.h>
#endif

namespace gluten {

#define VELOX_SHUFFLE_WRITER_LOG_FLAG 0

// macro to rotate left an 8-bit value 'x' given the shift 's' is a 32-bit integer
// (x is left shifted by 's' modulo 8) OR (x right shifted by (8 - 's' modulo 8))
#if !defined(__x86_64__)
#define rotateLeft(x, s) (x << (s - ((s >> 3) << 3)) | x >> (8 - (s - ((s >> 3) << 3))))
#endif

// on x86 machines, _MM_HINT_T0,T1,T2 are defined as 1, 2, 3
// equivalent mapping to __builtin_prefetch hints is 3, 2, 1
#if defined(__x86_64__)
#define PREFETCHT0(ptr) _mm_prefetch(ptr, _MM_HINT_T0)
#define PREFETCHT1(ptr) _mm_prefetch(ptr, _MM_HINT_T1)
#define PREFETCHT2(ptr) _mm_prefetch(ptr, _MM_HINT_T2)
#else
#define PREFETCHT0(ptr) __builtin_prefetch(ptr, 0, 3)
#define PREFETCHT1(ptr) __builtin_prefetch(ptr, 0, 2)
#define PREFETCHT2(ptr) __builtin_prefetch(ptr, 0, 1)
#endif

namespace {

bool vectorHasNull(const facebook::velox::VectorPtr& vp) {
  if (!vp->mayHaveNulls()) {
    return false;
  }
  return vp->countNulls(vp->nulls(), vp->size()) != 0;
}

class BinaryArrayResizeGuard {
 public:
  explicit BinaryArrayResizeGuard(BinaryArrayResizeState& state) : state_(state) {
    state_.inResize = true;
  }

  ~BinaryArrayResizeGuard() {
    state_.inResize = false;
  }

 private:
  BinaryArrayResizeState& state_;
};

template <facebook::velox::TypeKind kind>
arrow::Status collectFlatVectorBuffer(
    facebook::velox::BaseVector* vector,
    std::vector<std::shared_ptr<arrow::Buffer>>& buffers,
    arrow::MemoryPool* pool) {
  using T = typename facebook::velox::TypeTraits<kind>::NativeType;
  auto flatVector = dynamic_cast<const facebook::velox::FlatVector<T>*>(vector);
  buffers.emplace_back();
  ARROW_ASSIGN_OR_RAISE(buffers.back(), toArrowBuffer(flatVector->nulls(), pool));
  buffers.emplace_back();
  ARROW_ASSIGN_OR_RAISE(buffers.back(), toArrowBuffer(flatVector->values(), pool));
  return arrow::Status::OK();
}

arrow::Status collectFlatVectorBufferStringView(
    facebook::velox::BaseVector* vector,
    std::vector<std::shared_ptr<arrow::Buffer>>& buffers,
    arrow::MemoryPool* pool) {
  auto flatVector = dynamic_cast<const facebook::velox::FlatVector<facebook::velox::StringView>*>(vector);
  buffers.emplace_back();
  ARROW_ASSIGN_OR_RAISE(buffers.back(), toArrowBuffer(flatVector->nulls(), pool));

  auto rawValues = flatVector->rawValues();
  // last offset is the totalStringSize
  auto lengthBufferSize = sizeof(gluten::BinaryArrayLengthBufferType) * flatVector->size();
  ARROW_ASSIGN_OR_RAISE(auto lengthBuffer, arrow::AllocateResizableBuffer(lengthBufferSize, pool));
  auto* rawLength = reinterpret_cast<gluten::BinaryArrayLengthBufferType*>(lengthBuffer->mutable_data());
  uint64_t offset = 0;
  for (int32_t i = 0; i < flatVector->size(); i++) {
    auto length = rawValues[i].size();
    *rawLength++ = length;
    offset += length;
  }
  buffers.push_back(std::move(lengthBuffer));

  ARROW_ASSIGN_OR_RAISE(auto valueBuffer, arrow::AllocateResizableBuffer(offset, pool));
  auto raw = reinterpret_cast<char*>(valueBuffer->mutable_data());
  for (int32_t i = 0; i < flatVector->size(); i++) {
    gluten::fastCopy(raw, rawValues[i].data(), rawValues[i].size());
    raw += rawValues[i].size();
  }
  buffers.push_back(std::move(valueBuffer));
  return arrow::Status::OK();
}

template <>
arrow::Status collectFlatVectorBuffer<facebook::velox::TypeKind::UNKNOWN>(
    facebook::velox::BaseVector* vector,
    std::vector<std::shared_ptr<arrow::Buffer>>& buffers,
    arrow::MemoryPool* pool) {
  return arrow::Status::OK();
}

template <>
arrow::Status collectFlatVectorBuffer<facebook::velox::TypeKind::VARCHAR>(
    facebook::velox::BaseVector* vector,
    std::vector<std::shared_ptr<arrow::Buffer>>& buffers,
    arrow::MemoryPool* pool) {
  return collectFlatVectorBufferStringView(vector, buffers, pool);
}

template <>
arrow::Status collectFlatVectorBuffer<facebook::velox::TypeKind::VARBINARY>(
    facebook::velox::BaseVector* vector,
    std::vector<std::shared_ptr<arrow::Buffer>>& buffers,
    arrow::MemoryPool* pool) {
  return collectFlatVectorBufferStringView(vector, buffers, pool);
}

} // namespace

arrow::Result<std::shared_ptr<VeloxShuffleWriter>> VeloxHashShuffleWriter::create(
    uint32_t numPartitions,
    std::unique_ptr<PartitionWriter> partitionWriter,
    ShuffleWriterOptions options,
    std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
    arrow::MemoryPool* arrowPool) {
  std::shared_ptr<VeloxHashShuffleWriter> res(
      new VeloxHashShuffleWriter(numPartitions, std::move(partitionWriter), std::move(options), veloxPool, arrowPool));
  RETURN_NOT_OK(res->init());
  return res;
}

arrow::Status VeloxHashShuffleWriter::init() {
  // Pre-allocated buffer size for each partition; the unit is row count.
  // When the partitioner is SinglePart, these variables don't need init.
  if (options_.partitioning != Partitioning::kSingle) {
    partition2RowCount_.resize(numPartitions_);
    partitionBufferSize_.resize(numPartitions_);
    partition2RowOffsetBase_.resize(numPartitions_ + 1);
  }

  partitionBufferBase_.resize(numPartitions_);

  return arrow::Status::OK();
}

arrow::Status VeloxHashShuffleWriter::initPartitions() {
  auto simpleColumnCount = simpleColumnIndices_.size();

  partitionValidityAddrs_.resize(simpleColumnCount);
  std::for_each(partitionValidityAddrs_.begin(), partitionValidityAddrs_.end(), [this](std::vector<uint8_t*>& v) {
    v.resize(numPartitions_, nullptr);
  });

  partitionFixedWidthValueAddrs_.resize(fixedWidthColumnCount_);
  std::for_each(
      partitionFixedWidthValueAddrs_.begin(), partitionFixedWidthValueAddrs_.end(), [this](std::vector<uint8_t*>& v) {
        v.resize(numPartitions_, nullptr);
      });

  partitionBuffers_.resize(simpleColumnCount);
  std::for_each(partitionBuffers_.begin(), partitionBuffers_.end(), [this](auto& v) { v.resize(numPartitions_); });
  partitionBinaryAddrs_.resize(binaryColumnIndices_.size());
  std::for_each(partitionBinaryAddrs_.begin(), partitionBinaryAddrs_.end(), [this](std::vector<BinaryBuf>& v) {
    v.resize(numPartitions_);
  });

  return arrow::Status::OK();
}

void VeloxHashShuffleWriter::setPartitionBufferSize(uint32_t newSize) {
  options_.bufferSize = newSize;
}

arrow::Result<std::shared_ptr<arrow::Buffer>> VeloxHashShuffleWriter::generateComplexTypeBuffers(
    facebook::velox::RowVectorPtr vector) {
  auto arena = std::make_unique<facebook::velox::StreamArena>(veloxPool_.get());
  auto serializer =
      serde_.createIterativeSerializer(asRowType(vector->type()), vector->size(), arena.get(), &serdeOptions_);
  const facebook::velox::IndexRange allRows{0, vector->size()};
  serializer->append(vector, folly::Range(&allRows, 1));
  auto serializedSize = serializer->maxSerializedSize();
  auto flushBuffer = complexTypeFlushBuffer_[0];
  if (flushBuffer == nullptr) {
    ARROW_ASSIGN_OR_RAISE(flushBuffer, arrow::AllocateResizableBuffer(serializedSize, partitionBufferPool_.get()));
  } else if (serializedSize > flushBuffer->capacity()) {
    RETURN_NOT_OK(flushBuffer->Reserve(serializedSize));
  }

  auto valueBuffer = arrow::SliceMutableBuffer(flushBuffer, 0, serializedSize);
  auto output = std::make_shared<arrow::io::FixedSizeBufferWriter>(valueBuffer);
  facebook::velox::serializer::presto::PrestoOutputStreamListener listener;
  ArrowFixedSizeBufferOutputStream out(output, &listener);
  serializer->flush(&out);
  return valueBuffer;
}

arrow::Status VeloxHashShuffleWriter::write(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit) {
  if (options_.partitioning == Partitioning::kSingle) {
    auto veloxColumnBatch = VeloxColumnarBatch::from(veloxPool_.get(), cb);
    VELOX_CHECK_NOT_NULL(veloxColumnBatch);
    auto& rv = *veloxColumnBatch->getFlattenedRowVector();
    RETURN_NOT_OK(initFromRowVector(rv));

    std::vector<std::shared_ptr<arrow::Buffer>> buffers;
    std::vector<facebook::velox::VectorPtr> complexChildren;
    for (auto& child : rv.children()) {
      if (child->encoding() == facebook::velox::VectorEncoding::Simple::FLAT) {
        auto status = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
            collectFlatVectorBuffer, child->typeKind(), child.get(), buffers, partitionBufferPool_.get());
        RETURN_NOT_OK(status);
      } else {
        complexChildren.emplace_back(child);
      }
    }

    if (complexChildren.size() > 0) {
      auto rowVector = std::make_shared<facebook::velox::RowVector>(
          veloxPool_.get(),
          complexWriteType_,
          facebook::velox::BufferPtr(nullptr),
          rv.size(),
          std::move(complexChildren));
      buffers.emplace_back();
      ARROW_ASSIGN_OR_RAISE(buffers.back(), generateComplexTypeBuffers(rowVector));
    }

    RETURN_NOT_OK(evictBuffers(0, rv.size(), std::move(buffers), false));
  } else if (options_.partitioning == Partitioning::kRange) {
    auto veloxColumnBatch = VeloxColumnarBatch::from(veloxPool_.get(), cb);
    VELOX_CHECK_NOT_NULL(veloxColumnBatch);
    const int32_t numColumns = veloxColumnBatch->numColumns();
    VELOX_CHECK(numColumns >= 2);
    auto pidBatch = veloxColumnBatch->select(veloxPool_.get(), {0});
    auto pidArr = getFirstColumn(*(pidBatch->getRowVector()));
    START_TIMING(cpuWallTimingList_[CpuWallTimingCompute]);

    std::fill(std::begin(partition2RowCount_), std::end(partition2RowCount_), 0);
    RETURN_NOT_OK(partitioner_->compute(pidArr, pidBatch->numRows(), row2Partition_));
    for (auto& pid : row2Partition_) {
      partition2RowCount_[pid]++;
    }

    END_TIMING();

    std::vector<int32_t> range;
    for (int32_t i = 1; i < numColumns; i++) {
      range.push_back(i);
    }
    auto rvBatch = veloxColumnBatch->select(veloxPool_.get(), range);
    auto& rv = *rvBatch->getFlattenedRowVector();
    RETURN_NOT_OK(initFromRowVector(rv));
    RETURN_NOT_OK(doSplit(rv, memLimit));
  } else {
    auto veloxColumnBatch = VeloxColumnarBatch::from(veloxPool_.get(), cb);
    VELOX_CHECK_NOT_NULL(veloxColumnBatch);
    facebook::velox::RowVectorPtr rv;
    START_TIMING(cpuWallTimingList_[CpuWallTimingFlattenRV]);
    rv = veloxColumnBatch->getFlattenedRowVector();
    END_TIMING();
    if (isExtremelyLargeBatch(rv)) {
      auto numRows = rv->size();
      int32_t offset = 0;
      do {
        auto length = std::min(maxBatchSize_, numRows);
        auto slicedBatch = std::dynamic_pointer_cast<facebook::velox::RowVector>(rv->slice(offset, length));
        RETURN_NOT_OK(partitioningAndDoSplit(std::move(slicedBatch), memLimit));
        offset += length;
        numRows -= length;
      } while (numRows);
    } else {
      RETURN_NOT_OK(partitioningAndDoSplit(std::move(rv), memLimit));
    }
  }
  return arrow::Status::OK();
}

arrow::Status VeloxHashShuffleWriter::partitioningAndDoSplit(facebook::velox::RowVectorPtr rv, int64_t memLimit) {
  std::fill(std::begin(partition2RowCount_), std::end(partition2RowCount_), 0);
  if (partitioner_->hasPid()) {
    auto pidArr = getFirstColumn(*rv);
    START_TIMING(cpuWallTimingList_[CpuWallTimingCompute]);
    RETURN_NOT_OK(partitioner_->compute(pidArr, rv->size(), row2Partition_));
    for (auto& pid : row2Partition_) {
      partition2RowCount_[pid]++;
    }
    END_TIMING();
    auto strippedRv = getStrippedRowVector(*rv);
    RETURN_NOT_OK(initFromRowVector(*strippedRv));
    RETURN_NOT_OK(doSplit(*strippedRv, memLimit));
  } else {
    RETURN_NOT_OK(initFromRowVector(*rv));
    START_TIMING(cpuWallTimingList_[CpuWallTimingCompute]);
    RETURN_NOT_OK(partitioner_->compute(nullptr, rv->size(), row2Partition_));
    for (auto& pid : row2Partition_) {
      partition2RowCount_[pid]++;
    }
    END_TIMING();
    RETURN_NOT_OK(doSplit(*rv, memLimit));
  }
  return arrow::Status::OK();
}

arrow::Status VeloxHashShuffleWriter::stop() {
  setSplitState(SplitState::kStopEvict);
  if (options_.partitioning != Partitioning::kSingle) {
    for (auto pid = 0; pid < numPartitions_; ++pid) {
      PartitionBufferGuard guard(partitionBufferInUse_, pid);
      RETURN_NOT_OK(evictPartitionBuffers(pid, false));
    }
  }
  {
    SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingStop]);
    setSplitState(SplitState::kStop);
    RETURN_NOT_OK(partitionWriter_->stop(&metrics_));
    partitionBuffers_.clear();
  }

  stat();

  return arrow::Status::OK();
}

arrow::Status VeloxHashShuffleWriter::buildPartition2Row(uint32_t rowNum) {
  SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingBuildPartition]);

  // calc partition2RowOffsetBase_
  partition2RowOffsetBase_[0] = 0;
  for (auto pid = 1; pid <= numPartitions_; ++pid) {
    partition2RowOffsetBase_[pid] = partition2RowOffsetBase_[pid - 1] + partition2RowCount_[pid - 1];
  }

  // calc rowOffset2RowId_
  rowOffset2RowId_.resize(rowNum);
  for (auto row = 0; row < rowNum; ++row) {
    auto pid = row2Partition_[row];
    rowOffset2RowId_[partition2RowOffsetBase_[pid]++] = row;
  }

  for (auto pid = 0; pid < numPartitions_; ++pid) {
    partition2RowOffsetBase_[pid] -= partition2RowCount_[pid];
  }

  // calc valid partition list
  partitionUsed_.clear();
  for (auto pid = 0; pid != numPartitions_; ++pid) {
    if (partition2RowCount_[pid] > 0) {
      partitionUsed_.push_back(pid);
    }
  }

  printPartition2Row();

  return arrow::Status::OK();
}

arrow::Status VeloxHashShuffleWriter::updateInputHasNull(const facebook::velox::RowVector& rv) {
  SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingHasNull]);

  for (size_t col = 0; col < simpleColumnIndices_.size(); ++col) {
    if (!inputHasNull_[col]) {
      auto colIdx = simpleColumnIndices_[col];
      if (vectorHasNull(rv.childAt(colIdx))) {
        inputHasNull_[col] = true;
      }
    }
  }
  printInputHasNull();
  return arrow::Status::OK();
}
void VeloxHashShuffleWriter::setSplitState(SplitState state) {
  splitState_ = state;
}

arrow::Status VeloxHashShuffleWriter::doSplit(const facebook::velox::RowVector& rv, int64_t memLimit) {
  auto rowNum = rv.size();
  RETURN_NOT_OK(buildPartition2Row(rowNum));
  RETURN_NOT_OK(updateInputHasNull(rv));

  START_TIMING(cpuWallTimingList_[CpuWallTimingIteratePartitions]);

  setSplitState(SplitState::kPreAlloc);
  // Calculate buffer size based on available offheap memory, history average bytes per row and options_.bufferSize.
  auto preAllocBufferSize = calculatePartitionBufferSize(rv, memLimit);
  RETURN_NOT_OK(preAllocPartitionBuffers(preAllocBufferSize));
  END_TIMING();

  printPartitionBuffer();

  setSplitState(SplitState::kSplit);
  RETURN_NOT_OK(splitRowVector(rv));

  printPartitionBuffer();

  setSplitState(SplitState::kInit);
  return arrow::Status::OK();
}

arrow::Status VeloxHashShuffleWriter::splitRowVector(const facebook::velox::RowVector& rv) {
  SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingSplitRV]);

  // now start to split the RowVector
  RETURN_NOT_OK(splitFixedWidthValueBuffer(rv));
  RETURN_NOT_OK(splitValidityBuffer(rv));
  RETURN_NOT_OK(splitBinaryArray(rv));
  RETURN_NOT_OK(splitComplexType(rv));

  // update partition buffer base after split
  for (auto pid = 0; pid < numPartitions_; ++pid) {
    partitionBufferBase_[pid] += partition2RowCount_[pid];
  }

  return arrow::Status::OK();
}

arrow::Status VeloxHashShuffleWriter::splitFixedWidthValueBuffer(const facebook::velox::RowVector& rv) {
  for (auto col = 0; col < fixedWidthColumnCount_; ++col) {
    auto colIdx = simpleColumnIndices_[col];
    auto& column = rv.childAt(colIdx);
    const uint8_t* srcAddr = (const uint8_t*)column->valuesAsVoid();
    const auto& dstAddrs = partitionFixedWidthValueAddrs_[col];

    switch (arrow::bit_width(arrowColumnTypes_[colIdx]->id())) {
      case 0: // arrow::NullType::type_id:
        // No value buffer created for NullType.
        break;
      case 1: // arrow::BooleanType::type_id:
        RETURN_NOT_OK(splitBoolType(srcAddr, dstAddrs));
        break;
      case 8:
        RETURN_NOT_OK(splitFixedType<uint8_t>(srcAddr, dstAddrs));
        break;
      case 16:
        RETURN_NOT_OK(splitFixedType<uint16_t>(srcAddr, dstAddrs));
        break;
      case 32:
        RETURN_NOT_OK(splitFixedType<uint32_t>(srcAddr, dstAddrs));
        break;
      case 64: {
        if (column->type()->kind() == facebook::velox::TypeKind::TIMESTAMP) {
          RETURN_NOT_OK(splitFixedType<facebook::velox::int128_t>(srcAddr, dstAddrs));
        } else {
          RETURN_NOT_OK(splitFixedType<uint64_t>(srcAddr, dstAddrs));
        }
      } break;
      case 128: // arrow::Decimal128Type::type_id
        // too bad gcc generates movdqa even we use __m128i_u data type.
        // splitFixedType<__m128i_u>(srcAddr, dstAddrs);
        {
          if (column->type()->isShortDecimal()) {
            RETURN_NOT_OK(splitFixedType<int64_t>(srcAddr, dstAddrs));
          } else if (column->type()->isLongDecimal()) {
            // assume batch size = 32k; reducer# = 4K; row/reducer = 8
            RETURN_NOT_OK(splitFixedType<facebook::velox::int128_t>(srcAddr, dstAddrs));
          } else {
            return arrow::Status::Invalid(
                "Column type " + schema_->field(colIdx)->type()->ToString() + " is not supported.");
          }
        }
        break;
      default:
        return arrow::Status::Invalid(
            "Column type " + schema_->field(colIdx)->type()->ToString() + " is not fixed width");
    }
  }

  return arrow::Status::OK();
}

arrow::Status VeloxHashShuffleWriter::splitBoolType(const uint8_t* srcAddr, const std::vector<uint8_t*>& dstAddrs) {
  // assume batch size = 32k; reducer# = 4K; row/reducer = 8
  for (auto& pid : partitionUsed_) {
    // set the last byte
    auto dstaddr = dstAddrs[pid];
    if (dstaddr != nullptr) {
      auto r = partition2RowOffsetBase_[pid]; /*8k*/
      auto size = partition2RowOffsetBase_[pid + 1];
      auto dstOffset = partitionBufferBase_[pid];
      auto dstOffsetInByte = (8 - (dstOffset & 0x7)) & 0x7;
      auto dstIdxByte = dstOffsetInByte;
      auto dst = dstaddr[dstOffset >> 3];

      for (; r < size && dstIdxByte > 0; r++, dstIdxByte--) {
        auto srcOffset = rowOffset2RowId_[r]; /*16k*/
        auto src = srcAddr[srcOffset >> 3];
        src = src >> (srcOffset & 7) | 0xfe; // get the bit in bit 0, other bits set to 1
#if defined(__x86_64__)
        src = __rolb(src, 8 - dstIdxByte);
#else
        src = rotateLeft(src, (8 - dstIdxByte));
#endif
        dst = dst & src; // only take the useful bit.
      }
      dstaddr[dstOffset >> 3] = dst;
      if (r == size) {
        continue;
      }
      dstOffset += dstOffsetInByte;
      // now dst_offset is 8 aligned
      for (; r + 8 < size; r += 8) {
        uint8_t src = 0;
        auto srcOffset = rowOffset2RowId_[r]; /*16k*/
        src = srcAddr[srcOffset >> 3];
        // PREFETCHT0((&(srcAddr)[(srcOffset >> 3) + 64]));
        dst = src >> (srcOffset & 7) | 0xfe; // get the bit in bit 0, other bits set to 1

        srcOffset = rowOffset2RowId_[r + 1]; /*16k*/
        src = srcAddr[srcOffset >> 3];
        dst &= src >> (srcOffset & 7) << 1 | 0xfd; // get the bit in bit 0, other bits set to 1

        srcOffset = rowOffset2RowId_[r + 2]; /*16k*/
        src = srcAddr[srcOffset >> 3];
        dst &= src >> (srcOffset & 7) << 2 | 0xfb; // get the bit in bit 0, other bits set to 1

        srcOffset = rowOffset2RowId_[r + 3]; /*16k*/
        src = srcAddr[srcOffset >> 3];
        dst &= src >> (srcOffset & 7) << 3 | 0xf7; // get the bit in bit 0, other bits set to 1

        srcOffset = rowOffset2RowId_[r + 4]; /*16k*/
        src = srcAddr[srcOffset >> 3];
        dst &= src >> (srcOffset & 7) << 4 | 0xef; // get the bit in bit 0, other bits set to 1

        srcOffset = rowOffset2RowId_[r + 5]; /*16k*/
        src = srcAddr[srcOffset >> 3];
        dst &= src >> (srcOffset & 7) << 5 | 0xdf; // get the bit in bit 0, other bits set to 1

        srcOffset = rowOffset2RowId_[r + 6]; /*16k*/
        src = srcAddr[srcOffset >> 3];
        dst &= src >> (srcOffset & 7) << 6 | 0xbf; // get the bit in bit 0, other bits set to 1

        srcOffset = rowOffset2RowId_[r + 7]; /*16k*/
        src = srcAddr[srcOffset >> 3];
        dst &= src >> (srcOffset & 7) << 7 | 0x7f; // get the bit in bit 0, other bits set to 1

        dstaddr[dstOffset >> 3] = dst;
        dstOffset += 8;
        //_mm_prefetch(dstaddr + (dst_offset >> 3) + 64, _MM_HINT_T0);
      }
      // last byte, set it to 0xff is ok
      dst = 0xff;
      dstIdxByte = 0;
      for (; r < size; r++, dstIdxByte++) {
        auto srcOffset = rowOffset2RowId_[r]; /*16k*/
        auto src = srcAddr[srcOffset >> 3];
        src = src >> (srcOffset & 7) | 0xfe; // get the bit in bit 0, other bits set to 1
#if defined(__x86_64__)
        src = __rolb(src, dstIdxByte);
#else
        src = rotateLeft(src, dstIdxByte);
#endif
        dst = dst & src; // only take the useful bit.
      }
      dstaddr[dstOffset >> 3] = dst;
    }
  }
  return arrow::Status::OK();
}
arrow::Status VeloxHashShuffleWriter::splitValidityBuffer(const facebook::velox::RowVector& rv) {
  for (size_t col = 0; col < simpleColumnIndices_.size(); ++col) {
    auto colIdx = simpleColumnIndices_[col];
    auto& column = rv.childAt(colIdx);
    if (vectorHasNull(column)) {
      auto& dstAddrs = partitionValidityAddrs_[col];
      for (auto& pid : partitionUsed_) {
        if (dstAddrs[pid] == nullptr) {
          // Init bitmap if it's null.
          ARROW_ASSIGN_OR_RAISE(
              auto validityBuffer,
              arrow::AllocateResizableBuffer(
                  arrow::bit_util::BytesForBits(partitionBufferSize_[pid]), partitionBufferPool_.get()));
          dstAddrs[pid] = const_cast<uint8_t*>(validityBuffer->data());
          memset(validityBuffer->mutable_data(), 0xff, validityBuffer->capacity());
          partitionBuffers_[col][pid][kValidityBufferIndex] = std::move(validityBuffer);
        }
      }

      auto srcAddr = (const uint8_t*)(column->mutableRawNulls());
      RETURN_NOT_OK(splitBoolType(srcAddr, dstAddrs));
    } else {
      VsPrintLF(colIdx, " column hasn't null");
    }
  }
  return arrow::Status::OK();
}

arrow::Status VeloxHashShuffleWriter::splitBinaryType(
    uint32_t binaryIdx,
    const facebook::velox::FlatVector<facebook::velox::StringView>& src,
    std::vector<BinaryBuf>& dst) {
  const auto* srcRawValues = src.rawValues();
  const auto* srcRawNulls = src.rawNulls();

  for (auto& pid : partitionUsed_) {
    auto& binaryBuf = dst[pid];

    // use 32bit offset
    auto dstLengthBase = (BinaryArrayLengthBufferType*)(binaryBuf.lengthPtr) + partitionBufferBase_[pid];

    auto valueOffset = binaryBuf.valueOffset;
    auto dstValuePtr = binaryBuf.valuePtr + valueOffset;
    auto capacity = binaryBuf.valueCapacity;

    auto rowOffsetBase = partition2RowOffsetBase_[pid];
    auto numRows = partition2RowCount_[pid];
    auto multiply = 1;

    for (auto i = 0; i < numRows; i++) {
      auto rowId = rowOffset2RowId_[rowOffsetBase + i];
      auto& stringView = srcRawValues[rowId];
      size_t isNull = srcRawNulls && facebook::velox::bits::isBitNull(srcRawNulls, rowId);
      // Branch-free null handling: if the row is null, (isNull - 1) is 0 and stringLen becomes 0;
      // otherwise (isNull - 1) has all bits set and stringLen is the full string length.
      auto stringLen = (isNull - 1) & stringView.size();

      // 1. copy length, update offset.
      dstLengthBase[i] = stringLen;
      valueOffset += stringLen;

      // Resize if necessary.
      if (valueOffset >= capacity) {
        auto oldCapacity = capacity;
        (void)oldCapacity; // suppress warning
        capacity = capacity + std::max((capacity >> multiply), (uint64_t)stringLen);
        multiply = std::min(3, multiply + 1);

        const auto& valueBuffer = partitionBuffers_[fixedWidthColumnCount_ + binaryIdx][pid][kBinaryValueBufferIndex];
        {
          binaryArrayResizeState_ = BinaryArrayResizeState{pid, binaryIdx};
          BinaryArrayResizeGuard guard(binaryArrayResizeState_);
          RETURN_NOT_OK(valueBuffer->Reserve(capacity));
        }

        binaryBuf.valuePtr = valueBuffer->mutable_data();
        binaryBuf.valueCapacity = capacity;
        dstValuePtr = binaryBuf.valuePtr + valueOffset - stringLen;
        // Need to update dstLengthBase because lengthPtr can be updated if Reserve triggers spill.
        dstLengthBase = (BinaryArrayLengthBufferType*)(binaryBuf.lengthPtr) + partitionBufferBase_[pid];
      }

      // 2. copy value.
      gluten::fastCopy(dstValuePtr, stringView.data(), stringLen);
      dstValuePtr += stringLen;
    }

    binaryBuf.valueOffset = valueOffset;
  }
  return arrow::Status::OK();
}
arrow::Status VeloxHashShuffleWriter::splitBinaryArray(const facebook::velox::RowVector& rv) {
  for (auto col = fixedWidthColumnCount_; col < simpleColumnIndices_.size(); ++col) {
    auto binaryIdx = col - fixedWidthColumnCount_;
    auto& dstAddrs = partitionBinaryAddrs_[binaryIdx];
    auto colIdx = simpleColumnIndices_[col];
    auto column = rv.childAt(colIdx)->asFlatVector<facebook::velox::StringView>();
    RETURN_NOT_OK(splitBinaryType(binaryIdx, *column, dstAddrs));
  }
  return arrow::Status::OK();
}

arrow::Status VeloxHashShuffleWriter::splitComplexType(const facebook::velox::RowVector& rv) {
  if (complexColumnIndices_.size() == 0) {
    return arrow::Status::OK();
  }
  auto numRows = rv.size();
  std::vector<std::vector<facebook::velox::IndexRange>> rowIndexs;
  rowIndexs.resize(numPartitions_);
  // TODO: maybe an estimated row is more reasonable
  for (auto row = 0; row < numRows; ++row) {
    auto partition = row2Partition_[row];
    if (complexTypeData_[partition] == nullptr) {
      // TODO: maybe memory issue, copy many times
      if (arenas_[partition] == nullptr) {
        arenas_[partition] = std::make_unique<facebook::velox::StreamArena>(veloxPool_.get());
      }
      complexTypeData_[partition] = serde_.createIterativeSerializer(
          complexWriteType_, partition2RowCount_[partition], arenas_[partition].get(), &serdeOptions_);
    }
    rowIndexs[partition].emplace_back(facebook::velox::IndexRange{row, 1});
  }

  std::vector<facebook::velox::VectorPtr> children;
  children.reserve(complexColumnIndices_.size());
  for (size_t i = 0; i < complexColumnIndices_.size(); ++i) {
    auto colIdx = complexColumnIndices_[i];
    children.emplace_back(rv.childAt(colIdx));
  }
  auto rowVector = std::make_shared<facebook::velox::RowVector>(
      veloxPool_.get(), complexWriteType_, facebook::velox::BufferPtr(nullptr), rv.size(), std::move(children));

  for (auto& pid : partitionUsed_) {
    if (rowIndexs[pid].size() != 0) {
      auto old = arenas_[pid]->size();
      complexTypeData_[pid]->append(rowVector, folly::Range(rowIndexs[pid].data(), rowIndexs[pid].size()));
      complexTotalSizeBytes_ += arenas_[pid]->size() - old;
    }
  }
  return arrow::Status::OK();
}

arrow::Status VeloxHashShuffleWriter::initColumnTypes(const facebook::velox::RowVector& rv) {
  schema_ = toArrowSchema(rv.type(), veloxPool_.get());
  for (size_t i = 0; i < rv.childrenSize(); ++i) {
    veloxColumnTypes_.push_back(rv.childAt(i)->type());
  }

  VsPrintSplitLF("schema_", schema_->ToString());

  // get arrow_column_types_ from schema
  ARROW_ASSIGN_OR_RAISE(arrowColumnTypes_, toShuffleTypeId(schema_->fields()));

  std::vector<std::string> complexNames;
  std::vector<facebook::velox::TypePtr> complexChildrens;

  for (size_t i = 0; i < arrowColumnTypes_.size(); ++i) {
    switch (arrowColumnTypes_[i]->id()) {
      case arrow::BinaryType::type_id:
      case arrow::StringType::type_id: {
        binaryColumnIndices_.push_back(i);
        isValidityBuffer_.push_back(true);
        isValidityBuffer_.push_back(false);
        isValidityBuffer_.push_back(false);
      } break;
      case arrow::StructType::type_id:
      case arrow::MapType::type_id:
      case arrow::ListType::type_id: {
        complexColumnIndices_.push_back(i);
        complexNames.emplace_back(veloxColumnTypes_[i]->name());
        complexChildrens.emplace_back(veloxColumnTypes_[i]);
        hasComplexType_ = true;
      } break;
      case arrow::BooleanType::type_id: {
        simpleColumnIndices_.push_back(i);
        isValidityBuffer_.push_back(true);
        isValidityBuffer_.push_back(true);
      } break;
      case arrow::NullType::type_id:
        break;
      default: {
        simpleColumnIndices_.push_back(i);
        isValidityBuffer_.push_back(true);
        isValidityBuffer_.push_back(false);
      } break;
    }
  }

  if (hasComplexType_) {
    isValidityBuffer_.push_back(false);
  }

  fixedWidthColumnCount_ = simpleColumnIndices_.size();

  simpleColumnIndices_.insert(simpleColumnIndices_.end(), binaryColumnIndices_.begin(), binaryColumnIndices_.end());

  printColumnsInfo();

  binaryArrayTotalSizeBytes_.resize(binaryColumnIndices_.size(), 0);

  inputHasNull_.resize(simpleColumnIndices_.size(), false);

  complexTypeData_.resize(numPartitions_);
  complexTypeFlushBuffer_.resize(numPartitions_);

  complexWriteType_ = std::make_shared<facebook::velox::RowType>(std::move(complexNames), std::move(complexChildrens));

  return arrow::Status::OK();
}

arrow::Status VeloxHashShuffleWriter::initFromRowVector(const facebook::velox::RowVector& rv) {
  if (veloxColumnTypes_.empty()) {
    RETURN_NOT_OK(initColumnTypes(rv));
    RETURN_NOT_OK(initPartitions());
    calculateSimpleColumnBytes();
  }
  return arrow::Status::OK();
}

inline bool VeloxHashShuffleWriter::beyondThreshold(uint32_t partitionId, uint32_t newSize) {
  auto currentBufferSize = partitionBufferSize_[partitionId];
  return newSize > (1 + options_.bufferReallocThreshold) * currentBufferSize ||
      newSize < (1 - options_.bufferReallocThreshold) * currentBufferSize;
}

void VeloxHashShuffleWriter::calculateSimpleColumnBytes() {
  fixedWidthBufferBytes_ = 0;
  for (size_t col = 0; col < fixedWidthColumnCount_; ++col) {
    auto colIdx = simpleColumnIndices_[col];
    // `bool(1) >> 3` gets 0, so +7
    fixedWidthBufferBytes_ += ((arrow::bit_width(arrowColumnTypes_[colIdx]->id()) + 7) >> 3);
  }
  fixedWidthBufferBytes_ += kSizeOfBinaryArrayLengthBuffer * binaryColumnIndices_.size();
}

uint32_t VeloxHashShuffleWriter::calculatePartitionBufferSize(const facebook::velox::RowVector& rv, int64_t memLimit) {
  auto bytesPerRow = fixedWidthBufferBytes_;

  SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingCalculateBufferSize]);
  auto numRows = rv.size();
  // Calculate average size bytes (bytes per row) for each binary array.
  std::vector<uint64_t> binaryArrayAvgBytesPerRow(binaryColumnIndices_.size());
  for (size_t i = 0; i < binaryColumnIndices_.size(); ++i) {
    uint64_t binarySizeBytes = 0;
    auto column = rv.childAt(binaryColumnIndices_[i])->asFlatVector<facebook::velox::StringView>();

    const auto* srcRawValues = column->rawValues();
    const auto* srcRawNulls = column->rawNulls();

    for (auto idx = 0; idx < numRows; idx++) {
      auto& stringView = srcRawValues[idx];
      size_t isNull = srcRawNulls && facebook::velox::bits::isBitNull(srcRawNulls, idx);
      auto stringLen = (isNull - 1) & stringView.size();
      binarySizeBytes += stringLen;
    }

    binaryArrayTotalSizeBytes_[i] += binarySizeBytes;
    binaryArrayAvgBytesPerRow[i] = binaryArrayTotalSizeBytes_[i] / (totalInputNumRows_ + numRows);
    bytesPerRow += binaryArrayAvgBytesPerRow[i];
  }

  VS_PRINT_VECTOR_MAPPING(binaryArrayAvgBytesPerRow);

  if (totalInputNumRows_ > 0) {
    bytesPerRow += complexTotalSizeBytes_ / totalInputNumRows_;
  }

  VS_PRINTLF(bytesPerRow);

  memLimit += cachedPayloadSize();
  // make sure split buffer uses 128M memory at least, let's hardcode it here for now
  if (memLimit < kMinMemLimit) {
    memLimit = kMinMemLimit;
  }

  uint64_t preAllocRowCnt = memLimit > 0 && bytesPerRow > 0
      ? memLimit / bytesPerRow / numPartitions_ >> 2
      : options_.bufferSize;
  preAllocRowCnt = std::min(preAllocRowCnt, (uint64_t)options_.bufferSize);

  DLOG(INFO) << "Calculated partition buffer size - memLimit: " << memLimit << ", bytesPerRow: " << bytesPerRow
             << ", preAllocRowCnt: " << preAllocRowCnt << std::endl;

  VS_PRINTLF(preAllocRowCnt);

  totalInputNumRows_ += numRows;

  maxBatchSize_ = preAllocRowCnt == 0 ? numPartitions_ : preAllocRowCnt * numPartitions_;

  return (uint32_t)preAllocRowCnt;
}

arrow::Result<std::shared_ptr<arrow::ResizableBuffer>>
VeloxHashShuffleWriter::allocateValidityBuffer(uint32_t col, uint32_t partitionId, uint32_t newSize) {
  if (inputHasNull_[col]) {
    ARROW_ASSIGN_OR_RAISE(
        auto validityBuffer,
        arrow::AllocateResizableBuffer(arrow::bit_util::BytesForBits(newSize), partitionBufferPool_.get()));
    // initialize all true once allocated
    memset(validityBuffer->mutable_data(), 0xff, validityBuffer->capacity());
    partitionValidityAddrs_[col][partitionId] = validityBuffer->mutable_data();
    return validityBuffer;
  }
  partitionValidityAddrs_[col][partitionId] = nullptr;
  return nullptr;
}

arrow::Status VeloxHashShuffleWriter::updateValidityBuffers(uint32_t partitionId, uint32_t newSize) {
  for (auto i = 0; i < simpleColumnIndices_.size(); ++i) {
    // If the validity buffer is not yet allocated, allocate and fill 0xff based on inputHasNull_.
    if (partitionValidityAddrs_[i][partitionId] == nullptr) {
      ARROW_ASSIGN_OR_RAISE(
          partitionBuffers_[i][partitionId][kValidityBufferIndex], allocateValidityBuffer(i, partitionId, newSize));
    }
  }
  return arrow::Status::OK();
}

arrow::Status VeloxHashShuffleWriter::allocatePartitionBuffer(uint32_t partitionId, uint32_t newSize) {
  SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingAllocateBuffer]);

  for (auto i = 0; i < simpleColumnIndices_.size(); ++i) {
    auto columnType = schema_->field(simpleColumnIndices_[i])->type()->id();
    auto& buffers = partitionBuffers_[i][partitionId];

    std::shared_ptr<arrow::ResizableBuffer> validityBuffer{};
    ARROW_ASSIGN_OR_RAISE(validityBuffer, allocateValidityBuffer(i, partitionId, newSize));

    switch (columnType) {
      // binary types
      case arrow::BinaryType::type_id:
      case arrow::StringType::type_id: {
        auto binaryIdx = i - fixedWidthColumnCount_;

        std::shared_ptr<arrow::ResizableBuffer> lengthBuffer{};
        auto lengthBufferSize = newSize * kSizeOfBinaryArrayLengthBuffer;
        ARROW_ASSIGN_OR_RAISE(
            lengthBuffer, arrow::AllocateResizableBuffer(lengthBufferSize, partitionBufferPool_.get()));

        std::shared_ptr<arrow::ResizableBuffer> valueBuffer{};
        auto valueBufferSize = valueBufferSizeForBinaryArray(binaryIdx, newSize);
        ARROW_ASSIGN_OR_RAISE(valueBuffer, arrow::AllocateResizableBuffer(valueBufferSize, partitionBufferPool_.get()));

        partitionBinaryAddrs_[binaryIdx][partitionId] =
            BinaryBuf(valueBuffer->mutable_data(), lengthBuffer->mutable_data(), valueBufferSize);
        buffers = {std::move(validityBuffer), std::move(lengthBuffer), std::move(valueBuffer)};
        break;
      }
      case arrow::NullType::type_id: {
        break;
      }
      default: { // fixed-width types
        std::shared_ptr<arrow::ResizableBuffer> valueBuffer{};
        ARROW_ASSIGN_OR_RAISE(
            valueBuffer,
            arrow::AllocateResizableBuffer(valueBufferSizeForFixedWidthArray(i, newSize), partitionBufferPool_.get()));
        partitionFixedWidthValueAddrs_[i][partitionId] = valueBuffer->mutable_data();
        buffers = {std::move(validityBuffer), std::move(valueBuffer)};
        break;
      }
    }
  }
  partitionBufferSize_[partitionId] = newSize;
  return arrow::Status::OK();
}

arrow::Status VeloxHashShuffleWriter::evictBuffers(
    uint32_t partitionId,
    uint32_t numRows,
    std::vector<std::shared_ptr<arrow::Buffer>> buffers,
    bool reuseBuffers) {
  if (!buffers.empty()) {
    auto payload = std::make_unique<InMemoryPayload>(numRows, &isValidityBuffer_, std::move(buffers), hasComplexType_);
    RETURN_NOT_OK(partitionWriter_->hashEvict(partitionId, std::move(payload), Evict::kCache, reuseBuffers));
  }
  return arrow::Status::OK();
}

arrow::Status VeloxHashShuffleWriter::evictPartitionBuffers(uint32_t partitionId, bool reuseBuffers) {
  auto numRows = partitionBufferBase_[partitionId];
  if (numRows > 0) {
    ARROW_ASSIGN_OR_RAISE(auto buffers, assembleBuffers(partitionId, reuseBuffers));
    RETURN_NOT_OK(evictBuffers(partitionId, numRows, std::move(buffers), reuseBuffers));
  }
  return arrow::Status::OK();
}

arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>> VeloxHashShuffleWriter::assembleBuffers(
    uint32_t partitionId,
    bool reuseBuffers) {
  SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingCreateRbFromBuffer]);

  auto numRows = partitionBufferBase_[partitionId];
  auto fixedWidthIdx = 0;
  auto binaryIdx = 0;
  auto numFields = schema_->num_fields();

  std::vector<std::shared_ptr<arrow::Array>> arrays(numFields);
  std::vector<std::shared_ptr<arrow::Buffer>> allBuffers;
  // One column should have 2 buffers at least, string column has 3 column buffers.
  allBuffers.reserve(fixedWidthColumnCount_ * 2 + binaryColumnIndices_.size() * 3 + hasComplexType_);

  for (int i = 0; i < numFields; ++i) {
    switch (arrowColumnTypes_[i]->id()) {
      case arrow::BinaryType::type_id:
      case arrow::StringType::type_id: {
        const auto& buffers = partitionBuffers_[fixedWidthColumnCount_ + binaryIdx][partitionId];
        auto& binaryBuf = partitionBinaryAddrs_[binaryIdx][partitionId];

        // validity buffer
        if (buffers[kValidityBufferIndex] != nullptr) {
          auto validityBufferSize = arrow::bit_util::BytesForBits(numRows);
          if (reuseBuffers) {
            allBuffers.push_back(
                arrow::SliceBuffer(buffers[kValidityBufferIndex], 0, arrow::bit_util::BytesForBits(numRows)));
          } else {
            RETURN_NOT_OK(buffers[kValidityBufferIndex]->Resize(validityBufferSize, true));
            allBuffers.push_back(std::move(buffers[kValidityBufferIndex]));
          }
        } else {
          allBuffers.push_back(nullptr);
        }

        // Length buffer.
        auto lengthBufferSize = numRows * kSizeOfBinaryArrayLengthBuffer;
        ARROW_RETURN_IF(
            !buffers[kBinaryLengthBufferIndex], arrow::Status::Invalid("Offset buffer of binary array is null."));
        if (reuseBuffers) {
          allBuffers.push_back(arrow::SliceBuffer(buffers[kBinaryLengthBufferIndex], 0, lengthBufferSize));
        } else {
          RETURN_NOT_OK(buffers[kBinaryLengthBufferIndex]->Resize(lengthBufferSize, true));
          allBuffers.push_back(std::move(buffers[kBinaryLengthBufferIndex]));
        }

        // Value buffer.
        auto valueBufferSize = binaryBuf.valueOffset;
        ARROW_RETURN_IF(
            !buffers[kBinaryValueBufferIndex], arrow::Status::Invalid("Value buffer of binary array is null."));
        if (reuseBuffers) {
          allBuffers.push_back(arrow::SliceBuffer(buffers[kBinaryValueBufferIndex], 0, valueBufferSize));
        } else if (valueBufferSize > 0) {
          RETURN_NOT_OK(buffers[kBinaryValueBufferIndex]->Resize(valueBufferSize, true));
          allBuffers.push_back(std::move(buffers[kBinaryValueBufferIndex]));
        } else {
          // Binary value buffer size can be 0, in which case cannot be resized.
          allBuffers.push_back(zeroLengthNullBuffer());
        }

        if (reuseBuffers) {
          // Set the first value offset to 0.
          binaryBuf.valueOffset = 0;
        }
        binaryIdx++;
        break;
      }
      case arrow::StructType::type_id:
      case arrow::MapType::type_id:
      case arrow::ListType::type_id:
        break;
      case arrow::NullType::type_id: {
        break;
      }
      default: {
        auto& buffers = partitionBuffers_[fixedWidthIdx][partitionId];
        // validity buffer
        if (buffers[kValidityBufferIndex] != nullptr) {
          auto validityBufferSize = arrow::bit_util::BytesForBits(numRows);
          if (reuseBuffers) {
            allBuffers.push_back(
                arrow::SliceBuffer(buffers[kValidityBufferIndex], 0, arrow::bit_util::BytesForBits(numRows)));
          } else {
            RETURN_NOT_OK(buffers[kValidityBufferIndex]->Resize(validityBufferSize, true));
            allBuffers.push_back(std::move(buffers[kValidityBufferIndex]));
          }
        } else {
          allBuffers.push_back(nullptr);
        }
        // Value buffer.
        uint64_t valueBufferSize = 0;
        auto& valueBuffer = buffers[kFixedWidthValueBufferIndex];
        ARROW_RETURN_IF(!valueBuffer, arrow::Status::Invalid("Value buffer of fixed-width array is null."));
        if (arrowColumnTypes_[i]->id() == arrow::BooleanType::type_id) {
          valueBufferSize = arrow::bit_util::BytesForBits(numRows);
        } else if (veloxColumnTypes_[i]->isShortDecimal()) {
          valueBufferSize = numRows * (arrow::bit_width(arrow::Int64Type::type_id) >> 3);
        } else if (veloxColumnTypes_[i]->kind() == facebook::velox::TypeKind::TIMESTAMP) {
          valueBufferSize = facebook::velox::BaseVector::byteSize<facebook::velox::Timestamp>(numRows);
        } else {
          valueBufferSize = numRows * (arrow::bit_width(arrowColumnTypes_[i]->id()) >> 3);
        }
        if (reuseBuffers) {
          auto slicedValueBuffer = arrow::SliceBuffer(valueBuffer, 0, valueBufferSize);
          allBuffers.push_back(std::move(slicedValueBuffer));
        } else {
          RETURN_NOT_OK(buffers[kFixedWidthValueBufferIndex]->Resize(valueBufferSize, true));
          allBuffers.push_back(std::move(buffers[kFixedWidthValueBufferIndex]));
        }
        fixedWidthIdx++;
        break;
      }
    }
  }

  if (hasComplexType_ && complexTypeData_[partitionId] != nullptr) {
    auto flushBuffer = complexTypeFlushBuffer_[partitionId];
    auto serializedSize = complexTypeData_[partitionId]->maxSerializedSize();
    if (flushBuffer == nullptr) {
      ARROW_ASSIGN_OR_RAISE(flushBuffer, arrow::AllocateResizableBuffer(serializedSize, partitionBufferPool_.get()));
    } else if (serializedSize > flushBuffer->capacity()) {
      RETURN_NOT_OK(flushBuffer->Reserve(serializedSize));
    }
    auto valueBuffer = arrow::SliceMutableBuffer(flushBuffer, 0, serializedSize);
    auto output = std::make_shared<arrow::io::FixedSizeBufferWriter>(valueBuffer);
    facebook::velox::serializer::presto::PrestoOutputStreamListener listener;
    ArrowFixedSizeBufferOutputStream out(output, &listener);
    complexTypeData_[partitionId]->flush(&out);
    allBuffers.emplace_back(valueBuffer);
    complexTypeData_[partitionId] = nullptr;
    arenas_[partitionId] = nullptr;
  }

  partitionBufferBase_[partitionId] = 0;

  if (!reuseBuffers) {
    RETURN_NOT_OK(resetPartitionBuffer(partitionId));
  }
  return allBuffers;
}

arrow::Status VeloxHashShuffleWriter::reclaimFixedSize(int64_t size, int64_t* actual) {
  if (evictState_ == EvictState::kUnevictable) {
    *actual = 0;
    return arrow::Status::OK();
  }
  EvictGuard evictGuard{evictState_};

  int64_t reclaimed = 0;
  if (reclaimed < size) {
    ARROW_ASSIGN_OR_RAISE(auto cached, evictCachedPayload(size - reclaimed));
    reclaimed += cached;
  }
  if (reclaimed < size && shrinkPartitionBuffersAfterSpill()) {
    ARROW_ASSIGN_OR_RAISE(auto shrunken, shrinkPartitionBuffersMinSize(size - reclaimed));
    reclaimed += shrunken;
  }
  if (reclaimed < size && evictPartitionBuffersAfterSpill()) {
    ARROW_ASSIGN_OR_RAISE(auto evicted, evictPartitionBuffersMinSize(size - reclaimed));
    reclaimed += evicted;
  }
  *actual = reclaimed;
  return arrow::Status::OK();
}
arrow::Result<int64_t> VeloxHashShuffleWriter::evictCachedPayload(int64_t size) {
  SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingEvictPartition]);
  int64_t actual;
  auto before = partitionBufferPool_->bytes_allocated();
  RETURN_NOT_OK(partitionWriter_->reclaimFixedSize(size, &actual));
  // Need to count the changes from partitionBufferPool as well.
  // When the evicted partition buffers are not copied, the merged ones
  // are resized from the original buffers thus allocated from partitionBufferPool.
  actual += before - partitionBufferPool_->bytes_allocated();

  DLOG(INFO) << "Evicted all cached payloads. " << std::to_string(actual) << " bytes released" << std::endl;
  return actual;
}

arrow::Status VeloxHashShuffleWriter::resetValidityBuffer(uint32_t partitionId) {
  std::for_each(partitionBuffers_.begin(), partitionBuffers_.end(), [partitionId](auto& bufs) {
    if (bufs[partitionId].size() != 0 && bufs[partitionId][kValidityBufferIndex] != nullptr) {
      // initialize all true once allocated
      auto validityBuffer = bufs[partitionId][kValidityBufferIndex];
      memset(validityBuffer->mutable_data(), 0xff, validityBuffer->capacity());
    }
  });
  return arrow::Status::OK();
}

arrow::Status VeloxHashShuffleWriter::resizePartitionBuffer(uint32_t partitionId, uint32_t newSize, bool preserveData) {
  for (auto i = 0; i < simpleColumnIndices_.size(); ++i) {
    auto columnType = schema_->field(simpleColumnIndices_[i])->type()->id();
    auto& buffers = partitionBuffers_[i][partitionId];

    // Handle validity buffer first.
    auto& validityBuffer = buffers[kValidityBufferIndex];
    if (!preserveData) {
      ARROW_ASSIGN_OR_RAISE(validityBuffer, allocateValidityBuffer(i, partitionId, newSize));
    } else if (buffers[kValidityBufferIndex]) {
      // Resize validity.
      auto filled = validityBuffer->capacity();
      RETURN_NOT_OK(validityBuffer->Resize(arrow::bit_util::BytesForBits(newSize)));
      partitionValidityAddrs_[i][partitionId] = validityBuffer->mutable_data();

      // If newSize is larger, fill 1 to the newly allocated bytes.
      if (validityBuffer->capacity() > filled) {
        memset(validityBuffer->mutable_data() + filled, 0xff, validityBuffer->capacity() - filled);
      }
    }

    // Resize value buffer if fixed-width, offset & value buffers if binary.
    switch (columnType) {
      // binary types
      case arrow::BinaryType::type_id:
      case arrow::StringType::type_id: {
        // Resize length buffer.
        auto binaryIdx = i - fixedWidthColumnCount_;
        auto& binaryBuf = partitionBinaryAddrs_[binaryIdx][partitionId];
        auto& lengthBuffer = buffers[kBinaryLengthBufferIndex];
        ARROW_RETURN_IF(!lengthBuffer, arrow::Status::Invalid("Offset buffer of binary array is null."));
        RETURN_NOT_OK(lengthBuffer->Resize(newSize * kSizeOfBinaryArrayLengthBuffer));

        // Skip Resize value buffer if the spill is triggered by resizing this split binary buffer.
        // Only update length buffer ptr.
        if (binaryArrayResizeState_.inResize && partitionId == binaryArrayResizeState_.partitionId &&
            binaryIdx == binaryArrayResizeState_.binaryIdx) {
          binaryBuf.lengthPtr = lengthBuffer->mutable_data();
          break;
        }

        // Resize value buffer.
        auto& valueBuffer = buffers[kBinaryValueBufferIndex];
        ARROW_RETURN_IF(!valueBuffer, arrow::Status::Invalid("Value buffer of binary array is null."));
        // Determine the new Size for value buffer.
        auto valueBufferSize = valueBufferSizeForBinaryArray(binaryIdx, newSize);

        // If shrink is triggered by spill, and binary new size is larger, do not resize the buffer to avoid issuing
        // another spill. Only update length buffer ptr.
        if (evictState_ == EvictState::kUnevictable && newSize <= partitionBufferSize_[partitionId] &&
            valueBufferSize >= valueBuffer->size()) {
          binaryBuf.lengthPtr = lengthBuffer->mutable_data();
          break;
        }

        auto valueOffset = 0;
        // If preserve data, the new valueBufferSize should not be smaller than the current offset.
        if (preserveData) {
          valueBufferSize = std::max(binaryBuf.valueOffset, valueBufferSize);
          valueOffset = binaryBuf.valueOffset;
        }
        RETURN_NOT_OK(valueBuffer->Resize(valueBufferSize));

        binaryBuf =
            BinaryBuf(valueBuffer->mutable_data(), lengthBuffer->mutable_data(), valueBufferSize, valueOffset);
        break;
      }
      case arrow::NullType::type_id:
        break;
      default: { // fixed-width types
        auto& valueBuffer = buffers[kFixedWidthValueBufferIndex];
        ARROW_RETURN_IF(!valueBuffer, arrow::Status::Invalid("Value buffer of fixed-width array is null."));
        RETURN_NOT_OK(valueBuffer->Resize(valueBufferSizeForFixedWidthArray(i, newSize)));
        partitionFixedWidthValueAddrs_[i][partitionId] = valueBuffer->mutable_data();
        break;
      }
    }
  }
  partitionBufferSize_[partitionId] = newSize;
  return arrow::Status::OK();
}

arrow::Status VeloxHashShuffleWriter::shrinkPartitionBuffer(uint32_t partitionId) {
  auto bufferSize = partitionBufferSize_[partitionId];
  if (bufferSize == 0) {
    return arrow::Status::OK();
  }

  ARROW_ASSIGN_OR_RAISE(auto newSize, partitionBufferSizeAfterShrink(partitionId));
  if (newSize > bufferSize) {
    std::stringstream invalid;
    invalid << "Cannot shrink to larger size. Partition: " << partitionId << ", before shrink: " << bufferSize
            << ", after shrink: " << newSize;
    return arrow::Status::Invalid(invalid.str());
  }
  if (newSize == bufferSize) {
    // No space to shrink.
    return arrow::Status::OK();
  }
  if (newSize == 0) {
    return resetPartitionBuffer(partitionId);
  }
  return resizePartitionBuffer(partitionId, newSize, /*preserveData=*/true);
}

uint64_t VeloxHashShuffleWriter::valueBufferSizeForBinaryArray(uint32_t binaryIdx, uint32_t newSize) {
  return (binaryArrayTotalSizeBytes_[binaryIdx] + totalInputNumRows_ - 1) / totalInputNumRows_ * newSize + 1024;
}

uint64_t VeloxHashShuffleWriter::valueBufferSizeForFixedWidthArray(uint32_t fixedWidthIndex, uint32_t newSize) {
  uint64_t valueBufferSize = 0;
  auto columnIdx = simpleColumnIndices_[fixedWidthIndex];
  if (arrowColumnTypes_[columnIdx]->id() == arrow::BooleanType::type_id) {
    valueBufferSize = arrow::bit_util::BytesForBits(newSize);
  } else if (veloxColumnTypes_[columnIdx]->isShortDecimal()) {
    valueBufferSize = newSize * (arrow::bit_width(arrow::Int64Type::type_id) >> 3);
  } else if (veloxColumnTypes_[columnIdx]->kind() == facebook::velox::TypeKind::TIMESTAMP) {
    valueBufferSize = facebook::velox::BaseVector::byteSize<facebook::velox::Timestamp>(newSize);
  } else {
    valueBufferSize = newSize * (arrow::bit_width(arrowColumnTypes_[columnIdx]->id()) >> 3);
  }
  return valueBufferSize;
}

void VeloxHashShuffleWriter::stat() const {
#if VELOX_SHUFFLE_WRITER_LOG_FLAG
  for (int i = CpuWallTimingBegin; i != CpuWallTimingEnd; ++i) {
    std::ostringstream oss;
    auto& timing = cpuWallTimingList_[i];
    oss << "Velox shuffle writer stat:" << CpuWallTimingName((CpuWallTimingType)i);
    oss << " " << timing.toString();
    if (timing.count > 0) {
      oss << " wallNanos-avg:" << timing.wallNanos / timing.count;
      oss << " cpuNanos-avg:" << timing.cpuNanos / timing.count;
    }
    LOG(INFO) << oss.str();
  }
#endif
}

arrow::Status VeloxHashShuffleWriter::resetPartitionBuffer(uint32_t partitionId) {
  // Reset fixed-width partition buffers
  for (auto i = 0; i < fixedWidthColumnCount_; ++i) {
    partitionValidityAddrs_[i][partitionId] = nullptr;
    partitionFixedWidthValueAddrs_[i][partitionId] = nullptr;
    partitionBuffers_[i][partitionId].clear();
  }

  // Reset binary partition buffers
  for (auto i = 0; i < binaryColumnIndices_.size(); ++i) {
    auto binaryIdx = i + fixedWidthColumnCount_;
    partitionValidityAddrs_[binaryIdx][partitionId] = nullptr;
    partitionBinaryAddrs_[i][partitionId] = BinaryBuf();
    partitionBuffers_[binaryIdx][partitionId].clear();
  }

  partitionBufferSize_[partitionId] = 0;
  return arrow::Status::OK();
}

const uint64_t VeloxHashShuffleWriter::cachedPayloadSize() const {
  return partitionWriter_->cachedPayloadSize();
}

arrow::Result<int64_t> VeloxHashShuffleWriter::shrinkPartitionBuffersMinSize(int64_t size) {
  // Sort partition buffers by (partitionBufferSize_ - partitionBufferBase_)
  std::vector<std::pair<uint32_t, uint32_t>> pidToSize;
  for (auto pid = 0; pid < numPartitions_; ++pid) {
    if (partitionBufferInUse_.has_value() && *partitionBufferInUse_ == pid) {
      continue;
    }
    if (partitionBufferSize_[pid] > 0 && partitionBufferSize_[pid] > partitionBufferBase_[pid]) {
      pidToSize.emplace_back(pid, partitionBufferSize_[pid] - partitionBufferBase_[pid]);
    }
  }
  // No shrinkable partition buffer.
  if (pidToSize.empty()) {
    return 0;
  }

  std::sort(pidToSize.begin(), pidToSize.end(), [&](const auto& a, const auto& b) { return a.second > b.second; });

  auto beforeShrink = partitionBufferPool_->bytes_allocated();
  auto shrunken = 0;
  auto iter = pidToSize.begin();

  // Shrink in order to reclaim the largest amount of space with fewer resizes.
  do {
    RETURN_NOT_OK(shrinkPartitionBuffer(iter->first));
    shrunken = beforeShrink - partitionBufferPool_->bytes_allocated();
    iter++;
  } while (shrunken < size && iter != pidToSize.end());
  return shrunken;
}

arrow::Result<int64_t> VeloxHashShuffleWriter::evictPartitionBuffersMinSize(int64_t size) {
  // Evict partition buffers, only when splitState_ == SplitState::kInit, and space freed from
  // shrinking is not enough. In this case partitionBufferSize_ == partitionBufferBase_
  VELOX_CHECK(!partitionBufferInUse_);
  int64_t beforeEvict = partitionBufferPool_->bytes_allocated();
  int64_t evicted = 0;
  std::vector<std::pair<uint32_t, uint32_t>> pidToSize;
  for (auto pid = 0; pid < numPartitions_; ++pid) {
    if (partitionBufferSize_[pid] == 0) {
      continue;
    }
    pidToSize.emplace_back(pid, partitionBufferSize_[pid]);
  }
  if (!pidToSize.empty()) {
    for (auto& item : pidToSize) {
      auto pid = item.first;
      ARROW_ASSIGN_OR_RAISE(auto buffers, assembleBuffers(pid, false));
      auto payload =
          std::make_unique<InMemoryPayload>(item.second, &isValidityBuffer_, std::move(buffers), hasComplexType_);
      metrics_.totalBytesToEvict += payload->rawSize();
      RETURN_NOT_OK(partitionWriter_->hashEvict(pid, std::move(payload), Evict::kSpill, false));
      evicted = beforeEvict - partitionBufferPool_->bytes_allocated();
      if (evicted >= size) {
        break;
      }
    }
  }
  return evicted;
}

bool VeloxHashShuffleWriter::shrinkPartitionBuffersAfterSpill() const {
  // If OOM happens during SplitState::kSplit, it is triggered by binary buffers resize.
  // Or during SplitState::kInit, it is triggered by other operators.
  // Or during SplitState::kStopEvict, it is triggered by assembleBuffers allocating extra memory. In this case we use
  // PartitionBufferGuard to prevent the target partition from being shrunk.
  // The reclaim order is spill->shrink, because the partition buffers can be reused.
  // SinglePartitioning doesn't maintain partition buffers.
  return options_.partitioning != Partitioning::kSingle &&
      (splitState_ == SplitState::kSplit || splitState_ == SplitState::kInit || splitState_ == SplitState::kStopEvict);
}

bool VeloxHashShuffleWriter::evictPartitionBuffersAfterSpill() const {
  // If OOM triggered by other operators, the splitState_ is SplitState::kInit.
  // The last resort is to evict the partition buffers to reclaim more space.
  return options_.partitioning != Partitioning::kSingle && splitState_ == SplitState::kInit;
}

arrow::Result<uint32_t> VeloxHashShuffleWriter::partitionBufferSizeAfterShrink(uint32_t partitionId) const {
  if (splitState_ == SplitState::kSplit) {
    return partitionBufferBase_[partitionId] + partition2RowCount_[partitionId];
  }
  if (splitState_ == kInit || splitState_ == SplitState::kStopEvict) {
    return partitionBufferBase_[partitionId];
  }
  return arrow::Status::Invalid("Cannot shrink partition buffers in SplitState: " + std::to_string(splitState_));
}

arrow::Status VeloxHashShuffleWriter::preAllocPartitionBuffers(uint32_t preAllocBufferSize) {
  for (auto& pid : partitionUsed_) {
    auto newSize = std::max(preAllocBufferSize, partition2RowCount_[pid]);
    DLOG_IF(INFO, partitionBufferSize_[pid] != newSize)
        << "Actual partition buffer size - current: " << partitionBufferSize_[pid] << ", newSize: " << newSize
        << std::endl;
    // Make sure the size to be allocated is larger than the size to be filled.
    if (partitionBufferSize_[pid] == 0) {
      // Allocate buffer if it's not yet allocated.
      RETURN_NOT_OK(allocatePartitionBuffer(pid, newSize));
    } else if (beyondThreshold(pid, newSize)) {
      if (newSize <= partitionBufferBase_[pid]) {
        // If the newSize is smaller, cache the buffered data and reuse and shrink the buffer.
        RETURN_NOT_OK(evictPartitionBuffers(pid, true));
        RETURN_NOT_OK(resizePartitionBuffer(pid, newSize, /*preserveData=*/false));
      } else {
        // If the newSize is larger, check if alreadyFilled + toBeFilled <= newSize
        if (partitionBufferBase_[pid] + partition2RowCount_[pid] <= newSize) {
          // If so, keep the data in buffers and resize buffers.
          RETURN_NOT_OK(resizePartitionBuffer(pid, newSize, /*preserveData=*/true));
          // Because inputHasNull_ is updated every time split is called, and resizePartitionBuffer won't allocate
          // validity buffer.
          RETURN_NOT_OK(updateValidityBuffers(pid, newSize));
        } else {
          // Otherwise cache the buffered data.
          // If newSize <= allocated buffer size, reuse and shrink the buffer.
          // Else free and allocate new buffers.
          bool reuseBuffers = newSize <= partitionBufferSize_[pid];
          RETURN_NOT_OK(evictPartitionBuffers(pid, reuseBuffers));
          if (reuseBuffers) {
            RETURN_NOT_OK(resizePartitionBuffer(pid, newSize, /*preserveData=*/false));
          } else {
            RETURN_NOT_OK(allocatePartitionBuffer(pid, newSize));
          }
        }
      }
    } else if (partitionBufferBase_[pid] + partition2RowCount_[pid] > partitionBufferSize_[pid]) {
      // If the size to be filled + already filled > the buffer size, need to free current buffers and allocate new
      // buffer.
      if (newSize > partitionBufferSize_[pid]) {
        // If the partition size after split is already larger than allocated buffer size, need reallocate.
        RETURN_NOT_OK(evictPartitionBuffers(pid, false));
        RETURN_NOT_OK(allocatePartitionBuffer(pid, newSize));
      } else {
        // Partition size after split is smaller than buffer size. Reuse the buffers.
        RETURN_NOT_OK(evictPartitionBuffers(pid, true));
        // Reset validity buffer for reallocate.
        RETURN_NOT_OK(resetValidityBuffer(pid));
      }
    }
  }
  return arrow::Status::OK();
}

bool VeloxHashShuffleWriter::isExtremelyLargeBatch(facebook::velox::RowVectorPtr& rv) const {
  return (rv->size() > maxBatchSize_ && maxBatchSize_ > 0);
}

} // namespace gluten