app/com/gu/floodgate/reindex/ProgressTracker.scala (173 lines of code) (raw):
package com.gu.floodgate.reindex
import org.apache.pekko.actor.{Actor, ActorLogging, Cancellable, Props}
import com.gu.floodgate.contentsource.ContentSource
import com.gu.floodgate.jobhistory.{JobHistory, JobHistoryService}
import com.gu.floodgate.reindex.ProgressTracker.{Cancel, TrackProgress, UpdateProgress}
import com.gu.floodgate.runningjob.{RunningJob, RunningJobService}
import com.typesafe.scalalogging.StrictLogging
import org.joda.time.{DateTime, Minutes}
import play.api.libs.ws.{WSClient, WSResponse}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}
import com.gu.floodgate.Formats._
/** Companion of the [[ProgressTracker]] actor: its `Props` factory and the messages it handles. */
object ProgressTracker {

  /** Builds the `Props` used to create a [[ProgressTracker]] with the given collaborators.
    *
    * @param ws                 HTTP client used to call the content source's reindex endpoint
    * @param runningJobService  service used to update and remove the running-job record
    * @param jobHistoryService  service used to record the job history once tracking completes
    */
  def props(
      ws: WSClient,
      runningJobService: RunningJobService,
      jobHistoryService: JobHistoryService
  ): Props =
    Props(new ProgressTracker(ws, runningJobService, jobHistoryService))

  /** Asks the tracker to request the current progress of `runningJob` from `contentSource`. */
  case class TrackProgress(contentSource: ContentSource, runningJob: RunningJob)

  /** Carries the outcome of a progress request (successful response or failure) back to the tracker. */
  case class UpdateProgress(result: Try[WSResponse], contentSource: ContentSource, runningJob: RunningJob)

  /** Asks the tracker to stop tracking `runningJob`; the job is then recorded with status `Cancelled`. */
  case class Cancel(contentSource: ContentSource, runningJob: RunningJob)
}
/** Actor that tracks one reindex job by polling the content source's reindex endpoint.
  *
  * The actor moves between three behaviours:
  *   - `sleeping`: no request in flight; a `TrackProgress` message triggers a poll, a `Cancel` stops tracking.
  *   - `waitingForProgress`: a request is in flight; the `UpdateProgress` reply is processed and the actor
  *     returns to `sleeping` (a follow-up poll may have been scheduled in the meantime).
  *   - `waitingForFinalProgress`: a `Cancel` arrived while a request was in flight; the actor waits for the
  *     reply, ignores its content, and then stops.
  *
  * When tracking completes (for any reason) the running-job record is removed, a job-history entry is
  * created and the actor stops itself.
  *
  * @param ws                 HTTP client used to call `contentSource.reindexEndpoint`
  * @param runningJobService  service used to update and remove the running-job record
  * @param jobHistoryService  service used to create the job-history entry on completion
  */
