private def ingest()

in cli/src/main/scala/com/gu/pfi/cli/ingestion/CliIngestionPipeline.scala [29:141]


  /** Phase I of CLI ingestion: uploads each regular file in `files` (its data, then its metadata) via `s3Client`.
    *
    * Files are pulled from the iterator in groups of `batchSize`; each group is awaited (blocking) before the
    * fold moves on to the next group. Per-file failures are logged and counted rather than aborting the run.
    *
    * @param files        candidate files; entries that are not regular files are skipped
    * @param rootUri      the root being ingested; used only in log messages
    * @param inMemorySize files whose size is at most this value are read fully into memory before upload;
    *                     larger files are uploaded from their path via `putFileData`
    * @param languages    passed through to the metadata written for each file
    * @return a future that completes after all batches have been processed. It completes successfully even
    *         when the overall attempt failed; such a failure is logged instead.
    */
  private def ingest(files: Iterator[OnDiskFileContext], rootUri: Uri, inMemorySize: Long, languages: List[Language]): Future[Unit] = {
    implicit val ec: ExecutionContext = nonBlockingContext // this is the default context for when we are not doing IO
    logger.info(s"Copying... $rootUri")

    // This lock is used to prevent interleaved reads: it is held while a small file is read into memory and
    // for the whole upload of a big file (which reads from disk).
    val ioLock = new Object()

    trait Input {
      def fileContext: OnDiskFileContext
      def key: Key
    }
    // Small files are buffered in memory, so their upload does not touch the disk.
    case class SmallInput(fileContext: OnDiskFileContext, key: Key, bytes: Array[Byte]) extends Input
    // Big files are uploaded from their on-disk path.
    case class BigInput(fileContext: OnDiskFileContext, key: Key) extends Input

    object Input {
      def apply(file: OnDiskFileContext, key: Key): Input = {
        logger.info(s"Phase I ingesting ${file.path} (${file.size}) as $key")
        // The in-memory buffer is an Array indexed by Int, so a file can only be buffered if its size fits in an
        // Int, even when inMemorySize is configured larger than that.
        if (file.size <= inMemorySize && file.size <= Int.MaxValue) {
          val dataStream = Files.newInputStream(file.path)
          val bytes = try {
            ioLock.synchronized {
              val buf = new Array[Byte](file.size.toInt)
              ByteStreams.readFully(dataStream, buf)
              buf
            }
          } finally {
            dataStream.close()
          }
          SmallInput(file, key, bytes)
        } else {
          BigInput(file, key)
        }
      }
    }

    // Uploads the file data followed by its metadata. All uploads run on ingestionContext, because this method
    // is invoked from a callback on the implicit non-blocking context and uploads are blocking IO.
    def putInput(input: Input, ingestion: String, languages: List[Language]): Attempt[Unit] = {
      val upload = () => {
        input match {
          case SmallInput(context, key, data) =>
            s3Client.putData(key, data, context.size)
          case BigInput(context, key) =>
            s3Client.putFileData(key, context.path, context.size)
        }

        s3Client.putMetadata(input.key, input.fileContext, ingestion, languages)
        ()
      }

      input match {
        case _: SmallInput => Attempt.async.catchNonFatalBlasé(upload())(ingestionContext)
        // Big uploads read from disk, so they also hold ioLock for their whole duration.
        case _: BigInput => Attempt.async.catchNonFatalBlasé(ioLock.synchronized(upload()))(ingestionContext)
      }
    }

    def processFile(file: OnDiskFileContext, languages: List[Language]): Attempt[(OnDiskFileContext, Key)] = {
      // TODO-SAH: add to checkpoint list

      // generate key: current time paired with a random UUID
      val key = System.currentTimeMillis -> UUID.randomUUID

      // create 'input' (may read the file into memory, hence ingestionContext)
      val attemptInput = Attempt.async.catchNonFatalBlasé {
        Input(file, key)
      }(ingestionContext)

      // upload file
      for {
        input <- attemptInput
        _ <- putInput(input, file.ingestion, languages)
      } yield {
        // TODO-SAH: remove from checkpoint list

        (file, key)
      }
    }

    val finalAttempt = files.filter(_.isRegularFile).map { file =>
      file -> processFile(file, languages)
    }.grouped(batchSize).foldLeft(Attempt.Right(0 -> 0)) { (accAttempt, batch) =>
      val (batchFiles, attemptedResults) = batch.unzip
      val batchCountsAttempt = Attempt.sequenceWithFailures(attemptedResults.toList)
        .map { r => batchFiles.zip(r) }
        .map { results =>
          val successCount = results.count { case (_, result) => result.isRight }

          val failures: Seq[(OnDiskFileContext, attempt.Failure)] = results.collect { case (file, Left(failure)) => file -> failure }
          failures.foreach { case (file, failure) =>
            logger.error(s"Error during Phase I ingestion for ${file.path}: $failure", failure.cause.orNull)
            failure.cause.foreach(_.printStackTrace())
          }

          logger.info(s"Phase I of $rootUri batch completed. Successful: $successCount Failures: ${failures.size}.")
          successCount -> failures.size
        }

      // Block until this batch has finished before the fold pulls (and thereby starts) the next batch.
      val syncBatchCountsAttempt = Attempt.fromEither(Await.result(batchCountsAttempt.asFuture, Duration.Inf))

      for {
        acc <- accAttempt
        batchCounts <- syncBatchCountsAttempt
      } yield (acc._1 + batchCounts._1) -> (acc._2 + batchCounts._2)
    }

    finalAttempt.fold(
      failure => logger.error(s"Phase I of $rootUri failed: $failure", failure.cause.orNull),
      { case (successes, failures) =>
        logger.info(s"Phase I of $rootUri done! Successful: $successes Failures: $failures.")
      }
    )(nonBlockingContext)
  }