in flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcRowDataLookupFunction.java [73:117]
/**
 * Builds a lookup function from the given connection options and schema description.
 *
 * <p>The constructor validates its reference arguments, creates a connection provider from
 * {@code options}, asks the dialect for the select statement keyed on {@code keyNames}, and
 * obtains two row converters from the dialect: one for {@code rowType} and one for a row made
 * up of the lookup-key types only.
 *
 * @param options connection options; supplies the dialect and the table name. Must not be null.
 * @param maxRetryTimes stored as-is; how it is applied is decided by code outside this
 *     constructor.
 * @param fieldNames names of all fields; passed to the dialect when building the query. Must
 *     not be null.
 * @param fieldTypes types of the fields, looked up by the position of the matching entry in
 *     {@code fieldNames}. Must not be null.
 * @param keyNames names of the lookup keys; every entry has to occur in {@code fieldNames},
 *     otherwise the {@code checkArgument} precondition fails. Must not be null.
 * @param rowType row type handed to the dialect to create the result row converter.
 * @param resolvedPredicates stored as-is for later use. Must not be null.
 * @param pushdownParams stored as-is for later use. Must not be null.
 */
public JdbcRowDataLookupFunction(
        InternalJdbcConnectionOptions options,
        int maxRetryTimes,
        String[] fieldNames,
        DataType[] fieldTypes,
        String[] keyNames,
        RowType rowType,
        List<String> resolvedPredicates,
        Serializable[] pushdownParams) {
    // Fail fast on missing reference arguments before touching any of them.
    checkNotNull(options, "No JdbcOptions supplied.");
    checkNotNull(fieldNames, "No fieldNames supplied.");
    checkNotNull(fieldTypes, "No fieldTypes supplied.");
    checkNotNull(keyNames, "No keyNames supplied.");
    checkNotNull(resolvedPredicates, "No resolvedPredicates supplied.");
    checkNotNull(pushdownParams, "No pushdownParams supplied.");

    this.connectionProvider = new SimpleJdbcConnectionProvider(options);

    // Plain pass-through state.
    this.keyNames = keyNames;
    this.maxRetryTimes = maxRetryTimes;
    this.resolvedPredicates = resolvedPredicates;
    this.pushdownParams = pushdownParams;

    // Map every lookup key to the type of the field carrying the same name.
    final List<String> allFieldNames = Arrays.asList(fieldNames);
    final DataType[] keyFieldTypes = new DataType[keyNames.length];
    for (int i = 0; i < keyNames.length; i++) {
        final String keyName = keyNames[i];
        final int fieldIndex = allFieldNames.indexOf(keyName);
        checkArgument(
                fieldIndex >= 0,
                "keyName %s can't find in fieldNames %s.",
                keyName,
                allFieldNames);
        keyFieldTypes[i] = fieldTypes[fieldIndex];
    }

    // The dialect provides both the lookup statement and the row converters.
    final JdbcDialect jdbcDialect = options.getDialect();
    this.query =
            jdbcDialect.getSelectFromStatement(options.getTableName(), fieldNames, keyNames);
    this.jdbcDialectConverter = jdbcDialect.getRowConverter(rowType);

    // The key converter works on a row consisting solely of the key columns' logical types.
    final LogicalType[] keyLogicalTypes = new LogicalType[keyFieldTypes.length];
    for (int i = 0; i < keyFieldTypes.length; i++) {
        keyLogicalTypes[i] = keyFieldTypes[i].getLogicalType();
    }
    this.lookupKeyRowConverter = jdbcDialect.getRowConverter(RowType.of(keyLogicalTypes));
}