cpp/core/shuffle/Utils.cc (265 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "shuffle/Utils.h"
#include <arrow/record_batch.h>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <fcntl.h>
#include <iomanip>
#include <iostream>
#include <numeric>
#include <sstream>
#include <thread>
#include "shuffle/Options.h"
#include "utils/Timer.h"
namespace gluten {
namespace {
// Builds a one-element array of `type` whose only element spans zero bytes.
//
// type: data type handed to arrow::ArrayData::Make for the resulting array.
// pool: memory pool used to allocate the offsets buffer.
// Returns the array, or the allocation error reported by Arrow.
arrow::Result<std::shared_ptr<arrow::Array>> makeNullBinaryArray(
    std::shared_ptr<arrow::DataType> type,
    arrow::MemoryPool* pool) {
  // Two offsets are needed for one element: where it starts and where it ends.
  const auto offsetsBytes = kSizeOfIpcOffsetBuffer << 1;
  ARROW_ASSIGN_OR_RAISE(auto offsets, arrow::AllocateResizableBuffer(offsetsBytes, pool));

  // Both offsets are zero, so a single pass clears the start and the end offset together.
  memset(offsets->mutable_data(), 0, offsetsBytes);

  // A null value buffer works for an uncompressed array, but the compressed path core dumps at
  // buffer::size(), so a real zero-length buffer object is used as the value buffer instead.
  static std::shared_ptr<arrow::Buffer> emptyValues = std::make_shared<arrow::Buffer>(nullptr, 0);

  // No validity buffer is supplied (first slot is nullptr).
  auto data = arrow::ArrayData::Make(type, 1, {nullptr, std::move(offsets), emptyValues});
  return arrow::MakeArray(data);
}
// Wraps `valueBuffer` into a one-element array of `type` whose single element covers the whole
// buffer.
//
// type:        data type handed to arrow::ArrayData::Make for the resulting array.
// valueBuffer: bytes of the single element; when null, makeNullBinaryArray() is used instead.
// pool:        memory pool used to allocate the offsets buffer.
// Returns the array, or the allocation error reported by Arrow.
arrow::Result<std::shared_ptr<arrow::Array>> makeBinaryArray(
    std::shared_ptr<arrow::DataType> type,
    std::shared_ptr<arrow::Buffer> valueBuffer,
    arrow::MemoryPool* pool) {
  if (!valueBuffer) {
    return makeNullBinaryArray(type, pool);
  }

  // Two offsets are needed for one element: where it starts and where it ends.
  ARROW_ASSIGN_OR_RAISE(auto offsets, arrow::AllocateResizableBuffer(kSizeOfIpcOffsetBuffer << 1, pool));
  uint8_t* const startOffset = offsets->mutable_data();
  uint8_t* const endOffset = startOffset + kSizeOfIpcOffsetBuffer;

  // The element starts at byte 0 ...
  memset(startOffset, 0, kSizeOfIpcOffsetBuffer);
  // ... and ends at the size of the value buffer. Only kSizeOfIpcOffsetBuffer bytes of the 64-bit
  // size are copied.
  int64_t valueBytes = valueBuffer->size();
  memcpy(endOffset, &valueBytes, kSizeOfIpcOffsetBuffer);

  // No validity buffer is supplied (first slot is nullptr).
  auto data = arrow::ArrayData::Make(type, 1, {nullptr, std::move(offsets), valueBuffer});
  return arrow::MakeArray(data);
}
// Compresses each buffer on its own and packs the compressed bytes back to back.
//
// Length buffer layout |compressionMode|buffers.size()|buffer1 unCompressedLength|buffer1 compressedLength| buffer2...
//
// buffers:      input buffers; null or zero-sized entries are recorded as a (0, 0) length pair.
// pool:         memory pool used for both output allocations.
// codec:        codec used to size and compress every non-empty buffer.
// lengthBuffer: out - receives the int64 metadata described by the layout above.
// valueBuffer:  out - receives the concatenated compressed bytes, shrunk to the bytes written.
arrow::Status getLengthBufferAndValueBufferOneByOne(
    const std::vector<std::shared_ptr<arrow::Buffer>>& buffers,
    arrow::MemoryPool* pool,
    arrow::util::Codec* codec,
    std::shared_ptr<arrow::ResizableBuffer>& lengthBuffer,
    std::shared_ptr<arrow::ResizableBuffer>& valueBuffer) {
  // One slot for the mode, one for the buffer count, then two slots per buffer.
  const auto lengthBytes = (1 + 1 + buffers.size() * 2) * sizeof(int64_t);
  ARROW_ASSIGN_OR_RAISE(lengthBuffer, arrow::AllocateResizableBuffer(lengthBytes, pool));
  auto* cursor = reinterpret_cast<int64_t*>(lengthBuffer->mutable_data());

  // Metadata header: compression mode followed by the number of buffers.
  *cursor++ = CompressionMode::BUFFER;
  *cursor++ = buffers.size();

  // Reserve the worst-case compressed size of all buffers up front; it is trimmed at the end.
  const int64_t worstCaseBytes = getMaxCompressedBufferSize(buffers, codec);
  ARROW_ASSIGN_OR_RAISE(valueBuffer, arrow::AllocateResizableBuffer(worstCaseBytes, pool));

  int64_t writeOffset = 0;
  for (auto& buffer : buffers) {
    const bool hasPayload = buffer != nullptr && buffer->size() != 0;
    if (!hasPayload) {
      // Nothing to compress: both the uncompressed and the compressed length are zero.
      *cursor++ = 0;
      *cursor++ = 0;
      continue;
    }

    int64_t compressedBytes;
    const int64_t outputCapacity = codec->MaxCompressedLen(buffer->size(), nullptr);
    ARROW_ASSIGN_OR_RAISE(
        compressedBytes,
        codec->Compress(buffer->size(), buffer->data(), outputCapacity, valueBuffer->mutable_data() + writeOffset));
    writeOffset += compressedBytes;

    *cursor++ = buffer->size();
    *cursor++ = compressedBytes;
  }

  // Drop the unused tail of the worst-case allocation.
  RETURN_NOT_OK(valueBuffer->Resize(writeOffset, /*shrink*/ true));
  return arrow::Status::OK();
}
// Concatenates all buffers into one contiguous buffer and compresses it in a single call.
//
// Length buffer layout |compressionMode|buffer unCompressedLength|buffer compressedLength|buffers.size()| buffer1 size
// | buffer2 size
//
// buffers:          input buffers; null or zero-sized entries are recorded with size 0.
// pool:             memory pool used for the metadata, staging and output allocations.
// codec:            codec used to size and compress the concatenated bytes.
// lengthBuffer:     out - receives the int64 metadata described by the layout above.
// compressedBuffer: out - receives the compressed bytes, shrunk to the bytes written.
arrow::Status getLengthBufferAndValueBufferStream(
    const std::vector<std::shared_ptr<arrow::Buffer>>& buffers,
    arrow::MemoryPool* pool,
    arrow::util::Codec* codec,
    std::shared_ptr<arrow::ResizableBuffer>& lengthBuffer,
    std::shared_ptr<arrow::ResizableBuffer>& compressedBuffer) {
  // Four fixed slots (mode, uncompressed size, compressed size, buffer count) plus one per buffer.
  const auto lengthBytes = (1 + 3 + buffers.size()) * sizeof(int64_t);
  ARROW_ASSIGN_OR_RAISE(lengthBuffer, arrow::AllocateResizableBuffer(lengthBytes, pool));

  // Staging area that receives every input buffer back to back.
  auto totalInputBytes = getBufferSize(buffers);
  ARROW_ASSIGN_OR_RAISE(auto staging, arrow::AllocateResizableBuffer(totalInputBytes, pool));
  // NOTE(review): the original comment warns that 64B alignment may make the allocation differ
  // from getBufferSize(buffers); the size recorded and compressed below is the one the allocated
  // buffer reports through size().
  const int64_t stagingBytes = staging->size();

  auto* cursor = reinterpret_cast<int64_t*>(lengthBuffer->mutable_data());
  // Metadata header.
  *cursor++ = CompressionMode::ROWVECTOR;
  *cursor++ = stagingBytes; // uncompressedLength
  // The compressed size is only known after compression; remember its slot and fill it in later.
  int64_t* const compressedSizeSlot = cursor++;
  *cursor++ = buffers.size();

  // Record each buffer's size and append its bytes to the staging area.
  int64_t stagingOffset = 0;
  for (auto& buffer : buffers) {
    if (buffer == nullptr || buffer->size() == 0) {
      *cursor++ = 0;
      continue;
    }
    *cursor++ = buffer->size();
    memcpy(staging->mutable_data() + stagingOffset, buffer->data(), buffer->size());
    stagingOffset += buffer->size();
  }

  // Compress the staging area in one shot into a worst-case sized output, then trim it.
  const int64_t outputCapacity = codec->MaxCompressedLen(stagingBytes, nullptr);
  ARROW_ASSIGN_OR_RAISE(compressedBuffer, arrow::AllocateResizableBuffer(outputCapacity, pool));
  ARROW_ASSIGN_OR_RAISE(
      int64_t compressedBytes,
      codec->Compress(stagingBytes, staging->data(), outputCapacity, compressedBuffer->mutable_data()));
  RETURN_NOT_OK(compressedBuffer->Resize(compressedBytes, /*shrink*/ true));

  // Back-fill the compressed size now that it is known.
  *compressedSizeSlot = compressedBytes;
  return arrow::Status::OK();
}
} // namespace
// Builds a single-row record batch with three columns taken from `compressWriteSchema`:
//   field 0 - header: numRows (uint32) followed by the codec's compression type (int32),
//   field 1 - length/metadata buffer produced by the selected compression routine,
//   field 2 - compressed value bytes.
//
// numRows:                 row count stored in the header.
// buffers:                 buffers to compress.
// compressWriteSchema:     schema of the result; fields 0, 1 and 2 are read.
// pool:                    memory pool used for every allocation.
// codec:                   codec used for compression; it is dereferenced unconditionally.
// bufferCompressThreshold: per-buffer compression is used only when numRows exceeds this value.
// compressionMode:         requested mode; anything other than BUFFER, or a row count at or below
//                          the threshold, compresses all buffers as one stream.
// compressionTime:         passed by address to a ScopedTimer covering the whole call
//                          (presumably accumulates the elapsed time - confirm in utils/Timer.h).
// Returns the record batch, or the first Arrow error encountered.
arrow::Result<std::shared_ptr<arrow::RecordBatch>> makeCompressedRecordBatch(
    uint32_t numRows,
    const std::vector<std::shared_ptr<arrow::Buffer>>& buffers,
    const std::shared_ptr<arrow::Schema> compressWriteSchema,
    arrow::MemoryPool* pool,
    arrow::util::Codec* codec,
    int32_t bufferCompressThreshold,
    CompressionMode compressionMode,
    int64_t& compressionTime) {
  ScopedTimer timer{&compressionTime};
  std::vector<std::shared_ptr<arrow::Array>> arrays;
  arrays.reserve(3);

  // header col, numRows, compressionType
  {
    ARROW_ASSIGN_OR_RAISE(auto headerBuffer, arrow::AllocateResizableBuffer(sizeof(uint32_t) + sizeof(int32_t), pool));
    memcpy(headerBuffer->mutable_data(), &numRows, sizeof(uint32_t));
    int32_t compressType = static_cast<int32_t>(codec->compression_type());
    memcpy(headerBuffer->mutable_data() + sizeof(uint32_t), &compressType, sizeof(int32_t));
    arrays.emplace_back();
    ARROW_ASSIGN_OR_RAISE(
        arrays.back(), makeBinaryArray(compressWriteSchema->field(0)->type(), std::move(headerBuffer), pool));
  }

  // Compare in a signed 64-bit domain. A direct `numRows > bufferCompressThreshold` converts the
  // signed threshold to unsigned, so a negative threshold would turn into a huge value and the
  // per-buffer path could never be selected.
  const bool compressPerBuffer = compressionMode == CompressionMode::BUFFER &&
      static_cast<int64_t>(numRows) > static_cast<int64_t>(bufferCompressThreshold);

  std::shared_ptr<arrow::ResizableBuffer> lengthBuffer;
  std::shared_ptr<arrow::ResizableBuffer> valueBuffer;
  if (compressPerBuffer) {
    RETURN_NOT_OK(getLengthBufferAndValueBufferOneByOne(buffers, pool, codec, lengthBuffer, valueBuffer));
  } else {
    RETURN_NOT_OK(getLengthBufferAndValueBufferStream(buffers, pool, codec, lengthBuffer, valueBuffer));
  }

  arrays.emplace_back();
  ARROW_ASSIGN_OR_RAISE(arrays.back(), makeBinaryArray(compressWriteSchema->field(1)->type(), lengthBuffer, pool));
  arrays.emplace_back();
  ARROW_ASSIGN_OR_RAISE(arrays.back(), makeBinaryArray(compressWriteSchema->field(2)->type(), valueBuffer, pool));

  // The arrays are not needed afterwards, so hand them over instead of copying the vector.
  return arrow::RecordBatch::Make(compressWriteSchema, 1, std::move(arrays));
}
// Builds a single-row record batch whose first column is a header and whose remaining columns wrap
// the given buffers without compressing them.
//
// numRows:     row count stored in the header.
// buffers:     one buffer per schema field after the header; a null entry becomes a zero-length
//              element (see makeBinaryArray).
// writeSchema: schema of the result; field 0 is the header, fields 1..N map to buffers[0..N-1].
// pool:        memory pool used for every allocation.
// Returns the record batch, Status::Invalid when the schema is empty or fewer buffers than
// non-header fields were supplied, or the first Arrow allocation error.
arrow::Result<std::shared_ptr<arrow::RecordBatch>> makeUncompressedRecordBatch(
    uint32_t numRows,
    const std::vector<std::shared_ptr<arrow::Buffer>>& buffers,
    const std::shared_ptr<arrow::Schema> writeSchema,
    arrow::MemoryPool* pool) {
  // Validate up front: the code below indexes field(0) and buffers[0..bufferNum-1] unchecked.
  const int32_t bufferNum = writeSchema->num_fields() - 1;
  if (bufferNum < 0) {
    return arrow::Status::Invalid("Write schema must contain at least the header field.");
  }
  if (buffers.size() < static_cast<size_t>(bufferNum)) {
    return arrow::Status::Invalid(
        "Write schema expects ", bufferNum, " buffers but only ", buffers.size(), " were provided.");
  }

  std::vector<std::shared_ptr<arrow::Array>> arrays;
  arrays.reserve(static_cast<size_t>(bufferNum) + 1);

  // header col, numRows, compressionType
  {
    ARROW_ASSIGN_OR_RAISE(auto headerBuffer, arrow::AllocateResizableBuffer(sizeof(uint32_t) + sizeof(int32_t), pool));
    memcpy(headerBuffer->mutable_data(), &numRows, sizeof(uint32_t));
    int32_t compressType = static_cast<int32_t>(arrow::Compression::type::UNCOMPRESSED);
    memcpy(headerBuffer->mutable_data() + sizeof(uint32_t), &compressType, sizeof(int32_t));
    arrays.emplace_back();
    ARROW_ASSIGN_OR_RAISE(arrays.back(), makeBinaryArray(writeSchema->field(0)->type(), std::move(headerBuffer), pool));
  }

  // One column per buffer, typed by the schema field that follows the header.
  for (int32_t i = 0; i < bufferNum; i++) {
    arrays.emplace_back();
    ARROW_ASSIGN_OR_RAISE(arrays.back(), makeBinaryArray(writeSchema->field(i + 1)->type(), buffers[i], pool));
  }

  // The arrays are not needed afterwards, so hand them over instead of copying the vector.
  return arrow::RecordBatch::Make(writeSchema, 1, std::move(arrays));
}
} // namespace gluten
// Returns the string form (as produced by boost::uuids::to_string) of a freshly generated
// random UUID. A new boost random generator is created on every call.
std::string gluten::generateUuid() {
  const auto uuid = boost::uuids::random_generator()();
  return boost::uuids::to_string(uuid);
}
// Returns the path of the spill sub-directory for `subDirId` under `configuredDir`.
//
// The sub-directory name is `subDirId` printed in hexadecimal, left-padded with '0' to at least
// two characters (e.g. 10 -> "0a"). The path is only computed; nothing is created on disk.
//
// configuredDir: base directory.
// subDirId:      numeric id of the sub-directory.
std::string gluten::getSpilledShuffleFileDir(const std::string& configuredDir, int32_t subDirId) {
  // No filesystem object is needed here: the result is pure path concatenation.
  std::stringstream ss;
  ss << std::setfill('0') << std::setw(2) << std::hex << subDirId;
  return arrow::fs::internal::ConcatAbstractPath(configuredDir, ss.str());
}
// Creates a new, empty file named "temp_shuffle_<uuid>" inside `dir` and returns its path.
//
// `dir` is created (recursively) when it does not exist yet. The file is created with
// O_CREAT | O_EXCL, so an existing file is never reused; on a name collision a new UUID is tried.
//
// dir: directory that will contain the file; must not be empty.
// Returns the path of the created file, Status::Invalid for an empty `dir`, Status::IOError when
// open() fails for a reason other than EEXIST, or any error reported by the Arrow filesystem.
arrow::Result<std::string> gluten::createTempShuffleFile(const std::string& dir) {
  if (dir.empty()) {
    return arrow::Status::Invalid("Failed to create spilled file, got empty path.");
  }

  auto fs = std::make_shared<arrow::fs::LocalFileSystem>();
  ARROW_ASSIGN_OR_RAISE(auto dirInfo, fs->GetFileInfo(dir));
  if (dirInfo.type() == arrow::fs::FileType::NotFound) {
    RETURN_NOT_OK(fs->CreateDir(dir, true));
  }

  // Keep drawing candidate names until one is created exclusively by this call.
  for (;;) {
    std::string filePath = arrow::fs::internal::ConcatAbstractPath(dir, "temp_shuffle_" + generateUuid());
    ARROW_ASSIGN_OR_RAISE(auto fileInfo, fs->GetFileInfo(filePath));
    if (fileInfo.type() != arrow::fs::FileType::NotFound) {
      // Name already taken; try another UUID.
      continue;
    }

    const int fd = open(filePath.c_str(), O_CREAT | O_EXCL | O_RDWR, 0666);
    if (fd >= 0) {
      // Only the file's existence matters here; callers reopen it by path.
      close(fd);
      return filePath;
    }

    // Capture errno right away: building the error message below allocates and may overwrite it.
    const int openErrno = errno;
    if (openErrno != EEXIST) {
      return arrow::Status::IOError("Failed to open local file " + filePath + ", Reason: " + strerror(openErrno));
    }
    // EEXIST: another creator won the race between the existence check and open(); retry.
  }
}
// Collects the data type of every field, verifying that each one is supported by the columnar
// shuffle.
//
// fields: schema fields to inspect.
// Returns the field types in input order, or Status::NotImplemented naming the first field type
// whose type id is not in the supported list below.
arrow::Result<std::vector<std::shared_ptr<arrow::DataType>>> gluten::toShuffleTypeId(
    const std::vector<std::shared_ptr<arrow::Field>>& fields) {
  std::vector<std::shared_ptr<arrow::DataType>> shuffleTypeId;
  shuffleTypeId.reserve(fields.size());
  // Iterate by reference: copying each shared_ptr would cost an atomic refcount round trip.
  for (const auto& field : fields) {
    switch (field->type()->id()) {
      case arrow::BooleanType::type_id:
      case arrow::Int8Type::type_id:
      case arrow::UInt8Type::type_id:
      case arrow::Int16Type::type_id:
      case arrow::UInt16Type::type_id:
      case arrow::HalfFloatType::type_id:
      case arrow::Int32Type::type_id:
      case arrow::UInt32Type::type_id:
      case arrow::FloatType::type_id:
      case arrow::Date32Type::type_id:
      case arrow::Time32Type::type_id:
      case arrow::Int64Type::type_id:
      case arrow::UInt64Type::type_id:
      case arrow::DoubleType::type_id:
      case arrow::Date64Type::type_id:
      case arrow::Time64Type::type_id:
      case arrow::TimestampType::type_id:
      case arrow::BinaryType::type_id:
      case arrow::StringType::type_id:
      case arrow::LargeBinaryType::type_id:
      case arrow::LargeStringType::type_id:
      case arrow::StructType::type_id:
      case arrow::MapType::type_id:
      case arrow::ListType::type_id:
      case arrow::LargeListType::type_id:
      case arrow::Decimal128Type::type_id:
      case arrow::NullType::type_id:
        shuffleTypeId.push_back(field->type());
        break;
      default:
        // Unsupported type: stop at the first offender and report it.
        return arrow::Status::NotImplemented(
            "Field type not implemented in ColumnarShuffle, type is ", field->type()->ToString());
    }
  }
  return shuffleTypeId;
}
// Returns the summed size of the buffers listed directly in the array's ArrayData
// (`array->data()->buffers`). `array` is dereferenced without a null check.
int64_t gluten::getBufferSize(const std::shared_ptr<arrow::Array>& array) {
  const auto& ownBuffers = array->data()->buffers;
  return gluten::getBufferSize(ownBuffers);
}
// Returns the sum of size() over all non-null buffers; null entries contribute nothing.
int64_t gluten::getBufferSize(const std::vector<std::shared_ptr<arrow::Buffer>>& buffers) {
  int64_t totalBytes = 0;
  for (const auto& buf : buffers) {
    if (buf != nullptr) {
      totalBytes += buf->size();
    }
  }
  return totalBytes;
}
// Returns the sum of codec->MaxCompressedLen(size) over every non-null, non-empty buffer, i.e. the
// space needed to hold all of them compressed one by one in the worst case.
//
// buffers: buffers to size; null and zero-sized entries are skipped.
// codec:   codec whose MaxCompressedLen() is queried for each counted buffer.
int64_t gluten::getMaxCompressedBufferSize(
    const std::vector<std::shared_ptr<arrow::Buffer>>& buffers,
    arrow::util::Codec* codec) {
  return std::accumulate(
      std::cbegin(buffers),
      std::cend(buffers),
      int64_t{0},
      [codec](int64_t runningTotal, const std::shared_ptr<arrow::Buffer>& buf) -> int64_t {
        const bool counted = buf != nullptr && buf->size() != 0;
        return counted ? runningTotal + codec->MaxCompressedLen(buf->size(), nullptr) : runningTotal;
      });
}
std::shared_ptr<arrow::Buffer> gluten::zeroLengthNullBuffer() {
static std::shared_ptr<arrow::Buffer> kNullBuffer = std::make_shared<arrow::Buffer>(nullptr, 0);
return kNullBuffer;
}