def process()

in src/main/scala/com/gu/mobile/content/notifications/CapiEventProcessor.scala [12:28]


  /**
    * Deserializes each Kinesis record into an [[Event]] and hands it to `sendNotification`.
    *
    * Records that fail to deserialize are logged and counted as "not sent"; they do not fail
    * the batch. A failed future returned by `sendNotification`, however, fails the returned
    * future, because the individual results are combined with `Future.sequence`.
    *
    * @param records          the Kinesis records to process
    * @param sendNotification callback invoked for every successfully deserialized event;
    *                         expected to complete with `true` when a notification was sent
    * @param ec               execution context used to combine the individual results
    * @return the number of records for which `sendNotification` completed with `true`
    */
  def process(records: Seq[KinesisClientRecord])(sendNotification: Event => Future[Boolean])(implicit ec: ExecutionContext): Future[Int] = {
    // Copies the readable bytes of the record payload. `ByteBuffer.array()` is avoided on
    // purpose: it throws for read-only or direct buffers and ignores the buffer's offset,
    // position and limit. Working on a duplicate leaves the record's own buffer untouched.
    def payloadOf(record: KinesisClientRecord): Array[Byte] = {
      val view = record.data().duplicate()
      val bytes = new Array[Byte](view.remaining())
      view.get(bytes)
      bytes
    }

    val maybeNotificationsSent = records.map { record =>
      ThriftDeserializer.deserialize(payloadOf(record))(Event) match {
        case Success(event) => sendNotification(event)
        case Failure(error) =>
          logger.error(s"Failed to deserialize Kinesis record: ${error.getMessage}", error)
          // An undecodable record is reported as "no notification sent" rather than an error.
          Future.successful(false)
      }
    }

    Future.sequence(maybeNotificationsSent).map { notificationsSent =>
      val notificationCount = notificationsSent.count(identity)
      logger.info(s"Sent $notificationCount notifications")
      notificationCount
    }
  }