app/services/FileMove/VerifyChecksum.scala (65 lines of code) (raw):
package services.FileMove
import akka.stream.Materializer
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl.Sink
import com.theguardian.multimedia.archivehunter.common.clientManagers.S3ClientManager
import helpers.DigestSink
import org.apache.commons.codec.binary.Hex
import org.slf4j.LoggerFactory
import play.api.{Configuration, Logger}
import services.FileMove.GenericMoveActor.{PerformStep, RollbackStep, StepFailed, StepSucceeded}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
class VerifyChecksum (s3ClientManager:S3ClientManager, config:Configuration)(implicit mat:Materializer, ec:ExecutionContext) extends GenericMoveActor {
override protected val logger = Logger(getClass)
/**
* Streams the given file from S3 and performs an MD5 checksum on it.
* If successful, returns a tuple containing the checksum in a hex-formatted string
* and the object's metadata
* @param bucket bucket containing the object
* @param path path to the object
* @return a Future containing the object's checksum and metadata
*/
def getChecksum(bucket:String, path:String) = {
S3.download(bucket, path)
.runWith(Sink.head)
.flatMap({
case None=>
logger.error(s"Can't checksum because s3://$bucket/$path does not exist")
Future.failed(new RuntimeException(s"s3://$bucket/$path does not exist"))
case Some((source, meta))=>
source
.runWith(DigestSink("MD5"))
.map(checksum=>(Hex.encodeHexString(checksum.toByteBuffer), meta))
})
}
override def receive: Receive = {
case PerformStep(state)=>
if(state.entry.isEmpty || state.destFileId.isEmpty) {
logger.error(s"Either the source or destination file specifiers are empty so I can't verify the checksum")
sender() ! StepFailed(state, "Either source or destination specifier was empty")
} else {
val entry = state.entry.get
logger.info(s"Verifying checksums for s3://${entry.bucket}/${entry.path} -> s3://${state.destBucket}/${entry.path}")
val originalSender = sender()
Future.sequence(Seq(
getChecksum(entry.bucket, entry.path),
getChecksum(state.destBucket, entry.path)
)).onComplete({
case Success(results)=>
val sourceCS = results.head._1
val sourceMeta = results.head._2
val destCS = results(1)._1
val destMeta = results(1)._2
if(destMeta.contentLength != sourceMeta.contentLength) {
logger.error(s"INVALID COPY s3://${entry.bucket}/${entry.path} has ${sourceMeta.contentLength} bytes and s3://${state.destBucket}/${entry.path} has ${destMeta.contentLength} bytes")
originalSender ! StepFailed(state, "Content length did not match")
} else if(sourceCS != destCS){
logger.error(s"INVALID COPY s3://${entry.bucket}/${entry.path} has a checksum of $sourceCS and s3://${state.destBucket}/${entry.path} has $destCS")
originalSender ! StepFailed(state, "Checksums did not match")
} else {
logger.info(s"Checksums $sourceCS and file size ${destMeta.contentLength} match for s3://${entry.bucket}/${entry.path} -> s3://${state.destBucket}/${entry.path}")
originalSender ! StepSucceeded(state)
}
case Failure(err)=>
logger.error(s"Could not perform checksum validation for s3://${entry.bucket}/${entry.path} -> s3://${state.destBucket}/${entry.path}: ${err.getMessage}", err)
originalSender ! StepFailed(state, err.getMessage)
})
}
case RollbackStep(state)=>
//nothing to roll back here
logger.info("VerifyChecksum has nothing to roll back")
sender() ! StepSucceeded(state)
}
}