in eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalFullConsumer.java [189:363]
public void writeColumn(PreparedStatement ps, int index, MySQLColumnDef colType, Object value) throws Exception {
if (colType == null) {
String colVal = null;
if (value != null) {
colVal = value.toString();
}
if (colVal == null) {
ps.setNull(index, Types.VARCHAR);
} else {
ps.setString(index, colVal);
}
} else if (value == null) {
ps.setNull(index, colType.getJdbcType().getVendorTypeNumber());
} else {
switch (colType.getType()) {
case TINYINT:
case SMALLINT:
case MEDIUMINT:
case INT:
Long longValue = SqlUtils.toLong(value);
if (longValue == null) {
ps.setNull(index, 4);
return;
} else {
ps.setLong(index, longValue);
return;
}
case BIGINT:
case DECIMAL:
BigDecimal bigDecimalValue = SqlUtils.toBigDecimal(value);
if (bigDecimalValue == null) {
ps.setNull(index, 3);
return;
} else {
ps.setBigDecimal(index, bigDecimalValue);
return;
}
case FLOAT:
case DOUBLE:
Double doubleValue = SqlUtils.toDouble(value);
if (doubleValue == null) {
ps.setNull(index, 8);
} else {
ps.setDouble(index, doubleValue);
}
return;
case DATE:
case DATETIME:
case TIMESTAMP:
LocalDateTime dateValue = null;
if (!SqlUtils.isZeroTime(value)) {
try {
dateValue = SqlUtils.toLocalDateTime(value);
} catch (Exception e) {
ps.setString(index, SqlUtils.convertToString(value));
return;
}
} else if (StringUtils.isNotBlank(config.getZeroDate())) {
dateValue = SqlUtils.toLocalDateTime(config.getZeroDate());
} else {
ps.setObject(index, value);
return;
}
if (dateValue == null) {
ps.setNull(index, Types.TIMESTAMP);
} else {
ps.setString(index, dataTimePattern.format(dateValue));
}
return;
case TIME:
String timeValue = SqlUtils.toMySqlTime(value);
if (StringUtils.isBlank(timeValue)) {
ps.setNull(index, 12);
return;
} else {
ps.setString(index, timeValue);
return;
}
case YEAR:
LocalDateTime yearValue = null;
if (!SqlUtils.isZeroTime(value)) {
yearValue = SqlUtils.toLocalDateTime(value);
} else if (StringUtils.isNotBlank(config.getZeroDate())) {
yearValue = SqlUtils.toLocalDateTime(config.getZeroDate());
} else {
ps.setInt(index, 0);
return;
}
if (yearValue == null) {
ps.setNull(index, 4);
} else {
ps.setInt(index, yearValue.getYear());
}
return;
case CHAR:
case VARCHAR:
case TINYTEXT:
case TEXT:
case MEDIUMTEXT:
case LONGTEXT:
case ENUM:
case SET:
String strValue = value.toString();
if (strValue == null) {
ps.setNull(index, Types.VARCHAR);
return;
} else {
ps.setString(index, strValue);
return;
}
case JSON:
String jsonValue = value.toString();
if (jsonValue == null) {
ps.setNull(index, Types.VARCHAR);
} else {
ps.setString(index, jsonValue);
}
return;
case BIT:
if (value instanceof Boolean) {
byte[] arrayBoolean = new byte[1];
arrayBoolean[0] = (byte) (Boolean.TRUE.equals(value) ? 1 : 0);
ps.setBytes(index, arrayBoolean);
return;
} else if (value instanceof Number) {
ps.setBytes(index, SqlUtils.numberToBinaryArray((Number) value));
return;
} else if ((value instanceof byte[]) || value.toString().startsWith("0x") || value.toString().startsWith("0X")) {
byte[] arrayBoolean = SqlUtils.toBytes(value);
if (arrayBoolean == null || arrayBoolean.length == 0) {
ps.setNull(index, Types.BIT);
return;
} else {
ps.setBytes(index, arrayBoolean);
return;
}
} else {
ps.setBytes(index, SqlUtils.numberToBinaryArray(SqlUtils.toInt(value)));
return;
}
case BINARY:
case VARBINARY:
case TINYBLOB:
case BLOB:
case MEDIUMBLOB:
case LONGBLOB:
byte[] binaryValue = SqlUtils.toBytes(value);
if (binaryValue == null) {
ps.setNull(index, Types.BINARY);
return;
} else {
ps.setBytes(index, binaryValue);
return;
}
case GEOMETRY:
case GEOMETRY_COLLECTION:
case GEOM_COLLECTION:
case POINT:
case LINESTRING:
case POLYGON:
case MULTIPOINT:
case MULTILINESTRING:
case MULTIPOLYGON:
String geoValue = SqlUtils.toGeometry(value);
if (geoValue == null) {
ps.setNull(index, Types.VARCHAR);
return;
}
ps.setString(index, geoValue);
return;
default:
throw new UnsupportedOperationException("columnType '" + colType + "' Unsupported.");
}
}
}