in kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt [368:406]
fun shutdown(timeout: Long) {
// atomically set termination flag which is checked when workers are added or removed
if (!_isTerminated.compareAndSet(false, true)) return
// make sure we are not waiting for the current thread
val currentWorker = currentWorker()
// Capture # of created workers that cannot change anymore (mind the synchronized block!)
val created = synchronized(workers) { createdWorkers }
// Shutdown all workers with the only exception of the current thread
for (i in 1..created) {
val worker = workers[i]!!
if (worker !== currentWorker) {
// Note: this is java.lang.Thread.getState() of type java.lang.Thread.State
while (worker.getState() != Thread.State.TERMINATED) {
LockSupport.unpark(worker)
worker.join(timeout)
}
// Note: this is CoroutineScheduler.Worker.state of type CoroutineScheduler.WorkerState
assert { worker.state === WorkerState.TERMINATED } // Expected TERMINATED state
worker.localQueue.offloadAllWorkTo(globalBlockingQueue) // Doesn't actually matter which queue to use
}
}
// Make sure no more work is added to GlobalQueue from anywhere
globalBlockingQueue.close()
globalCpuQueue.close()
// Finish processing tasks from globalQueue and/or from this worker's local queue
while (true) {
val task = currentWorker?.findTask(true)
?: globalCpuQueue.removeFirstOrNull()
?: globalBlockingQueue.removeFirstOrNull()
?: break
runSafely(task)
}
// Shutdown current thread
currentWorker?.tryReleaseCpu(WorkerState.TERMINATED)
// check & cleanup state
assert { availableCpuPermits == corePoolSize }
parkedWorkersStack.value = 0L
controlState.value = 0L
}