cpp/velox/shuffle/VeloxShuffleReader.cc (584 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "shuffle/VeloxShuffleReader.h"
#include <arrow/array/array_binary.h>
#include <arrow/io/buffered.h>
#include <arrow/io/compressed.h>
#include <velox/common/caching/AsyncDataCache.h>
#include "memory/VeloxColumnarBatch.h"
#include "shuffle/GlutenByteStream.h"
#include "shuffle/Payload.h"
#include "shuffle/Utils.h"
#include "utils/Common.h"
#include "utils/Compression.h"
#include "utils/Macros.h"
#include "utils/Timer.h"
#include "utils/VeloxArrowUtils.h"
#include "velox/row/CompactRow.h"
#include "velox/serializers/PrestoSerializer.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/FlatVector.h"
#include "velox/vector/arrow/Bridge.h"
#include <algorithm>
using namespace facebook::velox;
namespace gluten {
namespace {
struct BufferViewReleaser {
BufferViewReleaser() : BufferViewReleaser(nullptr) {}
BufferViewReleaser(std::shared_ptr<arrow::Buffer> arrowBuffer) : bufferReleaser_(std::move(arrowBuffer)) {}
void addRef() const {}
void release() const {}
private:
const std::shared_ptr<arrow::Buffer> bufferReleaser_;
};
BufferPtr wrapInBufferViewAsOwner(const void* buffer, size_t length, std::shared_ptr<arrow::Buffer> bufferReleaser) {
return BufferView<BufferViewReleaser>::create(
static_cast<const uint8_t*>(buffer), length, {std::move(bufferReleaser)});
}
BufferPtr convertToVeloxBuffer(std::shared_ptr<arrow::Buffer> buffer) {
if (buffer == nullptr) {
return nullptr;
}
return wrapInBufferViewAsOwner(buffer->data(), buffer->size(), buffer);
}
template <TypeKind kind>
VectorPtr readFlatVector(
std::vector<BufferPtr>& buffers,
int32_t& bufferIdx,
uint32_t length,
std::shared_ptr<const Type> type,
memory::MemoryPool* pool) {
auto nulls = buffers[bufferIdx++];
auto values = buffers[bufferIdx++];
std::vector<BufferPtr> stringBuffers;
using T = typename TypeTraits<kind>::NativeType;
if (nulls == nullptr || nulls->size() == 0) {
return std::make_shared<FlatVector<T>>(
pool, type, BufferPtr(nullptr), length, std::move(values), std::move(stringBuffers));
}
return std::make_shared<FlatVector<T>>(
pool, type, std::move(nulls), length, std::move(values), std::move(stringBuffers));
}
template <>
VectorPtr readFlatVector<TypeKind::UNKNOWN>(
std::vector<BufferPtr>& buffers,
int32_t& bufferIdx,
uint32_t length,
std::shared_ptr<const Type> type,
memory::MemoryPool* pool) {
return BaseVector::createNullConstant(type, length, pool);
}
template <>
VectorPtr readFlatVector<TypeKind::HUGEINT>(
std::vector<BufferPtr>& buffers,
int32_t& bufferIdx,
uint32_t length,
std::shared_ptr<const Type> type,
memory::MemoryPool* pool) {
auto nulls = buffers[bufferIdx++];
auto valueBuffer = buffers[bufferIdx++];
// Because if buffer does not compress, it will get from netty, the address maynot aligned 16B, which will cause
// int128_t = xxx coredump by instruction movdqa
auto data = valueBuffer->as<int128_t>();
BufferPtr values;
if ((reinterpret_cast<uintptr_t>(data) & 0xf) == 0) {
values = valueBuffer;
} else {
values = AlignedBuffer::allocate<char>(valueBuffer->size(), pool);
gluten::fastCopy(values->asMutable<char>(), valueBuffer->as<char>(), valueBuffer->size());
}
std::vector<BufferPtr> stringBuffers;
if (nulls == nullptr || nulls->size() == 0) {
auto vp = std::make_shared<FlatVector<int128_t>>(
pool, type, BufferPtr(nullptr), length, std::move(values), std::move(stringBuffers));
return vp;
}
return std::make_shared<FlatVector<int128_t>>(
pool, type, std::move(nulls), length, std::move(values), std::move(stringBuffers));
}
VectorPtr readFlatVectorStringView(
std::vector<BufferPtr>& buffers,
int32_t& bufferIdx,
uint32_t length,
std::shared_ptr<const Type> type,
memory::MemoryPool* pool) {
auto nulls = buffers[bufferIdx++];
auto lengthBuffer = buffers[bufferIdx++];
auto valueBuffer = buffers[bufferIdx++];
const auto* rawLength = lengthBuffer->as<BinaryArrayLengthBufferType>();
std::vector<BufferPtr> stringBuffers;
auto values = AlignedBuffer::allocate<char>(sizeof(StringView) * length, pool);
auto rawValues = values->asMutable<StringView>();
auto rawChars = valueBuffer->as<char>();
uint64_t offset = 0;
for (int32_t i = 0; i < length; ++i) {
rawValues[i] = StringView(rawChars + offset, rawLength[i]);
offset += rawLength[i];
}
stringBuffers.emplace_back(valueBuffer);
if (nulls == nullptr || nulls->size() == 0) {
return std::make_shared<FlatVector<StringView>>(
pool, type, BufferPtr(nullptr), length, std::move(values), std::move(stringBuffers));
}
return std::make_shared<FlatVector<StringView>>(
pool, type, std::move(nulls), length, std::move(values), std::move(stringBuffers));
}
template <>
VectorPtr readFlatVector<TypeKind::VARCHAR>(
std::vector<BufferPtr>& buffers,
int32_t& bufferIdx,
uint32_t length,
std::shared_ptr<const Type> type,
memory::MemoryPool* pool) {
return readFlatVectorStringView(buffers, bufferIdx, length, type, pool);
}
template <>
VectorPtr readFlatVector<TypeKind::VARBINARY>(
std::vector<BufferPtr>& buffers,
int32_t& bufferIdx,
uint32_t length,
std::shared_ptr<const Type> type,
memory::MemoryPool* pool) {
return readFlatVectorStringView(buffers, bufferIdx, length, type, pool);
}
std::unique_ptr<ByteInputStream> toByteStream(uint8_t* data, int32_t size) {
std::vector<ByteRange> byteRanges;
byteRanges.push_back(ByteRange{data, size, 0});
auto byteStream = std::make_unique<BufferInputStream>(byteRanges);
return byteStream;
}
RowVectorPtr readComplexType(BufferPtr buffer, RowTypePtr& rowType, memory::MemoryPool* pool) {
RowVectorPtr result;
auto byteStream = toByteStream(const_cast<uint8_t*>(buffer->as<uint8_t>()), buffer->size());
auto serde = std::make_unique<serializer::presto::PrestoVectorSerde>();
serializer::presto::PrestoVectorSerde::PrestoOptions options;
options.useLosslessTimestamp = true;
serde->deserialize(byteStream.get(), pool, rowType, &result, &options);
return result;
}
RowTypePtr getComplexWriteType(const std::vector<TypePtr>& types) {
std::vector<std::string> complexTypeColNames;
std::vector<TypePtr> complexTypeChildrens;
for (int32_t i = 0; i < types.size(); ++i) {
auto kind = types[i]->kind();
switch (kind) {
case TypeKind::ROW:
case TypeKind::MAP:
case TypeKind::ARRAY: {
complexTypeColNames.emplace_back(types[i]->name());
complexTypeChildrens.emplace_back(types[i]);
} break;
default:
break;
}
}
return std::make_shared<const RowType>(std::move(complexTypeColNames), std::move(complexTypeChildrens));
}
void readColumns(
std::vector<BufferPtr>& buffers,
memory::MemoryPool* pool,
uint32_t numRows,
const std::vector<TypePtr>& types,
std::vector<VectorPtr>& result) {
int32_t bufferIdx = 0;
std::vector<VectorPtr> complexChildren;
auto complexRowType = getComplexWriteType(types);
if (complexRowType->children().size() > 0) {
complexChildren = readComplexType(buffers[buffers.size() - 1], complexRowType, pool)->children();
}
int32_t complexIdx = 0;
for (int32_t i = 0; i < types.size(); ++i) {
auto kind = types[i]->kind();
switch (kind) {
case TypeKind::ROW:
case TypeKind::MAP:
case TypeKind::ARRAY: {
result.emplace_back(std::move(complexChildren[complexIdx]));
complexIdx++;
} break;
default: {
auto res = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
readFlatVector, types[i]->kind(), buffers, bufferIdx, numRows, types[i], pool);
result.emplace_back(std::move(res));
} break;
}
}
}
RowVectorPtr deserialize(RowTypePtr type, uint32_t numRows, std::vector<BufferPtr>& buffers, memory::MemoryPool* pool) {
std::vector<VectorPtr> children;
auto childTypes = type->as<TypeKind::ROW>().children();
readColumns(buffers, pool, numRows, childTypes, children);
return std::make_shared<RowVector>(pool, type, BufferPtr(nullptr), numRows, children);
}
std::shared_ptr<VeloxColumnarBatch> makeColumnarBatch(
RowTypePtr type,
uint32_t numRows,
std::vector<std::shared_ptr<arrow::Buffer>> arrowBuffers,
memory::MemoryPool* pool,
int64_t& deserializeTime) {
ScopedTimer timer(&deserializeTime);
std::vector<BufferPtr> veloxBuffers;
veloxBuffers.reserve(arrowBuffers.size());
for (auto& buffer : arrowBuffers) {
veloxBuffers.push_back(convertToVeloxBuffer(std::move(buffer)));
}
auto rowVector = deserialize(type, numRows, veloxBuffers, pool);
return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
}
std::shared_ptr<VeloxColumnarBatch> makeColumnarBatch(
RowTypePtr type,
std::unique_ptr<InMemoryPayload> payload,
memory::MemoryPool* pool,
int64_t& deserializeTime) {
ScopedTimer timer(&deserializeTime);
std::vector<BufferPtr> veloxBuffers;
auto numBuffers = payload->numBuffers();
veloxBuffers.reserve(numBuffers);
for (size_t i = 0; i < numBuffers; ++i) {
GLUTEN_ASSIGN_OR_THROW(auto buffer, payload->readBufferAt(i));
veloxBuffers.push_back(convertToVeloxBuffer(std::move(buffer)));
}
auto rowVector = deserialize(type, payload->numRows(), veloxBuffers, pool);
return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
}
} // namespace
VeloxHashShuffleReaderDeserializer::VeloxHashShuffleReaderDeserializer(
std::shared_ptr<arrow::io::InputStream> in,
const std::shared_ptr<arrow::Schema>& schema,
const std::shared_ptr<arrow::util::Codec>& codec,
const facebook::velox::RowTypePtr& rowType,
int32_t batchSize,
int64_t bufferSize,
arrow::MemoryPool* memoryPool,
facebook::velox::memory::MemoryPool* veloxPool,
std::vector<bool>* isValidityBuffer,
bool hasComplexType,
int64_t& deserializeTime,
int64_t& decompressTime)
: schema_(schema),
codec_(codec),
rowType_(rowType),
batchSize_(batchSize),
memoryPool_(memoryPool),
veloxPool_(veloxPool),
isValidityBuffer_(isValidityBuffer),
hasComplexType_(hasComplexType),
deserializeTime_(deserializeTime),
decompressTime_(decompressTime) {
GLUTEN_ASSIGN_OR_THROW(in_, arrow::io::BufferedInputStream::Create(bufferSize, memoryPool, std::move(in)));
}
std::shared_ptr<ColumnarBatch> VeloxHashShuffleReaderDeserializer::next() {
if (hasComplexType_) {
uint32_t numRows = 0;
GLUTEN_ASSIGN_OR_THROW(
auto arrowBuffers,
BlockPayload::deserialize(in_.get(), codec_, memoryPool_, numRows, deserializeTime_, decompressTime_));
if (arrowBuffers.empty()) {
// Reach EOS.
return nullptr;
}
return makeColumnarBatch(rowType_, numRows, std::move(arrowBuffers), veloxPool_, deserializeTime_);
}
if (reachEos_) {
if (merged_) {
return makeColumnarBatch(rowType_, std::move(merged_), veloxPool_, deserializeTime_);
}
return nullptr;
}
std::vector<std::shared_ptr<arrow::Buffer>> arrowBuffers{};
uint32_t numRows = 0;
while (!merged_ || merged_->numRows() < batchSize_) {
GLUTEN_ASSIGN_OR_THROW(
arrowBuffers,
BlockPayload::deserialize(in_.get(), codec_, memoryPool_, numRows, deserializeTime_, decompressTime_));
if (arrowBuffers.empty()) {
reachEos_ = true;
break;
}
if (!merged_) {
merged_ = std::make_unique<InMemoryPayload>(numRows, isValidityBuffer_, std::move(arrowBuffers));
arrowBuffers.clear();
continue;
}
auto mergedRows = merged_->numRows() + numRows;
if (mergedRows > batchSize_) {
break;
}
auto append = std::make_unique<InMemoryPayload>(numRows, isValidityBuffer_, std::move(arrowBuffers));
GLUTEN_ASSIGN_OR_THROW(merged_, InMemoryPayload::merge(std::move(merged_), std::move(append), memoryPool_));
arrowBuffers.clear();
}
// Reach EOS.
if (reachEos_ && !merged_) {
return nullptr;
}
auto columnarBatch = makeColumnarBatch(rowType_, std::move(merged_), veloxPool_, deserializeTime_);
// Save remaining rows.
if (!arrowBuffers.empty()) {
merged_ = std::make_unique<InMemoryPayload>(numRows, isValidityBuffer_, std::move(arrowBuffers));
}
return columnarBatch;
}
VeloxSortShuffleReaderDeserializer::VeloxSortShuffleReaderDeserializer(
std::shared_ptr<arrow::io::InputStream> in,
const std::shared_ptr<arrow::Schema>& schema,
const std::shared_ptr<arrow::util::Codec>& codec,
const RowTypePtr& rowType,
int32_t batchSize,
int64_t readerBufferSize,
int64_t deserializerBufferSize,
arrow::MemoryPool* memoryPool,
facebook::velox::memory::MemoryPool* veloxPool,
int64_t& deserializeTime,
int64_t& decompressTime)
: schema_(schema),
codec_(codec),
rowType_(rowType),
batchSize_(batchSize),
deserializerBufferSize_(deserializerBufferSize),
deserializeTime_(deserializeTime),
decompressTime_(decompressTime),
veloxPool_(veloxPool) {
if (codec_ != nullptr) {
GLUTEN_ASSIGN_OR_THROW(in_, CompressedInputStream::Make(codec_.get(), std::move(in), memoryPool));
} else {
GLUTEN_ASSIGN_OR_THROW(in_, arrow::io::BufferedInputStream::Create(readerBufferSize, memoryPool, std::move(in)));
}
}
VeloxSortShuffleReaderDeserializer::~VeloxSortShuffleReaderDeserializer() {
if (auto in = std::dynamic_pointer_cast<CompressedInputStream>(in_)) {
decompressTime_ += in->decompressTime();
}
}
std::shared_ptr<ColumnarBatch> VeloxSortShuffleReaderDeserializer::next() {
if (reachedEos_) {
return nullptr;
}
if (rowBuffer_ == nullptr) {
rowBuffer_ =
AlignedBuffer::allocate<char>(deserializerBufferSize_, veloxPool_, std::nullopt, true /*allocateExact*/);
rowBufferPtr_ = rowBuffer_->asMutable<char>();
data_.reserve(batchSize_);
}
if (lastRowSize_ != 0) {
readNextRow();
}
while (cachedRows_ < batchSize_) {
GLUTEN_ASSIGN_OR_THROW(auto bytes, in_->Read(sizeof(RowSizeType), &lastRowSize_));
if (bytes == 0) {
reachedEos_ = true;
if (cachedRows_ > 0) {
return deserializeToBatch();
}
return nullptr;
}
if (cachedRows_ > 0 && lastRowSize_ + bytesRead_ > rowBuffer_->size()) {
return deserializeToBatch();
}
if (lastRowSize_ > deserializerBufferSize_) {
auto newSize = facebook::velox::bits::nextPowerOfTwo(lastRowSize_);
LOG(WARNING) << "Row size " << lastRowSize_ << " exceeds deserializer buffer size " << rowBuffer_->size()
<< ". Resizing buffer to " << newSize;
rowBuffer_ = AlignedBuffer::allocate<char>(newSize, veloxPool_, std::nullopt, true /*allocateExact*/);
rowBufferPtr_ = rowBuffer_->asMutable<char>();
}
readNextRow();
}
return deserializeToBatch();
}
std::shared_ptr<ColumnarBatch> VeloxSortShuffleReaderDeserializer::deserializeToBatch() {
ScopedTimer timer(&deserializeTime_);
auto rowVector = facebook::velox::row::CompactRow::deserialize(data_, rowType_, veloxPool_);
cachedRows_ = 0;
bytesRead_ = 0;
data_.resize(0);
return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
}
void VeloxSortShuffleReaderDeserializer::readNextRow() {
GLUTEN_THROW_NOT_OK(in_->Read(lastRowSize_, rowBufferPtr_ + bytesRead_));
data_.push_back(std::string_view(rowBufferPtr_ + bytesRead_, lastRowSize_));
bytesRead_ += lastRowSize_;
lastRowSize_ = 0;
++cachedRows_;
}
class VeloxRssSortShuffleReaderDeserializer::VeloxInputStream : public facebook::velox::GlutenByteInputStream {
public:
VeloxInputStream(std::shared_ptr<arrow::io::InputStream> input, facebook::velox::BufferPtr buffer);
bool hasNext();
void next(bool throwIfPastEnd) override;
size_t remainingSize() const override;
std::shared_ptr<arrow::io::InputStream> in_;
const facebook::velox::BufferPtr buffer_;
uint64_t offset_ = -1;
};
VeloxRssSortShuffleReaderDeserializer::VeloxInputStream::VeloxInputStream(
std::shared_ptr<arrow::io::InputStream> input,
facebook::velox::BufferPtr buffer)
: in_(std::move(input)), buffer_(std::move(buffer)) {
next(true);
}
bool VeloxRssSortShuffleReaderDeserializer::VeloxInputStream::hasNext() {
if (offset_ == 0) {
return false;
}
if (ranges()[0].position >= ranges()[0].size) {
next(true);
return offset_ != 0;
}
return true;
}
void VeloxRssSortShuffleReaderDeserializer::VeloxInputStream::next(bool throwIfPastEnd) {
const uint32_t readBytes = buffer_->capacity();
offset_ = in_->Read(readBytes, buffer_->asMutable<char>()).ValueOr(0);
if (offset_ > 0) {
int32_t realBytes = offset_;
VELOX_CHECK_LT(0, realBytes, "Reading past end of file.");
setRange({buffer_->asMutable<uint8_t>(), realBytes, 0});
}
}
VeloxRssSortShuffleReaderDeserializer::VeloxRssSortShuffleReaderDeserializer(
const std::shared_ptr<facebook::velox::memory::MemoryPool>& veloxPool,
const RowTypePtr& rowType,
int32_t batchSize,
facebook::velox::common::CompressionKind veloxCompressionType,
int64_t& deserializeTime,
std::shared_ptr<arrow::io::InputStream> in)
: veloxPool_(veloxPool),
rowType_(rowType),
batchSize_(batchSize),
veloxCompressionType_(veloxCompressionType),
serde_(getNamedVectorSerde(facebook::velox::VectorSerde::Kind::kPresto)),
deserializeTime_(deserializeTime),
arrowIn_(in) {
serdeOptions_ = {false, veloxCompressionType_};
}
std::shared_ptr<ColumnarBatch> VeloxRssSortShuffleReaderDeserializer::next() {
if (in_ == nullptr) {
constexpr uint64_t kMaxReadBufferSize = (1 << 20) - AlignedBuffer::kPaddedSize;
auto buffer = AlignedBuffer::allocate<char>(kMaxReadBufferSize, veloxPool_.get());
in_ = std::make_unique<VeloxInputStream>(std::move(arrowIn_), std::move(buffer));
}
if (!in_->hasNext()) {
return nullptr;
}
ScopedTimer timer(&deserializeTime_);
RowVectorPtr rowVector;
VectorStreamGroup::read(in_.get(), veloxPool_.get(), rowType_, serde_, &rowVector, &serdeOptions_);
if (rowVector->size() >= batchSize_) {
return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
}
while (rowVector->size() < batchSize_ && in_->hasNext()) {
RowVectorPtr rowVectorTemp;
VectorStreamGroup::read(in_.get(), veloxPool_.get(), rowType_, serde_, &rowVectorTemp, &serdeOptions_);
rowVector->append(rowVectorTemp.get());
}
return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
}
size_t VeloxRssSortShuffleReaderDeserializer::VeloxInputStream::remainingSize() const {
return std::numeric_limits<unsigned long>::max();
}
VeloxShuffleReaderDeserializerFactory::VeloxShuffleReaderDeserializerFactory(
const std::shared_ptr<arrow::Schema>& schema,
const std::shared_ptr<arrow::util::Codec>& codec,
facebook::velox::common::CompressionKind veloxCompressionType,
const RowTypePtr& rowType,
int32_t batchSize,
int64_t readerBufferSize,
int64_t deserializerBufferSize,
arrow::MemoryPool* memoryPool,
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
ShuffleWriterType shuffleWriterType)
: schema_(schema),
codec_(codec),
veloxCompressionType_(veloxCompressionType),
rowType_(rowType),
batchSize_(batchSize),
readerBufferSize_(readerBufferSize),
deserializerBufferSize_(deserializerBufferSize),
memoryPool_(memoryPool),
veloxPool_(veloxPool),
shuffleWriterType_(shuffleWriterType) {
initFromSchema();
}
std::unique_ptr<ColumnarBatchIterator> VeloxShuffleReaderDeserializerFactory::createDeserializer(
std::shared_ptr<arrow::io::InputStream> in) {
switch (shuffleWriterType_) {
case ShuffleWriterType::kHashShuffle:
return std::make_unique<VeloxHashShuffleReaderDeserializer>(
std::move(in),
schema_,
codec_,
rowType_,
batchSize_,
readerBufferSize_,
memoryPool_,
veloxPool_.get(),
&isValidityBuffer_,
hasComplexType_,
deserializeTime_,
decompressTime_);
case ShuffleWriterType::kSortShuffle:
return std::make_unique<VeloxSortShuffleReaderDeserializer>(
std::move(in),
schema_,
codec_,
rowType_,
batchSize_,
readerBufferSize_,
deserializerBufferSize_,
memoryPool_,
veloxPool_.get(),
deserializeTime_,
decompressTime_);
case ShuffleWriterType::kRssSortShuffle:
return std::make_unique<VeloxRssSortShuffleReaderDeserializer>(
veloxPool_, rowType_, batchSize_, veloxCompressionType_, deserializeTime_, std::move(in));
}
GLUTEN_UNREACHABLE();
}
arrow::MemoryPool* VeloxShuffleReaderDeserializerFactory::getPool() {
return memoryPool_;
}
int64_t VeloxShuffleReaderDeserializerFactory::getDecompressTime() {
return decompressTime_;
}
int64_t VeloxShuffleReaderDeserializerFactory::getDeserializeTime() {
return deserializeTime_;
}
void VeloxShuffleReaderDeserializerFactory::initFromSchema() {
GLUTEN_ASSIGN_OR_THROW(auto arrowColumnTypes, toShuffleTypeId(schema_->fields()));
isValidityBuffer_.reserve(arrowColumnTypes.size());
for (size_t i = 0; i < arrowColumnTypes.size(); ++i) {
switch (arrowColumnTypes[i]->id()) {
case arrow::BinaryType::type_id:
case arrow::StringType::type_id: {
isValidityBuffer_.push_back(true);
isValidityBuffer_.push_back(false);
isValidityBuffer_.push_back(false);
} break;
case arrow::StructType::type_id:
case arrow::MapType::type_id:
case arrow::ListType::type_id: {
hasComplexType_ = true;
} break;
case arrow::BooleanType::type_id: {
isValidityBuffer_.push_back(true);
isValidityBuffer_.push_back(true);
} break;
case arrow::NullType::type_id:
break;
default: {
isValidityBuffer_.push_back(true);
isValidityBuffer_.push_back(false);
} break;
}
}
}
VeloxShuffleReader::VeloxShuffleReader(std::unique_ptr<VeloxShuffleReaderDeserializerFactory> factory)
: factory_(std::move(factory)) {}
std::shared_ptr<ResultIterator> VeloxShuffleReader::readStream(std::shared_ptr<arrow::io::InputStream> in) {
return std::make_shared<ResultIterator>(factory_->createDeserializer(in));
}
arrow::MemoryPool* VeloxShuffleReader::getPool() const {
return factory_->getPool();
}
int64_t VeloxShuffleReader::getDecompressTime() const {
return factory_->getDecompressTime();
}
int64_t VeloxShuffleReader::getDeserializeTime() const {
return factory_->getDeserializeTime();
}
} // namespace gluten