in ftp/src/main/scala/org/apache/pekko/stream/connectors/ftp/impl/FtpIOGraphStage.scala [76:181]
/**
 * Creates the stage logic that reads the remote file at `path` and emits it downstream as
 * `ByteString` chunks of at most `chunkSize` bytes, paired with the future carrying the
 * [[IOResult]] of the transfer.
 *
 * The future is completed through `matSuccess` / `matFailure` defined on the logic below.
 *
 * @param inheritedAttributes attributes passed to the stage; not read by this method
 * @return the stage logic together with the materialized `Future[IOResult]`
 */
def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
  val ioResultPromise = Promise[IOResult]()

  val stageLogic =
    new FtpGraphStageLogic[ByteString, FtpClient, S](shape, ftpLike, connectionSettings, ftpClient) {

      // Stream over the remote file; stays empty until `doPreStart` has opened it.
      private[this] var sourceStream: Option[InputStream] = Option.empty[InputStream]

      // Running count of bytes read so far, reported in the materialized result.
      private[this] var bytesReadSoFar: Long = 0L

      setHandler(
        out,
        new OutHandler {

          /** One pull triggers one blocking read: emit the chunk, or complete when nothing is left. */
          def onPull(): Unit =
            try readChunk().fold(complete(out))(chunk => push(out, chunk))
            catch {
              case NonFatal(cause) =>
                // Record the failure first so that `postStop` knows the stage is already failing.
                failed = true
                matFailure(cause)
                failStage(cause)
            }
        }) // end of handler

      /** Closes the stream (if one was opened) and always delegates to the parent's `postStop`. */
      override def postStop(): Unit =
        try sourceStream.foreach(closeAndConfirmTransfer(_))
        finally super.postStop()

      /**
       * Closes `stream` and, when the FTP implementation is a [[CommonFtpOperations]], asks it to
       * complete the pending command; a `false` answer is reported as a failed transfer.
       *
       * Every non-fatal error fails the materialized value. An `IOException` is rethrown only when
       * the stage had not already failed; any other non-fatal error is always rethrown.
       */
      private[this] def closeAndConfirmTransfer(stream: InputStream): Unit =
        try {
          stream.close()
          graphStageFtpLike match {
            case ops: CommonFtpOperations =>
              val confirmed = ops.completePendingCommand(handler.get.asInstanceOf[ops.Handler])
              if (!confirmed)
                throw new IOException("File transfer failed.")
            case _ => // nothing to confirm for other implementations
          }
        } catch {
          case ioe: IOException =>
            matFailure(ioe)
            // After an earlier failure the connection may already be dead, so the
            // IOException is swallowed in that case instead of being raised again.
            if (!failed) throw ioe
          case NonFatal(other) =>
            matFailure(other)
            throw other
        }

      /**
       * Opens the input stream for `path`, picking the retrieval variant from the capabilities
       * mixed into `graphStageFtpLike`: unconfirmed reads first, then offset-based retrieval,
       * otherwise the plain retrieval.
       *
       * The retrieval result is unwrapped with `.get`, so a failed retrieval surfaces here as an
       * exception (presumably the result is a `Try` -- confirm against `FtpLike`).
       */
      protected[this] def doPreStart(): Unit = {
        val opened: Option[InputStream] = graphStageFtpLike match {
          case unconfirmed: UnconfirmedReads =>
            openWithUnconfirmedReads(unconfirmed)
          case offsetCapable: RetrieveOffset =>
            Some(
              offsetCapable
                .retrieveFileInputStream(path, handler.get.asInstanceOf[offsetCapable.Handler], offset)
                .get)
          case _ =>
            Some(graphStageFtpLike.retrieveFileInputStream(path, handler.get).get)
        }
        sourceStream = opened
      }

      /**
       * Opens the stream using the `maxUnconfirmedReads` value of the SFTP settings.
       *
       * @return the opened stream, or `None` when `connectionSettings` is not an [[SftpSettings]];
       *         with no stream, `readChunk` yields `None` and the first pull completes the stage
       */
      private def openWithUnconfirmedReads(
          sftpLike: FtpLike[FtpClient, S] with UnconfirmedReads): Option[InputStream] =
        PartialFunction.condOpt(connectionSettings) {
          case sftp: SftpSettings =>
            sftpLike
              .retrieveFileInputStream(path,
                handler.get.asInstanceOf[sftpLike.Handler],
                offset,
                sftp.maxUnconfirmedReads)
              .get
        }

      /** Tries to complete the materialized value successfully with the bytes read so far. */
      protected[this] def matSuccess(): Boolean = {
        val result = IOResult.createSuccessful(bytesReadSoFar)
        ioResultPromise.trySuccess(result)
      }

      /** Tries to fail the materialized value, wrapping `t` together with the bytes read so far. */
      protected[this] def matFailure(t: Throwable): Boolean = {
        val failure = new IOOperationIncompleteException(bytesReadSoFar, t)
        ioResultPromise.tryFailure(failure)
      }

      /**
       * BLOCKING I/O READ.
       *
       * Reads up to `chunkSize` bytes from the stream into a fresh buffer.
       *
       * @return the bytes that were read, or `None` when there is no stream or the read
       *         returned a negative count (end of stream)
       */
      private[this] def readChunk(): Option[ByteString] = {
        val buffer = Array.ofDim[Byte](chunkSize)
        sourceStream
          .map(_.read(buffer))
          .filter(_ > -1)
          .map { count =>
            bytesReadSoFar += count
            val chunk = ByteString1C(buffer)
            // A partially filled buffer is trimmed to the bytes actually read.
            if (count == chunkSize) chunk else chunk.take(count)
          }
      }
    } // end of stage logic

  (stageLogic, ioResultPromise.future)
}