thrall/app/controllers/ThrallController.scala (251 lines of code) (raw):

package controllers

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.scaladsl.{Sink, Source}
import com.gu.mediaservice.GridClient
import com.gu.mediaservice.lib.auth.{Authentication, BaseControllerWithLoginRedirects}
import com.gu.mediaservice.lib.aws.ThrallMessageSender
import com.gu.mediaservice.lib.config.Services
import com.gu.mediaservice.lib.elasticsearch.{NotRunning, Running}
import com.gu.mediaservice.lib.logging.GridLogging
import com.gu.mediaservice.model.{CompleteMigrationMessage, CreateMigrationIndexMessage, UpsertFromProjectionMessage}
import lib.elasticsearch.ElasticSearch
import lib.{MigrationRequest, OptionalFutureRunner, Paging, ThrallStore}
import org.joda.time.{DateTime, DateTimeZone}
import play.api.data.Form
import play.api.data.Forms._
import play.api.libs.json.Json
import play.api.mvc.{Action, AnyContent, ControllerComponents, Result}

import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.language.postfixOps
import scala.util.{Failure, Success}

/** Form payload carrying the id of a single image (bound from the `id` form field). */
case class MigrateSingleImageForm(id: String)

/**
  * Controller behind the Thrall admin pages: index overview, migration lifecycle
  * (start / pause / resume / preview / complete), migration failure listings and
  * re-attempts, and single-image migrate / upsert-from-projection actions.
  *
  * Every action is wrapped in `withLoginRedirect` / `withLoginRedirectAsync`
  * from [[BaseControllerWithLoginRedirects]].
  */
