in app/story_packages/updates/Reindex.scala [78:108]
/** Pushes every package of one reindex page to the Kinesis stream, one after another,
  * and records the outcome of the job through `dynamoReindexJobs`.
  *
  * Packages are sent strictly in sequence: the send for a package is only started once the
  * future for the previous package has completed. A running count of sent packages is
  * threaded through the fold and used both for periodic progress updates and for the final
  * completion marker.
  *
  * If any step of the chain fails with a non-fatal exception, the error is logged and the
  * job is marked as failed instead.
  *
  * @param job  the running reindex job the packages belong to
  * @param step the page of packages to send, together with the total count and an optional next page
  * @return a future that completes once the job has been marked complete or failed
  */
private def processJob(job: RunningJob, step: ReindexPage) = {
  Logger.info(s"Processing reindex job step with ${step.list.size} packages out of ${step.totalCount}")
  // Progress is reported roughly once per percent of the total number of packages.
  val notifyEvery = math.ceil(step.totalCount / 100.0)

  // Chain the sends so each one starts only after the previous one has finished;
  // the future carries the number of packages sent so far.
  val sentCount = step.list.foldLeft(Future.successful(0)) { (countSoFar, pkg) =>
    countSoFar.flatMap { alreadySent =>
      sendToKinesisStream(pkg, job).map { _ =>
        // The value reported here is the count *before* the package that was just sent is added.
        if (alreadySent % notifyEvery == 0) dynamoReindexJobs.markProgressUpdate(job, alreadySent)
        alreadySent + 1
      }
    }
  }

  val finished = sentCount.map { total =>
    Logger.info(s"Processing reindex job $job, step completed successfully")
    step.next match {
      case None =>
        dynamoReindexJobs.markCompleteJob(job, total)
      case Some(_) =>
        // TODO iterate on next page
        // Until paging is implemented the job is still marked complete after the first page.
        dynamoReindexJobs.markCompleteJob(job, total)
        Logger.error("Next page not implemented")
    }
  }

  finished.recover {
    case NonFatal(e) =>
      Logger.error(s"Error when processing reindexing step of job $job", e)
      dynamoReindexJobs.markFailedJob(job)
  }
}