in runtime/src/main/scala/org/apache/pekko/grpc/internal/DecodeBase64.scala [39:85]
override def initialAttributes = Attributes.name("DecodeBase64")
final override val shape = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
private var buffer: ByteString = ByteString.empty
override def onPush(): Unit = {
buffer ++= grab(in)
val length = buffer.length
val decodeLength = length - length % 4
if (decodeLength > 0) {
val (decodeBytes, remaining) = buffer.splitAt(decodeLength)
push(out, decodeBytes.decodeBase64)
buffer = remaining
} else {
pull(in)
}
}
override def onUpstreamFinish(): Unit = {
if (buffer.nonEmpty) {
if (isAvailable(out)) {
push(out, buffer.decodeBase64)
buffer = ByteString.empty
}
} else {
completeStage()
}
}
override def onPull(): Unit = {
if (isClosed(in)) {
if (buffer.nonEmpty) {
push(out, buffer.decodeBase64)
buffer = ByteString.empty
} else {
completeStage()
}
} else pull(in)
}
setHandlers(in, out, this)
}