app/story_packages/updates/Reindex.scala (123 lines of code) (raw):
package story_packages.updates
import org.apache.pekko.actor.Scheduler
import com.amazonaws.services.dynamodbv2.document.Item
import story_packages.model.StoryPackage
import org.joda.time.DateTime
import play.api.libs.json.Json
import story_packages.services.{Database, DynamoReindexJobs, FrontsApi, Logging}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.control.NonFatal
import play.api.libs.json.OFormat
/**
 * One page of story packages produced by a table scan, used to drive a reindex job.
 *
 * @param totalCount total number of packages the job expects to process
 * @param list       the packages contained in this page
 * @param next       presumably a continuation marker for a following page, if any — TODO confirm
 * @param isHidden   presumably mirrors the `isHidden` flag the scan was run with — verify against `Database`
 */
case class ReindexPage(totalCount: Int, list: List[StoryPackage], next: Option[String], isHidden: Boolean)
/**
 * State of a reindex job as tracked by the job store.
 *
 * @param startTime         when the job was created
 * @param status            current lifecycle status of the job
 * @param documentsIndexed  number of packages processed so far
 * @param documentsExpected number of packages the job expects to process
 * @param isHidden          whether the job targets hidden packages
 */
case class RunningJob(startTime: DateTime, status: ReindexStatus,
                      documentsIndexed: Int, documentsExpected: Int, isHidden: Boolean)
/** Companion providing a convenience constructor for [[RunningJob]]. */
object RunningJob {

  /**
   * Builds a freshly started job for a scanned page: it starts now, is in progress,
   * has indexed nothing yet and expects `reindexPage.totalCount` documents.
   *
   * @param reindexPage the scan result the job will process
   * @return a new in-progress [[RunningJob]]
   */
  def apply(reindexPage: ReindexPage): RunningJob = {
    val startedAt = new DateTime()
    new RunningJob(
      startTime = startedAt,
      status = InProgress(),
      documentsIndexed = 0,
      documentsExpected = reindexPage.totalCount,
      isHidden = reindexPage.isHidden
    )
  }
}
/**
 * Progress summary of a reindex job.
 *
 * @param status            status label of the job
 * @param documentsIndexed  number of documents processed so far
 * @param documentsExpected number of documents the job expects to process
 */
case class ReindexProgress(status: String, documentsIndexed: Int, documentsExpected: Int)
/** Companion holding the Play JSON format for [[ReindexProgress]]. */
object ReindexProgress {
// JSON (de)serialiser derived by the Play JSON macro from the case class fields.
implicit val jsonFormat: OFormat[ReindexProgress] = Json.format[ReindexProgress]
}
/** Lifecycle status of a reindex job; each variant carries a human-readable `label`. */
sealed trait ReindexStatus {
  val label: String
}

/** Status label "in progress"; assigned to newly created jobs. */
case class InProgress(label: String = "in progress") extends ReindexStatus

/** Status label "failed". */
case class Failed(label: String = "failed") extends ReindexStatus

/** Status label "completed". */
case class Completed(label: String = "completed") extends ReindexStatus

/** Status label "cancelled". */
case class Cancelled(label: String = "cancelled") extends ReindexStatus
/**
 * Implicit ordering for DynamoDB items by their `startTime` attribute, most recent first.
 * Import `SortItemsByLastStartTime._` to bring it into scope.
 */
object SortItemsByLastStartTime {

  /** Orders items so that an item whose `startTime` is later sorts before an earlier one. */
  implicit def sortByStartTime: Ordering[Item] = {
    // The `startTime` attribute is read as a string and parsed by Joda's DateTime constructor.
    def startTimeOf(item: Item): DateTime = new DateTime(item.getString("startTime"))

    Ordering.fromLessThan[Item] { (left, right) =>
      startTimeOf(left) isAfter startTimeOf(right)
    }
  }
}
/**
 * Runs reindex jobs for story packages: scans the packages table, records the job through
 * [[DynamoReindexJobs]], and sends one reindex message per package via [[KinesisEventSender]].
 *
 * @param dynamoReindexJobs  job store used to create jobs and record progress, completion and failure
 * @param database           source of the packages to reindex (`scanAllPackages`)
 * @param frontsApi          used to fetch each package's stored collection before it is sent
 * @param kinesisEventSender sends the reindex update / delete messages
 * @param scheduler          Pekko scheduler used to start job processing asynchronously
 */
