in util-core/src/main/scala/com/twitter/util/FuturePool.scala [152:208]
def apply[T](f: => T): Future[T] = {
val saved = Local.save()
val tracker = saved.resourceTracker
val applyF =
if (tracker eq None) () => f
else ResourceTracker.wrapAndMeasureUsage[T](f, tracker.get)
if (forkingScheduler != null) forkingScheduler.fork(Future.value(applyF()))
else {
val runOk = new AtomicBoolean(true)
val p = new Promise[T]
val task = new Runnable {
def run(): Unit = {
// Make an effort to skip work in the case the promise
// has been cancelled or already defined.
if (!runOk.compareAndSet(true, false))
return
val current = Local.save()
if (current ne saved) Local.restore(saved)
try p.updateIfEmpty(Try(applyF()))
catch {
case nlrc: NonLocalReturnControl[_] =>
val fnlrc = new FutureNonLocalReturnControl(nlrc)
p.updateIfEmpty(Throw(fnlrc))
throw fnlrc
case e: Throwable =>
p.updateIfEmpty(Throw(new ExecutionException(e)))
throw e
} finally Local.restore(current)
}
}
// This is safe: the only thing that can call task.run() is
// executor, the only thing that can raise an interrupt is the
// receiver of this value, which will then be fully initialized.
val javaFuture =
try executor.submit(task)
catch {
case e: RejectedExecutionException =>
runOk.set(false)
p.setException(e)
null
}
p.setInterruptHandler {
case cause =>
if (interruptible || runOk.compareAndSet(true, false)) {
if (p.updateIfEmpty(Throw(cause)))
javaFuture.cancel(true)
}
}
p
}
}