private static boolean executeSqlImport()

in client-adapter/phoenix/src/main/java/com/alibaba/otter/canal/client/adapter/phoenix/service/PhoenixEtlService.java [275:412]


    private static boolean executeSqlImport(DataSource srcDS, Connection targetDSConnection, String sql, DbMapping dbMapping,
                                            AtomicLong successCount, List<String> errMsg, boolean debug) {
        try {
            Map<String, String> columnsMap = new LinkedHashMap<>();
            Map<String, Integer> columnType = new LinkedHashMap<>();


            PhoenixSupportUtil.sqlRS(targetDSConnection, "SELECT * FROM " + SyncUtil.getDbTableName(dbMapping) + " LIMIT 1 ", rs -> {
                try {

                    ResultSetMetaData rsd = rs.getMetaData();
                    int columnCount = rsd.getColumnCount();
                    List<String> columns = new ArrayList<>();
                    List<String> excludeColumns = dbMapping.getExcludeColumns();
                    for (int i = 1; i <= columnCount; i++) {
                        String lower = rsd.getColumnName(i).toLowerCase();
                        if (!excludeColumns.contains(lower)) {
                            columnType.put(lower, rsd.getColumnType(i));
                            columns.add(lower);
                        }
                    }
                    columnsMap.putAll(SyncUtil.getColumnsMap(dbMapping, columns));
                    return true;
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                    return false;
                }
            });
            Util.sqlRS(srcDS, sql, rs -> {
                int idx = 1;

                try {
                    boolean completed = false;

                    // if (dbMapping.isMapAll()) {
                    // columnsMap = dbMapping.getAllColumns();
                    // } else {
                    // columnsMap = dbMapping.getTargetColumns();
                    // }

                    StringBuilder insertSql = new StringBuilder();
                    insertSql.append("UPSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");
                    columnsMap
                            .forEach((targetColumnName, srcColumnName) -> insertSql.append(dbMapping.escape(targetColumnName)).append(","));

                    int len = insertSql.length();
                    insertSql.delete(len - 1, len).append(") VALUES (");
                    int mapLen = columnsMap.size();
                    for (int i = 0; i < mapLen; i++) {
                        insertSql.append("?,");
                    }
                    len = insertSql.length();
                    insertSql.delete(len - 1, len).append(")");
                    try (
                         //Connection connTarget = targetDS.getConnection();
                         Connection connTarget =PhoenixAdapter.getPhoenixConnection();
                         PreparedStatement pstmt = connTarget.prepareStatement(insertSql.toString())) {
                        connTarget.setAutoCommit(false);

                        while (rs.next()) {
                            completed = false;

                            pstmt.clearParameters();

                            // 删除数据
                            Map<String, Object> values = new LinkedHashMap<>();
                            StringBuilder deleteSql = new StringBuilder(
                                    "DELETE FROM " + SyncUtil.getDbTableName(dbMapping) + " WHERE ");
                            appendCondition(dbMapping, deleteSql, values, rs);
                            try (PreparedStatement pstmt2 = connTarget.prepareStatement(deleteSql.toString())) {
                                int k = 1;
                                for (Object val : values.values()) {
                                    pstmt2.setObject(k++, val);
                                }
                                pstmt2.execute();
                            }

                            Map<String, Object> insertValues = new HashMap<>();
                            int i = 1;
                            for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
                                String targetColumnName = entry.getKey();
                                String srcColumnName = entry.getValue();
                                if (srcColumnName == null) {
                                    srcColumnName = targetColumnName;
                                }

                                Integer type = columnType.get(targetColumnName.toLowerCase());

                                try {
                                    Object value = rs.getObject(srcColumnName);
                                    insertValues.put(srcColumnName, value);
                                    if (value != null) {
                                        SyncUtil.setPStmt(type, pstmt, value, i);
                                    } else {
                                        pstmt.setNull(i, type);
                                    }
                                } catch (SQLException e) {
                                    insertValues.put(srcColumnName, null);
                                    pstmt.setNull(i, type);
                                }
                                i++;
                            }
                            if (debug) {
                                logger.info("insert sql: {} {} {}", insertSql, insertValues, pstmt);
                            }

                            pstmt.execute();
                            if (logger.isTraceEnabled()) {
                                logger.trace("Insert into target table, sql: {}", insertSql);
                            }

                            if (idx % dbMapping.getCommitBatch() == 0) {
                                connTarget.commit();
                                completed = true;
                            }
                            idx++;
                            successCount.incrementAndGet();
                            if (logger.isDebugEnabled()) {
                                logger.debug("successful import count:" + successCount.get());
                            }
                        }
                        if (!completed) {
                            connTarget.commit();
                        }
                    }

                } catch (Exception e) {
                    logger.error(dbMapping.getTable() + " etl failed! ==>" + e.getMessage(), e);
                    errMsg.add(dbMapping.getTable() + " etl failed! ==>" + e.getMessage());
                }
                return idx;
            });
            return true;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return false;
        }
    }