in phoenix5-flume/src/it/java/org/apache/phoenix/flume/CsvEventSerializerIT.java [359:385]
/**
 * Creates a new {@link Context}, assigns it to {@code sinkContext}, and fills it with the
 * configuration entries for a CSV-serialized sink.
 *
 * <p>The table name, JDBC URL (taken from {@code getUrl()}), serializer name, DDL and column
 * names are always written. Each remaining argument is written under its serializer-prefixed
 * key only when it is non-null; a null argument leaves that key unset.
 *
 * @param fullTableName     value stored under {@code CONFIG_TABLE}; must not be null
 * @param ddl               value stored under {@code CONFIG_TABLE_DDL}
 * @param columns           value stored under the serializer-prefixed {@code CONFIG_COLUMN_NAMES} key
 * @param csvDelimiter      optional value for the {@code CSV_DELIMITER} key
 * @param csvQuote          optional value for the {@code CSV_QUOTE} key
 * @param csvEscape         optional value for the {@code CSV_ESCAPE} key
 * @param csvArrayDelimiter optional value for the {@code CSV_ARRAY_DELIMITER} key
 * @param rowkeyType        optional value for the {@code CONFIG_ROWKEY_TYPE_GENERATOR} key
 * @param headers           optional value for the {@code CONFIG_HEADER_NAMES} key
 * @throws NullPointerException if {@code fullTableName} is null
 */
private void initSinkContext(final String fullTableName, final String ddl, final String columns,
        final String csvDelimiter, final String csvQuote, final String csvEscape, final String csvArrayDelimiter,
        final String rowkeyType, final String headers) {
    if (fullTableName == null) {
        throw new NullPointerException();
    }

    // Publish the new context first, then populate it through a local alias.
    final Context context = new Context();
    sinkContext = context;

    // Entries that are written unconditionally.
    context.put(FlumeConstants.CONFIG_TABLE, fullTableName);
    context.put(FlumeConstants.CONFIG_JDBC_URL, getUrl());
    context.put(FlumeConstants.CONFIG_SERIALIZER, EventSerializers.CSV.name());
    context.put(FlumeConstants.CONFIG_TABLE_DDL, ddl);
    context.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + FlumeConstants.CONFIG_COLUMN_NAMES, columns);

    // Serializer options that are written only when the caller supplied a value.
    putSerializerOption(context, FlumeConstants.CSV_DELIMITER, csvDelimiter);
    putSerializerOption(context, FlumeConstants.CSV_QUOTE, csvQuote);
    putSerializerOption(context, FlumeConstants.CSV_ESCAPE, csvEscape);
    putSerializerOption(context, FlumeConstants.CSV_ARRAY_DELIMITER, csvArrayDelimiter);
    putSerializerOption(context, FlumeConstants.CONFIG_ROWKEY_TYPE_GENERATOR, rowkeyType);
    putSerializerOption(context, FlumeConstants.CONFIG_HEADER_NAMES, headers);
}

/**
 * Stores {@code value} in {@code context} under the serializer-prefixed {@code optionName};
 * does nothing when {@code value} is null.
 */
private static void putSerializerOption(final Context context, final String optionName, final String value) {
    if (value == null) {
        return;
    }
    context.put(FlumeConstants.CONFIG_SERIALIZER_PREFIX + optionName, value);
}