private def processJob()

in app/story_packages/updates/Reindex.scala [78:108]


  private def processJob(job: RunningJob, step: ReindexPage) = {
    Logger.info(s"Processing reindex job step with ${step.list.size} packages out of ${step.totalCount}")
    val notifyEvery = math.ceil(step.totalCount / 100.0)

    step.list.foldLeft(Future.successful(0)) {
      (previousFuture, nextPackage) =>
        for {
          processedResults <- previousFuture
          _ <- sendToKinesisStream(nextPackage, job)
        } yield {
          if (processedResults % notifyEvery == 0) dynamoReindexJobs.markProgressUpdate(job, processedResults)
          processedResults + 1
        }
    }
    .map(lastProcessedResult => {
      Logger.info(s"Processing reindex job $job, step completed successfully")
      step.next match {
        case Some(nextPage) =>
          // TODO iterate on next page
          dynamoReindexJobs.markCompleteJob(job, lastProcessedResult)
          Logger.error("Next page not implemented")
        case None =>
          dynamoReindexJobs.markCompleteJob(job, lastProcessedResult)
      }
    })
    .recover {
      case NonFatal(e) =>
        Logger.error(s"Error when processing reindexing step of job $job", e)
        dynamoReindexJobs.markFailedJob(job)
    }
  }