in eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkIncrementConnector.java [772:841]
private void doPreparedStatement(PreparedStatement ps, DbDialect dbDialect, LobCreator lobCreator,
CanalConnectRecord record) throws SQLException {
EventType type = record.getEventType();
List<EventColumn> columns = new ArrayList<EventColumn>();
if (type.isInsert()) {
columns.addAll(record.getColumns());
columns.addAll(record.getKeys());
} else if (type.isDelete()) {
columns.addAll(record.getKeys());
} else if (type.isUpdate()) {
boolean existOldKeys = !CollectionUtils.isEmpty(record.getOldKeys());
columns.addAll(record.getUpdatedColumns());
columns.addAll(record.getKeys());
if (existOldKeys) {
columns.addAll(record.getOldKeys());
}
}
for (int i = 0; i < columns.size(); i++) {
int paramIndex = i + 1;
EventColumn column = columns.get(i);
int sqlType = column.getColumnType();
Object param = null;
if (dbDialect instanceof MysqlDialect
&& (sqlType == Types.TIME || sqlType == Types.TIMESTAMP || sqlType == Types.DATE)) {
param = column.getColumnValue();
} else {
param = SqlUtils.stringToSqlValue(column.getColumnValue(),
sqlType,
false,
dbDialect.isEmptyStringNulled());
}
try {
switch (sqlType) {
case Types.CLOB:
lobCreator.setClobAsString(ps, paramIndex, (String) param);
break;
case Types.BLOB:
lobCreator.setBlobAsBytes(ps, paramIndex, (byte[]) param);
break;
case Types.TIME:
case Types.TIMESTAMP:
case Types.DATE:
if (dbDialect instanceof MysqlDialect) {
ps.setObject(paramIndex, param);
} else {
StatementCreatorUtils.setParameterValue(ps, paramIndex, sqlType, null, param);
}
break;
case Types.BIT:
if (dbDialect instanceof MysqlDialect) {
StatementCreatorUtils.setParameterValue(ps, paramIndex, Types.DECIMAL, null, param);
} else {
StatementCreatorUtils.setParameterValue(ps, paramIndex, sqlType, null, param);
}
break;
default:
StatementCreatorUtils.setParameterValue(ps, paramIndex, sqlType, null, param);
break;
}
} catch (SQLException ex) {
log.error("## SetParam error , [pairId={}, sqltype={}, value={}]",
record.getPairId(), sqlType, param);
throw ex;
}
}
}