protected Status doProcess()

in flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java [209:352]

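Reads one batch of records from Kafka, wraps each record in a Flume Event (optionally decoding the AvroFlumeEvent wire format), adds provenance headers, delivers the batch to the channel, and then commits the consumed offsets back to Kafka.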

  protected Status doProcess() throws EventDeliveryException {
    final String batchUUID = UUID.randomUUID().toString();
    String kafkaKey;
    Event event;
    byte[] eventBody;

    try {
      // Prepare time variables for the new batch: it closes when either
      // batchUpperLimit events have been read or maxBatchDurationMillis elapses.
      final long nanoBatchStartTime = System.nanoTime();
      final long batchStartTime = System.currentTimeMillis();
      final long maxBatchEndTime = batchStartTime + maxBatchDurationMillis;

      while (eventList.size() < batchUpperLimit &&
              System.currentTimeMillis() < maxBatchEndTime) {

        if (it == null || !it.hasNext()) {
          // The iterator from the previous poll is exhausted (or absent), so fetch
          // new records. The poll timeout is the time left in the current batch window.
          long durMs = Math.max(0L, maxBatchEndTime - System.currentTimeMillis());
          ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(durMs));
          it = records.iterator();

          // rebalanceFlag is set by the rebalance listener callback when partitions
          // are revoked. Break out so the events read so far are delivered and their
          // offsets committed before the rebalance completes.
          if (rebalanceFlag.compareAndSet(true, false)) {
            break;
          }
          // Check records after the poll. An empty result means the poll waited out
          // the rest of the batch window without new data, so return with backoff.
          if (!it.hasNext()) {
            counter.incrementKafkaEmptyCount();
            log.debug("Returning with backoff. No more data to read");
            break;
          }
        }

        // get next message
        ConsumerRecord<String, byte[]> message = it.next();
        kafkaKey = message.key();

        if (useAvroEventFormat) {
          //Assume the event is in Avro format using the AvroFlumeEvent schema
          //Will need to catch the exception if it is not
          ByteArrayInputStream in =
                  new ByteArrayInputStream(message.value());
          decoder = DecoderFactory.get().directBinaryDecoder(in, decoder);
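          // Reuse the binary decoder across messages and create the datum reader
          // only once, on first use.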
          if (!reader.isPresent()) {
            reader = Optional.of(
                    new SpecificDatumReader<AvroFlumeEvent>(AvroFlumeEvent.class));
          }
          //This may throw an exception but it will be caught by the
          //exception handler below and logged at error
          AvroFlumeEvent avroevent = reader.get().read(null, decoder);

          eventBody = avroevent.getBody().array();
          headers = toStringMap(avroevent.getHeaders());
        } else {
          eventBody = message.value();
          // Start a fresh header map for each non-Avro message.
          headers = new HashMap<String, String>(4);
        }

        // Add timestamp, topic, partition, and offset headers only if not already set
        if (!headers.containsKey(TIMESTAMP_HEADER)) {
          headers.put(TIMESTAMP_HEADER, String.valueOf(message.timestamp()));
        }
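        // Copy configured Kafka record headers into Flume headers: headerMap maps
        // the Flume header name (key) to the Kafka header key (value). If a Kafka
        // header key repeats, the last value wins.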
        if (!headerMap.isEmpty()) {
          Headers kafkaHeaders = message.headers();
          for (Map.Entry<String, String> entry : headerMap.entrySet()) {
            for (Header kafkaHeader : kafkaHeaders.headers(entry.getValue())) {
              headers.put(entry.getKey(), new String(kafkaHeader.value()));
            }
          }
        }
        // Only set the topic header if setTopicHeader and it isn't already populated
        if (setTopicHeader && !headers.containsKey(topicHeader)) {
          headers.put(topicHeader, message.topic());
        }
        if (!headers.containsKey(PARTITION_HEADER)) {
          headers.put(PARTITION_HEADER, String.valueOf(message.partition()));
        }
        if (!headers.containsKey(OFFSET_HEADER)) {
          headers.put(OFFSET_HEADER, String.valueOf(message.offset()));
        }

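        // Unlike the headers above, the Kafka key is written unconditionally and
        // overwrites any existing value under KEY_HEADER.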
        if (kafkaKey != null) {
          headers.put(KEY_HEADER, kafkaKey);
        }

        if (log.isTraceEnabled()) {
          if (LogPrivacyUtil.allowLogRawData()) {
            log.trace("Topic: {} Partition: {} Message: {}",
                message.topic(), message.partition(), new String(eventBody));
          } else {
            log.trace("Topic: {} Partition: {} Message arrived.",
                message.topic(), message.partition());
          }
        }

        event = EventBuilder.withBody(eventBody, headers);
        eventList.add(event);

        if (log.isDebugEnabled()) {
          log.debug("Waited: {} ", System.currentTimeMillis() - batchStartTime);
          log.debug("Event #: {}", eventList.size());
        }

        // For each partition store next offset that is going to be read.
        tpAndOffsetMetadata.put(new TopicPartition(message.topic(), message.partition()),
                new OffsetAndMetadata(message.offset() + 1, batchUUID));
      }

      if (eventList.size() > 0) {
        counter.addToKafkaEventGetTimer((System.nanoTime() - nanoBatchStartTime) / (1000 * 1000));
        counter.addToEventReceivedCount((long) eventList.size());
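        // processEventBatch throws on channel failure; the catch below then returns
        // BACKOFF without clearing eventList or committing offsets, so delivery is
        // retried on the next call.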
        getChannelProcessor().processEventBatch(eventList);
        counter.addToEventAcceptedCount(eventList.size());
        if (log.isDebugEnabled()) {
          log.debug("Wrote {} events to channel", eventList.size());
        }
        eventList.clear();

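        // Offsets are committed only after the channel accepted the batch; a failure
        // between delivery and commit means records are re-read (at-least-once).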
        if (!tpAndOffsetMetadata.isEmpty()) {
          long commitStartTime = System.nanoTime();
          consumer.commitSync(tpAndOffsetMetadata);
          long commitEndTime = System.nanoTime();
          counter.addToKafkaCommitTimer((commitEndTime - commitStartTime) / (1000 * 1000));
          tpAndOffsetMetadata.clear();
        }
        return Status.READY;
      }

      return Status.BACKOFF;
    } catch (Exception e) {
      // Pass the throwable as the final argument (no placeholder) so SLF4J
      // logs the full stack trace.
      log.error("KafkaSource EXCEPTION", e);
      counter.incrementEventReadOrChannelFail(e);
      return Status.BACKOFF;
    }
  }
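
For reference, below is a minimal standalone sketch of the same manual-commit convention (commit the offset of the next record to read, i.e. last consumed offset + 1) using only the kafka-clients API. The topic name, group id, and broker address are illustrative assumptions, not values taken from Flume's configuration.

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ManualCommitSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
    props.put("group.id", "manual-commit-sketch");     // assumed group id
    props.put("enable.auto.commit", "false");          // commit manually, as KafkaSource does
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("example-topic")); // assumed topic

      Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
      ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(1000));
      for (ConsumerRecord<String, byte[]> record : records) {
        // ... hand the record off to a downstream consumer here ...

        // Store the offset of the NEXT record to read, mirroring
        // message.offset() + 1 in doProcess() above.
        toCommit.put(new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1));
      }
      // Commit only after the batch was handed off, for at-least-once delivery.
      if (!toCommit.isEmpty()) {
        consumer.commitSync(toCommit);
      }
    }
  }
}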