in app/streamcomponents/ProjectValidationComponent.scala [22:67]
override def shape = FlowShape.of(in, out)
override def handleRecord(elem: ProjectEntry): Future[Option[ValidationProblem]] = Future.failed(new RuntimeException("not needed here"))
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
private val logger = LoggerFactory.getLogger(getClass)
private implicit val dbConfig = dbConfigProvider.get[PostgresProfile]
private implicit val db = dbConfig.db
setHandler(in, new AbstractInHandler {
val yesCb = createAsyncCallback[ProjectEntry](_=>pull(in))
val noCb = createAsyncCallback[ValidationProblem](entry=>push(out, entry))
val errorCb = createAsyncCallback[Throwable](err=>failStage(err))
override def onPush(): Unit = {
val elem = grab(in)
elem.associatedFiles(true).map(entries=>{
Future.sequence(entries.map(_.validatePathExists)).map(lookups=>{
val failures = lookups.collect({case Left(err)=>err})
if(failures.nonEmpty){
logger.error(s"Received ${failures.length} errors looking up associated files for ${elem.id} (${elem.projectTitle}): ")
MDC.put("errors",failures.toString())
failures.foreach(err=>logger.error(s"\t$err"))
errorCb.invoke(new RuntimeException(s"Received ${failures.length} errors looking up associated files for ${elem.id} (${elem.projectTitle}), please consult log"))
} else {
val notexist = lookups.collect({case Right(result)=>result}).filter(_==false)
if(notexist.nonEmpty){
val errorMsg = s"Project ${elem.id} (${elem.projectTitle}) is missing ${notexist.length} files out of ${lookups.length}!"
ValidationProblem.fromProjectEntry(elem, currentJob, Some(errorMsg)) match {
case Some(problem)=>noCb.invoke(problem)
case None=>errorCb.invoke(new RuntimeException(s"Project entry $elem is invalid, I can't generate a problem report from it"))
}
} else {
yesCb.invoke(elem)
}
}
})
})
}
})
setHandler(out, new AbstractOutHandler {
override def onPull(): Unit = if(!hasBeenPulled(in)) pull(in)
})
}