in runtime/src/main/scala/org/apache/pekko/grpc/internal/CancellationBarrierGraphStage.scala [32:59]
/**
 * Creates the processing logic for this stage.
 *
 * The logic has two modes for the `in` port:
 *  - initially, each element pushed on `in` is grabbed and emitted on `out`;
 *  - once downstream has finished, each element pushed on `in` is grabbed,
 *    dropped, and followed by a new pull on `in`.
 *
 * The `onDownstreamFinish` override below contains no call that cancels or
 * completes the stage; it only switches the `in` handler to the second mode.
 *
 * @param inheritedAttributes attributes handed to the stage; not read here
 * @return the logic instance with handlers installed on `in` and `out`
 */
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
  new GraphStageLogic(shape) {

    // Forwarding mode: hand every incoming element straight to `out`.
    private val forwardElements: InHandler = new InHandler {
      override def onPush(): Unit = emit(out, grab(in))
    }

    // Discarding mode: take the element off the port, throw it away and ask
    // for the next one. Built on demand, i.e. only when downstream finishes.
    private def discardElements: InHandler = new InHandler {
      override def onPush(): Unit = {
        grab(in)
        pull(in)
      }
    }

    private val barrierOutHandler: OutHandler = new OutHandler {
      // Downstream demand is translated one-to-one into upstream demand.
      override def onPull(): Unit = pull(in)

      override def onDownstreamFinish(cause: Throwable): Unit = {
        // Only signal demand if there is none outstanding; `in` must not be
        // pulled again while a pull is still pending.
        if (!hasBeenPulled(in)) pull(in)

        // From here on, incoming elements are dropped instead of emitted.
        setHandler(in, discardElements)
      }
    }

    setHandler(in, forwardElements)
    setHandler(out, barrierOutHandler)
  }