public void testEventsWithHeaders()

in phoenix5-flume/src/it/java/org/apache/phoenix/flume/RegexEventSerializerIT.java [300:374]


    public void testEventsWithHeaders() throws Exception {
        
        sinkContext = new Context ();
        final String fullTableName = generateUniqueName();
        final String ddl = "CREATE TABLE " + fullTableName +
                "  (rowkey VARCHAR not null, col1 varchar , cf1.col2 varchar , host varchar , source varchar \n" +
                "  CONSTRAINT pk PRIMARY KEY (rowkey))\n";
       
        sinkContext.put(FlumeConstants.CONFIG_TABLE, fullTableName);
        sinkContext.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER,EventSerializers.REGEX.name());
        sinkContext.put(FlumeConstants.CONFIG_TABLE_DDL, ddl);
        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_REGULAR_EXPRESSION,"^([^\t]+)\t([^\t]+)$");
        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_COLUMN_NAMES,"col1,cf1.col2");
        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_HEADER_NAMES,"host,source");
        sinkContext.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR,DefaultKeyGenerator.UUID.name());       
        
        sink = new PhoenixSink();
        Configurables.configure(sink, sinkContext);
        assertEquals(LifecycleState.IDLE, sink.getLifecycleState());
      
        final Channel channel = this.initChannel();
        sink.setChannel(channel);
        
        sink.start();
        
        int numEvents = 10;
        String col1 = "val1";
        String col2 = "val2";
        String hostHeader = "host1";
        String sourceHeader = "source1";
        String eventBody = null;
        List<Event> eventList = new ArrayList<>(numEvents);
        for(int i = 0 ; i < numEvents ; i++) {
            eventBody = (col1 + i) + "\t" + (col2 + i);
            Map<String, String> headerMap = new HashMap<>(2);
            headerMap.put("host",hostHeader);
            headerMap.put("source",sourceHeader);
            Event event = EventBuilder.withBody(Bytes.toBytes(eventBody),headerMap);
            eventList.add(event);
        }
       
        // put event in channel
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        for(Event event : eventList) {
            channel.put(event);
        }
        transaction.commit();
        transaction.close();
        
        sink.process();
   
        final String query = " SELECT * FROM \n " + fullTableName;
        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
        final ResultSet rs ;
        final Connection conn = DriverManager.getConnection(getUrl(), props);
        try{
            rs = conn.createStatement().executeQuery(query);
            assertTrue(rs.next());
            assertEquals("host1",rs.getString("host"));
            assertEquals("source1",rs.getString("source"));
            
            assertTrue(rs.next());
            assertEquals("host1",rs.getString("host"));
            assertEquals("source1",rs.getString("source")); 
        }finally {
            if(conn != null) {
                conn.close();
            }
        }
        sink.stop();
        assertEquals(LifecycleState.STOP, sink.getLifecycleState());
        
    }