in client-adapter/phoenix/src/main/java/com/alibaba/otter/canal/client/adapter/phoenix/service/PhoenixSyncService.java [377:471]
/**
 * Writes one source row to the target table as an {@code UPSERT} statement.
 *
 * <p>The column list is taken from {@code SyncUtil.getColumnsMap}; each target column is
 * looked up in the target column-type map. A column that is missing from the target is
 * either skipped (when {@code dbMapping.isSkipMissing()}), resolved after a schema sync
 * (when {@code getMapAll()} and {@code isAlter()} are set and the sync reports success),
 * or rejected with a {@link RuntimeException}.
 *
 * <p>When {@code dml.getOld()} contains a source column that maps to a target primary key,
 * a {@code DELETE} keyed on the old primary-key values is executed before the upsert.
 *
 * @param batchExecutor executor supplying the connection and running the statements
 * @param config        mapping configuration for the target table
 * @param dml           the row change; nothing is done when its data is null or empty
 * @throws SQLException     if column metadata lookup or statement execution fails
 * @throws RuntimeException if a target column cannot be matched
 */
private void insert(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
    Map<String, Object> data = dml.getData();
    if (data == null || data.isEmpty()) {
        return;
    }

    DbMapping dbMapping = config.getDbMapping();
    Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);

    StringBuilder insertSql = new StringBuilder();
    insertSql.append("UPSERT INTO ").append(SyncUtil.getDbTableName(dbMapping)).append(" (");

    Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);

    // Number of columns that actually end up in the statement (one "?" placeholder each).
    int mapLen = columnsMap.size();
    List<Map<String, ?>> values = new ArrayList<>();

    for (Map.Entry<String, String> entry : columnsMap.entrySet()) {
        String targetColumnName = entry.getKey();
        String srcColumnName = entry.getValue();
        if (srcColumnName == null) {
            // No explicit source mapping: fall back to the cleaned target column name.
            srcColumnName = Util.cleanColumn(targetColumnName);
        }

        Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
        if (type == null) {
            if (dbMapping.isSkipMissing()) {
                logger.warn("Target missing field: {}", targetColumnName);
                mapLen -= 1;
                continue;
            } else if (dbMapping.getMapAll() && dbMapping.isAlter() && PhoenixEtlService.syncSchema(batchExecutor.getConn(), config)) {
                // The schema sync succeeded: drop the cached column types and reload them.
                columnsTypeCache.remove(config.getDestination() + "." + dbMapping.getDatabase() + "." + dbMapping.getTable());
                ctype = getTargetColumnType(batchExecutor.getConn(), config);
                type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
            }
            if (type == null) {
                throw new RuntimeException("Target column: " + targetColumnName + " not matched");
            }
        }

        insertSql.append(dbMapping.escape(targetColumnName)).append(",");
        Object value = data.get(srcColumnName);
        BatchExecutor.setValue(values, type, value);
    }

    // NOTE(review): if every column was skipped above, the trimming below removes the
    // parentheses instead of a trailing comma and the statement is malformed — confirm
    // whether such rows should be skipped or rejected explicitly.

    // Drop the trailing comma of the column list, then emit one placeholder per column.
    int len = insertSql.length();
    insertSql.delete(len - 1, len).append(") VALUES (");
    for (int i = 0; i < mapLen; i++) {
        insertSql.append("?,");
    }
    len = insertSql.length();
    insertSql.delete(len - 1, len).append(")");

    Map<String, Object> old = dml.getOld();
    try {
        if (old != null && !old.isEmpty()) {
            final String andSeparator = " AND ";
            boolean keyChanged = false;
            List<Map<String, ?>> delValues = new ArrayList<>();
            StringBuilder deleteSql = new StringBuilder();
            deleteSql.append("DELETE FROM ").append(SyncUtil.getDbTableName(dbMapping)).append(" WHERE ");

            for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
                String targetColumnName = entry.getKey();
                String srcColumnName = entry.getValue();
                if (srcColumnName == null) {
                    srcColumnName = Util.cleanColumn(targetColumnName);
                }
                Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
                if (type != null) {
                    deleteSql.append(dbMapping.escape(targetColumnName)).append("=?").append(andSeparator);
                    // Handle the case where the primary key itself was modified:
                    // match the row by its previous key value.
                    if (old.containsKey(srcColumnName)) {
                        keyChanged = true;
                        BatchExecutor.setValue(delValues, type, old.get(srcColumnName));
                    } else {
                        BatchExecutor.setValue(delValues, type, data.get(srcColumnName));
                    }
                }
            }

            if (keyChanged) {
                // keyChanged implies at least one condition was appended, so the buffer
                // ends with the separator; strip it so the WHERE clause is valid SQL.
                deleteSql.setLength(deleteSql.length() - andSeparator.length());
                if (config.isDebug()) {
                    logger.info("insert into table: {} {}", deleteSql, delValues);
                }
                batchExecutor.execute(deleteSql.toString(), delValues);
            }
        }

        if (config.isDebug()) {
            logger.info("insert into table: {} {}", insertSql, values);
        }
        batchExecutor.execute(insertSql.toString(), values);
    } catch (SQLException | RuntimeException e) {
        logger.warn("Insert into target table, sql: {} {}", insertSql, values, e);
        throw e;
    }

    if (logger.isTraceEnabled()) {
        logger.trace("Insert into target table, sql: {}", insertSql);
    }
}