reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt [16:113]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public fun Scheduler.asCoroutineDispatcher(): CoroutineDispatcher =
    if (this is DispatcherScheduler) {
        // The receiver already wraps a dispatcher: return that dispatcher instead of wrapping the wrapper.
        dispatcher
    } else {
        SchedulerCoroutineDispatcher(this)
    }

/**
 * Hidden overload exposed under the JVM name `asCoroutineDispatcher`, kept for binary compatibility with
 * versions before 1.4.2. Unlike the visible overload, it always wraps the receiver in a new
 * [SchedulerCoroutineDispatcher].
 */
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.4.2, binary compatibility with earlier versions")
@JvmName("asCoroutineDispatcher")
public fun Scheduler.asCoroutineDispatcher0(): SchedulerCoroutineDispatcher =
    SchedulerCoroutineDispatcher(this)

/**
 * Converts an instance of [CoroutineDispatcher] to an implementation of [Scheduler].
 *
 * If the receiver is a [SchedulerCoroutineDispatcher], the scheduler it holds is returned as is;
 * any other dispatcher is wrapped in a new scheduler.
 */
public fun CoroutineDispatcher.asScheduler(): Scheduler =
    if (this is SchedulerCoroutineDispatcher) {
        scheduler
    } else {
        DispatcherScheduler(this)
    }

/**
 * A [Scheduler] whose tasks are run as coroutines on [dispatcher].
 */
private class DispatcherScheduler(@JvmField val dispatcher: CoroutineDispatcher) : Scheduler() {

    /** Root job of this scheduler; it is the parent of every worker job and is cancelled by [shutdown]. */
    private val schedulerJob = SupervisorJob()

    /**
     * The scope for everything happening in this [DispatcherScheduler].
     *
     * Running tasks, too, get launched under this scope, because [shutdown] should cancel the running tasks as well.
     */
    private val scope = CoroutineScope(schedulerJob + dispatcher)

    /**
     * The counter of created workers, for their pretty-printing.
     */
    private val workerCounter = atomic(1L)

    /**
     * Schedules [block] via [scheduleTask], converting [delay] from [unit] to milliseconds.
     * The adapted runnable launches the task as a new coroutine in [scope].
     */
    override fun scheduleDirect(block: Runnable, delay: Long, unit: TimeUnit): Disposable =
        scope.scheduleTask(block, unit.toMillis(delay)) { task ->
            Runnable { scope.launch { task() } }
        }

    /** Creates a worker numbered with the next value of [workerCounter], whose job is a child of [schedulerJob]. */
    override fun createWorker(): Worker = DispatcherWorker(workerCounter.getAndIncrement(), dispatcher, schedulerJob)

    /** Cancels [schedulerJob]. */
    override fun shutdown() {
        schedulerJob.cancel()
    }

    /**
     * A [Worker] that funnels its tasks through [blockChannel]; a single coroutine started in `init`
     * receives them and invokes them one at a time.
     *
     * @param counter the number of this worker, used only in [toString].
     * @param dispatcher the dispatcher on which the worker's coroutines run.
     * @param parentJob the job that becomes the parent of this worker's own job.
     */
    private class DispatcherWorker(
        private val counter: Long,
        private val dispatcher: CoroutineDispatcher,
        parentJob: Job
    ) : Worker() {

        private val workerJob = SupervisorJob(parentJob)
        private val workerScope = CoroutineScope(workerJob + dispatcher)

        // The element type is the suspending task that `schedule` sends and the consumer below invokes.
        private val blockChannel = Channel<suspend () -> Unit>(Channel.UNLIMITED)

        init {
            // The only consumer of the channel: each received task is invoked before the next one is taken.
            workerScope.launch {
                blockChannel.consumeEach {
                    it()
                }
            }
        }

        /**
         * Schedules [block] via [scheduleTask], converting [delay] from [unit] to milliseconds.
         * The adapted runnable does not run the task itself: it sends it to [blockChannel].
         */
        override fun schedule(block: Runnable, delay: Long, unit: TimeUnit): Disposable =
            workerScope.scheduleTask(block, unit.toMillis(delay)) { task ->
                // The result of `trySend` is not inspected.
                Runnable { blockChannel.trySend(task) }
            }

        /** The worker reports itself disposed as soon as its scope is no longer active. */
        override fun isDisposed(): Boolean = !workerScope.isActive

        /** Closes [blockChannel] and cancels the worker's job. */
        override fun dispose() {
            blockChannel.close()
            workerJob.cancel()
        }

        override fun toString(): String = "$dispatcher (worker $counter, ${if (isDisposed) "disposed" else "active"})"
    }

    override fun toString(): String = dispatcher.toString()
}

/** The suspending action that [scheduleTask] passes to its `adaptForScheduling` callback. */
private typealias Task = suspend () -> Unit

/**
 * Schedule [block] so that an adapted version of it, wrapped in [adaptForScheduling], executes after [delayMillis]
 * milliseconds.
 */
