def doCopyTo()

in mxs-copy-components/src/main/scala/com/gu/multimedia/mxscopy/helpers/Copier.scala [161:253]


  def doCopyTo(vault:Vault, destFileName:Option[String], fromFile:File, chunkSize:Int, checksumType:String, keepOnFailure:Boolean=false,retryOnFailure:Boolean=true)(implicit ec:ExecutionContext,mat:Materializer):Future[(String,Option[String])] = {
    val checksumSinkFactory = checksumType match {
      case "none"=>Sink.ignore.mapMaterializedValue(_=>Future(None))
      case _=>new ChecksumSink(checksumType).async
    }
    val metadata = MatrixStoreHelper.metadataFromFilesystem(fromFile)

    if(metadata.isFailure){
      logger.error(s"Could no lookup metadata")
      Future.failed(metadata.failed.get) //since the stream future fails on error, might as well do the same here.
    } else {
      try {
        val mdToWrite = destFileName match {
          case None =>
            logger.info(s"MXFS_PATH (from filesystem) is ${fromFile.getAbsolutePath}")
            metadata.get
              .withString("MXFS_PATH",fromFile.getAbsolutePath)
              .withString("MXFS_FILENAME", fromFile.getName)
              .withString("MXFS_FILENAME_UPPER", fromFile.getName.toUpperCase)
          case Some(fn) =>
            val p = Paths.get(fn)
            val filenameOnly = p.getFileName.toString
            logger.info(s"MXFS_PATH (modified) is $fn")
            metadata.get
              .withString("MXFS_PATH", fn)
              .withString("MXFS_FILENAME", filenameOnly)
              .withString("MXFS_FILENAME_UPPER", filenameOnly.toUpperCase)
        }
        val timestampStart = Instant.now.toEpochMilli

        val mxsFile = vault.createObject(mdToWrite.toAttributes.toArray)

        logger.debug(s"mxsFile is $mxsFile")
        val graph = GraphDSL.createGraph(checksumSinkFactory) { implicit builder =>
          checksumSink =>
            import akka.stream.scaladsl.GraphDSL.Implicits._

            //val src = builder.add(new MMappedFileSource(fromFile, chunkSize))
            val src = builder.add(FileIO.fromPath(fromFile.toPath))
            val bcast = builder.add(new Broadcast[ByteString](2, true))
            val omSink = builder.add(new MatrixStoreFileSink(mxsFile).async)

            src.out.log("copyToStream") ~> bcast ~> omSink
            bcast.out(1) ~> checksumSink
            ClosedShape
        }
        logger.debug(s"Created stream")
        RunnableGraph.fromGraph(graph).run().flatMap(finalChecksum=>{
          val timestampFinish = Instant.now.toEpochMilli
          val msDuration = timestampFinish - timestampStart

          val rate = fromFile.length().toDouble / msDuration.toDouble //in bytes/ms
          val mbps = rate /1048576 *1000  //in MByte/s

          logger.info(s"Stream completed, transferred ${fromFile.length} bytes in $msDuration millisec, at a rate of $mbps mByte/s.  Final checksum is $finalChecksum")
          finalChecksum match {
            case Some(actualChecksum)=>
              val updatedMetadata = metadata.get.copy(stringValues = metadata.get.stringValues ++ Map(checksumType->actualChecksum))
              MetadataHelper.setAttributeMetadata(mxsFile, updatedMetadata)

              MatrixStoreHelper.getOMFileMd5(mxsFile).flatMap({
                case Failure(err)=>
                  logger.error(s"Unable to get checksum from appliance, file should be considered unsafe", err)
                  Future.failed(err)
                case Success(remoteChecksum)=>
                  logger.info(s"Appliance reported checksum of $remoteChecksum")
                  if(remoteChecksum!=actualChecksum){
                    logger.error(s"Checksum did not match!")
                    if(!keepOnFailure) {
                      logger.info(s"Deleting invalid file ${mxsFile.getId}")
                      mxsFile.deleteForcefully()
                    }
                    if(retryOnFailure){
                      Thread.sleep(500)
                      doCopyTo(vault, destFileName, fromFile, chunkSize, checksumType, keepOnFailure, retryOnFailure)
                    } else {
                      Future.failed(new RuntimeException(s"Checksum did not match"))
                    }
                  } else {
                    Future((mxsFile.getId, finalChecksum))
                  }
              })
            case _=>
              Future((mxsFile.getId, finalChecksum))
          }
        })
      } catch {
        case err:Throwable=>
          logger.error(s"Could not prepare copy: ", err)
          Future.failed(err)
      }
    }
  }