def enactS3Changes()

in scripts/src/main/scala/com/gu/mediaservice/scripts/EnactS3Changes.scala [87:188]


  /**
    * Applies proposed S3 object-metadata changes listed in `inputFile` and writes one audit entry
    * per processed record.
    *
    * Each input line is expected to be a JSON object with a "proposed" and an "original" field,
    * both readable as `ObjectMetadata`. For every record the object's current user metadata is
    * fetched with a HEAD request and compared:
    *
    *  - equal to `original`: the object is copied onto itself with the proposed metadata
    *    (`MetadataDirective.REPLACE`); audited as OK, or ERROR if the copy fails
    *  - equal to `proposed`: audited as SKIPPED ("Already updated")
    *  - equal to neither: audited as ERROR, including the actual and expected metadata
    *  - object missing (`NoSuchKeyException`): audited as SKIPPED
    *  - any other failure of the HEAD request: audited as ERROR
    *
    * Lines that cannot be parsed into both values are audited as ERROR. Note that when
    * `prefixFilter` is set, lines whose "proposed" value cannot be parsed are dropped by the
    * filter (their key is unknown) and therefore never reach the audit.
    *
    * Records are processed lazily, one at a time; a progress summary is printed to stderr after
    * every batch of 1000 records (and after the final, possibly smaller, batch).
    *
    * @param bucketName   bucket containing the objects to update
    * @param inputFile    file of proposed changes, read via `withSourceFromBzipFile`
    * @param auditFile    base for the audit output; the actual name comes from `getAuditFileName`
    * @param maybeDrop    number of records to skip, counted after the prefix filter is applied
    * @param maybeTake    maximum number of records to process, counted after filter and drop
    * @param prefixFilter when set, only records whose proposed key starts with this prefix are processed
    */
  def enactS3Changes(
                        bucketName: String,
                        inputFile: File,
                        auditFile: File,
                        maybeDrop: Option[Int],
                        maybeTake: Option[Int],
                        prefixFilter: Option[String]) = {
    import AuditEntry._

    // do this to ensure creds work before starting
    s3.listObjectsV2(ListObjectsV2Request.builder.bucket(bucketName).build)

    // reporting
    val batchSize = 1000
    val startTime = DateTime.now
    var startBatchTime = DateTime.now
    var total = 0L

    val description =
      s"Enacting changes in $bucketName from $inputFile (dropping: $maybeDrop, taking: $maybeTake, prefixFilter: $prefixFilter)"

    // Checks the object's current metadata and, if it still matches `original`, replaces it with `proposed`.
    def applyChange(proposed: ObjectMetadata, original: ObjectMetadata): AuditEntry = {
      val key = proposed.key
      Try {Some(s3.headObject(HeadObjectRequest.builder.bucket(bucketName).key(key).build))}.recover{
        // a missing object is an expected outcome rather than an error
        case _:NoSuchKeyException => None
      } match {
        case Success(Some(headObjectResponse)) =>
          val check = headObjectResponse.metadata.asScala
          if (check == original.metadata) {
            // Copy the object onto itself: REPLACE swaps the metadata for the proposed set.
            // sourceBucket/sourceKey are used instead of the deprecated copySource so that the
            // SDK URL-encodes the key; a hand-built "bucket/key" string breaks on special characters.
            val request = CopyObjectRequest.builder
              .sourceBucket(bucketName).sourceKey(key)
              .destinationBucket(bucketName).destinationKey(key)
              .metadata(proposed.metadata.asJava)
              .metadataDirective(MetadataDirective.REPLACE)
              .build
            Try {s3.copyObject(request)} match {
              case Success(_) => AuditEntry(OK, key)
              case Failure(e) => AuditEntry(ERROR, key, s"Error whilst copying object ($e)")
            }
          } else if (check == proposed.metadata) {
            // a previous run already applied this change
            AuditEntry(SKIPPED, key, s"Already updated")
          } else {
            // the object changed since the proposal was generated; don't overwrite it
            AuditEntry(ERROR, key, s"Metadata doesn't match that expected", Json.obj(
              "actual" -> check,
              "expected" -> original.metadata
            ))
          }
        case Success(None) =>
          AuditEntry(SKIPPED, key, "Object no longer exists")
        case Failure(e) =>
          AuditEntry(ERROR, key, s"Error whilst getting object metadata ($e)")
      }
    }

    // auditing
    val auditWriter = getBzipWriter(getAuditFileName(auditFile))
    try {
      // written inside the try so the writer is closed even if this first write fails
      auditWriter.append(s"$description\n")
      System.err.println(description)

      withSourceFromBzipFile(inputFile) {source =>
        // parse each line into (proposed, original); either side is None if it can't be read
        val lines = source
          .getLines()
          .map(line => {
            val maybeJsValue = Try(readFromString[JsValue](line)).toOption
            (
              maybeJsValue.map(_ \ "proposed").flatMap(_.asOpt[ObjectMetadata]),
              maybeJsValue.map(_ \ "original").flatMap(_.asOpt[ObjectMetadata])
            )
          })
        val filteredLines = prefixFilter
          .map(pf => lines.filter{
            case (Some(proposed), _) => proposed.key.startsWith(pf)
            // without a parsed key we can't tell whether the record matches, so it is dropped
            case _ => false
          })
          .getOrElse(lines)
        val droppedLines = maybeDrop.map(filteredLines.drop).getOrElse(filteredLines)
        val linesToProcess = maybeTake.map(droppedLines.take).getOrElse(droppedLines)

        val auditEntries: Iterator[AuditEntry] = linesToProcess
          .map {
            case (Some(proposed), Some(original)) => applyChange(proposed, original)
            case other => AuditEntry(ERROR, "n/a", s"Unable to parse record", Json.obj("record" -> other.toString))
          }
        auditEntries
          .map{ entry =>
            // audit each entry as it is produced, before batching for the progress report
            audit(auditWriter, entry)
            entry
          }
          .grouped(batchSize)
          .foreach{ auditEntryBatch =>
            // grouped never yields an empty batch, so the per-line means below cannot divide by zero
            val batch = auditEntryBatch.toList
            total = total + batch.size
            val now = DateTime.now
            val elapsed = new Duration(startTime, now)
            val elapsedBatch = new Duration(startBatchTime, now).getMillis
            System.err.println(s"${now.toString} Processed $total lines")
            System.err.println(s"Batch: Processed ${batch.size} in ${elapsedBatch}ms (mean: ${elapsedBatch/batch.size}ms per line)")
            System.err.println(s"       OK: ${batch.count(_.status==OK)} SKIPPED: ${batch.count(_.status==SKIPPED)} ERROR: ${batch.count(_.status==ERROR)}")
            System.err.println(s"Total: Processed $total in ${elapsed.getStandardSeconds}s (mean: ${elapsed.getMillis/total}ms per line)")
            startBatchTime = now
          }
      }
    } finally {
      auditWriter.close()
    }
  }