app/story_packages/services/DynamoReindexJobs.scala (112 lines of code) (raw):
package story_packages.services
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.services.dynamodbv2.{AmazonDynamoDBClientBuilder}
import com.amazonaws.services.dynamodbv2.document.spec.{DeleteItemSpec, QuerySpec, ScanSpec, UpdateItemSpec}
import com.amazonaws.services.dynamodbv2.document.utils.ValueMap
import com.amazonaws.services.dynamodbv2.document.{AttributeUpdate, DynamoDB, Item}
import story_packages.metrics.ReindexMetrics
import conf.ApplicationConfiguration
import story_packages.updates._
import scala.jdk.CollectionConverters._
class DynamoReindexJobs(config: ApplicationConfiguration) extends Logging {
private lazy val client =
AmazonDynamoDBClientBuilder.standard
.withCredentials(config.aws.mandatoryCredentials)
.withEndpointConfiguration(new EndpointConfiguration(config.aws.endpoints.dynamoDB, config.aws.region))
.build
private lazy val table = new DynamoDB(client).getTable(config.reindex.progressTable)
private def asReindexProgress(item: Item): ReindexProgress = {
ReindexProgress(
status = item.getString("reindexStatus"),
documentsIndexed = item.getInt("documentsIndexed"),
documentsExpected = item.getInt("documentsExpected")
)
}
def hasJobInProgress(isHidden: Boolean): Boolean = {
jobInProgress(isHidden).nonEmpty
}
def jobInProgress(isHidden: Boolean): Option[ReindexProgress] = {
val values = new ValueMap()
.withString(":status", "in progress")
.withBoolean(":hidden", isHidden)
val queryExpression = new QuerySpec()
.withKeyConditionExpression("reindexStatus = :status")
.withFilterExpression("isHidden = :hidden")
.withValueMap(values)
.withMaxResultSize(1)
val job = table.query(queryExpression).asScala.toList.map(asReindexProgress).headOption
ReindexMetrics.QueryCount.increment()
job
}
def createJob(reindexPage: ReindexPage): RunningJob = {
val job = RunningJob(reindexPage)
val item = new Item()
.withPrimaryKey("reindexStatus", job.status.label, "startTime", job.startTime.toString)
.withInt("documentsIndexed", job.documentsIndexed)
.withInt("documentsExpected", job.documentsExpected)
.withBoolean("isHidden", job.isHidden)
Logger.info(s"Creating reindex job at ${job.startTime}")
table.putItem(item)
ReindexMetrics.UpdateCount.increment()
job
}
def markProgressUpdate(previousRunningJob: RunningJob, processedResults: Int) = {
val job = previousRunningJob.copy(documentsIndexed = processedResults)
val updateSpec = new UpdateItemSpec()
.withPrimaryKey("reindexStatus", job.status.label, "startTime", job.startTime.toString)
.addAttributeUpdate(new AttributeUpdate("documentsIndexed").put(job.documentsIndexed))
.addAttributeUpdate(new AttributeUpdate("documentsExpected").put(job.documentsExpected))
.addAttributeUpdate(new AttributeUpdate("isHidden").put(job.isHidden))
Logger.info(s"Marking reindex progress update at ${job.startTime}")
table.updateItem(updateSpec)
ReindexMetrics.UpdateCount.increment()
}
def markCompleteJob(previousRunningJob: RunningJob, lastProcessedResult: Int) = {
val job = previousRunningJob.copy(
status = Completed(),
documentsIndexed = lastProcessedResult
)
Logger.info(s"Marking reindex complete at ${previousRunningJob.startTime}")
val item = new Item()
.withPrimaryKey("reindexStatus", job.status.label, "startTime", job.startTime.toString)
.withInt("documentsIndexed", job.documentsIndexed)
.withInt("documentsExpected", job.documentsExpected)
.withBoolean("isHidden", job.isHidden)
table.putItem(item)
ReindexMetrics.UpdateCount.increment()
table.deleteItem(new DeleteItemSpec()
.withPrimaryKey("reindexStatus", previousRunningJob.status.label, "startTime", previousRunningJob.startTime.toString)
)
ReindexMetrics.DeleteCount.increment()
}
def markFailedJob(previousRunningJob: RunningJob) = {
val job = previousRunningJob.copy(
status = Failed()
)
Logger.info(s"Marking reindex failed at ${previousRunningJob.startTime}")
val item = new Item()
.withPrimaryKey("reindexStatus", job.status.label, "startTime", job.startTime.toString)
.withInt("documentsIndexed", job.documentsIndexed)
.withInt("documentsExpected", job.documentsExpected)
.withBoolean("isHidden", job.isHidden)
table.putItem(item)
ReindexMetrics.UpdateCount.increment()
table.deleteItem(new DeleteItemSpec()
.withPrimaryKey("reindexStatus", previousRunningJob.status.label, "startTime", previousRunningJob.startTime.toString)
)
ReindexMetrics.UpdateCount.increment()
}
def getLastStartedJob(isHidden: Boolean): Option[ReindexProgress] = {
val values = new ValueMap()
.withString(":status", "in progress")
.withBoolean(":hidden", isHidden)
Logger.info(s"Scanning reindex jobs for last started job with isHidden $isHidden")
val scanRequest = new ScanSpec()
.withFilterExpression("isHidden = :hidden and not reindexStatus = :status")
.withValueMap(values)
import SortItemsByLastStartTime._
val progress = table.scan(scanRequest).asScala.toList.sorted.map(asReindexProgress).headOption
ReindexMetrics.ScanCount.increment()
progress
}
}