in src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java [169:198]
private void alterTableIfNeeded(String tableName, RecordDescriptor record) {
// Resolve table metadata from the database
final TableDescriptor table = fetchDorisTableDescriptor(tableName);
missingFields = resolveMissingFields(record, table);
if (missingFields.isEmpty()) {
// There are no missing fields, simply return
// TODO should we check column type changes or default value changes?
return;
}
LOG.info(
"Find some miss columns in {} table, try to alter add this columns={}.",
tableName,
missingFields.stream()
.map(RecordDescriptor.FieldDescriptor::getName)
.collect(Collectors.toList()));
if (SchemaEvolutionMode.NONE.equals(dorisOptions.getSchemaEvolutionMode())) {
LOG.warn(
"Table '{}' cannot be altered because schema evolution is disabled.",
tableName);
throw new SchemaChangeException(
"Cannot alter table " + tableName + " because schema evolution is disabled");
}
for (RecordDescriptor.FieldDescriptor missingField : missingFields) {
schemaChangeManager.addColumnDDL(tableName, missingField);
}
TableDescriptor newTableDescriptor = obtainTableSchema(tableName);
dorisTableDescriptorCache.put(tableName, newTableDescriptor);
}