private def getUpdatesStream()

in usage/app/lib/UsageRecorder.scala [65:127]


  /**
    * Reconciles, for every matched usage group, the usages seen on the stream with the usages
    * already held in the DB, and issues the DB operations needed to bring the DB in line.
    *
    * Per group:
    *  - usages present in the DB but absent from the stream are marked as removed;
    *  - usages present on the stream but absent from the DB are created
    *    (when the group is a reindex, ALL stream usages are created instead);
    *  - usages present in both are updated, unless the stream usage equals a DB usage
    *    (no updates are issued when the group is a reindex).
    *
    * If the group carries a status, only DB usages with that same status which are not already
    * removed take part in the comparison.
    *
    * @param dbMatchStream usage groups paired with the usages found for them in the DB
    * @return an observable emitting, once per group and only after all of that group's DB
    *         operations have completed, the media IDs of every stream/DB usage in the group
    *         for which `isGridLikeId` holds
    */
  private def getUpdatesStream(dbMatchStream:  Observable[WithLogMarker[MatchedUsageGroup]]): Observable[WithLogMarker[Set[String]]] = {
    dbMatchStream.flatMap(matchedUsageGroupWithContext => {
      implicit val logMarker: LogMarker = matchedUsageGroupWithContext.logMarker
      val matchedUsageGroup = matchedUsageGroupWithContext.value

      val usageGroup = matchedUsageGroup.usageGroup
      // No status on the group => compare against every DB usage; otherwise restrict to
      // live (not removed) DB usages with the same status.
      val dbUsages = usageGroup.maybeStatus.fold(
        matchedUsageGroup.dbUsages
      )(
        status => matchedUsageGroup.dbUsages.filter(dbUsage => dbUsage.status == status && !dbUsage.isRemoved)
      )
      val dbUsageMap = dbUsages.map(_.entry).toMap
      val dbUsageKeys = dbUsageMap.keySet

      val streamUsageMap = usageGroup.usages.map(_.entry).toMap
      val streamUsageKeys = streamUsageMap.keySet

      dbUsages.foreach(mediaUsage => {
        logger.info(logMarker, s"Seen DB Usage for ${mediaUsage.mediaId}")
      })
      usageGroup.usages.foreach(mediaUsage => {
        logger.info(logMarker, s"Seen Stream Usage for ${mediaUsage.mediaId}")
      })

      // Invokes the DB operation for one usage and returns its result observable with logging and
      // metric recording attached as a side effect of each emitted result.
      // `doOnNext` is used rather than a separate `foreach` so the observable is subscribed only
      // once (by the pipeline below): a second subscription would re-run the operation if the
      // observable is cold, and a handler-less subscription would throw on a failed operation.
      def performAndLogDBOperation(func: MediaUsage => Observable[JsObject], opName: String)(mediaUsage: MediaUsage): Observable[JsObject] =
        func(mediaUsage).doOnNext(result => {
          logger.info(
            logMarker,
            s"'$opName' DB Operation for ${mediaUsage.grouping} - on mediaID: ${mediaUsage.mediaId} with result: $result"
          )
          usageMetrics.incrementUpdated
        })

      // In the DB but no longer on the stream.
      val markAsRemovedOps = dbUsageKeys.diff(streamUsageKeys)
        .flatMap(dbUsageMap.get)
        .map(performAndLogDBOperation(usageTable.markAsRemoved, "markAsRemoved"))

      // On the stream but not in the DB (or everything on the stream when reindexing).
      val createOps = (if(usageGroup.isReindex) streamUsageKeys else streamUsageKeys.diff(dbUsageKeys))
        .flatMap(streamUsageMap.get)
        .map(performAndLogDBOperation(usageTable.create, "create"))

      // In both places; skipped entirely when reindexing since those usages are re-created above.
      val updateOps = (if (usageGroup.isReindex) Set() else streamUsageKeys.intersect(dbUsageKeys))
        .flatMap(streamUsageMap.get)
        .diff(dbUsages) // to avoid updating to the same data that's already in the DB
        .map(performAndLogDBOperation(usageTable.update, "update"))

      // Computed from all usages in the group, independent of which operations were actually issued.
      val mediaIdsImplicatedInDBUpdates =
        (usageGroup.usages ++ dbUsages)
          .filter(_.isGridLikeId)
          .map(_.mediaId)

      Observable.from(markAsRemovedOps ++ updateOps ++ createOps)
        .flatten[JsObject]
        .toSeq // observable emits exactly once, when all ops complete and have been emitted, or immediately if there are 0 ops
        .map(_ => {
          logger.info(logMarker, s"Emitting ${mediaIdsImplicatedInDBUpdates.size} media IDs for notification")
          WithLogMarker(mediaIdsImplicatedInDBUpdates)
        })
    })
  }