in mxs-copy-components/src/main/scala/com/gu/multimedia/mxscopy/helpers/Copier.scala [161:253]
def doCopyTo(vault:Vault, destFileName:Option[String], fromFile:File, chunkSize:Int, checksumType:String, keepOnFailure:Boolean=false,retryOnFailure:Boolean=true)(implicit ec:ExecutionContext,mat:Materializer):Future[(String,Option[String])] = {
val checksumSinkFactory = checksumType match {
case "none"=>Sink.ignore.mapMaterializedValue(_=>Future(None))
case _=>new ChecksumSink(checksumType).async
}
val metadata = MatrixStoreHelper.metadataFromFilesystem(fromFile)
if(metadata.isFailure){
logger.error(s"Could no lookup metadata")
Future.failed(metadata.failed.get) //since the stream future fails on error, might as well do the same here.
} else {
try {
val mdToWrite = destFileName match {
case None =>
logger.info(s"MXFS_PATH (from filesystem) is ${fromFile.getAbsolutePath}")
metadata.get
.withString("MXFS_PATH",fromFile.getAbsolutePath)
.withString("MXFS_FILENAME", fromFile.getName)
.withString("MXFS_FILENAME_UPPER", fromFile.getName.toUpperCase)
case Some(fn) =>
val p = Paths.get(fn)
val filenameOnly = p.getFileName.toString
logger.info(s"MXFS_PATH (modified) is $fn")
metadata.get
.withString("MXFS_PATH", fn)
.withString("MXFS_FILENAME", filenameOnly)
.withString("MXFS_FILENAME_UPPER", filenameOnly.toUpperCase)
}
val timestampStart = Instant.now.toEpochMilli
val mxsFile = vault.createObject(mdToWrite.toAttributes.toArray)
logger.debug(s"mxsFile is $mxsFile")
val graph = GraphDSL.createGraph(checksumSinkFactory) { implicit builder =>
checksumSink =>
import akka.stream.scaladsl.GraphDSL.Implicits._
//val src = builder.add(new MMappedFileSource(fromFile, chunkSize))
val src = builder.add(FileIO.fromPath(fromFile.toPath))
val bcast = builder.add(new Broadcast[ByteString](2, true))
val omSink = builder.add(new MatrixStoreFileSink(mxsFile).async)
src.out.log("copyToStream") ~> bcast ~> omSink
bcast.out(1) ~> checksumSink
ClosedShape
}
logger.debug(s"Created stream")
RunnableGraph.fromGraph(graph).run().flatMap(finalChecksum=>{
val timestampFinish = Instant.now.toEpochMilli
val msDuration = timestampFinish - timestampStart
val rate = fromFile.length().toDouble / msDuration.toDouble //in bytes/ms
val mbps = rate /1048576 *1000 //in MByte/s
logger.info(s"Stream completed, transferred ${fromFile.length} bytes in $msDuration millisec, at a rate of $mbps mByte/s. Final checksum is $finalChecksum")
finalChecksum match {
case Some(actualChecksum)=>
val updatedMetadata = metadata.get.copy(stringValues = metadata.get.stringValues ++ Map(checksumType->actualChecksum))
MetadataHelper.setAttributeMetadata(mxsFile, updatedMetadata)
MatrixStoreHelper.getOMFileMd5(mxsFile).flatMap({
case Failure(err)=>
logger.error(s"Unable to get checksum from appliance, file should be considered unsafe", err)
Future.failed(err)
case Success(remoteChecksum)=>
logger.info(s"Appliance reported checksum of $remoteChecksum")
if(remoteChecksum!=actualChecksum){
logger.error(s"Checksum did not match!")
if(!keepOnFailure) {
logger.info(s"Deleting invalid file ${mxsFile.getId}")
mxsFile.deleteForcefully()
}
if(retryOnFailure){
Thread.sleep(500)
doCopyTo(vault, destFileName, fromFile, chunkSize, checksumType, keepOnFailure, retryOnFailure)
} else {
Future.failed(new RuntimeException(s"Checksum did not match"))
}
} else {
Future((mxsFile.getId, finalChecksum))
}
})
case _=>
Future((mxsFile.getId, finalChecksum))
}
})
} catch {
case err:Throwable=>
logger.error(s"Could not prepare copy: ", err)
Future.failed(err)
}
}
}