reactive/kotlinx-coroutines-rx2/src/RxConvert.kt [103:136]: - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - public fun Flow.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable = Observable.create { emitter -> /* * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if * asObservable is already invoked from unconfined */ val job = GlobalScope.launch(Dispatchers.Unconfined + context, start = CoroutineStart.ATOMIC) { try { collect { value -> emitter.onNext(value) } emitter.onComplete() } catch (e: Throwable) { // 'create' provides safe emitter, so we can unconditionally call on* here if exception occurs in `onComplete` if (e !is CancellationException) { if (!emitter.tryOnError(e)) { handleUndeliverableException(e, coroutineContext) } } else { emitter.onComplete() } } } emitter.setCancellable(RxCancellable(job)) } /** * Converts the given flow to a cold flowable. * The original flow is cancelled when the flowable subscriber is disposed. * * An optional [context] can be specified to control the execution context of calls to [Subscriber] methods. * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher * is used, so calls are performed from an arbitrary thread. */ public fun Flow.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable = Flowable.fromPublisher(asPublisher(context)) - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - reactive/kotlinx-coroutines-rx3/src/RxConvert.kt [103:136]: - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - public fun Flow.asObservable(context: CoroutineContext = EmptyCoroutineContext) : Observable = Observable.create { emitter -> /* * ATOMIC is used here to provide stable behaviour of subscribe+dispose pair even if * asObservable is already invoked from unconfined */ val job = GlobalScope.launch(Dispatchers.Unconfined + context, start = CoroutineStart.ATOMIC) { try { collect { value -> emitter.onNext(value) } emitter.onComplete() } catch (e: Throwable) { // 'create' provides safe emitter, so we can unconditionally call on* here if exception occurs in `onComplete` if (e !is CancellationException) { if (!emitter.tryOnError(e)) { handleUndeliverableException(e, coroutineContext) } } else { emitter.onComplete() } } } emitter.setCancellable(RxCancellable(job)) } /** * Converts the given flow to a cold flowable. * The original flow is cancelled when the flowable subscriber is disposed. * * An optional [context] can be specified to control the execution context of calls to [Subscriber] methods. * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher * is used, so calls are performed from an arbitrary thread. */ public fun Flow.asFlowable(context: CoroutineContext = EmptyCoroutineContext): Flowable = Flowable.fromPublisher(asPublisher(context)) - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -