in scripts/src/main/scala/com/gu/mediaservice/scripts/EnactS3Changes.scala [87:188]
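/**
 * Applies a previously generated set of S3 metadata changes to `bucketName`,
 * reading proposed/original metadata pairs from the bzipped `inputFile` and
 * writing an audit line for every OK / SKIPPED / ERROR outcome to `auditFile`.
 */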
def enactS3Changes(
  bucketName: String,
  inputFile: File,
  auditFile: File,
  maybeDrop: Option[Int],
  maybeTake: Option[Int],
  prefixFilter: Option[String]
): Unit = {
  import AuditEntry._
  // fail fast: a single list call surfaces credential or permission problems before any work starts
  s3.listObjectsV2(ListObjectsV2Request.builder.bucket(bucketName).build)
  // progress reporting
  val batchSize = 1000
  val startTime = DateTime.now
  var startBatchTime = DateTime.now
  var total = 0L
  // auditing
  val auditWriter = getBzipWriter(getAuditFileName(auditFile))
  val header = s"Enacting changes in $bucketName from $inputFile (dropping: $maybeDrop, taking: $maybeTake, prefixFilter: $prefixFilter)"
  auditWriter.append(s"$header\n")
  System.err.println(header)
  try {
    withSourceFromBzipFile(inputFile) { source =>
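      // each line of the change file is a JSON record carrying the object's "original"
      // (pre-change) and "proposed" (post-change) metadata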
      val lines = source
        .getLines()
        .map { line =>
          val maybeJsValue = Try(readFromString[JsValue](line)).toOption
          (
            maybeJsValue.map(_ \ "proposed").flatMap(_.asOpt[ObjectMetadata]),
            maybeJsValue.map(_ \ "original").flatMap(_.asOpt[ObjectMetadata])
          )
        }
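      // keep only records whose proposed key matches the prefix filter, if one was given;
      // note that unparseable records are also dropped here, so they are only reported
      // as errors on unfiltered runs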
      val filteredLines = prefixFilter
        .map(pf => lines.filter {
          case (Some(proposed), _) => proposed.key.startsWith(pf)
          case _ => false
        })
        .getOrElse(lines)
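      // optional drop/take window over the remaining records, so a run can be
      // limited or resumed part-way through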
      val droppedLines = maybeDrop.map(filteredLines.drop).getOrElse(filteredLines)
      val linesToProcess = maybeTake.map(droppedLines.take).getOrElse(droppedLines)
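      // lazily turn each record into an audit entry; since this is an Iterator, no
      // S3 calls happen until the batching loop below consumes it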
      val auditEntries: Iterator[AuditEntry] = linesToProcess
        .map {
          case (Some(proposed), Some(original)) =>
            val key = proposed.key
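            // HEAD the object first so we can check that its current metadata still
            // matches what the change file recorded as the original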
            Try(Some(s3.headObject(HeadObjectRequest.builder.bucket(bucketName).key(key).build))).recover {
              case _: NoSuchKeyException => None
            } match {
              case Success(Some(headObjectResponse)) =>
                val check = headObjectResponse.metadata.asScala
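                // current metadata differs from the recorded original: either this change
                // was already applied (skip) or something else modified the object (error)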
                if (check != original.metadata) {
                  if (check == proposed.metadata) {
                    AuditEntry(SKIPPED, key, "Already updated")
                  } else {
                    AuditEntry(ERROR, key, "Metadata doesn't match the expected original", Json.obj(
                      "actual" -> check,
                      "expected" -> original.metadata
                    ))
                  }
                } else {
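                  // metadata matches the recorded original: copy the object onto itself,
                  // replacing its metadata with the proposed values; NB copySource expects
                  // a URL-encoded "<bucket>/<key>" path, so keys containing special
                  // characters may need encoding here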
                  val request = CopyObjectRequest.builder
                    .copySource(s"$bucketName/$key")
                    .destinationBucket(bucketName).destinationKey(key)
                    .metadata(proposed.metadata.asJava)
                    .metadataDirective(MetadataDirective.REPLACE)
                    .build
                  Try(s3.copyObject(request)) match {
                    case Success(_) => AuditEntry(OK, key)
                    case Failure(e) => AuditEntry(ERROR, key, s"Error whilst copying object ($e)")
                  }
                }
              case Success(None) =>
                AuditEntry(SKIPPED, key, "Object no longer exists")
              case Failure(e) =>
                AuditEntry(ERROR, key, s"Error whilst getting object metadata ($e)")
            }
          case other => AuditEntry(ERROR, "n/a", "Unable to parse record", Json.obj("record" -> other.toString))
        }
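      // write every entry to the audit file as it is produced, then report progress
      // to stderr once per batch of batchSize entries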
      auditEntries
        .map { entry =>
          audit(auditWriter, entry)
          entry
        }
        .grouped(batchSize)
        .foreach { auditEntryBatch =>
          // named `batch` to avoid shadowing the `auditEntries` iterator above
          val batch = auditEntryBatch.toList
          total = total + batch.size
          val now = DateTime.now
          val elapsed = new Duration(startTime, now)
          val elapsedBatch = new Duration(startBatchTime, now).getMillis
          System.err.println(s"${now.toString} Processed $total lines")
          System.err.println(s"Batch: Processed ${batch.size} in ${elapsedBatch}ms (mean: ${elapsedBatch / batch.size}ms per line)")
          System.err.println(s" OK: ${batch.count(_.status == OK)} SKIPPED: ${batch.count(_.status == SKIPPED)} ERROR: ${batch.count(_.status == ERROR)}")
          System.err.println(s"Total: Processed $total in ${elapsed.getStandardSeconds}s (mean: ${elapsed.getMillis / total}ms per line)")
          startBatchTime = now
        }
    }
  } finally {
    auditWriter.close()
  }
}
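
// For reference when reading the above, sketches of the shapes this function assumes,
// reconstructed from how they are used here (hypothetical, not the canonical
// definitions, which live elsewhere in this file/package along with the `s3` client
// and the getBzipWriter / withSourceFromBzipFile / audit helpers):
//
//   case class ObjectMetadata(key: String, metadata: Map[String, String])
//
//   object AuditEntry { sealed trait Status; case object OK extends Status; /* SKIPPED, ERROR... */ }
//   case class AuditEntry(status: AuditEntry.Status, key: String,
//                         message: String = "", detail: JsObject = Json.obj())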