in connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/DbStructure.java [106:198]
/**
 * Adds to the table any columns that the record fields require but the table lacks.
 *
 * <p>The cached table definition is compared against {@code fieldsMetadata.allFields}. If
 * nothing is missing the method returns immediately. Otherwise ALTER statements are built
 * through the dialect and executed; when execution fails, the cached definition is refreshed
 * and the whole check is re-run with {@code maxRetries - 1}, since another task may have
 * added the columns concurrently.
 *
 * @param config         sink configuration consulted for permission to change the schema
 * @param connection     connection used to read the table definition and run the statements
 * @param tableId        identifier of the table to amend
 * @param fieldsMetadata metadata whose {@code allFields} describes the expected columns
 * @param maxRetries     remaining re-attempts after a failed ALTER; no retry when {@code <= 0}
 * @return {@code false} if no fields were missing, {@code true} if the ALTER statements were
 *         executed and the table definition refreshed
 * @throws SQLException declared by this method; may surface from the table-definition
 *         lookup or refresh, which run outside the retry handling
 * @throws TableAlterOrCreateException if the target is not a plain table, if a missing field
 *         is neither optional nor has a default value, if the configuration check fails, or
 *         if the ALTER fails with no retries left
 */
boolean amendIfNecessary(
    final JdbcSinkConfig config,
    final Connection connection,
    final TableId tableId,
    final FieldsMetadata fieldsMetadata,
    final int maxRetries
) throws SQLException, TableAlterOrCreateException {
    final TableDefinition tableDefn = tableDefinitions.get(connection, tableId);
    final Set<SinkRecordField> missingFields = missingFields(
        fieldsMetadata.allFields.values(),
        tableDefn.columnNames()
    );
    if (missingFields.isEmpty()) {
        return false;
    }

    // Only plain tables can be altered; views and any other type are rejected.
    final TableType type = tableDefn.type();
    switch (type) {
        case TABLE:
            break;
        case VIEW:
        default:
            throw new TableAlterOrCreateException(
                String.format(
                    "%s %s is missing fields (%s) and ALTER %s is unsupported",
                    type.capitalized(),
                    tableId,
                    missingFields,
                    type.jdbcName()
                )
            );
    }

    // A new column must be nullable or carry a default, otherwise it cannot be added.
    for (SinkRecordField missingField : missingFields) {
        if (!missingField.isOptional() && missingField.defaultValue() == null) {
            throw new TableAlterOrCreateException(String.format(
                "Cannot ALTER %s %s to add missing field %s, as the field is not optional and does "
                    + "not have a default value",
                type.jdbcName(),
                tableId,
                missingField
            ));
        }
    }

    // NOTE(review): the guard checks isAutoCreate() while the message talks about
    // auto-evolution; confirm whether a dedicated auto-evolve flag should be used here.
    if (!config.isAutoCreate()) {
        throw new TableAlterOrCreateException(String.format(
            "%s %s is missing fields (%s) and auto-evolution is disabled",
            type.capitalized(),
            tableId,
            missingFields
        ));
    }

    // Build the ALTER statements from the validated missing fields. Previously an empty,
    // never-populated set was passed here, so no column was ever actually added.
    final List<String> amendTableQueries = dbDialect.buildAlterTable(tableId, missingFields);
    log.info(
        "Amending {} to add missing fields:{} maxRetries:{} with SQL: {}",
        type,
        missingFields,
        maxRetries,
        amendTableQueries
    );
    try {
        dbDialect.executeSchemaChangeStatements(connection, amendTableQueries);
    } catch (SQLException sqle) {
        if (maxRetries <= 0) {
            throw new TableAlterOrCreateException(
                String.format(
                    "Failed to amend %s '%s' to add missing fields: %s",
                    type,
                    tableId,
                    missingFields
                ),
                sqle
            );
        }
        log.warn("Amend failed, re-attempting", sqle);
        tableDefinitions.refresh(connection, tableId);
        // Perhaps there was a race with other tasks to add the columns
        return amendIfNecessary(
            config,
            connection,
            tableId,
            fieldsMetadata,
            maxRetries - 1
        );
    }
    tableDefinitions.refresh(connection, tableId);
    return true;
}