def createLogicAndMaterializedValue()

in ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpIOGraphStage.scala [197:274]


  /**
   * Builds the stage logic that writes every incoming `ByteString` to the output stream
   * obtained from `storeFileOutputStream`, paired with the future used as materialized value.
   *
   * The future is completed through `matSuccess` / `matFailure` below: successfully with the
   * number of bytes written, or with an `IOOperationIncompleteException` carrying the number
   * of bytes written so far plus the cause.
   *
   * @param inheritedAttributes attributes passed in by the caller; not read by this method
   * @return the stage logic together with the future of the resulting `IOResult`
   */
  def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {

    val ioResultPromise = Promise[IOResult]()

    val stageLogic =
      new FtpGraphStageLogic[ByteString, FtpClient, S](shape, ftpLike, connectionSettings, ftpClient) {

        // Destination stream; remains empty until doPreStart has opened it.
        private[this] var remoteOut: Option[OutputStream] = None
        // Running count of bytes handed to the stream, reported via the materialized value.
        private[this] var bytesSent: Long = 0L

        // Flags the stage as failed and fails the materialized future with the given cause.
        private[this] def markFailed(cause: Throwable): Unit = {
          failed = true
          matFailure(cause)
        }

        private[this] val uploadHandler: InHandler = new InHandler {
          // Write the pushed chunk, then ask for the next one; any non-fatal error fails the stage.
          override def onPush(): Unit =
            try {
              writeBlocking(grab(in))
              pull(in)
            } catch {
              case NonFatal(cause) =>
                markFailed(cause)
                failStage(cause)
            }

          // Record the upstream error before delegating to the default handling.
          override def onUpstreamFailure(exception: Throwable): Unit = {
            markFailed(exception)
            super.onUpstreamFailure(exception)
          }
        }

        setHandler(in, uploadHandler)

        // Close the stream if one was opened; super.postStop() runs no matter how closing ends.
        override def postStop(): Unit =
          try {
            for (os <- remoteOut) closeRemoteStream(os)
          } finally {
            super.postStop()
          }

        /**
         * Closes `os` and, when the FTP implementation is a `CommonFtpOperations`, checks that
         * the pending command completed. Every failure is reported through `matFailure`.
         */
        private[this] def closeRemoteStream(os: OutputStream): Unit =
          try {
            os.close()
            graphStageFtpLike match {
              case cfo: CommonFtpOperations =>
                val transferCompleted = cfo.completePendingCommand(handler.get.asInstanceOf[cfo.Handler])
                if (!transferCompleted) throw new IOException("File transfer failed.")
              case _ => ()
            }
          } catch {
            case ioe: IOException =>
              matFailure(ioe)
              // Once the stage has already failed the stream may well be dead,
              // so in that case the IOException is deliberately not rethrown.
              if (!failed) throw ioe
            case NonFatal(other) =>
              matFailure(other)
              throw other
          }

        // Opens the destination stream and requests the first element.
        protected[this] def doPreStart(): Unit = {
          val opened = graphStageFtpLike.storeFileOutputStream(path, handler.get, append)
          // `.get` unwraps the outcome of the open attempt, throwing when there is no stream.
          remoteOut = Some(opened.get)
          pull(in)
        }

        // Completes the materialized future with the byte count, unless it is already completed.
        protected[this] def matSuccess(): Boolean = {
          val outcome = IOResult.createSuccessful(bytesSent)
          ioResultPromise.trySuccess(outcome)
        }

        // Fails the materialized future with the bytes written so far and the cause,
        // unless it is already completed.
        protected[this] def matFailure(t: Throwable): Boolean = {
          val incomplete = new IOOperationIncompleteException(bytesSent, t)
          ioResultPromise.tryFailure(incomplete)
        }

        /** Blocking I/O: writes the chunk to the open stream, if any, and updates the byte count. */
        private[this] def writeBlocking(bytes: ByteString): Unit =
          for (os <- remoteOut) {
            os.write(bytes.toArray)
            bytesSent += bytes.size
          }

      } // end of stage logic

    (stageLogic, ioResultPromise.future)
  }