def createLogic()

in stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSourceAsync.scala [44:150]


  /**
   * Creates the stage logic for a source backed by a resource whose creation, reads and close are
   * all expressed as futures.
   *
   * The resource is requested from `preStart`. A pull that arrives while a resource is open issues one
   * `readData` call: a `Some` result is pushed downstream, a `None` result closes the resource and
   * completes the stage once the close future succeeds (or fails the stage if closing fails).
   * Non-fatal read failures are handed to the supervision decider: `Stop` fails the stage, `Restart`
   * closes the current resource and creates a new one, `Resume` issues another read.
   *
   * @param inheritedAttributes attributes from which the mandatory `SupervisionStrategy` is taken
   */
  def createLogic(inheritedAttributes: Attributes) =
    new GraphStageLogic(shape) with OutHandler {
      private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
      private implicit def ec: ExecutionContext = materializer.executionContext

      // Set only while a resource is open; empty while one is being created, restarted or closed.
      private var maybeResource: OptionVal[S] = OptionVal.none

      // Brings the outcome of `create()` back into the stage.
      private val createdCallback = getAsyncCallback[Try[S]] {
        case Failure(cause) => failStage(cause)
        case Success(opened) =>
          require(opened != null, "`create` method should not return a null resource.")
          maybeResource = OptionVal(opened)
          // a pull may already be waiting from before the resource existed
          if (isAvailable(out)) onPull()
      }.invokeWithFeedback _

      // Applies the supervision decision to a non-fatal failure.
      private val errorHandler: PartialFunction[Throwable, Unit] = {
        case NonFatal(cause) =>
          decider(cause) match {
            case Supervision.Resume => onPull()
            case Supervision.Stop   => failStage(cause)
            case Supervision.Restart =>
              try restartResource()
              catch {
                case NonFatal(restartFailure) => failStage(restartFailure)
              }
          }
      }

      // Brings the outcome of an asynchronously completed read back into the stage.
      private val readCallback = getAsyncCallback[Try[Option[T]]](outcome => handle(outcome)).invoke _

      /** Processes one read outcome: push an element, finish at end of data, or supervise a failure. */
      private def handle(result: Try[Option[T]]): Unit = result match {
        case Failure(cause) => errorHandler(cause)
        case Success(data) =>
          data match {
            case None          => closeAtEndOfData()
            case Some(element) => push(out, element)
          }
      }

      /** Closes `resource`, then runs `whenClosed` inside the stage; a failed close fails the stage. */
      private def closeThen(resource: S)(whenClosed: => Unit): Unit =
        close(resource).onComplete(getAsyncCallback[Try[Done]] {
          case Success(Done)  => whenClosed
          case Failure(cause) => failStage(cause)
        }.invoke)(parasitic)

      /** The resource reported end of data: close it and complete the stage afterwards. */
      private def closeAtEndOfData(): Unit = maybeResource match {
        case OptionVal.Some(resource) =>
          closeThen(resource)(completeStage())
          maybeResource = OptionVal.none

        case _ =>
          // not expected to be reachable, kept as a safeguard
          throw new IllegalStateException("Reached end of data but there is no open resource")
      }

      override def preStart(): Unit = createResource()

      override def onPull(): Unit = maybeResource match {
        case OptionVal.Some(resource) =>
          try {
            val pending = readData(resource)
            pending.value match {
              // not completed yet: continue through the async callback
              case None => pending.onComplete(readCallback)(parasitic)
              // already completed: handle the result right away
              case Some(completed) => handle(completed)
            }
          } catch errorHandler

        case OptionVal.None =>
        // A pull without an open resource: either the resource is currently being created or
        // restarted, in which case completing the creation triggers the read, or the stage is
        // shutting down and the pull is irrelevant.
      }

      override def postStop(): Unit = maybeResource match {
        case OptionVal.Some(stillOpen) => close(stillOpen)
        case _                         => // no open resource, nothing to close
      }

      /** Replaces the current resource, creating the new one only after the old one has closed. */
      private def restartResource(): Unit = maybeResource match {
        case OptionVal.Some(previous) =>
          closeThen(previous)(createResource())
          maybeResource = OptionVal.none

        case _ => createResource()
      }

      /** Starts creating a resource and delivers the outcome to `createdCallback`. */
      private def createResource(): Unit =
        create().onComplete { created =>
          createdCallback(created).failed.foreach {
            case _: StreamDetachedException =>
              // The stream stopped before the created callback could be invoked, so a resource
              // that was opened has to be closed here to avoid leaking it.
              created match {
                case Failure(cause) =>
                  throw cause // failed to open but stream is stopped already
                case Success(orphan) =>
                  close(orphan)
              }
            case _ => // any other feedback failure is not handled here
          }
        }(parasitic)

      setHandler(out, this)
    }