in util-core/src/main/scala/com/twitter/io/Pipe.scala [96:141]
// Auxiliary no-argument constructor: delegates to the primary constructor with `Timer.Nil`.
def this() = this(Timer.Nil)
// Current state of the pipe. `read()` inspects and updates this field only inside a
// `synchronized` block: thread-safety is provided by synchronization on `this`.
private[this] var state: State[A] = State.Idle
// Satisfied (via `updateIfEmpty`) with `StreamTermination.FullyRead` when a `read` observes
// the EOF, i.e. returns `Future.None` (after calling close()).
// NOTE(review): other members outside this view may also complete it — confirm.
private[this] val closep: Promise[StreamTermination] = new Promise[StreamTermination]()
/**
 * Reads the next element from this pipe.
 *
 * The returned future depends on the state observed under the lock on `this`:
 *  - `Failed`: a future failed with the stored exception.
 *  - `Closed`: `Future.None`.
 *  - `Idle`: a fresh, still-pending promise, which is also recorded in the new `Reading` state.
 *  - `Writing`: a future holding the element carried by the state; the state goes back to
 *    `Idle` and the promise carried by the `Writing` state is completed.
 *  - `Reading`: a future failed with `IllegalStateException`, because a read is already pending.
 *  - `Closing`: if the state carries an element, that element is returned and the state becomes
 *    `Closing(None, null)`; otherwise `Future.None` is returned and the state becomes `Closed`.
 *    In both cases the promise carried by the state is completed when it is non-null.
 *
 * Promises are completed only after the `synchronized` block has been left. Whenever the
 * returned future is `Future.None`, `closep` is updated (if still empty) with
 * `StreamTermination.FullyRead`.
 */
def read(): Future[Option[A]] = {
  // First component: a promise to complete once the lock is released (null when there is none).
  // Second component: the future handed back to the caller.
  val (toNotify, outcome) = synchronized {
    state match {
      case State.Failed(cause) =>
        (null, Future.exception(cause))

      case State.Closed =>
        (null, Future.None)

      case State.Idle =>
        // Nothing buffered: park a promise in the state and hand it to the caller.
        val parked = new Promise[Option[A]]
        state = State.Reading(parked)
        (null, parked)

      case State.Writing(elem, writeAck) =>
        state = State.Idle
        (writeAck, Future.value(Some(elem)))

      case State.Reading(_) =>
        (null, Future.exception(new IllegalStateException("read() while read is pending")))

      case State.Closing(leftover, closeAck) =>
        val next = leftover match {
          case None =>
            state = State.Closed
            Future.None
          case _ =>
            // Deliver the leftover element first; the following read will observe the EOF.
            state = State.Closing(None, null)
            Future.value(leftover)
        }
        (closeAck, next)
    }
  }

  // Side effects run outside the lock, in this order: notify the waiter, then signal EOF.
  if (toNotify ne null) toNotify.setDone()
  // Reference equality on purpose: only the shared `Future.None` instance marks EOF.
  if (outcome eq Future.None) closep.updateIfEmpty(StreamTermination.FullyRead.Return)
  outcome
}