in client-adapter/phoenix/src/main/java/com/alibaba/otter/canal/client/adapter/phoenix/service/PhoenixEtlService.java [275:412]
private static boolean executeSqlImport(DataSource srcDS, Connection targetDSConnection, String sql, DbMapping dbMapping,
AtomicLong successCount, List<String> errMsg, boolean debug) {
try {
Map<String, String> columnsMap = new LinkedHashMap<>();
Map<String, Integer> columnType = new LinkedHashMap<>();
PhoenixSupportUtil.sqlRS(targetDSConnection, "SELECT * FROM " + SyncUtil.getDbTableName(dbMapping) + " LIMIT 1 ", rs -> {
try {
ResultSetMetaData rsd = rs.getMetaData();
int columnCount = rsd.getColumnCount();
List<String> columns = new ArrayList<>();
List<String> excludeColumns = dbMapping.getExcludeColumns();
for (int i = 1; i <= columnCount; i++) {
String lower = rsd.getColumnName(i).toLowerCase();
if (!excludeColumns.contains(lower)) {
columnType.put(lower, rsd.getColumnType(i));
columns.add(lower);
}
}
columnsMap.putAll(SyncUtil.getColumnsMap(dbMapping, columns));
return true;
} catch (Exception e) {
logger.error(e.getMessage(), e);
return false;
}
});
Util.sqlRS(srcDS, sql, rs -> {
int idx = 1;
try {
boolean completed = false;
// if (dbMapping.isMapAll()) {
// columnsMap = dbMapping.getAllColumns();
// } else {
// columnsMap = dbMapping.getTargetColumns();
// }
StringBuilder insertSql = new StringBuilder();
insertSql.append("UPSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");
columnsMap
.forEach((targetColumnName, srcColumnName) -> insertSql.append(dbMapping.escape(targetColumnName)).append(","));
int len = insertSql.length();
insertSql.delete(len - 1, len).append(") VALUES (");
int mapLen = columnsMap.size();
for (int i = 0; i < mapLen; i++) {
insertSql.append("?,");
}
len = insertSql.length();
insertSql.delete(len - 1, len).append(")");
try (
//Connection connTarget = targetDS.getConnection();
Connection connTarget =PhoenixAdapter.getPhoenixConnection();
PreparedStatement pstmt = connTarget.prepareStatement(insertSql.toString())) {
connTarget.setAutoCommit(false);
while (rs.next()) {
completed = false;
pstmt.clearParameters();
// 删除数据
Map<String, Object> values = new LinkedHashMap<>();
StringBuilder deleteSql = new StringBuilder(
"DELETE FROM " + SyncUtil.getDbTableName(dbMapping) + " WHERE ");
appendCondition(dbMapping, deleteSql, values, rs);
try (PreparedStatement pstmt2 = connTarget.prepareStatement(deleteSql.toString())) {
int k = 1;
for (Object val : values.values()) {
pstmt2.setObject(k++, val);
}
pstmt2.execute();
}
Map<String, Object> insertValues = new HashMap<>();
int i = 1;
for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
String targetColumnName = entry.getKey();
String srcColumnName = entry.getValue();
if (srcColumnName == null) {
srcColumnName = targetColumnName;
}
Integer type = columnType.get(targetColumnName.toLowerCase());
try {
Object value = rs.getObject(srcColumnName);
insertValues.put(srcColumnName, value);
if (value != null) {
SyncUtil.setPStmt(type, pstmt, value, i);
} else {
pstmt.setNull(i, type);
}
} catch (SQLException e) {
insertValues.put(srcColumnName, null);
pstmt.setNull(i, type);
}
i++;
}
if (debug) {
logger.info("insert sql: {} {} {}", insertSql, insertValues, pstmt);
}
pstmt.execute();
if (logger.isTraceEnabled()) {
logger.trace("Insert into target table, sql: {}", insertSql);
}
if (idx % dbMapping.getCommitBatch() == 0) {
connTarget.commit();
completed = true;
}
idx++;
successCount.incrementAndGet();
if (logger.isDebugEnabled()) {
logger.debug("successful import count:" + successCount.get());
}
}
if (!completed) {
connTarget.commit();
}
}
} catch (Exception e) {
logger.error(dbMapping.getTable() + " etl failed! ==>" + e.getMessage(), e);
errMsg.add(dbMapping.getTable() + " etl failed! ==>" + e.getMessage());
}
return idx;
});
return true;
} catch (Exception e) {
logger.error(e.getMessage(), e);
return false;
}
}