in cpp/src/common/allocator/byte_stream.h [515:585]
/**
 * Hand out the next contiguous run of bytes this consumer has not read yet.
 *
 * The returned buffer points directly into a single page (it never spans
 * two pages), so callers that want everything currently available have to
 * call this repeatedly until an empty buffer comes back.
 *
 * Side effects: advances cur_ / read_offset_within_cur_page_ past the bytes
 * handed out and adds the returned length to total_end_offset_.
 *
 * @param host  stream whose head_ is checked to detect an empty stream.
 * @return a buffer over the unread bytes of the current page, or a
 *         default-constructed (empty) Buffer when the stream has no page
 *         or this consumer has already read up to the sampled tail
 *         position.
 */
Buffer get_next_buf(ByteStream &host) {
    Buffer b;
    // NOTE(review): this emptiness check reads the `host` parameter while
    // everything below reads the `host_` member; presumably both refer to
    // the same stream -- confirm against the callers.
    if (UNLIKELY(host.head_.load() == nullptr)) {
        // host is empty, return empty buffer.
        return b;
    }
    if (UNLIKELY(cur_ == nullptr)) {
        // First call for this consumer: start reading at the first page.
        cur_ = host_.head_.load();
        read_offset_within_cur_page_ = 0;
    }

    // Sample <tail_, total_size_> as a pair: retry until tail_ reads the
    // same value before and after total_size_ was read.
    Page *host_end = nullptr;
    uint32_t host_total_size = 0;
    do {
        host_end = host_.tail_.load();
        host_total_size = host_.total_size_.load();
    } while (host_end != host_.tail_.load());

    while (true) {
        const bool at_tail = (cur_ == host_end);

        // Number of readable bytes in the current page. Every page before
        // the tail is read up to page_size_. The tail page is read up to
        // total_size % page_size_, where a remainder of 0 is treated as a
        // completely filled page. The modulo is only evaluated for the
        // tail page, as before.
        const auto tail_used =
            at_tail ? (host_total_size % host_.page_size_) : 0;
        const auto page_limit =
            (at_tail && tail_used != 0) ? tail_used : host_.page_size_;

        if (read_offset_within_cur_page_ == page_limit) {
            if (at_tail) {
                // Caught up with the sampled tail position: nothing new.
                return b;
            }
            // Current page is fully consumed; step to the next page and
            // look again.
            cur_ = cur_->next_.load();
            read_offset_within_cur_page_ = 0;
            continue;
        }

        // Hand out the rest of the readable part of the current page.
        b.buf_ = ((char *)(cur_->buf_)) + read_offset_within_cur_page_;
        b.len_ = page_limit - read_offset_within_cur_page_;
        if (at_tail) {
            // Stay on the tail page; more bytes may be appended to it.
            read_offset_within_cur_page_ = page_limit;
        } else {
            // The page is exhausted by this buffer; the next call starts
            // at the beginning of the following page.
            cur_ = cur_->next_.load();
            read_offset_within_cur_page_ = 0;
        }
        total_end_offset_ += b.len_;
        // Sanity check on the length handed out. This used to sit after
        // the loop, where it could never execute.
        ASSERT(b.len_ < (1 << 30));
        return b;
    }
}