public void eval()

in connectors/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/function/IoTDBLookupFunction.java [96:148]


  /**
   * Looks up the row for a single key and emits it through {@code collect}.
   *
   * <p>The key is wrapped in a one-field row. When a cache is configured and already holds a row
   * for that key, the cached row is emitted and no query is issued. Otherwise the key's first field
   * is read as a {@code long} timestamp, the statement {@code "<sql> WHERE TIME=<timestamp>"} is
   * executed on the session, and the first returned record is converted into a row whose first
   * field is the timestamp, followed by one field per schema entry (in schema order). Schema
   * columns that are absent from the query result are emitted as {@code null}.
   *
   * <p>If the query returns no record, a row containing the timestamp followed by {@code null} for
   * every schema column is emitted; such a row is not put into the cache.
   *
   * @param obj the lookup key; it is read back as a {@code long} timestamp
   * @throws IoTDBConnectionException if executing the query, reading it, or closing it fails
   * @throws StatementExecutionException if executing the query, reading it, or closing it fails
   * @throws IllegalSchemaException if a returned column's IoTDB type does not match the Flink type
   *     declared for that column in the schema
   */
  public void eval(Object obj) throws IoTDBConnectionException, StatementExecutionException {
    RowData lookupKey = GenericRowData.of(obj);
    if (cache != null) {
      RowData cacheRow = cache.getIfPresent(lookupKey);
      if (cacheRow != null) {
        collect(cacheRow);
        return;
      }
    }

    long timestamp = lookupKey.getLong(0);

    // Slot 0 carries the timestamp; every schema column starts out as null and is only
    // overwritten when the query result actually provides a value for it.
    Object[] values = new Object[schema.size() + 1];
    values[0] = timestamp;
    boolean found;

    String query = String.format("%s WHERE TIME=%d", this.sql, timestamp);
    // try-with-resources releases the server-side query handle on every path, including when
    // the schema check below throws.
    try (SessionDataSet dataSet = session.executeQueryStatement(query)) {
      List<String> columnNames = dataSet.getColumnNames();
      // Drop "Time" so that positions in columnNames line up with positions in the record's
      // field list.
      columnNames.remove("Time");

      RowRecord rowRecord = dataSet.next();
      found = rowRecord != null;
      if (found) {
        List<Field> fields = rowRecord.getFields();
        int slot = 0;
        for (Tuple2<String, DataType> field : schema) {
          slot++;
          int index = columnNames.indexOf(field.f0);
          if (index < 0) {
            // Column is declared in the schema but missing from the result: keep null.
            continue;
          }
          Field value = fields.get(index);
          TSDataType iotdbType = value.getDataType();
          if (iotdbType == null) {
            // NOTE(review): IoTDB appears to represent a null cell as a Field without a data
            // type — confirm. There is nothing to type-check or convert, so keep null.
            continue;
          }
          DataType flinkType = field.f1;
          if (!Utils.isTypeEqual(iotdbType, flinkType)) {
            throw new IllegalSchemaException(
                String.format(
                    "The data type of column `%s` is different in IoTDB and Flink", field.f0));
          }
          values[slot] = Utils.getValue(value, flinkType);
        }
      }
    }

    GenericRowData rowData = GenericRowData.of(values);
    // Only rows backed by an actual record are cached; a miss is looked up again next time.
    if (found && cache != null) {
      cache.put(lookupKey, rowData);
    }
    collect(rowData);
  }