in flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java [180:205]
/**
 * Creates a {@link FieldNamedPreparedStatement} for the given SQL statement and field names.
 *
 * <p>The SQL text is passed through {@code parseNamedStatement}, which fills a map from
 * parameter name to a list of integer indexes and returns the SQL string that is handed to
 * {@link Connection#prepareStatement(String)}. For each entry of {@code fieldNames}, the
 * indexes recorded under that name form the row of the index mapping at the same position.
 * NOTE(review): the indexes are presumably the positional JDBC parameter indexes of the
 * rewritten statement; confirm against {@code parseNamedStatement}.
 *
 * <p>All three arguments are validated with {@code checkNotNull} before any other work.
 *
 * @param connection connection used to prepare the parsed statement
 * @param sql SQL statement using named parameters; it must not contain a {@code ?} character
 * @param fieldNames names of the fields; each one must appear as a parameter of {@code sql},
 *     and the number of distinct parameters found must equal {@code fieldNames.length}
 * @return a statement wrapping the prepared JDBC statement together with the index mapping
 * @throws IllegalArgumentException if {@code sql} contains {@code ?}, if the number of distinct
 *     parameters differs from the number of field names, or if a field name is not a parameter
 *     of the statement
 * @throws SQLException declared for the underlying {@link Connection#prepareStatement(String)}
 *     call
 */
public static FieldNamedPreparedStatement prepareStatement(
        Connection connection, String sql, String[] fieldNames) throws SQLException {
    checkNotNull(connection, "connection must not be null.");
    checkNotNull(sql, "sql must not be null.");
    checkNotNull(fieldNames, "fieldNames must not be null.");
    if (sql.contains("?")) {
        throw new IllegalArgumentException("SQL statement must not contain ? character.");
    }

    HashMap<String, List<Integer>> parameterMap = new HashMap<>();
    String parsedSQL = parseNamedStatement(sql, parameterMap);

    // currently, the statements must contain all the field parameters
    if (parameterMap.size() != fieldNames.length) {
        // Spell out both sides of the mismatch; a bare failure gives no hint of what is wrong.
        throw new IllegalArgumentException(
                "The number of distinct named parameters in the SQL statement ("
                        + parameterMap.size()
                        + ": "
                        + parameterMap.keySet()
                        + ") does not match the number of field names ("
                        + fieldNames.length
                        + ": ["
                        + String.join(", ", fieldNames)
                        + "]). SQL statement: "
                        + sql);
    }

    int[][] indexMapping = new int[fieldNames.length][];
    for (int i = 0; i < fieldNames.length; i++) {
        String fieldName = fieldNames[i];
        // Build the error message only on failure instead of on every iteration.
        if (!parameterMap.containsKey(fieldName)) {
            throw new IllegalArgumentException(
                    fieldName + " doesn't exist in the parameters of SQL statement: " + sql);
        }
        indexMapping[i] =
                parameterMap.get(fieldName).stream().mapToInt(Integer::intValue).toArray();
    }

    return new FieldNamedPreparedStatementImpl(
            connection.prepareStatement(parsedSQL), indexMapping);
}