public void initialize()

in phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/BaseEventSerializer.java [136:209]


    public void initialize() throws SQLException {
        final Properties props = new Properties();
        props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, String.valueOf(this.batchSize));
        ResultSet rs = null;
        try {
            this.connection = DriverManager.getConnection(this.jdbcUrl, props);
            this.connection.setAutoCommit(false);
            if(this.createTableDdl != null) {
                SchemaHandler.createTable(connection,createTableDdl);
            }


            final Map<String,Integer> qualifiedColumnMap = new LinkedHashMap<>();
            final Map<String,Integer> unqualifiedColumnMap = new LinkedHashMap<>();
            final String schemaName = SchemaUtil.getSchemaNameFromFullName(fullTableName);
            final String tableName  = SchemaUtil.getTableNameFromFullName(fullTableName);

            String rowkey = null;
            String  cq = null;
            String  cf = null;
            Integer dt = null;
            rs = connection.getMetaData().getColumns("", StringUtil.escapeLike(SchemaUtil.normalizeIdentifier(schemaName)), StringUtil.escapeLike(SchemaUtil.normalizeIdentifier(tableName)), null);
            while (rs.next()) {
                cf = rs.getString(QueryUtil.COLUMN_FAMILY_POSITION);
                cq = rs.getString(QueryUtil.COLUMN_NAME_POSITION);
                // TODO: Fix this .. change `DATA_TYPE_POSITION` value 5 to 26
                // dt = rs.getInt(QueryUtil.DATA_TYPE_POSITION);
                dt = rs.getInt(26);
                if(cf == null || cf.isEmpty()) {
                    rowkey = cq; // this is required only when row key is auto generated
                } else {
                    qualifiedColumnMap.put(SchemaUtil.getColumnDisplayName(cf, cq), dt);
                }
                unqualifiedColumnMap.put(SchemaUtil.getColumnDisplayName(null, cq), dt);
            }

            //can happen when table not found in Hbase.
            if(unqualifiedColumnMap.isEmpty()) {
                throw new SQLExceptionInfo.Builder(SQLExceptionCode.TABLE_UNDEFINED)
                    .setTableName(tableName).build().buildException();
            }

            int colSize = colNames.size();
            int headersSize = headers.size();
            int totalSize = colSize + headersSize + ( autoGenerateKey ? 1 : 0);
            columnMetadata = new ColumnInfo[totalSize] ;

            int position = 0;
            position = this.addToColumnMetadataInfo(colNames, qualifiedColumnMap, unqualifiedColumnMap, position);
            position = this.addToColumnMetadataInfo(headers,  qualifiedColumnMap, unqualifiedColumnMap, position);

            if(autoGenerateKey) {
                Integer sqlType = unqualifiedColumnMap.get(rowkey);
                if (sqlType == null) {
                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.PRIMARY_KEY_MISSING)
                        .setColumnName(rowkey).setTableName(fullTableName).build().buildException();
                }
                columnMetadata[position] = new ColumnInfo(rowkey, sqlType);
                position++;
            }

            this.upsertStatement = QueryUtil.constructUpsertStatement(fullTableName, Arrays.asList(columnMetadata));
            logger.info(" the upsert statement is {} " ,this.upsertStatement);

        }  catch (SQLException e) {
            logger.error("error {} occurred during initializing connection ",e.getMessage());
            throw e;
        } finally {
            if(rs != null) {
                rs.close();
            }
        }
        doInitialize();
    }