size_t typed_decode_stream::read()

in src/core/storage/sframe_data/sarray_v2_type_encoding.cpp [1025:1155]


size_t typed_decode_stream::read(const std::pair<flexible_type*, size_t>& decodebuffer,
                                 size_t skip) {
  if (skip == 0) {
    ASSERT_TRUE(decodebuffer.first != nullptr && decodebuffer.second > 0);
  } else {
    ASSERT_TRUE(decodebuffer.first == nullptr && decodebuffer.second == 0);
  }
  size_t decode_bufpos = 0;
  if (perform_type_decoding) {
    if (num_types == 1) {
      if (decodebuffer.first && column_type == flex_type_enum::UNDEFINED) {
        for (size_t idx = 0;idx < decodebuffer.second; ++idx) {
          decodebuffer.first[idx] = FLEX_UNDEFINED;
        }
      } else {
        for (size_t idx = 0;idx < decodebuffer.second; ++idx) {
          decodebuffer.first[idx] = flexible_type();
        }
      }
    } else if (num_types == 2) {
      if (skip) {
        size_t effective_skip = 0;
        // compute the effective skip
        for (size_t idx = 0;idx < skip; ++idx) {
          effective_skip += !undefined_bitmap.get(last_id);
          ++last_id;
        }
        skip = effective_skip;
        if (skip == 0) return 0;
      } else {
        size_t nvals = pad_retbuf_with_undefined_positions(decodebuffer);
        if (nvals == 0) return 0;
      }
    }
  }

  CORO_BEGIN(read)

  if (perform_type_decoding) {
    if (num_types == 0) {
      // empty block
      return 0;
    } else if (num_types == 1 && column_type == flex_type_enum::UNDEFINED) {
      // the CORO_BEGIN(read) prefix has already filled this with
      // undefined values. So all is good and we just count how
      // many we should actually be returning and return.
      // We still use the CORO mechanic here so that CLASS_CORO_RUNNING
      // can be used to detect if there is still stuff to read.
      while(dsize > 0) {
        if (skip) undefined_elem_consumed = std::min(dsize, skip);
        else undefined_elem_consumed = std::min(dsize, decodebuffer.second);
        dsize -= undefined_elem_consumed;
        CORO_YIELD(undefined_elem_consumed);
      }
    }
  } else {
    iarc >> values;
    for (i = 0; i < values.size(); ++i) {
      PUT_BUFFER_SKIP(std::move(values[i]), i, values.size());
    }
    return decode_bufpos;
  }


  // the interesting decoder
  if (perform_type_decoding) {

    if (column_type == flex_type_enum::INTEGER) {
      number_decoder = new decode_number_stream;
      do {
        decode_bufpos = number_decoder->read(elements_to_decode, iarc, decodebuffer, skip);
        CORO_YIELD(decode_bufpos);
      } while(CLASS_CORO_RUNNING(*number_decoder, read));
      delete number_decoder;
      number_decoder = nullptr;

    } else if (column_type == flex_type_enum::FLOAT) {
      if (info.flags & BLOCK_ENCODING_EXTENSION) {
        double_decoder = new decode_double_stream;
        do {
          decode_bufpos = double_decoder->read(elements_to_decode, iarc, decodebuffer, skip);
          CORO_YIELD(decode_bufpos);
        } while(CLASS_CORO_RUNNING(*double_decoder, read));
      delete double_decoder;
      double_decoder = nullptr;
      } else {
        double_legacy_decoder = new decode_double_stream_legacy;
        do {
          decode_bufpos = double_legacy_decoder->read(elements_to_decode, iarc, decodebuffer, skip);
          CORO_YIELD(decode_bufpos);
        } while(CLASS_CORO_RUNNING(*double_legacy_decoder, read));
      delete double_legacy_decoder;
      double_legacy_decoder = nullptr;
      }
    } else if (column_type == flex_type_enum::STRING) {
      string_decoder = new decode_string_stream;
      do {
        decode_bufpos = string_decoder->read(elements_to_decode, iarc, decodebuffer, skip);
        CORO_YIELD(decode_bufpos);
      } while(CLASS_CORO_RUNNING(*string_decoder, read));
      delete string_decoder;
      string_decoder = nullptr;
    } else if (column_type == flex_type_enum::VECTOR) {
      vector_decoder = new decode_vector_stream;
      do {
        decode_bufpos = vector_decoder->read(elements_to_decode, iarc, decodebuffer, skip,
                                             info.flags & BLOCK_ENCODING_EXTENSION);
        CORO_YIELD(decode_bufpos);
      } while(CLASS_CORO_RUNNING(*vector_decoder, read));
      delete vector_decoder;
      vector_decoder = nullptr;
    } else if (column_type == flex_type_enum::ND_VECTOR) {
      ndvector_decoder = new decode_ndvector_stream;
      do {
        decode_bufpos = ndvector_decoder->read(elements_to_decode, iarc, decodebuffer, skip,
                                               info.flags & BLOCK_ENCODING_EXTENSION);
        CORO_YIELD(decode_bufpos);
      } while(CLASS_CORO_RUNNING(*ndvector_decoder, read));
      delete ndvector_decoder;
      ndvector_decoder = nullptr;
    } else {
      for (i = 0;i < elements_to_decode; ++i) {
        generic_ret = flexible_type(column_type);
        generic_ret.apply_mutating_visitor(generic_deserializer);
        PUT_BUFFER(std::move(generic_ret));
      }
    }
  }
  CORO_END
  return decode_bufpos;
}