class Reindex(dynamoReindexJobs: DynamoReindexJobs, database: Database, frontsApi: FrontsApi, kinesisEventSender: KinesisEventSender,
    scheduler: Scheduler) extends Logging {

  /**
   * Starts a new reindex job unless one is already in progress for the same `isHidden` flag.
   *
   * The packages table is scanned, a job record is created, and processing is scheduled to
   * start one second later. The returned future completes once the job has been created,
   * not when processing finishes.
   *
   * @param isHidden passed to the in-progress check and to the table scan
   * @return the created job, or `None` when a job is already in progress
   */
  def scheduleJob(isHidden: Boolean = false): Future[Option[RunningJob]] = {
    if (dynamoReindexJobs.hasJobInProgress(isHidden)) {
      Logger.info("Cannot run multiple reindex at the same time")
      Future.successful(None)
    } else {
      // NOTE(review): the in-progress check above and createJob below are separate calls, so two
      // concurrent requests could both start a job — confirm whether DynamoReindexJobs guards this.
      Logger.info("Scanning table for reindex job")
      database.scanAllPackages(isHidden).map { reindexPage =>
        val job = dynamoReindexJobs.createJob(reindexPage)
        scheduler.scheduleOnce(1.seconds) {
          processJob(job, reindexPage)
        }
        Some(job)
      }
    }
  }

  /**
   * Sends every package of `step` to the Kinesis stream strictly one after another, then marks
   * the job completed with the number of packages processed. Progress is recorded after the
   * first package and then roughly every 1% of `step.totalCount`. Any non-fatal failure marks
   * the job failed.
   *
   * Only a single page is handled: when `step.next` is set the job is still marked completed
   * and an error is logged.
   */
  private def processJob(job: RunningJob, step: ReindexPage) = {
    Logger.info(s"Processing reindex job step with ${step.list.size} packages out of ${step.totalCount}")
    // Integer interval of at least 1, so the modulo check is well-defined even when totalCount is 0.
    val notifyEvery = math.max(1, math.ceil(step.totalCount / 100.0).toInt)

    // Chaining through foldLeft ensures each package is sent only after the previous one succeeded.
    val sendAll: Future[Int] = step.list.foldLeft(Future.successful(0)) { (previous, storyPackage) =>
      for {
        alreadyProcessed <- previous
        _ <- sendToKinesisStream(storyPackage, job)
      } yield {
        val processed = alreadyProcessed + 1
        // Report the count that includes the package which has just been sent.
        if (alreadyProcessed % notifyEvery == 0) dynamoReindexJobs.markProgressUpdate(job, processed)
        processed
      }
    }

    sendAll
      .map { processedCount =>
        Logger.info(s"Processing reindex job $job, step completed successfully")
        dynamoReindexJobs.markCompleteJob(job, processedCount)
        // TODO iterate on next page
        if (step.next.isDefined) Logger.error("Next page not implemented")
      }
      .recover {
        case NonFatal(e) =>
          Logger.error(s"Error when processing reindexing step of job $job", e)
          dynamoReindexJobs.markFailedJob(job)
      }
  }

  /**
   * Fetches the stored collection of `storyPackage` and sends a reindex delete (for packages
   * flagged as deleted) or a reindex update message for it.
   *
   * Packages without an id or name are logged and skipped; packages with no stored collection
   * are skipped as well. Both cases complete successfully.
   */
  private def sendToKinesisStream(storyPackage: StoryPackage, job: RunningJob): Future[Unit] = {
    (for {
      packageId <- storyPackage.id
      displayName <- storyPackage.name
    } yield {
      Logger.info(s"Getting stored package with id $packageId from S3")
      frontsApi.amazonClient.collection(packageId).map {
        case Some(collectionJson) =>
          // Log the unwrapped id rather than the Option, which would print as Some(...).
          Logger.info(s"Sending reindex message on kinesis stream for package $packageId")
          if (storyPackage.deleted.contains(true)) {
            kinesisEventSender.putReindexDelete(packageId, displayName, collectionJson, job.isHidden)
          } else {
            kinesisEventSender.putReindexUpdate(packageId, displayName, collectionJson, job.isHidden)
          }
        case None =>
          Logger.info(s"Ignore reindex of empty story package $packageId")
      }
    }).getOrElse {
      Logger.error(s"Story package $storyPackage doesn't have id or name")
      Future.unit
    }
  }

  /**
   * Progress of the job currently in progress for `isHidden`, falling back to the most
   * recently started job when none is in progress.
   */
  def getJobProgress(isHidden: Boolean): Option[ReindexProgress] =
    dynamoReindexJobs.jobInProgress(isHidden).orElse(dynamoReindexJobs.getLastStartedJob(isHidden))
}