in ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpIOGraphStage.scala [197:274]
def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
val matValuePromise = Promise[IOResult]()
val logic = new FtpGraphStageLogic[ByteString, FtpClient, S](shape, ftpLike, connectionSettings, ftpClient) {
private[this] var osOpt: Option[OutputStream] = None
private[this] var writtenBytesTotal: Long = 0L
setHandler(
in,
new InHandler {
override def onPush(): Unit =
try {
write(grab(in))
pull(in)
} catch {
case NonFatal(e) =>
failed = true
matFailure(e)
failStage(e)
}
override def onUpstreamFailure(exception: Throwable): Unit = {
matFailure(exception)
failed = true
super.onUpstreamFailure(exception)
}
}) // end of handler
override def postStop(): Unit =
try {
osOpt.foreach { os =>
try {
os.close()
graphStageFtpLike match {
case cfo: CommonFtpOperations =>
if (!cfo.completePendingCommand(handler.get.asInstanceOf[cfo.Handler]))
throw new IOException("File transfer failed.")
case _ =>
}
} catch {
case e: IOException =>
matFailure(e)
// If we failed, we have to expect the stream might already be dead
// so swallow the IOException
if (!failed) throw e
case NonFatal(e) =>
matFailure(e)
throw e
}
}
} finally {
super.postStop()
}
protected[this] def doPreStart(): Unit = {
osOpt = Some(graphStageFtpLike.storeFileOutputStream(path, handler.get, append).get)
pull(in)
}
protected[this] def matSuccess(): Boolean =
matValuePromise.trySuccess(IOResult.createSuccessful(writtenBytesTotal))
protected[this] def matFailure(t: Throwable): Boolean =
matValuePromise.tryFailure(new IOOperationIncompleteException(writtenBytesTotal, t))
/** BLOCKING I/O WRITE */
private[this] def write(bytes: ByteString) =
osOpt.foreach { os =>
os.write(bytes.toArray)
writtenBytesTotal += bytes.size
}
} // end of stage logic
(logic, matValuePromise.future)
}