app/model/jobs/JobRunner.scala (84 lines of code) (raw):

package model.jobs

import java.net.InetAddress
import java.util.concurrent.TimeUnit
import javax.inject._

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.util.Random
import scala.util.control.NonFatal

import com.google.common.util.concurrent.{AbstractScheduledService, ServiceManager}
import com.google.common.util.concurrent.AbstractScheduledService.Scheduler
import org.joda.time.{DateTime, DateTimeZone}
import play.api.Logging
import play.api.inject.ApplicationLifecycle

import helpers.JodaDateTimeFormat._
import repositories._
import services.Dynamo

/**
 * Periodically scans the job table, claims at most one runnable job per tick,
 * and processes it. Multiple application nodes run this concurrently; a
 * per-job lock keyed by [[JobRunner.nodeId]] keeps two nodes from processing
 * the same job, and a lock timeout lets a node steal jobs from a crashed peer.
 */
@Singleton
class JobRunner @Inject() (lifecycle: ApplicationLifecycle)(implicit ec: ExecutionContext) extends Logging {

  // Guava service manager drives the periodic scheduler; shut down via the Play lifecycle hook.
  val serviceManager = new ServiceManager(List(new JobRunnerScheduler(this)).asJava)

  lifecycle.addStopHook { () =>
    Future.successful(stop)
  }

  serviceManager.startAsync()

  /** Stops the scheduler, waiting up to 20s for a clean shutdown. */
  def stop: Unit = {
    serviceManager.stopAsync()
    serviceManager.awaitStopped(20, TimeUnit.SECONDS)
  }

  /**
   * Scheduler entry point. Must never let an exception escape: an uncaught
   * throw from `runOneIteration` would permanently fail the Guava service.
   */
  def tryRun(): Unit = {
    try {
      run()
    } catch {
      case NonFatal(e) =>
        // Fix: the original interpolated e.getStackTrace (an Array, which renders as
        // "[Ljava.lang.StackTraceElement;@..."); pass the throwable so the full
        // stack trace is actually logged.
        logger.error("An unexpected exception occurred in the job runner", e)
    }
  }

  /** One scheduler tick: pick, lock, and process at most one job. */
  def run(): Unit = {
    val currentTime = new DateTime(DateTimeZone.UTC).getMillis
    val lockBreakTime = currentTime - JobRunner.lockTimeOutMillis

    // Shuffling the jobs reduces the chance of each Tag Manager instance picking
    // up the same job if it is long-running or stalled.
    val jobs = Random.shuffle(JobRepository.loadAllJobs)

    jobs
      .filter(validJobs)
      .find(job => isPotentialJob(job, currentTime, lockBreakTime)) // Find first potential job
      .flatMap(JobRepository.lock(_, JobRunner.nodeId, currentTime, lockBreakTime)) // Lock it (None if lock fails)
      .foreach { job =>
        // If we got a job, and locked it, then process it.
        try {
          job.process
        } catch {
          case NonFatal(e) =>
            // This catch exists to prevent an unexpected failure knocking over the
            // entire job runner. Fix: pass the throwable (was "$e" interpolation,
            // which drops the stack trace).
            logger.error(s"Background job failed on ${JobRunner.nodeId}.", e)
        } finally {
          // Persist whatever progress was made and release the lock, even on failure.
          job.checkIfComplete()
          JobRepository.upsertJobIfOwned(job, JobRunner.nodeId)
          JobRepository.unlock(job, JobRunner.nodeId)
        }
      }
  }

  /** A job is runnable if it is waiting and due, OR owned but its lock has timed out. */
  private def isPotentialJob(job: Job, currentTime: Long, lockBreakTime: Long): Boolean = {
    (job.jobStatus == JobStatus.waiting && job.waitUntil < currentTime) ||
    (job.jobStatus == JobStatus.owned && job.lockedAt < lockBreakTime)
  }

  /**
   * Filters out terminal jobs (complete / failed / rolledback); as a side
   * effect, deletes terminal jobs older than the clean-up window.
   */
  private def validJobs(job: Job): Boolean = {
    val terminal =
      job.jobStatus == JobStatus.complete ||
      job.jobStatus == JobStatus.failed ||
      job.jobStatus == JobStatus.rolledback

    if (terminal) {
      if (job.createdAt < new DateTime(DateTimeZone.UTC).getMillis - JobRunner.cleanUpMillis) {
        logger.info("Cleaning up old job: " + job.id)
        JobRepository.deleteIfTerminal(job.id)
      }
      false
    } else {
      true
    }
  }
}

object JobRunner {
  /** Identifies this node when locking jobs. EC2 machines have a single eth0 so this should work? */
  val nodeId = InetAddress.getLocalHost().toString()
  /** A lock older than this (5 min) is considered stale and may be stolen. */
  val lockTimeOutMillis = 1000 * 60 * 5
  /** Terminal jobs older than this (24h) are deleted. */
  val cleanUpMillis = 1000 * 60 * 60 * 24
}

/** Guava scheduled service: runs the job runner every 10 seconds (1s initial delay). */
class JobRunnerScheduler(runner: JobRunner) extends AbstractScheduledService {
  override def runOneIteration(): Unit = runner.tryRun()
  override def scheduler(): Scheduler = Scheduler.newFixedDelaySchedule(1, 10, TimeUnit.SECONDS)
}