override fun request()

in reactive/kotlinx-coroutines-reactive/src/Publish.kt [264:290]


    override fun request(n: Long) {
        if (n <= 0) {
            // Specification requires to call onError with IAE for n <= 0
            cancelCoroutine(IllegalArgumentException("non-positive subscription request $n"))
            return
        }
        while (true) { // lock-free loop for nRequested
            val cur = _nRequested.value
            if (cur < 0) return // already closed for send, ignore requests, as mandated by the reactive streams spec
            var upd = cur + n
            if (upd < 0 || n == Long.MAX_VALUE)
                upd = Long.MAX_VALUE
            if (cur == upd) return // nothing to do
            if (_nRequested.compareAndSet(cur, upd)) {
                // unlock the mutex when we don't have back-pressure anymore
                if (cur == 0L) {
                    /** In a sense, after a successful CAS, it is this invocation, not the coroutine itself, that owns
                     * the lock, given that `upd` is necessarily strictly positive. Thus, no other operation has the
                     * right to lower the value on [_nRequested], it can only grow or become [CLOSED]. Therefore, it is
                     * impossible for any other operations to assume that they own the lock without actually acquiring
                     * it. */
                    unlockAndCheckCompleted()
                }
                return
            }
        }
    }