class ThrallController(
  es: ElasticSearch,
  store: ThrallStore,
  sendMigrationRequest: MigrationRequest => Future[Boolean],
  messageSender: ThrallMessageSender,
  actorSystem: ActorSystem,
  override val auth: Authentication,
  override val services: Services,
  override val controllerComponents: ControllerComponents,
  gridClient: GridClient
)(implicit val ec: ExecutionContext) extends BaseControllerWithLoginRedirects with GridLogging {

  private val numberFormatter: Long => String = java.text.NumberFormat.getIntegerInstance().format

  /** Response body used when the `id` form field cannot be bound. */
  private val missingImageIdMessage = "Missing required form field 'id'"

  /**
    * Renders the overview page with the current/migration alias details, their
    * document counts and the migration status.
    *
    * A missing current-index count is shown as "!" and a missing migration-index
    * count as "-".
    */
  def index = withLoginRedirectAsync {
    val countDocsInIndex = OptionalFutureRunner.run(es.countImages) _

    for {
      currentIndex <- es.getIndexForAlias(es.imagesCurrentAlias)
      currentIndexName = currentIndex.map(_.name)
      currentIndexCount <- countDocsInIndex(currentIndexName)

      migrationIndex <- es.getIndexForAlias(es.imagesMigrationAlias)
      migrationIndexName = migrationIndex.map(_.name)
      migrationIndexCount <- countDocsInIndex(migrationIndexName)

      historicalIndex <- es.getIndexForAlias(es.imagesHistoricalAlias)

      currentIndexCountFormatted = currentIndexCount.map(_.catCount).map(numberFormatter).getOrElse("!")
      migrationIndexCountFormatted = migrationIndexCount.map(_.catCount).map(numberFormatter).getOrElse("-")
    } yield {
      Ok(views.html.index(
        currentAlias = es.imagesCurrentAlias,
        currentIndex = currentIndexName.getOrElse("ERROR - No index found! Please investigate this!"),
        currentIndexCount = currentIndexCountFormatted,
        migrationAlias = es.imagesMigrationAlias,
        migrationIndexCount = migrationIndexCountFormatted,
        migrationStatus = es.migrationStatus,
        hasHistoricalIndex = historicalIndex.isDefined
      ))
    }
  }

  /**
    * Shows the upsert-from-projection page.
    *
    *  - no id: the blank form;
    *  - id whose original exists in the store: a preview of the projection diff
    *    (404 if no diff could be generated);
    *  - id whose original does not exist: redirect to the restore-from-replica page.
    */
  def upsertProjectPage(imageId: Option[String]) = withLoginRedirectAsync { implicit request =>
    imageId match {
      case Some(id) if store.doesOriginalExist(id) =>
        gridClient.getProjectionDiff(id, auth.innerServiceCall).map {
          case None => NotFound("couldn't generate projection for that image!!")
          case Some(diff) => Ok(views.html.previewUpsertProject(id, Json.prettyPrint(diff)))
        }
      case Some(_) => Future.successful(Redirect(routes.ThrallController.restoreFromReplica))
      case None => Future.successful(Ok(views.html.upsertProject()))
    }
  }

  /**
    * Resolves the name of the index behind the current-images alias, failing the
    * returned future when the alias has no index.
    */
  private def currentIndexNameOrFail =
    es.getIndexForAlias(es.imagesCurrentAlias).flatMap {
      case Some(index) => Future.successful(index.name)
      case None => Future.failed(new Exception(s"No index found for '${es.imagesCurrentAlias}' alias"))
    }

  /**
    * Renders the overview of migration failures.
    *
    * While a migration is running the failures are read via the current alias
    * against the running migration index; otherwise via the historical alias
    * against the current index.
    */
  def migrationFailuresOverview(): Action[AnyContent] = withLoginRedirectAsync {
    val failuresOverviewF = es.migrationStatus match {
      case running: Running =>
        es.getMigrationFailuresOverview(es.imagesCurrentAlias, running.migrationIndexName)
      case _ =>
        currentIndexNameOrFail.flatMap { currentIndexName =>
          es.getMigrationFailuresOverview(es.imagesHistoricalAlias, currentIndexName)
        }
    }

    failuresOverviewF.map { failuresOverview =>
      Ok(views.html.migrationFailuresOverview(
        failuresOverview,
        apiBaseUrl = services.apiBaseUri,
        uiBaseUrl = services.kahunaBaseUri
      ))
    }
  }

  /**
    * Renders one page of migration failures matching `filter`.
    *
    * Re-attempts are only offered while a migration is running.
    *
    * @param filter    failure filter passed through to Elasticsearch
    * @param maybePage optional page number, interpreted by [[Paging.withPaging]]
    */
  def migrationFailures(filter: String, maybePage: Option[Int]): Action[AnyContent] = withLoginRedirectAsync {
    Paging.withPaging(maybePage) { paging =>
      val (failuresF, allowReattempts) = es.migrationStatus match {
        case running: Running =>
          val running_ = es.getMigrationFailures(
            es.imagesCurrentAlias, running.migrationIndexName, paging.from, paging.pageSize, filter
          )
          (running_, true)
        case _ =>
          val historical = currentIndexNameOrFail.flatMap { currentIndexName =>
            es.getMigrationFailures(es.imagesHistoricalAlias, currentIndexName, paging.from, paging.pageSize, filter)
          }
          (historical, false)
      }

      failuresF.map { failures =>
        Ok(views.html.migrationFailures(
          failures,
          apiBaseUrl = services.apiBaseUri,
          uiBaseUrl = services.kahunaBaseUri,
          filter,
          paging.page,
          shouldAllowReattempts = allowReattempts
        ))
      }
    }
  }

  implicit val pollingMaterializer: Materializer = Materializer.matFromSystem(actorSystem)

  /**
    * Starts a migration after the user has typed "start" into the confirmation box.
    *
    * Responses:
    *  - 400 when the confirmation text is missing or wrong;
    *  - 500 (logged) when the migration alias lookup fails;
    *  - 400 when the migration alias already points at an index;
    *  - otherwise publishes a [[CreateMigrationIndexMessage]] and polls for the
    *    new index (see `pollForMigrationIndex`).
    */
  def startMigration = withLoginRedirectAsync { implicit request =>
    // `.value` is None when binding fails, so a missing field is a 400 rather than an exception
    val confirmation = Form(single("start-confirmation" -> text)).bindFromRequest().value

    if (!confirmation.contains("start")) {
      Future.successful(BadRequest("you did not enter 'start' in the text box"))
    } else {
      val msgFailedToFetchIndex = s"Could not fetch ES index details for alias '${es.imagesMigrationAlias}'"

      // Handle failure and success in one place: recovering to a Result *before* matching on
      // the Option would leave the recovered value unmatched (MatchError).
      es.getIndexForAlias(es.imagesMigrationAlias).transformWith[Result] {
        case Failure(error) =>
          logger.error(msgFailedToFetchIndex, error)
          Future.successful(InternalServerError(msgFailedToFetchIndex))

        case Success(Some(index)) =>
          Future.successful(
            BadRequest(s"There is already an index '${index}' for alias '${es.imagesMigrationAlias}', and thus a migration underway.")
          )

        case Success(None) =>
          messageSender.publish(CreateMigrationIndexMessage(
            migrationStart = DateTime.now(DateTimeZone.UTC),
            gitHash = utils.buildinfo.BuildInfo.gitCommitId
          ))
          pollForMigrationIndex(msgFailedToFetchIndex)
      }
    }
  }

  /**
    * Polls (up to 20 times, throttled to one lookup per 500ms) until the images
    * migration alias has an index, without blocking a thread.
    *
    * @param msgFailedToFetchIndex message logged and returned when polling fails
    * @return a redirect to the index page once the alias exists, or a 500 when it
    *         still does not exist after the last poll or polling fails / exceeds
    *         the 12 second ceiling
    */
  private def pollForMigrationIndex(msgFailedToFetchIndex: String): Future[Result] =
    Source(1 to 20)
      .throttle(1, 500.millis)
      .mapAsync(parallelism = 1)(_ => es.getIndexForAlias(es.imagesMigrationAlias))
      .takeWhile(_.isEmpty, inclusive = true)
      // hard upper bound on the whole poll; fails the stream, which `recover` below reports
      .completionTimeout(12.seconds)
      .runWith(Sink.last)
      .map(_.fold {
        val timedOutMessage = s"Still no index for alias '${es.imagesMigrationAlias}' after 10 seconds."
        logger.error(timedOutMessage)
        InternalServerError(timedOutMessage)
      } { _ =>
        Redirect(routes.ThrallController.index)
      })
      .recover { case error: Throwable =>
        logger.error(msgFailedToFetchIndex, error)
        InternalServerError(msgFailedToFetchIndex)
      }

  /**
    * Completes a running migration after the user has typed "complete" into the
    * confirmation box.
    *
    * Publishes a [[CompleteMigrationMessage]] and polls the migration status (up to
    * 20 times, one per 500ms) until it is no longer [[Running]]. Responds with 400 if
    * the confirmation is missing/wrong or no migration is running, and 500 if the
    * status has not become [[NotRunning]] once polling stops.
    */
  def completeMigration(): Action[AnyContent] = withLoginRedirectAsync { implicit request =>
    // `.value` is None when binding fails, so a missing field is a 400 rather than an exception
    val confirmation = Form(single("complete-confirmation" -> text)).bindFromRequest().value

    if (!confirmation.contains("complete")) {
      Future.successful(BadRequest("you did not enter 'complete' in the text box"))
    } else {
      es.refreshAndRetrieveMigrationStatus() match {
        case _: Running =>
          messageSender.publish(CompleteMigrationMessage(
            lastModified = DateTime.now(DateTimeZone.UTC)
          ))
          // poll until images migration status is not running or error, giving up after 10 seconds
          Source(1 to 20)
            .throttle(1, 500.millis)
            .map(_ => es.refreshAndRetrieveMigrationStatus())
            .takeWhile(_.isInstanceOf[Running], inclusive = true)
            .runWith(Sink.last)
            .map {
              case NotRunning => Redirect(routes.ThrallController.index)
              case migrationStatus =>
                val timedOutMessage = s"MigrationStatus was still '$migrationStatus' after 10 seconds."
                logger.error(timedOutMessage)
                InternalServerError(timedOutMessage)
            }
        case migrationStatus =>
          Future.successful(
            BadRequest(s"MigrationStatus is $migrationStatus so cannot complete migration.")
          )
      }
    }
  }

  /**
    * Builds an action that runs `action`, refreshes the migration status and then
    * redirects to the index page.
    */
  private def adjustMigration(action: () => Unit) = withLoginRedirect {
    action()
    es.refreshAndRetrieveMigrationStatus()
    Redirect(routes.ThrallController.index)
  }

  def pauseMigration = adjustMigration(es.pauseMigration _)
  def resumeMigration = adjustMigration(es.resumeMigration _)

  def previewMigrationCompletion = adjustMigration(es.previewMigrationCompletion _)
  def unPreviewMigrationCompletion = adjustMigration(es.unPreviewMigrationCompletion _)

  /**
    * Queues a migration request for the image whose id is posted in the `id` field.
    *
    * Responds with 400 when `id` cannot be bound, 200 when the request was queued,
    * and 500 when the image version cannot be found or the request is not sent.
    */
  def migrateSingleImage: Action[AnyContent] = withLoginRedirectAsync { implicit request =>
    migrateSingleImageFormReader.bindFromRequest().value match {
      case None =>
        Future.successful(BadRequest(missingImageIdMessage))

      case Some(MigrateSingleImageForm(imageId)) =>
        es.getImageVersion(imageId) flatMap {
          case Some(version) =>
            sendMigrationRequest(MigrationRequest(imageId, version)).map {
              case true => Ok(s"Image migration queued successfully with id:$imageId")
              case false => InternalServerError(s"Failed to send migrate image message $imageId")
            }
          case None =>
            Future.successful(InternalServerError(s"Failed to send migrate image message $imageId"))
        }
    }
  }

  /**
    * Fetches the image-loader projection for the posted image id and publishes an
    * [[UpsertFromProjectionMessage]] for it.
    *
    * Responds with 400 when `id` cannot be bound and 404 (empty body) when no
    * projection is available.
    */
  def upsertFromProjectionSingleImage: Action[AnyContent] = withLoginRedirectAsync { implicit request =>
    migrateSingleImageFormReader.bindFromRequest().value match {
      case None =>
        Future.successful(BadRequest(missingImageIdMessage))

      case Some(MigrateSingleImageForm(imageId)) =>
        gridClient.getImageLoaderProjection(imageId, auth.innerServiceCall).map {
          case Some(projectedImage) =>
            // UTC, matching the timestamps on the other messages published by this controller
            messageSender.publish(
              UpsertFromProjectionMessage(imageId, projectedImage, DateTime.now(DateTimeZone.UTC))
            )
            Ok(s"upsert request for $imageId submitted")
          case None =>
            NotFound("")
        }
    }
  }

  /** Renders the restore-from-replica page, pointing it at the loader's restore endpoint. */
  def restoreFromReplica: Action[AnyContent] = withLoginRedirect { implicit request =>
    Ok(views.html.restoreFromReplica(s"${services.loaderBaseUri}/images/restore")) //FIXME figure out imageId bit
  }

  /**
    * Re-submits a migration request for every failure on the given page.
    *
    * Only allowed while a migration is running; otherwise responds with 500.
    *
    * @param filter failure filter passed through to Elasticsearch
    * @param page   page of failures to re-attempt
    */
  def reattemptMigrationFailures(filter: String, page: Int): Action[AnyContent] = withLoginRedirectAsync { implicit request =>
    Paging.withPaging(Some(page)) { paging =>
      es.migrationStatus match {
        case running: Running =>
          val migrationRequestsF = es
            .getMigrationFailures(es.imagesCurrentAlias, running.migrationIndexName, paging.from, paging.pageSize, filter)
            .map(_.details.map(detail => MigrationRequest(detail.imageId, detail.version)))

          for {
            migrationRequests <- migrationRequestsF
            submissionResults <- Future.sequence(migrationRequests.map(sendMigrationRequest))
          } yield {
            val failures = submissionResults.count(!_)
            if (failures == 0) {
              Ok("Submitted all failures for reattempted migration")
            } else {
              InternalServerError(s"Failed to submit $failures images for reattempted migration")
            }
          }
        case _ =>
          Future.successful(InternalServerError("Cannot resubmit images; migration is not running"))
      }
    }
  }

  /** Binds the `id` form field into a [[MigrateSingleImageForm]]. */
  val migrateSingleImageFormReader: Form[MigrateSingleImageForm] = Form(
    mapping(
      "id" -> text
    )(MigrateSingleImageForm.apply)(MigrateSingleImageForm.unapply)
  )
}