override def createLogicAndMaterializedValue()

in remote/src/main/scala/org/apache/pekko/remote/artery/aeron/AeronSink.scala [116:255]


  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
    val completed = Promise[Done]()
    val logic = new GraphStageLogic(shape) with InHandler {

      private var envelopeInFlight: EnvelopeBuffer = null
      private val pub = aeron.addPublication(channel, streamId)

      private var completedValue: Try[Done] = Success(Done)

      // spin between 2 to 20 depending on idleCpuLevel
      private val spinning = 2 * taskRunner.idleCpuLevel
      private var backoffCount = spinning
      private var lastMsgSize = 0
      private val offerTask = new OfferTask(
        pub,
        null,
        lastMsgSize,
        getAsyncCallback(_ => taskOnOfferSuccess()),
        giveUpAfter,
        getAsyncCallback(_ => onGiveUp()),
        getAsyncCallback(_ => onPublicationClosed()))
      private val addOfferTask: Add = Add(offerTask)

      private var offerTaskInProgress = false
      private var delegateTaskStartTime = 0L
      private var countBeforeDelegate = 0L

      override def preStart(): Unit = {
        setKeepGoing(true)
        pull(in)
        // TODO: Identify different sinks!
        flightRecorder.aeronSinkStarted(channel, streamId)
      }

      override def postStop(): Unit = {
        try {
          taskRunner.command(Remove(addOfferTask.task))
          flightRecorder.aeronSinkTaskRunnerRemoved(channel, streamId)
          pub.close()
          flightRecorder.aeronSinkPublicationClosed(channel, streamId)
        } finally {
          flightRecorder.aeronSinkStopped(channel, streamId)
          completed.complete(completedValue)
        }
      }

      // InHandler
      override def onPush(): Unit = {
        envelopeInFlight = grab(in)
        backoffCount = spinning
        lastMsgSize = envelopeInFlight.byteBuffer.limit
        flightRecorder.aeronSinkEnvelopeGrabbed(lastMsgSize)
        publish()
      }

      @tailrec private def publish(): Unit = {
        val result = pub.offer(envelopeInFlight.aeronBuffer, 0, lastMsgSize)
        if (result < 0) {
          if (result == Publication.CLOSED)
            onPublicationClosed()
          else if (result == Publication.NOT_CONNECTED)
            delegateBackoff()
          else {
            backoffCount -= 1
            if (backoffCount > 0) {
              ThreadHints.onSpinWait()
              publish() // recursive
            } else
              delegateBackoff()
          }
        } else {
          countBeforeDelegate += 1
          onOfferSuccess()
        }
      }

      private def delegateBackoff(): Unit = {
        // delegate backoff to shared TaskRunner
        offerTaskInProgress = true
        // visibility of these assignments are ensured by adding the task to the command queue
        offerTask.buffer = envelopeInFlight.aeronBuffer
        offerTask.msgSize = lastMsgSize
        delegateTaskStartTime = System.nanoTime()
        taskRunner.command(addOfferTask)
        flightRecorder.aeronSinkDelegateToTaskRunner(countBeforeDelegate)
      }

      private def taskOnOfferSuccess(): Unit = {
        countBeforeDelegate = 0
        // FIXME does calculation belong here or in impl?
        flightRecorder.aeronSinkReturnFromTaskRunner(System.nanoTime() - delegateTaskStartTime)
        onOfferSuccess()
      }

      private def onOfferSuccess(): Unit = {
        flightRecorder.aeronSinkEnvelopeOffered(lastMsgSize)
        offerTaskInProgress = false
        pool.release(envelopeInFlight)
        offerTask.buffer = null
        envelopeInFlight = null

        if (isClosed(in))
          completeStage()
        else
          pull(in)
      }

      private def onGiveUp(): Unit = {
        offerTaskInProgress = false
        val cause = new GaveUpMessageException(s"Gave up sending message to $channel after ${giveUpAfter.pretty}.")
        flightRecorder.aeronSinkGaveUpEnvelope(cause.getMessage)
        completedValue = Failure(cause)
        failStage(cause)
      }

      private def onPublicationClosed(): Unit = {
        offerTaskInProgress = false
        val cause = new PublicationClosedException(s"Aeron Publication to [$channel] was closed.")
        // this is not exepected, since we didn't close the publication ourselves
        flightRecorder.aeronSinkPublicationClosedUnexpectedly(channel, streamId)
        completedValue = Failure(cause)
        failStage(cause)
      }

      override def onUpstreamFinish(): Unit = {
        // flush outstanding offer before completing stage
        if (!offerTaskInProgress)
          super.onUpstreamFinish()
      }

      override def onUpstreamFailure(cause: Throwable): Unit = {
        completedValue = Failure(cause)
        super.onUpstreamFailure(cause)
      }

      setHandler(in, this)
    }

    (logic, completed.future)
  }