Memory_slice Default_chunk_reader::read_chunk()

in src/mlio/record_readers/detail/default_chunk_reader.cc [29:95]


Memory_slice Default_chunk_reader::read_chunk(Memory_span leftover)
{
    if (eof_) {
        return {};
    }

    bool reuse_buffer = false;

    if (chunk_ != nullptr) {
        // If the whole chunk is leftover, it means it does not contain
        // any records; in such case we should increase the size of the
        // chunk to make sure that we fit at least one record into it.
        if (chunk_->size() == leftover.size()) {
            if (chunk_->size() == next_chunk_size_) {
                next_chunk_size_ <<= 1;
            }

            reuse_buffer = true;

            // If the chunk is owned only by this Chunk_reader and its
            // associated Stream_record_reader, we can safely re-use it
            // for the next fill operation.
        }
        else if (chunk_->use_count() <= 2) {
            if (!leftover.empty()) {
                std::copy(leftover.begin(), leftover.end(), chunk_->begin());
            }

            reuse_buffer = true;
        }
    }

    if (reuse_buffer) {
        if (chunk_->size() != next_chunk_size_) {
            chunk_ = resize_memory_block(chunk_, next_chunk_size_);
        }
    }
    else {
        chunk_ = memory_allocator().allocate(next_chunk_size_);

        if (!leftover.empty()) {
            std::copy(leftover.begin(), leftover.end(), chunk_->begin());
        }
    }

    auto remaining = make_span(*chunk_).subspan(leftover.size());
    while (!remaining.empty()) {
        std::size_t num_bytes_read = stream_->read(remaining);
        if (num_bytes_read == 0) {
            eof_ = true;

            break;
        }

        remaining = remaining.subspan(num_bytes_read);
    }

    Intrusive_ptr<Mutable_memory_block> chunk;
    if (eof_) {
        chunk = std::move(chunk_);
    }
    else {
        chunk = chunk_;
    }

    return Memory_slice{chunk}.first(chunk->end() - stdx::ssize(remaining));
}