in app/services/ProxiesRelinker.scala [90:158]
/**
  * Runs a proxy relink scan over the index and reports the outcome to the requesting actor.
  *
  * The stream assembled here is: index scan source -> SearchHitToArchiveEntryFlow ->
  * proxyVerifyFlow -> EOSDetect -> index update sink. Two promises are observed:
  *  - `eosPromise`, handed to the EOSDetect stage. It carries no counts, so a success detected
  *    this way is reported with the placeholder `IndexUpdateCounter(-1,-1)`.
  *  - `completionPromise`, handed to the index update sink. A success is reported with the
  *    counter that the promise was completed with.
  *
  * When either promise completes, the job record for `jobId` is fetched, stamped with a
  * completion time and a success/error status (plus the error text on failure), written back,
  * and a `RelinkSuccess` / `RelinkError` message is sent to the actor that requested the scan.
  *
  * NOTE(review): both handlers reply and both update the job record, so if both promises
  * complete the requester receives two messages and the record is written twice. That is
  * preserved from the previous implementation - confirm it is intended.
  *
  * @param jobId      id of the job record to update once the scan has finished
  * @param bucketName passed through to `getIndexScanSource` (presumably limits the scan to that
  *                   bucket - verify against that method); `None` is logged as a global scan
  */
private def relinkScan(jobId:String, bucketName:Option[String]):Unit = {
  // Capture the requester up front: sender() must not be called from inside the future
  // callbacks below, which run outside of this actor's message handling.
  val originalSender = sender()
  // Label for completion/failure log lines, so bucket-specific scans are not reported as "Global".
  val scanLabel = bucketName.getOrElse("Global")

  // Marks the job record as finished; `failure` is None for success, Some(error) for failure.
  // NOTE(review): the result of the lookup/write is discarded, as before, so a failure of the
  // DAO call itself is not reported here.
  def recordJobOutcome(failure:Option[Throwable]):Unit = {
    jobModelDAO.jobForId(jobId).map({
      case None=>
        logger.error(s"Proxy relink job record must have been deleted while we were running! Job completed but can't record.")
      case Some(Left(dynamoErr))=>
        logger.error(s"Could not get job record: $dynamoErr")
      case Some(Right(jobModel))=>
        val updatedJob = failure match {
          case None=>
            jobModel.copy(completedAt = Some(ZonedDateTime.now()),jobStatus = JobStatus.ST_SUCCESS)
          case Some(scanErr)=>
            jobModel.copy(completedAt = Some(ZonedDateTime.now()),jobStatus = JobStatus.ST_ERROR, log = Some(scanErr.toString))
        }
        jobModelDAO.putJob(updatedJob)
    })
    ()
  }

  // Records a successful job and tells the requester, passing on whatever counts are available.
  def reportSuccess(counter:IndexUpdateCounter):Unit = {
    recordJobOutcome(None)
    originalSender ! RelinkSuccess(counter)
  }

  // Logs the scan failure, records it against the job and tells the requester.
  def reportFailure(err:Throwable):Unit = {
    logger.error(s"$scanLabel relink scan failed with error: ",err)
    recordJobOutcome(Some(err))
    originalSender ! RelinkError(err)
  }

  val completionPromise = Promise[IndexUpdateCounter]()
  val eosPromise = Promise[Unit]()
  logger.info(s"Starting ${bucketName.getOrElse("global")} relink scan")
  val eosDetect = new EOSDetect[Unit, ArchiveEntry](eosPromise, ())

  getIndexScanSource(bucketName)
    .via(new SearchHitToArchiveEntryFlow)
    .via(proxyVerifyFlow)
    .via(eosDetect)
    .to(getIndexUpdateSink(completionPromise))
    .run()

  eosPromise.future.onComplete({
    case Success(_)=>
      logger.info(s"$scanLabel relink scan completed - detected via EOS")
      // The EOS promise carries no counts, so -1/-1 stands in for "unknown".
      reportSuccess(IndexUpdateCounter(-1,-1))
    case Failure(err)=>
      reportFailure(err)
  })

  completionPromise.future.onComplete({
    case Success(counter)=>
      logger.info(s"$scanLabel relink scan completed with ${counter.ackCount} successful and ${counter.errorCount} failed operations")
      reportSuccess(counter)
    case Failure(err)=>
      reportFailure(err)
  })
}