in src/main/scala/com/gu/fastly/Lambda.scala [111:170]
def handle(event: KinesisEvent) {
val rawRecords: List[Record] =
event.getRecords.asScala.map(_.getKinesis).toList
val userRecords = UserRecord.deaggregate(rawRecords.asJava)
println(s"Processing ${userRecords.size} records ...")
val events = CrierEventDeserializer.deserializeEvents(userRecords.asScala)
val successfulContentDecaches = CrierEventProcessor.process(events) {
event =>
event.itemType match {
case ItemType.Content =>
raiseAllThePurges(event)
case _ =>
// for now we only send purges for content, so ignore any other events
None
}
}
// Post decache actions
// We should be talking about a list of post purge actions to be performing on these path
// Purge AMP pages
successfulContentDecaches.foreach { decache =>
if (decache.eventType == EventType.Delete) {
decache.paths.foreach { path =>
AmpFlusher.sendAmpDeleteRequest(path)
}
}
}
// At this point, successfulPurges is a filtered list of all fastly requests that
// were fully successful (i.e. where _all_ de-cache requests returned a 200 response)
//
// Now we can notify consumers that listen for successful de-cache events by sending
// com.gu.crier.model.event.v1.Event events thrift serialized and base64 encoded
successfulContentDecaches.foreach { decache =>
try {
makeContentDecachedEventsFromDecache(decache).map { decachedEvent =>
val publishRequest = new PublishRequest()
publishRequest.setTopicArn(config.decachedContentTopic)
publishRequest.setMessage(
ContentDecachedEventSerializer.serialize(decachedEvent)
)
publishRequest.addMessageAttributesEntry(
"path",
new MessageAttributeValue()
.withDataType("String")
.withStringValue(decachedEvent.contentPath)
)
snsClient.publish(publishRequest)
}
} catch {
case t: Throwable =>
println("Warning; publish sns decached event failed: ${t.getMessage}")
}
}
println(s"Finished.")
}