in src/core/storage/sframe_data/sarray_v2_type_encoding.cpp [1025:1155]
/**
 * Decodes values from the block into decodebuffer, or skips over values.
 *
 * The section between CORO_BEGIN(read) and CORO_END is written with the
 * CORO_* macros (defined outside this view) and hands back intermediate
 * results through CORO_YIELD. Judging by the in-body comment about the
 * "CORO_BEGIN(read) prefix", the statements ahead of CORO_BEGIN are
 * presumably executed on every call, including resumed ones -- TODO confirm
 * against the macro definitions.
 *
 * \param decodebuffer (pointer, length) pair describing the output buffer.
 *        Asserted to be non-null and non-empty when skip == 0, and to be
 *        (nullptr, 0) when skip != 0.
 * \param skip Number of values to skip. 0 selects decoding into
 *        decodebuffer.
 * \return 0 when num_types == 0, when the effective skip computed from the
 *         undefined bitmap is 0, or when
 *         pad_retbuf_with_undefined_positions() reports 0; otherwise the
 *         count handed back by the active path (decode_bufpos, or
 *         undefined_elem_consumed for an all-UNDEFINED column).
 */
size_t typed_decode_stream::read(const std::pair<flexible_type*, size_t>& decodebuffer,
size_t skip) {
// Decode mode and skip mode are mutually exclusive: a real buffer is only
// accepted when nothing is being skipped, and vice versa.
if (skip == 0) {
ASSERT_TRUE(decodebuffer.first != nullptr && decodebuffer.second > 0);
} else {
ASSERT_TRUE(decodebuffer.first == nullptr && decodebuffer.second == 0);
}
size_t decode_bufpos = 0;
// ---- prefix: prepares the output buffer / skip count ahead of CORO_BEGIN ----
if (perform_type_decoding) {
if (num_types == 1) {
// Single-type block: pre-fill every output slot. In skip mode
// decodebuffer.second is 0 (asserted above), so neither loop runs.
if (decodebuffer.first && column_type == flex_type_enum::UNDEFINED) {
for (size_t idx = 0;idx < decodebuffer.second; ++idx) {
decodebuffer.first[idx] = FLEX_UNDEFINED;
}
} else {
// Reset each slot to a default-constructed flexible_type before the
// typed decoders below write into the buffer.
for (size_t idx = 0;idx < decodebuffer.second; ++idx) {
decodebuffer.first[idx] = flexible_type();
}
}
} else if (num_types == 2) {
// Two-type block: undefined_bitmap and last_id are consulted here.
// presumably the second type is UNDEFINED and the bitmap marks those
// positions -- TODO confirm against the encoder.
if (skip) {
size_t effective_skip = 0;
// compute the effective skip
// Only positions whose bitmap bit is NOT set count towards the skip that
// is forwarded to the decoders; last_id advances for every position.
for (size_t idx = 0;idx < skip; ++idx) {
effective_skip += !undefined_bitmap.get(last_id);
++last_id;
}
skip = effective_skip;
// Every skipped position had its bitmap bit set: nothing to forward.
if (skip == 0) return 0;
} else {
// Helper is defined outside this view. presumably it writes the undefined
// entries into decodebuffer and returns how many slots still need decoded
// values -- TODO confirm.
size_t nvals = pad_retbuf_with_undefined_positions(decodebuffer);
if (nvals == 0) return 0;
}
}
}
// ---- coroutine body ----
CORO_BEGIN(read)
if (perform_type_decoding) {
if (num_types == 0) {
// empty block
return 0;
} else if (num_types == 1 && column_type == flex_type_enum::UNDEFINED) {
// the CORO_BEGIN(read) prefix has already filled this with
// undefined values. So all is good and we just count how
// many we should actually be returning and return.
// We still use the CORO mechanic here so that CLASS_CORO_RUNNING
// can be used to detect if there is still stuff to read.
// NOTE(review): when this loop exits there is no return, so control
// continues to the decoder dispatch below; with column_type UNDEFINED that
// is the generic loop over elements_to_decode. This looks like it relies
// on elements_to_decode being 0 for such blocks -- confirm.
while(dsize > 0) {
// Consume at most the requested amount (skip count, or buffer length in
// decode mode), bounded by the dsize elements still remaining.
if (skip) undefined_elem_consumed = std::min(dsize, skip);
else undefined_elem_consumed = std::min(dsize, decodebuffer.second);
dsize -= undefined_elem_consumed;
CORO_YIELD(undefined_elem_consumed);
}
}
} else {
// No type decoding: deserialize the whole value vector from iarc and push
// each element through PUT_BUFFER_SKIP (macro defined outside this view;
// presumably it honours skip and yields when the buffer fills -- confirm).
iarc >> values;
for (i = 0; i < values.size(); ++i) {
PUT_BUFFER_SKIP(std::move(values[i]), i, values.size());
}
return decode_bufpos;
}
// the interesting decoder
// Each typed branch follows the same shape: allocate the decoder, forward
// read() results via CORO_YIELD for as long as the decoder's own `read`
// coroutine reports running, then delete the decoder and null the pointer.
// The decoder pointers are not declared in this function, so they outlive
// individual calls (presumably class members -- confirm).
// NOTE(review): raw new/delete; if a decoder's read() throws, the delete
// below is not reached.
if (perform_type_decoding) {
if (column_type == flex_type_enum::INTEGER) {
number_decoder = new decode_number_stream;
do {
decode_bufpos = number_decoder->read(elements_to_decode, iarc, decodebuffer, skip);
CORO_YIELD(decode_bufpos);
} while(CLASS_CORO_RUNNING(*number_decoder, read));
delete number_decoder;
number_decoder = nullptr;
} else if (column_type == flex_type_enum::FLOAT) {
// The BLOCK_ENCODING_EXTENSION flag selects between the current and the
// legacy double decoder.
if (info.flags & BLOCK_ENCODING_EXTENSION) {
double_decoder = new decode_double_stream;
do {
decode_bufpos = double_decoder->read(elements_to_decode, iarc, decodebuffer, skip);
CORO_YIELD(decode_bufpos);
} while(CLASS_CORO_RUNNING(*double_decoder, read));
delete double_decoder;
double_decoder = nullptr;
} else {
double_legacy_decoder = new decode_double_stream_legacy;
do {
decode_bufpos = double_legacy_decoder->read(elements_to_decode, iarc, decodebuffer, skip);
CORO_YIELD(decode_bufpos);
} while(CLASS_CORO_RUNNING(*double_legacy_decoder, read));
delete double_legacy_decoder;
double_legacy_decoder = nullptr;
}
} else if (column_type == flex_type_enum::STRING) {
string_decoder = new decode_string_stream;
do {
decode_bufpos = string_decoder->read(elements_to_decode, iarc, decodebuffer, skip);
CORO_YIELD(decode_bufpos);
} while(CLASS_CORO_RUNNING(*string_decoder, read));
delete string_decoder;
string_decoder = nullptr;
} else if (column_type == flex_type_enum::VECTOR) {
// The vector decoder additionally receives the extension flag bit.
vector_decoder = new decode_vector_stream;
do {
decode_bufpos = vector_decoder->read(elements_to_decode, iarc, decodebuffer, skip,
info.flags & BLOCK_ENCODING_EXTENSION);
CORO_YIELD(decode_bufpos);
} while(CLASS_CORO_RUNNING(*vector_decoder, read));
delete vector_decoder;
vector_decoder = nullptr;
} else if (column_type == flex_type_enum::ND_VECTOR) {
// Same calling convention as the vector decoder, including the flag bit.
ndvector_decoder = new decode_ndvector_stream;
do {
decode_bufpos = ndvector_decoder->read(elements_to_decode, iarc, decodebuffer, skip,
info.flags & BLOCK_ENCODING_EXTENSION);
CORO_YIELD(decode_bufpos);
} while(CLASS_CORO_RUNNING(*ndvector_decoder, read));
delete ndvector_decoder;
ndvector_decoder = nullptr;
} else {
// Any other column type: build a value of column_type, let
// generic_deserializer fill it in as a mutating visitor, and hand it to
// PUT_BUFFER (macro defined outside this view).
for (i = 0;i < elements_to_decode; ++i) {
generic_ret = flexible_type(column_type);
generic_ret.apply_mutating_visitor(generic_deserializer);
PUT_BUFFER(std::move(generic_ret));
}
}
}
CORO_END
return decode_bufpos;
}