Buffer get_next_buf()

in cpp/src/common/allocator/byte_stream.h [515:585]


        /**
         * Hand out the next contiguous span of bytes this consumer has not
         * read yet and advance the consumer past it.
         *
         * A returned span never crosses a page boundary: it covers the rest
         * of the current page, or, on the tail page, the bytes up to the
         * sampled total size.
         *
         * @param host  Stream checked for emptiness (head_ == nullptr).
         *              NOTE(review): every other access goes through the
         *              member host_; presumably both refer to the same
         *              stream - confirm against the callers.
         * @return A Buffer whose buf_/len_ describe the span, or a
         *         default-constructed Buffer when the stream has no page or
         *         when this consumer has already reached the sampled end.
         */
        Buffer get_next_buf(ByteStream &host) {
            Buffer b;
            if (UNLIKELY(host.head_.load() == nullptr)) {
                // Host has no page at all: nothing to read.
                return b;
            }
            if (UNLIKELY(cur_ == nullptr)) {
                // This consumer has not been initialized yet: start at the
                // first page of the host.
                cur_ = host_.head_.load();
                read_offset_within_cur_page_ = 0;
            }

            // Sample <tail_, total_size_> as a pair: tail_ is read again
            // after total_size_, and the sample is retried if tail_ changed
            // in between.
            Page *host_end = nullptr;
            uint32_t host_total_size = 0;
            do {
                host_end = host_.tail_.load();
                host_total_size = host_.total_size_.load();
            } while (host_end != host_.tail_.load());

            // A page before the sampled tail that has been read up to
            // page_size_ is exhausted: step to its successor.
            while (cur_ != host_end &&
                   read_offset_within_cur_page_ == host_.page_size_) {
                cur_ = cur_->next_.load();
                read_offset_within_cur_page_ = 0;
            }

            // Pages before the tail are read up to page_size_. The tail page
            // is read up to (total size % page size); a remainder of zero
            // means the tail page is read up to page_size_ as well.
            const bool on_tail_page = (cur_ == host_end);
            uint32_t readable_end = host_.page_size_;
            if (on_tail_page) {
                const uint32_t tail_used = host_total_size % host_.page_size_;
                if (tail_used != 0) {
                    readable_end = tail_used;
                }
                if (read_offset_within_cur_page_ == readable_end) {
                    // Already consumed everything up to the sampled end.
                    return b;
                }
            }

            b.buf_ = ((char *)(cur_->buf_)) + read_offset_within_cur_page_;
            b.len_ = readable_end - read_offset_within_cur_page_;
            total_end_offset_ += b.len_;

            if (on_tail_page) {
                // Stay on the tail page; a later call resumes from here.
                read_offset_within_cur_page_ = readable_end;
            } else {
                // The rest of this page has been handed out: move on.
                cur_ = cur_->next_.load();
                read_offset_within_cur_page_ = 0;
            }

            // Sanity check on the span length. In the previous layout this
            // statement sat after an endless loop and was never executed.
            ASSERT(b.len_ < (1 << 30));
            return b;
        }