protected void doPut()

in flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java [331:365]


    /**
     * Stages {@code event} as a Kafka producer record in this transaction's pending list.
     *
     * <p>The transaction is marked as a PUT and the pending-record list is created on first
     * use. The record key is read from the event's {@code KEY_HEADER} header and may be
     * {@code null}. The target partition is resolved as follows:
     * <ol>
     *   <li>the static partition id, when one is configured, is the default;</li>
     *   <li>when a partition header is configured and present on the event, its integer
     *       value overrides the static id;</li>
     *   <li>when neither yields a value, the record is created without an explicit
     *       partition.</li>
     * </ol>
     *
     * <p>NOTE(review): records are only appended to the pending list here; presumably they
     * are handed to the producer when the transaction commits - confirm against the commit
     * path.
     *
     * @param event the event to stage; its headers must not be {@code null}
     * @throws ChannelException if the partition header value is not a valid integer, or if
     *         serializing the event or building the record fails for any reason
     * @throws InterruptedException declared by the signature; nothing in this body throws it
     */
    protected void doPut(Event event) throws InterruptedException {
      type = TransactionType.PUT;
      if (!producerRecords.isPresent()) {
        producerRecords = Optional.of(new LinkedList<ProducerRecord<String, byte[]>>());
      }
      String key = event.getHeaders().get(KEY_HEADER);

      // The static partition id is the default; a partition header on the event overrides it.
      Integer partitionId = staticPartitionId;
      if (partitionHeader != null) {
        String headerVal = event.getHeaders().get(partitionHeader);
        if (headerVal != null) {
          // Parse in its own try so that only a malformed header is reported as a bad
          // partition id, and so the message can name the offending header and value.
          try {
            partitionId = Integer.parseInt(headerVal);
          } catch (NumberFormatException e) {
            throw new ChannelException("Non integer partition id specified: header '"
                + partitionHeader + "' has value '" + headerVal + "'", e);
          }
        }
      }

      try {
        byte[] payload = serializeValue(event, parseAsFlumeEvent);
        ProducerRecord<String, byte[]> record;
        if (partitionId != null) {
          record = new ProducerRecord<String, byte[]>(topic.get(), partitionId, key, payload);
        } else {
          // No explicit partition: use the constructor that leaves partition selection open.
          record = new ProducerRecord<String, byte[]>(topic.get(), key, payload);
        }
        producerRecords.get().add(record);
        counter.incrementEventPutAttemptCount();
      } catch (Exception e) {
        throw new ChannelException("Error while serializing event", e);
      }
    }