in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/sourceimpl/AdbpgRowDataLookupFunction.java [83:140]
/**
 * Creates the lookup function and pre-computes the parameterized lookup query.
 *
 * <p>Connection, retry, cache and behavioural settings are read from {@code config}.
 * The resulting query template has the shape
 * {@code SELECT <fields> FROM <schema>.<table> WHERE <key> = ? AND ...}, optionally
 * followed by {@code limit <joinMaxRows>} when that option is greater than zero.
 * In case-sensitive mode the key columns, the schema and the table name are wrapped
 * in double quotes; otherwise they are emitted verbatim.
 *
 * @param fieldNum number of fields, stored as given
 * @param fieldNamesStr names of the selected columns, joined with commas into the select list
 * @param lts logical types handed to the row converter for result rows
 * @param keyNames names of the lookup key columns; each becomes a {@code name = ?} predicate
 * @param keyTypes logical types of the lookup keys, read by index for every entry of
 *     {@code keyNames}
 * @param config source of all connector options read here
 */
public AdbpgRowDataLookupFunction(int fieldNum,
        String[] fieldNamesStr, LogicalType[] lts, String[] keyNames,
        LogicalType[] keyTypes, ReadableConfig config) {
    // Values supplied directly by the caller.
    this.fieldNum = fieldNum;
    this.fieldNamesStr = fieldNamesStr;
    this.lts = lts;
    this.keyNames = keyNames;
    this.keyTypes = keyTypes;

    // Options resolved from the connector configuration.
    this.config = config;
    this.url = config.get(URL);
    this.tablename = config.get(TABLE_NAME);
    this.username = config.get(USERNAME);
    this.password = config.get(PASSWORD);
    this.retryWaitTime = config.get(RETRY_WAIT_TIME);
    this.maxRetryTime = config.get(MAX_RETRY_TIMES);
    this.exceptionMode = config.get(EXCEPTION_MODE);
    this.targetSchema = config.get(TARGET_SCHEMA);
    this.caseSensitive = isConfigOptionTrue(config, CASE_SENSITIVE);
    this.joinMaxRows = config.get(JOINMAXROWS);
    this.cacheStrategy = config.get(CACHE);
    this.cacheSize = config.get(CACHESIZE);
    this.cacheTTLMs = config.get(CACHETTLMS);
    this.verbose = config.get(VERBOSE);

    // NOTE(review): the select list is joined exactly as passed in, without quoting,
    // even in case-sensitive mode -- presumably the caller escapes it; confirm.
    this.escapedFieldNames = Joiner.on(",").useForNull("null").join(fieldNamesStr);

    // Identifiers are double-quoted only when case sensitivity is requested.
    final String quote = this.caseSensitive ? "\"" : "";

    // One "<key> = ?" predicate per lookup key, with the key types copied alongside.
    this.lookupKeyTypes = new LogicalType[keyNames.length];
    List<String> predicates = new ArrayList<>();
    int idx = 0;
    for (String keyName : keyNames) {
        predicates.add(quote + keyName + quote + " = ?");
        this.lookupKeyTypes[idx] = keyTypes[idx];
        idx++;
    }

    this.lookupKeyRowConverter = new JdbcRowConverter(lookupKeyTypes);
    this.jdbcRowConverter = new JdbcRowConverter(lts);

    // Assemble the statement locally and publish it to the field once.
    String sql = "SELECT " + this.escapedFieldNames
            + " FROM " + quote + this.targetSchema + quote
            + "." + quote + this.tablename + quote
            + " WHERE " + StringUtils.join(predicates, " AND ");
    if (this.joinMaxRows > 0) {
        sql = sql + " limit " + this.joinMaxRows;
    }
    this.queryTemplate = sql;
}