in kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt [829:876]
/**
 * Drops elements from the head of the channel, one cell per loop iteration, until the cell
 * with the specified [globalCellIndex] satisfies
 * `globalCellIndex < max(receivers + capacity, bufferEndCounter)`.
 *
 * The function asserts that [isConflatedDropOldest] holds.
 *
 * On each iteration the function:
 * 1. reads the `receivers` counter and returns if the condition above already holds;
 * 2. tries to claim the first cell by incrementing `receivers` with a CAS, retrying on failure;
 * 3. locates the segment that stores the claimed cell (via `findSegmentReceive`);
 * 4. processes the cell via `updateCellReceive`, passing `null` as the waiter;
 * 5. if the result is not `FAILED`, treats it as a retrieved element, cleans the `prev` pointer
 *    of the segment, and passes the element to [onUndeliveredElement] when the latter is not `null`.
 *    A non-null result of `callUndeliveredElementCatchingException` is thrown to the caller.
 *
 * @param globalCellIndex the global index of the cell that must satisfy the condition above
 * by the time this function returns normally.
 */
protected fun dropFirstElementUntilTheSpecifiedCellIsInTheBuffer(globalCellIndex: Long) {
assert { isConflatedDropOldest }
// Read the segment reference before the counter increment;
// it is crucial to be able to find the required segment later.
var segment = receiveSegment.value
while (true) {
// Read the receivers counter to check whether the specified cell is already in the buffer
// or should be moved to the buffer in a short time, due to the already started `receive()`.
// If so, there is nothing (more) to drop, and the function returns.
val r = this.receivers.value
if (globalCellIndex < max(r + capacity, bufferEndCounter)) return
// The cell is outside the buffer. Try to extract the first element
// if the `receivers` counter has not been changed.
// On CAS failure, restart the iteration with a fresh counter value.
if (!this.receivers.compareAndSet(r, r + 1)) continue
// Count the required segment id and the cell index in it.
// Both are derived from the counter value `r` claimed by the successful CAS above.
val id = r / SEGMENT_SIZE
val i = (r % SEGMENT_SIZE).toInt()
// Try to find the required segment if the id of the segment we currently
// hold does not match the required one (the code checks `!=`; presumably
// the held segment can only have a lower id here -- TODO confirm).
if (segment.id != id) {
// Find the required segment, restarting the operation if it has not been found.
// Note that `segment` keeps its previous value when the search returns `null`.
segment = findSegmentReceive(id, segment) ?:
// The required segment has not been found. It is possible that the channel is already
// closed for receiving, so the linked list of segments is closed as well.
// In the latter case, the operation will finish eventually after incrementing
// the `receivers` counter sufficient times. Note that it is impossible to check
// whether this channel is closed for receiving (we do this in `receive`),
// as it may call this function when helping to complete closing the channel.
continue
}
// Update the cell according to the cell life-cycle.
// No waiter is supplied (`null`); presumably this call is never expected
// to install a waiter in the cell -- confirm against `updateCellReceive`.
val updCellResult = updateCellReceive(segment, i, r, null)
when {
updCellResult === FAILED -> {
// The cell is poisoned; restart from the beginning.
// To avoid memory leaks, we also need to reset
// the `prev` pointer of the working segment.
// NOTE(review): the reset is performed only when `r < sendersCounter`;
// the reason for this guard is not visible from this function -- confirm.
if (r < sendersCounter) segment.cleanPrev()
}
else -> { // element
// A buffered element was retrieved from the cell.
// Clean the reference to the previous segment.
segment.cleanPrev()
// Report the dropped element to `onUndeliveredElement`, if it is set,
// and throw whatever non-null value the reporting call returns.
@Suppress("UNCHECKED_CAST")
onUndeliveredElement?.callUndeliveredElementCatchingException(unwrapTyped(updCellResult))?.let { throw it }
}
}
// Neither branch exits the loop: the next iteration re-reads `receivers`
// and re-checks whether the specified cell is now in the buffer.
}
}