in src/main/scala/com/gu/contentapi/firehose/kinesis/SingleEventProcessor.scala [35:60]
/**
 * Deserializes every record of the incoming batch, hands the events that
 * decoded successfully to `processEvents`, and then updates the checkpoint
 * bookkeeping.
 *
 * A record whose payload cannot be deserialized does not abort the batch:
 * the failure and a Base64 dump of the payload are logged at error level and
 * the record is left out of the events passed on.
 *
 * @param input the batch of records, together with the checkpointer to use
 *              if a checkpoint is due after this batch
 */
override def processRecords(input: ProcessRecordsInput): Unit = {
  val decodedEvents = input.records().asScala.flatMap { rec =>
    val payload = rec.data()
    ThriftDeserializer.deserialize(payload) match {
      case Failure(e) =>
        logger.error(s"deserialization of event buffer failed: ${e.getMessage}", e)
        // Reset the read position to the start of the payload so the dump
        // below covers it from the beginning, in case the position was moved
        // while deserializing.
        payload.rewind()
        // Base64 output is plain ASCII, so ISO-8859-1 decodes it byte for byte.
        val b64string =
          new String(Base64.getEncoder.encode(payload).array(), StandardCharsets.ISO_8859_1)
        logger.error(s"Offending binary content: $b64string")
        None
      case Success(event) =>
        Some(event)
    }
  }.toSeq // .toSeq is required on Scala 2.13: flatMap over the converted Java list yields a mutable.Buffer, which is not directly compatible with Seq.

  processEvents(decodedEvents)

  // Advance the since-last-checkpoint counter by the number of events that
  // were decoded successfully (records that failed to decode are not counted).
  recordsProcessedSinceCheckpoint.addAndGet(decodedEvents.size)
  if (shouldCheckpointNow) checkpoint(input.checkpointer())
}