public void testEventsWithHeaders()

in phoenix5-flume/src/it/java/org/apache/phoenix/flume/JsonEventSerializerIT.java [402:476]


	/**
	 * Checks that Flume event headers end up in the target table next to the
	 * columns taken from the JSON event body.
	 *
	 * <p>The sink is configured with the header names {@code host} and
	 * {@code source}; ten events carrying those headers are put on the channel,
	 * the sink is asked to process them once, and the first two rows returned by
	 * the query are checked for the expected header values.
	 *
	 * <p>The sink is stopped and the table dropped even when an assertion fails,
	 * so a failure here does not leave state behind for other tests that use the
	 * same table name.
	 *
	 * @throws Exception if sink setup, event delivery, the query or cleanup fails
	 */
	public void testEventsWithHeaders() throws Exception {

		sinkContext = new Context();
		final String fullTableName = "FLUME_JSON_TEST";
		final String ddl = "CREATE TABLE IF NOT EXISTS "
				+ fullTableName
				+ "  (rowkey VARCHAR not null, col1 varchar , col2 double, col3 varchar[], col4 integer[], host varchar , source varchar \n"
				+ "  CONSTRAINT pk PRIMARY KEY (rowkey))\n";
		final String columns = "col1,col2,col3,col4";
		final String columnsMapping = "{\"col1\":\"col1\",\"col2\":\"col2\",\"col3\":\"col3\",\"col4\":\"col4\"}";
		final String rowkeyType = DefaultKeyGenerator.UUID.name();
		final String headers = "host,source";
		initSinkContext(fullTableName, ddl, columns, columnsMapping, rowkeyType, headers);

		sink = new PhoenixSink();
		Configurables.configure(sink, sinkContext);
		assertEquals(LifecycleState.IDLE, sink.getLifecycleState());

		final Channel channel = this.initChannel();
		sink.setChannel(channel);

		sink.start();

		try {
			final int numEvents = 10;
			final String col1 = "val1";
			final String a1 = "[aaa,bbb,ccc]";
			final String a2 = "[1,2,3,4]";
			final String hostHeader = "host1";
			final String sourceHeader = "source1";
			final List<Event> eventList = new ArrayList<>(numEvents);
			for (int i = 0; i < numEvents; i++) {
				final String eventBody = "{\"col1\" : \"" + (col1 + i) + "\", \"col2\" : " + i * 10.5
						+ " , \"col3\" : " + a1 + " , \"col4\" : " + a2 + "}";
				final Map<String, String> headerMap = new HashMap<>(2);
				headerMap.put("host", hostHeader);
				headerMap.put("source", sourceHeader);
				eventList.add(EventBuilder.withBody(Bytes.toBytes(eventBody), headerMap));
			}

			// put events in channel; a transaction must be committed or rolled
			// back before it is closed, so roll back if anything goes wrong
			final Transaction transaction = channel.getTransaction();
			transaction.begin();
			try {
				for (Event event : eventList) {
					channel.put(event);
				}
				transaction.commit();
			} catch (RuntimeException e) {
				transaction.rollback();
				throw e;
			} finally {
				transaction.close();
			}

			sink.process();

			final String query = " SELECT * FROM \n " + fullTableName;
			final Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
			// the statement is not held in a variable; it is released when the
			// connection is closed at the end of this try block
			try (Connection conn = DriverManager.getConnection(getUrl(), props);
					ResultSet rs = conn.createStatement().executeQuery(query)) {
				// same coverage as before: the first two rows must carry the headers
				final int rowsToVerify = 2;
				for (int row = 0; row < rowsToVerify; row++) {
					assertTrue(rs.next());
					assertEquals(hostHeader, rs.getString("host"));
					assertEquals(sourceHeader, rs.getString("source"));
				}
			}
		} finally {
			// always stop the sink and drop the table, even after a failure
			try {
				sink.stop();
			} finally {
				dropTable(fullTableName);
			}
		}
		assertEquals(LifecycleState.STOP, sink.getLifecycleState());
	}