in pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java [112:132]
/**
 * Prepares the SQL statements this sink uses against the target table.
 *
 * <p>Reads the comma-separated {@code key} and {@code nonKey} column lists from the sink
 * configuration, resolves the table definition for {@code tableId}, and then builds:
 * <ul>
 *   <li>the insert statement, always;</li>
 *   <li>the update statement, only when at least one non-key column is configured;</li>
 *   <li>the delete statement, only when at least one key column is configured.</li>
 * </ul>
 *
 * @throws Exception propagated from the {@code JdbcUtils} calls that resolve the table
 *                   definition and build the statements
 */
private void initStatement() throws Exception {
    List<String> keyList = splitColumnNames(jdbcSinkConfig.getKey());
    List<String> nonKeyList = splitColumnNames(jdbcSinkConfig.getNonKey());

    tableDefinition = JdbcUtils.getTableDefinition(connection, tableId, keyList, nonKeyList);
    insertStatement = JdbcUtils.buildInsertStatement(connection, JdbcUtils.buildInsertSql(tableDefinition));

    // An update needs at least one non-key column to set.
    if (!nonKeyList.isEmpty()) {
        updateStatement = JdbcUtils.buildUpdateStatement(connection, JdbcUtils.buildUpdateSql(tableDefinition));
    }
    // A delete needs at least one key column to identify the row.
    if (!keyList.isEmpty()) {
        deleteStatement = JdbcUtils.buildDeleteStatement(connection, JdbcUtils.buildDeleteSql(tableDefinition));
    }
}

/**
 * Splits a comma-separated configuration value into individual column names.
 *
 * <p>Each token is trimmed so that values such as {@code "id, name"} yield {@code "name"}
 * rather than {@code " name"}; tokens that are empty after trimming (for example from
 * {@code "id,,name"} or a whitespace-only value) are skipped.
 *
 * @param commaSeparated the raw configuration value; may be {@code null} or empty
 * @return the column names in their configured order; empty when nothing is configured
 */
private static List<String> splitColumnNames(String commaSeparated) {
    List<String> names = Lists.newArrayList();
    if (commaSeparated == null) {
        return names;
    }
    for (String token : commaSeparated.split(",")) {
        String name = token.trim();
        if (!name.isEmpty()) {
            names.add(name);
        }
    }
    return names;
}