def backfill()

in scripts/src/main/scala/com/gu/mediaservice/scripts/BackfillEditLastModified.scala [53:135]


  def backfill(esClient: EsClient, dynamoTable: String, fileName: String, dryRun: Boolean): Unit = {
    val lastFile = new File(s"$fileName.last")
    val lastEvaluatedKey = if (lastFile.exists()) {
      val source = Source.fromFile(lastFile)
      val lastId = try {
        source.getLines().toList match {
          case id :: totalStr :: _ =>
            total = totalStr.toInt
            Some(id)
          case _ => None
        }
      } finally {
        source.close()
      }
      lastId.map { id =>
        Map("id" -> AttributeValue.builder.s(id).build).asJava
      }
    } else None

    val logWriter = new PrintWriter(new FileOutputStream(s"$fileName.$total.log"))
    def log(msg: String) = logWriter.println(s"[${DateTime.now}/($total/U:$updated/S:$skipped]: $msg")

    log(s"Starting backfill of $dynamoTable from ${esClient.url}; ${if(dryRun){"DRYRUN ONLY"}else{"RUNNING"}}")
    log(s"Last evaluated key: $lastEvaluatedKey")
    // scan through edits table (this is the shorter list by far)
    val scanIterator = dynamo.scanPaginator(
      ScanRequest.builder
        .tableName(dynamoTable)
        .projectionExpression("id")
        .limit(500)
        .exclusiveStartKey(lastEvaluatedKey.orNull)
        .build
    )
    scanIterator
      .iterator().asScala
      .map { r =>

        val lastEvaluatedKey = Some(r.lastEvaluatedKey.asScala)
          .filter(_ => r.hasLastEvaluatedKey)
          .flatMap(_.get("id"))
          .map(_.s())


        lastEvaluatedKey -> r.items.asScala
      }
      .map { case (lek, records) =>
        lek -> records.flatMap(_.asScala.get("id").map(_.s))
      }
      .foreach { case (lek, imageIds) =>
        imageIds.foreach { id =>
          // for each edit entry:
          // do a simple get on the ES cluster
          val maybeLastModified = getUserMetadataLastModified(esClient, id)
          // update just the last modified value on the edits table
          maybeLastModified match {
            case Left(reason) =>
              skipped += 1
              log(s"$id: Unable to get userMetadataLastModified - $reason")
            case Right(lastModified) =>
              updated += 1
              val result = updateEditRecord(dynamoTable, id, lastModified, dryRun)
              log(s"$id: $result")
          }
          total += 1
          Thread.sleep(10)
        }

        lek match {
          case Some(id) =>
            log(s"Last evaluated key $id (batch of ${imageIds.size})")
            val logWriter = new PrintWriter(new FileOutputStream(lastFile))
            try {
              logWriter.println(id)
              logWriter.println(total)
            } finally {
              logWriter.close()
            }
          case None => // nowt
        }

      }
    log(s"Finished. Updated: $updated; Skipped: $skipped")
  }