in common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAsyncStreamSink.scala [39:115]
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[IOResult]) = {
val ioResultPromise = Promise[IOResult]()
val logic = new GraphStageLogic(shape) with InHandler {
handler =>
var buffers: Iterator[ByteBuffer] = Iterator()
var writeCallback: AsyncCallback[Try[Int]] = _
var closeCallback: AsyncCallback[Try[Completed]] = _
var position: Int = _
var writeDone = Promise[Completed]
setHandler(in, this)
override def preStart(): Unit = {
//close operation is async and thus requires the stage to remain open
//even after all data is read
setKeepGoing(true)
writeCallback = getAsyncCallback[Try[Int]](handleWriteResult)
closeCallback = getAsyncCallback[Try[Completed]](handleClose)
pull(in)
}
override def onPush(): Unit = {
buffers = grab(in).asByteBuffers.iterator
writeDone = Promise[Completed]
writeNextBufferOrPull()
}
override def onUpstreamFinish(): Unit = {
//Work done perform close
//Using async "blessed" callback does not work at this stage so
// need to invoke as normal callback
//TODO Revisit this
//write of ByteBuffers from ByteString is an async operation. For last push
//the write operation may involve multiple async callbacks and by that time
//onUpstreamFinish may get invoked. So to ensure that close operation is performed
//"after" the last push writes are done we rely on writeDone promise
//and schedule the close on its completion
writeDone.future.onComplete(_ => stream.close().head().onComplete(handleClose))
}
override def onUpstreamFailure(ex: Throwable): Unit = {
fail(ex)
}
private def handleWriteResult(bytesWrittenOrFailure: Try[Int]): Unit = bytesWrittenOrFailure match {
case Success(bytesWritten) =>
position += bytesWritten
writeNextBufferOrPull()
case Failure(failure) => fail(failure)
}
private def handleClose(completed: Try[Completed]): Unit = completed match {
case Success(Completed()) =>
completeStage()
ioResultPromise.trySuccess(IOResult(position, Success(Done)))
case Failure(failure) =>
fail(failure)
}
private def writeNextBufferOrPull(): Unit = {
if (buffers.hasNext) {
stream.write(buffers.next()).head().onComplete(writeCallback.invoke)
} else {
writeDone.trySuccess(Completed())
pull(in)
}
}
private def fail(failure: Throwable) = {
failStage(failure)
ioResultPromise.trySuccess(IOResult(position, Failure(failure)))
}
}
(logic, ioResultPromise.future)
}