cpp/core/shuffle/LocalPartitionWriter.cc (481 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <filesystem>
#include <random>
#include <thread>
#include <boost/stacktrace.hpp>
#include <glog/logging.h>
#include "shuffle/LocalPartitionWriter.h"
#include "shuffle/Payload.h"
#include "shuffle/Spill.h"
#include "shuffle/Utils.h"
#include "utils/Timer.h"
namespace gluten {
class LocalPartitionWriter::LocalSpiller {
public:
LocalSpiller(
uint32_t numPartitions,
const std::string& spillFile,
uint32_t compressionThreshold,
arrow::MemoryPool* pool,
arrow::util::Codec* codec)
: numPartitions_(numPartitions),
spillFile_(spillFile),
compressionThreshold_(compressionThreshold),
pool_(pool),
codec_(codec) {}
arrow::Status spill(uint32_t partitionId, std::unique_ptr<BlockPayload> payload) {
ScopedTimer timer(&spillTime_);
// Check spill Type.
if (payload->type() != Payload::kUncompressed) {
return arrow::Status::Invalid(
"Cannot spill payload of type: " + payload->toString() + ", must be Payload::kUncompressed.");
}
if (!opened_) {
opened_ = true;
ARROW_ASSIGN_OR_RAISE(os_, arrow::io::FileOutputStream::Open(spillFile_, true));
diskSpill_ = std::make_unique<Spill>(Spill::SpillType::kSequentialSpill, numPartitions_, spillFile_);
}
ARROW_ASSIGN_OR_RAISE(auto start, os_->Tell());
RETURN_NOT_OK(payload->serialize(os_.get()));
// Because payload is uncompressed, no compress time.
spillTime_ += payload->getWriteTime();
ARROW_ASSIGN_OR_RAISE(auto end, os_->Tell());
DLOG(INFO) << "LocalSpiller: Spilled partition " << partitionId << " file start: " << start << ", file end: " << end
<< ", file: " << spillFile_;
auto payloadType = codec_ != nullptr && payload->numRows() >= compressionThreshold_ ? Payload::kToBeCompressed
: Payload::kUncompressed;
diskSpill_->insertPayload(
partitionId, payloadType, payload->numRows(), payload->isValidityBuffer(), end - start, pool_, codec_);
return arrow::Status::OK();
}
arrow::Result<std::shared_ptr<Spill>> finish() {
if (finished_) {
return arrow::Status::Invalid("Calling toBlockPayload() on a finished SpillEvictor.");
}
finished_ = true;
if (!opened_) {
return arrow::Status::Invalid("SpillEvictor has no data spilled.");
}
RETURN_NOT_OK(os_->Close());
return std::move(diskSpill_);
}
bool finished() const {
return finished_;
}
int64_t getSpillTime() {
return spillTime_;
}
private:
uint32_t numPartitions_;
std::string spillFile_;
uint32_t compressionThreshold_;
arrow::MemoryPool* pool_;
arrow::util::Codec* codec_;
bool opened_{false};
bool finished_{false};
std::shared_ptr<Spill> diskSpill_{nullptr};
std::shared_ptr<arrow::io::FileOutputStream> os_;
int64_t spillTime_{0};
};
class LocalPartitionWriter::PayloadMerger {
public:
PayloadMerger(
const PartitionWriterOptions& options,
arrow::MemoryPool* pool,
arrow::util::Codec* codec,
bool hasComplexType)
: pool_(pool),
codec_(codec),
hasComplexType_(hasComplexType),
compressionThreshold_(options.compressionThreshold),
mergeBufferSize_(options.mergeBufferSize),
mergeBufferMinSize_(options.mergeBufferSize * options.mergeThreshold) {}
arrow::Result<std::vector<std::unique_ptr<BlockPayload>>>
merge(uint32_t partitionId, std::unique_ptr<InMemoryPayload> append, bool reuseBuffers) {
std::vector<std::unique_ptr<BlockPayload>> merged{};
if (hasComplexType_) {
// TODO: Merging complex type is currently not supported.
merged.emplace_back();
ARROW_ASSIGN_OR_RAISE(merged.back(), createBlockPayload(std::move(append), reuseBuffers));
return merged;
}
MergeGuard mergeGuard(partitionInMerge_, partitionId);
auto cacheOrFinish = [&]() {
if (append->numRows() <= mergeBufferMinSize_) {
// Save for merge.
if (reuseBuffers) {
// This is the first append, therefore need copy.
RETURN_NOT_OK(append->copyBuffers(pool_));
}
partitionMergePayload_[partitionId] = std::move(append);
return arrow::Status::OK();
}
merged.emplace_back();
// If current buffer rows reaches merging threshold, create BlockPayload.
ARROW_ASSIGN_OR_RAISE(merged.back(), createBlockPayload(std::move(append), reuseBuffers));
return arrow::Status::OK();
};
if (!hasMerged(partitionId)) {
RETURN_NOT_OK(cacheOrFinish());
return merged;
}
auto lastPayload = std::move(partitionMergePayload_[partitionId]);
auto mergedRows = append->numRows() + lastPayload->numRows();
if (mergedRows > mergeBufferSize_ || append->numRows() > mergeBufferMinSize_) {
merged.emplace_back();
ARROW_ASSIGN_OR_RAISE(
merged.back(),
lastPayload->toBlockPayload(
codec_ != nullptr && lastPayload->numRows() >= compressionThreshold_ ? Payload::kCompressed
: Payload::kUncompressed,
pool_,
codec_));
RETURN_NOT_OK(cacheOrFinish());
return merged;
}
// Merge buffers.
DLOG(INFO) << "Merged partition: " << partitionId << ", numRows before: " << lastPayload->numRows()
<< ", numRows appended: " << append->numRows() << ", numRows after: " << mergedRows;
ARROW_ASSIGN_OR_RAISE(auto payload, InMemoryPayload::merge(std::move(lastPayload), std::move(append), pool_));
if (mergedRows < mergeBufferSize_) {
// Still not reach merging threshold, save for next merge.
partitionMergePayload_[partitionId] = std::move(payload);
return merged;
}
// mergedRows == mergeBufferSize_
merged.emplace_back();
ARROW_ASSIGN_OR_RAISE(
merged.back(),
payload->toBlockPayload(
codec_ != nullptr && payload->numRows() >= compressionThreshold_ ? Payload::kCompressed
: Payload::kUncompressed,
pool_,
codec_));
return merged;
}
arrow::Result<std::optional<std::unique_ptr<BlockPayload>>> finishForSpill(uint32_t partitionId) {
// We need to check whether the spill source is from compressing/copying the merged buffers.
if ((partitionInMerge_.has_value() && *partitionInMerge_ == partitionId) || !hasMerged(partitionId)) {
return std::nullopt;
}
auto payload = std::move(partitionMergePayload_[partitionId]);
return payload->toBlockPayload(Payload::kUncompressed, pool_, codec_);
}
arrow::Result<std::optional<std::unique_ptr<BlockPayload>>> finish(uint32_t partitionId) {
if (!hasMerged(partitionId)) {
return std::nullopt;
}
auto numRows = partitionMergePayload_[partitionId]->numRows();
// Because this is the last BlockPayload, delay the compression before writing to the final data file.
auto payloadType =
(codec_ != nullptr && numRows >= compressionThreshold_) ? Payload::kToBeCompressed : Payload::kUncompressed;
auto payload = std::move(partitionMergePayload_[partitionId]);
return payload->toBlockPayload(payloadType, pool_, codec_);
}
bool hasMerged(uint32_t partitionId) {
return partitionMergePayload_.find(partitionId) != partitionMergePayload_.end() &&
partitionMergePayload_[partitionId] != nullptr;
}
private:
arrow::MemoryPool* pool_;
arrow::util::Codec* codec_;
bool hasComplexType_;
int32_t compressionThreshold_;
int32_t mergeBufferSize_;
int32_t mergeBufferMinSize_;
std::unordered_map<uint32_t, std::unique_ptr<InMemoryPayload>> partitionMergePayload_;
std::optional<uint32_t> partitionInMerge_;
class MergeGuard {
public:
MergeGuard(std::optional<uint32_t>& partitionInMerge, uint32_t partitionId) : partitionInMerge_(partitionInMerge) {
partitionInMerge_ = partitionId;
}
~MergeGuard() {
partitionInMerge_ = std::nullopt;
}
private:
std::optional<uint32_t>& partitionInMerge_;
};
arrow::Status copyBuffers(std::vector<std::shared_ptr<arrow::Buffer>>& buffers) {
// Copy.
std::vector<std::shared_ptr<arrow::Buffer>> copies;
for (auto& buffer : buffers) {
if (!buffer) {
continue;
}
if (buffer->size() == 0) {
buffer = zeroLengthNullBuffer();
continue;
}
ARROW_ASSIGN_OR_RAISE(auto copy, arrow::AllocateResizableBuffer(buffer->size(), pool_));
memcpy(copy->mutable_data(), buffer->data(), buffer->size());
buffer = std::move(copy);
}
return arrow::Status::OK();
}
arrow::Result<std::unique_ptr<BlockPayload>> createBlockPayload(
std::unique_ptr<InMemoryPayload> inMemoryPayload,
bool reuseBuffers) {
auto createCompressed = codec_ != nullptr && inMemoryPayload->numRows() >= compressionThreshold_;
if (reuseBuffers && !createCompressed) {
// For uncompressed buffers, need to copy before caching.
RETURN_NOT_OK(inMemoryPayload->copyBuffers(pool_));
}
ARROW_ASSIGN_OR_RAISE(
auto payload,
inMemoryPayload->toBlockPayload(
createCompressed ? Payload::kCompressed : Payload::kUncompressed, pool_, codec_));
return payload;
}
};
class LocalPartitionWriter::PayloadCache {
public:
PayloadCache(uint32_t numPartitions) : numPartitions_(numPartitions) {}
arrow::Status cache(uint32_t partitionId, std::unique_ptr<BlockPayload> payload) {
if (partitionCachedPayload_.find(partitionId) == partitionCachedPayload_.end()) {
partitionCachedPayload_[partitionId] = std::list<std::unique_ptr<BlockPayload>>{};
}
partitionCachedPayload_[partitionId].push_back(std::move(payload));
return arrow::Status::OK();
}
bool hasCachedPayloads(uint32_t partitionId) {
return partitionCachedPayload_.find(partitionId) != partitionCachedPayload_.end() &&
!partitionCachedPayload_[partitionId].empty();
}
arrow::Status write(uint32_t partitionId, arrow::io::OutputStream* os) {
if (hasCachedPayloads(partitionId)) {
auto& payloads = partitionCachedPayload_[partitionId];
while (!payloads.empty()) {
auto payload = std::move(payloads.front());
payloads.pop_front();
// Write the cached payload to disk.
RETURN_NOT_OK(payload->serialize(os));
compressTime_ += payload->getCompressTime();
writeTime_ += payload->getWriteTime();
}
}
return arrow::Status::OK();
}
bool canSpill() {
for (auto pid = 0; pid < numPartitions_; ++pid) {
if (hasCachedPayloads(pid)) {
return true;
}
}
return false;
}
arrow::Result<std::shared_ptr<Spill>>
spill(const std::string& spillFile, arrow::MemoryPool* pool, arrow::util::Codec* codec) {
ScopedTimer timer(&spillTime_);
std::shared_ptr<Spill> diskSpill = nullptr;
ARROW_ASSIGN_OR_RAISE(auto os, arrow::io::FileOutputStream::Open(spillFile, true));
ARROW_ASSIGN_OR_RAISE(auto start, os->Tell());
for (uint32_t pid = 0; pid < numPartitions_; ++pid) {
if (hasCachedPayloads(pid)) {
auto& payloads = partitionCachedPayload_[pid];
while (!payloads.empty()) {
auto payload = std::move(payloads.front());
payloads.pop_front();
// Spill the cached payload to disk.
RETURN_NOT_OK(payload->serialize(os.get()));
compressTime_ += payload->getCompressTime();
spillTime_ += payload->getWriteTime();
if (UNLIKELY(!diskSpill)) {
diskSpill = std::make_unique<Spill>(Spill::SpillType::kBatchedSpill, numPartitions_, spillFile);
}
ARROW_ASSIGN_OR_RAISE(auto end, os->Tell());
DLOG(INFO) << "PayloadCache: Spilled partition " << pid << " file start: " << start << ", file end: " << end
<< ", file: " << spillFile;
diskSpill->insertPayload(
pid, payload->type(), payload->numRows(), payload->isValidityBuffer(), end - start, pool, codec);
start = end;
}
}
}
RETURN_NOT_OK(os->Close());
return diskSpill;
}
int64_t getCompressTime() {
return compressTime_;
}
int64_t getSpillTime() {
return spillTime_;
}
int64_t getWriteTime() {
return writeTime_;
}
private:
uint32_t numPartitions_;
int64_t compressTime_{0};
int64_t spillTime_{0};
int64_t writeTime_{0};
std::unordered_map<uint32_t, std::list<std::unique_ptr<BlockPayload>>> partitionCachedPayload_;
};
LocalPartitionWriter::LocalPartitionWriter(
uint32_t numPartitions,
PartitionWriterOptions options,
arrow::MemoryPool* pool,
const std::string& dataFile,
const std::vector<std::string>& localDirs)
: PartitionWriter(numPartitions, std::move(options), pool), dataFile_(dataFile), localDirs_(localDirs) {
init();
}
std::string LocalPartitionWriter::nextSpilledFileDir() {
auto spilledFileDir = getSpilledShuffleFileDir(localDirs_[dirSelection_], subDirSelection_[dirSelection_]);
subDirSelection_[dirSelection_] = (subDirSelection_[dirSelection_] + 1) % options_.numSubDirs;
dirSelection_ = (dirSelection_ + 1) % localDirs_.size();
return spilledFileDir;
}
arrow::Status LocalPartitionWriter::openDataFile() {
// open data file output stream
std::shared_ptr<arrow::io::FileOutputStream> fout;
ARROW_ASSIGN_OR_RAISE(fout, arrow::io::FileOutputStream::Open(dataFile_));
if (options_.bufferedWrite) {
// Output stream buffer is neither partition buffer memory nor ipc memory.
ARROW_ASSIGN_OR_RAISE(dataFileOs_, arrow::io::BufferedOutputStream::Create(16384, pool_, fout));
} else {
dataFileOs_ = fout;
}
return arrow::Status::OK();
}
arrow::Status LocalPartitionWriter::clearResource() {
RETURN_NOT_OK(dataFileOs_->Close());
// When bufferedWrite = true, dataFileOs_->Close doesn't release underlying buffer.
dataFileOs_.reset();
return arrow::Status::OK();
}
void LocalPartitionWriter::init() {
partitionLengths_.resize(numPartitions_, 0);
rawPartitionLengths_.resize(numPartitions_, 0);
// Shuffle the configured local directories. This prevents each task from using the same directory for spilled
// files.
std::random_device rd;
std::default_random_engine engine(rd());
std::shuffle(localDirs_.begin(), localDirs_.end(), engine);
subDirSelection_.assign(localDirs_.size(), 0);
}
arrow::Status LocalPartitionWriter::mergeSpills(uint32_t partitionId) {
auto spillId = 0;
auto spillIter = spills_.begin();
while (spillIter != spills_.end()) {
ARROW_ASSIGN_OR_RAISE(auto st, dataFileOs_->Tell());
// Read if partition exists in the spilled file and write to the final file.
while (auto payload = (*spillIter)->nextPayload(partitionId)) {
// May trigger spill during compression.
RETURN_NOT_OK(payload->serialize(dataFileOs_.get()));
compressTime_ += payload->getCompressTime();
writeTime_ += payload->getWriteTime();
}
++spillIter;
ARROW_ASSIGN_OR_RAISE(auto ed, dataFileOs_->Tell());
DLOG(INFO) << "Partition " << partitionId << " spilled from spillResult " << spillId++ << " of bytes " << ed - st;
}
return arrow::Status::OK();
}
arrow::Status LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
if (stopped_) {
return arrow::Status::OK();
}
stopped_ = true;
RETURN_NOT_OK(finishSpill());
// Open final data file.
// If options_.bufferedWrite is set, it will acquire 16KB memory that can trigger spill.
RETURN_NOT_OK(openDataFile());
int64_t endInFinalFile = 0;
DLOG(INFO) << "LocalPartitionWriter stopped. Total spills: " << spills_.size();
// Iterator over pid.
for (auto pid = 0; pid < numPartitions_; ++pid) {
// Record start offset.
auto startInFinalFile = endInFinalFile;
// Iterator over all spilled files.
// Reading and compressing toBeCompressed payload can trigger spill.
RETURN_NOT_OK(mergeSpills(pid));
if (payloadCache_ && payloadCache_->hasCachedPayloads(pid)) {
RETURN_NOT_OK(payloadCache_->write(pid, dataFileOs_.get()));
}
if (merger_) {
ARROW_ASSIGN_OR_RAISE(auto merged, merger_->finish(pid));
if (merged) {
// Compressing merged payload can trigger spill.
RETURN_NOT_OK((*merged)->serialize(dataFileOs_.get()));
compressTime_ += (*merged)->getCompressTime();
writeTime_ += (*merged)->getWriteTime();
}
}
ARROW_ASSIGN_OR_RAISE(endInFinalFile, dataFileOs_->Tell());
partitionLengths_[pid] = endInFinalFile - startInFinalFile;
}
for (const auto& spill : spills_) {
for (auto pid = 0; pid < numPartitions_; ++pid) {
if (spill->hasNextPayload(pid)) {
return arrow::Status::Invalid("Merging from spill is not exhausted.");
}
}
}
ARROW_ASSIGN_OR_RAISE(totalBytesWritten_, dataFileOs_->Tell());
// Close Final file. Clear buffered resources.
RETURN_NOT_OK(clearResource());
// Populate shuffle writer metrics.
RETURN_NOT_OK(populateMetrics(metrics));
return arrow::Status::OK();
}
arrow::Status LocalPartitionWriter::requestSpill() {
if (!spiller_ || spiller_->finished()) {
ARROW_ASSIGN_OR_RAISE(auto spillFile, createTempShuffleFile(nextSpilledFileDir()));
spiller_ = std::make_unique<LocalSpiller>(
numPartitions_, spillFile, options_.compressionThreshold, payloadPool_.get(), codec_.get());
}
return arrow::Status::OK();
}
arrow::Status LocalPartitionWriter::finishSpill() {
// Finish the spiller. No compression, no spill.
if (spiller_ && !spiller_->finished()) {
auto spiller = std::move(spiller_);
spills_.emplace_back();
ARROW_ASSIGN_OR_RAISE(spills_.back(), spiller->finish());
spillTime_ += spiller->getSpillTime();
}
return arrow::Status::OK();
}
arrow::Status LocalPartitionWriter::evict(
uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload,
Evict::type evictType,
bool reuseBuffers,
bool hasComplexType) {
rawPartitionLengths_[partitionId] += inMemoryPayload->getBufferSize();
if (evictType == Evict::kSpill) {
RETURN_NOT_OK(requestSpill());
ARROW_ASSIGN_OR_RAISE(
auto payload, inMemoryPayload->toBlockPayload(Payload::kUncompressed, payloadPool_.get(), nullptr));
RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload)));
return arrow::Status::OK();
}
if (!merger_) {
merger_ =
std::make_shared<PayloadMerger>(options_, payloadPool_.get(), codec_ ? codec_.get() : nullptr, hasComplexType);
}
ARROW_ASSIGN_OR_RAISE(auto merged, merger_->merge(partitionId, std::move(inMemoryPayload), reuseBuffers));
if (!merged.empty()) {
if (UNLIKELY(!payloadCache_)) {
payloadCache_ = std::make_shared<PayloadCache>(numPartitions_);
}
for (auto& payload : merged) {
RETURN_NOT_OK(payloadCache_->cache(partitionId, std::move(payload)));
}
merged.clear();
}
return arrow::Status::OK();
}
arrow::Status LocalPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actual) {
// Finish last spiller.
RETURN_NOT_OK(finishSpill());
int64_t reclaimed = 0;
// Reclaim memory from payloadCache.
if (payloadCache_ && payloadCache_->canSpill()) {
auto beforeSpill = payloadPool_->bytes_allocated();
ARROW_ASSIGN_OR_RAISE(auto spillFile, createTempShuffleFile(nextSpilledFileDir()));
spills_.emplace_back();
ARROW_ASSIGN_OR_RAISE(spills_.back(), payloadCache_->spill(spillFile, payloadPool_.get(), codec_.get()));
reclaimed += beforeSpill - payloadPool_->bytes_allocated();
if (reclaimed >= size) {
*actual = reclaimed;
return arrow::Status::OK();
}
}
// Then spill payloads from merger. Create uncompressed payloads.
if (merger_) {
auto beforeSpill = payloadPool_->bytes_allocated();
for (auto pid = 0; pid < numPartitions_; ++pid) {
ARROW_ASSIGN_OR_RAISE(auto merged, merger_->finishForSpill(pid));
if (merged.has_value()) {
RETURN_NOT_OK(requestSpill());
RETURN_NOT_OK(spiller_->spill(pid, std::move(*merged)));
}
}
// This is not accurate. When the evicted partition buffers are not copied, the merged ones
// are resized from the original buffers thus allocated from partitionBufferPool.
reclaimed += beforeSpill - payloadPool_->bytes_allocated();
RETURN_NOT_OK(finishSpill());
}
*actual = reclaimed;
return arrow::Status::OK();
}
arrow::Status LocalPartitionWriter::populateMetrics(ShuffleWriterMetrics* metrics) {
if (payloadCache_) {
spillTime_ += payloadCache_->getSpillTime();
writeTime_ += payloadCache_->getWriteTime();
compressTime_ += payloadCache_->getCompressTime();
}
metrics->totalCompressTime += compressTime_;
metrics->totalEvictTime += spillTime_;
metrics->totalWriteTime += writeTime_;
metrics->totalBytesEvicted += totalBytesEvicted_;
metrics->totalBytesWritten += totalBytesWritten_;
metrics->partitionLengths = std::move(partitionLengths_);
metrics->rawPartitionLengths = std::move(rawPartitionLengths_);
return arrow::Status::OK();
}
} // namespace gluten