app/model/jobs/Step.scala (145 lines of code) (raw):
package model.jobs
import model.jobs.steps._
import ai.x.play.json.Jsonx
import ai.x.play.json.Encoders.encoder
import play.api.libs.json._
import play.api.libs.functional.syntax._
import play.api.Logging
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.control.NonFatal
/**
 * A single unit of work that can be processed, checked for completion and
 * rolled back.
 *
 * Implementations supply the actual work via `process`, `check` and
 * `rollback`; callers drive a step through `processStep`, `checkStep` and
 * `rollbackStep`, which keep `stepStatus` / `stepMessage` up to date and log
 * any failure.
 */
trait Step extends Logging {
  // Inner details

  /** Do work. */
  protected def process(implicit ec: ExecutionContext): Unit

  /** Confirm this step ran successfully. */
  protected def check(implicit ec: ExecutionContext): Boolean

  /** Undo this step. */
  protected def rollback: Unit

  // Public methods that wrap the status updates

  /**
   * Runs `process`, moving the status to 'processing' beforehand and to
   * 'processed' afterwards.
   *
   * Any non-fatal exception marks the step as 'failed', is logged together
   * with its stack trace and is then rethrown.
   */
  def processStep(implicit ec: ExecutionContext): Unit = {
    try {
      beginProcessing()
      process
      doneProcessing()
    } catch {
      case NonFatal(e) =>
        // Pass the throwable as well so the stack trace is not lost.
        logger.error(s"Error thrown during step processing: ${e}", e)
        processFailed()
        throw e // Need to rethrow the exception to inform the job to start a rollback
    }
  }

  /**
   * Records one more check attempt and runs `check`.
   *
   * If `check` returns true the step is marked 'complete'; if it returns
   * false the status is left untouched. Once the number of attempts exceeds
   * the retry limit the step is marked 'failed' and [[TooManyAttempts]] is
   * thrown without calling `check`. Any non-fatal exception thrown by `check`
   * marks the step as 'failed', is logged and is then rethrown.
   */
  def checkStep(implicit ec: ExecutionContext): Unit = {
    attempts += 1
    if (attempts > Step.retryLimit) {
      checkFailed()
      throw TooManyAttempts(s"Too many attempts at checking ${this.`type`}")
    }

    try {
      if (check) {
        checkOk()
      }
    } catch {
      case NonFatal(e) =>
        // Pass the throwable as well so the stack trace is not lost.
        logger.error(s"Error thrown during step check: ${e}", e)
        checkFailed()
        throw e // Need to rethrow the exception to inform the job to start a rollback
    }
  }

  /**
   * Rolls this step back if it has started any work, i.e. its status is
   * 'processing', 'processed', 'complete' or 'failed'. Steps in any other
   * state are left alone.
   *
   * On success the status becomes 'rolledback'. A non-fatal exception from
   * `rollback` is logged and the status becomes 'rollbackfailed'; the
   * exception is deliberately not rethrown.
   */
  def rollbackStep(): Unit = {
    try {
      if (stepStatus == StepStatus.processing
        || stepStatus == StepStatus.processed
        || stepStatus == StepStatus.complete
        || stepStatus == StepStatus.failed) {
        rollback
        stepStatus = StepStatus.rolledback
      }
    } catch {
      case NonFatal(e) =>
        // Previously swallowed silently; record why the rollback failed.
        logger.error(s"Error thrown during step rollback: ${e}", e)
        stepStatus = StepStatus.rollbackfailed
    }
  }

  /** The amount of time to wait in between steps */
  def waitDuration: Option[Duration]

  /** The type of this step - used in serialization */
  val `type`: String

  /** The number of attempts this step has made at checking */
  var attempts: Int

  /**
   * The status of this step: 'ready' to be processed, 'processing',
   * 'processed', 'complete', 'failed', or one of the rollback outcomes
   * 'rolledback' and 'rollbackfailed'.
   */
  var stepStatus: String

  /** The current user friendly message for this step, utilizes the vals below. */
  var stepMessage: String

  // User friendly messages to help the user

  /** Shown once processing has finished and the step is awaiting its check. */
  val checkingMessage: String

  /** Shown when processing the step failed. */
  val failureMessage: String

  /** Shown when checking the step failed or ran out of attempts. */
  val checkFailMessage: String

  // Helpers for setting status metadata

  private def beginProcessing(): Unit = {
    stepStatus = StepStatus.processing
    stepMessage = "Processing..."
  }

  private def doneProcessing(): Unit = {
    stepStatus = StepStatus.processed
    stepMessage = checkingMessage
  }

  private def processFailed(): Unit = {
    stepStatus = StepStatus.failed
    stepMessage = failureMessage
  }

  private def checkOk(): Unit = {
    stepStatus = StepStatus.complete
    stepMessage = "Complete"
  }

  private def checkFailed(): Unit = {
    stepStatus = StepStatus.failed
    stepMessage = checkFailMessage
  }
}
/**
 * Companion for [[Step]]: holds the check retry limit and the JSON
 * (de)serialization for every known step type.
 */
