def this()

in util-core/src/main/scala/com/twitter/io/Pipe.scala [96:141]


  def this() = this(Timer.Nil)

  // thread-safety provided by synchronization on `this`
  private[this] var state: State[A] = State.Idle

  // satisfied when a `read` observes the EOF (after calling close())
  private[this] val closep: Promise[StreamTermination] = new Promise[StreamTermination]()

  def read(): Future[Option[A]] = {
    val (waiter, result) = synchronized {
      state match {
        case State.Failed(exc) =>
          (null, Future.exception(exc))

        case State.Closed =>
          (null, Future.None)

        case State.Idle =>
          val p = new Promise[Option[A]]
          state = State.Reading(p)
          (null, p)

        case State.Writing(buf, p) =>
          state = State.Idle
          (p, Future.value(Some(buf)))

        case State.Reading(_) =>
          (null, Future.exception(new IllegalStateException("read() while read is pending")))

        case State.Closing(buf, p) =>
          val result = buf match {
            case None =>
              state = State.Closed
              Future.None
            case _ =>
              state = State.Closing(None, null)
              Future.value(buf)
          }
          (p, result)
      }
    }

    if (waiter != null) waiter.setDone()
    if (result eq Future.None) closep.updateIfEmpty(StreamTermination.FullyRead.Return)
    result
  }