private void insert()

in client-adapter/phoenix/src/main/java/com/alibaba/otter/canal/client/adapter/phoenix/service/PhoenixSyncService.java [377:471]


    private void insert(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
        Map<String, Object> data = dml.getData();
        if (data == null || data.isEmpty()) {
            return;
        }

        DbMapping dbMapping = config.getDbMapping();
        Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);

        StringBuilder insertSql = new StringBuilder();
        insertSql.append("UPSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");

        Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);

        int mapLen = columnsMap.size();
        List<Map<String, ?>> values = new ArrayList<>();
        for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
            String targetColumnName = entry.getKey();
            String srcColumnName = entry.getValue();
            if (srcColumnName == null) {
                srcColumnName = Util.cleanColumn(targetColumnName);
            }

            Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
            if (type == null) {
                if (dbMapping.isSkipMissing()) {
                    logger.warn("Target missing field: {}", targetColumnName);
                    mapLen -= 1;
                    continue;
                } else if (dbMapping.getMapAll() && dbMapping.isAlter() && PhoenixEtlService.syncSchema(batchExecutor.getConn(), config)) {
                    columnsTypeCache.remove(config.getDestination() + "." + dbMapping.getDatabase() + "." + dbMapping.getTable());
                    ctype = getTargetColumnType(batchExecutor.getConn(), config);
                    type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
                }
                if (type == null) {
                    throw new RuntimeException("Target column: " + targetColumnName + " not matched");
                }
            }
            insertSql.append(dbMapping.escape(targetColumnName)).append(",");
            Object value = data.get(srcColumnName);
            BatchExecutor.setValue(values, type, value);
        }

        int len = insertSql.length();
        insertSql.delete(len - 1, len).append(") VALUES (");
        for (int i = 0; i < mapLen; i++) {
            insertSql.append("?,");
        }
        len = insertSql.length();
        insertSql.delete(len - 1, len).append(")");

        Map<String, Object> old = dml.getOld();
        try {
            if (old != null && !old.isEmpty()) {
                boolean keyChanged = false;
                List<Map<String, ?>> delValues = new ArrayList<>();
                StringBuilder deleteSql = new StringBuilder();
                deleteSql.append("DELETE FROM ").append(SyncUtil.getDbTableName(dbMapping)).append(" WHERE ");
                for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
                    String targetColumnName = entry.getKey();
                    String srcColumnName = entry.getValue();
                    if (srcColumnName == null) {
                        srcColumnName = Util.cleanColumn(targetColumnName);
                    }
                    Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
                    if (type != null) {
                        deleteSql.append(dbMapping.escape(targetColumnName)).append("=? AND ");
                        // 如果有修改主键的情况
                        if (old.containsKey(srcColumnName)) {
                            keyChanged = true;
                            BatchExecutor.setValue(delValues, type, old.get(srcColumnName));
                        } else {
                            BatchExecutor.setValue(delValues, type, data.get(srcColumnName));
                        }
                    }
                }
                if (keyChanged) {
                    if (config.isDebug()) {
                        logger.info("insert into table: {} {}", deleteSql, delValues);
                    }
                    batchExecutor.execute(deleteSql.toString(), delValues);
                }
            }
            if (config.isDebug()) {
                logger.info("insert into table: {} {}", insertSql, values);
            }
            batchExecutor.execute(insertSql.toString(), values);
        } catch (SQLException | RuntimeException e) {
            logger.warn("Insert into target table, sql: {} {}", insertSql, values ,e);
            throw e;
        }
        if (logger.isTraceEnabled()) {
            logger.trace("Insert into target table, sql: {}", insertSql);
        }
    }