object Step extends Logging {
  /** Maximum number of check attempts before a step is failed. */
  private val retryLimit = 10000

  // Keep all the serialization stuff in here just so it's in one place
  // Note: addTagToContentFormat is the format for ModifyContentTags.
  val addTagToContentFormat = Jsonx.formatCaseClassUseDefaults[ModifyContentTags]
  val mergeTagForContentFormat = Jsonx.formatCaseClassUseDefaults[MergeTagForContent]
  val reindexSectionsFormat = Jsonx.formatCaseClassUseDefaults[ReindexSections]
  val reindexTagsFormat = Jsonx.formatCaseClassUseDefaults[ReindexTags]
  val reindexPillarsFormat = Jsonx.formatCaseClassUseDefaults[ReindexPillars]
  val removeTagFormat = Jsonx.formatCaseClassUseDefaults[RemoveTag]
  val removeTagFromCapiFormat = Jsonx.formatCaseClassUseDefaults[RemoveTagFromCapi]
  val removeTagFromContentFormat = Jsonx.formatCaseClassUseDefaults[RemoveTagFromContent]
  val removeTagPathFormat = Jsonx.formatCaseClassUseDefaults[RemoveTagPath]

  /**
   * Serializes a step by dispatching on its concrete class.
   *
   * Throws `UnsupportedOperationException` (after logging a warning) for a
   * step class that has no registered format.
   */
  val stepWrites: Writes[Step] = new Writes[Step] {
    override def writes(step: Step): JsValue = {
      step match {
        case s: ModifyContentTags => addTagToContentFormat.writes(s)
        case s: MergeTagForContent => mergeTagForContentFormat.writes(s)
        case s: ReindexSections => reindexSectionsFormat.writes(s)
        case s: ReindexTags => reindexTagsFormat.writes(s)
        case s: ReindexPillars => reindexPillarsFormat.writes(s)
        case s: RemoveTag => removeTagFormat.writes(s)
        case s: RemoveTagFromCapi => removeTagFromCapiFormat.writes(s)
        case s: RemoveTagFromContent => removeTagFromContentFormat.writes(s)
        case s: RemoveTagPath => removeTagPathFormat.writes(s)
        case other =>
          logger.warn(s"Attempted to serialize unknown step type ${other.getClass}")
          throw new UnsupportedOperationException(s"unable to serialize step of type ${other.getClass}")
      }
    }
  }

  /**
   * Deserializes a step by dispatching on the JSON "type" field.
   *
   * A missing, non-string or unrecognised "type" yields a `JsError` rather
   * than throwing.
   */
  val stepReads: Reads[Step] = new Reads[Step] {
    override def reads(json: JsValue): JsResult[Step] = {
      // toOption (instead of .get) so an absent "type" field falls through to
      // the JsError branch instead of raising NoSuchElementException.
      (json \ "type").toOption match {
        case Some(JsString(ModifyContentTags.`type`)) => addTagToContentFormat.reads(json)
        case Some(JsString(MergeTagForContent.`type`)) => mergeTagForContentFormat.reads(json)
        case Some(JsString(ReindexSections.`type`)) => reindexSectionsFormat.reads(json)
        case Some(JsString(ReindexTags.`type`)) => reindexTagsFormat.reads(json)
        case Some(JsString(ReindexPillars.`type`)) => reindexPillarsFormat.reads(json)
        case Some(JsString(RemoveTag.`type`)) => removeTagFormat.reads(json)
        case Some(JsString(RemoveTagFromCapi.`type`)) => removeTagFromCapiFormat.reads(json)
        case Some(JsString(RemoveTagFromContent.`type`)) => removeTagFromContentFormat.reads(json)
        case Some(JsString(RemoveTagPath.`type`)) => removeTagPathFormat.reads(json)
        case _ => JsError("unexpected step type value")
      }
    }
  }

  /** Combined reader/writer picked up implicitly wherever a `Format[Step]` is needed. */
  implicit val stepFormat: Format[Step] = Format(stepReads, stepWrites)
}
/**
 * String constants for the lifecycle states of a step.
 *
 * Step status is required so we know which steps require rollback.
 */
object StepStatus {
  /** Not started yet. */
  val ready: String = "ready"

  /** Work is currently in progress. */
  val processing: String = "processing"

  /** Work has finished and is awaiting its check. */
  val processed: String = "processed"

  /** The check confirmed the step succeeded. */
  val complete: String = "complete"

  /** Processing or checking failed. */
  val failed: String = "failed"

  /** The step was rolled back successfully. */
  val rolledback: String = "rolledback"

  /** The rollback itself failed. */
  val rollbackfailed: String = "rollbackfailed"
}
/**
 * Raised when a step has been checked more times than the retry limit allows.
 *
 * @param message description of which step ran out of attempts
 */
case class TooManyAttempts(message: String)
  extends RuntimeException(message)