in stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala [528:660]
/** Attributes this stage starts with: the default `lazySink` attributes plus the location derived from `sinkFactory`. */
override def initialAttributes =
  DefaultAttributes.lazySink.and(SourceLocation.forLambda(sinkFactory))
/** Shape of this stage: a sink shape built around the single inlet `in`. */
override val shape: SinkShape[T] = SinkShape(in)
// Fixed, human-readable name returned whenever this stage is rendered as a string.
override def toString: String = "LazySink"
/**
 * Creates the stage logic and the materialized value of this lazy sink.
 *
 * The logic pulls one element in `preStart`, hands it to `sinkFactory`, and once the returned
 * future succeeds it materializes the resulting sink (see `switchTo`) and feeds it the cached
 * first element followed by everything else arriving on `in`.
 *
 * @param inheritedAttributes attributes passed along when the inner sink is materialized
 * @return the stage logic, and a future that is completed with the inner sink's materialized
 *         value; it is failed instead when `sinkFactory` throws or its future fails, when
 *         materializing the inner sink throws, when upstream fails before the switch, or with a
 *         `NeverMaterializedException` when upstream finishes before any element was pushed
 */
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = {
// backs the Future[M] that is returned as this stage's materialized value
val promise = Promise[M]()
val stageLogic = new GraphStageLogic(shape) with InHandler {
// set to true once the first element has been grabbed and the sink factory has been asked for a sink;
// it is never reset in this logic
var switching = false
// request the first element; its arrival is what triggers creation of the real sink
override def preStart(): Unit = pull(in)
override def onPush(): Unit = {
// the first element is cached here and delivered to the inner sink later by switchTo
val element = grab(in)
switching = true
// callback through which the outcome of the factory's future is handed back to this stage logic
val cb: AsyncCallback[Try[Sink[T, M]]] =
getAsyncCallback {
case Success(sink) =>
// check if the stage is still in need for the lazy sink
// (there could have been an onUpstreamFailure in the meantime that has completed the promise)
if (!promise.isCompleted) {
try {
val mat = switchTo(sink, element)
promise.success(mat)
// keep the stage alive after the switch (switchTo requests the same thing itself)
setKeepGoing(true)
} catch {
// materializing the inner sink failed: fail both the materialized value and the stage
case NonFatal(e) =>
promise.failure(e)
failStage(e)
}
}
case Failure(e) =>
// the factory's future failed: fail both the materialized value and the stage
promise.failure(e)
failStage(e)
}
try {
// invoke the factory and register cb.invoke as the completion handler of the returned future
sinkFactory(element).onComplete(cb.invoke)(ExecutionContexts.parasitic)
} catch {
// the factory threw synchronously instead of returning a future; handled like a failed future
case NonFatal(e) =>
promise.failure(e)
failStage(e)
}
}
override def onUpstreamFinish(): Unit = {
// While the stage is switching, a finished upstream must not stop the stage: only keep it alive here
// so that the pending switch can still deliver the cached element.
if (switching) {
// there is a cached element -> the stage must not be shut down automatically because isClosed(in) is satisfied
setKeepGoing(true)
} else {
// upstream finished without ever pushing an element, so the sink factory was never invoked
promise.failure(new NeverMaterializedException)
super.onUpstreamFinish()
}
}
override def onUpstreamFailure(ex: Throwable): Unit = {
// fail the materialized value first, then let the default handling deal with the stage itself
promise.failure(ex)
super.onUpstreamFailure(ex)
}
// this logic acts as the handler of `in` until switchTo installs a replacement handler
setHandler(in, this)
/**
 * Materializes `sink` as a substream fed through a `SubSourceOutlet`, replaces the handler of `in`
 * so that further elements are forwarded to that substream, and returns the materialized value.
 *
 * @param sink         the sink produced by `sinkFactory`
 * @param firstElement the element grabbed before the sink existed; pushed on the sub-outlet's first pull
 * @return the value obtained from materializing `sink`
 */
private def switchTo(sink: Sink[T, M], firstElement: T): M = {
// false until the cached first element has been pushed into the sub-outlet
var firstElementPushed = false
val subOutlet = new SubSourceOutlet[T]("LazySink")
// run the inner sink on the sub-fusing materializer, keeping only the sink's materialized value
val matVal = interpreter.subFusingMaterializer
.materialize(Source.fromGraph(subOutlet.source).toMat(sink)(Keep.right), inheritedAttributes)
// completes the stage only once both the upstream inlet and the sub-outlet are closed
def maybeCompleteStage(): Unit = {
if (isClosed(in) && subOutlet.isClosed) {
completeStage()
}
}
// The stage must not be shut down automatically; it is completed when maybeCompleteStage decides
setKeepGoing(true)
setHandler(
in,
new InHandler {
override def onPush(): Unit = {
// forward every further upstream element straight to the inner sink
subOutlet.push(grab(in))
}
override def onUpstreamFinish(): Unit = {
// if the cached element has not been pushed yet, completion is deferred to the sub-outlet's onPull
if (firstElementPushed) {
subOutlet.complete()
maybeCompleteStage()
}
}
override def onUpstreamFailure(ex: Throwable): Unit = {
// propagate exception irrespective if the cached element has been pushed or not
subOutlet.fail(ex)
// #25410 if we fail the stage here directly, the SubSource may not have been started yet,
// which can happen if upstream fails immediately after emitting a first value.
// The SubSource won't be started until the stream shuts down, which means downstream won't see the failure,
// scheduling it lets the interpreter first start the substream
getAsyncCallback[Throwable](failStage).invoke(ex)
}
})
subOutlet.setHandler(new OutHandler {
override def onPull(): Unit = {
if (firstElementPushed) {
// the cached element is already delivered, so demand is passed on to upstream
pull(in)
} else {
// the demand can be satisfied right away by the cached element
firstElementPushed = true
subOutlet.push(firstElement)
// in.onUpstreamFinished was not propagated if it arrived before the cached element was pushed
// -> check if the completion must be propagated now
if (isClosed(in)) {
subOutlet.complete()
maybeCompleteStage()
}
}
}
override def onDownstreamFinish(cause: Throwable): Unit = {
// the inner sink cancelled: cancel upstream with the same cause if it is still open
if (!isClosed(in)) cancel(in, cause)
maybeCompleteStage()
}
})
matVal
}
}
(stageLogic, promise.future)
}