in client-adapter/phoenix/src/main/java/com/alibaba/otter/canal/client/adapter/phoenix/service/PhoenixSyncService.java [269:369]
/**
 * Applies the column changes of the given ALTER TABLE statements to the mapped target table.
 *
 * <p>Only {@code SQLAlterTableStatement} entries of {@code stmtList} are processed, and within them
 * only drop-column and add-column items. Dropping is skipped unless {@code dbMapping.isDrop()} is set,
 * adding is skipped unless {@code dbMapping.getMapAll()} is set. After all statements are handled,
 * default expressions collected from newly added columns are written with one
 * {@code UPSERT ... SELECT} over the target table.
 *
 * @param batchExecutor executor used to run the generated SQL and to commit / roll back the default-value upsert
 * @param config        mapping configuration providing the debug flag and the {@code DbMapping}
 * @param dml           the received event; used here only for logging
 * @param stmtList      parsed statements of the DDL
 * @param configFile    name of the mapping configuration, used in log messages
 * @throws SQLException if adding a column or writing the default values fails
 */
private void alter(BatchExecutor batchExecutor, MappingConfig config, Dml dml, List<SQLStatement> stmtList, String configFile) throws SQLException {
    if (config.isDebug()) {
        logger.info("DML: {} {}", configFile, JSON.toJSONString(dml, Feature.WriteNulls));
    }
    DbMapping dbMapping = config.getDbMapping();
    if (!dbMapping.isAlter()) {
        logger.info("not alterable table: {} {}", dml.getTable(), configFile);
        return;
    }

    // Inverted view of the target-column mapping (value -> key), used to translate the
    // column name found in the DDL into the name used in the DROP statement.
    Map<String, String> reverseColumns = new HashMap<>();
    for (Map.Entry<String, String> entry : dbMapping.getTargetColumns().entrySet()) {
        reverseColumns.put(entry.getValue(), entry.getKey());
    }
    String targetTable = SyncUtil.getDbTableName(dbMapping);

    // Column name -> default expression, filled while columns are added.
    Map<String, String> defValues = new HashMap<>();
    for (SQLStatement statement : stmtList) {
        if (!(statement instanceof SQLAlterTableStatement)) {
            continue;
        }
        for (SQLAlterTableItem item : ((SQLAlterTableStatement) statement).getItems()) {
            if (item instanceof SQLAlterTableDropColumnItem) {
                alterDropColumns(batchExecutor, dbMapping, targetTable, reverseColumns, (SQLAlterTableDropColumnItem) item);
            } else if (item instanceof SQLAlterTableAddColumn) {
                alterAddColumns(batchExecutor, dbMapping, targetTable, defValues, (SQLAlterTableAddColumn) item);
            }
        }
    }

    if (!defValues.isEmpty()) {
        upsertColumnDefaults(batchExecutor, dbMapping, targetTable, defValues);
    }
}

/**
 * Drops the columns named by one drop-column item from the target table.
 * Failures are logged as warnings and do not abort processing of the remaining columns.
 */
private void alterDropColumns(BatchExecutor batchExecutor, DbMapping dbMapping, String targetTable,
                              Map<String, String> reverseColumns, SQLAlterTableDropColumnItem dropColumnItem) {
    if (!dbMapping.isDrop()) {
        logger.info("drop table column disabled: {} {}", targetTable, dropColumnItem.getColumns());
        return;
    }
    for (SQLName sqlName : dropColumnItem.getColumns()) {
        String name = Util.cleanColumn(sqlName.getSimpleName());
        String sql = "ALTER TABLE " + targetTable + " DROP COLUMN IF EXISTS " +
                dbMapping.escape(reverseColumns.getOrDefault(name, name));
        try {
            logger.info("drop table column: {} {}", sql, batchExecutor.executeUpdate(sql));
            // NOTE(review): the DROP uses the name looked up in the inverted mapping, while the
            // mapping entry is removed by the DDL name - confirm removeTargetColumn expects this key.
            dbMapping.removeTargetColumn(name);
        } catch (Exception e) {
            // Best effort: a failed drop is reported but does not stop the sync.
            logger.warn("drop table column error: " + sql, e);
        }
    }
}

