in scripts/src/main/scala/com/gu/mediaservice/scripts/BackfillEditLastModified.scala [53:135]
/**
 * Backfills the user-metadata last-modified timestamp onto every record of an edits table.
 *
 * Scans `dynamoTable` page by page (projecting only `id`, up to 500 items per page). For each id it
 * asks `getUserMetadataLastModified` for the value. On `Right` it writes the value back via
 * `updateEditRecord` (which receives `dryRun`) and counts it as updated. On `Left` it counts the
 * id as skipped. Every id increments `total`.
 *
 * Resumability: after each page that carries a DynamoDB last-evaluated key, that key's `id` and the
 * running `total` are written to `<fileName>.last`. On start-up, if that file holds both lines, the
 * scan resumes after the recorded id and `total` is restored (`updated`/`skipped` are not restored).
 *
 * Progress is logged to `<fileName>.<total>.log`, where `total` is the value after any restore.
 *
 * @param esClient    Elasticsearch client used to look up the last-modified value
 * @param dynamoTable name of the edits table to scan and update
 * @param fileName    prefix for the checkpoint (`.last`) and log (`.<total>.log`) files
 * @param dryRun      passed through to `updateEditRecord`; also reported in the log header
 */
def backfill(esClient: EsClient, dynamoTable: String, fileName: String, dryRun: Boolean): Unit = {
  val checkpointFile = new File(s"$fileName.last")

  // Reads the (id, total) pair written by `saveCheckpoint`. Returns None if the file is absent
  // or has fewer than two lines. A non-numeric total line throws, as before.
  def loadCheckpoint(): Option[(String, Int)] =
    if (!checkpointFile.exists()) None
    else {
      val source = Source.fromFile(checkpointFile)
      try {
        source.getLines().toList match {
          case id :: totalStr :: _ => Some(id -> totalStr.toInt)
          case _ => None
        }
      } finally {
        source.close()
      }
    }

  // Overwrites the checkpoint file with the resume id and the current running total.
  def saveCheckpoint(id: String): Unit = {
    val writer = new PrintWriter(new FileOutputStream(checkpointFile))
    try {
      writer.println(id)
      writer.println(total)
    } finally {
      writer.close()
    }
  }

  val checkpoint = loadCheckpoint()
  checkpoint.foreach { case (_, savedTotal) => total = savedTotal }
  val startKey = checkpoint.map { case (id, _) =>
    Map("id" -> AttributeValue.builder.s(id).build).asJava
  }

  // autoFlush = true: each println reaches the file immediately. The run is long and may be
  // interrupted, and the writer was previously never flushed or closed, losing the log tail.
  val logWriter = new PrintWriter(new FileOutputStream(s"$fileName.$total.log"), true)
  def log(msg: String): Unit =
    logWriter.println(s"[${DateTime.now}/($total/U:$updated/S:$skipped)]: $msg")

  try {
    log(s"Starting backfill of $dynamoTable from ${esClient.url}; ${if (dryRun) "DRYRUN ONLY" else "RUNNING"}")
    log(s"Last evaluated key: $startKey")

    // Scan the edits table (by far the shorter list of ids), resuming from the checkpoint if any.
    val pages = dynamo.scanPaginator(
      ScanRequest.builder
        .tableName(dynamoTable)
        .projectionExpression("id")
        .limit(500)
        .exclusiveStartKey(startKey.orNull)
        .build
    )

    pages.iterator().asScala.foreach { page =>
      // The id in this page's last evaluated key is where a restarted scan should resume.
      val pageLastId =
        if (page.hasLastEvaluatedKey) page.lastEvaluatedKey.asScala.get("id").map(_.s())
        else None
      val imageIds = page.items.asScala.flatMap(_.asScala.get("id").map(_.s))

      imageIds.foreach { id =>
        // Look up the value on the ES cluster, then write just that value to the edits table.
        getUserMetadataLastModified(esClient, id) match {
          case Left(reason) =>
            skipped += 1
            log(s"$id: Unable to get userMetadataLastModified - $reason")
          case Right(lastModified) =>
            updated += 1
            val result = updateEditRecord(dynamoTable, id, lastModified, dryRun)
            log(s"$id: $result")
        }
        total += 1
        // Short pause between records, presumably to limit load on ES/DynamoDB.
        Thread.sleep(10)
      }

      // Checkpoint only after the whole page is processed, so a resume never skips records.
      pageLastId.foreach { id =>
        log(s"Last evaluated key $id (batch of ${imageIds.size})")
        saveCheckpoint(id)
      }
    }

    log(s"Finished. Updated: $updated; Skipped: $skipped")
  } catch {
    case scala.util.control.NonFatal(e) =>
      // Record why the run stopped in the log itself before propagating.
      log(s"Aborted: $e")
      throw e
  } finally {
    logWriter.close()
  }
}