phoenix5-flume/src/it/java/org/apache/phoenix/flume/JsonEventSerializerIT.java [251:283]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
	public void testKeyGenerator() throws EventDeliveryException, SQLException {

		final String fullTableName = "FLUME_JSON_TEST";
		initSinkContextWithDefaults(fullTableName);

		sink = new PhoenixSink();
		Configurables.configure(sink, sinkContext);

		assertEquals(LifecycleState.IDLE, sink.getLifecycleState());

		final Channel channel = this.initChannel();
		sink.setChannel(channel);

		sink.start();
		final String eventBody = "{\"col1\" : \"kalyan\", \"col2\" : 10.5, \"col3\" : [\"abc\",\"pqr\",\"xyz\"], \"col4\" : [1,2,3,4]}";
		final Event event = EventBuilder.withBody(Bytes.toBytes(eventBody));
		// put event in channel
		Transaction transaction = channel.getTransaction();
		transaction.begin();
		channel.put(event);
		transaction.commit();
		transaction.close();

		sink.process();

		int rowsInDb = countRows(fullTableName);
		assertEquals(1, rowsInDb);

		sink.stop();
		assertEquals(LifecycleState.STOP, sink.getLifecycleState());

		dropTable(fullTableName);
	}
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



phoenix5-flume/src/it/java/org/apache/phoenix/flume/CsvEventSerializerIT.java [123:155]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
	public void testKeyGenerator() throws EventDeliveryException, SQLException {

		final String fullTableName = "FLUME_CSV_TEST";
		initSinkContextWithDefaults(fullTableName);

		sink = new PhoenixSink();
		Configurables.configure(sink, sinkContext);

		assertEquals(LifecycleState.IDLE, sink.getLifecycleState());

		final Channel channel = this.initChannel();
		sink.setChannel(channel);

		sink.start();
		final String eventBody = "kalyan,10.5,\"abc,pqr,xyz\",\"1,2,3,4\"";
		final Event event = EventBuilder.withBody(Bytes.toBytes(eventBody));
		// put event in channel
		Transaction transaction = channel.getTransaction();
		transaction.begin();
		channel.put(event);
		transaction.commit();
		transaction.close();

		sink.process();

		int rowsInDb = countRows(fullTableName);
		assertEquals(1, rowsInDb);

		sink.stop();
		assertEquals(LifecycleState.STOP, sink.getLifecycleState());

		dropTable(fullTableName);
	}
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



