in remote/src/main/scala/org/apache/pekko/remote/artery/aeron/AeronSink.scala [116:255]
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
val completed = Promise[Done]()
val logic = new GraphStageLogic(shape) with InHandler {
private var envelopeInFlight: EnvelopeBuffer = null
private val pub = aeron.addPublication(channel, streamId)
private var completedValue: Try[Done] = Success(Done)
// spin between 2 to 20 depending on idleCpuLevel
private val spinning = 2 * taskRunner.idleCpuLevel
private var backoffCount = spinning
private var lastMsgSize = 0
private val offerTask = new OfferTask(
pub,
null,
lastMsgSize,
getAsyncCallback(_ => taskOnOfferSuccess()),
giveUpAfter,
getAsyncCallback(_ => onGiveUp()),
getAsyncCallback(_ => onPublicationClosed()))
private val addOfferTask: Add = Add(offerTask)
private var offerTaskInProgress = false
private var delegateTaskStartTime = 0L
private var countBeforeDelegate = 0L
override def preStart(): Unit = {
setKeepGoing(true)
pull(in)
// TODO: Identify different sinks!
flightRecorder.aeronSinkStarted(channel, streamId)
}
override def postStop(): Unit = {
try {
taskRunner.command(Remove(addOfferTask.task))
flightRecorder.aeronSinkTaskRunnerRemoved(channel, streamId)
pub.close()
flightRecorder.aeronSinkPublicationClosed(channel, streamId)
} finally {
flightRecorder.aeronSinkStopped(channel, streamId)
completed.complete(completedValue)
}
}
// InHandler
override def onPush(): Unit = {
envelopeInFlight = grab(in)
backoffCount = spinning
lastMsgSize = envelopeInFlight.byteBuffer.limit
flightRecorder.aeronSinkEnvelopeGrabbed(lastMsgSize)
publish()
}
@tailrec private def publish(): Unit = {
val result = pub.offer(envelopeInFlight.aeronBuffer, 0, lastMsgSize)
if (result < 0) {
if (result == Publication.CLOSED)
onPublicationClosed()
else if (result == Publication.NOT_CONNECTED)
delegateBackoff()
else {
backoffCount -= 1
if (backoffCount > 0) {
ThreadHints.onSpinWait()
publish() // recursive
} else
delegateBackoff()
}
} else {
countBeforeDelegate += 1
onOfferSuccess()
}
}
private def delegateBackoff(): Unit = {
// delegate backoff to shared TaskRunner
offerTaskInProgress = true
// visibility of these assignments are ensured by adding the task to the command queue
offerTask.buffer = envelopeInFlight.aeronBuffer
offerTask.msgSize = lastMsgSize
delegateTaskStartTime = System.nanoTime()
taskRunner.command(addOfferTask)
flightRecorder.aeronSinkDelegateToTaskRunner(countBeforeDelegate)
}
private def taskOnOfferSuccess(): Unit = {
countBeforeDelegate = 0
// FIXME does calculation belong here or in impl?
flightRecorder.aeronSinkReturnFromTaskRunner(System.nanoTime() - delegateTaskStartTime)
onOfferSuccess()
}
private def onOfferSuccess(): Unit = {
flightRecorder.aeronSinkEnvelopeOffered(lastMsgSize)
offerTaskInProgress = false
pool.release(envelopeInFlight)
offerTask.buffer = null
envelopeInFlight = null
if (isClosed(in))
completeStage()
else
pull(in)
}
private def onGiveUp(): Unit = {
offerTaskInProgress = false
val cause = new GaveUpMessageException(s"Gave up sending message to $channel after ${giveUpAfter.pretty}.")
flightRecorder.aeronSinkGaveUpEnvelope(cause.getMessage)
completedValue = Failure(cause)
failStage(cause)
}
private def onPublicationClosed(): Unit = {
offerTaskInProgress = false
val cause = new PublicationClosedException(s"Aeron Publication to [$channel] was closed.")
// this is not exepected, since we didn't close the publication ourselves
flightRecorder.aeronSinkPublicationClosedUnexpectedly(channel, streamId)
completedValue = Failure(cause)
failStage(cause)
}
override def onUpstreamFinish(): Unit = {
// flush outstanding offer before completing stage
if (!offerTaskInProgress)
super.onUpstreamFinish()
}
override def onUpstreamFailure(cause: Throwable): Unit = {
completedValue = Failure(cause)
super.onUpstreamFailure(cause)
}
setHandler(in, this)
}
(logic, completed.future)
}