in pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java [38:79]
/**
 * Binds the fields of the incoming record to the parameters of the given statement.
 *
 * <p>The columns to bind, and their order, depend on {@code action}:
 * <ul>
 *   <li>{@code null} or {@code INSERT}: every column returned by
 *       {@code tableDefinition.getColumns()};</li>
 *   <li>{@code DELETE}: the key columns only;</li>
 *   <li>{@code UPDATE}: the non-key columns followed by the key columns.</li>
 * </ul>
 * Any other action leaves the column list empty, so no parameter is bound.
 * Parameters are bound at consecutive 1-based indexes in that column order
 * (presumably matching the placeholder order of the statement built elsewhere
 * -- verify against the statement builder).
 *
 * <p>A field that is {@code null}, or whose lookup throws a
 * {@link NullPointerException}, is bound as SQL NULL using the column's type.
 *
 * @param statement the prepared statement whose parameters are set
 * @param message   the record whose value supplies the field values
 * @param action    the mutation type, or {@code null} to treat the record as an insert
 * @throws Exception if setting a parameter on the statement fails
 */
public void bindValue(PreparedStatement statement,
                      Record<GenericRecord> message, String action) throws Exception {
    GenericRecord record = message.getValue();
    List<ColumnId> columns = Lists.newArrayList();
    if (action == null || action.equals(INSERT)) {
        columns = tableDefinition.getColumns();
    } else if (action.equals(DELETE)) {
        columns.addAll(tableDefinition.getKeyColumns());
    } else if (action.equals(UPDATE)) {
        columns.addAll(tableDefinition.getNonKeyColumns());
        columns.addAll(tableDefinition.getKeyColumns());
    }

    int index = 1;
    for (ColumnId columnId : columns) {
        String colName = columnId.getName();
        int colType = columnId.getType();
        if (log.isDebugEnabled()) {
            log.debug("colName: {} colType: {}", colName, colType);
        }

        // Only the field lookup is guarded: with JSON schema an omitted field
        // surfaces as an NPE here, and such a column must be bound as NULL.
        // Keeping the binding calls outside this try ensures an NPE raised while
        // binding is not mistaken for a missing field, which would otherwise
        // advance the index twice and shift every following parameter.
        Object obj;
        try {
            obj = record.getField(colName);
        } catch (NullPointerException e) {
            obj = null;
        }

        if (obj != null) {
            setColumnValue(statement, index, obj);
        } else {
            if (log.isDebugEnabled()) {
                log.debug("Column {} is null", colName);
            }
            setColumnNull(statement, index, colType);
        }
        // Exactly one parameter slot is consumed per column.
        index++;
    }
}