def createLogicAndMaterializedValue()

in ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpIOGraphStage.scala [76:181]


  /**
   * Builds the stage logic that emits the content of a remote file downstream in chunks, paired
   * with the future that is handed out as the stage's materialized value.
   *
   * The future is backed by a promise that `matSuccess` / `matFailure` try to complete with the
   * number of bytes read so far; because `trySuccess` / `tryFailure` are used, only the first
   * completion attempt takes effect.
   *
   * @param inheritedAttributes attributes supplied at materialization (not read by this method)
   * @return the stage logic together with the future of the resulting `IOResult`
   */
  def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {

    val ioResultPromise = Promise[IOResult]()

    val logic = new FtpGraphStageLogic[ByteString, FtpClient, S](shape, ftpLike, connectionSettings, ftpClient) {

      // Stream over the remote file; stays None until doPreStart assigns it.
      private[this] var remoteStream: Option[InputStream] = None
      // Running total of bytes read, reported through the materialized value.
      private[this] var bytesReadSoFar: Long = 0L

      setHandler(
        out,
        new OutHandler {
          // On demand: push the next chunk, or complete the outlet when no chunk is available.
          // Any non-fatal error marks the stage as failed, fails the materialized value and
          // then fails the stage itself.
          def onPull(): Unit =
            try readChunk().fold(complete(out))(chunk => push(out, chunk))
            catch {
              case NonFatal(cause) =>
                failed = true
                matFailure(cause)
                failStage(cause)
            }
        })

      /** Releases the remote stream (when one was opened) and always delegates to `super.postStop()`. */
      override def postStop(): Unit =
        try remoteStream.foreach(closeAndConfirm)
        finally super.postStop()

      /**
       * Closes `stream` and, when the FTP implementation is a `CommonFtpOperations`, asks it to
       * complete the pending command, treating a `false` answer as a failed transfer.
       */
      private[this] def closeAndConfirm(stream: InputStream): Unit =
        try {
          stream.close()
          graphStageFtpLike match {
            case ops: CommonFtpOperations =>
              val confirmed = ops.completePendingCommand(handler.get.asInstanceOf[ops.Handler])
              if (!confirmed) throw new IOException("File transfer failed.")
            case _ =>
          }
        } catch {
          case ioe: IOException =>
            matFailure(ioe)
            // Once the stage has already failed the connection may well be dead, so an
            // IOException during cleanup is swallowed in that case and rethrown otherwise.
            if (!failed) throw ioe
          case NonFatal(other) =>
            matFailure(other)
            throw other
        }

      /**
       * Opens the remote stream using the most specific capability of the FTP implementation.
       * `UnconfirmedReads` is checked before `RetrieveOffset`, so it wins when both apply.
       * Each `.get` rethrows if the retrieval attempt did not produce a stream.
       */
      protected[this] def doPreStart(): Unit = {
        val opened: Option[InputStream] = graphStageFtpLike match {
          case unconfirmed: UnconfirmedReads =>
            withUnconfirmedReads(unconfirmed)
          case withOffset: RetrieveOffset =>
            val attempt =
              withOffset.retrieveFileInputStream(path, handler.get.asInstanceOf[withOffset.Handler], offset)
            Some(attempt.get)
          case _ =>
            val attempt = graphStageFtpLike.retrieveFileInputStream(path, handler.get)
            Some(attempt.get)
        }
        remoteStream = opened
      }

      /**
       * Opens the stream with the configured `maxUnconfirmedReads` when the connection settings
       * are `SftpSettings`; for any other settings type no stream is opened and `None` is returned.
       */
      private def withUnconfirmedReads(
          reader: FtpLike[FtpClient, S] with UnconfirmedReads): Option[InputStream] =
        connectionSettings match {
          case sftp: SftpSettings =>
            val attempt = reader.retrieveFileInputStream(
              path,
              handler.get.asInstanceOf[reader.Handler],
              offset,
              sftp.maxUnconfirmedReads)
            Some(attempt.get)
          case _ => None
        }

      /** Tries to complete the materialized value successfully with the bytes read so far. */
      protected[this] def matSuccess(): Boolean = {
        val result = IOResult.createSuccessful(bytesReadSoFar)
        ioResultPromise.trySuccess(result)
      }

      /** Tries to fail the materialized value, wrapping `t` together with the bytes read so far. */
      protected[this] def matFailure(t: Throwable): Boolean = {
        val failure = new IOOperationIncompleteException(bytesReadSoFar, t)
        ioResultPromise.tryFailure(failure)
      }

      /**
       * BLOCKING I/O READ.
       *
       * Reads up to `chunkSize` bytes into a fresh buffer. Returns `None` when no stream is open
       * or the stream reports end of input (negative read count); otherwise the bytes read,
       * trimmed to the actual count when the buffer was not filled.
       */
      private[this] def readChunk(): Option[ByteString] = {
        // A new array per call: the resulting ByteString is built directly from this buffer.
        val buffer = new Array[Byte](chunkSize)
        remoteStream.flatMap { stream =>
          val count = stream.read(buffer)
          if (count < 0) None
          else {
            bytesReadSoFar += count
            val wrapped = ByteString1C(buffer)
            Some(if (count == chunkSize) wrapped else wrapped.take(count))
          }
        }
      }

    }

    (logic, ioResultPromise.future)
  }