public Status process()

in flume-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java [139:308]


  /**
   * Takes up to {@code batchSize} events from the channel inside a single channel
   * transaction and hands each one to the Kafka producer.
   *
   * <p>For every event the destination topic, record key, optional partition id,
   * optional timestamp and optional Kafka headers are derived from the event headers
   * and the sink's configured fields. After the loop the batch is made durable either
   * by committing the Kafka transaction (when {@code useKafkaTransactions} is set) or
   * by flushing the producer and waiting on every send future. Only then is the
   * channel transaction committed.
   *
   * <p>On any failure the Kafka transaction (if used) is aborted, the channel
   * transaction is rolled back and the failure is rethrown wrapped in an
   * {@link EventDeliveryException}. If the failure was an
   * {@link InterruptedException}, the thread's interrupt status is restored before
   * this method exits.
   *
   * @return {@code Status.BACKOFF} when the channel yielded no event at all,
   *         otherwise {@code Status.READY}
   * @throws EventDeliveryException if an event could not be sent or the batch could
   *         not be committed
   */
  public Status process() throws EventDeliveryException {
    Status result = Status.READY;
    Channel channel = getChannel();
    Transaction transaction = null;
    // Remembers that the failure was an interruption so the flag can be restored
    // once rollback and close have run.
    boolean interrupted = false;

    try {
      long processedEvents = 0;

      transaction = channel.getTransaction();
      transaction.begin();
      if (useKafkaTransactions) {
        producer.beginTransaction();
      }

      kafkaFutures.clear();
      long batchStartTime = System.nanoTime();
      for (; processedEvents < batchSize; processedEvents += 1) {
        Event event = channel.take();

        if (event == null) {
          // no events available in channel
          if (processedEvents == 0) {
            result = Status.BACKOFF;
            counter.incrementBatchEmptyCount();
          } else {
            counter.incrementBatchUnderflowCount();
          }
          break;
        }
        counter.incrementEventDrainAttemptCount();

        byte[] eventBody = event.getBody();
        Map<String, String> headers = event.getHeaders();

        // Resolve the destination topic: a per-event header wins when overriding is
        // allowed, otherwise (or when the header is absent) the configured topic is used.
        String eventTopic;
        if (allowTopicOverride) {
          eventTopic = headers.get(topicHeader);
          if (eventTopic == null) {
            eventTopic = BucketPath.escapeString(topic, headers);
            logger.debug("{} was set to true but header {} was null. Producing to {}" +
                " topic instead.",
                new Object[]{KafkaSinkConstants.ALLOW_TOPIC_OVERRIDE_HEADER,
                    topicHeader, eventTopic});
          }
        } else {
          eventTopic = topic;
        }

        String eventKey = headers.get(KafkaSinkConstants.KEY_HEADER);
        if (logger.isTraceEnabled()) {
          // The raw body is only logged when the privacy setting explicitly allows it.
          if (LogPrivacyUtil.allowLogRawData()) {
            logger.trace("{Event} " + eventTopic + " : " + eventKey + " : "
                + new String(eventBody, StandardCharsets.UTF_8));
          } else {
            logger.trace("{Event} " + eventTopic + " : " + eventKey);
          }
        }
        logger.debug("event #{}", processedEvents);

        // create a message and add to buffer
        long startTime = System.currentTimeMillis();

        try {
          // Start from the static partition id (may be null); a specified header,
          // when present on the event, overrides it.
          Integer partitionId = staticPartitionId;
          if (partitionHeader != null) {
            String headerVal = headers.get(partitionHeader);
            if (headerVal != null) {
              partitionId = Integer.parseInt(headerVal);
            }
          }

          // An unparsable timestamp header is logged and ignored rather than
          // failing the batch; the record is then sent with a null timestamp.
          Long timestamp = null;
          if (timestampHeader != null) {
            String value = headers.get(timestampHeader);
            if (value != null) {
              try {
                timestamp = Long.parseLong(value);
              } catch (NumberFormatException ex) {
                logger.warn("Invalid timestamp in header {} - {}", timestampHeader, value);
              }
            }
          }

          // Copy the mapped event headers onto the record; stays null when none of
          // the mapped headers is present on this event.
          List<Header> kafkaHeaders = null;
          if (!headerMap.isEmpty()) {
            List<Header> tempHeaders = new ArrayList<>();
            for (Map.Entry<String, String> entry : headerMap.entrySet()) {
              String value = headers.get(entry.getKey());
              if (value != null) {
                tempHeaders.add(new RecordHeader(entry.getValue(),
                    value.getBytes(StandardCharsets.UTF_8)));
              }
            }
            if (!tempHeaders.isEmpty()) {
              kafkaHeaders = tempHeaders;
            }
          }

          // partitionId may be null here, which is passed through unchanged.
          ProducerRecord<String, byte[]> record = new ProducerRecord<>(eventTopic, partitionId,
              timestamp, eventKey, serializeEvent(event, useAvroEventFormat), kafkaHeaders);
          kafkaFutures.add(producer.send(record, new SinkCallback(startTime)));
        } catch (NumberFormatException ex) {
          throw new EventDeliveryException("Non integer partition id specified", ex);
        } catch (Exception ex) {
          // N.B. The producer.send() method throws all sorts of RuntimeExceptions
          // Catching Exception here to wrap them neatly in an EventDeliveryException
          // which is what our consumers will expect
          throw new EventDeliveryException("Could not send event", ex);
        }
      }

      if (useKafkaTransactions) {
        producer.commitTransaction();
      } else {
        //Prevent linger.ms from holding the batch
        producer.flush();
        // Surface any asynchronous send failure before the channel commit.
        for (Future<RecordMetadata> future : kafkaFutures) {
          future.get();
        }
      }
      // publish batch and commit.
      if (processedEvents > 0) {
        long endTime = System.nanoTime();
        // nanoseconds -> milliseconds
        counter.addToKafkaEventSendTimer((endTime - batchStartTime) / (1000 * 1000));
        counter.addToEventDrainSuccessCount(processedEvents);
      }

      transaction.commit();

    } catch (Exception ex) {
      String errorMsg = "Failed to publish events";
      logger.error(errorMsg, ex);
      interrupted = ex instanceof InterruptedException;
      counter.incrementEventWriteOrChannelFail(ex);
      if (transaction != null) {
        try {
          kafkaFutures.clear();
          try {
            if (useKafkaTransactions) {
              producer.abortTransaction();
            }
          } catch (ProducerFencedException e) {
            logger.error("Could not rollback transaction as producer fenced", e);
          } finally {
            // The channel transaction is rolled back even if the Kafka abort failed.
            transaction.rollback();
            counter.incrementRollbackCount();
          }
        } catch (Exception e) {
          logger.error("Transaction rollback failed", e);
          throw Throwables.propagate(e);
        }
      }
      throw new EventDeliveryException(errorMsg, ex);
    } finally {
      try {
        if (transaction != null) {
          transaction.close();
        }
      } finally {
        // Restore the interrupt status last so rollback/close above do not run
        // on an interrupted thread, while the caller still sees the interruption.
        if (interrupted) {
          Thread.currentThread().interrupt();
        }
      }
    }

    return result;
  }