// app/services/ProxiesRelinker.scala
package services

import java.time.ZonedDateTime

import akka.actor.{Actor, ActorRef, ActorSystem}
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import com.sksamuel.elastic4s.http.bulk.BulkResponseItem
import com.sksamuel.elastic4s.streams.ReactiveElastic._
import com.sksamuel.elastic4s.streams.{ResponseListener, SubscriberConfig}
import com.theguardian.multimedia.archivehunter.common._
import com.theguardian.multimedia.archivehunter.common.clientManagers.{DynamoClientManager, ESClientManager}
import com.theguardian.multimedia.archivehunter.common.cmn_models.{JobModelDAO, JobStatus, ScanTargetDAO}
import helpers.{EOSDetect, ProxyVerifyFlow, SearchHitToArchiveEntryFlow}
import javax.inject.{Inject, Singleton}
import models.IndexUpdateCounter
import play.api.{Configuration, Logger}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.{Failure, Success}
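
/**
  * Message protocol for the [[ProxiesRelinker]] actor.
  */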
object ProxiesRelinker {
  trait RelinkMsg
  trait RelinkReply

  /** request that the proxy for a single file is re-linked */
  case class RelinkRequest(fileId:String) extends RelinkMsg
  /** request a relink scan restricted to the given collection (bucket) */
  case class RelinkScanTargetRequest(jobId:String, collectionName: String) extends RelinkMsg
  /** request a relink scan across the entire index */
  case class RelinkAllRequest(jobId:String) extends RelinkMsg

  /** reply sent when a scan completes, carrying the ack/error counts */
  case class RelinkSuccess(indexUpdateCounter: IndexUpdateCounter) extends RelinkReply
  /** reply sent when a scan fails */
  case class RelinkError(err:Throwable) extends RelinkReply
}
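
/**
  * Actor that performs "proxy relink" scans: it streams index records that have no proxy through
  * the proxy verification flow and bulk-writes any re-linked records back to the Elasticsearch index.
  *
  * A minimal usage sketch (the names here are illustrative assumptions, not part of this file),
  * assuming `relinkerRef` is an ActorRef for this actor obtained via dependency injection:
  * {{{
  *   relinkerRef ! ProxiesRelinker.RelinkAllRequest(jobId)                       //scan the whole index
  *   relinkerRef ! ProxiesRelinker.RelinkScanTargetRequest(jobId, "some-bucket") //scan a single collection
  * }}}
  * The actor replies to the sender with RelinkSuccess or RelinkError once the scan finishes.
  */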
@Singleton
class ProxiesRelinker @Inject() (config:Configuration,
                                 esClientMgr:ESClientManager, ddbClientMgr:DynamoClientManager, system:ActorSystem,
                                 proxyVerifyFlow: ProxyVerifyFlow, jobModelDAO:JobModelDAO, scanTargetDAO:ScanTargetDAO)
                                (implicit mat:Materializer)
  extends Actor with ArchiveEntryRequestBuilder {
  import ProxiesRelinker._
  import com.sksamuel.elastic4s.http.ElasticDsl._

  private val logger = Logger(getClass)

  override val indexName = config.get[String]("externalData.indexName")
  private val indexer = new Indexer(indexName)
  private val esClient = esClientMgr.getClient()
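
  /**
    * Builds an Akka source that scroll-searches the index for records flagged as not proxied,
    * optionally restricted to a single bucket.
    * @param targetBucket if set, limit the scan to this bucket; if None, scan everything
    * @return a Source of raw Elasticsearch search hits
    */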
  protected def getIndexScanSource(targetBucket:Option[String]) = {
    val queryTerms = Seq(
      Some(termQuery("proxied", false)),
      targetBucket.map(bucketName=>matchQuery("bucket.keyword", bucketName))
    ).collect({case Some(term)=>term})

    //scroll through every matching record, keeping the scroll context alive for 5 minutes between batches
    val pub = esClient.publisher(search(indexName) query boolQuery().must(queryTerms) scroll "5m")
    Source.fromPublisher(pub)
  }
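
  /**
    * Builds an Akka sink that bulk-writes updated ArchiveEntries back to the index.
    * @param completionPromise completed with the ack/error counts when the subscriber finishes,
    *                          or failed if the subscriber aborts
    * @return a Sink of ArchiveEntry
    */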
  protected def getIndexUpdateSink(completionPromise:Promise[IndexUpdateCounter]) = {
    var ackCounter = 0
    var errCounter = 0

    val esSubscriberConfig = SubscriberConfig[ArchiveEntry](listener = new ResponseListener[ArchiveEntry] {
      override def onAck(resp: BulkResponseItem, original: ArchiveEntry): Unit = {
        logger.debug(s"ES subscriber ACK: ${resp.toString} for $original")
        ackCounter+=1
      }

      override def onFailure(resp: BulkResponseItem, original: ArchiveEntry): Unit = {
        logger.error(s"ES subscriber failed: ${resp.error} ${resp.result}")
        errCounter += 1
      }
    }, batchSize=100, concurrentRequests=5, completionFn = ()=>{
      logger.warn("Index update sink completed")
      //the promise may have already been completed by the errorFn below
      if(!completionPromise.isCompleted) completionPromise.complete(Success(IndexUpdateCounter(ackCounter, errCounter)))
    }, errorFn = (err:Throwable)=>{
      logger.error("Could not send to elasticsearch", err)
      completionPromise.failure(err)
    }, failureWait=5.seconds, maxAttempts=10
    )

    //the config above must actually be passed to the subscriber, otherwise the listener and
    //completion/error callbacks are never wired up and the promise never completes
    val sub = esClient.subscriber(esSubscriberConfig)
    Sink.fromSubscriber(sub)
  }
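
  /**
    * Kicks off the relink scan stream and registers completion handlers that update the job record
    * and notify the original sender.
    * @param jobId ID of the job record to update on completion
    * @param bucketName if set, restrict the scan to this bucket; if None, scan globally
    * @param originalSender actor to notify with RelinkSuccess or RelinkError
    */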
  private def relinkScan(jobId:String, bucketName:Option[String], originalSender:ActorRef) = {
    val completionPromise = Promise[IndexUpdateCounter]()
    val eosPromise = Promise[Unit]()

    logger.info(s"Starting ${bucketName.getOrElse("global")} relink scan")
    val eosDetect = new EOSDetect[Unit, ArchiveEntry](eosPromise, ())

    getIndexScanSource(bucketName)
      .via(new SearchHitToArchiveEntryFlow)
      .via(proxyVerifyFlow)
      .via(eosDetect)
      .to(getIndexUpdateSink(completionPromise))
      .run()
    val scanDescription = bucketName.map(b=>s"Relink scan for $b").getOrElse("Global relink scan")

    //update the job record with the given completion status. If maybeLog is None, any existing log is preserved.
    def recordJobCompletion(status:JobStatus.Value, maybeLog:Option[String]) = jobModelDAO.jobForId(jobId).map({
      case None=>
        logger.error("Proxy relink job record must have been deleted while we were running! Job completed but can't record it.")
      case Some(Left(dynamoErr))=>
        logger.error(s"Could not get job record: $dynamoErr")
      case Some(Right(jobModel))=>
        val updatedJob = jobModel.copy(completedAt = Some(ZonedDateTime.now()), jobStatus = status, log = maybeLog.orElse(jobModel.log))
        jobModelDAO.putJob(updatedJob)
    })

    //the EOS detector completes this promise when the upstream terminates, even if the ES
    //subscriber never invokes its completion function
    eosPromise.future.onComplete({
      case Success(_)=>
        logger.info(s"$scanDescription completed - detected via EOS")
        recordJobCompletion(JobStatus.ST_SUCCESS, None)
        originalSender ! RelinkSuccess(IndexUpdateCounter(-1,-1))
      case Failure(err)=>
        logger.error(s"$scanDescription failed", err)
        recordJobCompletion(JobStatus.ST_ERROR, Some(err.toString))
        originalSender ! RelinkError(err)
    })

    //the index update sink completes this promise once all bulk updates have been flushed
    completionPromise.future.onComplete({
      case Success(counter)=>
        logger.info(s"$scanDescription completed with ${counter.ackCount} successful and ${counter.errorCount} failed operations")
        recordJobCompletion(JobStatus.ST_SUCCESS, None)
        originalSender ! RelinkSuccess(counter)
      case Failure(err)=>
        logger.error(s"$scanDescription failed", err)
        recordJobCompletion(JobStatus.ST_ERROR, Some(err.toString))
        originalSender ! RelinkError(err)
    })
  }
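
  /**
    * Entry point. Marks the relevant job record as running, then starts either a global scan
    * or a scan restricted to a single collection.
    */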
  override def receive: Receive = {
    case RelinkAllRequest(jobId)=>
      //capture the sender now; sender() is not valid from inside the Future callbacks below
      val originalSender = sender()
      jobModelDAO.jobForId(jobId).map({
        case None=>
          logger.error("ProxiesRelinker was sent an invalid job ID, can't continue.")
        case Some(Left(err))=>
          logger.error(s"Could not retrieve job entry from database: $err, can't continue")
        case Some(Right(jobModel))=>
          val updatedJob = jobModel.copy(startedAt = Some(ZonedDateTime.now()), jobStatus=JobStatus.ST_RUNNING)
          jobModelDAO.putJob(updatedJob).onComplete({
            case Success(_)=>
              relinkScan(jobId, None, originalSender)
            case Failure(err)=>
              logger.error(s"Could not update job: ${err.getMessage}", err)
          })
      })
    case RelinkScanTargetRequest(jobId, collectionName)=>
      val originalSender = sender()
      jobModelDAO.jobForId(jobId).map({
        case None=>
          logger.error("ProxiesRelinker was sent an invalid job ID, can't continue.")
        case Some(Left(err))=>
          logger.error(s"Could not retrieve job entry from database: $err, can't continue")
        case Some(Right(jobModel))=>
          val updatedJob = jobModel.copy(startedAt = Some(ZonedDateTime.now()), jobStatus=JobStatus.ST_RUNNING)
          jobModelDAO.putJob(updatedJob).onComplete({
            case Success(_)=>
              relinkScan(jobId, Some(collectionName), originalSender)
            case Failure(err)=>
              logger.error(s"Could not update job: ${err.getMessage}", err)
          })
      })
  }
}