override def onPull()

in app/streamcomponents/MatrixStoreFileSourceWithRanges.scala [77:102]


      /**
        * Handles downstream demand by emitting the next byte range of the file.
        *
        * Asks `getNextDownloadRange` (passing the current `bytesPtr`) for a `(start, end)` pair. When one is
        * returned, the channel is positioned at `start`, `end-start+1` bytes are read into a fresh buffer and
        * the result is pushed to `out` as one `ByteString`. When `None` is returned, `out` is completed.
        *
        * A single `channel.read` call may legitimately return fewer bytes than the buffer can hold, so the read
        * is repeated until the buffer is full or the channel stops yielding data. A short range is still logged
        * as an error and whatever was read is pushed, but `bytesPtr` only ever advances by the number of bytes
        * actually received (never by the -1 end-of-stream marker).
        */
      override def onPull(): Unit = {
        logger.debug("source is pulled")
        getNextDownloadRange(bytesPtr) match {
          case Some((start,end))=>
            logger.debug(s"Next chunk is from byte $start to $end")
            //the range is inclusive at both ends, so bytes 0-23 is 24 bytes' worth; hence the +1
            val bufferSize:Int = (end-start).toInt+1
            val buffer = ByteBuffer.allocate(bufferSize)  //should check if allocateDirect helps here

            bytesPtr=start
            channel.position(start)

            logger.debug(s"channel position is ${channel.position()}")

            //keep reading until the buffer is full. Stop on -1 (end of stream) and also on 0 so that a channel
            //which yields nothing can't make us spin forever.
            var lastRead = channel.read(buffer)
            while(lastRead>0 && buffer.hasRemaining){
              lastRead = channel.read(buffer)
            }

            //the buffer was freshly allocated, so its position is exactly the number of bytes received so far.
            //Unlike the raw return value of read(), this can never be negative.
            val bytesRead = buffer.position()
            logger.debug(s"Read $bytesRead bytes")
            if(bytesRead!=bufferSize){
              logger.error(s"Expected $bufferSize bytes but got $bytesRead")
            }
            bytesPtr += bytesRead
            buffer.flip()
            logger.debug(s"pushing to stream ${buffer.capacity()}, ${buffer}")
            push(out, ByteString(buffer))
          case None=>
            logger.info("Last range is uploaded")
            complete(out)
        }
      }