public JdbcRowDataLookupFunction()

in flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataLookupFunction.java [73:117]


    public JdbcRowDataLookupFunction(
            InternalJdbcConnectionOptions options,
            int maxRetryTimes,
            String[] fieldNames,
            DataType[] fieldTypes,
            String[] keyNames,
            RowType rowType,
            List<String> resolvedPredicates,
            Serializable[] pushdownParams) {
        checkNotNull(options, "No JdbcOptions supplied.");
        checkNotNull(fieldNames, "No fieldNames supplied.");
        checkNotNull(fieldTypes, "No fieldTypes supplied.");
        checkNotNull(keyNames, "No keyNames supplied.");
        checkNotNull(resolvedPredicates, "No resolvedPredicates supplied.");
        checkNotNull(pushdownParams, "No pushdownParams supplied.");
        this.connectionProvider = new SimpleJdbcConnectionProvider(options);
        this.keyNames = keyNames;
        List<String> nameList = Arrays.asList(fieldNames);
        DataType[] keyTypes =
                Arrays.stream(keyNames)
                        .map(
                                s -> {
                                    checkArgument(
                                            nameList.contains(s),
                                            "keyName %s can't find in fieldNames %s.",
                                            s,
                                            nameList);
                                    return fieldTypes[nameList.indexOf(s)];
                                })
                        .toArray(DataType[]::new);
        this.maxRetryTimes = maxRetryTimes;
        this.query =
                options.getDialect()
                        .getSelectFromStatement(options.getTableName(), fieldNames, keyNames);
        JdbcDialect jdbcDialect = options.getDialect();
        this.jdbcDialectConverter = jdbcDialect.getRowConverter(rowType);
        this.lookupKeyRowConverter =
                jdbcDialect.getRowConverter(
                        RowType.of(
                                Arrays.stream(keyTypes)
                                        .map(DataType::getLogicalType)
                                        .toArray(LogicalType[]::new)));
        this.resolvedPredicates = resolvedPredicates;
        this.pushdownParams = pushdownParams;
    }