private def relinkScan()

in app/services/ProxiesRelinker.scala [90:158]


  private def relinkScan(jobId:String, bucketName:Option[String]) = {
    val completionPromise = Promise[IndexUpdateCounter]()
    val eosPromise = Promise[Unit]()

    logger.info(s"Starting ${bucketName.getOrElse("global")} relink scan")
    val eosDetect = new EOSDetect[Unit, ArchiveEntry](eosPromise, ())
    getIndexScanSource(bucketName)
      .via(new SearchHitToArchiveEntryFlow)
      .via(proxyVerifyFlow)
      .via(eosDetect)
      .to(getIndexUpdateSink(completionPromise))
      .run()

    val originalSender = sender()

    eosPromise.future.onComplete({
      case Success(counter)=>
        logger.info(s"Global relink scan completed - detected via EOS")
        jobModelDAO.jobForId(jobId).map({
          case None=>
            logger.error(s"Proxy relink job record must have been deleted while we were running! Job completed but can't record.")
          case Some(Right(jobModel))=>
            val updatedJob = jobModel.copy(completedAt = Some(ZonedDateTime.now()),jobStatus = JobStatus.ST_SUCCESS)
            jobModelDAO.putJob(updatedJob)
          case Some(Left(dynamoErr))=>
            logger.error(s"Could not get job record: $dynamoErr")
        })
        originalSender ! RelinkSuccess(IndexUpdateCounter(-1,-1))
      case Failure(err)=>
        logger.error("Global relink scan failed with error: ",err)
        jobModelDAO.jobForId(jobId).map({
          case None=>
            logger.error(s"Proxy relink job record must have been deleted while we were running! Job completed but can't record.")
          case Some(Left(err))=>
            logger.error(s"Could not get job record: ${err}")
          case Some(Right(jobModel))=>
            val updatedJob = jobModel.copy(completedAt = Some(ZonedDateTime.now()),jobStatus = JobStatus.ST_ERROR, log = Some(err.toString))
            jobModelDAO.putJob(updatedJob)
        })
        originalSender ! RelinkError(err)
    })

    completionPromise.future.onComplete({
      case Success(counter)=>
        logger.info(s"Global relink scan completed with ${counter.ackCount} successful and ${counter.errorCount} failed operations")
        jobModelDAO.jobForId(jobId).map({
          case None=>
            logger.error(s"Proxy relink job record must have been deleted while we were running! Job completed but can't record.")
          case Some(Right(jobModel))=>
            val updatedJob = jobModel.copy(completedAt = Some(ZonedDateTime.now()),jobStatus = JobStatus.ST_SUCCESS)
            jobModelDAO.putJob(updatedJob)
          case Some(Left(err))=>
            logger.error(s"Could not get job record: $err")
        })
        originalSender ! RelinkSuccess(counter)
      case Failure(err)=>
        logger.error("Global relink scan failed with error: ",err)
        jobModelDAO.jobForId(jobId).map({
          case None=>
            logger.error(s"Proxy relink job record must have been deleted while we were running! Job completed but can't record.")
          case Some(Right(jobModel))=>
            val updatedJob = jobModel.copy(completedAt = Some(ZonedDateTime.now()),jobStatus = JobStatus.ST_ERROR, log = Some(err.toString))
            jobModelDAO.putJob(updatedJob)
          case Some(Left(err))=>
            logger.error(s"Could not get job record: $err")
        })
        originalSender ! RelinkError(err)
    })
  }