app/services/DynamoCapacityActor.scala (157 lines of code) (raw):
package services
import java.util.UUID
import akka.actor.{Actor, ActorRef, Timers}
import com.theguardian.multimedia.archivehunter.common.ArchiveHunterConfiguration
import com.theguardian.multimedia.archivehunter.common.clientManagers.DynamoClientManager
import javax.inject.{Inject, Singleton}
import play.api.Logger
import software.amazon.awssdk.services.dynamodb.model.{DescribeTableRequest, GlobalSecondaryIndexUpdate, ProvisionedThroughput, TableDescription, TableStatus, UpdateGlobalSecondaryIndexAction, UpdateTableRequest}
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}
import scala.concurrent.duration._
object DynamoCapacityActor {
trait DCAMsg {
val tableName:String
val id:Option[UUID]
}
trait DCAReply {
}
/**
* this message is dispatched by a timer, to check if any tables in the "to check" list have become active again
*/
case object TimedStateCheck
case class UpdateCapacityTable(tableName:String, readTarget:Option[Int], writeTarget:Option[Int], indexUpdates:Seq[UpdateCapacityIndex], signalActor: ActorRef, signalMsg:AnyRef, id:Option[UUID]=None) extends DCAMsg
case class UpdateCapacityIndex(indexName:String, readTarget:Option[Int], writeTarget:Option[Int])
case class UpdateCapacity(ops: Seq[DCAMsg], signalActor: ActorRef, signalMsg:AnyRef)
case class PendingUpdate(tableName:String, indexName:Option[String], readTarget:Int, writeTarget:Int)
/**
* add the given request to the internal list, for testing
* @param rq request to add
*/
case class TestAddRequest(rq:DCAMsg)
case object TestGetCheckList
/**
* response message if the table state is not active
* @param tableName table name that can't update
* @param actualState actual state it's in
* @param wantedState the state it must be in before we can start
*/
case class TableWrongStateError(tableName:String, actualState:String, wantedState: String)
/**
* response message if the index updates can't be corralled (e.g., index name does not exist)
* @param tableName table name that can update
* @param problems Sequence of Throwables that describe the problems
*/
case class InvalidRequestError(tableName:String, problems:Seq[Throwable])
/**
* response message if the operation succeeded
* @param mustWait boolean indicating whether provisioned capacity was already met, so we can start straightaway
*/
case class UpdateRequestSuccess(tableName:String, mustWait:Boolean)
/**
* response message for [[TestGetCheckList]]
* @param entries current entries in the queue
*/
case class TestCheckListResponse(entries:Seq[DCAMsg])
}
@Singleton
class DynamoCapacityActor @Inject() (ddbClientMgr:DynamoClientManager, config:ArchiveHunterConfiguration) extends Actor {
import DynamoCapacityActor._
private val logger = Logger(getClass)
private var checkList:Seq[DCAMsg] = Seq()
private val awsProfile = config.getOptional[String]("externalData.awsProfile")
private val ddbClient = ddbClientMgr.getClient(awsProfile)
/**
* converts an [[UpdateCapacityIndex]] request into a DynamoDB [[GlobalSecondaryIndexUpdate]] request.
*
* @param rq [[UpdateCapacityIndex]] instance describing the update to make
* @param desc DDB TableDescription instance describing the table that the update will take place on
* @return if there is an error then a Failure is returned; if the provisioned capacity is already correct then Success(None) is
* returned; if an update is required then Success(Some(GlobalSecondaryIndexUpdate())) is returned
*/
def updateForIndex(rq:UpdateCapacityIndex, desc:TableDescription):Try[Option[GlobalSecondaryIndexUpdate]] = {
val indexDesc = desc.globalSecondaryIndexes().asScala.find(_.indexName()==rq.indexName) match {
case None=>
return Failure(new RuntimeException(s"Could not find index ${rq.indexName} on table ${desc.tableName()}"))
case Some(idx)=>idx
}
val currentThroughput = indexDesc.provisionedThroughput()
val actualReadTarget = rq.readTarget match {
case None=>currentThroughput.readCapacityUnits().toLong
case Some(tgt)=>tgt.toLong
}
val actualWriteTarget = rq.writeTarget match {
case None=>currentThroughput.writeCapacityUnits().toLong
case Some(tgt)=>tgt.toLong
}
if(currentThroughput.readCapacityUnits()==actualReadTarget && currentThroughput.writeCapacityUnits()==actualWriteTarget){
Success(None)
} else {
Success(Some(GlobalSecondaryIndexUpdate.builder().update(
UpdateGlobalSecondaryIndexAction.builder()
.indexName(rq.indexName)
.provisionedThroughput(ProvisionedThroughput.builder()
.readCapacityUnits(actualReadTarget)
.writeCapacityUnits(actualWriteTarget)
.build()
)
.build()
).build()
))
}
}
private def makeDescribeTableRequest(tableName:String) =
DescribeTableRequest.builder().tableName(tableName).build()
def getTableStatus(tableName: String):String =
ddbClient.describeTable(makeDescribeTableRequest(tableName)).table().tableStatusAsString()
/**
* recursively check the state of tables that need to be updated and dispatch the requested message to the requested
* actor if they have re-entered Active.
* @param toCheck seq of tables to check
* @param notReady seq of tables that are not ready yet. Used for recursion, don't specify when calling
* @return seq of tables that are not yet ready
*/
def checkAndDispatch(toCheck:Seq[DCAMsg], notReady:Seq[DCAMsg]=Seq()):Seq[DCAMsg] = {
toCheck.headOption match {
case Some(msg:UpdateCapacityTable) =>
logger.debug(s"Checking table ${msg.tableName}")
val st = getTableStatus(msg.tableName)
if (st == "ACTIVE") {
logger.debug(s"Table ${msg.tableName} has re-entered ACTIVE state, notifying")
msg.signalActor ! msg.signalMsg
checkAndDispatch(toCheck.tail, notReady)
} else {
logger.debug(s"Table ${msg.tableName} is in $st state")
checkAndDispatch(toCheck.tail, notReady ++ Seq(msg))
}
case Some(msg:UpdateCapacityIndex)=>
throw new RuntimeException("UpdateCapacityIndex is not implemented yet")
case Some(otherMsg)=>
throw new RuntimeException(s"$otherMsg is not implemented")
case None=>
notReady
}
}
override def receive: Receive = {
case TimedStateCheck=>
logger.debug("got TimedStateCheck")
checkList = checkAndDispatch(checkList)
case TestAddRequest(toAdd)=>
checkList = checkList ++ Seq(toAdd)
case TestGetCheckList=>
sender() ! TestCheckListResponse(checkList)
case tableRq: UpdateCapacityTable=>
val result = ddbClient.describeTable(makeDescribeTableRequest(tableRq.tableName))
if(result.table().tableStatus()!=TableStatus.ACTIVE){
logger.warn(s"Can't update table status while it is in ${result.table().tableStatusAsString()} state.")
sender() ! TableWrongStateError(tableRq.tableName, result.table().tableStatusAsString(), "ACTIVE")
} else {
val tableThroughput = result.table().provisionedThroughput()
val potentialIndexUpdate = tableRq.indexUpdates.map(updateForIndex(_, result.table()))
val potentialIndexUpdateFailures = potentialIndexUpdate.collect({case Failure(err)=>err})
if(potentialIndexUpdateFailures.nonEmpty){
logger.error("Could not build list of index updates:")
potentialIndexUpdateFailures.foreach(err=>logger.error("Index update list error: ", err))
sender() ! InvalidRequestError(tableRq.tableName, potentialIndexUpdateFailures)
} else {
val indexUpdates = potentialIndexUpdate.collect({case Success(Some(update))=>update})
val actualReadTarget = tableRq.readTarget match {
case None=>tableThroughput.readCapacityUnits().toLong
case Some(target)=>target.toLong
}
val actualWriteTarget = tableRq.writeTarget match {
case None=>tableThroughput.writeCapacityUnits().toLong
case Some(target)=>target.toLong
}
val rq = UpdateTableRequest.builder().tableName(tableRq.tableName)
if(tableThroughput.readCapacityUnits()==0 && tableThroughput.writeCapacityUnits()==0){
logger.info(s"Table ${tableRq.tableName} is in auto-provisioning mode, don't need to update.")
tableRq.signalActor ! tableRq.signalMsg
sender() ! UpdateRequestSuccess(tableRq.tableName, mustWait = false)
} else if(tableThroughput.readCapacityUnits()==actualReadTarget && tableThroughput.writeCapacityUnits()==actualWriteTarget && indexUpdates.isEmpty){
logger.info(s"Table ${tableRq.tableName} and indices already have requested throughput")
tableRq.signalActor ! tableRq.signalMsg
sender() ! UpdateRequestSuccess(tableRq.tableName, mustWait = false)
} else {
val rqWithTableUpdate = if (tableThroughput.readCapacityUnits() != actualReadTarget || tableThroughput.writeCapacityUnits() != actualWriteTarget) {
rq.provisionedThroughput(ProvisionedThroughput.builder()
.readCapacityUnits(actualReadTarget)
.writeCapacityUnits(actualWriteTarget)
.build()
)
} else {
rq
}
val rqWithIndexUpdate = if(indexUpdates.nonEmpty){
rq.globalSecondaryIndexUpdates(indexUpdates.asJavaCollection)
} else {
rq
}
logger.info(s"Updating table ${tableRq.tableName} with capacity $actualReadTarget read, $actualWriteTarget write and index $indexUpdates")
val updateResult = ddbClient.updateTable(rq.build())
checkList ++= Seq(tableRq)
sender() ! UpdateRequestSuccess(tableRq.tableName, mustWait = true)
}
}
}
}
}