override def initialAttributes = DefaultAttributes.lazySink and SourceLocation.forLambda()

in stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala [528:660]


  /**
   * Default attributes of this stage: the `lazySink` defaults combined (via `and`) with the
   * attribute produced by `SourceLocation.forLambda(sinkFactory)`.
   *
   * NOTE(review): presumably the latter records where the `sinkFactory` lambda was defined,
   * for diagnostics - confirm against `SourceLocation`.
   *
   * The result type is spelled out so the public signature does not depend on inference,
   * in line with the other overrides of this stage.
   */
  override def initialAttributes: Attributes =
    DefaultAttributes.lazySink and SourceLocation.forLambda(sinkFactory)
  // Shape of this stage: a sink shape built from the single inlet `in`.
  override val shape: SinkShape[T] = SinkShape.of(in)

  // Fixed textual name of this stage.
  override def toString: String = "LazySink"

  /**
   * Creates the stage logic together with the materialized value of this stage.
   *
   * The materialized value is the future of a promise that is
   *  - completed with the materialized value of the sink returned by `sinkFactory`, once that sink
   *    has been materialized for the first element,
   *  - failed with the exception thrown by `sinkFactory`, with the failure of the future it returned,
   *    or with a non-fatal exception raised while switching over to the created sink,
   *  - failed with the upstream failure if upstream fails before the switch-over,
   *  - failed with a `NeverMaterializedException` if upstream finishes before any element arrived.
   *
   * @param inheritedAttributes attributes passed on when materializing the lazily created sink
   * @return the stage logic and the future of the inner sink's materialized value
   */
  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = {

    // completed exactly once by one of the paths described above
    val promise = Promise[M]()
    val stageLogic = new GraphStageLogic(shape) with InHandler {
      // true from the moment the first element has been grabbed, i.e. while (and after) the sink is being created
      var switching = false
      // request the first element; it is what triggers the creation of the sink
      override def preStart(): Unit = pull(in)

      // Initial handler: only ever sees the first element, since no further pull is issued
      // until switchTo has installed the replacement handler.
      override def onPush(): Unit = {
        // the first element is cached and handed to switchTo once the sink is available
        val element = grab(in)
        switching = true
        // async callback through which the outcome of the sink factory's future is delivered to this stage
        val cb: AsyncCallback[Try[Sink[T, M]]] =
          getAsyncCallback {
            case Success(sink) =>
              // check if the stage is still in need for the lazy sink
              // (there could have been an onUpstreamFailure in the meantime that has completed the promise)
              if (!promise.isCompleted) {
                try {
                  val mat = switchTo(sink, element)
                  promise.success(mat)
                  setKeepGoing(true)
                } catch {
                  // materializing / wiring the sink failed: surface it on both the promise and the stage
                  case NonFatal(e) =>
                    promise.failure(e)
                    failStage(e)
                }
              }
            case Failure(e) =>
              // the future returned by sinkFactory failed
              promise.failure(e)
              failStage(e)
          }
        try {
          sinkFactory(element).onComplete(cb.invoke)(ExecutionContexts.parasitic)
        } catch {
          // sinkFactory itself threw instead of returning a future
          case NonFatal(e) =>
            promise.failure(e)
            failStage(e)
        }
      }

      override def onUpstreamFinish(): Unit = {
        // While the stage is switching, upstream completion must not stop the stage:
        // it is only noted (via isClosed(in)) and acted upon after the switch.
        if (switching) {
          // there is a cached element -> the stage must not be shut down automatically because isClosed(in) is satisfied
          setKeepGoing(true)
        } else {
          // upstream finished without emitting anything: the sink is never created
          promise.failure(new NeverMaterializedException)
          super.onUpstreamFinish()
        }
      }

      // Upstream failure while this initial handler is installed: fail the materialized value as well.
      override def onUpstreamFailure(ex: Throwable): Unit = {
        promise.failure(ex)
        super.onUpstreamFailure(ex)
      }

      setHandler(in, this)

      /**
       * Materializes `sink` against a sub-source fed by this stage and replaces the handler of `in`
       * so that subsequent elements are forwarded to it. `firstElement` is the element that
       * triggered the sink creation; it is pushed on the first demand from the sub-source.
       *
       * @return the materialized value of `sink`
       */
      private def switchTo(sink: Sink[T, M], firstElement: T): M = {

        // becomes true once the cached first element has been pushed into the sub-source
        var firstElementPushed = false

        val subOutlet = new SubSourceOutlet[T]("LazySink")

        // run the created sink, fed by the sub-source, using the interpreter's sub-fusing materializer
        val matVal = interpreter.subFusingMaterializer
          .materialize(Source.fromGraph(subOutlet.source).toMat(sink)(Keep.right), inheritedAttributes)

        // completes the stage only when both the inlet and the sub-source are closed
        def maybeCompleteStage(): Unit = {
          if (isClosed(in) && subOutlet.isClosed) {
            completeStage()
          }
        }

        // The stage must not be shut down automatically; it is completed when maybeCompleteStage decides
        setKeepGoing(true)

        // replaces the initial handler: from now on elements are forwarded to the sub-source
        setHandler(
          in,
          new InHandler {
            override def onPush(): Unit = {
              subOutlet.push(grab(in))
            }
            override def onUpstreamFinish(): Unit = {
              // if the cached element is still pending, completion is deferred to the sub-source's onPull
              if (firstElementPushed) {
                subOutlet.complete()
                maybeCompleteStage()
              }
            }
            override def onUpstreamFailure(ex: Throwable): Unit = {
              // propagate exception irrespective if the cached element has been pushed or not
              subOutlet.fail(ex)
              // #25410 if we fail the stage here directly, the SubSource may not have been started yet,
              // which can happen if upstream fails immediately after emitting a first value.
              // The SubSource won't be started until the stream shuts down, which means downstream won't see the failure,
              // scheduling it lets the interpreter first start the substream
              getAsyncCallback[Throwable](failStage).invoke(ex)
            }
          })

        subOutlet.setHandler(new OutHandler {
          override def onPull(): Unit = {
            if (firstElementPushed) {
              // regular operation: translate demand from the sub-source into demand on upstream
              pull(in)
            } else {
              // the demand can be satisfied right away by the cached element
              firstElementPushed = true
              subOutlet.push(firstElement)
              // the upstream finish was not propagated if it arrived before the cached element was pushed
              // -> check if the completion must be propagated now
              if (isClosed(in)) {
                subOutlet.complete()
                maybeCompleteStage()
              }
            }
          }

          // the created sink cancelled: cancel upstream (if still open) with the same cause
          override def onDownstreamFinish(cause: Throwable): Unit = {
            if (!isClosed(in)) cancel(in, cause)
            maybeCompleteStage()
          }
        })

        matVal
      }

    }
    (stageLogic, promise.future)
  }