def handler()

in src/main/scala/com/gu/octopusthrift/Lambda.scala [23:50]


  def handler(lambdaInput: KinesisEvent, context: Context): Unit = {
    val records: List[Record] = lambdaInput.getRecords.asScala.map(_.getKinesis).toList

    records.foreach(record => {
      val sequenceNumber = record.getSequenceNumber
      logger.info(s"Received payload, sequence number: $sequenceNumber")
      val data = record.getData().array()

      (validateSinglePayload(data), validateCachePayload(data)) match {
        case (Some(singleBundle), _) =>
          logger.info(s"Processing single story bundle, sequence number: $sequenceNumber")
          processBundle(singleBundle.data.get, sequenceNumber)
        case (None, Some(cache)) => {
          val cacheData = cache.data.get
          val messageIndex = cacheData.thismessageindex.getOrElse(0)
          val totalMessages = cacheData.totalmessages.getOrElse(0)
          logger.info(
            s"Processing daily snapshot, message $messageIndex of $totalMessages, sequence number: $sequenceNumber")
          cacheData.bundles.get.foreach(bundle => processBundle(bundle, sequenceNumber))
        }
        case _ => {
          logger.info(s"Payload does not match OctopusPayload model, sequence number: $sequenceNumber")
          deadLetterQueue.sendMessage(Json.toJson(data))
          cloudWatch.publishMetricEvent(Metrics.InvalidOctopusPayload)
        }
      }
    })
  }