in src/main/scala/com/gu/octopusthrift/Lambda.scala [52:72]
/**
 * Validates a single Octopus bundle and routes it according to the outcome.
 *
 * Three outcomes are possible:
 *  - the bundle passes `isValidBundle` and converts to a `StoryBundle`: it is serialized with
 *    `serializeToBytes` and published through a `Kinesis` instance;
 *  - the bundle passes `isValidBundle` but the conversion throws a non-fatal exception: the
 *    error is logged, a `Metrics.FailedThriftConversion` event is published and the original
 *    bundle is sent, as JSON, to the dead letter queue;
 *  - the bundle fails `isValidBundle`: this is logged and a
 *    `Metrics.MissingMandatoryBundleData` event is published (nothing is dead-lettered).
 *
 * @param octopusBundle  the bundle to validate, convert and publish
 * @param sequenceNumber identifier included in every log line for this bundle
 *                       (presumably the sequence number of the source record; confirm with caller)
 */
private def processBundle(octopusBundle: OctopusBundle, sequenceNumber: String): Unit = {
  if (isValidBundle(octopusBundle)) {
    Try(octopusBundle.as[StoryBundle]) match {
      case Success(bundle) =>
        logger.info(
          s"Bundle passed validation, sequence number: $sequenceNumber, composer ID: ${bundle.composerId}")
        val serializedThriftBundle = serializeToBytes(bundle)
        // The publisher is built only on the path that actually publishes, so rejected bundles
        // neither evaluate Config.apply nor construct a Kinesis instance, and a failure while
        // building it cannot pre-empt the metric / dead-letter handling in the other branches.
        val stream = new Kinesis(Config.apply)
        stream.publish(serializedThriftBundle)
      case Failure(e) =>
        logger.info(
          s"Bundle failed validation as StoryBundle, sequence number: $sequenceNumber, with error: $e")
        cloudWatch.publishMetricEvent(Metrics.FailedThriftConversion)
        // Keep the unconverted bundle so it can be inspected later.
        deadLetterQueue.sendMessage(Json.toJson(octopusBundle))
    }
  } else {
    logger.info(s"Bundle failed validation, sequence number: $sequenceNumber")
    cloudWatch.publishMetricEvent(Metrics.MissingMandatoryBundleData)
  }
}