def apply[T]()

in util-core/src/main/scala/com/twitter/util/FuturePool.scala [152:208]


  /**
   * Schedules `f` for evaluation and returns a `Future` of its outcome.
   *
   * The caller's `Local` context is captured when this method is invoked. If
   * that context carries a resource tracker, `f` is wrapped with
   * `ResourceTracker.wrapAndMeasureUsage`.
   *
   * Two execution paths exist:
   *  - when `forkingScheduler` is non-null the work is handed to
   *    `forkingScheduler.fork`;
   *  - otherwise a `Runnable` is submitted to `executor`, and the captured
   *    `Local` context is restored around the evaluation of `f`.
   *
   * On the executor path an interrupt raised on the returned future fails it
   * with the interrupt cause and cancels the submitted task, but only if the
   * pool is `interruptible` or the task has not started running yet.
   *
   * @tparam T the result type of `f`
   * @param f the by-name computation to evaluate
   * @return a future holding the result of `f`, or the failure it produced; on
   *         the executor path the future fails with the
   *         `RejectedExecutionException` when `executor` refuses the task
   */
  def apply[T](f: => T): Future[T] = {
    val saved = Local.save()
    val tracker = saved.resourceTracker
    // Only wrap `f` for usage measurement when the saved context has a tracker.
    val applyF =
      if (tracker eq None) () => f
      else ResourceTracker.wrapAndMeasureUsage[T](f, tracker.get)

    // `Future(...)` rather than `Future.value(...)`: an exception thrown by `f`
    // must surface as a failed Future instead of escaping the thunk given to
    // `fork`, mirroring the `Try(...)` capture used on the executor path below.
    if (forkingScheduler != null) forkingScheduler.fork(Future(applyF()))
    else {
      // `runOk` is flipped to false exactly once, by whichever of the task, the
      // rejection handler or the interrupt handler gets there first.
      val runOk = new AtomicBoolean(true)
      val p = new Promise[T]
      val task = new Runnable {
        def run(): Unit = {
          // Make an effort to skip work in the case the promise
          // has been cancelled or already defined.
          if (!runOk.compareAndSet(true, false))
            return

          // Swap in the submitter's Local context for the duration of `f`;
          // the worker's own context is put back in the `finally` below.
          val current = Local.save()
          if (current ne saved) Local.restore(saved)

          try p.updateIfEmpty(Try(applyF()))
          catch {
            // Anything reaching these cases escaped `Try(...)`. The promise is
            // failed first so waiters are released, then the throwable is
            // rethrown to the executor.
            case nlrc: NonLocalReturnControl[_] =>
              val fnlrc = new FutureNonLocalReturnControl(nlrc)
              p.updateIfEmpty(Throw(fnlrc))
              throw fnlrc
            case e: Throwable =>
              p.updateIfEmpty(Throw(new ExecutionException(e)))
              throw e
          } finally Local.restore(current)
        }
      }

      // This is safe: the only thing that can call task.run() is
      // executor, the only thing that can raise an interrupt is the
      // receiver of this value, which will then be fully initialized.
      val javaFuture =
        try executor.submit(task)
        catch {
          case e: RejectedExecutionException =>
            // The task will never run: close the gate and fail the promise.
            // `javaFuture` is null on this path; the handler below only
            // dereferences it after `p.updateIfEmpty` succeeds, which cannot
            // happen once `p` already holds this exception.
            runOk.set(false)
            p.setException(e)
            null
        }

      p.setInterruptHandler {
        case cause =>
          // A non-interruptible pool honours the interrupt only if it wins the
          // race against the task starting (the CAS on `runOk`).
          if (interruptible || runOk.compareAndSet(true, false)) {
            if (p.updateIfEmpty(Throw(cause)))
              javaFuture.cancel(true)
          }
      }

      p
    }
  }