in phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/BaseEventSerializer.java [74:126]
public void configure(Context context) {
this.createTableDdl = context.getString(FlumeConstants.CONFIG_TABLE_DDL);
this.fullTableName = context.getString(FlumeConstants.CONFIG_TABLE);
final String zookeeperQuorum = context.getString(FlumeConstants.CONFIG_ZK_QUORUM);
final String ipJdbcURL = context.getString(FlumeConstants.CONFIG_JDBC_URL);
this.batchSize = context.getInteger(FlumeConstants.CONFIG_BATCHSIZE, FlumeConstants.DEFAULT_BATCH_SIZE);
final String columnNames = context.getString(CONFIG_COLUMN_NAMES);
final String headersStr = context.getString(CONFIG_HEADER_NAMES);
final String keyGeneratorType = context.getString(CONFIG_ROWKEY_TYPE_GENERATOR);
if (this.fullTableName == null) {
throw new NullPointerException(
"Table name cannot be empty, please specify in the configuration file");
}
if(zookeeperQuorum != null && !zookeeperQuorum.isEmpty()) {
this.jdbcUrl = QueryUtil.getUrl(zookeeperQuorum);
}
if(ipJdbcURL != null && !ipJdbcURL.isEmpty()) {
this.jdbcUrl = ipJdbcURL;
}
if (this.jdbcUrl == null) {
throw new NullPointerException(
"Please specify either the zookeeper quorum or the jdbc url in the configuration file");
}
if (columnNames == null) {
throw new NullPointerException(
"Column names cannot be empty, please specify in configuration file");
}
colNames.addAll(Arrays.asList(columnNames.split(DEFAULT_COLUMNS_DELIMITER)));
if(headersStr != null && !headersStr.isEmpty()) {
headers.addAll(Arrays.asList(headersStr.split(DEFAULT_COLUMNS_DELIMITER)));
}
if(keyGeneratorType != null && !keyGeneratorType.isEmpty()) {
try {
keyGenerator = DefaultKeyGenerator.valueOf(keyGeneratorType.toUpperCase());
this.autoGenerateKey = true;
} catch(IllegalArgumentException iae) {
logger.error("An invalid key generator {} was specified in configuration file. Specify one of {}",keyGeneratorType,DefaultKeyGenerator.values());
throw new RuntimeException(iae);
}
}
logger.debug(" the jdbcUrl configured is {}",jdbcUrl);
logger.debug(" columns configured are {}",colNames.toString());
logger.debug(" headers configured are {}",headersStr);
logger.debug(" the keyGenerator configured is {} ",keyGeneratorType);
doConfigure(context);
}