in src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaChangeManager.java [70:94]
public void addColumnDDL(String tableName, RecordDescriptor.FieldDescriptor field) {
try {
// check the add column whether exist in table.
if (dorisSystemService.isColumnExist(
dorisOptions.getDatabase(), tableName, field.getName())) {
LOG.warn(
"The column {} already exists in table {}, no need to add it again",
field.getName(),
tableName);
return;
}
String addColumnDDL = buildAddColumnDDL(dorisOptions.getDatabase(), tableName, field);
boolean status = execute(addColumnDDL, dorisOptions.getDatabase());
LOG.info(
"Add missing column for {} table, ddl={}, status={}",
tableName,
addColumnDDL,
status);
} catch (Exception e) {
LOG.warn("Failed to add column for {}, cause by: ", tableName, e);
throw new SchemaChangeException(
"Failed to add column for " + tableName + ", cause by:", e);
}
}