private def migrateImage()

in thrall/app/lib/kinesis/MessageProcessor.scala [84:123]


  /**
    * Copies the image carried by a migration message into the migration index and records the
    * outcome on the document in the current index.
    *
    * The steps run in sequence, each only if the previous one succeeded:
    *  1. take the image and its expected version from `message.maybeImageWithVersion`
    *     (a `Left` becomes a failed future carrying a `ProjectionFailure`);
    *  2. look up the version currently stored for `message.id` (a missing version or a failed
    *     lookup becomes a `GetVersionFailure`);
    *  3. require the expected and current versions to be equal (otherwise a
    *     `VersionComparisonFailure`);
    *  4. insert the image through `es.imagesMigrationAlias` (a failed insert becomes an
    *     `InsertImageFailure`);
    *  5. set `migratedTo` on the current document to the index name reported by the insert.
    *
    * Recovery: a `VersionComparisonFailure` is logged and swallowed, so the returned future
    * completes successfully with unit. Any other `MigrationFailure` is logged and attached to the
    * current document as a failure keyed by the migration index name. Failures of any other type
    * (for example from the final `setMigrationInfo` call, unless it fails with a
    * `MigrationFailure`) are not recovered here and fail the returned future.
    *
    * @param message   migration message holding the image id and either the image with its
    *                  expected version or an error message
    * @param logMarker marker supplied by the caller; it is extended with the image id before use
    * @param ec        execution context on which the future transformations run
    * @return a future that completes once the outcome has been recorded (or the migration has
    *         been postponed), or fails with an unrecovered error
    */
  private def migrateImage(message: MigrateImageMessage, logMarker: LogMarker)(implicit ec: ExecutionContext) = {
    // Extend the caller's marker with the image id. It is implicit for the calls below and is
    // also passed explicitly to every log call, so that those log lines carry the image id too.
    implicit val implicitLogMarker: LogMarker = logMarker ++ Map("imageId" -> message.id)
    val maybeStart = message.maybeImageWithVersion match {
      case Left(errorMessage) =>
        Future.failed(ProjectionFailure(errorMessage))
      case Right((image, expectedVersion)) => Future.successful((image, expectedVersion))
    }
    maybeStart.flatMap {
      case (image, expectedVersion) => es.getImageVersion(message.id).transformWith {
        case Success(Some(currentVersion)) => Future.successful((image, expectedVersion, currentVersion))
        case Success(None) => Future.failed(GetVersionFailure(s"No version found for image id: ${image.id}"))
        case Failure(exception) => Future.failed(GetVersionFailure(exception.toString))
      }
    }.flatMap {
      case (image, expectedVersion, currentVersion) => if (expectedVersion == currentVersion) {
        Future.successful(image)
      } else {
        Future.failed(VersionComparisonFailure(s"Version comparison failed for image id: ${image.id} -> current = $currentVersion, expected = $expectedVersion"))
      }
    }.flatMap(
      image => es.directInsert(image, es.imagesMigrationAlias).transform {
        case s@Success(_) => s
        // Re-wrap insert errors so the recovery below treats them as migration failures.
        case Failure(exception) => Failure(InsertImageFailure(exception.toString))
      }
    ).flatMap { insertResult =>
      logger.info(implicitLogMarker, s"Successfully migrated image with id: ${message.id}, setting 'migratedTo' on current index")
      es.setMigrationInfo(imageId = message.id, migrationInfo = MigrationInfo(migratedTo = Some(insertResult.indexName)))
    }.recoverWith {
      // This case must stay first: the version mismatch is deliberately not recorded on the document.
      case versionComparisonFailure: VersionComparisonFailure =>
        logger.error(implicitLogMarker, s"Postponed migration of image with id: ${message.id}: cause: ${versionComparisonFailure.getMessage}, this will get picked up shortly")
        Future.successful(())
      case failure: MigrationFailure =>
        logger.error(implicitLogMarker, s"Failed to migrate image with id: ${message.id}: cause: ${failure.getMessage}, attaching failure to document in current index")
        // The failure is keyed by the index being migrated to; a placeholder key is used when
        // the migration status is not Running.
        val migrationIndexName = es.migrationStatus match {
          case running: Running => running.migrationIndexName
          case _ => "Unknown migration index name"
        }
        es.setMigrationInfo(imageId = message.id, migrationInfo = MigrationInfo(failures = Some(Map(migrationIndexName -> failure.getMessage))))
    }
  }