in mxs-copy-components/src/main/scala/com/gu/multimedia/mxscopy/streamcomponents/MatrixStoreFileSink.scala [23:77]
override def shape: SinkShape[ByteString] = SinkShape.of(in)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Long]) = {
val completionPromise = Promise[Long]
val logic = new GraphStageLogic(shape) {
private val logger = LoggerFactory.getLogger(getClass)
private var channel:SeekableByteChannel = _
private var ctr:Long = 0
setHandler(in, new AbstractInHandler {
override def onPush(): Unit = {
val byteArr = grab(in).toArray
val buffer = ByteBuffer.allocate(byteArr.length)
buffer.put(byteArr)
ctr+=byteArr.length
buffer.flip()
channel.write(buffer)
pull(in)
}
override def onUpstreamFailure(ex: Throwable): Unit = {
logger.error(s"Copy source failed with error ${ex.getMessage}, aborting copy")
if(channel!=null) {
logger.debug("Closing write channel")
channel.close()
}
completionPromise.failure(ex)
}
})
override def preStart(): Unit = {
try {
logger.info(s"Requesting write to ${mxsFile.getId}...")
channel = mxsFile.newSeekableObjectChannel(Set(AccessOption.WRITE).asJava)
pull(in)
} catch {
case err:Throwable=>
logger.error(s"Could not set up MXS file to write to: ", err)
failStage(err)
}
}
override def postStop(): Unit = {
if(channel!=null) {
logger.debug("Closing write channel")
channel.close()
}
logger.debug(s"Wrote $ctr bytes")
completionPromise.success(ctr)
}
}
(logic, completionPromise.future)
}