app/helpers/ProxyLocator.scala (102 lines of code) (raw):
package helpers
import com.sksamuel.elastic4s.http.{ElasticClient, HttpClient}
import com.theguardian.multimedia.archivehunter.common.cmn_helpers.S3ClientExtensions.S3ClientExtensions
import com.theguardian.multimedia.archivehunter.common.cmn_models.{ConflictError, ItemNotFound, ScanTargetDAO, UnexpectedReturnCode}
import com.theguardian.multimedia.archivehunter.common.{ArchiveEntry, Indexer, ProxyLocation, ProxyType}
import org.slf4j.MDC
import play.api.Logger
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request
import scala.jdk.CollectionConverters._
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.ExecutionContext.Implicits.global
object ProxyLocator {
private val logger = Logger(getClass)
def checkBucketLocation(bucket:String, key:String)(implicit s3Client:S3Client) = Future.fromTry(
s3Client.doesObjectExist(bucket,key).map(exists=>(bucket, exists))
)
private val fileEndingRegex = "^(.*)\\.(.*)$".r
//list of any likely proxy extensions
private val knownVideoExtensions = Seq("avi","mp4","mxf","mov","m4v","mpg","mp2","ts","m2ts")
private val knownAudioExtensions = Seq("wav","aif","aiff","mp3","m2a")
private val knownImageExtensions = Seq("jpg","jpe","png","tga","tif","tiff","gif")
def stripFileEnding(filepath:String) = {
try {
val fileEndingRegex(stripped,_) = filepath
stripped
} catch {
case _:MatchError=>
filepath //if there is no file extension, then return the lot.
}
}
def getFileEnding(filepath:String) = {
try {
val fileEndingRegex(_,xtn) = filepath
Some(xtn)
} catch {
case _:MatchError=>
None
}
}
/**
* see if there is a proxy for the given [[ArchiveEntry]], assuming none exists in the dynamo table.
* exclude any .xml files as they are obviously not media
* @param entry [[ArchiveEntry]] instance
* @param s3Client implicitly provided AmazonS3 instance
* @return a Future, containing a Sequence of ProxyLocation objects for each file that matches the given root
*/
def findProxyLocation(entry:ArchiveEntry)(implicit s3Client:S3Client, scanTargetDAO: ScanTargetDAO) = {
scanTargetDAO.targetForBucket(entry.bucket).flatMap({
case None=>throw new RuntimeException(s"No scan target for ${entry.bucket}")
case Some(Left(err))=>throw new RuntimeException(err.toString) //fail the Future if we get an error. This is picked up with onComplete or recover.
case Some(Right(st))=>
val rq = ListObjectsV2Request.builder()
.bucket(st.proxyBucket)
.prefix(stripFileEnding(entry.path))
.build()
val potentialProxies = s3Client.listObjectsV2(rq)
logger.debug(s"findProxyLocation got ${potentialProxies.keyCount()} keys")
Future.sequence(potentialProxies.contents().asScala
.map(summary=>{
if(summary.key().endsWith(".xml")) {
None
} else {
Some(ProxyLocation.fromS3(st.proxyBucket, summary.key(), entry.bucket, entry.path, None, Region.of(entry.region.getOrElse("eu-west-1"))))
}
}).collect({case Some(proxyLocation)=>proxyLocation})
)
})
}
/**
* determine the proxy type for a given filepath
* @param filepath
* @return
*/
def proxyTypeForExtension(filepath:String) = getFileEnding(filepath).map(_.toLowerCase).map(xtn=>{
if(knownVideoExtensions.contains(xtn)){
ProxyType.VIDEO
} else if(knownAudioExtensions.contains(xtn)){
ProxyType.AUDIO
} else if(knownImageExtensions.contains(xtn)){
ProxyType.THUMBNAIL
} else {
ProxyType.UNKNOWN
}
})
/**
* sets the "proxied" flag on the given item, retrying in case of a version conflict
* @param sourceId archive entry ID to update
* @return a Future with either a Left with an error string or a Right with the item ID string
*/
def setProxiedWithRetry(sourceId:String)(implicit indexer:Indexer, httpClient:ElasticClient):Future[Either[String,String]] =
indexer.getById(sourceId).flatMap(entry=>{
logger.debug(s"setProxiedWithEntry: sourceId is $sourceId entry is $entry")
val updatedEntry = entry.copy(proxied = true)
MDC.put("entry", updatedEntry.toString)
indexer.indexSingleItem(updatedEntry)
}).flatMap({
case Right(value)=>
logger.debug(s"success returned $value")
Future(Right(value))
case Left(ConflictError(_, errorDesc))=> //instead of if(err.error.`type`=="version_conflict_engine_exception")
logger.warn(s"Elasticsearch version conflict detected for update of $sourceId ($errorDesc), retrying...")
setProxiedWithRetry(sourceId)
case Left(err@ UnexpectedReturnCode(_, _, maybeReason))=>
if(maybeReason.isDefined) MDC.put("error", maybeReason.get)
logger.error(s"Could not set proxied flag for $sourceId: ${maybeReason.getOrElse("no reason given")}")
Future(Left(err.toString))
case Left(ItemNotFound(itemId))=>
logger.warn(s"Item $itemId could not be found, been deleted already?!")
Future(Left("Item not found"))
case Left(otherError)=>
logger.error(s"Could not set proxied flag: $otherError")
Future(Left(otherError.toString))
}).recoverWith({
case err:Throwable=>
logger.error(s"Could not set proxied flag on $sourceId: ${err.getMessage}", err)
Future.failed(err)
})
}