public void putRecord()

in kafka-sink/src/main/java/com/amazonaws/hbase/datasink/KafkaDataSinkImpl.java [70:104]


	public void putRecord(ByteBuffer buffer, String tablename) throws Exception {
			this.producer = KafkaProducerFactory.getProducer(this.getConfigurationUtil().getConfigurationProperties());
			long time = System.currentTimeMillis();
			Long index = time + sentMessageCount++;
			final ProducerRecord<String, ByteBuffer> record =  new ProducerRecord<String, ByteBuffer>(
					this.getConfigurationUtil().getTopicFromTableName(tablename), 
					new Long(index).toString(), 
					buffer);
			try {
				RecordMetadata metadata = (RecordMetadata) producer.send(record).get(); // This will block
				long elapsedTime = System.currentTimeMillis() - time;
				LOG.debug(
						"sent record(key=%s ) " +  "meta(partition=%d, offset=%d) time=%d\n",
						record.key(), 
						record.value(), 
						metadata.partition(),
						metadata.offset(), 
						elapsedTime);
			} catch (ExecutionException e) {
				LOG.error("Error in sending record" ,e);
				if (e.getCause().getClass() == TimeoutException.class) {
					LOG.error("Seems {} does not exist. checking ..", record.topic());
					Properties config = new Properties();
					config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.configUtil.getBootstrapServers());
					KafkaAdminUtil adminUtil = new KafkaAdminUtil(config); 
					if (adminUtil.listTopics().contains(record.topic()) == false) {
						if (this.configUtil.createTopicIfNotFound() == true) {
							adminUtil.createTopic(record.topic(),
										this.configUtil.getTopicPartitions(), 
										this.configUtil.getTopicReplicationFactor());
						}
					}
				}
			}
	}