app/services/FileMove/UpdateIndexRecords.scala (146 lines of code) (raw):

package services.FileMove

import com.sksamuel.elastic4s.http.ElasticClient
import com.theguardian.multimedia.archivehunter.common.{ArchiveEntry, Indexer, ProxyLocation, ProxyLocationDAO}
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}

/**
  * This step copies the ArchiveEntry record for the given file to a new ID, copies the Proxy records to the new ID
  * and deletes the old ones.
  * Rollback makes it copy them back again the other way.
  *
  * Replies to the sender of each message with either `StepSucceeded(state)` or `StepFailed(state, reason)`.
  *
  * @param indexer          used to read, write and delete ArchiveEntry index records
  * @param proxyLocationDAO used to save and delete proxy location records
  */
class UpdateIndexRecords(indexer:Indexer, proxyLocationDAO: ProxyLocationDAO)(implicit esClient:ElasticClient, dynamoClient:DynamoDbAsyncClient) extends GenericMoveActor {
  import GenericMoveActor._

  /**
    * Requests deletion of every proxy record in the given list.
    * @param proxyList proxy records whose `proxyId` should be deleted
    * @return a Future containing one deletion result per entry of `proxyList`
    */
  def deleteCopiedProxies(proxyList:Seq[ProxyLocation]) =
    Future.sequence(proxyList.map(loc => proxyLocationDAO.deleteProxyRecord(loc.proxyId)))

  /**
    * Builds the destination copy of an index entry. The value is computed directly, so it is wrapped in an
    * already-completed Future purely to let it take part in the for-comprehension in `receive`.
    * @param entry     the source entry
    * @param newId     id that the copy should carry
    * @param newBucket bucket that the copy should carry
    * @return a completed Future containing the modified copy
    */
  private def updateEntry(entry:ArchiveEntry, newId:String, newBucket:String) =
    Future.successful(entry.copy(id = newId, bucket = newBucket))

  /**
    * Saves every given proxy record. All saves are attempted; if any of them fail then the failures are logged and
    * the returned Future is failed so that the caller's for-comprehension stops.
    * @param destFileProxy proxy records to save
    * @return a Future containing the save results, or a failed Future if at least one save failed
    */
  private def writeProxies(destFileProxy:Seq[ProxyLocation]) = Future.sequence(
    destFileProxy.map(loc =>
      proxyLocationDAO
        .saveProxy(loc)
        .map(Right.apply)
        .recover({
          case err:Throwable => Left(err.getMessage)
        })
    )
  ).map(proxyUpdateResults => {
    val failures = proxyUpdateResults.collect({ case Left(err) => err })
    if(failures.nonEmpty) {
      logger.error(s"Could not copy all proxies, ${failures.length} out of ${proxyUpdateResults.length} failed: ")
      failures.foreach(err => logger.error(s"\t$err"))
      throw new RuntimeException(s"${failures.length} proxy copies failed")
    } else {
      proxyUpdateResults.collect({ case Right(proxyLocation) => proxyLocation })
    }
  })

  /**
    * Deletes the proxy records that belong to the source file.
    * @param sourceFileProxies proxy records to delete
    * @return a Future containing the successful deletion results, or a failed Future if any deletion reported an error
    */
  private def deleteOriginalProxies(sourceFileProxies:Seq[ProxyLocation]) = Future.sequence(
    sourceFileProxies.map(loc => proxyLocationDAO.deleteProxyRecord(loc.proxyId))
  ).map(deleteResults => {
    val failures = deleteResults.collect({ case Left(err) => err })
    if(failures.nonEmpty) {
      logger.error(s"Could not delete all proxies:")
      failures.foreach(err => logger.error(err))
      //throwing an exception here will fail this and the containing future. This will make the supervisor request a rollback,
      //and should prevent the deletion from taking place.
      throw new RuntimeException(failures.mkString(","))
    } else {
      deleteResults.collect({ case Right(result) => result })
    }
  })

  /**
    * for the error handling in the for comprehension to work, we must fail the future with an exception
    * @param updatedEntry data to write
    * @return the id of the updated item or a failed future
    */
  private def writeRecord(updatedEntry:ArchiveEntry) = indexer.indexSingleItem(updatedEntry).map({
    case Right(result) => result
    case Left(err) =>
      logger.error(s"Could not write record: $err")
      throw new RuntimeException(err.toString)
  })

