app/services/ValidateProject.scala:

package services

import akka.actor.{Actor, ActorRef, ActorSystem}
import akka.stream.{ActorMaterializer, ClosedShape, Materializer}
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, RunnableGraph, Sink}
import javax.inject.{Inject, Singleton}
import models.{FileEntryDAO, FileEntryRow, ProjectEntry, ProjectEntryRow, ValidationJob, ValidationJobDAO, ValidationJobStatus, ValidationJobType, ValidationProblem, ValidationProblemDAO}
import org.slf4j.LoggerFactory
import play.api.Configuration
import play.api.db.slick.DatabaseConfigProvider
import play.api.inject.Injector
import slick.lifted.{AbstractTable, Rep, TableQuery}
import streamcomponents.{FileValidationComponent, FindMislinkedProjectsComponent, FindUnlinkedProjects, GeneralValidationComponent, ProjectSearchSource, ProjectValidationComponent}
import slick.jdbc.PostgresProfile.api._

import java.sql.Timestamp
import java.time.Instant
import java.util.UUID
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}

object ValidateProject {
  trait VPMsg

  /* public messages that are expected to be received */
  case class RequestValidation(job:ValidationJob) extends VPMsg

  /* public messages that will be sent in reply */
  case class ValidationSuccess(jobId:UUID) extends VPMsg
  case class ValidationError(err:Throwable) extends VPMsg
}

@Singleton
class ValidateProject @Inject()(config:Configuration, dbConfigProvider:DatabaseConfigProvider,
                                validationJobDAO:ValidationJobDAO, validationProblemDAO:ValidationProblemDAO)
                               (implicit mat:Materializer, injector:Injector, fileEntryDAO:FileEntryDAO) extends Actor {
  import ValidateProject._
  private val logger = LoggerFactory.getLogger(getClass)

  /**
    * Builds a stream to perform the validation.
    * This consists of a database search as a source, a number of parallel verification stages and a sink that
    * collects the resulting ValidationProblem reports and writes them to the database in batches.
    * @param switcherFactory Akka flow constructor class for the validator. This must receive data from the data type
    *                        of the given TableQuery and output ValidationProblem reports
    * @param parallelism number of validations to run concurrently
    * @param batchSize number of ValidationProblem records to group together before each database write
    * @param queryFunc Slick query that determines the data source to validate, e.g. `TableQuery[ProjectEntryRow]`
    * @tparam E Data type of the Slick table that is being queried e.g. `ProjectEntryRow`
    * @return a Graph that materialises the sink's Future, which completes when the stream finishes
    */
  def buildStream[E <:AbstractTable[_]](switcherFactory:GeneralValidationComponent[E], parallelism:Int=4, batchSize:Int=20)(queryFunc: Query[E, E#TableElementType, Seq]) = {
    val sinkFactory = Sink.foreach[Seq[ValidationProblem]](validationProblemDAO.batchInsertIntoDb)
    type T = E#TableElementType

    GraphDSL.create(sinkFactory) { implicit builder=> sink=>
      import akka.stream.scaladsl.GraphDSL.Implicits._

      val src = builder.add(new ProjectSearchSource(dbConfigProvider)(queryFunc))
      val distrib = builder.add(Balance[T](parallelism))                //fan the records out across the validators
      val noMerge = builder.add(Merge[ValidationProblem](parallelism))  //fan the problem reports back in
      val batcher = builder.add(Flow[ValidationProblem].grouped(batchSize))

      src.out.log("services.ValidateProject") ~> distrib

      for(i <- 0 until parallelism){
        val switcher = builder.add(switcherFactory)
        distrib.out(i) ~> switcher ~> noMerge
      }

      noMerge ~> batcher ~> sink
      ClosedShape
    }
  }

  /**
    * Runs the validation stream with the given database query.
    *
    * @param switcherFactory Akka flow constructor class for the validator. This must receive data from the data type
    *                        of the given TableQuery and output ValidationProblem reports
    * @param parallelism number of validations to run concurrently
    * @param queryFunc Slick query that determines the data source to validate, e.g. `TableQuery[ProjectEntryRow]`
    * @tparam E Data type of the Slick table that is being queried e.g. `ProjectEntryRow`.
    *           This can be inferred from the `queryFunc` and `switcherFactory` arguments
    * @return a Future, which completes with no value when the stream finishes
    */
  def runStream[E <:AbstractTable[_]](switcherFactory:GeneralValidationComponent[E], parallelism:Int=4)(queryFunc:Query[E, E#TableElementType, Seq]) =
    RunnableGraph.fromGraph(buildStream(switcherFactory, parallelism)(queryFunc)).run()

  /**
    * Returns the total number of records matching the query via a `SELECT COUNT` query.
    * @param queryFunc Slick query that determines the data source to validate, e.g. `TableQuery[ProjectEntryRow]`
    * @tparam E Data type of the Slick table that is being queried e.g. `ProjectEntryRow`.
    *           This can be inferred from the `queryFunc` argument
    * @return a Future, which completes with an integer representing the number of matching rows
    */
  def getTotalCount[E <:AbstractTable[_]](queryFunc: Query[E, E#TableElementType, Seq]) = {
    val db = dbConfigProvider.get.db
    db.run(queryFunc.size.result)
  }

  /**
    * Performs validation by checking the total count of records matching the query and then running the
    * verification stream.
    * If the operation fails, then the returned Future fails; catch this with .recover().
    *
    * @param switcherFactory Akka flow constructor class for the validator. This must receive data from the data type
    *                        of the given TableQuery and output ValidationProblem reports
    * @param parallelism number of validations to run concurrently
    * @param queryFunc Slick query that determines the data source to validate, e.g. `TableQuery[ProjectEntryRow]`
    * @tparam E Data type of the Slick table that is being queried e.g. `ProjectEntryRow`.
    *           This can be inferred from the `queryFunc` and `switcherFactory` arguments
    * @return a Future, which completes once the validation is done, returning an integer of the number of
    *         records queried as determined by a `SELECT COUNT` carried out at the start of the validation run
    */
  def performValidation[E <:AbstractTable[_]](switcherFactory:GeneralValidationComponent[E], parallelism:Int=4)(queryFunc: Query[E, E#TableElementType, Seq]) = {
    for {
      c <- getTotalCount(queryFunc)
      r <- runStream(switcherFactory, parallelism)(queryFunc)
    } yield c
  }

  /**
    * Maps the given job's type onto the appropriate validation component and source query, then runs the validation.
    */
  def runRequestedValidation(job:ValidationJob):Future[Int] = {
    job.jobType match {
      case ValidationJobType.CheckAllFiles=>
        performValidation(new ProjectValidationComponent(dbConfigProvider, job))(TableQuery[ProjectEntryRow])
      case ValidationJobType.CheckSomeFiles=>
        Future.failed(new RuntimeException("CheckSomeFiles has not been implemented yet"))
      case ValidationJobType.MislinkedPTR=>
        performValidation(new FindMislinkedProjectsComponent(dbConfigProvider, job))(TableQuery[ProjectEntryRow])
      case ValidationJobType.UnlinkedProjects=>
        performValidation(new FindUnlinkedProjects(dbConfigProvider, job))(TableQuery[ProjectEntryRow])
      case ValidationJobType.UnlinkedFiles=>
        performValidation(new FileValidationComponent(dbConfigProvider, job))(TableQuery[FileEntryRow].filter(_.hasContent===true))
      case ValidationJobType.UnlinkedFilesWithBlanks=>
        performValidation(new FileValidationComponent(dbConfigProvider, job))(TableQuery[FileEntryRow])
    }
  }

  override def receive: Receive = {
    case RequestValidation(job:ValidationJob)=>
      val originalSender = sender()
      logger.info(s"${job.uuid}: Received validation request for ${job.jobType} from ${job.userName}")

      //mark the job as running, then run the requested validation
      val result = for {
        inProgressJob <- validationJobDAO.writeJob(job.copy(status = ValidationJobStatus.Running, startedAt = Some(Timestamp.from(Instant.now()))))
        validationResult <- runRequestedValidation(inProgressJob)
      } yield (inProgressJob, validationResult)

      result.map(result=>{
        logger.info(s"${job.uuid}: Validation completed successfully, processed ${result._2} records")
        validationJobDAO.setJobCompleted(job)
      }).onComplete({
        case Success(_)=>
          originalSender ! ValidationSuccess(job.uuid)
        case Failure(err)=>
          //record the failure against the job, but don't propagate an error if that write itself fails
          val failedJob = job.copy(status=ValidationJobStatus.Failure, completedAt=Some(Timestamp.from(Instant.now())))
          validationJobDAO.writeJob(failedJob).onComplete({
            case Success(_)=>()
            case Failure(err)=>
              logger.error(s"${job.uuid}: Could not write validation job failure to the database: ${err.getMessage}", err)
          })
          logger.error(s"${job.uuid}: Validation failed: ${err.getMessage}", err)
      })
  }
}
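
The Balance/Merge wiring inside `buildStream` can be tried out in isolation. The sketch below reproduces the same fan-out/fan-in shape with plain integers and a printing sink, so it runs without the project's DAOs or stream components; every name in it (`BalanceMergeSketch`, the doubling stage, the batch size) is illustrative only and not part of this file.

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape, Materializer}
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}

object BalanceMergeSketch extends App {
  implicit val system: ActorSystem = ActorSystem("balance-merge-sketch")
  implicit val mat: Materializer = ActorMaterializer()
  import system.dispatcher

  val parallelism = 4

  //same shape as buildStream: source -> Balance -> N identical stages -> Merge -> grouped batches -> sink
  val graph = GraphDSL.create(Sink.foreach[Seq[Int]](batch => println(s"batch: $batch"))) { implicit builder => sink =>
    import GraphDSL.Implicits._

    val src     = builder.add(Source(1 to 100))
    val distrib = builder.add(Balance[Int](parallelism))
    val merge   = builder.add(Merge[Int](parallelism))
    val batcher = builder.add(Flow[Int].grouped(20))

    src.out ~> distrib
    for (i <- 0 until parallelism)
      distrib.out(i) ~> Flow[Int].map(_ * 2) ~> merge  //stand-in for the per-record validation stage

    merge ~> batcher ~> sink
    ClosedShape
  }

  //the materialised value is the sink's Future[Done]; shut the system down once it completes
  RunnableGraph.fromGraph(graph).run().onComplete(_ => system.terminate())
}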
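
For callers, the companion object defines the message protocol: send `RequestValidation(job)` and, on success, the actor replies to the original sender with `ValidationSuccess(job.uuid)` (`ValidationError` is declared as a reply type, but failures are currently only logged). Below is a hedged usage sketch using the ask pattern; the `ValidationTrigger` class, the actor reference it is given and the two-hour timeout are assumptions for illustration, not part of this codebase.

import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import java.util.UUID
import services.ValidateProject.{RequestValidation, ValidationSuccess}

class ValidationTrigger(validateProjectActor: ActorRef)(implicit ec: ExecutionContext) {
  //validation walks whole tables, so allow a generous ask timeout (assumed value)
  implicit val timeout: Timeout = 2.hours

  def runJob(job: models.ValidationJob): Future[UUID] =
    (validateProjectActor ? RequestValidation(job)).map {
      case ValidationSuccess(jobId) => jobId  //the actor replies once the job has been marked complete
      case other => throw new RuntimeException(s"Unexpected reply from ValidateProject: $other")
    }
}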