private void setPrepareStatementValue()

in eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalFullProducer.java [315:392]


    private void setPrepareStatementValue(PreparedStatement statement) throws SQLException {
        String colName = choosePrimaryKey.get();
        if (colName == null) {
            return;
        }
        RdbColumnDefinition columnDefinition = tableDefinition.getColumnDefinitions().get(colName);
        Object value = tableFullPosition.getCurPrimaryKeyCols().get(colName);
        String str;
        switch (columnDefinition.getJdbcType()) {
            case BIT:
            case TINYINT:
            case SMALLINT:
            case INTEGER:
            case BIGINT:
                statement.setBigDecimal(1, new BigDecimal(String.valueOf(value)));
                break;
            case DECIMAL:
            case FLOAT:
            case DOUBLE:
            case NUMERIC:
                statement.setDouble(1, new BigDecimal(String.valueOf(value)).doubleValue());
                break;
            case CHAR:
            case VARCHAR:
            case LONGNVARCHAR:
            case NCHAR:
            case NVARCHAR:
            case LONGVARCHAR:
            case CLOB:
            case NCLOB:
                statement.setString(1, String.valueOf(value));
                break;
            case BLOB:
            case VARBINARY:
            case BINARY:
                str = String.valueOf(value);
                String hexStr = str;
                if (str.startsWith("0x")) {
                    hexStr = str.substring(str.indexOf("0x"));
                }
                byte[] bytes = SqlUtils.hex2bytes(hexStr);
                statement.setBytes(1, bytes);
                break;
            case DATE:
                Instant d;
                if (value instanceof Long) {
                    Long val = (Long) value;
                    d = Instant.ofEpochMilli(val);
                    str = d.atZone(ZoneId.systemDefault()).toLocalDateTime().format(DATE_FORMATTER);
                } else if (value instanceof Integer) {
                    Integer val = (Integer) value;
                    d = Instant.ofEpochMilli((long) val);
                    str = d.atZone(ZoneId.systemDefault()).toLocalDateTime().format(DATE_FORMATTER);
                } else if (value instanceof String) {
                    str = (String) value;
                } else {
                    if (!(value instanceof LocalDate)) {
                        throw new IllegalArgumentException("unsupported date class type:" + value.getClass().getSimpleName());
                    }
                    str = ((LocalDate) value).format(DATE_FORMATTER);
                }
                statement.setString(1, str);
                break;
            case TIMESTAMP:
                if (value instanceof String) {
                    str = (String) value;
                } else {
                    if (!(value instanceof LocalDateTime)) {
                        throw new IllegalArgumentException("unsupported timestamp class type:" + value.getClass().getSimpleName());
                    }
                    str = ((LocalDateTime) value).format(DATE_STAMP_FORMATTER);
                }
                statement.setString(1, str);
                break;
            default:
                throw new EventMeshException(String.format("not support the primary key type [%s]", value.getClass()));
        }
    }