override fun onNext()

in reactive/kotlinx-coroutines-reactive/src/Await.kt [213:256]


        override fun onNext(t: T) {
            val sub = subscription.let {
                if (it == null) {
                    /** Enforce rule 1.9: expect [Subscriber.onSubscribe] before any other signals. */
                    handleCoroutineException(cont.context,
                        IllegalStateException("'onNext' was called before 'onSubscribe'"))
                    return
                } else {
                    it
                }
            }
            if (inTerminalState) {
                gotSignalInTerminalStateException(cont.context, "onNext")
                return
            }
            when (mode) {
                Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
                    if (seenValue) {
                        moreThanOneValueProvidedException(cont.context, mode)
                        return
                    }
                    seenValue = true
                    withSubscriptionLock {
                        sub.cancel()
                    }
                    cont.resume(t)
                }
                Mode.LAST, Mode.SINGLE, Mode.SINGLE_OR_DEFAULT -> {
                    if ((mode == Mode.SINGLE || mode == Mode.SINGLE_OR_DEFAULT) && seenValue) {
                        withSubscriptionLock {
                            sub.cancel()
                        }
                        /* the check for `cont.isActive` is needed in case `sub.cancel() above calls `onComplete` or
                         `onError` on its own. */
                        if (cont.isActive) {
                            cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode"))
                        }
                    } else {
                        value = t
                        seenValue = true
                    }
                }
            }
        }