in notificationworkerlambda/src/main/scala/com/gu/notifications/worker/Harvester.scala [102:161]
/**
  * Processes every record of an SQS event as a sharded notification.
  *
  * Steps, in order:
  *  1. Parse each record body with `NotificationParser.parseShardNotificationEvent`
  *     and log the parsed notification id.
  *  2. Feed the parsed notifications to `queueShardedNotification` and run the
  *     resulting stream synchronously to completion.
  *  3. On failure, log the error once per record and rethrow the same exception.
  *  4. Always (success or failure) log one timing entry per record, measured from
  *     the record's `SentTimestamp` attribute to "now".
  *
  * @param event        the SQS event whose records carry the serialised notifications
  * @param tokenService passed through unchanged to `queueShardedNotification`
  * @throws Throwable   whatever parsing or the stream execution throws; failures
  *                     from the stream are logged first and then rethrown as-is
  */
def processNotification(event: SQSEvent, tokenService: TokenService[IO]): Unit = {
  val processingTimeMetric = "harvester.notificationProcessingTime"

  // Parse each body exactly once. The parsed values are reused for logging,
  // for the processing stream and for the final timing entries.
  val records = event.getRecords.asScala.toList.map { record =>
    (NotificationParser.parseShardNotificationEvent(record.getBody), record.getAttributes)
  }

  records.foreach { case (body, _) =>
    logger.info(Map(
      "notificationId" -> body.notification.id,
    ), "Parsed notification event")
  }

  // Built from the already-parsed bodies rather than re-parsing the raw records.
  val shardNotificationStream: Stream[IO, ShardedNotification] =
    Stream.emits(records.map { case (body, _) => body })

  try {
    queueShardedNotification(shardNotificationStream, tokenService)
      .compile
      .drain
      .unsafeRunSync()
  } catch {
    // Catching Throwable is deliberate: this handler only logs and always
    // rethrows, so nothing is swallowed.
    case e: Throwable =>
      records.foreach { case (body, _) =>
        logger.error(Map(
          "notificationId" -> body.notification.id,
          "notificationType" -> body.notification.`type`.toString,
        ), s"Error occurred: ${e.getMessage}", e)
      }
      throw e
  } finally {
    records.foreach { case (body, attributes) =>
      val end = Instant.now
      // A missing attribute already defaulted to epoch 0; a malformed one now does
      // the same. Throwing here would replace the original processing failure and
      // skip the timing entries for the remaining records.
      val sentMillis =
        scala.util.Try(attributes.getOrDefault("SentTimestamp", "0").toLong).getOrElse(0L)
      val sentTime = Instant.ofEpochMilli(sentMillis)
      // Value used for the "type" dimension: breaking news vs. everything else.
      val metricType = body.notification.`type` match {
        case _root_.models.NotificationType.BreakingNews => "breakingNews"
        case _ => "other"
      }
      // NOTE(review): the "_aws" entry looks like CloudWatch Embedded Metric Format
      // metadata describing the processing-time field below — confirm against the
      // log pipeline.
      logger.info(Map(
        "_aws" -> Map(
          "Timestamp" -> end.toEpochMilli,
          "CloudWatchMetrics" -> List(Map(
            "Namespace" -> s"Notifications/${env.stage}/harvester",
            "Dimensions" -> List(List("type")),
            "Metrics" -> List(Map(
              "Name" -> processingTimeMetric,
              "Unit" -> "Milliseconds"
            ))
          ))
        ),
        processingTimeMetric -> Duration.between(sentTime, end).toMillis,
        "harvester.notificationProcessingEndTime.millis" -> end.toEpochMilli,
        "harvester.notificationProcessingStartTime.millis" -> sentTime.toEpochMilli,
        "notificationId" -> body.notification.id,
        "notificationType" -> body.notification.`type`.toString,
        "type" -> metricType
      ), "Finished processing notification event")
    }
  }
}