def handle()

in src/main/scala/com/gu/fastly/Lambda.scala [111:170]


  /** Processes one batch of Kinesis records.
    *
    * Steps, in order:
    *  1. Pull the Kinesis record out of every event record and de-aggregate
    *     them with `UserRecord.deaggregate`.
    *  2. Deserialize the resulting user records into Crier events and hand them
    *     to `CrierEventProcessor.process`; content events are purged via
    *     `raiseAllThePurges`, every other item type is ignored.
    *  3. For each successful decache whose event type is `Delete`, send an AMP
    *     delete request for each of its paths.
    *  4. For each successful decache, publish the derived content-decached
    *     events to the SNS topic `config.decachedContentTopic`. A non-fatal
    *     failure here is logged and does not stop the remaining decaches.
    *
    * @param event the incoming Kinesis event batch
    */
  def handle(event: KinesisEvent): Unit = {
    val rawRecords: List[Record] =
      event.getRecords.asScala.map(_.getKinesis).toList
    val userRecords = UserRecord.deaggregate(rawRecords.asJava)

    println(s"Processing ${userRecords.size} records ...")
    val events = CrierEventDeserializer.deserializeEvents(userRecords.asScala)

    // Named `crierEvent` so it does not shadow the `event` method parameter.
    val successfulContentDecaches = CrierEventProcessor.process(events) {
      crierEvent =>
        crierEvent.itemType match {
          case ItemType.Content =>
            raiseAllThePurges(crierEvent)
          case _ =>
            // for now we only send purges for content, so ignore any other events
            None
        }
    }

    // Post decache actions
    // We should be talking about a list of post purge actions to be performing on these path

    // Purge AMP pages (only deletions trigger an AMP delete request)
    successfulContentDecaches.foreach { decache =>
      if (decache.eventType == EventType.Delete) {
        decache.paths.foreach { path =>
          AmpFlusher.sendAmpDeleteRequest(path)
        }
      }
    }

    // At this point, successfulContentDecaches is a filtered list of all fastly requests that
    // were fully successful (i.e. where _all_ de-cache requests returned a 200 response)
    //
    // Now we can notify consumers that listen for successful de-cache events by sending
    // com.gu.crier.model.event.v1.Event events thrift serialized and base64 encoded
    successfulContentDecaches.foreach { decache =>
      try {
        // Publishing is a pure side effect, so iterate with foreach rather than map.
        makeContentDecachedEventsFromDecache(decache).foreach { decachedEvent =>
          val publishRequest = new PublishRequest()
          publishRequest.setTopicArn(config.decachedContentTopic)
          publishRequest.setMessage(
            ContentDecachedEventSerializer.serialize(decachedEvent)
          )
          publishRequest.addMessageAttributesEntry(
            "path",
            new MessageAttributeValue()
              .withDataType("String")
              .withStringValue(decachedEvent.contentPath)
          )
          snsClient.publish(publishRequest)
        }
      } catch {
        // Best effort: log and carry on with the next decache. Only non-fatal
        // errors are swallowed so that OutOfMemoryError, InterruptedException
        // and similar still propagate.
        case scala.util.control.NonFatal(t) =>
          println(s"Warning; publish sns decached event failed: ${t.getMessage}")
      }
    }

    println("Finished.")
  }