in src/main/scala/com/gu/mobile/content/notifications/CapiEventProcessor.scala [12:28]
/**
 * Deserializes each Kinesis record into an [[Event]] and passes it to `sendNotification`.
 *
 * Records whose payload cannot be deserialized are logged at error level and counted as
 * "not sent"; they do not fail the batch.
 *
 * @param records          the Kinesis records to process
 * @param sendNotification callback invoked once per successfully deserialized event; its
 *                         result is counted as a sent notification when it completes with `true`
 * @param ec               execution context used to combine the per-record futures
 * @return a future holding the number of records for which `sendNotification` completed with
 *         `true`. Because the results are combined with `Future.sequence`, the returned
 *         future fails if any `sendNotification` future fails.
 */
def process(records: Seq[KinesisClientRecord])(sendNotification: Event => Future[Boolean])(implicit ec: ExecutionContext): Future[Int] = {
  val maybeNotificationsSent = records.map { record =>
    // Copy exactly the readable bytes instead of calling `array()`: `array()` throws for
    // read-only or direct buffers and ignores the buffer's offset/position/limit.
    // Working on a duplicate leaves the record's own buffer position untouched.
    val buffer = record.data().duplicate()
    val payload = new Array[Byte](buffer.remaining())
    buffer.get(payload)

    ThriftDeserializer.deserialize(payload)(Event) match {
      case Success(event) => sendNotification(event)
      case Failure(error) =>
        logger.error(s"Failed to deserialize Kinesis record: ${error.getMessage}", error)
        // An undecodable record is reported as "not sent" rather than failing the batch.
        Future.successful(false)
    }
  }
  Future.sequence(maybeNotificationsSent).map { notificationsSent =>
    val notificationCount = notificationsSent.count(identity)
    logger.info(s"Sent $notificationCount notifications")
    notificationCount
  }
}