phoenix5-flume/src/it/java/org/apache/phoenix/flume/CsvEventSerializerIT.java [305:359]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
			Map<String, String> headerMap = new HashMap<>(2);
			headerMap.put("host", hostHeader);
			headerMap.put("source", sourceHeader);
			Event event = EventBuilder.withBody(Bytes.toBytes(eventBody), headerMap);
			eventList.add(event);
		}

		// put event in channel
		Transaction transaction = channel.getTransaction();
		transaction.begin();
		for (Event event : eventList) {
			channel.put(event);
		}
		transaction.commit();
		transaction.close();

		sink.process();

		final String query = " SELECT * FROM \n " + fullTableName;
		Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
		final ResultSet rs;
		final Connection conn = DriverManager.getConnection(getUrl(), props);
		try {
			rs = conn.createStatement().executeQuery(query);
			assertTrue(rs.next());
			assertEquals("host1", rs.getString("host"));
			assertEquals("source1", rs.getString("source"));

			assertTrue(rs.next());
			assertEquals("host1", rs.getString("host"));
			assertEquals("source1", rs.getString("source"));
		} finally {
			if (conn != null) {
				conn.close();
			}
		}
		sink.stop();
		assertEquals(LifecycleState.STOP, sink.getLifecycleState());

		dropTable(fullTableName);
	}

	private Channel initChannel() {
		// Channel configuration
		Context channelContext = new Context();
		channelContext.put("capacity", "10000");
		channelContext.put("transactionCapacity", "200");

		Channel channel = new MemoryChannel();
		channel.setName("memorychannel");
		Configurables.configure(channel, channelContext);
		return channel;
	}

	private void initSinkContext(final String fullTableName, final String ddl, final String columns,
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



phoenix5-flume/src/it/java/org/apache/phoenix/flume/JsonEventSerializerIT.java [436:490]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
			Map<String, String> headerMap = new HashMap<>(2);
			headerMap.put("host", hostHeader);
			headerMap.put("source", sourceHeader);
			Event event = EventBuilder.withBody(Bytes.toBytes(eventBody), headerMap);
			eventList.add(event);
		}

		// put event in channel
		Transaction transaction = channel.getTransaction();
		transaction.begin();
		for (Event event : eventList) {
			channel.put(event);
		}
		transaction.commit();
		transaction.close();

		sink.process();

		final String query = " SELECT * FROM \n " + fullTableName;
		Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
		final ResultSet rs;
		final Connection conn = DriverManager.getConnection(getUrl(), props);
		try {
			rs = conn.createStatement().executeQuery(query);
			assertTrue(rs.next());
			assertEquals("host1", rs.getString("host"));
			assertEquals("source1", rs.getString("source"));

			assertTrue(rs.next());
			assertEquals("host1", rs.getString("host"));
			assertEquals("source1", rs.getString("source"));
		} finally {
			if (conn != null) {
				conn.close();
			}
		}
		sink.stop();
		assertEquals(LifecycleState.STOP, sink.getLifecycleState());

		dropTable(fullTableName);
	}

	private Channel initChannel() {
		// Channel configuration
		Context channelContext = new Context();
		channelContext.put("capacity", "10000");
		channelContext.put("transactionCapacity", "200");

		Channel channel = new MemoryChannel();
		channel.setName("memorychannel");
		Configurables.configure(channel, channelContext);
		return channel;
	}

	private void initSinkContext(final String fullTableName, final String ddl, final String columns,
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



