app/helpers/ByteBufferSource.scala (45 lines of code) (raw):
package helpers
import java.nio.ByteBuffer
import akka.stream.scaladsl.GraphDSL
import akka.stream.{Attributes, ClosedShape, Outlet, SourceShape}
import akka.stream.stage.{AbstractOutHandler, GraphStage, GraphStageLogic}
import akka.util.ByteString
import org.slf4j.LoggerFactory
object ByteBufferSource {
import akka.stream.scaladsl.GraphDSL._
def apply(buffer: ByteBuffer, readSize: Int) = GraphDSL.create() { implicit builder=>
val src = builder.add(new ByteBufferSource(buffer, readSize))
SourceShape(src.out)
}
}
/**
* implements a source that emits elements from a ByteBuffer as a ByteString.
* the builtin functions appear not to do this in a simple way
*/
class ByteBufferSource (buffer:ByteBuffer, readSize:Int) extends GraphStage[SourceShape[ByteString]] {
private final val out:Outlet[ByteString] = Outlet.create("ByteBufferSource.out")
override def shape: SourceShape[ByteString] = SourceShape.of(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
private val logger = LoggerFactory.getLogger(getClass)
private var ctr:Int = 0
setHandler(out, new AbstractOutHandler {
override def onPull(): Unit = {
logger.debug(s"Buffer capacity is ${buffer.capacity()}, current position is $ctr")
val bytes:Array[Byte] = if(ctr+readSize<buffer.capacity()) {
logger.debug(s"Getting string of length $readSize")
val xtracted = new Array[Byte](readSize)
buffer.get(xtracted, ctr, readSize)
ctr+=readSize
xtracted
} else {
logger.debug(s"Last chunk - getting string of ${buffer.capacity() - ctr}")
val xtracted = new Array[Byte](buffer.capacity()-ctr)
buffer.get(xtracted,ctr,buffer.capacity()-ctr)
ctr+=buffer.capacity()-ctr
xtracted
}
push(out, ByteString(bytes))
if(ctr>=buffer.capacity()){
logger.debug(s"ctr=$ctr, capacity=${buffer.capacity()}, completing stream")
complete(out)
}
}
})
}
}