in thrall/app/lib/kinesis/MessageProcessor.scala [84:123]
/**
  * Attempts to copy the image carried by `message` into the migration index and
  * records the outcome on the document in the current index.
  *
  * Success path, in order:
  *  1. take the projected image and its expected version from the message
  *     (a `Left` fails with `ProjectionFailure`);
  *  2. look up the version currently stored for `message.id` (a missing version
  *     or a failed lookup becomes `GetVersionFailure`);
  *  3. require the expected and current versions to be equal
  *     (otherwise `VersionComparisonFailure`);
  *  4. insert the image via `es.imagesMigrationAlias` (a failed insert becomes
  *     `InsertImageFailure`);
  *  5. set `migratedTo` on the current document to the index that was written.
  *
  * Recovery:
  *  - `VersionComparisonFailure` is only logged; the returned future completes with unit.
  *  - any other failure matching `MigrationFailure` is logged and its message is
  *    stored in the document's migration info, keyed by the migration index name.
  *  - failures that match neither case are left untouched and propagate to the caller.
  *
  * @param message   migration request holding the image id and the projected image (or an error message)
  * @param logMarker marker passed to the log statements; an `imageId`-enriched copy is made implicit for the `es` calls
  */
private def migrateImage(message: MigrateImageMessage, logMarker: LogMarker)(implicit ec: ExecutionContext) = {
  implicit val implicitLogMarker: LogMarker = logMarker ++ Map("imageId" -> message.id)

  // Deliberately a `def`: the migration status is read only at the moment a failure is recorded.
  def failureKey = es.migrationStatus match {
    case running: Running => running.migrationIndexName
    case _ => "Unknown migration index name"
  }

  val projected = message.maybeImageWithVersion match {
    case Right((image, expectedVersion)) => Future.successful((image, expectedVersion))
    case Left(errorMessage) => Future.failed(ProjectionFailure(errorMessage))
  }

  val migrated = projected.flatMap { case (image, expectedVersion) =>
    for {
      currentVersion <- es.getImageVersion(message.id).transform {
        case Success(Some(version)) => Success(version)
        case Success(None) => Failure(GetVersionFailure(s"No version found for image id: ${image.id}"))
        case Failure(exception) => Failure(GetVersionFailure(exception.toString))
      }
      _ <- {
        if (expectedVersion == currentVersion) Future.successful(())
        else Future.failed(VersionComparisonFailure(s"Version comparison failed for image id: ${image.id} -> current = $currentVersion, expected = $expectedVersion"))
      }
      insertResult <- es.directInsert(image, es.imagesMigrationAlias).transform {
        case Failure(exception) => Failure(InsertImageFailure(exception.toString))
        case inserted @ Success(_) => inserted
      }
      updateResult <- {
        logger.info(logMarker, s"Successfully migrated image with id: ${message.id}, setting 'migratedTo' on current index")
        es.setMigrationInfo(imageId = message.id, migrationInfo = MigrationInfo(migratedTo = Some(insertResult.indexName)))
      }
    } yield updateResult
  }

  migrated.recoverWith {
    case versionComparisonFailure: VersionComparisonFailure =>
      // The document changed since it was projected; nothing is written so it can be retried later.
      logger.error(logMarker, s"Postponed migration of image with id: ${message.id}: cause: ${versionComparisonFailure.getMessage}, this will get picked up shortly")
      Future.successful(())
    case failure: MigrationFailure =>
      logger.error(logMarker, s"Failed to migrate image with id: ${message.id}: cause: ${failure.getMessage}, attaching failure to document in current index")
      es.setMigrationInfo(imageId = message.id, migrationInfo = MigrationInfo(failures = Some(Map(failureKey -> failure.getMessage))))
  }
}