app/lib/RepoSnapshot.scala (181 lines of code) (raw):

/*
 * Copyright 2014 The Guardian
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package lib

import org.apache.pekko.stream.Materializer
import com.madgag.git._
import com.madgag.github.Implicits._
import com.madgag.scala.collection.decorators._
import com.madgag.scalagithub.GitHub
import com.madgag.scalagithub.GitHub._
import com.madgag.scalagithub.model.{PullRequest, Repo, RepoId}
import com.madgag.time.Implicits._
import io.lemonlabs.uri.Url
import lib.Config.Checkpoint
import lib.gitgithub.LabelMapping
import lib.labels._
import org.eclipse.jgit.lib.{ObjectId, Repository}
import org.eclipse.jgit.revwalk.{RevCommit, RevWalk}
import play.api.Logging

import java.time.ZonedDateTime
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent._
import scala.concurrent.duration._
import scala.util.Success

/** Constants and the [[RepoSnapshot.Factory]] used to build [[RepoSnapshot]]s. */
object RepoSnapshot {

  /** Upper bound on how many merged PRs are turned into full [[PRSnapshot]]s per repo. */
  val MaxPRsToScanPerRepo: Int = 30

  /** Only PRs whose `merged_at` falls within this window (before now) are scanned. */
  val WorthyOfScanWindow: java.time.Duration = 14.days

  // NOTE(review): not referenced in this file; presumably used by callers elsewhere.
  val WorthyOfCommentWindow: java.time.Duration = 12.hours

  /** Query parameters for listing PRs: closed ones, most recently updated first. */
  val ClosedPRsMostlyRecentlyUpdated: Map[String, String] =
    Map("state" -> "closed", "sort" -> "updated", "direction" -> "desc")

  /**
   * Builds [[RepoSnapshot]]s by concurrently fetching a repo's merged PRs, its webhook URLs
   * and a local copy of its git repository.
   *
   * @param bot supplies the GitHub client, the git working directory and git credentials
   */
  class Factory(
    bot: Bot
  )(implicit
    mat: Materializer,
    checkpointSnapshoter: CheckpointSnapshoter
  ) {

    implicit val github: GitHub = bot.github

    /** Logs `message` prefixed with the repo's full name. */
    def log(message: String)(implicit repo: Repo): Unit = logger.info(s"${repo.full_name} - $message")

    /**
     * Evaluates `thunk` once and, when the resulting future completes, logs how long it took
     * and whether it succeeded. The returned future is the one produced by `thunk`.
     */
    def logAround[T](desc: String)(thunk: => Future[T])(implicit repo: Repo): Future[T] = {
      // nanoTime is monotonic, so the measurement is unaffected by wall-clock adjustments.
      val startNanos = System.nanoTime()
      val fut = thunk // evaluate the by-name thunk exactly once!
      fut.onComplete { attempt =>
        val elapsedMs = java.util.concurrent.TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos)
        log(s"'$desc' $elapsedMs ms : success=${attempt.isSuccess}")
      }
      fut
    }

    /** True if the PR was merged and its base branch is the repo's default branch. */
    def isMergedToMain(pr: PullRequest)(implicit repo: Repo): Boolean =
      pr.merged_at.isDefined && pr.base.ref == repo.default_branch

    /** Looks up the repo on GitHub, then snapshots it. */
    def snapshot(repoId: RepoId): Future[RepoSnapshot] = for {
      githubRepo <- github.getRepo(repoId)
      repoSnapshot <- snapshot(githubRepo)
    } yield repoSnapshot

    /** Snapshots an already-fetched GitHub repo. */
    def snapshot(implicit githubRepo: Repo): Future[RepoSnapshot] = {
      // All three futures are created before the for-comprehension so they run concurrently.
      val mergedPullRequestsF = logAround("fetch PRs")(fetchMergedPullRequests())
      val hooksF = logAround("fetch repo hooks")(fetchRepoHooks())
      val gitRepoF = logAround("fetch git repo")(fetchLatestCopyOfGitRepo())

      for {
        mergedPullRequests <- mergedPullRequestsF
        gitRepo <- gitRepoF
        hooks <- hooksF
      } yield RepoSnapshot(
        RepoLevelDetails(githubRepo, gitRepo, hooks),
        mergedPullRequests,
        checkpointSnapshoter
      )
    }

    /** Fetches a single PR by number together with its current labels. */
    def prSnapshot(prNumber: Int)(implicit repo: Repo): Future[PRSnapshot] = for {
      prResponse <- repo.pullRequests.get(prNumber)
      pr = prResponse.result
      labelsResponse <- pr.labels.list().all()
    } yield PRSnapshot(pr, labelsResponse)

