phoenix5-flume/src/it/java/org/apache/phoenix/flume/JsonEventSerializerIT.java [321:352]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
	public void testMissingColumnsInEvent() throws EventDeliveryException, SQLException {
		// Feeds the sink a single JSON event that carries only col1, col3 and col4 and
		// verifies that no row is written to the target table.
		// NOTE(review): presumably the sink context configured by
		// initSinkContextWithDefaults expects more columns than this event supplies --
		// confirm against that helper.

		final String fullTableName = "FLUME_JSON_TEST";
		initSinkContextWithDefaults(fullTableName);

		sink = new PhoenixSink();
		Configurables.configure(sink, sinkContext);
		assertEquals(LifecycleState.IDLE, sink.getLifecycleState());

		final Channel channel = this.initChannel();
		sink.setChannel(channel);

		sink.start();
		try {
			final String eventBody = "{\"col1\" : \"kalyan\", \"col3\" : [\"abc\",\"pqr\",\"xyz\"], \"col4\" : [1,2,3,4]}";
			final Event event = EventBuilder.withBody(Bytes.toBytes(eventBody));

			// Put the event in the channel. A failed put/commit must be rolled back
			// before the transaction is closed, and close() must always run.
			final Transaction transaction = channel.getTransaction();
			transaction.begin();
			try {
				channel.put(event);
				transaction.commit();
			} catch (RuntimeException | Error e) {
				transaction.rollback();
				throw e;
			} finally {
				transaction.close();
			}

			sink.process();

			final int rowsInDb = countRows(fullTableName);
			assertEquals(0, rowsInDb);
		} finally {
			// Always tear down, even when an assertion or process() fails, so a
			// failure here does not leave a running sink or a stale table behind.
			try {
				sink.stop();
			} finally {
				dropTable(fullTableName);
			}
		}

		assertEquals(LifecycleState.STOP, sink.getLifecycleState());
	}
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



phoenix5-flume/src/it/java/org/apache/phoenix/flume/CsvEventSerializerIT.java [193:224]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
	public void testMissingColumnsInEvent() throws EventDeliveryException, SQLException {
		// Feeds the sink a single CSV event with three fields and verifies that no
		// row is written to the target table.
		// NOTE(review): presumably the sink context configured by
		// initSinkContextWithDefaults expects more columns than this event supplies --
		// confirm against that helper.

		final String fullTableName = "FLUME_CSV_TEST";
		initSinkContextWithDefaults(fullTableName);

		sink = new PhoenixSink();
		Configurables.configure(sink, sinkContext);
		assertEquals(LifecycleState.IDLE, sink.getLifecycleState());

		final Channel channel = this.initChannel();
		sink.setChannel(channel);

		sink.start();
		try {
			final String eventBody = "kalyan,\"abc,pqr,xyz\",\"1,2,3,4\"";
			final Event event = EventBuilder.withBody(Bytes.toBytes(eventBody));

			// Put the event in the channel. A failed put/commit must be rolled back
			// before the transaction is closed, and close() must always run.
			final Transaction transaction = channel.getTransaction();
			transaction.begin();
			try {
				channel.put(event);
				transaction.commit();
			} catch (RuntimeException | Error e) {
				transaction.rollback();
				throw e;
			} finally {
				transaction.close();
			}

			sink.process();

			final int rowsInDb = countRows(fullTableName);
			assertEquals(0, rowsInDb);
		} finally {
			// Always tear down, even when an assertion or process() fails, so a
			// failure here does not leave a running sink or a stale table behind.
			try {
				sink.stop();
			} finally {
				dropTable(fullTableName);
			}
		}

		assertEquals(LifecycleState.STOP, sink.getLifecycleState());
	}
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



