reactive/kotlinx-coroutines-rx2/src/RxConvert.kt [42:90]: - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - this@asMaybe.await() } /* NOTE(review): `Deferred` and `Single` appear below without type arguments; the generic parameters were presumably lost when this excerpt was extracted -- confirm against the original file. */ /** * Converts this deferred value to a hot reactive single that signals either * [onSuccess][SingleObserver.onSuccess] or [onError][SingleObserver.onError]. * * Every subscriber gets the same completion value. * Unsubscribing from the resulting single **does not** affect the original deferred value in any way. * * The single is created by calling [rxSingle] with [context]; its block does nothing except await this deferred. * * **Note: This is an experimental API.** Conversion of coroutine primitives to reactive entities may change * in the future to account for the concept of structured concurrency. * * @param context the coroutine context from which the resulting single is going to be signalled * @return the single produced by [rxSingle] whose block awaits this deferred */ public fun Deferred.asSingle(context: CoroutineContext): Single = rxSingle(context) { this@asSingle.await() } /** * Transforms given cold [ObservableSource] into cold [Flow]. * * The resulting flow is _cold_, which means that [ObservableSource.subscribe] is called every time a terminal operator * is applied to the resulting flow. * * A channel with the [default][Channel.BUFFERED] buffer size is used. Use the [buffer] operator on the * resulting flow to specify a user-defined value and to control what happens when data is produced faster * than consumed, i.e. to control the back-pressure behavior. Check [callbackFlow] for more details. 
*/ public fun ObservableSource.asFlow(): Flow = callbackFlow { val disposableRef = AtomicReference() val observer = object : Observer { override fun onComplete() { close() } override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() } override fun onNext(t: T) { /* * Channel was closed by the downstream, so the exception (if any) * also was handled by the same downstream */ try { trySendBlocking(t) } catch (e: InterruptedException) { // RxJava interrupts the source } } override fun onError(e: Throwable) { close(e) } } subscribe(observer) - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - reactive/kotlinx-coroutines-rx3/src/RxConvert.kt [42:90]: - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - this@asMaybe.await() } /* NOTE(review): `Deferred` and `Single` appear below without type arguments; the generic parameters were presumably lost when this excerpt was extracted -- confirm against the original file. */ /** * Converts this deferred value to a hot reactive single that signals either * [onSuccess][SingleObserver.onSuccess] or [onError][SingleObserver.onError]. * * Every subscriber gets the same completion value. * Unsubscribing from the resulting single **does not** affect the original deferred value in any way. * * The single is created by calling [rxSingle] with [context]; its block does nothing except await this deferred. * * **Note: This is an experimental API.** Conversion of coroutine primitives to reactive entities may change * in the future to account for the concept of structured concurrency. * * @param context the coroutine context from which the resulting single is going to be signalled * @return the single produced by [rxSingle] whose block awaits this deferred */ public fun Deferred.asSingle(context: CoroutineContext): Single = rxSingle(context) { this@asSingle.await() } /** * Transforms given cold [ObservableSource] into cold [Flow]. * * The resulting flow is _cold_, which means that [ObservableSource.subscribe] is called every time a terminal operator * is applied to the resulting flow. * * A channel with the [default][Channel.BUFFERED] buffer size is used. Use the [buffer] operator on the * resulting flow to specify a user-defined value and to control what happens when data is produced faster * than consumed, i.e. 
to control the back-pressure behavior. Check [callbackFlow] for more details. */ public fun ObservableSource.asFlow(): Flow = callbackFlow { val disposableRef = AtomicReference() val observer = object : Observer { override fun onComplete() { close() } override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() } override fun onNext(t: T) { /* * Channel was closed by the downstream, so the exception (if any) * also was handled by the same downstream */ try { trySendBlocking(t) } catch (e: InterruptedException) { // RxJava interrupts the source } } override fun onError(e: Throwable) { close(e) } } subscribe(observer) - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -