def processNotification()

in notificationworkerlambda/src/main/scala/com/gu/notifications/worker/Harvester.scala [102:161]


  def processNotification(event: SQSEvent, tokenService: TokenService[IO]) = {
    val records = event.getRecords.asScala.toList.map(r => (NotificationParser.parseShardNotificationEvent(r.getBody), r.getAttributes))
    records.foreach {
      case (body, _) => logger.info(Map(
        "notificationId" -> body.notification.id,
      ), "Parsed notification event")
    }
    val shardNotificationStream: Stream[IO, ShardedNotification] = Stream.emits(event.getRecords.asScala)
      .map(r => r.getBody)
      .map(NotificationParser.parseShardNotificationEvent)
    try{
      queueShardedNotification(shardNotificationStream, tokenService)
        .compile
        .drain
        .unsafeRunSync()
    }catch {
      case e: Throwable => {
        records.foreach {
          case (body, _) =>
            logger.error(Map(
              "notificationId" -> body.notification.id,
              "notificationType" -> body.notification.`type`.toString,
            ), s"Error occurred: ${e.getMessage}", e)
        }
        throw e
      }
    }finally {
      records.foreach {
        case (body, attributes) => {
          val end = Instant.now
          val sentTime = Instant.ofEpochMilli(attributes.getOrDefault("SentTimestamp", "0").toLong)

          logger.info(Map(
            "_aws" -> Map(
              "Timestamp" -> end.toEpochMilli,
              "CloudWatchMetrics" -> List(Map(
                "Namespace" -> s"Notifications/${env.stage}/harvester",
                "Dimensions" -> List(List("type")),
                "Metrics" -> List(Map(
                  "Name" -> "harvester.notificationProcessingTime",
                  "Unit" -> "Milliseconds"
                ))
              ))
            ),
            "harvester.notificationProcessingTime" -> Duration.between(sentTime, end).toMillis,
            "harvester.notificationProcessingEndTime.millis" -> end.toEpochMilli,
            "harvester.notificationProcessingStartTime.millis" -> sentTime.toEpochMilli,
            "notificationId" -> body.notification.id,
            "notificationType" -> body.notification.`type`.toString,
            "type" -> {
              body.notification.`type` match {
                case _root_.models.NotificationType.BreakingNews => "breakingNews"
                case _ => "other"
              }
            }
          ), "Finished processing notification event")
        }
      }
    }
  }