// cpp/core/shuffle/LocalPartitionWriter.cc
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "shuffle/LocalPartitionWriter.h"
#include "shuffle/Payload.h"
#include "shuffle/Spill.h"
#include "shuffle/Utils.h"
#include "utils/Timer.h"
#include <fcntl.h>
#include <glog/logging.h>
#include <sys/stat.h>
#include <unistd.h>
#include <algorithm>
#include <cstring>
#include <filesystem>
#include <random>
#include <thread>
namespace gluten {
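// LocalSpiller streams payloads for one spill pass into a single file: either a temporary spill
// file, or the final data file when `isFinal` is true. Raw InMemoryPayloads are expected to
// arrive grouped by partition and are written through an optional ShuffleCompressedOutputStream;
// the byte range of each partition is recorded into a Spill object so the data can be located
// and merged into the final data file later.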
class LocalPartitionWriter::LocalSpiller {
public:
LocalSpiller(
bool isFinal,
std::shared_ptr<arrow::io::OutputStream> os,
std::string spillFile,
int32_t compressionBufferSize,
int32_t compressionThreshold,
arrow::MemoryPool* pool,
arrow::util::Codec* codec)
: isFinal_(isFinal),
os_(os),
spillFile_(std::move(spillFile)),
compressionThreshold_(compressionThreshold),
pool_(pool),
codec_(codec),
diskSpill_(std::make_unique<Spill>()) {
if (codec_ != nullptr) {
GLUTEN_ASSIGN_OR_THROW(
compressedOs_,
ShuffleCompressedOutputStream::Make(codec_, compressionBufferSize, os, arrow::default_memory_pool()));
}
}
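// Seals the byte range of the partition currently being written by flushing the compression
// stream and recording a single kRaw payload covering [writePos_, current position) into
// diskSpill_. Subsequent calls are no-ops until more data is spilled.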
arrow::Status flush() {
if (flushed_) {
return arrow::Status::OK();
}
flushed_ = true;
if (compressedOs_ != nullptr) {
RETURN_NOT_OK(compressedOs_->Flush());
}
ARROW_ASSIGN_OR_RAISE(const auto pos, os_->Tell());
diskSpill_->insertPayload(lastPid_, Payload::kRaw, 0, nullptr, pos - writePos_, pool_, nullptr);
DLOG(INFO) << "LocalSpiller: Spilled partition " << lastPid_ << " file start: " << writePos_
<< ", file end: " << pos << ", file: " << spillFile_;
return arrow::Status::OK();
}
arrow::Status spill(uint32_t partitionId, std::unique_ptr<InMemoryPayload> payload) {
ScopedTimer timer(&spillTime_);
if (lastPid_ != partitionId) {
// Record the write position of the new partition.
ARROW_ASSIGN_OR_RAISE(writePos_, os_->Tell());
lastPid_ = partitionId;
}
flushed_ = false;
auto* raw = compressedOs_ != nullptr ? compressedOs_.get() : os_.get();
RETURN_NOT_OK(payload->serialize(raw));
return arrow::Status::OK();
}
arrow::Status spill(uint32_t partitionId, std::unique_ptr<BlockPayload> payload) {
// Check the spill type.
ARROW_RETURN_IF(
payload->type() == Payload::kToBeCompressed,
arrow::Status::Invalid("Cannot spill payload of type: " + payload->toString()));
ARROW_ASSIGN_OR_RAISE(auto start, os_->Tell());
RETURN_NOT_OK(payload->serialize(os_.get()));
compressTime_ += payload->getCompressTime();
spillTime_ += payload->getWriteTime();
ARROW_ASSIGN_OR_RAISE(auto end, os_->Tell());
DLOG(INFO) << "LocalSpiller: Spilled partition " << partitionId << " file start: " << start << ", file end: " << end
<< ", file: " << spillFile_;
auto payloadType = payload->type();
if (payloadType == Payload::kUncompressed && codec_ != nullptr && payload->numRows() >= compressionThreshold_) {
payloadType = Payload::kToBeCompressed;
}
diskSpill_->insertPayload(
partitionId, payloadType, payload->numRows(), payload->isValidityBuffer(), end - start, pool_, codec_);
return arrow::Status::OK();
}
arrow::Result<std::shared_ptr<Spill>> finish() {
ARROW_RETURN_IF(finished_, arrow::Status::Invalid("Calling finish() on a finished LocalSpiller."));
ARROW_RETURN_IF(os_->closed(), arrow::Status::Invalid("Spill file os has been closed."));
if (lastPid_ != -1) {
if (compressedOs_ != nullptr) {
compressTime_ = compressedOs_->compressTime();
spillTime_ -= compressTime_;
RETURN_NOT_OK(compressedOs_->Close());
}
if (!isFinal_) {
ARROW_ASSIGN_OR_RAISE(auto pos, os_->Tell());
diskSpill_->insertPayload(lastPid_, Payload::kRaw, 0, nullptr, pos - writePos_, pool_, nullptr);
DLOG(INFO) << "LocalSpiller: Spilled partition " << lastPid_ << " file start: " << writePos_
<< ", file end: " << pos << ", file: " << spillFile_;
}
}
if (!isFinal_) {
RETURN_NOT_OK(os_->Close());
}
diskSpill_->setSpillFile(spillFile_);
diskSpill_->setSpillTime(spillTime_);
diskSpill_->setCompressTime(compressTime_);
finished_ = true;
return std::move(diskSpill_);
}
bool finished() const {
return finished_;
}
private:
bool isFinal_;
std::shared_ptr<arrow::io::OutputStream> os_;
std::shared_ptr<ShuffleCompressedOutputStream> compressedOs_{nullptr};
int64_t writePos_{0};
std::string spillFile_;
int32_t compressionThreshold_;
arrow::MemoryPool* pool_;
arrow::util::Codec* codec_;
std::shared_ptr<Spill> diskSpill_{nullptr};
bool flushed_{true};
bool finished_{false};
int64_t spillTime_{0};
int64_t compressTime_{0};
int32_t lastPid_{-1};
};
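// PayloadMerger buffers small InMemoryPayloads per partition and concatenates them until the
// accumulated row count reaches mergeBufferSize_, at which point a (possibly compressed)
// BlockPayload is produced. Payloads carrying more than mergeBufferMinSize_ rows
// (mergeBufferSize_ * mergeThreshold) bypass buffering and are converted directly.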
class LocalPartitionWriter::PayloadMerger {
public:
PayloadMerger(const PartitionWriterOptions& options, arrow::MemoryPool* pool, arrow::util::Codec* codec)
: pool_(pool),
codec_(codec),
compressionThreshold_(options.compressionThreshold),
mergeBufferSize_(options.mergeBufferSize),
mergeBufferMinSize_(options.mergeBufferSize * options.mergeThreshold) {}
arrow::Result<std::vector<std::unique_ptr<BlockPayload>>>
merge(uint32_t partitionId, std::unique_ptr<InMemoryPayload> append, bool reuseBuffers) {
std::vector<std::unique_ptr<BlockPayload>> merged{};
if (!append->mergeable()) {
// TODO: Merging complex types is currently not supported.
merged.emplace_back();
ARROW_ASSIGN_OR_RAISE(merged.back(), createBlockPayload(std::move(append), reuseBuffers));
return merged;
}
MergeGuard mergeGuard(partitionInMerge_, partitionId);
auto cacheOrFinish = [&]() {
if (append->numRows() <= mergeBufferMinSize_) {
// Save for merge.
if (reuseBuffers) {
// This is the first append, so the buffers must be copied.
RETURN_NOT_OK(append->copyBuffers(pool_));
}
partitionMergePayload_[partitionId] = std::move(append);
return arrow::Status::OK();
}
merged.emplace_back();
// The appended rows already exceed the merge threshold. Create a BlockPayload directly.
ARROW_ASSIGN_OR_RAISE(merged.back(), createBlockPayload(std::move(append), reuseBuffers));
return arrow::Status::OK();
};
if (!hasMerged(partitionId)) {
RETURN_NOT_OK(cacheOrFinish());
return merged;
}
auto lastPayload = std::move(partitionMergePayload_[partitionId]);
auto mergedRows = append->numRows() + lastPayload->numRows();
if (mergedRows > mergeBufferSize_ || append->numRows() > mergeBufferMinSize_) {
merged.emplace_back();
ARROW_ASSIGN_OR_RAISE(
merged.back(),
lastPayload->toBlockPayload(
codec_ != nullptr && lastPayload->numRows() >= compressionThreshold_ ? Payload::kCompressed
: Payload::kUncompressed,
pool_,
codec_));
RETURN_NOT_OK(cacheOrFinish());
return merged;
}
// Merge buffers.
DLOG(INFO) << "Merged partition: " << partitionId << ", numRows before: " << lastPayload->numRows()
<< ", numRows appended: " << append->numRows() << ", numRows after: " << mergedRows;
ARROW_ASSIGN_OR_RAISE(auto payload, InMemoryPayload::merge(std::move(lastPayload), std::move(append), pool_));
if (mergedRows < mergeBufferSize_) {
// Haven't reached the merge buffer size yet; save for the next merge.
partitionMergePayload_[partitionId] = std::move(payload);
return merged;
}
// mergedRows == mergeBufferSize_
merged.emplace_back();
ARROW_ASSIGN_OR_RAISE(
merged.back(),
payload->toBlockPayload(
codec_ != nullptr && payload->numRows() >= compressionThreshold_ ? Payload::kCompressed
: Payload::kUncompressed,
pool_,
codec_));
return merged;
}
arrow::Result<std::optional<std::unique_ptr<BlockPayload>>> finishForSpill(
uint32_t partitionId,
int64_t& totalBytesToEvict) {
// Skip the partition currently being merged: the spill may have been triggered by compressing/copying its merged buffers.
if ((partitionInMerge_.has_value() && *partitionInMerge_ == partitionId) || !hasMerged(partitionId)) {
return std::nullopt;
}
auto payload = std::move(partitionMergePayload_[partitionId]);
totalBytesToEvict += payload->rawSize();
return payload->toBlockPayload(Payload::kUncompressed, pool_, codec_);
}
arrow::Result<std::optional<std::unique_ptr<BlockPayload>>> finish(uint32_t partitionId) {
if (!hasMerged(partitionId)) {
return std::nullopt;
}
auto numRows = partitionMergePayload_[partitionId]->numRows();
// This is the last BlockPayload of the partition, so compression is deferred until it is written to the final data file.
auto payloadType =
(codec_ != nullptr && numRows >= compressionThreshold_) ? Payload::kToBeCompressed : Payload::kUncompressed;
auto payload = std::move(partitionMergePayload_[partitionId]);
return payload->toBlockPayload(payloadType, pool_, codec_);
}
bool hasMerged(uint32_t partitionId) {
return partitionMergePayload_.find(partitionId) != partitionMergePayload_.end() &&
partitionMergePayload_[partitionId] != nullptr;
}
private:
arrow::MemoryPool* pool_;
arrow::util::Codec* codec_;
int32_t compressionThreshold_;
int32_t mergeBufferSize_;
int32_t mergeBufferMinSize_;
std::unordered_map<uint32_t, std::unique_ptr<InMemoryPayload>> partitionMergePayload_;
std::optional<uint32_t> partitionInMerge_;
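// RAII guard marking the partition currently being merged, so that a re-entrant spill
// (triggered by an allocation inside merge()) skips this partition in finishForSpill().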
class MergeGuard {
public:
MergeGuard(std::optional<uint32_t>& partitionInMerge, uint32_t partitionId) : partitionInMerge_(partitionInMerge) {
partitionInMerge_ = partitionId;
}
~MergeGuard() {
partitionInMerge_ = std::nullopt;
}
private:
std::optional<uint32_t>& partitionInMerge_;
};
arrow::Status copyBuffers(std::vector<std::shared_ptr<arrow::Buffer>>& buffers) {
// Copy each buffer into memory owned by pool_, replacing the originals in place.
for (auto& buffer : buffers) {
if (!buffer) {
continue;
}
if (buffer->size() == 0) {
buffer = zeroLengthNullBuffer();
continue;
}
ARROW_ASSIGN_OR_RAISE(auto copy, arrow::AllocateResizableBuffer(buffer->size(), pool_));
memcpy(copy->mutable_data(), buffer->data(), buffer->size());
buffer = std::move(copy);
}
return arrow::Status::OK();
}
arrow::Result<std::unique_ptr<BlockPayload>> createBlockPayload(
std::unique_ptr<InMemoryPayload> inMemoryPayload,
bool reuseBuffers) {
auto createCompressed = codec_ != nullptr && inMemoryPayload->numRows() >= compressionThreshold_;
if (reuseBuffers && !createCompressed) {
// Uncompressed buffers are cached as-is, so they must be copied before caching.
RETURN_NOT_OK(inMemoryPayload->copyBuffers(pool_));
}
ARROW_ASSIGN_OR_RAISE(
auto payload,
inMemoryPayload->toBlockPayload(
createCompressed ? Payload::kCompressed : Payload::kUncompressed, pool_, codec_));
return payload;
}
};
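// PayloadCache holds finished BlockPayloads per partition until they are written to the final
// data file in stop(). Under memory pressure, the entire cache can be spilled to a temporary
// file via spillAndClose().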
class LocalPartitionWriter::PayloadCache {
public:
PayloadCache(uint32_t numPartitions) : numPartitions_(numPartitions) {}
arrow::Status cache(uint32_t partitionId, std::unique_ptr<BlockPayload> payload) {
if (partitionCachedPayload_.find(partitionId) == partitionCachedPayload_.end()) {
partitionCachedPayload_[partitionId] = std::list<std::unique_ptr<BlockPayload>>{};
}
partitionCachedPayload_[partitionId].push_back(std::move(payload));
return arrow::Status::OK();
}
bool hasCachedPayloads(uint32_t partitionId) {
return partitionCachedPayload_.find(partitionId) != partitionCachedPayload_.end() &&
!partitionCachedPayload_[partitionId].empty();
}
arrow::Status write(uint32_t partitionId, arrow::io::OutputStream* os) {
if (hasCachedPayloads(partitionId)) {
auto& payloads = partitionCachedPayload_[partitionId];
while (!payloads.empty()) {
auto payload = std::move(payloads.front());
payloads.pop_front();
// Write the cached payload to disk.
RETURN_NOT_OK(payload->serialize(os));
compressTime_ += payload->getCompressTime();
writeTime_ += payload->getWriteTime();
}
}
return arrow::Status::OK();
}
bool canSpill() {
for (uint32_t pid = 0; pid < numPartitions_; ++pid) {
if (hasCachedPayloads(pid)) {
return true;
}
}
return false;
}
arrow::Result<std::shared_ptr<Spill>> spillAndClose(
std::shared_ptr<arrow::io::OutputStream> os,
const std::string& spillFile,
arrow::MemoryPool* pool,
arrow::util::Codec* codec,
int64_t& totalBytesToEvict) {
std::shared_ptr<Spill> diskSpill = nullptr;
ARROW_ASSIGN_OR_RAISE(auto start, os->Tell());
for (uint32_t pid = 0; pid < numPartitions_; ++pid) {
if (hasCachedPayloads(pid)) {
auto& payloads = partitionCachedPayload_[pid];
while (!payloads.empty()) {
auto payload = std::move(payloads.front());
payloads.pop_front();
totalBytesToEvict += payload->rawSize();
// Spill the cached payload to disk.
RETURN_NOT_OK(payload->serialize(os.get()));
compressTime_ += payload->getCompressTime();
spillTime_ += payload->getWriteTime();
if (UNLIKELY(!diskSpill)) {
diskSpill = std::make_unique<Spill>();
}
ARROW_ASSIGN_OR_RAISE(auto end, os->Tell());
DLOG(INFO) << "PayloadCache: Spilled partition " << pid << " file start: " << start << ", file end: " << end
<< ", file: " << spillFile;
diskSpill->insertPayload(
pid, payload->type(), payload->numRows(), payload->isValidityBuffer(), end - start, pool, codec);
start = end;
}
}
}
RETURN_NOT_OK(os->Close());
ARROW_RETURN_IF(diskSpill == nullptr, arrow::Status::Invalid("Spilling an empty PayloadCache."));
diskSpill->setSpillFile(spillFile);
return diskSpill;
}
int64_t getCompressTime() const {
return compressTime_;
}
int64_t getSpillTime() const {
return spillTime_;
}
int64_t getWriteTime() const {
return writeTime_;
}
private:
uint32_t numPartitions_;
int64_t compressTime_{0};
int64_t spillTime_{0};
int64_t writeTime_{0};
std::unordered_map<uint32_t, std::list<std::unique_ptr<BlockPayload>>> partitionCachedPayload_;
};
LocalPartitionWriter::LocalPartitionWriter(
uint32_t numPartitions,
PartitionWriterOptions options,
arrow::MemoryPool* pool,
const std::string& dataFile,
const std::vector<std::string>& localDirs)
: PartitionWriter(numPartitions, std::move(options), pool), dataFile_(dataFile), localDirs_(localDirs) {
init();
}
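// Select the directory for the next spill file, round-robin across the configured local
// directories and their sub-directories. E.g., with 2 local dirs and numSubDirs = 2, successive
// calls yield dir0/sub0, dir1/sub0, dir0/sub1, dir1/sub1, dir0/sub0, ...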
std::string LocalPartitionWriter::nextSpilledFileDir() {
auto spilledFileDir = getShuffleSpillDir(localDirs_[dirSelection_], subDirSelection_[dirSelection_]);
subDirSelection_[dirSelection_] = (subDirSelection_[dirSelection_] + 1) % options_.numSubDirs;
dirSelection_ = (dirSelection_ + 1) % localDirs_.size();
return spilledFileDir;
}
arrow::Result<std::shared_ptr<arrow::io::OutputStream>> LocalPartitionWriter::openFile(const std::string& file) {
std::shared_ptr<arrow::io::FileOutputStream> fout;
auto fd = open(file.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0000);
ARROW_RETURN_IF(fd < 0, arrow::Status::IOError("Failed to open local file " + file));
// Set the shuffle file permissions to 0644 to keep them consistent with the permissions used by
// the built-in shuffle manager in Spark.
fchmod(fd, 0644);
ARROW_ASSIGN_OR_RAISE(fout, arrow::io::FileOutputStream::Open(fd));
if (options_.bufferedWrite) {
// The `shuffleFileBufferSize` buffer is a temporary allocation and will be freed when the file is closed.
// Use the default memory pool and treat the memory as executor memory overhead to avoid unnecessary spills.
return arrow::io::BufferedOutputStream::Create(options_.shuffleFileBufferSize, arrow::default_memory_pool(), fout);
}
return fout;
}
arrow::Status LocalPartitionWriter::clearResource() {
RETURN_NOT_OK(dataFileOs_->Close());
// When bufferedWrite is true, dataFileOs_->Close() doesn't release the underlying buffer, so reset the stream explicitly.
dataFileOs_.reset();
return arrow::Status::OK();
}
void LocalPartitionWriter::init() {
partitionLengths_.resize(numPartitions_, 0);
rawPartitionLengths_.resize(numPartitions_, 0);
// Shuffle the configured local directories. This prevents each task from using the same directory for spilled
// files.
std::random_device rd;
std::default_random_engine engine(rd());
std::shuffle(localDirs_.begin(), localDirs_.end(), engine);
subDirSelection_.assign(localDirs_.size(), 0);
}
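// Copy all payloads of the given partition from every spill file into the final data file, in
// spill order, and return the number of bytes written. Payloads recorded as kToBeCompressed are
// compressed while being written, which may itself trigger another spill.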
arrow::Result<int64_t> LocalPartitionWriter::mergeSpills(uint32_t partitionId) {
int64_t bytesEvicted = 0;
int32_t spillIndex = 0;
for (const auto& spill : spills_) {
ARROW_ASSIGN_OR_RAISE(auto startPos, dataFileOs_->Tell());
spill->openForRead(options_.shuffleFileBufferSize);
// If the partition exists in the spilled file, read its payloads and write them to the final data file.
while (auto payload = spill->nextPayload(partitionId)) {
// May trigger spill during compression.
RETURN_NOT_OK(payload->serialize(dataFileOs_.get()));
compressTime_ += payload->getCompressTime();
writeTime_ += payload->getWriteTime();
}
ARROW_ASSIGN_OR_RAISE(auto endPos, dataFileOs_->Tell());
auto bytesWritten = endPos - startPos;
DLOG(INFO) << "Partition " << partitionId << " spilled from spillResult " << spillIndex++ << " of bytes "
<< bytesWritten;
bytesEvicted += bytesWritten;
}
totalBytesEvicted_ += bytesEvicted;
return bytesEvicted;
}
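// Finalize the shuffle output. Two paths exist:
// 1. useSpillFileAsDataFile_: sort-based eviction has been writing directly to the final data
//    file. Only the partitions after lastEvictPid_ still need to be merged from earlier spill
//    files, and partition lengths are accumulated from the final spill's payload sizes.
// 2. Otherwise: open the final data file here and, for each partition, write the spilled data,
//    the cached payloads, and the merger's remaining payload, in that order.
// Afterwards, verify that all spills have been exhausted and delete the spill files.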
arrow::Status LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
if (stopped_) {
return arrow::Status::OK();
}
stopped_ = true;
if (useSpillFileAsDataFile_) {
RETURN_NOT_OK(spiller_->flush());
ARROW_ASSIGN_OR_RAISE(auto spill, spiller_->finish());
// Merge the remaining partitions from spills.
if (!spills_.empty()) {
for (auto pid = lastEvictPid_ + 1; pid < numPartitions_; ++pid) {
ARROW_ASSIGN_OR_RAISE(partitionLengths_[pid], mergeSpills(pid));
}
}
for (uint32_t pid = 0; pid < numPartitions_; ++pid) {
while (auto payload = spill->nextPayload(pid)) {
partitionLengths_[pid] += payload->rawSize();
}
}
writeTime_ = spill->spillTime();
compressTime_ += spill->compressTime();
} else {
RETURN_NOT_OK(finishSpill());
ARROW_ASSIGN_OR_RAISE(dataFileOs_, openFile(dataFile_));
int64_t endInFinalFile = 0;
DLOG(INFO) << "LocalPartitionWriter stopped. Total spills: " << spills_.size();
// Iterate over partition ids.
for (uint32_t pid = 0; pid < numPartitions_; ++pid) {
// Record start offset.
auto startInFinalFile = endInFinalFile;
// Iterate over all spilled files.
// Reading and compressing toBeCompressed payload can trigger spill.
RETURN_NOT_OK(mergeSpills(pid));
if (payloadCache_ && payloadCache_->hasCachedPayloads(pid)) {
RETURN_NOT_OK(payloadCache_->write(pid, dataFileOs_.get()));
}
if (merger_) {
ARROW_ASSIGN_OR_RAISE(auto merged, merger_->finish(pid));
if (merged) {
// Compressing merged payload can trigger spill.
RETURN_NOT_OK((*merged)->serialize(dataFileOs_.get()));
compressTime_ += (*merged)->getCompressTime();
writeTime_ += (*merged)->getWriteTime();
}
}
ARROW_ASSIGN_OR_RAISE(endInFinalFile, dataFileOs_->Tell());
partitionLengths_[pid] = endInFinalFile - startInFinalFile;
}
}
// Close Final file. Clear buffered resources.
RETURN_NOT_OK(clearResource());
// Check that all spills have been fully merged, then delete the spill files.
auto s = 0;
for (const auto& spill : spills_) {
compressTime_ += spill->compressTime();
spillTime_ += spill->spillTime();
for (uint32_t pid = 0; pid < numPartitions_; ++pid) {
if (spill->hasNextPayload(pid)) {
return arrow::Status::Invalid(
"Merging from spill " + std::to_string(s) + " is not exhausted. pid: " + std::to_string(pid));
}
}
if (std::filesystem::exists(spill->spillFile()) && !std::filesystem::remove(spill->spillFile())) {
LOG(WARNING) << "Error while deleting spill file " << spill->spillFile();
}
++s;
}
spills_.clear();
// Populate shuffle writer metrics.
RETURN_NOT_OK(populateMetrics(metrics));
return arrow::Status::OK();
}
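// Lazily create a LocalSpiller if none is active. When `isFinal` is true, the spiller writes
// directly into the final data file (useSpillFileAsDataFile_); otherwise a new temporary spill
// file is created in the next spill directory.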
arrow::Status LocalPartitionWriter::requestSpill(bool isFinal) {
if (!spiller_ || spiller_->finished()) {
std::string spillFile;
std::shared_ptr<arrow::io::OutputStream> os;
if (isFinal) {
// If `spill()` is requested after `stop()`, open the final data file for writing.
ARROW_ASSIGN_OR_RAISE(dataFileOs_, openFile(dataFile_));
spillFile = dataFile_;
os = dataFileOs_;
useSpillFileAsDataFile_ = true;
} else {
ARROW_ASSIGN_OR_RAISE(spillFile, createTempShuffleFile(nextSpilledFileDir()));
ARROW_ASSIGN_OR_RAISE(os, openFile(spillFile));
}
spiller_ = std::make_unique<LocalSpiller>(
isFinal,
os,
std::move(spillFile),
options_.compressionBufferSize,
options_.compressionThreshold,
payloadPool_.get(),
codec_.get());
}
return arrow::Status::OK();
}
arrow::Status LocalPartitionWriter::finishSpill() {
if (spiller_ && !spiller_->finished()) {
auto spiller = std::move(spiller_);
spills_.emplace_back();
ARROW_ASSIGN_OR_RAISE(spills_.back(), spiller->finish());
}
return arrow::Status::OK();
}
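// Eviction path for hash-based shuffle. Evict::kSpill payloads are written uncompressed to a
// spill file immediately; otherwise the payload goes through the merger, and any completed
// blocks are cached for the final write in stop().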
arrow::Status LocalPartitionWriter::hashEvict(
uint32_t partitionId,
std::unique_ptr<InMemoryPayload> inMemoryPayload,
Evict::type evictType,
bool reuseBuffers) {
rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();
if (evictType == Evict::kSpill) {
RETURN_NOT_OK(requestSpill(false));
ARROW_ASSIGN_OR_RAISE(
auto payload, inMemoryPayload->toBlockPayload(Payload::kUncompressed, payloadPool_.get(), nullptr));
RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload)));
return arrow::Status::OK();
}
if (!merger_) {
merger_ = std::make_shared<PayloadMerger>(options_, payloadPool_.get(), codec_ ? codec_.get() : nullptr);
}
ARROW_ASSIGN_OR_RAISE(auto merged, merger_->merge(partitionId, std::move(inMemoryPayload), reuseBuffers));
if (!merged.empty()) {
if (UNLIKELY(!payloadCache_)) {
payloadCache_ = std::make_shared<PayloadCache>(numPartitions_);
}
for (auto& payload : merged) {
RETURN_NOT_OK(payloadCache_->cache(partitionId, std::move(payload)));
}
merged.clear();
}
return arrow::Status::OK();
}
arrow::Status
LocalPartitionWriter::sortEvict(uint32_t partitionId, std::unique_ptr<InMemoryPayload> inMemoryPayload, bool isFinal) {
rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();
if (lastEvictPid_ != -1 && (partitionId < lastEvictPid_ || (isFinal && !dataFileOs_))) {
lastEvictPid_ = -1;
RETURN_NOT_OK(finishSpill());
}
RETURN_NOT_OK(requestSpill(isFinal));
if (lastEvictPid_ != partitionId) {
// Flush the remaining data for lastEvictPid_.
RETURN_NOT_OK(spiller_->flush());
// For the final data file, merge all spills for partitions in [lastEvictPid_ + 1, partitionId]. Note that this
// function only merges spilled data for partitions up to and including partitionId; the remaining partitions
// are merged in `stop()`.
if (isFinal && !spills_.empty()) {
for (auto pid = lastEvictPid_ + 1; pid <= partitionId; ++pid) {
ARROW_ASSIGN_OR_RAISE(partitionLengths_[pid], mergeSpills(pid));
}
}
}
RETURN_NOT_OK(spiller_->spill(partitionId, std::move(inMemoryPayload)));
lastEvictPid_ = partitionId;
return arrow::Status::OK();
}
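// Attempt to reclaim at least `size` bytes of memory. Reclamation order: first finish the
// active spiller, then spill the payload cache, and finally spill any merged payloads held by
// the merger as uncompressed blocks. The reclaimed size is measured from payloadPool_
// allocation deltas, which is approximate (see the comment below).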
arrow::Status LocalPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actual) {
// Finish last spiller.
RETURN_NOT_OK(finishSpill());
int64_t reclaimed = 0;
// Reclaim memory from payloadCache.
if (payloadCache_ && payloadCache_->canSpill()) {
auto beforeSpill = payloadPool_->bytes_allocated();
ARROW_ASSIGN_OR_RAISE(auto spillFile, createTempShuffleFile(nextSpilledFileDir()));
ARROW_ASSIGN_OR_RAISE(auto os, openFile(spillFile));
spills_.emplace_back();
ARROW_ASSIGN_OR_RAISE(
spills_.back(),
payloadCache_->spillAndClose(os, spillFile, payloadPool_.get(), codec_.get(), totalBytesToEvict_));
reclaimed += beforeSpill - payloadPool_->bytes_allocated();
if (reclaimed >= size) {
*actual = reclaimed;
return arrow::Status::OK();
}
}
// Then spill payloads from merger. Create uncompressed payloads.
if (merger_) {
auto beforeSpill = payloadPool_->bytes_allocated();
for (uint32_t pid = 0; pid < numPartitions_; ++pid) {
ARROW_ASSIGN_OR_RAISE(auto merged, merger_->finishForSpill(pid, totalBytesToEvict_));
if (merged.has_value()) {
RETURN_NOT_OK(requestSpill(false));
RETURN_NOT_OK(spiller_->spill(pid, std::move(*merged)));
}
}
// This measurement is not accurate: when the evicted partition buffers were not copied, the merged
// payloads were resized from the original buffers and are thus allocated from the partition buffer
// pool rather than payloadPool_.
reclaimed += beforeSpill - payloadPool_->bytes_allocated();
RETURN_NOT_OK(finishSpill());
}
*actual = reclaimed;
return arrow::Status::OK();
}
arrow::Status LocalPartitionWriter::populateMetrics(ShuffleWriterMetrics* metrics) {
if (payloadCache_) {
spillTime_ += payloadCache_->getSpillTime();
writeTime_ += payloadCache_->getWriteTime();
compressTime_ += payloadCache_->getCompressTime();
}
metrics->totalCompressTime += compressTime_;
metrics->totalEvictTime += spillTime_;
metrics->totalWriteTime += writeTime_;
metrics->totalBytesToEvict += totalBytesToEvict_;
metrics->totalBytesEvicted += totalBytesEvicted_;
metrics->totalBytesWritten += std::filesystem::file_size(dataFile_);
metrics->partitionLengths = std::move(partitionLengths_);
metrics->rawPartitionLengths = std::move(rawPartitionLengths_);
return arrow::Status::OK();
}
} // namespace gluten