cpp/core/shuffle/ShuffleWriter.h (73 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <arrow/ipc/writer.h>
#include <numeric>
#include <utility>
#include "memory/ArrowMemoryPool.h"
#include "memory/ColumnarBatch.h"
#include "memory/Reclaimable.h"
#include "shuffle/Options.h"
#include "shuffle/PartitionWriter.h"
#include "shuffle/Partitioner.h"
#include "shuffle/Partitioning.h"
#include "shuffle/ShuffleMemoryPool.h"
#include "utils/Compression.h"
namespace gluten {
class ShuffleWriter : public Reclaimable {
public:
static constexpr int64_t kMinMemLimit = 128LL * 1024 * 1024;
virtual arrow::Status split(std::shared_ptr<ColumnarBatch> cb, int64_t memLimit) = 0;
virtual arrow::Status stop() = 0;
int32_t numPartitions() const {
return numPartitions_;
}
int64_t partitionBufferSize() const {
return partitionBufferPool_->bytes_allocated();
}
int64_t maxPartitionBufferSize() const {
return partitionBufferPool_->max_memory();
}
int64_t totalBytesWritten() const {
return metrics_.totalBytesWritten;
}
int64_t totalBytesEvicted() const {
return metrics_.totalBytesEvicted;
}
int64_t totalWriteTime() const {
return metrics_.totalWriteTime;
}
int64_t totalEvictTime() const {
return metrics_.totalEvictTime;
}
int64_t totalCompressTime() const {
return metrics_.totalCompressTime;
}
const std::vector<int64_t>& partitionLengths() const {
return metrics_.partitionLengths;
}
const std::vector<int64_t>& rawPartitionLengths() const {
return metrics_.rawPartitionLengths;
}
virtual const uint64_t cachedPayloadSize() const = 0;
protected:
ShuffleWriter(
int32_t numPartitions,
std::unique_ptr<PartitionWriter> partitionWriter,
ShuffleWriterOptions options,
arrow::MemoryPool* pool)
: numPartitions_(numPartitions),
options_(std::move(options)),
pool_(pool),
partitionBufferPool_(std::make_unique<ShuffleMemoryPool>(pool)),
partitionWriter_(std::move(partitionWriter)) {}
virtual ~ShuffleWriter() = default;
int32_t numPartitions_;
ShuffleWriterOptions options_;
arrow::MemoryPool* pool_;
// Memory Pool used to track memory usage of partition buffers.
// The actual allocation is delegated to options_.memoryPool.
std::unique_ptr<ShuffleMemoryPool> partitionBufferPool_;
std::unique_ptr<PartitionWriter> partitionWriter_;
std::shared_ptr<arrow::Schema> schema_;
// Column index, partition id, buffers.
std::vector<std::vector<std::vector<std::shared_ptr<arrow::ResizableBuffer>>>> partitionBuffers_;
std::shared_ptr<Partitioner> partitioner_;
ShuffleWriterMetrics metrics_{};
};
} // namespace gluten