    /**
     * Lists recently-updated closed PRs and fully snapshots those merged into the default
     * branch within [[WorthyOfScanWindow]], capped at [[MaxPRsToScanPerRepo]].
     */
    def fetchMergedPullRequests()(implicit repo: Repo): Future[Seq[PRSnapshot]] = {
      val now = ZonedDateTime.now()
      val timeThresholdForScan = now.minus(WorthyOfScanWindow)

      def isNewEnoughToBeWorthScanning(pr: PullRequest) = pr.merged_at.exists(_.isAfter(timeThresholdForScan))

      (for {
        // NOTE(review): `.take(2)` presumably limits the listing to the first two result pages.
        litePullRequests: Seq[PullRequest] <- repo.pullRequests.list(ClosedPRsMostlyRecentlyUpdated).take(2).all(): Future[Seq[PullRequest]]
        pullRequests <- Future.traverse(litePullRequests.filter(isMergedToMain).filter(isNewEnoughToBeWorthScanning).take(MaxPRsToScanPerRepo))(pr => prSnapshot(pr.number))
      } yield {
        log(s"PRs merged to master size=${pullRequests.size}")
        pullRequests
      }) andThen { case cprs =>
        log(s"Merged Pull Requests fetched: ${cprs.map(_.map(_.pr.number).sorted.reverse)}")
      }
    }

    /** Obtains an up-to-date local git repository for `githubRepo` under the bot's working dir. */
    private def fetchLatestCopyOfGitRepo()(implicit githubRepo: Repo): Future[Repository] = {
      Future {
        // getGitRepo is given the clone URL and presumably clones/fetches over the network:
        // blocking I/O. `blocking` lets the global pool add compensating threads instead of
        // starving the other futures that share it.
        blocking {
          val repoId = githubRepo.repoId
          RepoUtil.getGitRepo(
            bot.workingDir.resolve(s"${repoId.owner}/${repoId.name}").toFile,
            githubRepo.clone_url,
            Some(bot.git))
        }
      } andThen { case r =>
        log(s"Git Repo ref count: ${r.map(_.getRefDatabase.getRefs.size)}")
      }
    }

    /**
     * Returns the parsed `url` config entries of the repo's webhooks. Listing hooks needs admin
     * rights, so without them an empty result is returned.
     */
    private def fetchRepoHooks()(implicit githubRepo: Repo) =
      if (githubRepo.permissions.exists(_.admin))
        githubRepo.hooks.list().map(_.flatMap(_.config.get("url").map(Url.parse))).all()
      else {
        log(s"No admin rights to check hooks")
        Future.successful(Seq.empty)
      }
  }
}

/** Snapshots of every available checkpoint plus per-PR checkpoint details. */
case class Diagnostic(
  snapshots: Set[CheckpointSnapshot],
  prDetails: Seq[PRCheckpointDetails]
) {
  val snapshotsByCheckpoint: Map[Checkpoint, CheckpointSnapshot] = snapshots.map(s => s.checkpoint -> s).toMap
}

/**
 * Repo-wide state: the GitHub repo, its local git repository and its webhook URLs, plus the
 * default-branch commit and the config read from it.
 */
case class RepoLevelDetails(
  repo: Repo,
  gitRepo: Repository,
  hooks: Seq[Url]
) extends Logging {

  // NOTE(review): presumably provides a per-thread ObjectReader (see com.madgag.git).
  implicit val repoThreadLocal: ThreadLocalObjectDatabaseResources = gitRepo.getObjectDatabase.threadLocalResources

  /** The commit at the tip of the repo's default branch. */
  lazy val mainCommit: RevCommit = {
    val id: ObjectId = gitRepo.resolve(repo.default_branch)
    logger.info(s"Need to look at ${repo.full_name}, branch:${repo.default_branch} commit $id")
    // Repository.resolve returns null when the ref cannot be resolved.
    assert(id != null, s"Could not resolve default branch '${repo.default_branch}' in ${repo.full_name}")
    id.asRevCommit(new RevWalk(repoThreadLocal.reader()))
  }

  /** Repo configuration as found in [[mainCommit]]. */
  lazy val config: Config.RepoConfig = ConfigFinder.config(mainCommit)

  def createFileFinder(): FileFinder = new FileFinder(mainCommit)
}

/**
 * A point-in-time view of a repo and its recently merged PRs, relating each PR to the config
 * folders it touches and the checkpoints those folders declare.
 */
