public void writeColumn()

in eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalFullConsumer.java [189:363]


    /**
     * Binds one column value to a parameter of the given prepared statement, choosing the JDBC
     * setter from the MySQL column definition.
     *
     * <ul>
     *   <li>If {@code colType} is {@code null}, the value is bound through its {@code toString()}
     *       form, or as SQL NULL typed {@link Types#VARCHAR} when there is no value.</li>
     *   <li>If {@code value} is {@code null}, SQL NULL is bound using the vendor type number of the
     *       column's JDBC type.</li>
     *   <li>Otherwise the value is converted with the matching {@code SqlUtils} helper for the
     *       column type and bound with the corresponding setter.</li>
     * </ul>
     *
     * @param ps      statement receiving the parameter
     * @param index   JDBC parameter index, passed unchanged to the statement setters
     * @param colType column definition of the target column; may be {@code null}
     * @param value   value to bind; may be {@code null}
     * @throws UnsupportedOperationException if the column type is not handled by any branch below
     * @throws Exception                     any failure raised by the statement setters or by the
     *                                       conversion helpers
     */
    public void writeColumn(PreparedStatement ps, int index, MySQLColumnDef colType, Object value) throws Exception {
        if (colType == null) {
            // No column metadata: fall back to the value's string representation.
            String colVal = value == null ? null : value.toString();
            if (colVal == null) {
                ps.setNull(index, Types.VARCHAR);
            } else {
                ps.setString(index, colVal);
            }
            return;
        }
        if (value == null) {
            ps.setNull(index, colType.getJdbcType().getVendorTypeNumber());
            return;
        }

        switch (colType.getType()) {
            case TINYINT:
            case SMALLINT:
            case MEDIUMINT:
            case INT:
                Long longValue = SqlUtils.toLong(value);
                if (longValue == null) {
                    ps.setNull(index, Types.INTEGER);
                } else {
                    ps.setLong(index, longValue);
                }
                return;
            case BIGINT:
            case DECIMAL:
                BigDecimal bigDecimalValue = SqlUtils.toBigDecimal(value);
                if (bigDecimalValue == null) {
                    ps.setNull(index, Types.DECIMAL);
                } else {
                    ps.setBigDecimal(index, bigDecimalValue);
                }
                return;
            case FLOAT:
            case DOUBLE:
                Double doubleValue = SqlUtils.toDouble(value);
                if (doubleValue == null) {
                    ps.setNull(index, Types.DOUBLE);
                } else {
                    ps.setDouble(index, doubleValue);
                }
                return;
            case DATE:
            case DATETIME:
            case TIMESTAMP:
                LocalDateTime dateValue = null;
                if (!SqlUtils.isZeroTime(value)) {
                    try {
                        dateValue = SqlUtils.toLocalDateTime(value);
                    } catch (Exception ignored) {
                        // Conversion failed: deliberately fall back to binding the string form of the value.
                        ps.setString(index, SqlUtils.convertToString(value));
                        return;
                    }
                } else if (StringUtils.isNotBlank(config.getZeroDate())) {
                    // Zero time with a configured replacement: bind the replacement instead.
                    dateValue = SqlUtils.toLocalDateTime(config.getZeroDate());
                } else {
                    // Zero time without a configured replacement: pass the value through untouched.
                    ps.setObject(index, value);
                    return;
                }
                if (dateValue == null) {
                    ps.setNull(index, Types.TIMESTAMP);
                } else {
                    ps.setString(index, dataTimePattern.format(dateValue));
                }
                return;
            case TIME:
                String timeValue = SqlUtils.toMySqlTime(value);
                if (StringUtils.isBlank(timeValue)) {
                    // The time is bound as a string below, so the NULL is typed VARCHAR as well.
                    ps.setNull(index, Types.VARCHAR);
                } else {
                    ps.setString(index, timeValue);
                }
                return;
            case YEAR:
                LocalDateTime yearValue = null;
                if (!SqlUtils.isZeroTime(value)) {
                    yearValue = SqlUtils.toLocalDateTime(value);
                } else if (StringUtils.isNotBlank(config.getZeroDate())) {
                    yearValue = SqlUtils.toLocalDateTime(config.getZeroDate());
                } else {
                    // Zero time without a configured replacement: bind year 0.
                    ps.setInt(index, 0);
                    return;
                }
                if (yearValue == null) {
                    ps.setNull(index, Types.INTEGER);
                } else {
                    ps.setInt(index, yearValue.getYear());
                }
                return;
            case CHAR:
            case VARCHAR:
            case TINYTEXT:
            case TEXT:
            case MEDIUMTEXT:
            case LONGTEXT:
            case ENUM:
            case SET:
                String strValue = value.toString();
                if (strValue == null) {
                    ps.setNull(index, Types.VARCHAR);
                } else {
                    ps.setString(index, strValue);
                }
                return;
            case JSON:
                String jsonValue = value.toString();
                if (jsonValue == null) {
                    ps.setNull(index, Types.VARCHAR);
                } else {
                    ps.setString(index, jsonValue);
                }
                return;
            case BIT:
                if (value instanceof Boolean) {
                    // A boolean becomes a single byte holding 1 or 0.
                    byte[] booleanBytes = new byte[1];
                    booleanBytes[0] = (byte) (Boolean.TRUE.equals(value) ? 1 : 0);
                    ps.setBytes(index, booleanBytes);
                } else if (value instanceof Number) {
                    ps.setBytes(index, SqlUtils.numberToBinaryArray((Number) value));
                } else if ((value instanceof byte[]) || value.toString().startsWith("0x") || value.toString().startsWith("0X")) {
                    // Raw bytes, or text carrying a 0x/0X prefix, go through the byte conversion helper.
                    byte[] bitBytes = SqlUtils.toBytes(value);
                    if (bitBytes == null || bitBytes.length == 0) {
                        ps.setNull(index, Types.BIT);
                    } else {
                        ps.setBytes(index, bitBytes);
                    }
                } else {
                    // Anything else is converted to an int first and then to its binary array.
                    ps.setBytes(index, SqlUtils.numberToBinaryArray(SqlUtils.toInt(value)));
                }
                return;
            case BINARY:
            case VARBINARY:
            case TINYBLOB:
            case BLOB:
            case MEDIUMBLOB:
            case LONGBLOB:
                byte[] binaryValue = SqlUtils.toBytes(value);
                if (binaryValue == null) {
                    ps.setNull(index, Types.BINARY);
                } else {
                    ps.setBytes(index, binaryValue);
                }
                return;
            case GEOMETRY:
            case GEOMETRY_COLLECTION:
            case GEOM_COLLECTION:
            case POINT:
            case LINESTRING:
            case POLYGON:
            case MULTIPOINT:
            case MULTILINESTRING:
            case MULTIPOLYGON:
                String geoValue = SqlUtils.toGeometry(value);
                if (geoValue == null) {
                    ps.setNull(index, Types.VARCHAR);
                } else {
                    ps.setString(index, geoValue);
                }
                return;
            default:
                throw new UnsupportedOperationException("columnType '" + colType + "' Unsupported.");
        }
    }