in flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataInputFormat.java [154:209]
public void open(InputSplit inputSplit) throws IOException {
try {
if (inputSplit != null && parameterValues != null) {
for (int i = 0; i < parameterValues[inputSplit.getSplitNumber()].length; i++) {
Object param = parameterValues[inputSplit.getSplitNumber()][i];
if (param instanceof String) {
statement.setString(i + 1, (String) param);
} else if (param instanceof Long) {
statement.setLong(i + 1, (Long) param);
} else if (param instanceof Integer) {
statement.setInt(i + 1, (Integer) param);
} else if (param instanceof Double) {
statement.setDouble(i + 1, (Double) param);
} else if (param instanceof Boolean) {
statement.setBoolean(i + 1, (Boolean) param);
} else if (param instanceof Float) {
statement.setFloat(i + 1, (Float) param);
} else if (param instanceof BigDecimal) {
statement.setBigDecimal(i + 1, (BigDecimal) param);
} else if (param instanceof Byte) {
statement.setByte(i + 1, (Byte) param);
} else if (param instanceof Short) {
statement.setShort(i + 1, (Short) param);
} else if (param instanceof Date) {
statement.setDate(i + 1, (Date) param);
} else if (param instanceof Time) {
statement.setTime(i + 1, (Time) param);
} else if (param instanceof Timestamp) {
statement.setTimestamp(i + 1, (Timestamp) param);
} else if (param instanceof Array) {
statement.setArray(i + 1, (Array) param);
} else {
// extends with other types if needed
throw new IllegalArgumentException(
"open() failed. Parameter "
+ i
+ " of type "
+ param.getClass()
+ " is not handled (yet).");
}
}
if (LOG.isDebugEnabled()) {
LOG.debug(
String.format(
"Executing '%s' with parameters %s",
queryTemplate,
Arrays.deepToString(
parameterValues[inputSplit.getSplitNumber()])));
}
}
resultSet = statement.executeQuery();
hasNext = resultSet.next();
} catch (SQLException se) {
throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
}
}