public void bindValue()

in pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java [38:79]


    public void bindValue(PreparedStatement statement,
                          Record<GenericRecord> message, String action) throws Exception {

        GenericRecord record = message.getValue();
        List<ColumnId> columns = Lists.newArrayList();
        if (action == null || action.equals(INSERT)) {
            columns = tableDefinition.getColumns();
        } else if (action.equals(DELETE)){
            columns.addAll(tableDefinition.getKeyColumns());
        } else if (action.equals(UPDATE)){
            columns.addAll(tableDefinition.getNonKeyColumns());
            columns.addAll(tableDefinition.getKeyColumns());
        }

        int index = 1;
        for (ColumnId columnId : columns) {
            String colName = columnId.getName();
            int colType = columnId.getType();
            if (log.isDebugEnabled()) {
                log.debug("colName: {} colType: {}", colName, colType);
            }
            try {
                Object obj = record.getField(colName);
                if (obj != null) {
                    setColumnValue(statement, index++, obj);
                } else {
                    if (log.isDebugEnabled()) {
                        log.debug("Column {} is null", colName);
                    }
                    setColumnNull(statement, index++, colType);
                }
            } catch (NullPointerException e) {
                // With JSON schema field is omitted, so get NPE
                // In this case we want to set column to Null
                if (log.isDebugEnabled()) {
                    log.debug("Column {} is null", colName);
                }
                setColumnNull(statement, index++, colType);
            }

        }
    }