in reactive/kotlinx-coroutines-reactive/src/Publish.kt [264:290]
override fun request(n: Long) {
if (n <= 0) {
// Specification requires to call onError with IAE for n <= 0
cancelCoroutine(IllegalArgumentException("non-positive subscription request $n"))
return
}
while (true) { // lock-free loop for nRequested
val cur = _nRequested.value
if (cur < 0) return // already closed for send, ignore requests, as mandated by the reactive streams spec
var upd = cur + n
if (upd < 0 || n == Long.MAX_VALUE)
upd = Long.MAX_VALUE
if (cur == upd) return // nothing to do
if (_nRequested.compareAndSet(cur, upd)) {
// unlock the mutex when we don't have back-pressure anymore
if (cur == 0L) {
/** In a sense, after a successful CAS, it is this invocation, not the coroutine itself, that owns
* the lock, given that `upd` is necessarily strictly positive. Thus, no other operation has the
* right to lower the value on [_nRequested], it can only grow or become [CLOSED]. Therefore, it is
* impossible for any other operations to assume that they own the lock without actually acquiring
* it. */
unlockAndCheckCompleted()
}
return
}
}
}