app/model/jobs/Job.scala (90 lines of code) (raw):

package model.jobs import com.amazonaws.services.dynamodbv2.document.Item import play.api.libs.json._ import play.api.libs.functional.syntax._ import ai.x.play.json.Jsonx import ai.x.play.json.Encoders.encoder import model.jobs.steps._ import model.{AppAudit, Tag, TagAudit} import repositories._ import helpers.JodaDateTimeFormat._ import org.joda.time.{DateTime, DateTimeZone} import scala.concurrent.ExecutionContext import scala.util.control.NonFatal case class Job( id: Long, // Useful so users can report job failures title: String, createdBy: Option[String], steps: List[Step], // What are the steps in this job tagIds: List[Long] = List(), // List of all the tags associated with this job rollbackEnabled: Boolean = false, lockedAt: Long = 0, ownedBy: Option[String] = None, // Which node current owns this job var jobStatus: String = JobStatus.waiting, // Waiting, Owned, Failed, Complete var waitUntil: Long = new DateTime(DateTimeZone.UTC).getMillis, // Signal to the runner to wait until a given time before processing createdAt: Long = new DateTime().getMillis // Created at in local time ) { /** Process the current step of a job * returns a bool which tells the job runner to requeue the job in dynamo * or simply continue processing. */ def process(implicit ec: ExecutionContext) = { steps.find(_.stepStatus != StepStatus.complete).foreach { step => step.stepStatus match { case StepStatus.ready => processStep(step) case StepStatus.processed => checkStep(step) case StepStatus.failed => failJob(step) case _ => {} } waitUntil = new DateTime(DateTimeZone.UTC).getMillis() + step.waitDuration.map(_.toMillis).getOrElse(0L) } } def processStep(step: Step)(implicit ec: ExecutionContext) = { try { step.processStep } catch { case NonFatal(e) => { jobStatus = JobStatus.failed } } } def checkStep(step: Step)(implicit ec: ExecutionContext) = { try { step.checkStep } catch { case NonFatal(e) => { jobStatus = JobStatus.failed } } } def failJob(step: Step) = { jobStatus = JobStatus.failed } def checkIfComplete() = { if (steps.find(_.stepStatus != StepStatus.complete).isEmpty){ jobStatus = JobStatus.complete } } def rollback = { if (rollbackEnabled) { val revSteps = steps.reverse revSteps.foreach(step => step.rollbackStep()) jobStatus = JobStatus.rolledback } } def toItem = Item.fromJSON(Json.toJson(this).toString()) } object Job { implicit val jobFormat: Format[Job] = Jsonx.formatCaseClassUseDefaults[Job] def fromItem(item: Item): Job = try { Json.parse(item.toJSON).as[Job] } catch { case NonFatal(e) => { println(e.printStackTrace()) throw e } } } /** The job status is used to indicate if a job can be picked up off by a node as well as indicating progress * to clients. */ object JobStatus { /** This job is waiting to be serviced */ val waiting = "waiting" /** This job is owned by a node */ val owned = "owned" /** This job is complete */ val complete = "complete" /** This job has failed */ val failed = "failed" /** This job has been rolled back by a user */ val rolledback = "rolledback" }