in stream/src/main/scala/org/apache/pekko/stream/impl/ActorRefSource.scala [43:167]
def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, ActorRef) =
throw new IllegalStateException("Not supported")
private[pekko] override def createLogicAndMaterializedValue(
inheritedAttributes: Attributes,
eagerMaterializer: Materializer): (GraphStageLogic, ActorRef) = {
val stage: GraphStageLogic with StageLogging with ActorRefStage = new GraphStageLogic(shape)
with StageLogging
with ActorRefStage {
override protected def logSource: Class[_] = classOf[ActorRefSource[_]]
private val buffer: OptionVal[Buffer[T]] =
if (maxBuffer != 0)
OptionVal(Buffer(maxBuffer, inheritedAttributes))
else {
OptionVal.None // for backwards compatibility with old actor publisher based implementation
}
private var isCompleting: Boolean = false
override protected def stageActorName: String = inheritedAttributes.nameForActorRef(super.stageActorName)
private val name = inheritedAttributes.nameOrDefault(getClass.toString)
override val ref: ActorRef = getEagerStageActor(eagerMaterializer) {
case (_, m) if failureMatcher.isDefinedAt(m) =>
failStage(failureMatcher(m))
case (_, m) if completionMatcher.isDefinedAt(m) =>
completionMatcher(m) match {
case CompletionStrategy.Draining =>
isCompleting = true
tryPush()
case CompletionStrategy.Immediately =>
completeStage()
}
case (_, m: T @unchecked) =>
buffer match {
case OptionVal.Some(buf) =>
if (isCompleting) {
log.warning(
"Dropping element because Status.Success received already, only draining already buffered elements: [{}] (pending: [{}] in stream [{}])",
m,
buf.used,
name)
} else if (!buf.isFull) {
buf.enqueue(m)
tryPush()
} else
overflowStrategy match {
case s: DropHead =>
log.log(
s.logLevel,
"Dropping the head element because buffer is full and overflowStrategy is: [DropHead] in stream [{}]",
name)
buf.dropHead()
buf.enqueue(m)
tryPush()
case s: DropTail =>
log.log(
s.logLevel,
"Dropping the tail element because buffer is full and overflowStrategy is: [DropTail] in stream [{}]",
name)
buf.dropTail()
buf.enqueue(m)
tryPush()
case s: DropBuffer =>
log.log(
s.logLevel,
"Dropping all the buffered elements because buffer is full and overflowStrategy is: [DropBuffer] in stream [{}]",
name)
buf.clear()
buf.enqueue(m)
tryPush()
case s: DropNew =>
log.log(
s.logLevel,
"Dropping the new element because buffer is full and overflowStrategy is: [DropNew] in stream [{}]",
name)
case s: Fail =>
log.log(
s.logLevel,
"Failing because buffer is full and overflowStrategy is: [Fail] in stream [{}]",
name)
val bufferOverflowException =
BufferOverflowException(s"Buffer overflow (max capacity was: $maxBuffer)!")
failStage(bufferOverflowException)
case _: Backpressure =>
// there is a precondition check in Source.actorRefSource factory method to not allow backpressure as strategy
failStage(new IllegalStateException("Backpressure is not supported"))
}
case _ =>
if (isCompleting) {
log.warning("Dropping element because Status.Success received already: [{}] in stream [{}]", m, name)
} else if (isAvailable(out)) {
push(out, m)
} else {
log.debug(
"Dropping element because there is no downstream demand and no buffer: [{}] in stream [{}]",
m,
name)
}
}
case _ => throw new IllegalArgumentException() // won't happen, compiler exhaustiveness check pleaser
}.ref
private def tryPush(): Unit = {
if (isAvailable(out) && buffer.isDefined && buffer.get.nonEmpty) {
val msg = buffer.get.dequeue()
push(out, msg)
}
if (isCompleting && (buffer.isEmpty || buffer.get.isEmpty)) {
completeStage()
}
}
setHandler(out,
new OutHandler {
override def onPull(): Unit = {
tryPush()
}
})
}
(stage, stage.ref)
}