usage/app/lib/CrierEventProcessor.scala (155 lines of code) (raw):
package lib
import com.gu.contentapi.client.ScheduledExecutor
import com.gu.contentapi.client.model.ContentApiError
import com.gu.contentapi.client.model.v1.Content
import com.gu.crier.model.event.v1.{Event, EventPayload, EventType}
import com.gu.mediaservice.lib.logging.{GridLogging, LogMarker, MarkerMap}
import com.gu.mediaservice.model.usage.{PendingUsageStatus, PublishedUsageStatus}
import com.gu.thrift.serializer.ThriftDeserializer
import com.twitter.scrooge.ThriftStructCodec
import model.{UsageGroup, UsageGroupOps}
import org.joda.time.DateTime
import rx.lang.scala.Subject
import rx.lang.scala.subjects.PublishSubject
import software.amazon.kinesis.exceptions.ShutdownException
import software.amazon.kinesis.leases.exceptions.InvalidStateException
import software.amazon.kinesis.lifecycle.events._
import software.amazon.kinesis.processor.ShardRecordProcessor
import java.util.UUID
import scala.concurrent.ExecutionContext.Implicits.global
import scala.jdk.CollectionConverters._
import scala.util.Try
/**
  * A piece of CAPI content together with the metadata needed to turn it
  * into a `UsageGroup` and publish it on a usage stream.
  */
trait ContentContainer extends GridLogging {
  val content: Content
  val lastModified: DateTime
  val isReindex: Boolean

  // True when the content has a first publication date but is flagged as not live.
  private lazy val isEntirePieceTakenDown =
    content.fields.exists { f =>
      f.firstPublicationDate.nonEmpty && f.isLive.contains(false)
    }

  /**
    * Builds a usage group from this content and pushes it onto `publishSubject`.
    *
    * Preview items are built with `PendingUsageStatus`, live items with
    * `PublishedUsageStatus`. When `usageGroupOps.build` yields nothing, only a
    * debug line is logged. For a preview item whose piece has been taken down,
    * a second, empty group with `PublishedUsageStatus` is emitted afterwards.
    *
    * @param publishSubject subject receiving the resulting usage group(s)
    * @param usageGroupOps  builder used to derive the usage group from the content
    * @param logMarker      marker attached to log lines and emitted groups
    */
  def emitAsUsageGroup(
    publishSubject: Subject[WithLogMarker[UsageGroup]], usageGroupOps: UsageGroupOps
  )(implicit logMarker: LogMarker): Unit = {
    val maybeGroup = usageGroupOps.build(
      content,
      status = this match {
        case _: PreviewContentItem => PendingUsageStatus
        case _: LiveContentItem => PublishedUsageStatus
      },
      lastModified,
      isReindex
    )

    maybeGroup match {
      case Some(group) =>
        val groupingLogMarker = logMarker ++ Map("usageGroup" -> group.grouping)
        publishSubject.onNext(WithLogMarker(groupingLogMarker, group))

        this match {
          case _: PreviewContentItem if isEntirePieceTakenDown =>
            logger.info(groupingLogMarker, s"${group.grouping} is taken down so producing empty UsageGroup to ensure any 'published' DB records are marked as removed")
            val emptyPublishedGroup = group.copy(
              usages = Set.empty,
              maybeStatus = Some(PublishedUsageStatus)
            )
            publishSubject.onNext(WithLogMarker(groupingLogMarker, emptyPublishedGroup))
          case _ =>
            ()
        }

      case None =>
        logger.debug(logMarker, s"No fields in content of crier update for payload with content ID ${content.id}")
    }
  }
}
/** Holds the single subject onto which the event processors in this file publish usage groups. */
object CrierUsageStream {
  val observable: Subject[WithLogMarker[UsageGroup]] =
    PublishSubject.apply[WithLogMarker[UsageGroup]]()
}
/**
  * Content container that `ContentContainer.emitAsUsageGroup` builds with
  * `PublishedUsageStatus`.
  *
  * @param content      the CAPI content
  * @param lastModified timestamp passed through to the usage group builder
  * @param isReindex    reindex flag passed through to the usage group builder; defaults to false
  */
case class LiveContentItem(
  content: Content,
  lastModified: DateTime,
  isReindex: Boolean = false
) extends ContentContainer
/**
  * Content container that `ContentContainer.emitAsUsageGroup` builds with
  * `PendingUsageStatus`.
  *
  * @param content      the CAPI content
  * @param lastModified timestamp passed through to the usage group builder
  * @param isReindex    reindex flag passed through to the usage group builder; defaults to false
  */
case class PreviewContentItem(
  content: Content,
  lastModified: DateTime,
  isReindex: Boolean = false
) extends ContentContainer
/**
  * Kinesis shard record processor that deserializes Crier events from each
  * record and publishes the resulting usage groups on
  * `CrierUsageStream.observable`.
  *
  * Subclasses supply the content API client and decide which
  * `ContentContainer` wraps the content of an event.
  */
abstract class CrierEventProcessor(config: UsageConfig, usageGroupOps: UsageGroupOps) extends ShardRecordProcessor with GridLogging {

  implicit val codec: ThriftStructCodec[Event] = Event

  /** Client used to fetch content for `RetrievableUpdate` events. */
  val contentApiClient: UsageContentApiClient

  override def initialize(initializationInput: InitializationInput): Unit = {
    logger.debug(s"Initialized an event processor for shard ${initializationInput.shardId}")
  }

