public void open()

in connectors/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBBoundedScanFunction.java [102:133]


  /**
   * Opens the scan by executing the (optionally time-bounded) query and validating the declared
   * schema against the columns returned by IoTDB.
   *
   * <p>The base statement {@code this.sql} is extended with a {@code WHERE TIME ...} clause built
   * from {@code lowerBound} and {@code upperBound}. A negative bound is treated as "not set"; any
   * non-negative bound (including {@code 0}) is applied. Both bounds are inclusive.
   *
   * <p>After the query is executed, every field of {@code tableSchema} that appears in the result
   * set has its IoTDB type compared with the declared Flink type. Fields that are absent from the
   * result set are skipped.
   *
   * @param inputSplit the input split; not used by this method
   * @throws IllegalSchemaException if a declared column is present in the result set but its IoTDB
   *     type does not match the declared Flink type
   * @throws RuntimeException wrapping any {@code StatementExecutionException} or {@code
   *     IoTDBConnectionException} raised while executing the query
   */
  public void open(InputSplit inputSplit) {
    // A bound of exactly 0 is a real bound. Using strict comparisons here would let a zero bound
    // fall through to the two-sided clause and emit the negative "unset" sentinel of the other
    // bound into the query (e.g. "TIME >= 0 AND TIME <= -1", which matches nothing).
    final boolean hasLowerBound = lowerBound >= 0L;
    final boolean hasUpperBound = upperBound >= 0L;

    final String boundedSql;
    if (hasLowerBound && hasUpperBound) {
      boundedSql =
          String.format("%s WHERE TIME >= %d AND TIME <= %d", this.sql, lowerBound, upperBound);
    } else if (hasLowerBound) {
      boundedSql = String.format("%s WHERE TIME >= %d", this.sql, lowerBound);
    } else if (hasUpperBound) {
      boundedSql = String.format("%s WHERE TIME <= %d", this.sql, upperBound);
    } else {
      boundedSql = this.sql;
    }

    try {
      dataSet = session.executeQueryStatement(boundedSql);
      columnNames = dataSet.getColumnNames();

      for (Tuple2<String, DataType> field : tableSchema) {
        // Single lookup: a negative index means the column is not part of the result set.
        int index = columnNames.indexOf(field.f0);
        if (index < 0) {
          continue;
        }
        TSDataType iotdbType = TSDataType.valueOf(dataSet.getColumnTypes().get(index));
        DataType flinkType = field.f1;
        if (!Utils.isTypeEqual(iotdbType, flinkType)) {
          throw new IllegalSchemaException(
              String.format(
                  "The data type of column `%s` is different in IoTDB and Flink", field.f0));
        }
      }
    } catch (StatementExecutionException | IoTDBConnectionException e) {
      throw new RuntimeException(e);
    }
  }