in app/streamcomponents/MatrixStoreFileSourceWithRanges.scala [77:102]
override def onPull(): Unit = {
logger.debug("source is pulled")
getNextDownloadRange(bytesPtr) match {
case Some((start,end))=>
logger.debug(s"Next chunk is from byte $start to $end")
val bufferSize:Int = (end-start).toInt+1 //+1 is needed, otherwise if we request bytes 0-23 we actually only get 0 to 22 because that is 23 bytes' worth.
val buffer = ByteBuffer.allocate(bufferSize) //should check if allocateDirect helps here
bytesPtr=start
channel.position(start)
logger.debug(s"channel position is ${channel.position()}")
val bytesRead = channel.read(buffer)
logger.debug(s"Read $bytesRead bytes")
if(bytesRead!=bufferSize){
logger.error(s"Expected $bufferSize bytes but got $bytesRead")
}
bytesPtr += bytesRead
buffer.flip()
logger.debug(s"pushing to stream ${buffer.capacity()}, ${buffer}")
push(out, ByteString(buffer))
case None=>
logger.info("Last range is uploaded")
complete(out)
}
}