in stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlatMapPrefix.scala [38:203]
/**
 * Attributes this stage starts out with: `DefaultAttributes.flatMapPrefix` combined with
 * whatever `SourceLocation.forLambda` yields for the user supplied function `f`.
 */
override def initialAttributes: Attributes =
  DefaultAttributes.flatMapPrefix.and(SourceLocation.forLambda(f))
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = {
// Resolve the (mandatory) nested-materialization cancellation policy once, up front.
// Its flag is consulted when downstream cancels before the nested flow has been materialized.
val nestedCancellationPolicy =
  inheritedAttributes.mandatoryAttribute[Attributes.NestedMaterializationCancellationPolicy]
val propagateToNestedMaterialization = nestedCancellationPolicy.propagateToNestedMaterialization
// Backs the Future[M] that is returned as this stage's materialized value.
val matPromise: Promise[M] = Promise[M]()
object FlatMapPrefixLogic extends GraphStageLogic(shape) with InHandler with OutHandler {
// Number of prefix elements that still have to arrive before the nested flow is built.
private var left: Int = n
// Collects the prefix elements; set to null once the prefix has been extracted.
private var builder = Vector.newBuilder[In]
builder.sizeHint(left)

// Sub-ports bridging this stage and the nested flow; both stay empty until materializeFlow()
// has created them.
private var subSource: OptionVal[SubSourceOutlet[In]] = OptionVal.None
private var subSink: OptionVal[SubSinkInlet[Out]] = OptionVal.None
// Cancellation cause recorded when downstream finishes before the nested flow exists.
private var downstreamCause: OptionVal[Throwable] = OptionVal.None

// This logic object acts as the handler for both `in` and `out`.
setHandlers(in, out, this)
/**
 * Cleanup hook. When the materialized-value promise is still pending here (i.e. the nested
 * flow was never materialized), it is failed with an [[AbruptStageTerminationException]]
 * before delegating to the parent implementation.
 */
override def postStop(): Unit = {
  val stillPending = !matPromise.isCompleted
  if (stillPending) matPromise.failure(new AbruptStageTerminationException(this))
  super.postStop()
}
/**
 * Handles an element arriving on `in`.
 *
 * Once the sub-source exists the element is forwarded to it unchanged. Before that, the
 * element is appended to the prefix: when the last missing element has arrived the nested
 * flow is materialized, otherwise the next element is requested.
 */
override def onPush(): Unit =
  subSource match {
    case OptionVal.Some(nestedSource) =>
      val element = grab(in)
      nestedSource.push(element)
    case _ =>
      val element = grab(in)
      builder += element
      left -= 1
      // prefix still incomplete => ask upstream for more, otherwise build the nested flow
      if (left != 0) pull(in)
      else materializeFlow()
  }
/**
 * Upstream completed. With a sub-source in place the completion is passed on to it;
 * otherwise the nested flow is materialized with whatever prefix was collected so far.
 */
override def onUpstreamFinish(): Unit =
  subSource match {
    case OptionVal.Some(nestedSource) => nestedSource.complete()
    case _                            => materializeFlow()
  }
/**
 * Upstream failed with `ex`. With a sub-source in place the failure is passed on to it.
 * Otherwise the nested flow will not be materialized, so the materialized-value promise is
 * failed with a [[NeverMaterializedException]] wrapping `ex` before the default failure
 * handling runs.
 */
override def onUpstreamFailure(ex: Throwable): Unit =
  subSource match {
    case OptionVal.Some(nestedSource) =>
      nestedSource.fail(ex)
    case _ =>
      val neverMaterialized = new NeverMaterializedException(ex)
      matPromise.failure(neverMaterialized)
      super.onUpstreamFailure(ex)
  }
/**
 * Downstream signalled demand on `out`.
 *
 * With a sub-sink in place the demand is delegated to it. Before materialization the demand
 * is turned into a pull of `in` while prefix elements are still missing; when none are
 * missing (the n = 0 corner case, which could also be handled in FlowOps) the nested flow is
 * materialized right away. A negative remaining count is treated as an illegal state.
 */
override def onPull(): Unit =
  subSink match {
    case OptionVal.Some(nestedSink) => nestedSink.pull()
    case _ if left > 0              => pull(in)
    case _ if left == 0             => materializeFlow()
    case _ =>
      throw new IllegalStateException(s"Unexpected accumulated size, left : $left (n: $n)")
  }
/**
 * Downstream cancelled with `cause`.
 *
 * With a sub-sink in place the cancellation is passed on to it. Before materialization the
 * outcome depends on `propagateToNestedMaterialization`:
 *  - false: the materialized-value promise is failed with a [[NeverMaterializedException]]
 *    and the stage is cancelled;
 *  - true: the cause is remembered and the stage keeps going until the nested flow can be
 *    materialized (immediately for n = 0, otherwise by pulling `in` if not already pulled).
 */
override def onDownstreamFinish(cause: Throwable): Unit =
  subSink match {
    case OptionVal.Some(nestedSink) =>
      nestedSink.cancel(cause)
    case _ if !propagateToNestedMaterialization =>
      matPromise.failure(new NeverMaterializedException(cause))
      cancelStage(cause)
    case _ =>
      downstreamCause = OptionVal.Some(cause)
      // corner case for n = 0, can be handled in FlowOps
      if (left == 0) materializeFlow()
      // if in was already closed, nested flow would have already been materialized
      else if (!hasBeenPulled(in)) pull(in)
  }
/**
 * Builds and runs the nested flow for the prefix collected so far.
 *
 * In order, this:
 *  1. extracts the prefix from `builder` and drops the builder reference,
 *  2. creates the sub-source / sub-sink ports and wires a shared handler to both,
 *  3. applies `f` to the prefix and materializes `subSource ~> flow ~> subSink` with the
 *     interpreter's sub-fusing materializer, using the inherited attributes,
 *  4. completes `matPromise` with the nested flow's materialized value,
 *  5. replays signals that may have arrived before the nested flow existed
 *     (downstream cancellation, upstream completion, pending downstream demand).
 *
 * If applying `f` or materializing throws a non-fatal exception, `matPromise` is failed with a
 * [[NeverMaterializedException]], the sub-port fields are reset to empty and the exception is
 * rethrown. Any non-fatal exception escaping the body fails the whole stage.
 */
def materializeFlow(): Unit =
try {
val prefix = builder.result()
builder = null // free for GC
subSource = OptionVal.Some(new SubSourceOutlet[In]("FlatMapPrefix.subSource"))
val theSubSource = subSource.get
subSink = OptionVal.Some(new SubSinkInlet[Out]("FlatMapPrefix.subSink"))
val theSubSink = subSink.get
// One handler object serves both sub-ports: the InHandler callbacks are driven by the
// sub-sink (output of the nested flow), the OutHandler callbacks by the sub-source (input
// of the nested flow).
val handler = new InHandler with OutHandler {
// element emitted by the nested flow => forward it to this stage's outlet
override def onPush(): Unit = {
push(out, theSubSink.grab())
}
// nested flow completed => complete this stage's outlet
override def onUpstreamFinish(): Unit = {
complete(out)
}
// nested flow failed => fail this stage's outlet
override def onUpstreamFailure(ex: Throwable): Unit = {
fail(out, ex)
}
// nested flow demands input => pull `in`, unless it is closed or already pulled
override def onPull(): Unit = {
if (!isClosed(in) && !hasBeenPulled(in)) {
pull(in)
}
}
// nested flow cancelled its input => cancel `in` with the same cause, unless already closed
override def onDownstreamFinish(cause: Throwable): Unit = {
if (!isClosed(in)) {
cancel(in, cause)
}
}
}
theSubSource.setHandler(handler)
theSubSink.setHandler(handler)
val matVal =
try {
// user code: build the nested flow from the collected prefix
val flow = f(prefix)
// keep only the nested flow's materialized value (Keep.right)
val runnableGraph = Source.fromGraph(theSubSource.source).viaMat(flow)(Keep.right).to(theSubSink.sink)
interpreter.subFusingMaterializer.materialize(runnableGraph, inheritedAttributes)
} catch {
case NonFatal(ex) =>
// report the failure through the materialized future and forget the unusable sub-ports,
// then rethrow so that the outer catch fails the stage
matPromise.failure(new NeverMaterializedException(ex))
subSource = OptionVal.None
subSink = OptionVal.None
throw ex
}
matPromise.success(matVal)
// in case downstream was closed before materialization: pass the recorded cause on
downstreamCause match {
case OptionVal.Some(ex) => theSubSink.cancel(ex)
case _ =>
}
// in case we've materialized due to upstream completion
if (isClosed(in)) {
theSubSource.complete()
}
// in case we've been pulled by downstream
if (isAvailable(out)) {
theSubSink.pull()
}
} catch {
case NonFatal(ex) => failStage(ex)
}
}
// Hand back the stage logic together with the future completed by `matPromise`.
val logic: GraphStageLogic = FlatMapPrefixLogic
(logic, matPromise.future)
}