app/helpers/CreateProxySink.scala (63 lines of code) (raw):

package helpers

import akka.actor.ActorRef
import akka.stream.{Attributes, Inlet, SinkShape}
import akka.stream.stage.{AbstractInHandler, GraphStage, GraphStageLogic}
import com.theguardian.multimedia.archivehunter.common.{ArchiveEntry, ProxyLocationDAO, ProxyType}
import com.theguardian.multimedia.archivehunter.common.ProxyTranscodeFramework.{ProxyGenerators, RequestType}
import javax.inject.{Inject, Named}
import play.api.Logger

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.matching.Regex
import scala.util.{Failure, Success, Try}

/**
  * akka sink that will send the provided [[ArchiveEntry]] objects to proxy.
  *
  * For every incoming entry whose path is not a dot-file, a THUMBNAIL and an ANALYSE job are requested
  * through the injected [[ProxyGenerators]], plus a PROXY job when `defaultProxyType` yields a proxy type
  * for the entry. The stage waits (up to a fixed timeout) for the requests to be acknowledged before pulling
  * the next element, so at most one entry is being triggered at a time.
  *
  * A failed or timed-out trigger is logged and the stage carries on with the next entry; it does not fail
  * the stream.
  *
  * @param proxyGenerators  used to determine the default proxy type and to request the jobs
  * @param proxyLocationDAO implicitly provided DAO; not referenced directly in this class
  */
class CreateProxySink @Inject() (proxyGenerators: ProxyGenerators)(implicit proxyLocationDAO:ProxyLocationDAO) extends GraphStage[SinkShape[ArchiveEntry]] {
  private final val in:Inlet[ArchiveEntry] = Inlet.create("CreateProxySink.in")
  private val logger = Logger(getClass)

  /** Longest time to wait for all job requests of a single entry to come back. */
  private val requestTimeout:FiniteDuration = 30.seconds

  override def shape: SinkShape[ArchiveEntry] = SinkShape.of(in)

  /** Matches a path whose final segment starts with a dot, e.g. `some/dir/.hidden` */
  val dotFileRegex:Regex = ".*/\\.[^/]*".r
  /** Matches a path without any directory part that starts with a dot, e.g. `.hidden` */
  val dotFileRoot:Regex = "^\\.[^/]*".r

  /**
    * return TRUE if the filepath is a dot-file, false otherwise
    * @param filePath path to test; only the final path segment is considered
    * @return true if the final segment of `filePath` starts with a dot
    */
  def isDotFile(filePath:String): Boolean = filePath match {
    case dotFileRegex(_*) => true
    case dotFileRoot(_*) => true
    case _ => false
  }

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    setHandler(in, new AbstractInHandler {
      override def onPush(): Unit = {
        val elem = grab(in)

        val proxyType = proxyGenerators.defaultProxyType(elem)

        if (isDotFile(elem.path)) {
          logger.info(s"Not proxying dot-file ${elem.path}")
        } else {
          // thumbnail and analyse are always requested; the proxy itself only if a default type is known
          val jobRequests = Seq(
            Some(proxyGenerators.requestProxyJob(RequestType.THUMBNAIL, elem, None)),
            Some(proxyGenerators.requestProxyJob(RequestType.ANALYSE, elem, None)),
            proxyType.map(pt => proxyGenerators.requestProxyJob(RequestType.PROXY, elem, Some(pt)))
          ).collect({ case Some(fut) => fut })

          // collapse the individual results into one: the first failure if there is any, else all the successes
          val operationFutures = Future.sequence(jobRequests).map(resultSeq => {
            val failures = resultSeq.collect({ case Failure(err) => err })
            if (failures.nonEmpty) {
              logger.error("Could not thumb and analyse: ")
              failures.foreach(err => logger.error("Error: ", err))
              Failure(failures.head)
            } else {
              Success(resultSeq.collect({ case Success(result) => result }))
            }
          })

          // Await.result throws on timeout or if a request future itself failed. Left uncaught, that exception
          // would escape onPush and fail the whole stage, so it is captured and logged instead.
          Try(Await.result(operationFutures, requestTimeout)) match {
            case Success(Success(msg)) => logger.info(s"Bulk proxy trigger success: $msg")
            case Success(Failure(err)) => logger.info(s"Bulk proxy trigger failed: $err")
            case Failure(err) => logger.error(s"Bulk proxy trigger did not complete for ${elem.path}: ", err)
          }
        }
        // always ask for the next element, whatever happened to this one
        pull(in)
      }
    })

    /** A sink has to signal demand itself, so request the first element as soon as the stage starts. */
    override def preStart(): Unit = {
      pull(in)
    }
  }
}