private static void validateNonNullable()

in connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/offset/SourceOffsetCompute.java [189:241]


    private static void validateNonNullable(
            String incrementalMode,
            String table,
            String incrementingColumn,
            List<String> timestampColumns,
            DatabaseDialect dialect,
            CachedConnectionProvider connectionProvider
    ) {
        try {
            Set<String> lowercaseTsColumns = new HashSet<>();
            for (String timestampColumn : timestampColumns) {
                lowercaseTsColumns.add(timestampColumn.toLowerCase(Locale.getDefault()));
            }
            boolean incrementingOptional = false;
            boolean atLeastOneTimestampNotOptional = false;
            final Connection conn = connectionProvider.getConnection();
            boolean autoCommit = conn.getAutoCommit();
            try {
                conn.setAutoCommit(true);
                Map<ColumnId, ColumnDefinition> defnsById = dialect.describeColumns(conn, table, null);
                for (ColumnDefinition defn : defnsById.values()) {
                    String columnName = defn.id().name();
                    if (columnName.equalsIgnoreCase(incrementingColumn)) {
                        incrementingOptional = defn.isOptional();
                    } else if (lowercaseTsColumns.contains(columnName.toLowerCase(Locale.getDefault()))) {
                        if (!defn.isOptional()) {
                            atLeastOneTimestampNotOptional = true;
                        }
                    }
                }
            } finally {
                conn.setAutoCommit(autoCommit);
            }
            if ((incrementalMode.equals(TableLoadMode.MODE_INCREMENTING.getName())
                || incrementalMode.equals(TableLoadMode.MODE_TIMESTAMP_INCREMENTING.getName()))
                && incrementingOptional) {
                throw new ConnectException("Cannot make incremental queries using incrementing column "
                    + incrementingColumn + " on " + table + " because this column "
                    + "is nullable.");
            }
            if ((incrementalMode.equals(TableLoadMode.MODE_TIMESTAMP.getName())
                || incrementalMode.equals(TableLoadMode.MODE_TIMESTAMP_INCREMENTING.getName()))
                && !atLeastOneTimestampNotOptional) {
                throw new ConnectException("Cannot make incremental queries using timestamp columns "
                    + timestampColumns + " on " + table + " because all of these "
                    + "columns "
                    + "nullable.");
            }
        } catch (SQLException e) {
            throw new ConnectException("Failed trying to validate that columns used for offsets are NOT"
                    + " NULL", e);
        }
    }