app/services/BucketScanner.scala

package services

import java.time.ZonedDateTime

import akka.NotUsed
import akka.actor.{Actor, ActorSystem, Timers}
import akka.stream.scaladsl.{GraphDSL, Keep, RunnableGraph, Sink, Source}
import akka.stream._
import akka.stream.alpakka.s3.{S3Attributes, S3Ext}
import akka.stream.alpakka.s3.scaladsl.S3
import com.theguardian.multimedia.archivehunter.common.clientManagers.{DynamoClientManager, ESClientManager, S3ClientManager}
import com.amazonaws.regions.{Region, Regions}
import com.google.inject.Injector
import com.sksamuel.elastic4s.http.{ElasticClient, HttpClient}
import com.sksamuel.elastic4s.http.bulk.BulkResponseItem
import helpers._
import javax.inject.{Inject, Singleton}
import com.theguardian.multimedia.archivehunter.common.cmn_models._
import play.api.{Configuration, Logger}
import com.sksamuel.elastic4s.streams.{RequestBuilder, ResponseListener, SubscriberConfig}
import com.theguardian.multimedia.archivehunter.common.cmn_helpers.ZonedTimeFormat
import com.theguardian.multimedia.archivehunter.common.{ArchiveEntry, ArchiveEntryRequestBuilder}

import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object BucketScanner {
  trait BSMsg

  case object TickKey
  case class ScanTargetsUpdated() extends BSMsg
  case class PerformTargetScan(record: ScanTarget, maybeJob: Option[JobModel] = None) extends BSMsg
  case class PerformDeletionScan(record: ScanTarget, thenScanForNew: Boolean = false, maybeJob: Option[JobModel] = None) extends BSMsg
  case object RegularScanTrigger extends BSMsg
}

@Singleton
class BucketScanner @Inject()(override val config: Configuration,
                              ddbClientMgr: DynamoClientManager,
                              s3ClientMgr: S3ClientManager,
                              esClientMgr: ESClientManager,
                              scanTargetDAO: ScanTargetDAO,
                              jobModelDAO: JobModelDAO,
                              injector: Injector)(implicit system: ActorSystem, mat: Materializer)
  extends Actor with BucketScannerFunctions with ZonedTimeFormat with ArchiveEntryRequestBuilder {

  import BucketScanner._
  import com.sksamuel.elastic4s.http.ElasticDsl._
  import com.sksamuel.elastic4s.streams.ReactiveElastic._

  protected val logger = Logger(getClass)

  implicit val ec: ExecutionContext = system.dispatcher

  override val indexName: String = config.get[String]("externalData.indexName")

  /**
    * Look up all configured scan targets, failing if any record could not be read.
    * @return a Future containing the list of [[ScanTarget]] records
    */
  def listScanTargets() = scanTargetDAO.allScanTargets().map(result => {
    val errors = result.collect({
      case Left(readError) => readError
    })

    if (errors.isEmpty) {
      result.collect({
        case Right(scanTarget) => scanTarget
      })
    } else {
      throw new RuntimeException(errors.map(_.toString).mkString(","))
    }
  })

  /**
    * Trigger a scan, if one is due, by messaging ourselves.
    * @param scanTarget [[ScanTarget]] instance to check
    * @return boolean indicating whether a scan was triggered
    */
  def maybeTriggerScan(scanTarget: ScanTarget): Boolean = {
    if (!scanTarget.enabled) {
      logger.info(s"Not scanning ${scanTarget.bucketName} as it is disabled")
      false
    } else if (scanIsInProgress(scanTarget)) {
      logger.info(s"Not scanning ${scanTarget.bucketName} as it is already in progress")
      false
    } else {
      if (scanIsScheduled(scanTarget)) {
        self ! PerformDeletionScan(scanTarget, thenScanForNew = true)
        true
      } else {
        logger.info(s"Not scanning ${scanTarget.bucketName} as it is not due yet")
        false
      }
    }
  }
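
  /* A minimal sketch (not part of the original file) of how RegularScanTrigger could be
   * scheduled periodically. It assumes this actor also mixed in akka.actor.Timers (the
   * trait is imported above but not currently mixed in) and that a 5-minute interval is
   * acceptable; the real schedule may well be driven from configuration elsewhere:
   *
   *   override def preStart(): Unit =
   *     timers.startPeriodicTimer(TickKey, RegularScanTrigger, 5.minutes)
   */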

  /**
    * Build an Elasticsearch bulk-indexing Sink backed by the elastic4s reactive-streams subscriber.
    * @param esclient Elasticsearch client to use
    * @param completionPromise Promise that is completed when the subscriber finishes, or failed on error
    * @return an akka-streams Sink of [[ArchiveEntry]]
    */
  protected def getElasticSearchSink(esclient: ElasticClient, completionPromise: Promise[Unit]) = {
    val esSubscriberConfig = SubscriberConfig[ArchiveEntry](
      listener = new ResponseListener[ArchiveEntry] {
        override def onAck(resp: BulkResponseItem, original: ArchiveEntry): Unit = {
          logger.debug(s"ES subscriber ACK: ${resp.toString} for $original")
        }

        override def onFailure(resp: BulkResponseItem, original: ArchiveEntry): Unit = {
          logger.error(s"ES subscriber failed: ${resp.error} ${resp.result}")
        }
      },
      batchSize = 100,
      concurrentRequests = 5,
      completionFn = () => {
        //the promise may have already been completed by the errorFn below
        if (!completionPromise.isCompleted) completionPromise.complete(Success(()))
        ()
      },
      errorFn = (err: Throwable) => {
        logger.error("Could not send to elasticsearch", err)
        completionPromise.failure(err)
        ()
      },
      failureWait = 5.seconds,
      maxAttempts = 10
    )

    val subscriber = esclient.subscriber[ArchiveEntry](esSubscriberConfig)
    Sink.fromSubscriber(subscriber)
  }

  /**
    * Performs a scan of the given target. The scan is done asynchronously using Akka streaming, so this method
    * returns a Promise which completes when the scan is done.
    * @param target [[ScanTarget]] indicating bucket to process
    * @return a Promise[Unit] which completes when the scan finishes
    */
  def doScan(target: ScanTarget) = {
    val completionPromise = Promise[Unit]()
    logger.info(s"Started scan for $target")

    val esclient = esClientMgr.getClient()
    val keySource = S3.listBucket(target.bucketName, None)
    val converterFlow = injector.getInstance(classOf[S3ToArchiveEntryFlow])
    val indexSink = getElasticSearchSink(esclient, completionPromise)
    val properCredentials = s3ClientMgr.getAlpakkaCredentials(config.getOptional[String]("externalData.awsProfile"))

    keySource
      .withAttributes(S3Attributes.settings(properCredentials))
      .via(converterFlow)
      .named(target.region)
      .log("S3ToArchiveEntryFlow")
      .to(indexSink)
      .run()

    completionPromise
  }
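
  /* Usage sketch (hypothetical): doScan is fire-and-forget from the caller's point of view;
   * the returned Promise completes once the elastic4s subscriber has flushed everything.
   * A caller that needed to block could therefore do something like:
   *
   *   Await.result(doScan(target).future, 2.hours)
   *
   * Within this actor the promise is chained asynchronously instead; see receive below.
   */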

  /**
    * Performs a scan in "paranoid" mode, i.e. assuming that S3 may return invalid XML that breaks the
    * standard SDK (as it sometimes does).
    * @param target [[ScanTarget]] indicating bucket to process
    * @return a Promise[Unit] which completes when the scan finishes
    */
  def doScanParanoid(target: ScanTarget): Promise[Unit] = {
    logger.warn(s"Configured to do paranoid scan on $target")
    val region = Region.getRegion(Regions.fromName(target.region))
    val completionPromise = Promise[Unit]() //this promise will get fulfilled when the stream ends.
    val esclient = esClientMgr.getClient()

    val source = new ParanoidS3Source(target.bucketName, region, s3ClientMgr.newCredentialsProvider(config.getOptional[String]("externalData.awsProfile")))
    val processor = new S3XMLProcessor()
    val converterFlow = injector.getInstance(classOf[S3ToArchiveEntryFlow])
    val indexSink = getElasticSearchSink(esclient, completionPromise)

    val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._

      val src = builder.add(source)
      val proc = builder.add(processor)
      val converter = builder.add(converterFlow)
      val sink = builder.add(indexSink)

      src ~> proc ~> converter ~> sink
      ClosedShape
    })
    graph.run()
    completionPromise
  }

  /**
    * Runs a deletion scan: streams index entries for the given bucket that are not yet flagged as
    * deleted through the verification flow, writing any updates back to the index.
    * @param target [[ScanTarget]] indicating bucket to process
    * @return a Promise[Unit] which completes when the scan finishes
    */
  def doScanDeleted(target: ScanTarget): Promise[Unit] = {
    val completionPromise = Promise[Unit]()
    val esclient = esClientMgr.getClient()

    val esSource = Source.fromPublisher(esclient.publisher(search(indexName) query s"bucket.keyword:${target.bucketName} AND beenDeleted:false" scroll "1m"))
    val verifyFlow = injector.getInstance(classOf[ArchiveEntryVerifyFlow])
    val indexSink = getElasticSearchSink(esclient, completionPromise)

    val killSwitch = esSource
      .via(new SearchHitToArchiveEntryFlow)
      .via(verifyFlow)
      .log("ArchiveEntryVerifyFlow")
      .viaMat(KillSwitches.single)(Keep.right)
      .to(indexSink)
      .run()

    completionPromise.future.onComplete({
      case Success(_) => logger.info("Deletion scan completed")
      case Failure(err) =>
        logger.warn("Scan failure detected, shutting down pipeline")
        killSwitch.abort(err)
    })
    completionPromise
  }
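
  /* Message handling:
   *  - RegularScanTrigger: check every scan target and trigger any whose scan is due.
   *  - PerformTargetScan: run an addition scan (paranoid or standard), then record the
   *    outcome against the scan target and any associated job.
   *  - PerformDeletionScan: run a deletion scan and, if thenScanForNew is set, follow up
   *    with a PerformTargetScan for the same target.
   */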
  override def receive: Receive = {
    case RegularScanTrigger =>
      logger.debug("Regular scan trigger received")
      listScanTargets().map(_.map(tgt => (tgt, maybeTriggerScan(tgt)))).onComplete({
        case Success(resultList) =>
          logger.info("Scan trigger report:")
          resultList.foreach(result => logger.info(s"${result._1.bucketName}: ${result._2}"))
        case Failure(err) =>
          logger.error("Could not perform regular scan check", err)
      })

    case PerformTargetScan(tgt, maybeJob) =>
      scanTargetDAO.setInProgress(tgt, newValue = true).flatMap(updatedScanTarget => {
        val promise = tgt.paranoid match {
          case Some(value) => if (value) doScanParanoid(tgt) else doScan(tgt)
          case None => doScan(tgt)
        }
        promise.future.andThen({
          case Success(_) =>
            maybeJob.map(job => {
              val updatedJob = job.copy(completedAt = Some(ZonedDateTime.now()), jobStatus = JobStatus.ST_SUCCESS)
              jobModelDAO.putJob(updatedJob)
            })
            scanTargetDAO.setScanCompleted(updatedScanTarget)
          case Failure(err) =>
            maybeJob.map(job => {
              val updatedJob = job.copy(completedAt = Some(ZonedDateTime.now()), jobStatus = JobStatus.ST_ERROR, log = Some(err.toString))
              jobModelDAO.putJob(updatedJob)
            })
            scanTargetDAO.setScanCompleted(updatedScanTarget, error = Some(err))
        })
      }).onComplete({
        case Success(_) =>
          logger.info(s"Completed addition scan of ${tgt.bucketName}")
        case Failure(err) =>
          logger.error(s"Could not scan ${tgt.bucketName}", err)
      })

    case PerformDeletionScan(tgt, thenScanForNew, maybeJob) =>
      scanTargetDAO.setInProgress(tgt, newValue = true).flatMap(updatedScanTarget =>
        doScanDeleted(updatedScanTarget).future.andThen({
          case Success(_) =>
            scanTargetDAO.setScanCompleted(updatedScanTarget)
          case Failure(err) =>
            scanTargetDAO.setScanCompleted(updatedScanTarget, error = Some(err))
        })
      ).onComplete({
        case Success(_) =>
          logger.info(s"Completed deletion scan of ${tgt.bucketName}")
          if (thenScanForNew) {
            logger.info(s"Scheduling scan for new items in ${tgt.bucketName}")
            //don't complete the job here; pass it along so PerformTargetScan completes it
            self ! PerformTargetScan(tgt, maybeJob)
          } else {
            //PerformTargetScan not happening, need to complete the job here
            maybeJob.map(job => {
              val updatedJob = job.copy(completedAt = Some(ZonedDateTime.now()), jobStatus = JobStatus.ST_SUCCESS)
              jobModelDAO.putJob(updatedJob)
            })
          }
        case Failure(err) =>
          logger.error(s"Could not scan ${tgt.bucketName}", err)
          maybeJob.map(job => {
            val updatedJob = job.copy(completedAt = Some(ZonedDateTime.now()), jobStatus = JobStatus.ST_ERROR, log = Some(err.toString))
            jobModelDAO.putJob(updatedJob)
          })
      })
  }
}
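
/* Example (hypothetical) of kicking off a manual scan from elsewhere in the app, assuming
 * `bucketScannerRef: ActorRef` has been injected (e.g. via a Guice binding for this actor)
 * and `someTarget` is a ScanTarget previously loaded through ScanTargetDAO:
 *
 *   bucketScannerRef ! BucketScanner.PerformTargetScan(someTarget)
 */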