std::vector<bytes_chunk> bytes_chunk_cache::scan()

in turbonfs/src/file_cache.cpp [1002:2046]


std::vector<bytes_chunk> bytes_chunk_cache::scan(uint64_t offset,
                                                 uint64_t length,
                                                 scan_action action,
                                                 uint64_t *bytes_released,
                                                 uint64_t *extent_left,
                                                 uint64_t *extent_right)
{
#ifdef ENABLE_PRESSURE_POINTS
    /*
     * Simulate delay in getting bytes_chunk vector.
     */
    if (inject_error()) {
        const uint64_t sleep_usecs = random_number(10'000, 1'000'000);
        AZLogWarn("[{}] scan(offset={}, length={}), delaying {} usecs",
                  CACHE_TAG, offset, length, sleep_usecs);
        ::usleep(sleep_usecs);
    }
#endif

    assert(offset < AZNFSC_MAX_FILE_SIZE);
    assert(length > 0);
    assert((int64_t) (offset + length) == ((int64_t) offset + (int64_t) length));

    /*
     * Cannot write more than AZNFSC_MAX_CHUNK_SIZE in a single call so get()
     * must not ask for more than that. release() or truncate() can ask for
     * more than AZNFSC_MAX_CHUNK_SIZE to be released.
     */
    assert(length <= AZNFSC_MAX_CHUNK_SIZE ||
           (action == scan_action::SCAN_ACTION_RELEASE));
    assert((offset + length) <= AZNFSC_MAX_FILE_SIZE);
    assert((action == scan_action::SCAN_ACTION_GET) ||
           (action == scan_action::SCAN_ACTION_RELEASE));

    // Range check makes sense only for get().
    assert((action == scan_action::SCAN_ACTION_GET) ||
           (extent_left == nullptr && extent_right == nullptr));

    // Doesn't make sense to query just one.
    assert((extent_left == nullptr) == (extent_right == nullptr));

    // bytes_released MUST be passed for (and only for) SCAN_ACTION_RELEASE.
    assert((action == scan_action::SCAN_ACTION_RELEASE) ==
           (bytes_released != nullptr));

    // inode must be valid when get()/release() is called.
    assert(!inode || (inode->magic == NFS_INODE_MAGIC));
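
    /*
     * In short: extent_left/extent_right may be passed only for get() and
     * always together, while bytes_released must be passed for release()
     * and only for release().
     */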

    // bytes_chunk vector that will be returned to the caller.
    std::vector<bytes_chunk> chunkvec;

    // offset and length cursor, updated as we add chunks to chunkvec.
    uint64_t next_offset = offset;
    uint64_t remaining_length = length;

    // bytes released by trimming and by full chunk deletions.
    uint64_t bytes_released_trim = 0;
    uint64_t bytes_released_full1 = 0;

    if (bytes_released) {
        *bytes_released = 0;
    }

    /*
     * Do we need to find the containing extent's left and right edges?
     * We need them only when the caller intends to write to the returned
     * membufs.
     */
    const bool find_extent = (extent_left != nullptr);

    // Convenience variable to access the current chunk in the map.
    bytes_chunk *bc;

#ifdef UTILIZE_TAILROOM_FROM_LAST_MEMBUF
    // Last chunk (used when the requested byte range starts right after it).
    bytes_chunk *last_bc = nullptr;
#endif

    // Temp variables to hold chunk details for a newly added chunk.
    uint64_t chunk_offset, chunk_length;

    /*
     * TODO: See if we can hold shared lock for cases where we don't have to
     *       update chunkmap.
     */
    const std::unique_lock<std::mutex> _lock(chunkmap_lock_43);

    /*
     * Before we proceed with the cache lookup, check if an invalidate is
     * pending. Note that this will not sync dirty data with the server.
     */
    if (test_and_clear_invalidate_pending()) {
        AZLogDebug("[{}] (Deferred) Purging file_cache", CACHE_TAG);
        clear_nolock();
    }

    /*
     * Temp variables to hold details for releasing a range.
     * All chunks in the range [begin_delete, end_delete) will be freed as
     * they fall completely inside the released range.
     * Used only for SCAN_ACTION_RELEASE.
     */
    std::map <uint64_t,
              struct bytes_chunk>::iterator begin_delete = chunkmap.end();
    std::map <uint64_t,
              struct bytes_chunk>::iterator end_delete = chunkmap.end();
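
    /*
     * Illustrative example (hypothetical chunkmap state): if the chunkmap
     * holds chunks [4K,8K), [8K,12K) and [12K,16K), all safe_to_release(),
     * and the caller releases [4K,20K), then begin_delete will point at
     * [4K,8K) and end_delete one past [12K,16K).
     */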

    /*
     * Variables to track the extent this write is part of.
     * We will update these as the left and right edges of the extent are
     * confirmed. Used only for SCAN_ACTION_GET when find_extent is true,
     * which will be the case for writers.
     * lookback_it is the iterator to the chunk from which we should
     * "look back" for the left edge of the extent containing the just written
     * chunk. We scan to the left till we find a gap, or a membuf for which
     * needs_flush() is false, or we hit the beginning of the chunkmap.
     * Note that these will only ever point to a membuf edge.
     */
    uint64_t _extent_left = AZNFSC_BAD_OFFSET;
    uint64_t _extent_right = AZNFSC_BAD_OFFSET;
    std::map <uint64_t,
              struct bytes_chunk>::iterator lookback_it = chunkmap.end();

#define SET_LOOKBACK_IT_TO_PREV() \
do { \
    if (it != chunkmap.begin()) { \
        lookback_it = std::prev(it); \
        bc = &(lookback_it->second); \
        AZLogVerbose("lookback_it: [{},{})", \
                     bc->offset, bc->offset + bc->length); \
    } else { \
        assert(lookback_it == chunkmap.end()); \
    } \
} while (0)
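
    /*
     * Illustrative example (hypothetical chunkmap state): if the chunkmap
     * holds chunks [0,4K) and [4K,8K), both with needs_flush() true, and a
     * writer get()s [8K,12K), lookback_it is set to the [4K,8K) chunk and
     * the backward scan extends _extent_left across the contiguous
     * needs_flush() chunks, so the extent is reported as [0,12K).
     */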

    /*
     * First things first: if this is a file-backed cache and the backing
     * file is not yet open, open it.
     */
    if (action == scan_action::SCAN_ACTION_GET) {
        if ((backing_file_fd == -1) && !backing_file_name.empty()) {
            backing_file_fd = ::open(backing_file_name.c_str(),
                                     O_CREAT|O_TRUNC|O_RDWR, 0755);
            if (backing_file_fd == -1) {
                AZLogError("Failed to open backing_file {}: {}",
                           backing_file_name, strerror(errno));
                assert(0);
                return chunkvec;
            } else {
                AZLogInfo("Opened backing_file {}: fd={}",
                           backing_file_name, backing_file_fd);
            }
        }

        /*
         * Extend backing_file as the very first thing.
         * It is important that when membuf::load() is called, the backing file
         * has size >= (offset + length).
         */
        if (!extend_backing_file(offset + length)) {
            AZLogError("Failed to extend backing_file to {} bytes: {}",
                       offset+length, strerror(errno));
            assert(0);
            return chunkvec;
        }
    }

    /*
     * Find chunk with offset >= next_offset.
     * We start from the first chunk covering the start of the requested range
     * and then iterate over the subsequent chunks (allocating missing chunks
     * along the way) till we cover the entire requested range. Newly allocated
     * chunks can be identified in the returned chunkvec as they have is_new
     * set.
     */
    auto it = chunkmap.lower_bound(next_offset);

    if (it == chunkmap.end()) {
        /*
         * next_offset is greater than the greatest offset in the chunkmap.
         * We still have to check the last chunk to see if it has some or all
         * of the requested range.
         */
        if (chunkmap.empty()) {
            if (action == scan_action::SCAN_ACTION_RELEASE) {
                /*
                 * Empty cache, nothing to release.
                 */
                AZLogVerbose("<Release [{}, {})> Empty cache, nothing to release",
                             offset, offset + length);
                goto end;
            }

            /*
             * This is the only chunk being added, so its left and right
             * edges are also the extent's left and right edges.
             */
            _extent_left = next_offset;
            _extent_right = next_offset + remaining_length;

            AZLogVerbose("(first/only chunk) _extent_left: {} _extent_right: {}",
                         _extent_left, _extent_right);

            assert(lookback_it == chunkmap.end());
            goto allocate_only_chunk;
        } else {
            // Iterator to the last chunk.
            it = std::prev(it);
            bc = &(it->second);

            if ((bc->offset + bc->length) <= next_offset) {
                /*
                 * Requested range lies after the end of last chunk. This means
                 * for SCAN_ACTION_RELEASE we have nothing to do.
                 * For SCAN_ACTION_GET we will need to allocate a new chunk and
                 * this will be the only chunk needed to cover the requested range.
                 */
                if (action == scan_action::SCAN_ACTION_RELEASE) {
                    AZLogVerbose("<Release [{}, {})> First byte to release "
                                 "lies after the last chunk [{}, {})",
                                 offset, offset + length,
                                 bc->offset, bc->offset + bc->length);
                    goto end;
                }

                if ((bc->offset + bc->length) < next_offset) {
                    /*
                     * New chunk starts at a gap after the last chunk.
                     * next_offset is the definitive _extent_left and we don't
                     * need to look back.
                     */
                    _extent_left = next_offset;
                    AZLogVerbose("_extent_left: {}", _extent_left);
                    assert(lookback_it == chunkmap.end());
                } else {
                    /*
                     * New chunk starts right after the last chunk.
                     * Set tentative left edge and set lookback_it to the last
                     * chunk so that we can later "look back" and find the
                     * actual left edge.
                     */
                    _extent_left = next_offset;
                    AZLogVerbose("(tentative) _extent_left: {}", _extent_left);

                    AZLogVerbose("lookback_it: [{},{})",
                                 bc->offset, bc->offset + bc->length);
                    lookback_it = it;
#ifdef UTILIZE_TAILROOM_FROM_LAST_MEMBUF
                    last_bc = bc;
#endif
                }

                _extent_right = next_offset + remaining_length;
                AZLogVerbose("_extent_right: {}", _extent_right);

                assert(remaining_length > 0);
                goto allocate_only_chunk;
            } else {
                /*
                 * Part or whole of the requested range lies in the last chunk.
                 * Set _extent_left tentatively; _extent_right will be set by
                 * the for loop below. Also, to find the real left edge we
                 * need to search backwards from the prev chunk, hence set
                 * lookback_it to that.
                 */
                _extent_left = bc->offset;
                AZLogVerbose("(tentative) _extent_left: {}", _extent_left);

                SET_LOOKBACK_IT_TO_PREV();
            }
        }
    } else {
        /*
         * There's at least one chunk having offset greater than or equal to
         * the requested offset (next_offset).
         *
         * it->first >= next_offset, so we have two cases:
         * 1. (it->first == next_offset) => desired data starts from this chunk.
         * 2. (it->first > next_offset)  => desired data starts before this
         *                                  chunk. It may start within the prev
         *                                  chunk, or in the gap between the
         *                                  prev chunk and this chunk, in which
         *                                  case we need to create a new chunk
         *                                  before this chunk.
         */
        assert(it->first == it->second.offset);
        assert(it->first >= next_offset);

        if (it->first == next_offset) {
            bc = &(it->second);
            /*
             * Requested range starts from this chunk. Set _extent_left
             * tentatively to this chunk's left edge and set lookback_it
             * to the prev chunk for finding the true left edge later.
             * _extent_right will be set by the for loop and later updated
             * correctly.
             */
            _extent_left = it->first;
            AZLogVerbose("(tentative) _extent_left: {}", _extent_left);

            SET_LOOKBACK_IT_TO_PREV();
        } else {
            /*
             * Requested range starts before this chunk.
             */
            assert(it->first > next_offset);

            if (it == chunkmap.begin()) {
                /*
                 * If this is the first chunk then part or whole of the
                 * requested range lies before this chunk and we need to
                 * create a new chunk for that. For SCAN_ACTION_RELEASE
                 * we just ignore the part before this chunk.
                 */
                bc = &(it->second);
                assert(bc->offset > next_offset);

                /*
                 * Newly created chunk's offset and length.
                 * For the release case chunk_offset and chunk_length are not
                 * used but we must update remaining_length and next_offset to
                 * correctly track the "to-be-released" range.
                 */
                chunk_offset = next_offset;
                chunk_length = std::min(bc->offset - next_offset,
                                        remaining_length);

                remaining_length -= chunk_length;
                next_offset += chunk_length;

                if (action == scan_action::SCAN_ACTION_GET) {
                    /*
                     * This newly added chunk is the first chunk, so its offset
                     * is the left edge. We mark the right edge tentatively;
                     * it'll be confirmed after we look forward.
                     */
                    _extent_left = chunk_offset;
                    _extent_right = chunk_offset + chunk_length;
                    assert(lookback_it == chunkmap.end());

                    AZLogVerbose("_extent_left: {}", _extent_left);
                    AZLogVerbose("(tentative) _extent_right: {}", _extent_right);

                    chunkvec.emplace_back(this, chunk_offset, chunk_length);
                    AZLogVerbose("(new chunk) [{},{})",
                                 chunk_offset, chunk_offset + chunk_length);
                } else {
                    AZLogVerbose("<Release [{}, {})> (non-existent chunk) "
                                 "[{},{})",
                                 offset, offset + length,
                                 chunk_offset, chunk_offset + chunk_length);
                }
            } else {
                /*
                 * Requested range starts before this chunk and we have a
                 * chunk before this chunk.
                 */

                // This chunk (we need it later).
                auto itn = it;
                bytes_chunk *bcn = &(itn->second);
                assert(bcn->offset > next_offset);

                // Prev chunk.
                it = std::prev(it);
                bc = &(it->second);

                if ((bc->offset + bc->length) <= next_offset) {
                    /*
                     * Prev chunk ends before the first byte of the requested
                     * range. This means we need to allocate a chunk after the
                     * prev chunk. The new chunk will span from next_offset
                     * till the start offset of the next chunk (bcn), or
                     * remaining_length bytes, whichever is smaller.
                     *
                     * For the release case chunk_offset and chunk_length are
                     * not used, but we must update remaining_length and
                     * next_offset to correctly track the "to-be-released"
                     * range.
                     */
                    chunk_offset = next_offset;
                    chunk_length = std::min(bcn->offset - next_offset,
                                            remaining_length);

                    remaining_length -= chunk_length;
                    next_offset += chunk_length;

                    if (action == scan_action::SCAN_ACTION_GET) {
                        /*
                         * If this new chunk starts right after the prev chunk, then
                         * we don't know the actual value of _extent_left unless we
                         * scan left and check. In that case we set lookback_it to
                         * the prev chunk, so that we can later "look back" and find
                         * the left edge.
                         * If it doesn't start right after, then chunk_offset becomes
                         * _extent_left.
                         */
                        if ((bc->offset + bc->length) < next_offset) {
                            /*
                             * New chunk does not touch the prev chunk, so the new
                             * chunk offset is the _extent_left.
                             */
                            _extent_left = chunk_offset;
                            AZLogVerbose("_extent_left: {}", _extent_left);
                            assert(lookback_it == chunkmap.end());
                        } else {
                            _extent_left = chunk_offset;
                            AZLogVerbose("(tentative) _extent_left: {}", _extent_left);
                            /*
                             * Else, new chunk touches the prev chunk, so we need
                             * to "look back" for finding the left edge.
                             */
                            AZLogVerbose("lookback_it: [{},{})",
                                         bc->offset, bc->offset + bc->length);
                            lookback_it = it;
                        }

                        _extent_right = chunk_offset + chunk_length;
                        AZLogVerbose("(tentative) _extent_right: {}", _extent_right);

                        // Search for more chunks should start from the next chunk.
                        it = itn;

                        chunkvec.emplace_back(this, chunk_offset, chunk_length);
                        AZLogVerbose("(new chunk) [{},{})",
                                     chunk_offset, chunk_offset + chunk_length);
                    } else {
                        // Search for more chunks should start from the next chunk.
                        it = itn;

                        AZLogVerbose("<Release [{}, {})> (non-existent chunk) "
                                     "[{},{})",
                                     offset, offset + length,
                                     chunk_offset, chunk_offset + chunk_length);
                    }
                } else {
                    /*
                     * Prev chunk contains some bytes from the initial part of
                     * the requested range. Set _extent_left tentatively; the
                     * for loop below will set _extent_right correctly.
                     * We need to "look back" to find the true left edge and
                     * look forward to find the true right edge.
                     */
                    _extent_left = bc->offset;
                    AZLogVerbose("(tentative) _extent_left: {}", _extent_left);

                    SET_LOOKBACK_IT_TO_PREV();
                }
            }
        }
    }

    /*
     * _extent_left MUST be set for all cases that require us to traverse the
     * chunkmap. lookback_it may or may not be set depending on whether
     * _extent_left is tentative and we need to search backwards for the true
     * left edge.
     */
    if (action == scan_action::SCAN_ACTION_GET) {
        assert(_extent_left != AZNFSC_BAD_OFFSET);
    }

    /*
     * Now sequentially go over the remaining chunks till we cover the entire
     * requested range. For SCAN_ACTION_GET, if some chunk doesn't exist it'll
     * be allocated, while for SCAN_ACTION_RELEASE non-existent chunks are
     * ignored.
     */
    for (; remaining_length != 0 && it != chunkmap.end(); ) {
        bc = &(it->second);

        // membuf and chunkmap bc offset and length must always be in sync.
        assert(bc->length == bc->get_membuf()->length);
        assert(bc->offset == bc->get_membuf()->offset);

        /*
         * For SCAN_ACTION_GET on a file-backed cache, make sure the requested
         * chunk is duly mmapped so that any IO the caller performs on the
         * returned bytes_chunk is served from the backing file.
         */
        if (action == scan_action::SCAN_ACTION_GET) {
            bc->load();
        }

        /*
         * next_offset must lie before the end of the current chunk, else we
         * should not be inside the for loop.
         */
        assert(next_offset < (bc->offset + bc->length));

        chunk_offset = next_offset;

        if (next_offset == bc->offset) {
            /*
             * Our next offset of interest (next_offset) lies exactly at the
             * start of this chunk.
             */
            chunk_length = std::min(bc->length, remaining_length);
            assert(chunk_length > 0);

            if (action == scan_action::SCAN_ACTION_GET) {
                /*
                 * Starting offset of this request matches the bytes_chunk in
                 * the chunkmap; if length also matches then is_whole MUST
                 * be set.
                 */
                assert(chunk_offset == bc->offset);
                const bool is_whole = (chunk_length == bc->length);
                chunkvec.emplace_back(this, chunk_offset, chunk_length,
                                      bc->buffer_offset, bc->alloc_buffer,
                                      is_whole);
                AZLogVerbose("(existing chunk) [{},{}) b:{} a:{}",
                             chunk_offset, chunk_offset + chunk_length,
                             fmt::ptr(chunkvec.back().get_buffer()),
                             fmt::ptr(bc->alloc_buffer->get()));
            } else if (bc->safe_to_release()) {
                assert(action == scan_action::SCAN_ACTION_RELEASE);

                if (chunk_length == bc->length) {
                    /*
                     * chunk_length bytes will be released.
                     */
                    bytes_released_full1 += chunk_length;

                    /*
                     * A file-backed cache may not have the membuf allocated
                     * if the cache was dropped. bc->get_buffer() would assert
                     * in that case, so avoid calling it.
                     */
                    AZLogVerbose("<Release [{}, {})> (releasing chunk) [{},{}) "
                                 "b:{} a:{}",
                                 offset, offset + length,
                                 chunk_offset, chunk_offset + chunk_length,
                                 bc->alloc_buffer->get() ?
                                        fmt::ptr(bc->get_buffer()) : nullptr,
                                 fmt::ptr(bc->alloc_buffer->get()));
                    /*
                     * Queue the chunk for deletion, since the entire chunk is
                     * released.
                     */
                    if (begin_delete == chunkmap.end()) {
                        begin_delete = it;
                    }
                    /*
                     * Keep updating end_delete with every full chunk
                     * processed; that way, once we are done, end_delete
                     * correctly points to one past the last to-be-deleted
                     * chunk.
                     */
                    end_delete = std::next(it);
                } else {
                    assert(chunk_length == remaining_length);

                    /*
                     * Else trim the chunk (from the left).
                     */
                    AZLogVerbose("<Release [{}, {})> (trimming chunk from left) "
                                 "[{},{}) -> [{},{})",
                                 offset, offset + length,
                                 bc->offset, bc->offset + bc->length,
                                 bc->offset + chunk_length,
                                 bc->offset + bc->length);

                    // Trim chunkmap bc.
                    bc->offset += chunk_length;
                    bc->buffer_offset += chunk_length;
                    bc->length -= chunk_length;

                    // Trim membuf.
                    bc->get_membuf()->trim(chunk_length, true /* left */);

                    /*
                     * chunk_length bytes will be released.
                     */
                    bytes_released_trim += chunk_length;

                    /*
                     * Don't update num_chunks/num_chunks_g as we remove one
                     * and add one chunk.
                     */
                    assert(bytes_cached >= chunk_length);
                    assert(bytes_cached_g >= chunk_length);
                    bytes_cached -= chunk_length;
                    bytes_cached_g -= chunk_length;

                    /*
                     * Since the key (offset) for this chunk changed, we need
                     * to remove and re-insert into the map (with the updated
                     * key/offset). For the buffer, it shall refer to the same
                     * buffer (albeit different offset) that the original chunk
                     * was using.
                     * Add the new chunk first before deleting the old chunk,
                     * else bc->alloc_buffer may get freed.
                     *
                     * This can only happen for the last chunk in the range and
                     * hence it's ok to update the chunkmap. We should exit the
                     * for loop here.
                     */
                    auto p = chunkmap.try_emplace(bc->offset, this, bc->offset,
                                                  bc->length, bc->buffer_offset,
                                                  bc->alloc_buffer);
                    assert(p.second);
                    /*
                     * Now that the old chunk is going away and is being
                     * replaced by this new chunk, if end_delete was pointing
                     * at the old chunk, change it to point to the new chunk.
                     * Note that the new chunk will be the next in line and
                     * hence we can safely replace end_delete with it.
                     */
                    if (it == end_delete) {
                        end_delete = p.first;
                    }

                    chunkmap.erase(it);
                    goto done;
                }
            } else {
                AZLogVerbose("<Release [{}, {})> skipping [{}, {}) as not safe "
                             "to release: inuse={}, dirty={}",
                             offset, offset + length,
                             chunk_offset, chunk_offset + chunk_length,
                             bc->get_membuf()->get_inuse(),
                             bc->get_membuf()->is_dirty());
            }

            // This chunk is fully consumed, move to the next chunk.
            ++it;
        } else if (next_offset < bc->offset) {
            /*
             * Our next offset of interest (next_offset) lies before this
             * chunk. For SCAN_ACTION_GET we need to allocate a new chunk,
             * while for SCAN_ACTION_RELEASE we ignore this non-existent byte
             * range. We set chunk_length so that remaining_length and
             * next_offset are correctly updated at the end of the loop.
             */
            chunk_length = std::min(bc->offset - next_offset,
                                    remaining_length);

            if (action == scan_action::SCAN_ACTION_GET) {
                chunkvec.emplace_back(this, chunk_offset, chunk_length);
                AZLogVerbose("(new chunk) [{},{})",
                             chunk_offset, chunk_offset+chunk_length);
            } else {
                AZLogVerbose("<Release [{}, {})> (non-existent chunk) [{},{})",
                             offset, offset + length,
                             chunk_offset, chunk_offset + chunk_length);
            }

            /*
             * In the next iteration we need to look at the current chunk, so
             * don't increment the iterator.
             * We continue from here as we want to set _extent_right
             * differently than what we do at end-of-loop.
             */
            remaining_length -= chunk_length;
            assert((int64_t) remaining_length >= 0);
            next_offset += chunk_length;

            if (action == scan_action::SCAN_ACTION_GET) {
                _extent_right = next_offset;
                AZLogVerbose("(tentative) _extent_right: {}", _extent_right);
            }
            continue;
        } else /* (next_offset > bc->offset) */ {
            /*
             * Our next offset of interest (next_offset) lies within this
             * chunk.
             */
            chunk_length = std::min(bc->offset + bc->length - next_offset,
                                    remaining_length);
            assert(chunk_length > 0);

            if (action == scan_action::SCAN_ACTION_GET) {
                /*
                 * Returned bytes_chunk doesn't have the same starting offset
                 * as the bytes_chunk in the chunkmap, so is_whole MUST be
                 * set to false.
                 */
                chunkvec.emplace_back(this, chunk_offset, chunk_length,
                                      bc->buffer_offset + (next_offset - bc->offset),
                                      bc->alloc_buffer,
                                      false /* is_whole */);
                AZLogVerbose("(existing chunk) [{},{}) b:{} a:{}",
                             chunk_offset, chunk_offset + chunk_length,
                             fmt::ptr(chunkvec.back().get_buffer()),
                             fmt::ptr(bc->alloc_buffer->get()));
            } else if (bc->safe_to_release()) {
                assert(action == scan_action::SCAN_ACTION_RELEASE);
                assert(chunk_length <= remaining_length);

                /*
                 * We have two cases:
                 * 1. The released part lies at the end of the chunk, so we
                 *    can safely release it by trimming this chunk from the
                 *    right.
                 * 2. The released part lies in the middle, with un-released
                 *    ranges before and after it. To duly release it we would
                 *    need to trim the original chunk to contain only the data
                 *    before the released range, create a new chunk to hold
                 *    the data after the released range, and copy data from
                 *    the existing membuf into the new membuf. That ends up
                 *    being expensive and not practically useful. Note that
                 *    the reason the caller does release() is that it wants
                 *    the membuf memory to be released, but in this case we
                 *    would not release anything; instead we would allocate
                 *    more memory and copy data. This gets worse when the
                 *    caller makes many small release() calls from the middle
                 *    of the membuf.
                 *    We choose to ignore such release() calls and not release
                 *    any range in this case.
                 */
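
                /*
                 * Illustrative example (hypothetical values): for a cached
                 * chunk [0,16K), release of [8K,16K) trims the chunk from
                 * the right to [0,8K), while release of [4K,8K) lies in the
                 * middle and is ignored.
                 */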

                const uint64_t chunk_after_offset =
                    next_offset + chunk_length;
                const uint64_t chunk_after_length =
                    bc->offset + bc->length - chunk_after_offset;

                if (chunk_after_length == 0) {
                    assert(chunk_length ==
                           (bc->offset + bc->length - next_offset));

                    const uint64_t trim_bytes = chunk_length;

                    /*
                     * All chunk data after next_offset is released, so trim
                     * the chunk.
                     */
                    AZLogVerbose("<Release [{}, {})> (trimming chunk from right) "
                                 "[{},{}) -> [{},{})",
                                 offset, offset + length,
                                 bc->offset, bc->offset + bc->length,
                                 bc->offset, next_offset);

                    // Trim chunkmap bc.
                    bc->length = next_offset - bc->offset;
                    assert((int64_t) bc->length > 0);

                    // Trim membuf.
                    bc->get_membuf()->trim(trim_bytes, false /* left */);

                    /*
                     * trim_bytes bytes will be released.
                     */
                    bytes_released_trim += trim_bytes;

                    assert(bytes_cached >= trim_bytes);
                    assert(bytes_cached_g >= trim_bytes);
                    bytes_cached -= trim_bytes;
                    bytes_cached_g -= trim_bytes;
                } else {
                    /*
                     * The to-be-released range must lie entirely within this
                     * chunk.
                     */
                    assert(offset == next_offset);
                    assert(length == remaining_length);

                    AZLogVerbose("<Release [{}, {})> skipping as it lies in the "
                                 "middle of the chunk [{},{})",
                                 offset, offset + length,
                                 bc->offset, bc->offset + bc->length);
                }
            } else {
                AZLogVerbose("<Release [{}, {})> skipping [{}, {}) as not safe "
                             "to release: inuse={}, dirty={}",
                             offset, offset + length,
                             chunk_offset, chunk_offset + chunk_length,
                             bc->get_membuf()->get_inuse(),
                             bc->get_membuf()->is_dirty());
            }

            // This chunk is fully consumed, move to the next chunk.
            ++it;
        }

done:
        remaining_length -= chunk_length;
        assert((int64_t) remaining_length >= 0);
        next_offset += chunk_length;

        /*
         * Once this for loop exits, the search for _extent_right continues
         * with 'it', so we must make sure that 'it' points to the next chunk
         * that we want to check. Note that we search for _extent_right only
         * for SCAN_ACTION_GET.
         */
        if (action == scan_action::SCAN_ACTION_GET) {
            _extent_right = bc->offset + bc->length;
            AZLogVerbose("(tentative) _extent_right: {}", _extent_right);
        }
    }

    /*
     * Allocate the only (or the last) chunk, which lies beyond the highest
     * chunk we have in our cache. For the SCAN_ACTION_RELEASE case we simply
     * ignore whatever remains of the to-be-released byte range after the
     * last chunk.
     */
allocate_only_chunk:
    if (remaining_length != 0) {
        if (action == scan_action::SCAN_ACTION_GET) {
            AZLogVerbose("(only/last chunk) [{},{})",
                         next_offset, next_offset + remaining_length);

    #ifdef UTILIZE_TAILROOM_FROM_LAST_MEMBUF
            if (last_bc && (last_bc->tailroom() > 0)) {
                chunk_length = std::min(last_bc->tailroom(), remaining_length);

                AZLogVerbose("(sharing last chunk's alloc_buffer) [{},{})",
                             next_offset, next_offset + chunk_length);

                /*
                 * Since this new chunk is sharing alloc_buffer with the last
                 * chunk, is_new must be false.
                 * Also it's not referring to the entire membuf, so is_whole
                 * must be false.
                 */
                chunkvec.emplace_back(this, next_offset,
                                      chunk_length,
                                      last_bc->buffer_offset + last_bc->length,
                                      last_bc->alloc_buffer,
                                      false /* is_whole */,
                                      false /* is_new */);

                /*
                 * last chunk and this new chunk are sharing the same
                 * alloc_buffer.
                 */
                assert(last_bc->alloc_buffer.use_count() >= 2);

                remaining_length -= chunk_length;
                next_offset += chunk_length;
            }
    #endif
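
            /*
             * Illustrative example (hypothetical values), applicable when
             * UTILIZE_TAILROOM_FROM_LAST_MEMBUF is defined: if the last
             * membuf has 4K of tailroom and 12K remains to be allocated,
             * the first 4K piggybacks on the last membuf (is_new == false)
             * and a fresh 8K chunk is allocated below.
             */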

            if (remaining_length) {
                AZLogVerbose("(new last chunk) [{},{})",
                             next_offset, next_offset + remaining_length);
                chunkvec.emplace_back(this, next_offset, remaining_length);
            }

            remaining_length = 0;
        } else {
            AZLogVerbose("<Release [{}, {})> (non-existent chunk after end) "
                         "[{},{})",
                         offset, offset + length,
                         next_offset, next_offset + remaining_length);
            remaining_length = 0;
        }
    }

    /*
     * Finally, insert the new chunks into the chunkmap.
     * We cannot do this inside the for loop above as it would modify the
     * chunkmap while we are traversing it.
     */
    for (const auto& chunk : chunkvec) {

        /*
         * We increment the inuse count of every membuf that we return to
         * the caller. Once the caller is done using them (writers writing
         * application data into them, readers reading blob data into them),
         * it must drop the inuse count via clear_inuse(). This is done to
         * make sure a membuf with ongoing IOs is skipped by clear().
         */
        if (action == scan_action::SCAN_ACTION_GET) {
            chunk.alloc_buffer->set_inuse();
        }

        if (chunk.is_new) {
            // New chunk is always a whole chunk.
            assert(chunk.is_whole);
            assert(chunk.alloc_buffer->allocated_buffer != nullptr);
            assert(chunk.alloc_buffer->buffer >=
                   chunk.alloc_buffer->allocated_buffer);
            assert(chunk.alloc_buffer->length > 0);
            assert(chunk.alloc_buffer->allocated_length >=
                   chunk.alloc_buffer->length);

#ifndef UTILIZE_TAILROOM_FROM_LAST_MEMBUF
            /*
             * A new (empty) bytes_chunk should map a full membuf, but not
             * when we use tailroom from the last membuf to provide space for
             * new chunks added at the end.
             */
            assert(chunk.maps_full_membuf());
            assert(chunk.buffer_offset == 0);
            assert(chunk.length == chunk.alloc_buffer->length);
#endif

            /*
             * Other than when we are adding cache chunks (SCAN_ACTION_GET),
             * we should never come here to allocate a new chunk buffer.
             */
            assert(action == scan_action::SCAN_ACTION_GET);

            AZLogVerbose("(adding to chunkmap) [{},{})",
                         chunk.offset, chunk.offset + chunk.length);
            /*
             * This will grab a ref on the alloc_buffer allocated when we
             * added the chunk to chunkvec. On returning from this function
             * chunkvec will be destroyed and it'll release its reference,
             * so the chunkmap reference will be the only reference left.
             */
#ifndef NDEBUG
            auto p = chunkmap.try_emplace(chunk.offset, chunk.bcc, chunk.offset,
                                          chunk.length, chunk.buffer_offset,
                                          chunk.alloc_buffer);
            assert(p.second == true);
#else
            chunkmap.try_emplace(chunk.offset, chunk.bcc, chunk.offset,
                                 chunk.length, chunk.buffer_offset,
                                 chunk.alloc_buffer);
#endif
            // One more chunk added to chunkmap.
            num_chunks++;
            num_chunks_g++;
            bytes_cached_g += chunk.length;
            bytes_cached += chunk.length;

            /*
             * New chunks are always included in the extent range.
             */
            if ((chunk.offset + chunk.length) > _extent_right) {
                _extent_right = (chunk.offset + chunk.length);
                AZLogVerbose("(tentative) _extent_right: {}", _extent_right);
            }
        }
    }

    /*
     * Delete chunks in the range [begin_delete, end_delete).
     */
    if (action == scan_action::SCAN_ACTION_RELEASE) {
        uint64_t bytes_released_full2 = 0;

        if (begin_delete != chunkmap.end()) {
            for (auto _it = begin_delete, next_it = _it;
                 _it != end_delete; _it = next_it) {
                ++next_it;
                bc = &(_it->second);
                /*
                 * Not all chunks from begin_delete to end_delete are
                 * guaranteed safe-to-delete, so check before deleting.
                 */
                if (bc->safe_to_release()) {
                    AZLogVerbose("<Release [{}, {})> (freeing chunk) [{},{}) "
                                 "b:{} a:{}",
                                 offset, offset + length,
                                 bc->offset, bc->offset + bc->length,
                                 bc->alloc_buffer->get() ?
                                      fmt::ptr(bc->get_buffer()) : nullptr,
                                 fmt::ptr(bc->alloc_buffer->get()));

                    assert(num_chunks > 0);
                    num_chunks--;
                    assert(num_chunks_g > 0);
                    num_chunks_g--;

                    assert(bytes_cached >= bc->length);
                    assert(bytes_cached_g >= bc->length);
                    bytes_cached -= bc->length;
                    bytes_cached_g -= bc->length;

                    bytes_released_full2 += bc->length;

                    chunkmap.erase(_it);
                }
            }
        }

        /*
         * Since we hold the chunkmap lock, a chunk which was earlier not
         * safe_to_release() can become safe_to_release() now, but not vice
         * versa. This is because to become safe_to_release() it only needs
         * to clear inuse/dirty/commit_pending, all of which can be done w/o
         * the chunkmap lock, while to become not safe_to_release() it must
         * set the inuse/dirty/commit_pending flags, all of which require the
         * inuse flag to be set first, which in turn needs the chunkmap lock.
         */
        assert(bytes_released_full2 >= bytes_released_full1);
        if (bytes_released) {
            *bytes_released = bytes_released_trim + bytes_released_full2;
        }
    } else {
        assert((begin_delete == chunkmap.end()) &&
               (end_delete == chunkmap.end()));
    }

    if (find_extent) {
        /*
         * Set/update extent left edge.
         */
        if (lookback_it != chunkmap.end()) {
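            /*
             * Walk left from lookback_it, extending _extent_left while
             * chunks are contiguous and need flushing. The post-decrement in
             * the while condition ensures chunkmap.begin() itself is
             * processed before the loop terminates.
             */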
            do {
                bc = &(lookback_it->second);

                if ((_extent_left != AZNFSC_BAD_OFFSET) &&
                    ((bc->offset + bc->length) != _extent_left)) {
                    AZLogVerbose("(hit gap) _extent_left: {}, [{}, {})",
                                 _extent_left,
                                 bc->offset, (bc->offset + bc->length));
                    break;
                }

                if (!bc->needs_flush()) {
                    AZLogVerbose("(hit noflush) _extent_left: {}, [{}, {})",
                                 _extent_left,
                                 bc->offset, (bc->offset + bc->length));
                    break;
                }

                _extent_left = bc->offset;
                AZLogVerbose("_extent_left: {}", _extent_left);
            } while (lookback_it-- != chunkmap.begin());
        }

        /*
         * Set/update extent right edge.
         */
        for (; it != chunkmap.end(); ++it) {
            bc = &(it->second);

            if ((_extent_right != AZNFSC_BAD_OFFSET) &&
                (bc->offset != _extent_right)) {
                AZLogVerbose("(hit gap) _extent_right: {}, [{}, {})",
                             _extent_right,
                             bc->offset, (bc->offset + bc->length));
                break;
            }

            if (!bc->needs_flush()) {
                AZLogVerbose("(hit noflush) _extent_right: {}, [{}, {})",
                             _extent_right,
                             bc->offset, (bc->offset + bc->length));
                break;
            }

            _extent_right = bc->offset + bc->length;
            AZLogVerbose("_extent_right: {}", _extent_right);
        }

        *extent_left = _extent_left;
        *extent_right = _extent_right;
    }

end:
    return (action == scan_action::SCAN_ACTION_GET)
                ? chunkvec : std::vector<bytes_chunk>();
}
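
Usage sketch (not from the source): a minimal illustration of how a writer
might drive scan() for a write followed by a release. It assumes a
bytes_chunk_cache instance `cache`; callers normally go through the
get()/release() wrappers mentioned in the comments above, and
`copy_user_data()` is a hypothetical helper standing in for the application
data copy.

    uint64_t extent_left, extent_right;

    // Get (allocating as needed) chunks covering [offset, offset+len).
    std::vector<bytes_chunk> bcv =
        cache.scan(offset, len, scan_action::SCAN_ACTION_GET,
                   nullptr /* bytes_released */,
                   &extent_left, &extent_right);

    for (bytes_chunk& bc : bcv) {
        // Copy application data into the returned membuf (hypothetical
        // helper).
        copy_user_data(bc.get_buffer(), bc.length);

        // Drop the inuse count taken by scan(), as required by the
        // comments above.
        bc.get_membuf()->clear_inuse();
    }

    // Later, release the range. bytes_released reports how many bytes
    // were actually freed (trims + full chunk deletions).
    uint64_t bytes_released = 0;
    cache.scan(offset, len, scan_action::SCAN_ACTION_RELEASE,
               &bytes_released, nullptr, nullptr);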