in pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java [55:97]
/**
 * Loads and validates the Kafka source configuration, creates the Kafka consumer,
 * and starts the source.
 *
 * <p>Entries from {@code getConsumerConfigProperties()} are copied into the consumer
 * properties first; the explicitly managed settings below are written afterwards and
 * therefore replace any user-supplied value for the same key.
 *
 * @param config        raw connector configuration, parsed by {@code KafkaSourceConfig.load}
 * @param sourceContext source context supplied by the caller (not read by this method)
 * @throws NullPointerException     if the topic, bootstrap servers, or consumer group id is unset
 * @throws IllegalArgumentException if fetchMinBytes, sessionTimeoutMs, or heartbeatIntervalMs
 *                                  is not positive; if auto-commit is enabled and
 *                                  autoCommitIntervalMs is not positive; or if the Kafka
 *                                  consumer cannot be instantiated
 * @throws Exception                declared by the method signature
 */
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
    kafkaSourceConfig = KafkaSourceConfig.load(config);

    // Mandatory settings: fail fast with a descriptive message.
    Objects.requireNonNull(kafkaSourceConfig.getTopic(), "Kafka topic is not set");
    Objects.requireNonNull(kafkaSourceConfig.getBootstrapServers(), "Kafka bootstrapServers is not set");
    Objects.requireNonNull(kafkaSourceConfig.getGroupId(), "Kafka consumer group id is not set");

    // Numeric settings must be strictly positive.
    if (kafkaSourceConfig.getFetchMinBytes() <= 0) {
        throw new IllegalArgumentException("Invalid Kafka Consumer fetchMinBytes : "
                + kafkaSourceConfig.getFetchMinBytes());
    }
    // The commit interval is only validated when auto-commit is enabled.
    if (kafkaSourceConfig.isAutoCommitEnabled() && kafkaSourceConfig.getAutoCommitIntervalMs() <= 0) {
        throw new IllegalArgumentException("Invalid Kafka Consumer autoCommitIntervalMs : "
                + kafkaSourceConfig.getAutoCommitIntervalMs());
    }
    if (kafkaSourceConfig.getSessionTimeoutMs() <= 0) {
        throw new IllegalArgumentException("Invalid Kafka Consumer sessionTimeoutMs : "
                + kafkaSourceConfig.getSessionTimeoutMs());
    }
    if (kafkaSourceConfig.getHeartbeatIntervalMs() <= 0) {
        throw new IllegalArgumentException("Invalid Kafka Consumer heartbeatIntervalMs : "
                + kafkaSourceConfig.getHeartbeatIntervalMs());
    }

    Properties props = new Properties();
    // User-supplied pass-through properties go in first so the managed keys below win.
    if (kafkaSourceConfig.getConsumerConfigProperties() != null) {
        props.putAll(kafkaSourceConfig.getConsumerConfigProperties());
    }
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSourceConfig.getBootstrapServers());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaSourceConfig.getGroupId());
    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,
            String.valueOf(kafkaSourceConfig.getFetchMinBytes()));
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
            String.valueOf(kafkaSourceConfig.getAutoCommitIntervalMs()));
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
            String.valueOf(kafkaSourceConfig.getSessionTimeoutMs()));
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,
            String.valueOf(kafkaSourceConfig.getHeartbeatIntervalMs()));
    // The offset-reset policy is fixed here and overrides any pass-through value.
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaSourceConfig.getKeyDeserializationClass());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaSourceConfig.getValueDeserializationClass());

    try {
        // beforeCreateConsumer gets the chance to adjust the properties before use.
        consumer = new KafkaConsumer<>(beforeCreateConsumer(props));
    } catch (Exception ex) {
        throw new IllegalArgumentException("Unable to instantiate Kafka consumer", ex);
    }

    // Raise the flag BEFORE start(): if start() launches a worker that loops on
    // `running`, setting it afterwards lets the worker observe `false` and exit at once.
    // NOTE(review): start() is outside this view; this assumes it only reads `running`
    // from the work it launches - confirm against start().
    running = true;
    boolean started = false;
    try {
        this.start();
        started = true;
    } finally {
        if (!started) {
            // start() failed: do not leave the source marked as running.
            running = false;
        }
    }
}