private void doPreparedStatement()

in eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkIncrementConnector.java [772:841]


        private void doPreparedStatement(PreparedStatement ps, DbDialect dbDialect, LobCreator lobCreator,
            CanalConnectRecord record) throws SQLException {
            EventType type = record.getEventType();
            List<EventColumn> columns = new ArrayList<EventColumn>();
            if (type.isInsert()) {
                columns.addAll(record.getColumns());
                columns.addAll(record.getKeys());
            } else if (type.isDelete()) {
                columns.addAll(record.getKeys());
            } else if (type.isUpdate()) {
                boolean existOldKeys = !CollectionUtils.isEmpty(record.getOldKeys());
                columns.addAll(record.getUpdatedColumns());
                columns.addAll(record.getKeys());
                if (existOldKeys) {
                    columns.addAll(record.getOldKeys());
                }
            }

            for (int i = 0; i < columns.size(); i++) {
                int paramIndex = i + 1;
                EventColumn column = columns.get(i);
                int sqlType = column.getColumnType();

                Object param = null;
                if (dbDialect instanceof MysqlDialect
                    && (sqlType == Types.TIME || sqlType == Types.TIMESTAMP || sqlType == Types.DATE)) {
                    param = column.getColumnValue();
                } else {
                    param = SqlUtils.stringToSqlValue(column.getColumnValue(),
                        sqlType,
                        false,
                        dbDialect.isEmptyStringNulled());
                }

                try {
                    switch (sqlType) {
                        case Types.CLOB:
                            lobCreator.setClobAsString(ps, paramIndex, (String) param);
                            break;

                        case Types.BLOB:
                            lobCreator.setBlobAsBytes(ps, paramIndex, (byte[]) param);
                            break;
                        case Types.TIME:
                        case Types.TIMESTAMP:
                        case Types.DATE:
                            if (dbDialect instanceof MysqlDialect) {
                                ps.setObject(paramIndex, param);
                            } else {
                                StatementCreatorUtils.setParameterValue(ps, paramIndex, sqlType, null, param);
                            }
                            break;
                        case Types.BIT:
                            if (dbDialect instanceof MysqlDialect) {
                                StatementCreatorUtils.setParameterValue(ps, paramIndex, Types.DECIMAL, null, param);
                            } else {
                                StatementCreatorUtils.setParameterValue(ps, paramIndex, sqlType, null, param);
                            }
                            break;
                        default:
                            StatementCreatorUtils.setParameterValue(ps, paramIndex, sqlType, null, param);
                            break;
                    }
                } catch (SQLException ex) {
                    log.error("## SetParam error , [pairId={}, sqltype={}, value={}]",
                        record.getPairId(), sqlType, param);
                    throw ex;
                }
            }
        }