thrall/app/lib/kinesis/MessageProcessor.scala (185 lines of code) (raw):

package lib.kinesis

import com.gu.mediaservice.lib.aws.EsResponse
import com.gu.mediaservice.lib.elasticsearch.{ElasticNotFoundException, Running}
import com.gu.mediaservice.lib.logging.{GridLogging, LogMarker, combineMarkers}
import com.gu.mediaservice.model.{AddImageLeaseMessage, CreateMigrationIndexMessage, DeleteImageExportsMessage, DeleteImageMessage, DeleteUsagesMessage, ImageMessage, MigrateImageMessage, RemoveImageLeaseMessage, ReplaceImageLeasesMessage, SetImageCollectionsMessage, SoftDeleteImageMessage, UnSoftDeleteImageMessage, ThrallMessage, UpdateImageExportsMessage, UpdateImagePhotoshootMetadataMessage, UpdateImageSyndicationMetadataMessage, UpdateImageUsagesMessage, UpdateImageUserMetadataMessage}
import com.gu.mediaservice.model.usage.{Usage, UsageNotice}
// import all except `Right`, which otherwise shadows the type used in `Either`s
import com.gu.mediaservice.model.{Right => _, _}
import com.gu.mediaservice.syntax.MessageSubjects
import lib._
import lib.elasticsearch._
import play.api.libs.json._

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

/** Marker for the failure modes of [[MessageProcessor]]'s image migration step. */
sealed trait MigrationFailure

/** The migrate message carried an error (a `Left`) instead of a projected image. */
case class ProjectionFailure(message: String) extends Exception(message) with MigrationFailure

/** The current version of the image could not be fetched, or no version was found. */
case class GetVersionFailure(message: String) extends Exception(message) with MigrationFailure

/** The version expected by the migrate message differs from the current version. */
case class VersionComparisonFailure(message: String) extends Exception(message) with MigrationFailure

/** Inserting the image via the migration alias failed. */
case class InsertImageFailure(message: String) extends Exception(message) with MigrationFailure

/**
  * Dispatches each [[ThrallMessage]] to the matching Elasticsearch operation.
  *
  * @param es                          Elasticsearch client that every handler delegates to
  * @param store                       image store; used when an image is deleted
  * @param metadataEditorNotifications only referenced from commented-out code in this class
  */
