in kafka-sink/src/main/java/com/amazonaws/hbase/datasink/KafkaDataSinkImpl.java [70:104]
/**
 * Publishes one record to the Kafka topic mapped from the given table name and
 * waits for the send to complete.
 *
 * <p>The record key is the current wall-clock time in milliseconds plus the running
 * {@code sentMessageCount}, rendered as a decimal string.
 *
 * <p>If the send fails with an {@link ExecutionException}, the error is logged. When the
 * underlying cause is a {@code TimeoutException}, the topic list is checked and, if the
 * topic is absent and topic creation is enabled in the configuration, the topic is
 * created. The failed record is NOT re-sent and the failure is NOT rethrown.
 *
 * @param buffer    payload to publish as the record value
 * @param tablename source table name; resolved to a topic through the configuration util
 * @throws Exception any failure other than an {@link ExecutionException} from the send
 *                   (for example, an interruption while waiting on the send future), or
 *                   a failure raised while listing or creating the topic
 */
public void putRecord(ByteBuffer buffer, String tablename) throws Exception {
    this.producer = KafkaProducerFactory.getProducer(this.getConfigurationUtil().getConfigurationProperties());
    final long time = System.currentTimeMillis();
    final long index = time + sentMessageCount++;
    final ProducerRecord<String, ByteBuffer> record = new ProducerRecord<String, ByteBuffer>(
            this.getConfigurationUtil().getTopicFromTableName(tablename),
            Long.toString(index),
            buffer);
    try {
        // Waiting on the returned future makes the send synchronous.
        RecordMetadata metadata = (RecordMetadata) producer.send(record).get();
        long elapsedTime = System.currentTimeMillis() - time;
        // The logger takes {}-style placeholders (see the error call below); the payload
        // itself is intentionally not logged.
        LOG.debug(
                "sent record(key={}) meta(partition={}, offset={}) time={}",
                record.key(),
                metadata.partition(),
                metadata.offset(),
                elapsedTime);
    } catch (ExecutionException e) {
        LOG.error("Error in sending record" ,e);
        // NOTE(review): the record that failed is dropped here (no retry, no rethrow).
        // Confirm with the caller whether that is the intended delivery guarantee.
        // instanceof is null-safe, unlike getCause().getClass().
        if (e.getCause() instanceof TimeoutException) {
            LOG.error("Seems {} does not exist. checking ..", record.topic());
            Properties config = new Properties();
            config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.configUtil.getBootstrapServers());
            KafkaAdminUtil adminUtil = new KafkaAdminUtil(config);
            // Short-circuit keeps the original order: list topics first, then consult the flag.
            if (!adminUtil.listTopics().contains(record.topic())
                    && this.configUtil.createTopicIfNotFound()) {
                adminUtil.createTopic(record.topic(),
                        this.configUtil.getTopicPartitions(),
                        this.configUtil.getTopicReplicationFactor());
            }
        }
    }
}