def apply()

in thrall/app/lib/MigrationSourceWithSender.scala [26:133]


  /**
    * Builds the stream of images to migrate, together with a function for manually submitting
    * individual migration requests.
    *
    * Two sources of [[MigrationRequest]] are merged:
    *  - a scrolling source that, at most once per second, asks elasticsearch for the next page of
    *    image ids/versions to migrate while a migration is in progress;
    *  - a queue-backed source fed by the returned `send` function.
    *
    * Each request is then projected through `gridClient.getImageLoaderProjection` and emitted as a
    * [[MigrationRecord]] wrapping a [[MigrateImageMessage]].
    *
    * @param materializer          used to pre-materialize the manual queue source so that requests can
    *                              be offered to it
    * @param innerServiceCall      passed through to `gridClient.getImageLoaderProjection`
    * @param es                    provides the migration status and the scroll queries over images to migrate
    * @param gridClient            used to fetch the projection of each image
    * @param projectionParallelism maximum number of projections requested concurrently
    * @return a [[MigrationSourceWithSender]] whose `send` enqueues a manual request (yielding `false`
    *         when the request was not enqueued) and whose `source` emits the projected records
    */
  def apply(
    materializer: Materializer,
    innerServiceCall: WSRequest => WSRequest,
    es: ElasticSearch,
    gridClient: GridClient,
    projectionParallelism: Int,
  )(implicit ec: ExecutionContext): MigrationSourceWithSender = {

    // scroll through elasticsearch, finding image ids and versions to migrate
    // emits MigrationRequest
    val scrollingIdsSource =
      Source.repeat(())
        // one tick per second, so at most one search is issued per second
        .throttle(1, per = 1.second)
        .statefulMapConcat(() => {
          // This Pekko-provided stage is explicitly provided as a way to safely wrap around mutable state.
          // Required here to keep a marker of the current search scroll. Scrolling prevents the
          // next search from picking up the same image ids and inserting them into the flow and
          // causing lots of version comparison failures.
          // Alternatives:
          // - Using the elastic4s `ElasticSource`
          //     (This would be ideal but is tricky due to dependency version conflicts, and it's also
          //     difficult (or impossible?) to change the query value once the stream has been materialized.)
          // - Defining our own version of the ElasticSource using our desired library versions and a system to change
          //   the query value as desired.
          // - Defining a Pekko actor to handle the querying and wrap around the state.
          var maybeScrollId: Option[String] = None

          // Records the scroll id to continue from (or clears it once the scroll is exhausted)
          // and returns the hits of this page.
          def handleScrollResponse(resp: ScrolledSearchResults) = {
            maybeScrollId = if (resp.hits.isEmpty) {
              // close scroll with provided ID if it exists
              resp.scrollId.foreach(es.closeScroll)
              None
            } else {
              resp.scrollId
            }
            resp.hits
          }

          _ => {
            val nextIdsToMigrate = ((es.migrationStatus, maybeScrollId) match {
              case (Paused(_), _) => Future.successful(List.empty)
              case (InProgress(migrationIndexName), None) =>
                es.startScrollingImageIdsToMigrate(migrationIndexName).map(handleScrollResponse)
              case (InProgress(_), Some(scrollId)) =>
                es.continueScrollingImageIdsToMigrate(scrollId).map(handleScrollResponse)
              case _ => Future.successful(List.empty)
            }).recover { case error =>
              // Log the failure rather than swallowing it silently, then fall back to an empty page
              // so the stream keeps running and the next tick starts a fresh scroll.
              logger.error("Failed to scroll image ids to migrate; resetting scroll", error)
              // close existing scroll if it exists
              maybeScrollId.foreach(es.closeScroll)
              maybeScrollId = None
              List.empty
            }
            List(nextIdsToMigrate)
          }
        })
        // flatten out the future
        .mapAsync(1)(identity)
        // flatten out the list of image ids
        .mapConcat(searchHits => {
          if (searchHits.nonEmpty) {
            logger.info(s"Flattening ${searchHits.size} image ids to migrate")
          }
          searchHits.map(hit => MigrationRequest(hit.id, hit.version))
        })
        // drop ids already fetched if the migration is no longer in progress by the time they are emitted
        .filter(_ => es.migrationIsInProgress)

    // receive MigrationRequests to migrate from a manual source (failures retry page, single image migration form, etc.)
    val manualIdsSourceDeclaration = Source.queue[MigrationRequest](bufferSize = 2000)
    val (manualIdsSourceMat, manualIdsSource) = manualIdsSourceDeclaration.preMaterialize()(materializer)

    // Offers a request to the manual queue; yields true only when the queue reports it as enqueued.
    // Wrapping the offer in a Future means any exception it throws is handled by the recover below.
    def submitIdForMigration(request: MigrationRequest) =
      Future (manualIdsSourceMat.offer(request)).map {
        case QueueOfferResult.Enqueued => true
        case _ =>
          logger.warn(s"Failed to add migration message to migration queue: $request")
          false
      }.recover {
        case error: Throwable =>
          logger.error(s"Failed to add migration message to migration queue: $request", error)
          false
      }

    // merge both sources of MigrationRequest
    // `preferred = true` is intended to favour manualIdsSource when both sources have elements ready
    // NOTE(review): confirm against the Pekko `mergePreferred` documentation which side `preferred = true` favours
    val idsSource = manualIdsSource.mergePreferred(scrollingIdsSource, preferred =  true)

    // project image from MigrationRequest, produce the MigrateImageMessage
    val projectedImageSource: Source[MigrationRecord, NotUsed] = idsSource.mapAsyncUnordered(projectionParallelism) {
      case MigrationRequest(imageId, version) =>
        val migrateImageMessageFuture = (
          for {
            maybeProjection <- gridClient.getImageLoaderProjection(mediaId = imageId, innerServiceCall)
            maybeVersion = Some(version)
          } yield MigrateImageMessage(imageId, maybeProjection, maybeVersion)
        ).recover {
          // a failed projection still yields a message, carrying the error text instead of a projection
          case error => MigrateImageMessage(imageId, Left(error.toString))
        }
        // the timestamp is taken when the projection future completes
        migrateImageMessageFuture.map(message => MigrationRecord(
          payload = message,
          approximateArrivalTimestamp = java.time.Instant.now()
        ))
    }

    MigrationSourceWithSender(
      send = submitIdForMigration,
      source = projectedImageSource.mapMaterializedValue(_ => Future.successful(Done)),
    )
  }