app/services/LegacyProxiesScanner.scala:

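/* LegacyProxiesScanner: on ScanBucket it boosts the DynamoDB proxies table's provisioned
 * write capacity, streams the matching Elasticsearch index entries through
 * SearchHitToArchiveEntryFlow and ProxyLocatorFlow into DDBSink, then reverts the
 * capacity once the stream completes. */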
package services

import akka.actor.{Actor, ActorSystem, Cancellable}
import akka.stream.scaladsl.{Keep, Source}
import akka.stream.{ActorMaterializer, KillSwitches, Materializer}
import com.google.inject.Injector
import com.theguardian.multimedia.archivehunter.common.ProxyLocation
import com.theguardian.multimedia.archivehunter.common.clientManagers.{DynamoClientManager, ESClientManager, S3ClientManager}
import com.theguardian.multimedia.archivehunter.common.cmn_models.{ScanTarget, ScanTargetDAO}
import helpers._
import javax.inject.{Inject, Singleton}
import play.api.{Configuration, Logger}
import software.amazon.awssdk.services.dynamodb.model.{DescribeTableRequest, GlobalSecondaryIndexUpdate, ProvisionedThroughput, UpdateGlobalSecondaryIndexAction, UpdateTableRequest}

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}

object LegacyProxiesScanner {
  sealed trait LPSMessage
  case class ScanBucket(tgt: ScanTarget) extends LPSMessage
  case class CheckTableReady(nextMsg: LPSMessage) extends LPSMessage
  case class CheckTableReadyPrms(completionPromise: Promise[Boolean]) extends LPSMessage

  sealed trait LPSError
  case object WrongTableState extends LPSError
  case class ProviderError(err: Throwable) extends LPSError
}

@Singleton
class LegacyProxiesScanner @Inject() (config: Configuration,
                                      ddbClientMgr: DynamoClientManager,
                                      s3ClientMgr: S3ClientManager,
                                      esClientMgr: ESClientManager,
                                      scanTargetDAO: ScanTargetDAO,
                                      injector: Injector)
                                     (implicit system: ActorSystem, mat: Materializer) extends Actor {
  import LegacyProxiesScanner._
  import com.sksamuel.elastic4s.http.ElasticDsl._
  import com.sksamuel.elastic4s.streams.ReactiveElastic._

  private val logger = Logger(getClass)

  val indexName = config.getOptional[String]("elasticsearch.index").getOrElse("archivehunter")
  val tableName = config.get[String]("proxies.tableName")

  implicit val ec: ExecutionContext = system.dispatcher

  private val ddbClient = ddbClientMgr.getNewDynamoClient(config.getOptional[String]("externalData.awsProfile"))
  private var tableReadyTimer: Option[Cancellable] = None

  val updatingCapacityTarget = 300

  /**
    * Initiates an update to the table's provisioned write capacity.
    * @param boostTo provisioned write capacity to boost to
    * @param tgt [[ScanTarget]] to re-dispatch once the table is ready
    * @param completionPromise if set, [[CheckTableReadyPrms]] is scheduled instead of [[CheckTableReady]]
    *                          and the promise is completed once the table is ready
    * @return Right(true) if no update is needed and the caller can proceed immediately.
    *         Right(false) if an update was started; in that case [[CheckTableReady]] is dispatched at
    *         regular intervals to check whether the table is ready, and once it is, [[ScanBucket]] is
    *         dispatched again with the [[ScanTarget]] provided in the `tgt` argument.
    *         Left([[WrongTableState]]) or Left([[ProviderError]]) on failure.
    */
  def updateProvisionedWriteCapacity(boostTo: Int, tgt: ScanTarget, completionPromise: Option[Promise[Boolean]]): Either[LPSError, Boolean] = {
    val rq = DescribeTableRequest.builder().tableName(tableName).build()
    val result = ddbClient.describeTable(rq)

    if(result.table().tableStatusAsString() != "ACTIVE") {
      logger.warn(s"Can't update table capacity while it is in ${result.table().tableStatusAsString()} state.")
      Left(WrongTableState)
    } else {
      val tableThroughput = result.table().provisionedThroughput()
      if(tableThroughput.readCapacityUnits() == 0) { //zero read capacity => table is in on-demand mode, nothing to update
        return Right(true)
      }
      //`gsiName` is the global secondary index name, distinct from the class-level `indexName` (the ES index)
      val gsiName = result.table().globalSecondaryIndexes().get(0).indexName()
      val indexThroughput = result.table().globalSecondaryIndexes().get(0).provisionedThroughput()
      logger.info(s"index name is $gsiName, throughput is $indexThroughput")

      if(tableThroughput.writeCapacityUnits() == boostTo && indexThroughput.writeCapacityUnits() == boostTo) {
        Right(true)
      } else {
        val msgToSend = completionPromise match {
          case Some(promise) => CheckTableReadyPrms(promise)
          case None => CheckTableReady(ScanBucket(tgt))
        }
        try {
          val newTableThroughput = ProvisionedThroughput.builder()
            .writeCapacityUnits(boostTo.toLong)
            .readCapacityUnits(tableThroughput.readCapacityUnits())
            .build()
          val newIndexThroughput = ProvisionedThroughput.builder()
            .writeCapacityUnits(boostTo.toLong)
            .readCapacityUnits(10L)
            .build()

          //only include the parts of the update that are actually needed
          val initialRq = UpdateTableRequest.builder()
            .tableName(tableName)
          val tableRq = if(tableThroughput.writeCapacityUnits() == boostTo) {
            initialRq
          } else {
            initialRq.provisionedThroughput(newTableThroughput)
          }
          val indexRq = if(indexThroughput.writeCapacityUnits() == boostTo) {
            tableRq
          } else {
            tableRq.globalSecondaryIndexUpdates(GlobalSecondaryIndexUpdate.builder()
              .update(UpdateGlobalSecondaryIndexAction.builder()
                .indexName(gsiName)
                .provisionedThroughput(newIndexThroughput)
                .build()
              ).build()
            )
          }

          ddbClient.updateTable(indexRq.build()) //throws if the update fails; caught just below
          tableReadyTimer = Some(system.scheduler.scheduleAtFixedRate(10.seconds, 1.second, self, msgToSend))
          Right(false)
        } catch {
          case ex: Throwable => Left(ProviderError(ex))
        }
      }
    }
  }
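  /* A minimal caller sketch for the Either contract above; `target` is an illustrative
   * ScanTarget, not defined in this file:
   *
   *   updateProvisionedWriteCapacity(updatingCapacityTarget, target, None) match {
   *     case Right(true)  => // capacity already in place (or on-demand mode); start work now
   *     case Right(false) => // update in flight; CheckTableReady will re-dispatch later
   *     case Left(err)    => // WrongTableState, or ProviderError wrapping the AWS exception
   *   }
   */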
  /**
    * Calls out to DynamoDB to get the current table status.
    * @return true if the table has returned to ACTIVE state (the poll timer is cancelled), false otherwise
    */
  def isTableReady: Boolean = {
    logger.info("Checking if table is ready...")
    val rq = DescribeTableRequest.builder()
      .tableName(tableName)
      .build()
    val result = ddbClient.describeTable(rq)

    result.table().tableStatusAsString() match {
      case "ACTIVE" => //update has completed
        logger.info("Table has re-entered ACTIVE state")
        tableReadyTimer match {
          case None =>
            logger.error("Processing CheckTableReady with no active timer? This is a bug")
            false
          case Some(tmr) =>
            tmr.cancel()
            tableReadyTimer = None
            true
        }
      case "UPDATING" =>
        logger.info("Table is still in UPDATING state")
        false
      case other => //without a default case the match would throw MatchError on e.g. CREATING or DELETING
        logger.warn(s"Table is in unexpected $other state")
        false
    }
  }
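  /* Polling sketch: updateProvisionedWriteCapacity schedules CheckTableReady (or
   * CheckTableReadyPrms when a completion promise was supplied) against `self` after a
   * 10-second delay and then every second; each tick calls isTableReady, and the first
   * ACTIVE result cancels tableReadyTimer so the pending work can resume. */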
  override def receive: Receive = {
    case ScanBucket(tgt) =>
      logger.info(s"Boosting provisioned write capacity to $updatingCapacityTarget...")
      //updateProvisionedWriteCapacity returns Right(false) if it needs to update capacity;
      //we will get re-dispatched by the scheduler in that case.
      val canStart = updateProvisionedWriteCapacity(updatingCapacityTarget, tgt, None) match {
        case Left(WrongTableState) =>
          logger.error("Table is not in the right state, can't start a scan.")
          false
        case Left(ProviderError(err)) =>
          logger.error("AWS returned an error, can't update provisioned capacity", err)
          false
        case Right(flag) => flag
      }
      logger.debug(s"canStart: $canStart")

      if(canStart) {
        logger.info(s"Starting proxy scan on $tgt")
        val esClient = esClientMgr.getClient()
//        val client = s3ClientMgr.getAlpakkaS3Client(config.getOptional[String]("externalData.awsProfile"))
//
//        val keySource = client.listBucket(tgt.proxyBucket, None)
        val searchHitPublisher = esClient.publisher(search(indexName) matchQuery("bucket.keyword", tgt.bucketName) scroll "1m")
        val searchHitSource = Source.fromPublisher(searchHitPublisher)
        val archiveEntryConverter = new SearchHitToArchiveEntryFlow
        val proxyLocator = injector.getInstance(classOf[ProxyLocatorFlow])
        val ddbSink = injector.getInstance(classOf[DDBSink])

        val streamCompletionPromise = Promise[Unit]()
        val eosDetect = new EOSDetect[Unit, ProxyLocation](streamCompletionPromise, ())

        //keySource.via(converter).log("legacy-proxies-scanner").via(eosDetect).to(ddbSink).run()
        searchHitSource
          .via(archiveEntryConverter.async)
          .via(proxyLocator.async)
          .via(eosDetect)
          .log("proxies-scanner")
          .to(ddbSink)
          .run()

        streamCompletionPromise.future.onComplete {
          case Success(_) =>
            logger.info("Scan bucket completed, reverting provisioned write capacity...")
            val updateCapacityPromise = Promise[Boolean]()
            updateProvisionedWriteCapacity(4, tgt, Some(updateCapacityPromise))
            updateCapacityPromise.future.onComplete {
              case Success(_) =>
                logger.info("Successfully reduced provisioned write capacity.")
              case Failure(err) =>
                logger.error("Could not reduce provisioned write capacity: ", err)
            }
          case Failure(err) =>
            logger.error("Stream completion failed, this is not expected", err)
        }
      }

    case CheckTableReady(nextMsg) =>
      logger.debug("checkTableReady")
      if(isTableReady) {
        logger.info(s"Re-sending $nextMsg")
        self ! nextMsg
      }

    case CheckTableReadyPrms(completionPromise) =>
      logger.debug("checkTableReadyPrms")
      if(isTableReady) {
        completionPromise.complete(Success(true))
      }
  }
}
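// A minimal usage sketch, assuming a reference to this actor (e.g. obtained via injection
// or system.actorOf with Guice-aware Props) and a ScanTarget row loaded through
// ScanTargetDAO; `scannerRef` and `target` are illustrative names, not defined in this file:
//
//   scannerRef ! LegacyProxiesScanner.ScanBucket(target)
//
// No reply is sent; progress is observable in the logs, and the table's write capacity is
// reverted to 4 units once the stream completes.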