cpp/velox/shuffle/VeloxShuffleWriter.h (167 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <algorithm> #include <memory> #include <string> #include <vector> #include "velox/common/time/CpuWallTimer.h" #include "velox/serializers/PrestoSerializer.h" #include "velox/type/Type.h" #include "velox/vector/ComplexVector.h" #include "velox/vector/FlatVector.h" #include "velox/vector/VectorStream.h" #include <arrow/array/util.h> #include <arrow/ipc/writer.h> #include <arrow/memory_pool.h> #include <arrow/record_batch.h> #include <arrow/result.h> #include <arrow/type.h> #include "memory/VeloxMemoryManager.h" #include "shuffle/Options.h" #include "shuffle/PartitionWriter.h" #include "shuffle/Partitioner.h" #include "shuffle/ShuffleWriter.h" #include "shuffle/Utils.h" #include "utils/Print.h" namespace gluten { class VeloxShuffleWriter : public ShuffleWriter { public: static arrow::Result<std::shared_ptr<VeloxShuffleWriter>> create( ShuffleWriterType type, uint32_t numPartitions, std::unique_ptr<PartitionWriter> partitionWriter, ShuffleWriterOptions options, std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool, arrow::MemoryPool* arrowPool); facebook::velox::RowVectorPtr getStrippedRowVector(const facebook::velox::RowVector& rv) { // get new row type auto& rowType = rv.type()->asRow(); auto typeChildren = rowType.children(); typeChildren.erase(typeChildren.begin()); auto newRowType = facebook::velox::ROW(std::move(typeChildren)); // get length auto length = rv.size(); // get children auto children = rv.children(); children.erase(children.begin()); return std::make_shared<facebook::velox::RowVector>( rv.pool(), newRowType, facebook::velox::BufferPtr(nullptr), length, std::move(children)); } const int32_t* getFirstColumn(const facebook::velox::RowVector& rv) { VELOX_CHECK(rv.childrenSize() > 0, "RowVector missing partition id column."); auto& firstChild = rv.childAt(0); VELOX_CHECK(firstChild->isFlatEncoding(), "Partition id (field 0) is not flat encoding."); VELOX_CHECK( firstChild->type()->isInteger(), "Partition id (field 0) should be integer, but got {}", firstChild->type()->toString()); // first column is partition key hash value or pid return firstChild->asFlatVector<int32_t>()->rawValues(); } // For test only. virtual void setPartitionBufferSize(uint32_t newSize) {} virtual arrow::Status evictPartitionBuffers(uint32_t partitionId, bool reuseBuffers) { return arrow::Status::OK(); } virtual arrow::Status evictRowVector(uint32_t partitionId) { return arrow::Status::OK(); } virtual const uint64_t cachedPayloadSize() const { return 0; } int32_t maxBatchSize() const { return maxBatchSize_; } int64_t partitionBufferSize() const { return partitionBufferPool_->bytes_allocated(); } int64_t peakBytesAllocated() const override { return partitionBufferPool_->max_memory() + veloxPool_->peakBytes(); } protected: VeloxShuffleWriter( uint32_t numPartitions, std::unique_ptr<PartitionWriter> partitionWriter, ShuffleWriterOptions options, std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool, arrow::MemoryPool* pool) : ShuffleWriter(numPartitions, std::move(options), pool), partitionBufferPool_(std::make_unique<ShuffleMemoryPool>(pool)), veloxPool_(std::move(veloxPool)), partitionWriter_(std::move(partitionWriter)) { partitioner_ = Partitioner::make(options_.partitioning, numPartitions_, options_.startPartitionId); arenas_.resize(numPartitions); serdeOptions_.useLosslessTimestamp = true; } virtual ~VeloxShuffleWriter() = default; // Memory Pool used to track memory usage of partition buffers. // The actual allocation is delegated to options_.memoryPool. std::unique_ptr<ShuffleMemoryPool> partitionBufferPool_; std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_; // PartitionWriter must destruct before partitionBufferPool_, as it may hold buffers allocated by // partitionBufferPool_. std::unique_ptr<PartitionWriter> partitionWriter_; std::shared_ptr<Partitioner> partitioner_; std::vector<std::unique_ptr<facebook::velox::StreamArena>> arenas_; facebook::velox::serializer::presto::PrestoVectorSerde::PrestoOptions serdeOptions_; int32_t maxBatchSize_{0}; enum EvictState { kEvictable, kUnevictable }; // stat enum CpuWallTimingType { CpuWallTimingBegin = 0, CpuWallTimingCompute = CpuWallTimingBegin, CpuWallTimingBuildPartition, CpuWallTimingEvictPartition, CpuWallTimingHasNull, CpuWallTimingCalculateBufferSize, CpuWallTimingAllocateBuffer, CpuWallTimingCreateRbFromBuffer, CpuWallTimingMakeRB, CpuWallTimingCacheRB, CpuWallTimingFlattenRV, CpuWallTimingSplitRV, CpuWallTimingIteratePartitions, CpuWallTimingStop, CpuWallTimingEnd, CpuWallTimingNum = CpuWallTimingEnd - CpuWallTimingBegin }; static std::string CpuWallTimingName(CpuWallTimingType type) { switch (type) { case CpuWallTimingCompute: return "CpuWallTimingCompute"; case CpuWallTimingBuildPartition: return "CpuWallTimingBuildPartition"; case CpuWallTimingEvictPartition: return "CpuWallTimingEvictPartition"; case CpuWallTimingHasNull: return "CpuWallTimingHasNull"; case CpuWallTimingCalculateBufferSize: return "CpuWallTimingCalculateBufferSize"; case CpuWallTimingAllocateBuffer: return "CpuWallTimingAllocateBuffer"; case CpuWallTimingCreateRbFromBuffer: return "CpuWallTimingCreateRbFromBuffer"; case CpuWallTimingMakeRB: return "CpuWallTimingMakeRB"; case CpuWallTimingCacheRB: return "CpuWallTimingCacheRB"; case CpuWallTimingFlattenRV: return "CpuWallTimingFlattenRV"; case CpuWallTimingSplitRV: return "CpuWallTimingSplitRV"; case CpuWallTimingIteratePartitions: return "CpuWallTimingIteratePartitions"; case CpuWallTimingStop: return "CpuWallTimingStop"; default: return "CpuWallTimingUnknown"; } } facebook::velox::CpuWallTiming cpuWallTimingList_[CpuWallTimingNum]; EvictState evictState_{kEvictable}; class EvictGuard { public: explicit EvictGuard(EvictState& evictState) : evictState_(evictState) { evictState_ = EvictState::kUnevictable; } ~EvictGuard() { evictState_ = EvictState::kEvictable; } // For safety and clarity. EvictGuard(const EvictGuard&) = delete; EvictGuard& operator=(const EvictGuard&) = delete; EvictGuard(EvictGuard&&) = delete; EvictGuard& operator=(EvictGuard&&) = delete; private: EvictState& evictState_; }; }; } // namespace gluten