public void testPhoenixConsumerWithProperties()

in phoenix5-kafka/src/it/java/org/apache/phoenix/kafka/PhoenixConsumerIT.java [155:210]


    public void testPhoenixConsumerWithProperties() throws SQLException {
        // End-to-end check driven by an in-memory Properties object:
        //   1. configure and start a PhoenixConsumerThread targeting table SAMPLE2,
        //   2. run a KafkaProducerThread to completion on TOPIC,
        //   3. stop the consumer and assert that SAMPLE2 contains at least one row.
        final String fullTableName = "SAMPLE2";
        final String ddl = "CREATE TABLE IF NOT EXISTS SAMPLE2(uid VARCHAR NOT NULL,c1 VARCHAR,c2 VARCHAR,c3 VARCHAR CONSTRAINT pk PRIMARY KEY(uid))\n";

        // Consumer side: target table, JDBC URL, regex serializer settings and Kafka settings.
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty(FlumeConstants.CONFIG_TABLE, fullTableName);
        consumerProperties.setProperty(FlumeConstants.CONFIG_JDBC_URL, getUrl());
        consumerProperties.setProperty(FlumeConstants.CONFIG_SERIALIZER, EventSerializers.REGEX.name());
        consumerProperties.setProperty(FlumeConstants.CONFIG_TABLE_DDL, ddl);
        consumerProperties.setProperty(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_REGULAR_EXPRESSION, "([^\\,]*),([^\\,]*),([^\\,]*)");
        consumerProperties.setProperty(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_COLUMN_NAMES, "c1,c2,c3");
        consumerProperties.setProperty(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR, DefaultKeyGenerator.UUID.name());
        consumerProperties.setProperty(KafkaConstants.BOOTSTRAP_SERVERS, "localhost:9092");
        consumerProperties.setProperty(KafkaConstants.TOPICS, "topic1,topic2");
        consumerProperties.setProperty(KafkaConstants.TIMEOUT, "100");

        PhoenixConsumerThread pConsumerThread = new PhoenixConsumerThread(pConsumer, consumerProperties);
        Thread phoenixConsumer = new Thread(pConsumerThread);

        // Producer side: broker address and key/value serializers.
        Properties producerProperties = new Properties();
        producerProperties.setProperty(KafkaConstants.BOOTSTRAP_SERVERS, "localhost:9092");
        producerProperties.setProperty(KafkaConstants.KEY_SERIALIZER, KafkaConstants.DEFAULT_KEY_SERIALIZER);
        producerProperties.setProperty(KafkaConstants.VALUE_SERIALIZER, KafkaConstants.DEFAULT_VALUE_SERIALIZER);
        producerProperties.setProperty("auto.commit.interval.ms", "1000");

        KafkaProducerThread kProducerThread = new KafkaProducerThread(producerProperties, TOPIC);
        Thread kafkaProducer = new Thread(kProducerThread);

        phoenixConsumer.start();

        try {
            // Wait at most 10 seconds on the consumer thread before producing;
            // presumably this gives the consumer time to start up - TODO confirm.
            phoenixConsumer.join(10000);
        } catch (InterruptedException e) {
            // Restore the interrupt status and fail: continuing would run the
            // producer and the verification against an unknown consumer state.
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for the Phoenix consumer thread", e);
        }

        kafkaProducer.start();

        try {
            // Block until the producer thread has finished.
            kafkaProducer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for the Kafka producer thread", e);
        }

        if (!kafkaProducer.isAlive()) {
            System.out.println("kafka producer is not alive");
            pConsumer.stop();
        }

        // Verify our serializer wrote out data. try-with-resources closes the
        // statement and result set even when an assertion fails.
        try (java.sql.Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT * FROM SAMPLE2")) {
            assertTrue(rs.next());
            assertTrue(rs.getFetchSize() > 0);
        }
    }