in reactive/kotlinx-coroutines-reactive/src/Await.kt [213:256]
/**
 * Receives the next element [t] and processes it according to [mode].
 *
 * - If no subscription has been stored yet, an [IllegalStateException] is passed to
 *   `handleCoroutineException` together with the continuation's context and the element is ignored.
 * - If the subscriber is already in a terminal state, the signal is reported through
 *   `gotSignalInTerminalStateException` and the element is ignored.
 * - [Mode.FIRST] / [Mode.FIRST_OR_DEFAULT]: the first element cancels the subscription and resumes
 *   the continuation with it; any further element is reported through
 *   `moreThanOneValueProvidedException`.
 * - [Mode.SINGLE] / [Mode.SINGLE_OR_DEFAULT]: the first element is remembered; a second one cancels
 *   the subscription and, if the continuation is still active, resumes it with an
 *   [IllegalArgumentException].
 * - [Mode.LAST]: every element overwrites the remembered value.
 */
override fun onNext(t: T) {
    // Rule 1.9: `onSubscribe` is expected before any other signal, so a missing subscription
    // here is reported rather than silently tolerated.
    val sub = subscription ?: run {
        handleCoroutineException(
            cont.context,
            IllegalStateException("'onNext' was called before 'onSubscribe'")
        )
        return
    }
    if (inTerminalState) {
        gotSignalInTerminalStateException(cont.context, "onNext")
        return
    }
    when (mode) {
        Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
            if (!seenValue) {
                // Record the element before cancelling and resuming, keeping the original order.
                seenValue = true
                withSubscriptionLock { sub.cancel() }
                cont.resume(t)
            } else {
                moreThanOneValueProvidedException(cont.context, mode)
            }
        }
        Mode.LAST, Mode.SINGLE, Mode.SINGLE_OR_DEFAULT -> {
            val secondValueForSingle =
                (mode == Mode.SINGLE || mode == Mode.SINGLE_OR_DEFAULT) && seenValue
            if (!secondValueForSingle) {
                // Either the first element, or LAST mode where newer elements replace older ones.
                value = t
                seenValue = true
                return
            }
            withSubscriptionLock { sub.cancel() }
            // `sub.cancel()` above may itself invoke `onComplete` or `onError`, which could already
            // have completed the continuation; only resume it if it is still active.
            if (cont.isActive) {
                cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode"))
            }
        }
    }
}