in eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalFullProducer.java [315:392]
/**
 * Binds the current value of the chosen primary-key column to parameter 1 of {@code statement}.
 *
 * <p>The column name comes from {@code choosePrimaryKey}; its value is read from
 * {@code tableFullPosition.getCurPrimaryKeyCols()} and converted according to the JDBC type
 * recorded in the column definition. Nothing is bound when no primary-key column is chosen.
 *
 * @param statement the prepared statement whose first parameter receives the key value
 * @throws SQLException             if the JDBC driver rejects the bound value
 * @throws IllegalArgumentException if a DATE/TIMESTAMP key value has an unsupported Java type
 * @throws EventMeshException       if the column has no definition or its JDBC type is not supported
 */
private void setPrepareStatementValue(PreparedStatement statement) throws SQLException {
    String colName = choosePrimaryKey.get();
    if (colName == null) {
        return;
    }
    RdbColumnDefinition columnDefinition = tableDefinition.getColumnDefinitions().get(colName);
    if (columnDefinition == null) {
        // Fail with a descriptive error instead of a bare NullPointerException in the switch below.
        throw new EventMeshException(String.format("no column definition found for primary key [%s]", colName));
    }
    Object value = tableFullPosition.getCurPrimaryKeyCols().get(colName);
    String str;
    switch (columnDefinition.getJdbcType()) {
        case BIT:
        case TINYINT:
        case SMALLINT:
        case INTEGER:
        case BIGINT:
        case DECIMAL:
        case NUMERIC:
            // Exact numeric types are bound as BigDecimal: routing DECIMAL/NUMERIC through a double
            // would round keys with more than ~15-17 significant digits.
            statement.setBigDecimal(1, new BigDecimal(String.valueOf(value)));
            break;
        case FLOAT:
        case DOUBLE:
            statement.setDouble(1, new BigDecimal(String.valueOf(value)).doubleValue());
            break;
        case CHAR:
        case VARCHAR:
        case LONGNVARCHAR:
        case NCHAR:
        case NVARCHAR:
        case LONGVARCHAR:
        case CLOB:
        case NCLOB:
            statement.setString(1, String.valueOf(value));
            break;
        case BLOB:
        case VARBINARY:
        case BINARY:
            if (value instanceof byte[]) {
                // Raw bytes need no hex decoding; String.valueOf would yield "[B@...".
                statement.setBytes(1, (byte[]) value);
                break;
            }
            str = String.valueOf(value);
            // Drop the "0x" literal prefix so that only hex digits are handed to the decoder.
            String hexStr = str.startsWith("0x") ? str.substring(2) : str;
            statement.setBytes(1, SqlUtils.hex2bytes(hexStr));
            break;
        case DATE:
            if (value instanceof Long || value instanceof Integer) {
                // Integral values are interpreted as epoch milliseconds in the system default zone.
                Instant instant = Instant.ofEpochMilli(((Number) value).longValue());
                str = instant.atZone(ZoneId.systemDefault()).toLocalDateTime().format(DATE_FORMATTER);
            } else if (value instanceof String) {
                str = (String) value;
            } else if (value instanceof LocalDate) {
                str = ((LocalDate) value).format(DATE_FORMATTER);
            } else {
                throw new IllegalArgumentException("unsupported date class type:"
                    + (value == null ? "null" : value.getClass().getSimpleName()));
            }
            statement.setString(1, str);
            break;
        case TIMESTAMP:
            if (value instanceof String) {
                str = (String) value;
            } else if (value instanceof LocalDateTime) {
                str = ((LocalDateTime) value).format(DATE_STAMP_FORMATTER);
            } else {
                throw new IllegalArgumentException("unsupported timestamp class type:"
                    + (value == null ? "null" : value.getClass().getSimpleName()));
            }
            statement.setString(1, str);
            break;
        default:
            // Null-safe: a missing value must not mask the real error with a NullPointerException.
            throw new EventMeshException(String.format("not support the primary key type [%s]",
                value == null ? null : value.getClass()));
    }
}