in util-core/src/main/scala/com/twitter/io/FutureReader.scala [23:61]
/**
 * Reads the single element backed by the future `fa`.
 *
 * The outcome depends on the current `state`:
 *  - `Idle`: the result follows `fa`. On success it yields `Some(v)` and the
 *    state becomes `Read`; on failure it yields the same exception and the
 *    state becomes `Failed`.
 *  - `Read`: yields `Future.None` and the state becomes `FullyRead`.
 *  - `Failed`: yields whatever `closep` holds, mapped to `Future.None` if
 *    `closep` is not an exception.
 *  - `FullyRead`: yields `Future.None`.
 *  - `Discarded`: fails with `ReaderDiscardedException`.
 *
 * As a side effect `closep` is satisfied (only if still empty) with the
 * failure once the state is `Failed`, or with `StreamTermination.FullyRead`
 * once the state is `FullyRead`.
 *
 * @return the element wrapped in `Some`, `None` once it has been consumed,
 *         or a failed future as described above.
 */
def read(): Future[Option[A]] = {
  val (updatedState, result) = synchronized {
    val result = state match {
      case State.Idle =>
        fa.transform {
          // This callback may run after `read()` has already left the
          // monitor (when `fa` is still pending, or when the callback is
          // deferred), so re-enter the monitor before touching `state`.
          // The monitor is reentrant, so the inline case is unaffected.
          case Throw(t) =>
            synchronized { state = State.Failed(t) }
            Future.exception(t)
          case Return(v) =>
            synchronized { state = State.Read }
            Future.value(Some(v))
        }
      case State.Read =>
        state = State.FullyRead
        Future.None
      case State.Failed(_) =>
        // closep is expected to hold an exception here, so the flatMap
        // should never be triggered and the exception is returned as is
        closep.flatMap(_ => Future.None)
      case State.FullyRead =>
        Future.None
      case State.Discarded =>
        Future.exception(new ReaderDiscardedException)
    }
    // Snapshot the state while still holding the monitor; `closep` is
    // updated below, outside the monitor.
    (state, result)
  }
  updatedState match {
    case State.Failed(t) =>
      // use `updateIfEmpty` instead of `update` here because we don't want to throw
      // `ImmutableResult` exception when reading from a failed `FutureReader` multiple times
      closep.updateIfEmpty(Throw(t))
    case State.FullyRead =>
      closep.updateIfEmpty(StreamTermination.FullyRead.Return)
    case State.Idle =>
      // Still `Idle` means the transform callback above has not run yet, so
      // a failure of `fa` cannot have been seen by the snapshot. Propagate
      // it to `closep` when it arrives; otherwise `closep` would stay empty
      // unless the caller happened to read again after the failed read.
      result.onFailure { t =>
        closep.updateIfEmpty(Throw(t))
        ()
      }
    case _ =>
  }
  result
}