app/lib/ScanScheduler.scala (78 lines of code) (raw):

package lib import org.apache.pekko.actor.ActorSystem import java.time.Instant import java.time.Instant.now import java.time.temporal.ChronoUnit.MINUTES import com.madgag.github.Implicits._ import com.madgag.scalagithub.GitHub import com.madgag.scalagithub.model.RepoId import com.madgag.time.Implicits._ import lib.labels.Seen import play.api.Logging import play.api.libs.concurrent.Pekko import java.util.concurrent.atomic.AtomicReference import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future import scala.util.{Failure, Success} object ScanScheduler { class Factory( droid: Droid, conn: GitHub, actorSystem: ActorSystem, delayer: Delayer ) extends Logging { def createFor(repoId: RepoId): ScanScheduler = { logger.info(s"Creating scheduler for $repoId") new ScanScheduler( repoId, droid, actorSystem, delayer ) } } } class ScanScheduler( repoId: RepoId, droid: Droid, actorSystem: ActorSystem, delayer: Delayer ) extends Logging { selfScanScheduler => val earliestFollowUpScanTime: AtomicReference[Instant] = new AtomicReference(Instant.now()) private val dogpile = new Dogpile(delayer.delayTheFuture { logger.info(s"In the dogpile for $repoId...") val summariesF = droid.scan(repoId) for { summariesTry <- summariesF.trying } yield { summariesTry match { case Failure(e) => logger.error(s"Scanning $repoId failed", e) case Success(summaries) => logger.info(s"$selfScanScheduler : ${summaries.size} summaries for ${repoId.fullName}:\n${summaries.map(s => s"#${s.prCheckpointDetails.pr.prId.slug} changed=${s.changed.map(_.snapshot.checkpoint.name)}").mkString("\n")}") val scanTimeForUnseenOpt = summaries.find(!_.checkpointStatuses.all(Seen)).map(_ => now.plus(1L, MINUTES)) val overdueTimes = summaries.collect { case summary => summary.prCheckpointDetails.soonestPendingCheckpointOverdueTime }.flatten val candidateFollowUpScanTimes = overdueTimes ++ scanTimeForUnseenOpt if (candidateFollowUpScanTimes.nonEmpty) { val earliestCandidateScanTime: Instant = candidateFollowUpScanTimes.min earliestFollowUpScanTime.updateAndGet { oldFollowupTime => if (now.isAfter(oldFollowupTime) || earliestCandidateScanTime.isBefore(oldFollowupTime)) { actorSystem.scheduler.scheduleOnce(java.time.Duration.between(now, earliestCandidateScanTime)) { scan() } earliestCandidateScanTime } else oldFollowupTime } } } summariesTry.get } }) def scan(): Future[Seq[PullRequestCheckpointsStateChangeSummary]] = { logger.info(s"Asked for a scan on $repoId...") dogpile.doAtLeastOneMore() } }