in flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java [180:225]
public static FieldNamedPreparedStatement prepareStatement(
Connection connection,
String sql,
String[] fieldNames,
String additionalPredicates,
int numberOfDynamicParams)
throws SQLException {
checkNotNull(connection, "connection must not be null.");
checkNotNull(sql, "sql must not be null.");
checkNotNull(fieldNames, "fieldNames must not be null.");
if (sql.contains("?")) {
throw new IllegalArgumentException("SQL statement must not contain ? character.");
}
sql = sql + additionalPredicates;
HashMap<String, List<Integer>> parameterMap = new HashMap<>();
String parsedSQL = parseNamedStatement(sql, parameterMap);
// currently, the statements must contain all the field parameters
final int parameterMapSize = parameterMap.size();
final int fieldNamesLength = fieldNames.length;
checkArgument(
parameterMapSize == fieldNamesLength,
"Expected "
+ fieldNamesLength
+ " fields, but the parsing found "
+ parameterMapSize);
int[][] indexMapping = new int[fieldNamesLength + numberOfDynamicParams][];
int numberOfNameBasedParams = 0;
for (int i = 0; i < fieldNamesLength; i++) {
String fieldName = fieldNames[i];
checkArgument(
parameterMap.containsKey(fieldName),
fieldName + " doesn't exist in the parameters of SQL statement: " + sql);
indexMapping[i] = parameterMap.get(fieldName).stream().mapToInt(v -> v).toArray();
numberOfNameBasedParams += parameterMap.get(fieldName).size();
}
for (int i = 0; i < numberOfDynamicParams; ++i) {
// FieldNamedPreparedStatement is 0-based, however, PreparedStatement is 1-based
indexMapping[i + fieldNamesLength] = new int[] {i + numberOfNameBasedParams + 1};
}
return new FieldNamedPreparedStatementImpl(
connection.prepareStatement(parsedSQL), indexMapping);
}