in src/main/scala/com/gu/octopusthrift/Lambda.scala [23:50]
def handler(lambdaInput: KinesisEvent, context: Context): Unit = {
val records: List[Record] = lambdaInput.getRecords.asScala.map(_.getKinesis).toList
records.foreach(record => {
val sequenceNumber = record.getSequenceNumber
logger.info(s"Received payload, sequence number: $sequenceNumber")
val data = record.getData().array()
(validateSinglePayload(data), validateCachePayload(data)) match {
case (Some(singleBundle), _) =>
logger.info(s"Processing single story bundle, sequence number: $sequenceNumber")
processBundle(singleBundle.data.get, sequenceNumber)
case (None, Some(cache)) => {
val cacheData = cache.data.get
val messageIndex = cacheData.thismessageindex.getOrElse(0)
val totalMessages = cacheData.totalmessages.getOrElse(0)
logger.info(
s"Processing daily snapshot, message $messageIndex of $totalMessages, sequence number: $sequenceNumber")
cacheData.bundles.get.foreach(bundle => processBundle(bundle, sequenceNumber))
}
case _ => {
logger.info(s"Payload does not match OctopusPayload model, sequence number: $sequenceNumber")
deadLetterQueue.sendMessage(Json.toJson(data))
cloudWatch.publishMetricEvent(Metrics.InvalidOctopusPayload)
}
}
})
}