in scripts/src/main/scala/com/gu/mediaservice/scripts/ProposeS3Changes.scala [39:117]
/**
 * Proposes corrections to the S3 metadata read from `bucketMetadata` by merging each record with
 * the matching document from an Elasticsearch dump and with the Picdar mappings.
 *
 * Every line of `esMetadata` that deserialises as an [[EsDocumentWithMetadata]] is looked up by id
 * in the S3 metadata. When a match exists, `mergeMetadata` combines the ES document, the S3 record
 * and the Picdar data. If the result differs from the S3 record, a JSON object with `original`,
 * `proposed` and `diff` fields is written to the update output. Otherwise the record is written to
 * the "correct" output. A warning is printed to stderr whenever merging the merged result again
 * does not reproduce it.
 *
 * @param bucketMetadata           S3 metadata dump, parsed by `readS3Metadata`
 * @param esMetadata               ES dump with one JSON document per line, read via `withSourceFromBzipFile`
 * @param picdarCsv                Picdar CSV, parsed by `readPicdarCsv`
 * @param outputFileForJsonUpdate  receives one JSON line per record whose metadata would change (written via `getBzipWriter`)
 * @param outputFileForJsonCorrect receives one JSON line per record that already matches (written via `getBzipWriter`)
 * @param outputFileForESKeys      receives the ids seen in ES but absent from the S3 metadata, one per line
 * @param outputFileForS3Keys      receives the S3 keys never matched by an ES document, one per line
 * @param outputFileForBadS3Keys   receives the bad keys reported by `readS3Metadata`, one per line
 */
def proposeS3Changes(
  bucketMetadata: File,
  esMetadata: File,
  picdarCsv: File,
  outputFileForJsonUpdate: File,
  outputFileForJsonCorrect: File,
  outputFileForESKeys: File,
  outputFileForS3Keys: File,
  outputFileForBadS3Keys: File): Unit = {

  // Runs `body` and then always closes `resource`, even if `body` throws. The writers below are
  // nested so that a writer failing to open, or a close() that throws, cannot leak the others.
  def withCloseable[R <: AutoCloseable, A](resource: R)(body: R => A): A =
    try body(resource) finally resource.close()

  val picdarData = readPicdarCsv(picdarCsv)
  System.err.println(s"Completed reading ${picdarData.gridToPicdar.size} Picdar mappings")

  val s3Metadata = {
    val (md, badKeys) = readS3Metadata(bucketMetadata)
    System.err.println(s"Completed reading S3 metadata. ${md.size} records, ${badKeys.size} bad keys")
    // write out the bad keys in this scope so they can then be GCd
    withCloseable(new FileWriter(outputFileForBadS3Keys)) { badKeysWriter =>
      badKeys.foreach(k => badKeysWriter.append(s"$k\n"))
    }
    md
  }

  // Keys are removed as ES documents match them; whatever remains was never seen in ES.
  var s3KeysNotYetSeenInEs = s3Metadata.keySet
  var esKeysNotInS3 = Set.empty[String]
  // Lines that parse as JSON but do not deserialise are skipped, but counted so the skip is visible.
  var undeserialisableEsLines = 0L

  System.err.println(s"Starting change proposals...")
  withCloseable(getBzipWriter(outputFileForJsonUpdate)) { jsonUpdateWriter =>
    withCloseable(getBzipWriter(outputFileForJsonCorrect)) { jsonCorrectWriter =>
      withCloseable(new FileWriter(outputFileForESKeys)) { esKeysWriter =>
        withCloseable(new FileWriter(outputFileForS3Keys)) { s3KeysWriter =>
          withSourceFromBzipFile(esMetadata) { source =>
            // Index raw lines (not just successfully parsed records) so progress reports real line numbers.
            source.getLines().zipWithIndex.foreach { case (line, i) =>
              if (i % 10000 == 0) System.err.println(s"Processing ES metadata line $i")
              Json.fromJson[EsDocumentWithMetadata](Json.parse(line)).asOpt match {
                case None =>
                  undeserialisableEsLines += 1
                case Some(metadata) =>
                  val id = metadata.id
                  s3Metadata.get(id) match {
                    case Some(s3Record) =>
                      s3KeysNotYetSeenInEs -= id
                      val mergedMetadata = mergeMetadata(metadata, s3Record, picdarData)
                      // Sanity check: merging the already-merged record again should change nothing.
                      if (mergedMetadata != mergeMetadata(metadata, mergedMetadata, picdarData)) {
                        System.err.println(s"Merged metadata for $id not idempotent")
                      }
                      if (mergedMetadata != s3Record) {
                        val jsS3 = Json.toJson(s3Record)
                        val jsMerged = Json.toJson(mergedMetadata)
                        val diff = JsonDiff.diff(jsS3, jsMerged)
                        val proposal = Json.obj(
                          "original" -> jsS3,
                          "proposed" -> jsMerged,
                          "diff" -> diff
                        )
                        jsonUpdateWriter.append(s"${proposal.toString()}\n")
                      } else {
                        jsonCorrectWriter.append(s"${Json.toJson(mergedMetadata).toString()}\n")
                      }
                    case None =>
                      esKeysNotInS3 += id
                  }
              }
            }
          }
          if (undeserialisableEsLines > 0) {
            System.err.println(s"Skipped $undeserialisableEsLines ES metadata lines that could not be read as EsDocumentWithMetadata")
          }
          esKeysNotInS3.foreach(k => esKeysWriter.append(s"$k\n"))
          s3KeysNotYetSeenInEs.foreach(k => s3KeysWriter.append(s"$k\n"))
        }
      }
    }
  }
}