in reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt [253:268]
/**
 * Registers demand for [n] additional elements.
 *
 * A non-positive [n] is ignored. Otherwise [n] is atomically added to `requested`; if the sum
 * is not positive (e.g. the addition overflowed [Long]), the counter is set to [Long.MAX_VALUE]
 * instead. When there was no outstanding demand before this call, the continuation stored in
 * `producer` is taken (the slot is reset to `null`) and resumed.
 */
override fun request(n: Long) {
    if (n <= 0) return
    val previousDemand = requested.getAndUpdate { current ->
        val sum = current + n
        if (sum > 0L) sum else Long.MAX_VALUE
    }
    // Demand was already outstanding before this call, so there is nothing to wake up.
    if (previousDemand > 0L) return
    assert(previousDemand == 0L)
    // The emitter has either not started yet or has suspended. Its continuation may not have been
    // published yet (race with suspendCancellableCoroutine), so keep polling until it shows up.
    while (true) {
        val parked = producer.getAndSet(null)
        if (parked != null) {
            parked.resume(Unit)
            return
        }
    }
}