in phoenix5-kafka/src/it/java/org/apache/phoenix/kafka/PhoenixConsumerIT.java [155:210]
public void testPhoenixConsumerWithProperties() throws SQLException {
    final String fullTableName = "SAMPLE2";
    final String ddl = "CREATE TABLE IF NOT EXISTS SAMPLE2(uid VARCHAR NOT NULL,c1 VARCHAR,c2 VARCHAR,c3 VARCHAR CONSTRAINT pk PRIMARY KEY(uid))\n";
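
    // Consumer configuration: target table, regex event serializer settings,
    // and the Kafka bootstrap servers, topics, and timeout.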
    Properties consumerProperties = new Properties();
    consumerProperties.setProperty(FlumeConstants.CONFIG_TABLE, fullTableName);
    consumerProperties.setProperty(FlumeConstants.CONFIG_JDBC_URL, getUrl());
    consumerProperties.setProperty(FlumeConstants.CONFIG_SERIALIZER, EventSerializers.REGEX.name());
    consumerProperties.setProperty(FlumeConstants.CONFIG_TABLE_DDL, ddl);
    consumerProperties.setProperty(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_REGULAR_EXPRESSION, "([^\\,]*),([^\\,]*),([^\\,]*)");
    consumerProperties.setProperty(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_COLUMN_NAMES, "c1,c2,c3");
    consumerProperties.setProperty(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR, DefaultKeyGenerator.UUID.name());
    consumerProperties.setProperty(KafkaConstants.BOOTSTRAP_SERVERS, "localhost:9092");
    consumerProperties.setProperty(KafkaConstants.TOPICS, "topic1,topic2");
    consumerProperties.setProperty(KafkaConstants.TIMEOUT, "100");
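
    // Run the PhoenixConsumer on its own thread so it can poll Kafka while the producer publishes.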
    PhoenixConsumerThread pConsumerThread = new PhoenixConsumerThread(pConsumer, consumerProperties);
    Thread phoenixConsumer = new Thread(pConsumerThread);
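
    // Producer configuration: same broker, with the default key/value serializers from KafkaConstants.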
    Properties producerProperties = new Properties();
    producerProperties.setProperty(KafkaConstants.BOOTSTRAP_SERVERS, "localhost:9092");
    producerProperties.setProperty(KafkaConstants.KEY_SERIALIZER, KafkaConstants.DEFAULT_KEY_SERIALIZER);
    producerProperties.setProperty(KafkaConstants.VALUE_SERIALIZER, KafkaConstants.DEFAULT_VALUE_SERIALIZER);
    producerProperties.setProperty("auto.commit.interval.ms", "1000");
    KafkaProducerThread kProducerThread = new KafkaProducerThread(producerProperties, TOPIC);
    Thread kafkaProducer = new Thread(kProducerThread);
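
    // Start the consumer first and give it up to 10 seconds to initialize;
    // it keeps running after the timed join returns.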
    phoenixConsumer.start();
    try {
        phoenixConsumer.join(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
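
    // Publish the test messages, then block until the producer thread finishes.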
    kafkaProducer.start();
    try {
        kafkaProducer.join();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
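
    // Once the producer thread has exited, stop the consumer so the results can be verified.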
    if (!kafkaProducer.isAlive()) {
        System.out.println("kafka producer is not alive");
        pConsumer.stop();
    }
    // Verify our serializer wrote out data
    ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + fullTableName);
    assertTrue(rs.next());
    // The UUID row-key generator must have produced a non-null row key
    assertTrue(rs.getString(1) != null);
    rs.close();
}