private void alterTableIfNeeded()

in src/main/java/org/apache/doris/kafka/connector/converter/RecordService.java [169:198]


    /**
     * Compares the record's fields with the current descriptor of {@code tableName} and, when the
     * table lacks some of them, issues an add-column DDL per missing field and then stores a
     * freshly obtained table descriptor in the descriptor cache.
     *
     * <p>The resolved missing fields are kept in the {@code missingFields} instance field.
     *
     * @param tableName name of the Doris table to inspect and, if needed, alter
     * @param record descriptor of the incoming record whose fields are checked against the table
     * @throws SchemaChangeException if fields are missing while the configured schema evolution
     *     mode is {@code NONE}
     */
    private void alterTableIfNeeded(String tableName, RecordDescriptor record) {
        // Resolve the table's current metadata and work out which record fields it lacks.
        final TableDescriptor currentTable = fetchDorisTableDescriptor(tableName);
        missingFields = resolveMissingFields(record, currentTable);

        // Nothing to do when the table already has every field of the record.
        // TODO should we check column type changes or default value changes?
        if (!missingFields.isEmpty()) {
            LOG.info(
                    "Find some miss columns in {} table, try to alter add this columns={}.",
                    tableName,
                    missingFields.stream()
                            .map(RecordDescriptor.FieldDescriptor::getName)
                            .collect(Collectors.toList()));

            // Missing columns cannot be added when schema evolution is switched off: fail loudly.
            final boolean evolutionDisabled =
                    SchemaEvolutionMode.NONE.equals(dorisOptions.getSchemaEvolutionMode());
            if (evolutionDisabled) {
                LOG.warn(
                        "Table '{}' cannot be altered because schema evolution is disabled.",
                        tableName);
                throw new SchemaChangeException(
                        "Cannot alter table " + tableName + " because schema evolution is disabled");
            }

            // One add-column DDL per missing field.
            missingFields.forEach(field -> schemaChangeManager.addColumnDDL(tableName, field));

            // Re-read the schema after the alterations and replace the cached descriptor.
            final TableDescriptor refreshedTable = obtainTableSchema(tableName);
            dorisTableDescriptorCache.put(tableName, refreshedTable);
        }
    }