in mxs-copy-components/src/main/scala/com/gu/multimedia/mxscopy/streamcomponents/MMappedFileSource.scala [22:87]
override def shape: SourceShape[ByteString] = SourceShape.of(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
private val logger = LoggerFactory.getLogger(getClass)
logger.info(s"Starting up")
private var stream:FileInputStream = _
private var ctr:Long = 0
private var mapChunks:IndexedSeq[MappedByteBuffer] = IndexedSeq()
setHandler(out, new AbstractOutHandler {
override def onPull(): Unit = {
if (ctr >= file.length()) {
logger.info(s"Finished streaming ${file.getAbsolutePath}")
complete(out)
} else {
Try {
val chunkIndex = Math.floor(ctr.toDouble / mapSize.toDouble).toInt
val chunkPtr = (ctr - (chunkIndex * mapSize)).toInt
logger.debug(s"chunkIndex is $chunkIndex, chunkPtr is $chunkPtr with ctr $ctr and mapSize $mapSize")
try {
val bytes: Array[Byte] = new Array[Byte](pageSize.toInt)
mapChunks(chunkIndex).get(bytes, 0, pageSize)
bytes
} catch {
case ex:BufferUnderflowException=>
val lastPageSize = mapChunks(chunkIndex).remaining()
val bytes: Array[Byte] = new Array[Byte](lastPageSize)
mapChunks(chunkIndex).get(bytes, 0, lastPageSize)
bytes
}
} match {
case Success(bytes) =>
logger.debug(s"Pushing $pageSize bytes from $ctr...")
ctr += pageSize
push(out, ByteString(bytes))
case Failure(err) =>
logger.error(s"Could not read bytes from mmap: ", err)
failStage(err)
}
}
}
})
override def preStart(): Unit = {
logger.info(s"Opening ${file.getAbsolutePath}, chunk size is $pageSize...")
try {
stream = new FileInputStream(file)
val mapChunkCount = Math.ceil(file.length().toDouble / mapSize.toDouble).toInt
val lastChunkSize = file.length() - ((mapChunkCount-1) * mapSize)
logger.debug(s"mapChunkCount is $mapChunkCount, lastChunkSize is $lastChunkSize bytes")
mapChunks = for(i <- 0 until mapChunkCount) yield stream.getChannel.map(FileChannel.MapMode.READ_ONLY, i*mapSize, if(i!=mapChunkCount-1) mapSize else lastChunkSize)
} catch {
case err:Throwable=>
logger.error(s"Could not open ${file.getAbsolutePath}", err)
failStage(err)
}
}
override def postStop(): Unit = {
stream.close()
}
}