in src/main/scala/com/gu/mobile/content/notifications/Lambda.scala [57:88]
/**
 * Lambda entry point for a batch of Kinesis records.
 *
 * The Kinesis payload of every record in `event` is converted with
 * `kinesisEventRecordToRecord`, the converted records are deaggregated through
 * `AggregatorUtil`, and the resulting user records are handed to
 * `CapiEventProcessor.process` together with a per-event callback.
 *
 * The callback produces a `Future`:
 *  - `Update` event with a `Content` payload: the result of `processContent`,
 *    wrapped in an already-completed future;
 *  - `Update` event with a `RetrievableContent` payload: whatever
 *    `handleRetrievableContent` returns;
 *  - `Update` event with an `Atom`, unknown or unmatched payload: logs and
 *    completes with `false`;
 *  - `Update` event with no payload: completes with `false` without logging;
 *  - any other event type: logs and completes with `false`.
 *
 * @param event the Kinesis event delivered to the Lambda
 */
def handler(event: KinesisEvent): Unit = {
  val kinesisPayloads: List[KinesisEvent.Record] =
    event.getRecords.asScala.toList.map(_.getKinesis)
  val clientRecords = kinesisPayloads.map(kinesisEventRecordToRecord)
  val deaggregated: List[KinesisClientRecord] =
    new AggregatorUtil().deaggregate(clientRecords.asJava).asScala.toList

  // NOTE: the callback parameter deliberately keeps the name `event`, so inside
  // the callback it refers to the CAPI event rather than the Kinesis event.
  CapiEventProcessor.process(deaggregated) { event =>
    event.eventType match {
      case EventType.Update =>
        // The payload is optional; a missing payload is silently treated as "nothing sent".
        event.payload match {
          case Some(EventPayload.Content(content)) =>
            logger.debug(s"Handle content update ${content.id}")
            Future.successful(processContent(content))

          case Some(EventPayload.RetrievableContent(content)) =>
            logger.debug(s"Handle retrievable content or not: ${content.id}")
            handleRetrievableContent(content)

          case Some(EventPayload.Atom(atomAlias)) =>
            logger.info(s"Unsupported content type: Atom, with id ${atomAlias.id}")
            Future.successful(false)

          case Some(UnknownUnionField(e)) =>
            logger.error(s"Unknown event payload $e. Consider updating capi models")
            Future.successful(false)

          case Some(_) =>
            logger.warn(s"Unknown event payload ${event.payload}. Consider updating capi models")
            Future.successful(false)

          case None =>
            Future.successful(false)
        }

      case _ =>
        logger.info("Received non-updatable event type")
        Future.successful(false)
    }
  }
}