private fun CoroutineScope.scheduleTask(
    block: Runnable,
    delayMillis: Long,
    adaptForScheduling: (Task) -> Runnable
): Disposable {
    val ctx = coroutineContext
    var handle: DisposableHandle?
= null
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt [16:113]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public fun Scheduler.asCoroutineDispatcher(): CoroutineDispatcher =
    if (this is DispatcherScheduler) {
        // The receiver already wraps a dispatcher: return that dispatcher instead of wrapping the wrapper.
        dispatcher
    } else {
        SchedulerCoroutineDispatcher(this)
    }

/**
 * Hidden overload exposed under the JVM name `asCoroutineDispatcher`, kept for binary compatibility with
 * versions before 1.4.2. Unlike the visible overload, it always wraps the receiver in a new
 * [SchedulerCoroutineDispatcher].
 */
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.4.2, binary compatibility with earlier versions")
@JvmName("asCoroutineDispatcher")
public fun Scheduler.asCoroutineDispatcher0(): SchedulerCoroutineDispatcher =
    SchedulerCoroutineDispatcher(this)

/**
 * Converts an instance of [CoroutineDispatcher] to an implementation of [Scheduler].
 *
 * If the receiver is a [SchedulerCoroutineDispatcher], the scheduler it holds is returned as is;
 * any other dispatcher is wrapped in a new scheduler.
 */
public fun CoroutineDispatcher.asScheduler(): Scheduler =
    if (this is SchedulerCoroutineDispatcher) {
        scheduler
    } else {
        DispatcherScheduler(this)
    }

/**
 * A [Scheduler] whose tasks are run as coroutines on [dispatcher].
 */
private class DispatcherScheduler(@JvmField val dispatcher: CoroutineDispatcher) : Scheduler() {

    /** Root job of this scheduler; it is the parent of every worker job and is cancelled by [shutdown]. */
    private val schedulerJob = SupervisorJob()

    /**
     * The scope for everything happening in this [DispatcherScheduler].
     *
     * Running tasks, too, get launched under this scope, because [shutdown] should cancel the running tasks as well.
     */
    private val scope = CoroutineScope(schedulerJob + dispatcher)

    /**
     * The counter of created workers, for their pretty-printing.
     */
    private val workerCounter = atomic(1L)

    /**
     * Schedules [block] via [scheduleTask], converting [delay] from [unit] to milliseconds.
     * The adapted runnable launches the task as a new coroutine in [scope].
     */
    override fun scheduleDirect(block: Runnable, delay: Long, unit: TimeUnit): Disposable =
        scope.scheduleTask(block, unit.toMillis(delay)) { task ->
            Runnable { scope.launch { task() } }
        }

    /** Creates a worker numbered with the next value of [workerCounter], whose job is a child of [schedulerJob]. */
    override fun createWorker(): Worker = DispatcherWorker(workerCounter.getAndIncrement(), dispatcher, schedulerJob)

    /** Cancels [schedulerJob]. */
    override fun shutdown() {
        schedulerJob.cancel()
    }

    /**
     * A [Worker] that funnels its tasks through [blockChannel]; a single coroutine started in `init`
     * receives them and invokes them one at a time.
     *
     * @param counter the number of this worker, used only in [toString].
     * @param dispatcher the dispatcher on which the worker's coroutines run.
     * @param parentJob the job that becomes the parent of this worker's own job.
     */
    private class DispatcherWorker(
        private val counter: Long,
        private val dispatcher: CoroutineDispatcher,
        parentJob: Job
    ) : Worker() {

        private val workerJob = SupervisorJob(parentJob)
        private val workerScope = CoroutineScope(workerJob + dispatcher)

        // The element type is the suspending task that `schedule` sends and the consumer below invokes.
        private val blockChannel = Channel<suspend () -> Unit>(Channel.UNLIMITED)

        init {
            // The only consumer of the channel: each received task is invoked before the next one is taken.
            workerScope.launch {
                blockChannel.consumeEach {
                    it()
                }
            }
        }

        /**
         * Schedules [block] via [scheduleTask], converting [delay] from [unit] to milliseconds.
         * The adapted runnable does not run the task itself: it sends it to [blockChannel].
         */
        override fun schedule(block: Runnable, delay: Long, unit: TimeUnit): Disposable =
            workerScope.scheduleTask(block, unit.toMillis(delay)) { task ->
                // The result of `trySend` is not inspected.
                Runnable { blockChannel.trySend(task) }
            }

        /** The worker reports itself disposed as soon as its scope is no longer active. */
        override fun isDisposed(): Boolean = !workerScope.isActive

        /** Closes [blockChannel] and cancels the worker's job. */
        override fun dispose() {
            blockChannel.close()
            workerJob.cancel()
        }

        override fun toString(): String = "$dispatcher (worker $counter, ${if (isDisposed) "disposed" else "active"})"
    }

    override fun toString(): String = dispatcher.toString()
}

/** The suspending action that [scheduleTask] passes to its `adaptForScheduling` callback. */
private typealias Task = suspend () -> Unit

/**
 * Schedule [block] so that an adapted version of it, wrapped in [adaptForScheduling], executes after [delayMillis]
 * milliseconds.
 */
private fun CoroutineScope.scheduleTask(
    block: Runnable,
    delayMillis: Long,
    adaptForScheduling: (Task) -> Runnable
): Disposable {
    val ctx = coroutineContext
    var handle: DisposableHandle? = null
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -