admin/app/dfp/DfpDataCacheJob.scala (128 lines of code) (raw):
package dfp
import common.dfp._
import common.GuLogging
import org.joda.time.DateTime
import play.api.libs.json.Json.{toJson, _}
import tools.Store
import conf.switches.Switches.{LineItemJobs}
import scala.concurrent.{ExecutionContext, Future}
class DfpDataCacheJob(
    adUnitAgent: AdUnitAgent,
    customFieldAgent: CustomFieldAgent,
    customTargetingAgent: CustomTargetingAgent,
    placementAgent: PlacementAgent,
    dfpApi: DfpApi,
) extends GuLogging {

  /** Line items split into the ones that passed validation and the ones that did not. */
  case class LineItemLoadSummary(validLineItems: Seq[GuLineItem], invalidLineItems: Seq[GuLineItem])

  /** Statuses for which a recently modified line item is kept (added or updated) in the cache;
    * a cached line item whose new status is anything else is dropped from the cache.
    */
  private val activeLineItemStatuses = Seq("READY", "DELIVERING", "DELIVERY_EXTENDED")

  /** Refreshes the cached DFP line-item data and the sponsorship reports derived from it.
    *
    * Runs asynchronously on the supplied execution context. The line-item and page-skin reports,
    * and the non-refreshable line item ids, are only written while the `LineItemJobs` switch is
    * off. The live-blog-top and survey sponsorship reports are built from the current line items
    * and are written whenever those contain valid line items.
    */
  def run()(implicit executionContext: ExecutionContext): Future[Unit] =
    Future {
      log.info("Refreshing data cache")
      val start = System.currentTimeMillis
      val data = loadLineItems()
      val sponsorshipLineItemIds = dfpApi.readSponsorshipLineItemIds()
      val currentLineItems = loadCurrentLineItems()
      val duration = System.currentTimeMillis - start
      log.info(s"Loading DFP data took $duration ms")
      write(data)
      if (LineItemJobs.isSwitchedOff) Store.putNonRefreshableLineItemIds(sponsorshipLineItemIds)
      writeLiveBlogTopSponsorships(currentLineItems)
      writeSurveySponsorships(currentLineItems)
    }

  /** For initialisation and total refresh of data, so would be used for the first read and for
    * an emergency data update.
    *
    * Refreshes the ad-unit, custom-field, custom-targeting and placement agents one after another,
    * then reloads line items. A failure in any step is logged instead of being silently dropped.
    *
    * NOTE(review): the reloaded line items are not written to the Store here (the result of
    * `loadLineItems()` is discarded) — confirm whether that is intended.
    */
  def refreshAllDfpData()(implicit executionContext: ExecutionContext): Unit = {
    val refreshed = for {
      _ <- adUnitAgent.refresh()
      _ <- customFieldAgent.refresh()
      _ <- customTargetingAgent.refresh()
      _ <- placementAgent.refresh()
    } yield loadLineItems()

    refreshed.failed.foreach(e => log.error("Failed to refresh all DFP data", e))
  }

  /** Reads the current line items straight from the DFP API, with no cache involved. */
  private def loadCurrentLineItems(): DfpDataExtractor = {
    val currentLineItems = dfpApi.readCurrentLineItems
    DfpDataExtractor(currentLineItems.validItems, currentLineItems.invalidItems)
  }

  /** Loads line items incrementally on top of the line-item report cached in the Store, falling
    * back to a full query of current line items when that cached report has no valid items.
    */
  private def loadLineItems(): DfpDataExtractor = {
    def fetchCachedLineItems(): DfpLineItems = {
      val lineItemReport = Store.getDfpLineItemsReport()
      DfpLineItems(validItems = lineItemReport.lineItems, invalidItems = lineItemReport.invalidLineItems)
    }

    val start = System.currentTimeMillis
    val loadSummary = loadLineItems(
      fetchCachedLineItems(),
      dfpApi.readLineItemsModifiedSince,
      dfpApi.readCurrentLineItems,
    )
    val loadDuration = System.currentTimeMillis - start
    log.info(s"Loading line items took $loadDuration ms")
    DfpDataExtractor(loadSummary.validLineItems, loadSummary.invalidLineItems)
  }

  /** Formats ids for logging as a comma-separated list, or "None" when there are none. */
  def report(ids: Iterable[Long]): String = if (ids.isEmpty) "None" else ids.mkString(", ")

  /** Merges recently modified line items into a cached set of line items.
    *
    * If the cache has no valid items, a full summary is built from `allActiveLineItems`.
    * Otherwise the newest `lastModified` among the cached valid items is used as a threshold,
    * `lineItemsModifiedSince` is queried with it, and the result is applied to the cached valid
    * and invalid items separately. Modified items with an active status are added or replace the
    * cached copy (matched by id), and cached items whose new status is not active are removed.
    *
    * Each by-name argument is evaluated at most once.
    *
    * @param cachedLineItems        the previously cached line items
    * @param lineItemsModifiedSince query for line items modified since the given time
    * @param allActiveLineItems     full query; only evaluated when the cache has no valid items
    * @return the resulting valid and invalid line items (sorted by id in the incremental case)
    */
  def loadLineItems(
      cachedLineItems: => DfpLineItems,
      lineItemsModifiedSince: DateTime => DfpLineItems,
      allActiveLineItems: => DfpLineItems,
  ): LineItemLoadSummary = {
    // By-name parameters are re-evaluated on every reference, and each evaluation may be a Store
    // read or a DFP API query. Bind once so every step works from the same snapshot; otherwise
    // the emptiness check below and the `maxBy` could observe different results.
    val cached = cachedLineItems

    if (cached.validItems.isEmpty) {
      // Create a full summary object from scratch, using a query that collects all line items from dfp.
      val allActive = allActiveLineItems
      LineItemLoadSummary(
        validLineItems = allActive.validItems,
        invalidLineItems = allActive.invalidItems,
      )
    } else {
      // Calculate the most recent modified timestamp of the existing cache items,
      // and find line items modified since that timestamp.
      val threshold = cached.validItems.map(_.lastModified).maxBy(_.getMillis)
      val recentlyModified = lineItemsModifiedSince(threshold)

      // Update existing items with a patch of new items.
      def updateCachedContent(existingItems: Seq[GuLineItem], newItems: Seq[GuLineItem]): Seq[GuLineItem] = {
        // Combined map of all line items, preferring newer items over old ones (keyed by id).
        val updatedLineItemMap = GuLineItem.asMap(existingItems) ++ GuLineItem.asMap(newItems)
        val existingKeys = existingItems.map(_.id).toSet

        val (active, inactive) = newItems.partition(item => activeLineItemStatuses.contains(item.status))
        val activeKeys = active.map(_.id).toSet
        val inactiveKeys = inactive.map(_.id).toSet

        val added = activeKeys -- existingKeys
        val modified = activeKeys intersect existingKeys
        val removed = inactiveKeys intersect existingKeys

        // New cache contents.
        val updatedKeys = (existingKeys ++ added) -- removed

        log.info(s"Cached line item count was ${existingItems.size}")
        log.info(s"Last modified time of cached line items: $threshold")
        log.info(s"Added: ${report(added)}")
        log.info(s"Modified: ${report(modified)}")
        log.info(s"Removed: ${report(removed)}")
        log.info(s"Cached line item count now ${updatedKeys.size}")

        updatedKeys.toSeq.sorted.map(updatedLineItemMap)
      }

      LineItemLoadSummary(
        validLineItems = updateCachedContent(cached.validItems, recentlyModified.validItems),
        invalidLineItems = updateCachedContent(cached.invalidItems, recentlyModified.invalidItems),
      )
    }
  }

  /** Writes the page-skin sponsorship report and the line-item report to the Store, but only when
    * `data` has valid line items and the `LineItemJobs` switch is off.
    */
  private def write(data: DfpDataExtractor): Unit = {
    if (data.hasValidLineItems && LineItemJobs.isSwitchedOff) {
      val now = printLondonTime(DateTime.now())
      val pageSkinSponsorships = data.pageSkinSponsorships
      Store.putDfpPageSkinAdUnits(stringify(toJson(PageSkinSponsorshipReport(now, pageSkinSponsorships))))
      Store.putDfpLineItemsReport(stringify(toJson(LineItemReport(now, data.lineItems, data.invalidLineItems))))
    }
  }

  /** Writes the live-blog-top sponsorship report to the Store when `data` has valid line items. */
  private def writeLiveBlogTopSponsorships(data: DfpDataExtractor): Unit = {
    if (data.hasValidLineItems) {
      val now = printLondonTime(DateTime.now())
      val sponsorships = data.liveBlogTopSponsorships
      Store.putLiveBlogTopSponsorships(
        stringify(toJson(LiveBlogTopSponsorshipReport(Some(now), sponsorships))),
      )
    }
  }

  /** Writes the survey sponsorship report to the Store when `data` has valid line items. */
  private def writeSurveySponsorships(data: DfpDataExtractor): Unit = {
    if (data.hasValidLineItems) {
      val now = printLondonTime(DateTime.now())
      val sponsorships = data.surveySponsorships
      Store.putSurveySponsorships(
        stringify(toJson(SurveySponsorshipReport(Some(now), sponsorships))),
      )
    }
  }
}