in usage/app/lib/UsageRecorder.scala [65:127]
/** Reconciles, for each matched usage group, the usages held in the DB with the usages seen
  * on the stream, and emits the implicated media IDs once the resulting DB operations are done.
  *
  * For every incoming group:
  *  - when the group carries a status, only DB usages with that status which are not removed
  *    are considered; otherwise all of the group's DB usages are considered;
  *  - usages whose key is in the DB but not on the stream are marked as removed;
  *  - usages whose key is on the stream but not in the DB are created (on reindex, every
  *    stream usage is created);
  *  - usages whose key is on both sides are updated, unless the stream usage equals one
  *    already in the DB (on reindex no updates are issued).
  *
  * @param dbMatchStream matched usage groups, each carrying the log marker to log under
  * @return an observable that emits once per group, after all of that group's DB operations
  *         have emitted, the media IDs of the stream and DB usages for which `isGridLikeId`
  *         holds, wrapped in `WithLogMarker`
  */
private def getUpdatesStream(dbMatchStream: Observable[WithLogMarker[MatchedUsageGroup]]): Observable[WithLogMarker[Set[String]]] = {
  dbMatchStream.flatMap(matchedUsageGroupWithContext => {
    implicit val logMarker: LogMarker = matchedUsageGroupWithContext.logMarker
    val matchedUsageGroup = matchedUsageGroupWithContext.value
    val usageGroup = matchedUsageGroup.usageGroup

    // With a status present, restrict the comparison to live DB usages of that status.
    val dbUsages = usageGroup.maybeStatus.fold(
      matchedUsageGroup.dbUsages
    )(
      status => matchedUsageGroup.dbUsages.filter(dbUsage => dbUsage.status == status && !dbUsage.isRemoved)
    )

    val dbUsageMap = dbUsages.map(_.entry).toMap
    val dbUsageKeys = dbUsageMap.keySet

    val streamUsageMap = usageGroup.usages.map(_.entry).toMap
    val streamUsageKeys = streamUsageMap.keySet

    dbUsages.foreach(mediaUsage => {
      logger.info(logMarker, s"Seen DB Usage for ${mediaUsage.mediaId}")
    })
    usageGroup.usages.foreach(mediaUsage => {
      logger.info(logMarker, s"Seen Stream Usage for ${mediaUsage.mediaId}")
    })

    // Runs `func` for a single usage and logs / counts every result it emits.
    // The side effects are attached with `doOnNext` instead of a separate `foreach`
    // subscription: `foreach` would subscribe a second time (re-running the operation if the
    // observable is cold) and, having no error handler, would raise
    // OnErrorNotImplementedException on failure in addition to the error seen downstream.
    def performAndLogDBOperation(func: MediaUsage => Observable[JsObject], opName: String)(mediaUsage: MediaUsage): Observable[JsObject] = {
      func(mediaUsage).doOnNext(result => {
        logger.info(
          logMarker,
          s"'$opName' DB Operation for ${mediaUsage.grouping} - on mediaID: ${mediaUsage.mediaId} with result: $result"
        )
        usageMetrics.incrementUpdated
      })
    }

    // In the DB but no longer on the stream.
    val markAsRemovedOps = dbUsageKeys.diff(streamUsageKeys)
      .flatMap(dbUsageMap.get)
      .map(performAndLogDBOperation(usageTable.markAsRemoved, "markAsRemoved"))

    // On the stream but not in the DB; a reindex (re)creates every stream usage.
    val createOps = (if(usageGroup.isReindex) streamUsageKeys else streamUsageKeys.diff(dbUsageKeys))
      .flatMap(streamUsageMap.get)
      .map(performAndLogDBOperation(usageTable.create, "create"))

    // On both sides; nothing to update on reindex because everything is created above.
    val updateOps = (if (usageGroup.isReindex) Set() else streamUsageKeys.intersect(dbUsageKeys))
      .flatMap(streamUsageMap.get)
      .diff(dbUsages) // to avoid updating to the same data that's already in the DB
      .map(performAndLogDBOperation(usageTable.update, "update"))

    val mediaIdsImplicatedInDBUpdates =
      (usageGroup.usages ++ dbUsages)
        .filter(_.isGridLikeId)
        .map(_.mediaId)

    Observable.from(markAsRemovedOps ++ updateOps ++ createOps)
      .flatten[JsObject]
      .toSeq // observable emits exactly once, when all ops complete and have been emitted, or immediately if there are 0 ops
      .map(_ => {
        logger.info(logMarker, s"Emitting ${mediaIdsImplicatedInDBUpdates.size} media IDs for notification")
        WithLogMarker(mediaIdsImplicatedInDBUpdates)
      })
  })
}