in kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt [785:821]
/**
 * Attempts to take an element from this channel without suspending.
 *
 * Outcomes, in the order they are checked:
 *  - `closed(closeCause)` if the snapshot of `sendersAndCloseStatus` says the
 *    channel is closed for receive;
 *  - `failure()` if the `receivers` counter has already caught up with the
 *    senders counter, i.e. a plain `receive()` would have to suspend;
 *  - otherwise the result produced by `receiveImpl`: `success(...)` with the
 *    retrieved element, `failure()` if the operation ended up on the
 *    suspension path, or `closed(closeCause)` if the channel was found closed.
 */
override fun tryReceive(): ChannelResult<E> {
    // Ordering matters here: the `receivers` counter is snapshotted
    // BEFORE `sendersAndCloseStatus`. Do not swap these two reads.
    val receiversSnapshot = receivers.value
    val sendersAndCloseSnapshot = sendersAndCloseStatus.value
    when {
        // The channel is already closed for receive -- report that right away.
        sendersAndCloseSnapshot.isClosedForReceive0 -> return closed(closeCause)
        // A plain `receive()` would suspend at this point, so there is
        // nothing to retrieve; fail without touching the counters.
        receiversSnapshot >= sendersAndCloseSnapshot.sendersCounter -> return failure()
    }
    // There may be an element to take. Follow the same path as the plain
    // `receive()`, except that `INTERRUPTED_RCV` is used as the waiter. If the
    // operation decides to suspend, that token is what gets stored, which is a
    // short-cut for "suspend and cancel immediately" and lets the algorithm
    // keep using the unconditional Fetch-and-Add instruction.
    return receiveImpl( // NB: `receiveImpl` is an inline function
        // The "waiter" is a receiver that is already interrupted.
        waiter = INTERRUPTED_RCV,
        // An element was retrieved: unwrap it and report success.
        onElementRetrieved = { retrieved -> success(unwrapTyped(retrieved)) },
        // Suspension path: `INTERRUPTED_RCV` has been installed,
        // so this `tryReceive()` has to fail.
        onSuspend = { segment, _, cellGlobalIndex ->
            // No real cancellation took place, so the steps of a "cancelled"
            // receive are emulated by hand: 'waitExpandBufferCompletion' is
            // invoked manually, then the segment is told the slot was cleaned.
            waitExpandBufferCompletion(cellGlobalIndex)
            segment.onSlotCleaned()
            failure()
        },
        // The channel turned out to be closed: return the matching result.
        onClosed = { closed(closeCause) }
    )
}