class MessageProcessor(
  es: ElasticSearch,
  store: ThrallStore,
  metadataEditorNotifications: MetadataEditorNotifications,
) extends GridLogging with MessageSubjects {

  /**
    * Routes `updateMessage` to its handler based on the concrete message type.
    *
    * @param updateMessage the message to handle
    * @param logMarker     logging context handed to the handler
    * @return the handler's future; the result type depends on the message kind
    */
  def process(updateMessage: ThrallMessage, logMarker: LogMarker)(implicit ec: ExecutionContext): Future[Any] = {
    updateMessage match {
      case message: ImageMessage => indexImage(message, logMarker)
      case message: DeleteImageMessage => deleteImage(message, logMarker)
      case message: SoftDeleteImageMessage => softDeleteImage(message, logMarker)
      case message: UnSoftDeleteImageMessage => unSoftDeleteImage(message, logMarker)
      case message: DeleteImageExportsMessage => deleteImageExports(message, logMarker)
      case message: UpdateImageExportsMessage => updateImageExports(message, logMarker)
      case message: UpdateImageUserMetadataMessage => updateImageUserMetadata(message, logMarker)
      case message: UpdateImageUsagesMessage => updateImageUsages(message, logMarker)
      case message: ReplaceImageLeasesMessage => replaceImageLeases(message, logMarker)
      case message: AddImageLeaseMessage => addImageLease(message, logMarker)
      case message: RemoveImageLeaseMessage => removeImageLease(message, logMarker)
      case message: SetImageCollectionsMessage => setImageCollections(message, logMarker)
      case message: DeleteUsagesMessage => deleteAllUsages(message, logMarker)
      case message: DeleteSingleUsageMessage => deleteSingleUsage(message, logMarker)
      case message: UpdateImageSyndicationMetadataMessage => upsertSyndicationRightsOnly(message, logMarker)
      case message: UpdateImagePhotoshootMetadataMessage => updateImagePhotoshoot(message, logMarker)
      case message: CreateMigrationIndexMessage => createMigrationIndex(message, logMarker)
      case message: MigrateImageMessage => migrateImage(message, logMarker)
      case message: UpsertFromProjectionMessage => upsertImageFromProjection(message, logMarker)
      case message: UpdateUsageStatusMessage => updateUsageStatus(message, logMarker)
      case _: CompleteMigrationMessage => completeMigration(logMarker)
    }
  }

  /**
    * Applies the usages carried by the message's usage notice to the image.
    *
    * An `ElasticNotFoundException` from any individual update is converted into an
    * empty successful response, so it does not fail the combined future.
    */
  def updateImageUsages(message: UpdateImageUsagesMessage, logMarker: LogMarker)(implicit ec: ExecutionContext): Future[List[ElasticSearchUpdateResponse]] = {
    // NOTE(review): this implicit is not obviously consumed in this method; confirm before removing.
    implicit val unw: OWrites[UsageNotice] = Json.writes[UsageNotice]
    implicit val lm: LogMarker = combineMarkers(message, logMarker)
    val usages = message.usageNotice.usageJson.as[Seq[Usage]]
    Future.traverse(es.updateImageUsages(message.id, usages, message.lastModified))(_.recoverWith {
      case ElasticNotFoundException => Future.successful(ElasticSearchUpdateResponse())
    })
  }

  /** Indexes the image through the migration-aware indexing call. */
  private def indexImage(message: ImageMessage, logMarker: LogMarker)(implicit ec: ExecutionContext) =
    es.migrationAwareIndexImage(message.id, message.image, message.lastModified)(ec, logMarker)

  /**
    * Inserts a projected image directly via the current-images alias.
    *
    * Best effort: a failed insert is logged and the returned future still succeeds
    * (with `()`), so the failure does not propagate to the caller.
    */
  private def upsertImageFromProjection(message: UpsertFromProjectionMessage, logMarker: LogMarker)(implicit ec: ExecutionContext) = {
    implicit val implicitLogMarker: LogMarker = logMarker ++ Map("imageId" -> message.id)
    // do not write into migration index, even if migration is running; let the standard
    // images-for-migration process find and migrate it. even if it has previously
    // been migrated, this directInsert will wipe out the esInfo marker, requeueing this image
    // for migration.
    es.directInsert(message.image, es.imagesCurrentAlias)
      .recover {
        case t: Throwable =>
          // Log with the marker that carries the image id, and recover to a plain
          // value: `recover` expects a value here, not a nested Future.
          logger.error(implicitLogMarker, s"Failed to directly upsert image ${message.image.id} from projection", t)
          ()
      }
  }

  /**
    * Migrates one image through the migration alias.
    *
    * Steps, each mapped to its own [[MigrationFailure]] when it fails:
    *  1. take the projected image and expected version from the message;
    *  2. fetch the current version of the image;
    *  3. require the expected and current versions to be equal;
    *  4. insert the image via the migration alias;
    *  5. record the index it was migrated to on the current document.
    *
    * A version mismatch is only logged. Any other [[MigrationFailure]] is logged and
    * attached to the document as migration info. Failures that are not a
    * [[MigrationFailure]] are not recovered here.
    */
  private def migrateImage(message: MigrateImageMessage, logMarker: LogMarker)(implicit ec: ExecutionContext) = {
    implicit val implicitLogMarker: LogMarker = logMarker ++ Map("imageId" -> message.id)
    val maybeStart = message.maybeImageWithVersion match {
      case Left(errorMessage) =>
        Future.failed(ProjectionFailure(errorMessage))
      case Right((image, expectedVersion)) => Future.successful((image, expectedVersion))
    }
    maybeStart.flatMap {
      case (image, expectedVersion) => es.getImageVersion(message.id).transformWith {
        case Success(Some(currentVersion)) => Future.successful((image, expectedVersion, currentVersion))
        case Success(None) => Future.failed(GetVersionFailure(s"No version found for image id: ${image.id}"))
        case Failure(exception) => Future.failed(GetVersionFailure(exception.toString))
      }
    }.flatMap {
      case (image, expectedVersion, currentVersion) => if (expectedVersion == currentVersion) {
        Future.successful(image)
      } else {
        Future.failed(VersionComparisonFailure(s"Version comparison failed for image id: ${image.id} -> current = $currentVersion, expected = $expectedVersion"))
      }
    }.flatMap(
      image => es.directInsert(image, es.imagesMigrationAlias).transform {
        case s@Success(_) => s
        case Failure(exception) => Failure(InsertImageFailure(exception.toString))
      }
    ).flatMap { insertResult =>
      logger.info(implicitLogMarker, s"Successfully migrated image with id: ${message.id}, setting 'migratedTo' on current index")
      es.setMigrationInfo(imageId = message.id, migrationInfo = MigrationInfo(migratedTo = Some(insertResult.indexName)))
    }.recoverWith {
      case versionComparisonFailure: VersionComparisonFailure =>
        // Nothing is written to the document for a version mismatch.
        logger.error(implicitLogMarker, s"Postponed migration of image with id: ${message.id}: cause: ${versionComparisonFailure.getMessage}, this will get picked up shortly")
        Future.successful(())
      case failure: MigrationFailure =>
        logger.error(implicitLogMarker, s"Failed to migrate image with id: ${message.id}: cause: ${failure.getMessage}, attaching failure to document in current index")
        // The failure is keyed by the migration index name; a placeholder is used
        // when the migration status is not `Running`.
        val migrationIndexName = es.migrationStatus match {
          case running: Running => running.migrationIndexName
          case _ => "Unknown migration index name"
        }
        es.setMigrationInfo(imageId = message.id, migrationInfo = MigrationInfo(failures = Some(Map(migrationIndexName -> failure.getMessage))))
    }
  }

  /** Applies the message's crops as the image's exports. */
  private def updateImageExports(message: UpdateImageExportsMessage, logMarker: LogMarker)(implicit ec: ExecutionContext) =
    Future.sequence(
      es.updateImageExports(message.id, message.crops, message.lastModified)(ec, logMarker))

  /** Deletes the image's exports. */
  private def deleteImageExports(message: DeleteImageExportsMessage, logMarker: LogMarker)(implicit ec: ExecutionContext) =
    Future.sequence(
      es.deleteImageExports(message.id, message.lastModified)(ec, logMarker))

  /** Applies the message's soft-deleted metadata to the image. */
  private def softDeleteImage(message: SoftDeleteImageMessage, logMarker: LogMarker)(implicit ec: ExecutionContext) =
    Future.sequence(es.applySoftDelete(message.id, message.softDeletedMetadata, message.lastModified)(ec, logMarker))

  /** Reverses a soft delete on the image. */
  private def unSoftDeleteImage(message: UnSoftDeleteImageMessage, logMarker: LogMarker)(implicit ec: ExecutionContext) =
    Future.sequence(es.applyUnSoftDelete(message.id, message.lastModified)(ec, logMarker))

  /** Applies the message's edits as the image's metadata override. */
  private def updateImageUserMetadata(message: UpdateImageUserMetadataMessage, logMarker: LogMarker)(implicit ec: ExecutionContext) =
    Future.sequence(es.applyImageMetadataOverride(message.id, message.edits, message.lastModified)(ec, logMarker))

  /** Replaces the image's leases with those in the message. */
  private def replaceImageLeases(message: ReplaceImageLeasesMessage, logMarker: LogMarker)(implicit ec: ExecutionContext) =
    Future.sequence(es.replaceImageLeases(message.id, message.leases, message.lastModified)(ec, logMarker))

  /** Adds the message's lease to the image. */
  private def addImageLease(message: AddImageLeaseMessage, logMarker: LogMarker)(implicit ec: ExecutionContext) =
    Future.sequence(es.addImageLease(message.id, message.lease, message.lastModified)(ec, logMarker))

  /** Removes the lease identified by `message.leaseId` from the image. */
  private def removeImageLease(message: RemoveImageLeaseMessage, logMarker: LogMarker)(implicit ec: ExecutionContext): Future[List[ElasticSearchUpdateResponse]] =
    Future.sequence(es.removeImageLease(message.id, Some(message.leaseId), message.lastModified)(ec, logMarker))

  /** Sets the image's collections to those in the message. */
  private def setImageCollections(message: SetImageCollectionsMessage, logMarker: LogMarker)(implicit ec: ExecutionContext) =
    Future.sequence(es.setImageCollections(message.id, message.collections, message.lastModified)(ec, logMarker))

  /**
    * Deletes the image from Elasticsearch and, for each successful delete response,
    * asks the store to delete the original, the thumbnail and the PNG.
    *
    * `ImageNotDeletable` is turned into a successful "cannot be deleted" response.
    */
  private def deleteImage(message: DeleteImageMessage, logMarker: LogMarker)(implicit ec: ExecutionContext) = {
    Future.sequence({
      implicit val marker: LogMarker = logMarker ++ imageIdMarker(ImageId(message.id))
      // if we cannot delete the image as it's "protected", succeed and delete
      // the message anyway.
      logger.info(marker, "ES6 Deleting image: " + message.id)
      es.deleteImage(message.id).map { requests =>
        requests.map {
          _: ElasticSearchDeleteResponse =>
            // The results of the store calls are not inspected here.
            store.deleteOriginal(message.id)
            store.deleteThumbnail(message.id)
            store.deletePNG(message.id)
            // metadataEditorNotifications.publishImageDeletion(message.id) // let's not delete from Dynamo as user edits might be useful if we restore from replica
            EsResponse(s"Image deleted: ${message.id}")
        } recoverWith {
          case ImageNotDeletable =>
            logger.info(marker, "Could not delete image")
            Future.successful(EsResponse(s"Image cannot be deleted: ${message.id}"))
        }
      }
    })
  }

  /** Deletes all usages of the image. */
  private def deleteAllUsages(message: DeleteUsagesMessage, logMarker: LogMarker)(implicit ec: ExecutionContext) =
    Future.sequence(es.deleteAllImageUsages(message.id, message.lastModified)(ec, logMarker))

  /** Deletes the single usage identified by `message.usageId` from the image. */
  private def deleteSingleUsage(message: DeleteSingleUsageMessage, logMarker: LogMarker)(implicit ec: ExecutionContext) = {
    Future.sequence(es.deleteSingleImageUsage(message.id, message.usageId, message.lastModified)(ec, logMarker))
  }

  /**
    * Updates usage status from the usages carried by the message's usage notice.
    *
    * An `ElasticNotFoundException` from any individual update is converted into an
    * empty successful response, so it does not fail the combined future.
    */
  private def updateUsageStatus(message: UpdateUsageStatusMessage, logMarker: LogMarker)(implicit ec: ExecutionContext): Future[List[ElasticSearchUpdateResponse]] = {
    implicit val lm: LogMarker = combineMarkers(message, logMarker)
    val usage = message.usageNotice.usageJson.as[Seq[Usage]]
    Future.traverse(es.updateUsageStatus(message.id, usage, message.lastModified ))(_.recoverWith {
      case ElasticNotFoundException => Future.successful(ElasticSearchUpdateResponse())
    })
  }

  /**
    * Updates the syndication rights of an existing image.
    *
    * When the image is found, the returned future completes only once the rights
    * update has completed, and fails if that update fails. When the image is not
    * found, this is logged and the future succeeds.
    */
  def upsertSyndicationRightsOnly(message: UpdateImageSyndicationMetadataMessage, logMarker: LogMarker)(implicit ec: ExecutionContext): Future[Any] = {
    implicit val marker: LogMarker = logMarker ++ imageIdMarker(ImageId(message.id))
    // flatMap (not map) so the update is part of the returned future rather than
    // being started and then dropped.
    es.getImage(message.id) flatMap {
      case Some(image) =>
        val photoshoot = image.userMetadata.flatMap(_.photoshoot)
        logger.info(marker, s"Upserting syndication rights for image ${message.id} in photoshoot $photoshoot with rights ${Json.toJson(message.maybeSyndicationRights)}")
        Future.sequence(es.updateImageSyndicationRights(message.id, message.maybeSyndicationRights, message.lastModified))
      case _ =>
        logger.info(marker, s"Image ${message.id} not found")
        Future.successful(())
    }
  }

  /**
    * Applies the message's edits as a user-metadata update and logs the move from the
    * image's previous photoshoot to the one in the edits.
    *
    * The previous photoshoot is read before the update is applied and is only used
    * for the log line.
    */
  def updateImagePhotoshoot(message: UpdateImagePhotoshootMetadataMessage, logMarker: LogMarker)(implicit ec: ExecutionContext): Future[Unit] = {
    implicit val marker: LogMarker = logMarker ++ imageIdMarker(ImageId(message.id))
    for {
      imageOpt <- es.getImage(message.id)
      prevPhotoshootOpt = imageOpt.flatMap(_.userMetadata.flatMap(_.photoshoot))
      _ <- updateImageUserMetadata(UpdateImageUserMetadataMessage(message.id, message.lastModified, message.edits), logMarker)
    } yield logger.info(marker, s"Moved image ${message.id} from $prevPhotoshootOpt to ${message.edits.photoshoot}")
  }

  /** Starts a migration to `message.newIndexName`, run inside a `Future` on `ec`. */
  def createMigrationIndex(message: CreateMigrationIndexMessage, logMarker: LogMarker)(implicit ec: ExecutionContext): Future[Unit] = {
    Future {
      es.startMigration(message.newIndexName)(logMarker)
    }
  }

  /** Completes the migration by delegating to the Elasticsearch client. */
  def completeMigration(logMarker: LogMarker)(implicit ec: ExecutionContext): Future[Unit] = {
    es.completeMigration(logMarker)
  }
}