override def createLogicAndMaterializedValue()

in common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAsyncStreamSink.scala [39:115]


  /**
   * Creates the stage logic that drains incoming `ByteString` elements into `stream`, together
   * with the materialized future reporting the outcome.
   *
   * Every pushed element is split into its `ByteBuffer`s which are written one at a time: the next
   * write is only issued after the previous one has reported back through `writeCallback`, and the
   * next element is only pulled once all buffers of the current one are written. When upstream
   * finishes, the stream is closed after the last outstanding write is done and the stage is
   * completed once the close has succeeded.
   *
   * @param inheritedAttributes attributes handed in on materialization; not used by this stage
   * @return the stage logic and a future that is completed with an `IOResult` holding the number
   *         of bytes written and either `Success(Done)` or the failure that stopped the stage
   */
  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[IOResult]) = {
    val ioResultPromise = Promise[IOResult]()
    val logic = new GraphStageLogic(shape) with InHandler {
      handler =>
      //buffers of the element currently being written
      var buffers: Iterator[ByteBuffer] = Iterator()
      var writeCallback: AsyncCallback[Try[Int]] = _
      var closeCallback: AsyncCallback[Try[Completed]] = _
      //total bytes written so far. Kept as Long (IOResult reports a Long count) so that
      //the counter does not overflow for streams larger than 2GB
      var position: Long = 0L
      //completed once all buffers of the most recently pushed element are written.
      //Starts out completed so that an upstream which finishes without pushing any
      //element still triggers the close in onUpstreamFinish instead of waiting forever
      var writeDone: Promise[Completed] = Promise.successful(Completed())

      setHandler(in, this)

      override def preStart(): Unit = {
        //close operation is async and thus requires the stage to remain open
        //even after all data is read
        setKeepGoing(true)
        writeCallback = getAsyncCallback[Try[Int]](handleWriteResult)
        closeCallback = getAsyncCallback[Try[Completed]](handleClose)
        pull(in)
      }

      override def onPush(): Unit = {
        buffers = grab(in).asByteBuffers.iterator
        //fresh promise per element; completed when the last buffer of this element is written
        writeDone = Promise[Completed]
        writeNextBufferOrPull()
      }

      override def onUpstreamFinish(): Unit = {
        //Work done perform close
        //Using async "blessed" callback does not work at this stage so
        // need to invoke as normal callback
        //TODO Revisit this
        //NOTE(review): as a consequence handleClose (and thus completeStage) runs on the
        //thread completing the close future rather than via closeCallback - confirm this is safe

        //write of ByteBuffers from ByteString is an async operation. For last push
        //the write operation may involve multiple async callbacks and by that time
        //onUpstreamFinish may get invoked. So to ensure that close operation is performed
        //"after" the last push writes are done we rely on writeDone promise
        //and schedule the close on its completion
        writeDone.future.onComplete(_ => stream.close().head().onComplete(handleClose))
      }

      override def onUpstreamFailure(ex: Throwable): Unit = {
        fail(ex)
      }

      //invoked through writeCallback once a single buffer write has finished
      private def handleWriteResult(bytesWrittenOrFailure: Try[Int]): Unit = bytesWrittenOrFailure match {
        case Success(bytesWritten) =>
          position += bytesWritten
          writeNextBufferOrPull()
        case Failure(failure) => fail(failure)
      }

      //invoked with the outcome of closing the stream; finishes the stage either way
      private def handleClose(completed: Try[Completed]): Unit = completed match {
        case Success(Completed()) =>
          completeStage()
          ioResultPromise.trySuccess(IOResult(position, Success(Done)))
        case Failure(failure) =>
          fail(failure)
      }

      //writes the next pending buffer, or signals that the current element is done
      //and requests the next one
      private def writeNextBufferOrPull(): Unit = {
        if (buffers.hasNext) {
          stream.write(buffers.next()).head().onComplete(writeCallback.invoke)
        } else {
          writeDone.trySuccess(Completed())
          //upstream may have finished while the writes of the last element were still
          //in flight. Pulling a closed port is illegal and would fail the stage
          if (!isClosed(in)) pull(in)
        }
      }

      //fails the stage and reports the failure, along with the bytes written so far,
      //through the materialized future
      private def fail(failure: Throwable) = {
        failStage(failure)
        ioResultPromise.trySuccess(IOResult(position, Failure(failure)))
      }

    }
    (logic, ioResultPromise.future)
  }