public void processRecords()

in src/main/java/com/googlesource/gerrit/plugins/kinesis/KinesisRecordProcessor.java [61:85]


  public void processRecords(ProcessRecordsInput processRecordsInput) {
    try {
      logger.atInfo().log("Processing %s record(s)", processRecordsInput.records().size());
      processRecordsInput
          .records()
          .forEach(
              consumerRecord -> {
                logger.atFiner().log(
                    "GERRIT > Processing record pk: %s -- %s",
                    consumerRecord.partitionKey(), consumerRecord.sequenceNumber());
                byte[] byteRecord = new byte[consumerRecord.data().remaining()];
                consumerRecord.data().get(byteRecord);
                String jsonMessage = new String(byteRecord);
                logger.atFiner().log("Kinesis consumed event: '%s'", jsonMessage);
                try (ManualRequestContext ctx = oneOffCtx.open()) {
                  Event eventMessage = eventDeserializer.deserialize(jsonMessage);
                  recordProcessor.accept(eventMessage);
                } catch (Exception e) {
                  logger.atSevere().withCause(e).log("Could not process event '%s'", jsonMessage);
                }
              });
    } catch (Throwable t) {
      logger.atSevere().withCause(t).log("Caught throwable while processing records. Aborting.");
    }
  }