in connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/offset/SourceOffsetCompute.java [189:241]
private static void validateNonNullable(
String incrementalMode,
String table,
String incrementingColumn,
List<String> timestampColumns,
DatabaseDialect dialect,
CachedConnectionProvider connectionProvider
) {
try {
Set<String> lowercaseTsColumns = new HashSet<>();
for (String timestampColumn : timestampColumns) {
lowercaseTsColumns.add(timestampColumn.toLowerCase(Locale.getDefault()));
}
boolean incrementingOptional = false;
boolean atLeastOneTimestampNotOptional = false;
final Connection conn = connectionProvider.getConnection();
boolean autoCommit = conn.getAutoCommit();
try {
conn.setAutoCommit(true);
Map<ColumnId, ColumnDefinition> defnsById = dialect.describeColumns(conn, table, null);
for (ColumnDefinition defn : defnsById.values()) {
String columnName = defn.id().name();
if (columnName.equalsIgnoreCase(incrementingColumn)) {
incrementingOptional = defn.isOptional();
} else if (lowercaseTsColumns.contains(columnName.toLowerCase(Locale.getDefault()))) {
if (!defn.isOptional()) {
atLeastOneTimestampNotOptional = true;
}
}
}
} finally {
conn.setAutoCommit(autoCommit);
}
if ((incrementalMode.equals(TableLoadMode.MODE_INCREMENTING.getName())
|| incrementalMode.equals(TableLoadMode.MODE_TIMESTAMP_INCREMENTING.getName()))
&& incrementingOptional) {
throw new ConnectException("Cannot make incremental queries using incrementing column "
+ incrementingColumn + " on " + table + " because this column "
+ "is nullable.");
}
if ((incrementalMode.equals(TableLoadMode.MODE_TIMESTAMP.getName())
|| incrementalMode.equals(TableLoadMode.MODE_TIMESTAMP_INCREMENTING.getName()))
&& !atLeastOneTimestampNotOptional) {
throw new ConnectException("Cannot make incremental queries using timestamp columns "
+ timestampColumns + " on " + table + " because all of these "
+ "columns "
+ "nullable.");
}
} catch (SQLException e) {
throw new ConnectException("Failed trying to validate that columns used for offsets are NOT"
+ " NULL", e);
}
}