in online_archive/src/main/scala/AssetSweeperMessageProcessor.scala [46:103]
protected def getCorrelationId:String = UUID.randomUUID().toString
private def callUpload(fullPath:Path, relativePath:Path) = {
logger.info(s"Archiving file '$fullPath' to s3://${uploader.bucketName}/$relativePath")
uploader.copyFileToS3(fullPath.toFile, Some(relativePath.toString)).flatMap((fileInfo)=>{
val (fileName, fileSize) = fileInfo
val correlationId = getCorrelationId
MDC.put("correlationId", correlationId)
logger.debug(s"$fullPath: Upload completed")
val archiveHunterID = ArchiveHunter.makeDocId(bucket = uploader.bucketName, fileName)
logger.debug(s"archivehunter ID for $relativePath is $archiveHunterID")
archivedRecordDAO
.findBySourceFilename(fullPath.toString)
.map({
case Some(existingRecord)=>
existingRecord.copy(
uploadedBucket = uploader.bucketName,
uploadedPath = fileName,
uploadedVersion = None
)
case None=>
ArchivedRecord(archiveHunterID,
originalFilePath=fullPath.toString,
originalFileSize=fileSize,
uploadedBucket = uploader.bucketName,
uploadedPath = fileName,
uploadedVersion = None,
correlationId)
}).flatMap(rec=>{
archivedRecordDAO
.writeRecord(rec)
.map(recId =>
Right(
rec
.copy(id = Some(recId))
.asJson
)
)
})
}).recoverWith(err=>{
logger.error(s"Could not complete upload for $fullPath ${err.getMessage}", err)
val attemptCount = attemptCountFromMDC() match {
case Some(count)=>count
case None=>
logger.warn(s"Could not get attempt count from logging context for ${fullPath.toString}, creating failure report with attempt 1")
1
}
val rec = FailureRecord(id = None,
originalFilePath = fullPath.toString,
attempt = attemptCount,
errorMessage = err.getMessage,
errorComponent = ErrorComponents.Internal,
retryState = RetryStates.WillRetry)
failureRecordDAO.writeRecord(rec).map(_=>Left(err.getMessage))
})
}