  /**
    * Deserializes and processes every record of the batch, then checkpoints
    * at the last record.
    *
    * A record that fails to deserialize is logged and skipped; it does not
    * prevent the checkpoint. An empty batch is a no-op: there is no last
    * record, so nothing is checkpointed.
    */
  override def processRecords(processRecordsInput: ProcessRecordsInput): Unit = {
    val records = processRecordsInput.records.asScala

    records.foreach { record =>
      val deserialization: Try[Event] = ThriftDeserializer.deserialize(record.data)
      deserialization.foreach(processEvent)
      deserialization.failed.foreach { e =>
        logger.error("Failed to deserialize crier event", e)
      }
    }

    // `lastOption` rather than `last`: an empty batch must not throw NoSuchElementException.
    records.lastOption.foreach { lastRecord =>
      processRecordsInput.checkpointer.checkpoint(lastRecord.sequenceNumber(), lastRecord.subSequenceNumber())
    }
  }

  override def leaseLost(leaseLostInput: LeaseLostInput): Unit = {
    // nothing to do?
    logger.debug("Lost lease, so stopping processing Crier")
  }

  /** Checkpoints on shard end; shutdown / invalid-state failures of the checkpoint are ignored. */
  override def shardEnded(shardEndedInput: ShardEndedInput): Unit = {
    try {
      shardEndedInput.checkpointer.checkpoint()
      logger.debug("Shard ended, so stopping processing Crier")
    } catch {
      case _: ShutdownException | _: InvalidStateException =>
        ()
    }
  }

  /** Checkpoints on shutdown request; shutdown / invalid-state failures of the checkpoint are ignored. */
  override def shutdownRequested(shutdownRequestedInput: ShutdownRequestedInput): Unit = {
    try {
      shutdownRequestedInput.checkpointer.checkpoint()
      logger.debug("Shutdown requested, so stopping processing Crier")
    } catch {
      case _: ShutdownException | _: InvalidStateException =>
        ()
    }
  }

  /**
    * Wraps content in the container appropriate for this processor.
    *
    * @param content the CAPI content carried by, or fetched for, an event
    * @param time    the event's timestamp
    */
  def getContentItem(content: Content, time: DateTime): ContentContainer

  /**
    * Handles a single Crier event.
    *
    *  - `Update`: emits a usage group from the inline content payload.
    *  - `RetrievableUpdate`: fetches the content through `contentApiClient`
    *    and emits a usage group from the response. The fetch is asynchronous,
    *    so it may complete after this method has returned; its failures are
    *    logged in the future's `recover`.
    *  - `Delete`: currently ignored.
    *
    * Any exception thrown synchronously is caught and logged, so one bad
    * event never aborts the surrounding batch.
    */
  private def processEvent(event: Event): Unit = {
    implicit val logMarker: LogMarker = MarkerMap(
      "payloadId" -> event.payloadId,
      "requestId" -> UUID.randomUUID().toString
    )

    Try {
      val dateTime: DateTime = new DateTime(event.dateTime)

      event.eventType match {
        case EventType.Update =>
          event.payload match {
            case Some(content: EventPayload.Content) =>
              getContentItem(content.content, dateTime)
                .emitAsUsageGroup(CrierUsageStream.observable, usageGroupOps)
            case _ =>
              logger.warn(logMarker, s"Received crier update for ${event.payloadId} without payload")
          }

        case EventType.Delete =>
          //TODO: how do we deal with a piece of content that has been deleted?

        case EventType.RetrievableUpdate =>
          event.payload match {
            case Some(retrievableContent: EventPayload.RetrievableContent) =>
              val capiUrl = retrievableContent.retrievableContent.capiUrl
              val query = contentApiClient.usageQuery(retrievableContent.retrievableContent.id)

              logger.info(logMarker, s"retrieving content event at $capiUrl parsed to id ${query.toString}")

              contentApiClient.getResponse(query).map(response => {
                response.content match {
                  case Some(content) =>
                    getContentItem(content, dateTime)
                      .emitAsUsageGroup(CrierUsageStream.observable, usageGroupOps)
                  case _ =>
                    logger.debug(
                      logMarker,
                      s"Received retrievable update for ${retrievableContent.retrievableContent.id} without content"
                    )
                }
              }).recover {
                case e: ContentApiError =>
                  logger.error(logMarker, s"CAPI error when fetching content update for ${event.payloadId}: ${e.httpStatus} ${e.httpMessage} ${e.errorResponse}", e)
                case e =>
                  logger.error(logMarker, s"Failed to fetch or process content update for ${event.payloadId}", e)
              }
            case _ => logger.warn(logMarker, s"Received crier update for ${event.payloadId} without payload")
          }

        // Log the actual type of this event (not the `EventType` companion object).
        case _ => logger.warn(logMarker, s"Unsupported event type ${event.eventType}")
      }
    }.recover {
      case e => logger.error(logMarker, s"Failed to process event ${event.payloadId}", e)
    }
  }
}
/** Event processor that wraps content in `LiveContentItem` and fetches through a `LiveContentApi` client. */
private class CrierLiveEventProcessor(config: UsageConfig, usageGroupOps: UsageGroupOps)
  extends CrierEventProcessor(config, usageGroupOps) {

  override val contentApiClient: LiveContentApi = new LiveContentApi(config)(ScheduledExecutor())

  override def getContentItem(content: Content, date: DateTime): ContentContainer =
    LiveContentItem(content, lastModified = date)
}
/** Event processor that wraps content in `PreviewContentItem` and fetches through a `PreviewContentApi` client. */
private class CrierPreviewEventProcessor(config: UsageConfig, usageGroupOps: UsageGroupOps)
  extends CrierEventProcessor(config, usageGroupOps) {

  override val contentApiClient: PreviewContentApi = new PreviewContentApi(config)(ScheduledExecutor())

  override def getContentItem(content: Content, date: DateTime): ContentContainer =
    PreviewContentItem(content, lastModified = date)
}