boolean amendIfNecessary()

in connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/DbStructure.java [106:198]


    /**
     * Adds to the table any columns that the record fields require but the table does not have.
     *
     * <p>The cached table definition is compared against {@code fieldsMetadata.allFields}. When
     * fields are missing, ALTER statements are built by the dialect and executed. If execution
     * fails with a {@link SQLException} and retries remain, the cached definition is refreshed and
     * the whole check is re-run, since the failure may have been caused by another task adding the
     * same columns concurrently.
     *
     * @param config         sink configuration; consulted to decide whether altering is permitted
     * @param connection     connection used to read the table definition and run the statements
     * @param tableId        identifier of the table to amend
     * @param fieldsMetadata metadata of the record fields that must exist as columns
     * @param maxRetries     number of additional attempts allowed after a failed ALTER
     * @return {@code true} if the table was altered and its cached definition refreshed;
     *         {@code false} if no fields were missing
     * @throws SQLException                if reading or refreshing the table definition fails
     * @throws TableAlterOrCreateException if the target is not a plain table, if a missing field
     *                                     is required but has no default value, if altering is
     *                                     disabled by configuration, or if the ALTER still fails
     *                                     once retries are exhausted
     */
    boolean amendIfNecessary(
            final JdbcSinkConfig config,
            final Connection connection,
            final TableId tableId,
            final FieldsMetadata fieldsMetadata,
            final int maxRetries
    ) throws SQLException, TableAlterOrCreateException {
        final TableDefinition tableDefn = tableDefinitions.get(connection, tableId);
        final Set<SinkRecordField> missingFields = missingFields(
            fieldsMetadata.allFields.values(),
            tableDefn.columnNames()
        );

        if (missingFields.isEmpty()) {
            // Nothing to add: the table already has every required column.
            return false;
        }

        final TableType type = tableDefn.type();
        switch (type) {
            case TABLE:
                // Only plain tables can be altered; continue below.
                break;
            case VIEW:
            default:
                throw new TableAlterOrCreateException(
                        String.format(
                                "%s %s is missing fields (%s) and ALTER %s is unsupported",
                                type.capitalized(),
                                tableId,
                                missingFields,
                                type.jdbcName()
                        )
                );
        }

        // A required column without a default value cannot be added to an existing table.
        for (SinkRecordField missingField : missingFields) {
            if (!missingField.isOptional() && missingField.defaultValue() == null) {
                throw new TableAlterOrCreateException(String.format(
                        "Cannot ALTER %s %s to add missing field %s, as the field is not optional and does "
                                + "not have a default value",
                        type.jdbcName(),
                        tableId,
                        missingField
                ));
            }
        }

        // NOTE(review): the message below refers to auto-evolution, but the guard reads
        // config.isAutoCreate(); confirm whether a dedicated auto-evolve setting should be
        // checked here instead.
        if (!config.isAutoCreate()) {
            throw new TableAlterOrCreateException(String.format(
                    "%s %s is missing fields (%s) and auto-evolution is disabled",
                    type.capitalized(),
                    tableId,
                    missingFields
            ));
        }

        // Build the ALTER statements from the actual missing fields. Previously an empty,
        // never-populated set was passed here, so no columns were added and every message
        // reported an empty field list.
        final List<String> amendTableQueries = dbDialect.buildAlterTable(tableId, missingFields);
        log.info(
                "Amending {} to add missing fields:{} maxRetries:{} with SQL: {}",
                type,
                missingFields,
                maxRetries,
                amendTableQueries
        );
        try {
            dbDialect.executeSchemaChangeStatements(connection, amendTableQueries);
        } catch (SQLException sqle) {
            if (maxRetries <= 0) {
                throw new TableAlterOrCreateException(
                        String.format(
                                "Failed to amend %s '%s' to add missing fields: %s",
                                type,
                                tableId,
                                missingFields
                        ),
                        sqle
                );
            }
            log.warn("Amend failed, re-attempting", sqle);
            tableDefinitions.refresh(connection, tableId);
            // Perhaps there was a race with other tasks to add the columns
            return amendIfNecessary(
                    config,
                    connection,
                    tableId,
                    fieldsMetadata,
                    maxRetries - 1
            );
        }

        // Reload the cached definition so later lookups see the newly added columns.
        tableDefinitions.refresh(connection, tableId);
        return true;
    }