private void consume()

in src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java [132:174]


    /**
     * Runs the polling loop of this subscriber until {@code closed} is set.
     *
     * <p>On each iteration:
     *
     * <ul>
     *   <li>If an offset reset was requested via {@code resetOffset}, the consumer is polled until
     *       it has a partition assignment (or the subscriber is closed) and is then rewound to
     *       the beginning of every assigned partition.
     *   <li>The consumer is polled for records; each record is deserialized and handed to {@code
     *       messageProcessor} inside a one-off request context.
     * </ul>
     *
     * <p>A failure while handling a single record is logged and counted, and does not stop the
     * loop. A {@link WakeupException} is ignored when the subscriber is closing; otherwise it, like
     * any other exception escaping the loop, is logged and followed by {@code
     * reconnectAfterFailure()}. The consumer is closed when the method exits.
     *
     * @throws InterruptedException declared by the signature; presumably propagated from {@code
     *     reconnectAfterFailure()} -- TODO confirm against that method.
     */
    private void consume() throws InterruptedException {
      try {
        while (!closed.get()) {
          if (resetOffset.getAndSet(false)) {
            // Make sure there is an assignment for this consumer
            while (consumer.assignment().isEmpty() && !closed.get()) {
              logger.atInfo().log(
                  "Resetting offset: no partitions assigned to the consumer, request assignment.");
              consumer.poll(Duration.ofMillis(configuration.getPollingInterval()));
            }
            consumer.seekToBeginning(consumer.assignment());
          }
          ConsumerRecords<byte[], byte[]> consumerRecords =
              consumer.poll(Duration.ofMillis(configuration.getPollingInterval()));
          consumerRecords.forEach(
              consumerRecord -> {
                try (ManualRequestContext ctx = oneOffCtx.open()) {
                  EventMessage event =
                      valueDeserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
                  messageProcessor.accept(event);
                } catch (Exception e) {
                  // The record value may be null; guard it so that building the log message
                  // cannot itself throw and abort the whole consumer loop.
                  byte[] rawValue = consumerRecord.value();
                  String payload = rawValue == null ? "<null>" : new String(rawValue, UTF_8);
                  // The template has two placeholders: the payload and the exception.
                  logger.atSevere().withCause(e).log(
                      "Malformed event '%s': [Exception: %s]", payload, e);
                  subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
                }
              });
        }
      } catch (WakeupException e) {
        // Ignore exception if closing
        if (!closed.get()) {
          logger.atSevere().withCause(e).log("Consumer loop of topic %s interrupted", topic);
          reconnectAfterFailure();
        }
      } catch (Exception e) {
        subscriberMetrics.incrementSubscriberFailedToPollMessages();
        logger.atSevere().withCause(e).log(
            "Existing consumer loop of topic %s because of a non-recoverable exception", topic);
        reconnectAfterFailure();
      } finally {
        consumer.close();
      }
    }