in connectors/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBBoundedScanFunction.java [102:133]
public void open(InputSplit inputSplit) {
String sql;
if (lowerBound < 0L && upperBound < 0L) {
sql = this.sql;
} else if (lowerBound < 0L && upperBound > 0L) {
sql = String.format("%s WHERE TIME <= %d", this.sql, upperBound);
} else if (lowerBound > 0L && upperBound < 0L) {
sql = String.format("%s WHERE TIME >= %d", this.sql, lowerBound);
} else {
sql = String.format("%s WHERE TIME >= %d AND TIME <= %d", this.sql, lowerBound, upperBound);
}
try {
dataSet = session.executeQueryStatement(sql);
columnNames = dataSet.getColumnNames();
for (Tuple2<String, DataType> field : tableSchema) {
if (!columnNames.contains(field.f0)) {
continue;
}
int index = columnNames.indexOf(field.f0);
TSDataType iotdbType = TSDataType.valueOf(dataSet.getColumnTypes().get(index));
DataType flinkType = field.f1;
if (!Utils.isTypeEqual(iotdbType, flinkType)) {
throw new IllegalSchemaException(
String.format(
"The data type of column `%s` is different in IoTDB and Flink", field.f0));
}
}
} catch (StatementExecutionException | IoTDBConnectionException e) {
throw new RuntimeException(e);
}
}