class ProgressTracker(ws: WSClient, runningJobService: RunningJobService, jobHistoryService: JobHistoryService)
    extends Actor
    with ActorLogging
    with StrictLogging {

  import context.become
  import context.dispatcher

  /** Value the failure counter must reach before the next failed poll marks the reindex as `Failed`. */
  private val FailedAttemptsToRetrieveProgressLimit = 30

  /** Delay between one progress reply and the next poll. */
  private val PollInterval = 2.seconds

  /** Handle of the most recently scheduled poll, kept so it can be cancelled when the actor stops. */
  private var nextPollSchedule: Option[Cancellable] = None

  /** Number of failed polls recorded so far.
    * NOTE(review): this is not reset after a successful poll, so it counts failures over the whole
    * lifetime of the tracker rather than consecutive failures - confirm that is intended.
    */
  private var failedAttemptsToRetrieveProgress = 0

  final def receive: Receive = sleeping

  /** Behaviour while no progress request is in flight. */
  private def sleeping: Receive = {
    case TrackProgress(contentSource, runningJob) =>
      askForProgress(contentSource, runningJob)
      become(waitingForProgress)
    case Cancel(contentSource, runningJob) =>
      completeProgressTracking(Cancelled, contentSource, runningJob)
    case other =>
      logger.warn(s"Unexpected message received while sleeping: $other")
  }

  /** Behaviour while a progress request is in flight. */
  private def waitingForProgress: Receive = {
    case UpdateProgress(result, contentSource, runningJob) =>
      updateProgress(result, contentSource, runningJob)
      become(sleeping) // no matter whether the result is good or bad, we should go back to sleeping
    // Match on the message type: a bare `case Cancel =>` would only match the case class's companion
    // object, so real `Cancel(...)` messages would fall through to the catch-all below and be dropped.
    case _: Cancel =>
      // don't stop ourselves until we have received the final response (just to avoid a dead letter)
      become(waitingForFinalProgress)
    case other =>
      logger.warn(s"Unexpected message received while waiting for response: $other")
  }

  /** Behaviour after a cancel was requested while a progress request was still in flight. */
  private def waitingForFinalProgress: Receive = {
    case UpdateProgress(_, contentSource, runningJob) =>
      // we've received our last message, so now we can stop (we ignore the response)
      completeProgressTracking(Cancelled, contentSource, runningJob)
    case _: Cancel =>
      log.debug("Ignoring a Cancel message because we're already planning to stop after we receive the final response")
    case other =>
      logger.warn(s"Unexpected message received while waiting for final response: $other")
  }

  /** Issues a GET to the content source's reindex endpoint and sends the outcome back to this actor
    * as an `UpdateProgress` message.
    */
  private def askForProgress(contentSource: ContentSource, runningJob: RunningJob): Unit = {
    // make this a val to fix its value, because the onComplete will occur later in a different thread
    val myself = self
    ws.url(contentSource.reindexEndpoint).get().onComplete { result =>
      myself ! UpdateProgress(result, contentSource, runningJob)
    }
  }

  /** Dispatches a progress reply: a 200 response is parsed, anything else counts as a failed attempt. */
  private def updateProgress(result: Try[WSResponse], contentSource: ContentSource, runningJob: RunningJob): Unit =
    result match {
      case Success(response) if response.status == 200 =>
        onSuccess(response, contentSource, runningJob)
      case Success(response) =>
        logger.warn(
          s"Content source with id: ${contentSource.id} returned a http ${response.status} response to a progress request: ${response.body}"
        )
        onFailure(contentSource, runningJob)
      case Failure(e) =>
        logger.error(
          s"Failure while calling content source: ${contentSource.id} for progress request: ${e.getMessage}"
        )
        onFailure(contentSource, runningJob)
    }

  /** Parses the response body as a `Progress`; an unparseable body ends tracking with status `Cancelled`. */
  private def onSuccess(response: WSResponse, contentSource: ContentSource, runningJob: RunningJob): Unit =
    response.json
      .validate[Progress]
      .fold(
        _ => {
          logger.warn(
            s"Content source with id: ${contentSource.id} appears to be returning progress updates in an incorrect format. Marking reindex as cancelled and stop monitoring reindex."
          )
          // TODO: try to cancel the reindex op on the source, rather than just stop paying any attention to it
          completeProgressTracking(Cancelled, contentSource, runningJob)
        },
        progress => actOnProgress(progress, contentSource, runningJob)
      )

  /** Records a failed poll: either schedules another poll or, once the counter has reached the limit,
    * ends tracking with status `Failed`.
    */
  private def onFailure(contentSource: ContentSource, runningJob: RunningJob): Unit = {
    if (FailedAttemptsToRetrieveProgressLimit == failedAttemptsToRetrieveProgress) {
      // Log before resetting the counter, otherwise the message always reports 0 attempts.
      logger.warn(s"Failing reindex after $failedAttemptsToRetrieveProgress failed attempts to get progress")
      failedAttemptsToRetrieveProgress = 0
      completeProgressTracking(Failed, contentSource, runningJob)
    } else {
      failedAttemptsToRetrieveProgress += 1
      scheduleNextUpdate(contentSource, runningJob)
    }
  }

  /** Returns a `RunningJob` built from `runningJob` with the document counts reported in `progress`. */
  private def withLatestCounts(runningJob: RunningJob, progress: Progress): RunningJob =
    RunningJob(
      runningJob.contentSourceId,
      runningJob.contentSourceEnvironment,
      progress.documentsIndexed,
      progress.documentsExpected,
      runningJob.startTime,
      runningJob.rangeFrom,
      runningJob.rangeTo
    )

  /** Reacts to a successfully parsed progress report according to its status. */
  private def actOnProgress(progress: Progress, contentSource: ContentSource, runningJob: RunningJob): Unit =
    progress.status match {
      case Completed =>
        // record the final counts reported by the content source in the job history
        completeProgressTracking(Completed, contentSource, withLatestCounts(runningJob, progress))
      case Failed =>
        logger.warn(s"Failing reindex after failed progress update from content source: " +
          s"Documents expected: ${progress.documentsExpected}, " +
          s"Documents indexed: ${progress.documentsIndexed}," +
          s"Status: ${progress.status}")
        completeProgressTracking(Failed, contentSource, runningJob)
      case InProgress =>
        runningJobService.updateRunningJob(
          runningJob.contentSourceId,
          runningJob.contentSourceEnvironment,
          withLatestCounts(runningJob, progress)
        )
        // NOTE(review): the next poll carries the original `runningJob`, not the updated counts - confirm intended
        scheduleNextUpdate(contentSource, runningJob)
      case _ =>
        // NOTE(review): this branch neither schedules another poll nor completes tracking, so the actor
        // goes back to `sleeping` with nothing pending - confirm that is intended.
        logger.warn(
          s"Incorrect status sent from client: ${progress.status.toString} for content source: ${contentSource.id}"
        )
    }

  /** Ends tracking: removes the running-job record, writes a job-history entry with the given status,
    * cancels any pending poll and stops this actor.
    *
    * @param status        final status recorded in the job history
    * @param contentSource content source the job belongs to (used for logging)
    * @param runningJob    job whose details and document counts are copied into the history entry
    */
  private def completeProgressTracking(
      status: ReindexStatus,
      contentSource: ContentSource,
      runningJob: RunningJob
  ): Unit = {
    def cleanupAndStop(): Unit = {
      nextPollSchedule.foreach(_.cancel()) // cancel any schedule that we might have set up
      context.stop(self)
    }

    logger.info(s"Completing progress tracking for job from '${contentSource.id}' with reindex status: ${status}")

    val jobHistory = JobHistory(
      runningJob.contentSourceId,
      runningJob.startTime,
      new DateTime(),
      status,
      runningJob.contentSourceEnvironment,
      runningJob.rangeFrom,
      runningJob.rangeTo,
      Some(runningJob.documentsExpected),
      Some(runningJob.documentsIndexed)
    )

    runningJobService.removeRunningJob(runningJob.contentSourceId, runningJob.contentSourceEnvironment)
    jobHistoryService.createJobHistory(jobHistory)
    cleanupAndStop()
  }

  /** Schedules a `TrackProgress` message to this actor after `PollInterval` and remembers the handle. */
  private def scheduleNextUpdate(contentSource: ContentSource, runningJob: RunningJob): Unit = {
    val schedule = context.system.scheduler.scheduleOnce(PollInterval, self, TrackProgress(contentSource, runningJob))
    nextPollSchedule = Some(schedule)
  }
}