  override def receive: Receive = {
    case PerformStep(state)=>
      //sender() must not be called from inside a Future callback, so capture it up-front
      val originalSender = sender()
      if(state.sourceFileProxies.isEmpty || state.destFileProxy.isEmpty || state.destFileId.isEmpty){
        originalSender ! StepFailed(state, "Not enough state elements were defined")
      } else {
        logger.debug(s"Looking up ${state.sourceFileId}")
        //this used to be written with a load of nested flatMaps which was so horrible as to be nearly unreadable,
        //so the logic is now in the separate functions above and we use a for-comprehension to avoid the need for
        //nested map/flatMap
        val updatedEntryFut = for {
          entry <- indexer.getById(state.sourceFileId)
          updatedEntry <- updateEntry(entry, state.destFileId.get, state.destBucket)
          _ <- writeRecord(updatedEntry)
          writeResult <- writeProxies(state.destFileProxy.getOrElse(Seq()))
        } yield (updatedEntry, writeResult)

        //the originals are only removed once every copy above has been written successfully
        updatedEntryFut.flatMap(_ => {
          for {
            _ <- deleteOriginalProxies(state.sourceFileProxies.getOrElse(Seq()))
            deletionResult <- indexer.deleteById(state.sourceFileId)
          } yield deletionResult
        }).onComplete({
          case Success(_) =>
            originalSender ! StepSucceeded(state)
          case Failure(err) =>
            logger.error(s"UpdateIndexRecords failed: ${err.toString}", err)
            originalSender ! StepFailed(state, err.toString)
        })
      }

    case RollbackStep(state)=>
      val originalSender = sender()
      if(state.entry.isEmpty || state.destFileId.isEmpty){
        originalSender ! StepFailed(state, "Not enough state elements were defined")
      } else {
        //safe, both options were checked to be non-empty above
        val destFileId = state.destFileId.get
        val originalBucket = state.entry.get.bucket

        val resaveFuture = indexer.getById(destFileId).flatMap(destEntry => {
          val reconstitutedSource = destEntry.copy(id = state.sourceFileId, bucket = originalBucket)
          writeRecord(reconstitutedSource)
        })

        //Index rollback is best-effort: any failure is logged and converted to a Left so that it does not fail the
        //step. The deletion is chained with flatMap so that the rollback does not report completion until the
        //delete has actually finished, and so that a failed delete is logged rather than silently lost.
        val indexDeleteFuture = resaveFuture.flatMap(_ =>
          indexer.deleteById(destFileId)
            .map[Either[String, Unit]](_ => Right(()))
            .recover({
              case err:Throwable =>
                logger.error(s"Could not delete copied destination entry $destFileId: $err")
                Left(err.toString)
            })
        ).recover({
          case err:Throwable =>
            logger.error(s"Could not re-create deleted source entry: $err")
            Left(err.toString)
        })

        indexDeleteFuture.onComplete({
          case Success(Left(err)) => logger.warn(s"Could not rollback updated index record: $err")
          case Failure(err) => logger.warn(s"Could not rollback updated index record: ", err)
          case Success(Right(_)) =>
        })

        val resaveProxiesFuture = state.sourceFileProxies
          .map(proxySeq =>
            Future.sequence(proxySeq.map(proxy =>
              proxyLocationDAO
                .saveProxy(proxy)
                .map(Right.apply)
                .recover({
                  case err:Throwable => Left(err.getMessage)
                })
            ))
          )
          .getOrElse(Future.successful(Seq()))
          .map(resultSeq => {
            val failures = resultSeq.collect({ case Left(err) => err })
            if (failures.nonEmpty) {
              logger.error(s"${failures.length} proxies failed to restore: ")
              failures.foreach(err => logger.error(s"\t${err.toString}"))
              //fail the future if we can't resave. This will prevent the proxy entries being deleted below.
              throw new RuntimeException(s"${failures.length} proxies failed to restore")
            } else {
              resultSeq
            }
          })

        //destFileProxy is not part of the precondition check above, so treat an undefined value as "nothing was
        //copied, nothing to delete" instead of failing with NoSuchElementException after the proxies were restored
        val proxyDeleteFuture = resaveProxiesFuture.flatMap(resultSeq =>
          deleteCopiedProxies(state.destFileProxy.getOrElse(Seq())).map(_ ++ resultSeq)
        )

        proxyDeleteFuture.onComplete({
          case Success(results) =>
            val failures = results.collect({ case Left(err) => err })
            if (failures.nonEmpty) {
              failures.foreach(err => logger.warn(s"Could not rollback specific proxy copy: $err"))
            }
          case Failure(err) =>
            logger.error("Rollback proxy delete future crashed: ", err)
        })

        //only reply once both halves of the rollback have finished
        Future.sequence(Seq(indexDeleteFuture, proxyDeleteFuture)).onComplete({
          case Success(_) =>
            originalSender ! StepSucceeded(state)
          case Failure(err) =>
            logger.error("Could not roll back index records: ", err)
            originalSender ! StepFailed(state, err.toString)
        })
      }
  }
}