public AdbpgRowDataLookupFunction()

in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/sourceimpl/AdbpgRowDataLookupFunction.java [83:140]


    public AdbpgRowDataLookupFunction(int fieldNum,
                                      String[] fieldNamesStr, LogicalType[] lts, String[] keyNames,
                                      LogicalType[] keyTypes, ReadableConfig config) {
        this.config = config;
        this.url = config.get(URL);
        this.tablename = config.get(TABLE_NAME);
        this.username = config.get(USERNAME);
        this.password = config.get(PASSWORD);
        this.fieldNum = fieldNum;
        this.fieldNamesStr = fieldNamesStr;
        this.lts = lts;
        this.retryWaitTime = config.get(RETRY_WAIT_TIME);
        this.maxRetryTime = config.get(MAX_RETRY_TIMES);
        this.exceptionMode = config.get(EXCEPTION_MODE);
        this.targetSchema = config.get(TARGET_SCHEMA);
        this.caseSensitive = isConfigOptionTrue(config, CASE_SENSITIVE);
        this.joinMaxRows = config.get(JOINMAXROWS);
        this.cacheStrategy = config.get(CACHE);
        this.cacheSize = config.get(CACHESIZE);
        this.cacheTTLMs = config.get(CACHETTLMS);
        this.keyNames = keyNames;
        this.keyTypes = keyTypes;
        this.verbose = config.get(VERBOSE);

        Joiner joinerOnComma = Joiner.on(",").useForNull("null");
        this.escapedFieldNames = joinerOnComma.join(fieldNamesStr);
        this.lookupKeyTypes = new LogicalType[keyNames.length];
        List<String> keyFilters = new ArrayList<>();
        for (int i = 0; i < keyNames.length; i++) {
            if (this.caseSensitive) {
                keyFilters.add("\"" + keyNames[i] + "\"" + " = ?");
            } else {
                keyFilters.add(keyNames[i] + " = ?");
            }
            this.lookupKeyTypes[i] = keyTypes[i];
        }
        this.lookupKeyRowConverter = new JdbcRowConverter(lookupKeyTypes);
        this.jdbcRowConverter = new JdbcRowConverter(lts);
        String queryKeys = StringUtils.join(keyFilters, " AND ");
        if (this.caseSensitive) {
            this.queryTemplate = "SELECT " + escapedFieldNames
                    + " FROM "
                    + "\""
                    + targetSchema
                    + "\""
                    + "."
                    + "\""
                    + tablename
                    + "\""
                    + " WHERE "
                    + queryKeys;
        } else {
            this.queryTemplate = "SELECT " + escapedFieldNames + " FROM " + targetSchema + "." + tablename + " WHERE " + queryKeys;
        }
        if (joinMaxRows > 0) {
            this.queryTemplate = this.queryTemplate + " limit " + joinMaxRows;
        }
    }