in app/helpers/ByteBufferSource.scala [27:58]
override def shape: SourceShape[ByteString] = SourceShape.of(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
private val logger = LoggerFactory.getLogger(getClass)
private var ctr:Int = 0
setHandler(out, new AbstractOutHandler {
override def onPull(): Unit = {
logger.debug(s"Buffer capacity is ${buffer.capacity()}, current position is $ctr")
val bytes:Array[Byte] = if(ctr+readSize<buffer.capacity()) {
logger.debug(s"Getting string of length $readSize")
val xtracted = new Array[Byte](readSize)
buffer.get(xtracted, ctr, readSize)
ctr+=readSize
xtracted
} else {
logger.debug(s"Last chunk - getting string of ${buffer.capacity() - ctr}")
val xtracted = new Array[Byte](buffer.capacity()-ctr)
buffer.get(xtracted,ctr,buffer.capacity()-ctr)
ctr+=buffer.capacity()-ctr
xtracted
}
push(out, ByteString(bytes))
if(ctr>=buffer.capacity()){
logger.debug(s"ctr=$ctr, capacity=${buffer.capacity()}, completing stream")
complete(out)
}
}
})
}