public void configure()

in phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/BaseEventSerializer.java [74:126]


    /**
     * Configures this serializer from the supplied Flume context.
     *
     * <p>Reads the table DDL, table name, connection settings, batch size, column names,
     * header names and row key generator type from {@code context}, stores them on this
     * instance, and finally delegates to {@link #doConfigure(Context)}.
     *
     * <p>The JDBC url is derived from the zookeeper quorum when one is given; an explicitly
     * configured JDBC url takes precedence over the quorum when both are present.
     *
     * @param context the configuration to read settings from
     * @throws NullPointerException if the table name is missing, if neither a zookeeper
     *         quorum nor a JDBC url is available, or if the column names are missing
     * @throws RuntimeException wrapping an {@link IllegalArgumentException} if the configured
     *         row key generator type does not name a {@code DefaultKeyGenerator} constant
     */
    public void configure(Context context) {

        this.createTableDdl = context.getString(FlumeConstants.CONFIG_TABLE_DDL);
        this.fullTableName = context.getString(FlumeConstants.CONFIG_TABLE);
        final String zookeeperQuorum = context.getString(FlumeConstants.CONFIG_ZK_QUORUM);
        final String ipJdbcURL = context.getString(FlumeConstants.CONFIG_JDBC_URL);
        this.batchSize = context.getInteger(FlumeConstants.CONFIG_BATCHSIZE, FlumeConstants.DEFAULT_BATCH_SIZE);
        final String columnNames = context.getString(CONFIG_COLUMN_NAMES);
        final String headersStr = context.getString(CONFIG_HEADER_NAMES);
        final String keyGeneratorType = context.getString(CONFIG_ROWKEY_TYPE_GENERATOR);

        // NOTE(review): the message says "empty" but only null is rejected here; an empty
        // string passes through unchanged. Confirm whether that is intended.
        if (this.fullTableName == null) {
            throw new NullPointerException(
                "Table name cannot be empty, please specify in the configuration file");
        }
        if (zookeeperQuorum != null && !zookeeperQuorum.isEmpty()) {
            this.jdbcUrl = QueryUtil.getUrl(zookeeperQuorum);
        }
        // An explicit JDBC url overrides the one derived from the zookeeper quorum.
        if (ipJdbcURL != null && !ipJdbcURL.isEmpty()) {
            this.jdbcUrl = ipJdbcURL;
        }
        if (this.jdbcUrl == null) {
            throw new NullPointerException(
                "Please specify either the zookeeper quorum or the jdbc url in the configuration file");
        }
        if (columnNames == null) {
            throw new NullPointerException(
                "Column names cannot be empty, please specify in configuration file");
        }
        colNames.addAll(Arrays.asList(columnNames.split(DEFAULT_COLUMNS_DELIMITER)));

        // Headers are optional.
        if (headersStr != null && !headersStr.isEmpty()) {
            headers.addAll(Arrays.asList(headersStr.split(DEFAULT_COLUMNS_DELIMITER)));
        }

        // The key generator is optional; when set, row keys are auto-generated.
        if (keyGeneratorType != null && !keyGeneratorType.isEmpty()) {
            try {
                // Upper-case with Locale.ROOT so the enum lookup does not depend on the JVM's
                // default locale (e.g. under a Turkish locale "uuid" would otherwise become
                // "UU\u0130D" and fail to match the enum constant).
                keyGenerator = DefaultKeyGenerator.valueOf(
                    keyGeneratorType.toUpperCase(java.util.Locale.ROOT));
                this.autoGenerateKey = true;
            } catch (IllegalArgumentException iae) {
                logger.error("An invalid key generator {} was specified in configuration file. Specify one of {}",keyGeneratorType,DefaultKeyGenerator.values());
                throw new RuntimeException(iae);
            }
        }

        logger.debug(" the jdbcUrl configured is {}", jdbcUrl);
        // Pass the collection itself so it is only rendered when debug logging is enabled.
        logger.debug(" columns configured are {}", colNames);
        logger.debug(" headers configured are {}", headersStr);
        logger.debug(" the keyGenerator configured is {} ", keyGeneratorType);

        doConfigure(context);

    }