app/streamcomponents/GeneralValidationComponent.scala (66 lines of code) (raw):
package streamcomponents
import akka.stream.{Attributes, FlowShape, Inlet, Materializer, Outlet}
import akka.stream.stage.{AbstractInHandler, AbstractOutHandler, GraphStage, GraphStageLogic}
import models.{ProjectEntry, ProjectType, ValidationJob, ValidationProblem}
import play.api.db.slick.DatabaseConfigProvider
import slick.jdbc.PostgresProfile
import slick.lifted.AbstractTable
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
/**
* Represents an Akka stage that can be used as a validation component.
*
* Any implentor must take in a data record from the given table, and if a problem is detected it should output
* a ValidationProblem record.
*
* The data type of the incoming element is the record type, e.g. for GeneralValidationComponent[ProjectEntryRow] then
* the incoming element is of type `ProjectEntry`, NOT `ProjectEntryRow`.
*
* If no problem is detected then it should simply re-pull the input and output nothing.
* @tparam E the row type of the table that is being queried, e.g. ProjectEntryRow
*/
abstract class GeneralValidationComponent[E<:AbstractTable[_]](dbConfigProvider:DatabaseConfigProvider)(implicit ec:ExecutionContext) extends GraphStage[FlowShape[E#TableElementType,ValidationProblem]] {
protected val in:Inlet[E#TableElementType]
protected val out:Outlet[ValidationProblem]
/**
* an implementor must supply this function.
* It takes a stream record as a parameter and returns None if no problem was detected or a ValidationProblem
* if one was detected
* @param elem the incoming stream element
* @return a Future containing either a ValidationProblem or None. This future should fail on error.
*/
def handleRecord(elem:E#TableElementType):Future[Option[ValidationProblem]]
/**
* callback that allows an implementor to emit a custom error message if an exception is detected
* @param elem the stream element on which the error occurred
* @param err the error that happened
*/
def logError(elem:E#TableElementType, err:Throwable):Unit = {}
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
private implicit val db = dbConfigProvider.get[PostgresProfile].db
protected var asyncInProgress = false
protected var shouldComplete = false
val noProblemCb = createAsyncCallback[Unit](_=>{
if(shouldComplete) {
completeStage();
} else {
pull(in)
}
})
val problemDetectedCb = createAsyncCallback[ValidationProblem](entry=>{
push(out, entry)
if(shouldComplete) completeStage()
})
val errorCb = createAsyncCallback[Throwable](err=>failStage(err))
setHandler(out, new AbstractOutHandler {
override def onPull(): Unit = pull(in)
})
setHandler(in, new AbstractInHandler {
override def onPush(): Unit = {
val elem = grab(in)
asyncInProgress = true
handleRecord(elem).onComplete({
case Success(Some(problem))=>
asyncInProgress = false
problemDetectedCb.invoke(problem)
case Success(None)=>
asyncInProgress = false
noProblemCb.invoke()
case Failure(exception)=>
asyncInProgress = false
errorCb.invoke(exception)
})
}
override def onUpstreamFinish(): Unit = {
if(asyncInProgress) {
shouldComplete = true
} else {
completeStage()
}
}
override def onUpstreamFailure(ex: Throwable): Unit = {
if(asyncInProgress) {
shouldComplete = true
} else {
completeStage()
}
}
})
}
}