case class RepoSnapshot(
  repoLevelDetails: RepoLevelDetails,
  mergedPullRequestSnapshots: Seq[PRSnapshot],
  checkpointSnapshoter: CheckpointSnapshoter
) extends Logging {

  val repo: Repo = repoLevelDetails.repo

  val config: Config.RepoConfig = repoLevelDetails.config

  val mergedPRs: Seq[PullRequest] = mergedPullRequestSnapshots.map(_.pr)

  val updateReporters: Seq[UpdateReporter] = Seq.empty

  /** For each merged PR, the folders with valid config that it affects. */
  lazy val affectedFoldersByPullRequest: Map[PullRequest, Set[String]] = {
    implicit val revWalk: RevWalk = new RevWalk(repoLevelDetails.repoThreadLocal.reader())
    (for {
      pr <- mergedPRs
    } yield pr -> GitChanges.affects(pr, repoLevelDetails.config.foldersWithValidConfig)).toMap
  }

  /** Inverse of [[affectedFoldersByPullRequest]]: for each config folder, the PRs touching it. */
  lazy val pullRequestsByAffectedFolder: Map[String, Set[PullRequest]] = repoLevelDetails.config.foldersWithValidConfig.map {
    folder => folder -> mergedPRs.filter(pr => affectedFoldersByPullRequest(pr).contains(folder)).toSet
  }.toMap

  // Note: this log line forces pullRequestsByAffectedFolder during construction.
  logger.info(s"${repoLevelDetails.repo.full_name} pullRequestsByAffectedFolder : ${pullRequestsByAffectedFolder.mapV(_.map(_.number))}")

  /** For each merged PR, the config files of the folders it affects. */
  lazy val activeConfByPullRequest: Map[PullRequest, Set[ConfigFile]] = affectedFoldersByPullRequest.mapV {
    _.map(repoLevelDetails.config.validConfigByFolder(_))
  }

  /** For each merged PR, the checkpoints declared by the config files it affects. */
  lazy val activeCheckpointsByPullRequest: Map[PullRequest, Set[Checkpoint]] = activeConfByPullRequest.mapV {
    _.flatMap(_.checkpointSet)
  }

  val allAvailableCheckpoints: Set[Checkpoint] = repoLevelDetails.config.checkpointsByName.values.toSet

  /** Every PR label that could be produced for any available checkpoint. */
  val allPossibleCheckpointPRLabels: Set[String] = for {
    prLabel <- PullRequestLabel.all
    checkpoint <- allAvailableCheckpoints
  } yield prLabel.labelFor(checkpoint.name)

  /** Snapshots every available checkpoint and pairs each merged PR with its relevant snapshots. */
  def diagnostic(): Future[Diagnostic] = for {
    snapshots <- snapshotOfAllAvailableCheckpoints()
  } yield Diagnostic(snapshots, mergedPRs.map { pr =>
    PRCheckpointDetails(pr, snapshots.filter(s => activeCheckpointsByPullRequest(pr).contains(s.checkpoint)), repoLevelDetails.gitRepo)
  })

  def snapshotOfAllAvailableCheckpoints(): Future[Set[CheckpointSnapshot]] =
    Future.sequence(allAvailableCheckpoints.map(takeCheckpointSnapshot))

  /** Checkpoints relevant to at least one merged PR. */
  val activeCheckpoints: Set[Checkpoint] = activeCheckpointsByPullRequest.values.flatten.toSet

  /** One (memoised) snapshot future per active checkpoint, shared by all PRs. */
  lazy val snapshotsOfActiveCheckpointsF: Map[Checkpoint, Future[CheckpointSnapshot]] =
    activeCheckpoints.map { c => c -> takeCheckpointSnapshot(c) }.toMap

  /**
   * Asks the snapshoter for the candidate ids deployed at `checkpoint` and keeps the first one
   * that resolves to an object existing in the local repo. A failed snapshoter call is captured
   * in the resulting `CheckpointSnapshot` rather than failing the future.
   */
  def takeCheckpointSnapshot(checkpoint: Checkpoint): Future[CheckpointSnapshot] = for (
    possibleIdsTry <- checkpointSnapshoter.snapshot(checkpoint).trying
  ) yield {
    val objectIdTry = for (possibleIds <- possibleIdsTry) yield {
      possibleIds.map(repoLevelDetails.repoThreadLocal.reader().resolveExistingUniqueId).collectFirst {
        case Success(objectId) => objectId
      }
    }
    CheckpointSnapshot(checkpoint, objectIdTry)
  }

  lazy val activeSnapshotsF: Future[Set[CheckpointSnapshot]] =
    Future.sequence(activeCheckpoints.map(snapshotsOfActiveCheckpointsF))

  /** Snapshots of the PR's active checkpoints that `oldState` has not already seen. */
  def checkpointSnapshotsFor(pr: PullRequest, oldState: PRCheckpointState): Future[Set[CheckpointSnapshot]] =
    Future.sequence(activeCheckpointsByPullRequest(pr).filter(!oldState.hasSeen(_)).map(snapshotsOfActiveCheckpointsF))

  /** Converts between PR labels and [[PRCheckpointState]]; parsing only considers active checkpoints. */
  val labelToStateMapping: LabelMapping[PRCheckpointState] = new LabelMapping[PRCheckpointState] {
    def labelsFor(s: PRCheckpointState): Set[String] = s.statusByCheckpoint.map {
      case (checkpointName, cs) => cs.labelFor(checkpointName)
    }.toSet

    def stateFrom(labels: Set[String]): PRCheckpointState = PRCheckpointState(activeCheckpoints.flatMap { checkpoint =>
      PullRequestCheckpointStatus.fromLabels(labels, checkpoint).map(checkpoint.name -> _)
    }.toMap)
  }
}