in phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/BaseEventSerializer.java [136:209]
/**
 * Opens the JDBC connection and prepares the upsert statement for the target table.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>open a connection to {@code jdbcUrl} with the configured upsert batch size and
 *       auto-commit disabled;</li>
 *   <li>run {@code createTableDdl} through {@code SchemaHandler.createTable} when it is set;</li>
 *   <li>read the column metadata of {@code fullTableName} and map column display names to
 *       their SQL type codes;</li>
 *   <li>fill {@code columnMetadata} from {@code colNames}, then {@code headers}, then
 *       (when {@code autoGenerateKey} is set) the row key column;</li>
 *   <li>build {@code upsertStatement} from that metadata and call {@code doInitialize()}.</li>
 * </ol>
 *
 * <p>NOTE(review): when this method fails after the connection was opened, the connection is
 * left open; presumably it is released by this class's close logic -- confirm.
 *
 * @throws SQLException if connecting or reading metadata fails, if no columns are found for
 *         the table ({@code TABLE_UNDEFINED}), or if key auto-generation is enabled but no
 *         row key column type could be resolved ({@code PRIMARY_KEY_MISSING})
 */
public void initialize() throws SQLException {
    final Properties props = new Properties();
    props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, String.valueOf(this.batchSize));
    try {
        this.connection = DriverManager.getConnection(this.jdbcUrl, props);
        this.connection.setAutoCommit(false);
        if (this.createTableDdl != null) {
            SchemaHandler.createTable(connection, createTableDdl);
        }

        final Map<String, Integer> qualifiedColumnMap = new LinkedHashMap<>();
        final Map<String, Integer> unqualifiedColumnMap = new LinkedHashMap<>();
        final String schemaName = SchemaUtil.getSchemaNameFromFullName(fullTableName);
        final String tableName = SchemaUtil.getTableNameFromFullName(fullTableName);
        String rowkey = null;

        // try-with-resources: a failure while closing the result set can no longer replace
        // an exception thrown while reading it (it is attached as a suppressed exception).
        try (ResultSet rs = connection.getMetaData().getColumns(
                "",
                StringUtil.escapeLike(SchemaUtil.normalizeIdentifier(schemaName)),
                StringUtil.escapeLike(SchemaUtil.normalizeIdentifier(tableName)),
                null)) {
            while (rs.next()) {
                final String cf = rs.getString(QueryUtil.COLUMN_FAMILY_POSITION);
                final String cq = rs.getString(QueryUtil.COLUMN_NAME_POSITION);
                // TODO: the SQL type is read from column 26 because
                // QueryUtil.DATA_TYPE_POSITION is currently 5; fix the constant and use it
                // here instead of the literal.
                final Integer dt = rs.getInt(26);
                if (cf == null || cf.isEmpty()) {
                    // A column without a family is remembered as the row key; this is
                    // required only when the row key is auto generated.
                    rowkey = cq;
                } else {
                    qualifiedColumnMap.put(SchemaUtil.getColumnDisplayName(cf, cq), dt);
                }
                unqualifiedColumnMap.put(SchemaUtil.getColumnDisplayName(null, cq), dt);
            }
        }

        // Can happen when the table is not found in HBase.
        if (unqualifiedColumnMap.isEmpty()) {
            throw new SQLExceptionInfo.Builder(SQLExceptionCode.TABLE_UNDEFINED)
                    .setTableName(tableName).build().buildException();
        }

        final int colSize = colNames.size();
        final int headersSize = headers.size();
        // One extra slot is reserved for the row key when it is auto generated.
        final int totalSize = colSize + headersSize + (autoGenerateKey ? 1 : 0);
        columnMetadata = new ColumnInfo[totalSize];

        int position = 0;
        position = this.addToColumnMetadataInfo(colNames, qualifiedColumnMap, unqualifiedColumnMap, position);
        position = this.addToColumnMetadataInfo(headers, qualifiedColumnMap, unqualifiedColumnMap, position);

        if (autoGenerateKey) {
            final Integer sqlType = unqualifiedColumnMap.get(rowkey);
            if (sqlType == null) {
                throw new SQLExceptionInfo.Builder(SQLExceptionCode.PRIMARY_KEY_MISSING)
                        .setColumnName(rowkey).setTableName(fullTableName).build().buildException();
            }
            columnMetadata[position] = new ColumnInfo(rowkey, sqlType);
        }

        this.upsertStatement = QueryUtil.constructUpsertStatement(fullTableName, Arrays.asList(columnMetadata));
        logger.info(" the upsert statement is {} " ,this.upsertStatement);
    } catch (SQLException e) {
        // The trailing throwable argument makes the logger record the stack trace as well.
        logger.error("error {} occurred during initializing connection ", e.getMessage(), e);
        throw e;
    }
    doInitialize();
}