in src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java [61:85]
public void processRecords(ProcessRecordsInput processRecordsInput) {
  // Handles one batch of consumed records: each record's payload is decoded to a JSON string,
  // deserialized into an Event and handed to recordProcessor inside a request context opened
  // from oneOffCtx. A failure on a single record is logged and does not stop the remaining
  // records; anything thrown outside the per-record handling is logged and not rethrown.
  try {
    logger.atInfo().log("Processing %s record(s)", processRecordsInput.records().size());
    processRecordsInput
        .records()
        .forEach(
            consumerRecord -> {
              logger.atFiner().log(
                  "GERRIT > Processing record pk: %s -- %s",
                  consumerRecord.partitionKey(), consumerRecord.sequenceNumber());
              // Copy the bytes remaining in the record's buffer into a standalone array.
              byte[] byteRecord = new byte[consumerRecord.data().remaining()];
              consumerRecord.data().get(byteRecord);
              // Decode explicitly as UTF-8: the no-charset String constructor falls back to the
              // platform default charset, which can corrupt non-ASCII characters in the payload.
              String jsonMessage =
                  new String(byteRecord, java.nio.charset.StandardCharsets.UTF_8);
              logger.atFiner().log("Kinesis consumed event: '%s'", jsonMessage);
              try (ManualRequestContext ctx = oneOffCtx.open()) {
                Event eventMessage = eventDeserializer.deserialize(jsonMessage);
                recordProcessor.accept(eventMessage);
              } catch (Exception e) {
                // Log and move on so one bad event does not block the rest of the batch.
                logger.atSevere().withCause(e).log("Could not process event '%s'", jsonMessage);
              }
            });
  } catch (Throwable t) {
    logger.atSevere().withCause(t).log("Caught throwable while processing records. Aborting.");
  }
}