/**
 * Adds the columns declared by one add-column item to the target table and records
 * their default expressions in {@code defValues}. Any failure is logged and rethrown.
 */
private void alterAddColumns(BatchExecutor batchExecutor, DbMapping dbMapping, String targetTable,
                             Map<String, String> defValues, SQLAlterTableAddColumn addColumn) throws SQLException {
    if (!dbMapping.getMapAll()) {
        logger.info("add table column disabled: {} {}", targetTable, addColumn.getColumns());
        return;
    }
    for (SQLColumnDefinition definition : addColumn.getColumns()) {
        String name = Util.cleanColumn(definition.getNameAsString());
        if (dbMapping.getExcludeColumns().contains(name)) {
            continue;
        }
        String sql = "ALTER TABLE " + targetTable +
                " ADD IF NOT EXISTS " +
                dbMapping.escape(name) + " " + TypeUtil.getPhoenixType(definition, dbMapping.isLimit());
        try {
            logger.info("add table column: {} {}", sql, batchExecutor.executeUpdate(sql));
            dbMapping.addTargetColumn(name, name);
            if (definition.getDefaultExpr() != null) {
                String defVal = definition.getDefaultExpr().toString();
                // "NULL" / "NOT NULL" carry no value to back-fill.
                if (!defVal.equalsIgnoreCase("NULL") && !defVal.equalsIgnoreCase("NOT NULL") && name.length() > 0) {
                    defValues.put(name, defVal);
                }
            }
        } catch (Exception e) {
            logger.error("add table column error: " + sql, e);
            throw e;
        }
    }
}

/**
 * Writes the collected default expressions into the target table with a single
 * {@code UPSERT INTO t(pk..., col...) SELECT pk..., default... FROM t} and commits.
 * On failure the batch is rolled back and the original exception is rethrown.
 */
private void upsertColumnDefaults(BatchExecutor batchExecutor, DbMapping dbMapping, String targetTable,
                                  Map<String, String> defValues) throws SQLException {
    Set<Map.Entry<String, String>> pkSet = dbMapping.getTargetPk().entrySet();
    Set<Map.Entry<String, String>> defSet = defValues.entrySet();

    StringBuilder defSql = new StringBuilder();
    defSql.append("UPSERT INTO ").append(targetTable).append("(");
    for (Map.Entry<String, String> entry : pkSet) {
        defSql.append(dbMapping.escape(entry.getKey())).append(",");
    }
    for (Map.Entry<String, String> entry : defSet) {
        defSql.append(dbMapping.escape(entry.getKey())).append(",");
    }
    // defValues is non-empty here, so there is always a trailing comma to strip.
    defSql.deleteCharAt(defSql.length() - 1).append(") SELECT ");
    for (Map.Entry<String, String> entry : pkSet) {
        defSql.append(dbMapping.escape(entry.getKey())).append(",");
    }
    for (Map.Entry<String, String> entry : defSet) {
        defSql.append(entry.getValue()).append(",");
    }
    defSql.deleteCharAt(defSql.length() - 1).append(" FROM ").append(targetTable);

    try {
        logger.info("set column default value: {} {}", defSql, batchExecutor.executeUpdate(defSql.toString()));
        batchExecutor.commit();
    } catch (SQLException e) {
        logger.error("set column default value error: {}", defSql, e);
        try {
            batchExecutor.rollback();
        } catch (Exception rollbackError) {
            // Keep the original failure as the primary exception; a rollback failure
            // must not hide the reason the upsert failed.
            e.addSuppressed(rollbackError);
        }
        throw e;
    }
}