override def processRecords()

in src/main/scala/com/gu/contentapi/firehose/kinesis/SingleEventProcessor.scala [35:60]


  override def processRecords(input: ProcessRecordsInput): Unit = {
    val events = input.records().asScala.flatMap { record =>
      val buffer = record.data()
      val op = ThriftDeserializer.deserialize(buffer)
      op match {
        case Success(event) => Some(event)
        case Failure(e) => {
          logger.error(s"deserialization of event buffer failed: ${e.getMessage}", e)
          buffer.rewind()
          val encoded = Base64.getEncoder.encode(buffer)
          val b64string = new String(encoded.array(), StandardCharsets.ISO_8859_1)
          logger.error(s"Offending binary content: $b64string")
          None
        }
      }
    }.toSeq //.toSeq is required on Scala 2.13 as the comprehension above gives us a mutable.Buffer which is not directly compatible with Seq.

    processEvents(events)

    /* increment the record counter */
    recordsProcessedSinceCheckpoint.addAndGet(events.size)

    if (shouldCheckpointNow) {
      checkpoint(input.checkpointer())
    }
  }